/*	$OpenBSD: scheduler_ramqueue.c,v 1.48 2023/05/31 16:51:46 op Exp $	*/

/*
 * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <inttypes.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include "smtpd.h"
#include "log.h"

TAILQ_HEAD(evplist, rq_envelope);

struct rq_message {
	uint32_t		 msgid;
	struct tree		 envelopes;
};

struct rq_envelope {
	TAILQ_ENTRY(rq_envelope) entry;
	SPLAY_ENTRY(rq_envelope) t_entry;

	uint64_t		 evpid;
	uint64_t		 holdq;
	enum delivery_type	 type;

#define	RQ_EVPSTATE_PENDING	0
#define	RQ_EVPSTATE_SCHEDULED	1
#define	RQ_EVPSTATE_INFLIGHT	2
#define	RQ_EVPSTATE_HELD	3
	uint8_t			 state;

#define	RQ_ENVELOPE_EXPIRED	0x01
#define	RQ_ENVELOPE_REMOVED	0x02
#define	RQ_ENVELOPE_SUSPEND	0x04
#define	RQ_ENVELOPE_UPDATE	0x08
#define	RQ_ENVELOPE_OVERFLOW	0x10
	uint8_t			 flags;

	time_t			 ctime;
	time_t			 sched;
	time_t			 expire;

	struct rq_message	*message;

	time_t			 t_inflight;
	time_t			 t_scheduled;
};

struct rq_holdq {
	struct evplist		 q;
	size_t			 count;
};
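
/*
 * The ramqueue itself.  Envelopes are indexed by a per-message tree
 * and, depending on their state, linked on one of the lists below.
 * q_pending is kept ordered by next-event time through q_priotree
 * (see rq_envelope_cmp() and sorted_insert()); the other lists are
 * plain FIFOs.
 */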
struct rq_queue {
	size_t			 evpcount;
	struct tree		 messages;
	SPLAY_HEAD(prioqtree, rq_envelope) q_priotree;

	struct evplist		 q_pending;
	struct evplist		 q_inflight;

	struct evplist		 q_mta;
	struct evplist		 q_mda;
	struct evplist		 q_bounce;
	struct evplist		 q_update;
	struct evplist		 q_expired;
	struct evplist		 q_removed;
};

static int rq_envelope_cmp(struct rq_envelope *, struct rq_envelope *);

SPLAY_PROTOTYPE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);
static int scheduler_ram_init(const char *);
static int scheduler_ram_insert(struct scheduler_info *);
static size_t scheduler_ram_commit(uint32_t);
static size_t scheduler_ram_rollback(uint32_t);
static int scheduler_ram_update(struct scheduler_info *);
static int scheduler_ram_delete(uint64_t);
static int scheduler_ram_hold(uint64_t, uint64_t);
static int scheduler_ram_release(int, uint64_t, int);
static int scheduler_ram_batch(int, int *, size_t *, uint64_t *, int *);
static size_t scheduler_ram_messages(uint32_t, uint32_t *, size_t);
static size_t scheduler_ram_envelopes(uint64_t, struct evpstate *, size_t);
static int scheduler_ram_schedule(uint64_t);
static int scheduler_ram_remove(uint64_t);
static int scheduler_ram_suspend(uint64_t);
static int scheduler_ram_resume(uint64_t);
static int scheduler_ram_query(uint64_t);

static void sorted_insert(struct rq_queue *, struct rq_envelope *);

static void rq_queue_init(struct rq_queue *);
static void rq_queue_merge(struct rq_queue *, struct rq_queue *);
static void rq_queue_dump(struct rq_queue *, const char *);
static void rq_queue_schedule(struct rq_queue *rq);
static struct evplist *rq_envelope_list(struct rq_queue *,
    struct rq_envelope *);
static void rq_envelope_schedule(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_remove(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_suspend(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_resume(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *);
static const char *rq_envelope_to_text(struct rq_envelope *);

struct scheduler_backend scheduler_backend_ramqueue = {
	scheduler_ram_init,

	scheduler_ram_insert,
	scheduler_ram_commit,
	scheduler_ram_rollback,

	scheduler_ram_update,
	scheduler_ram_delete,
	scheduler_ram_hold,
	scheduler_ram_release,

	scheduler_ram_batch,

	scheduler_ram_messages,
	scheduler_ram_envelopes,
	scheduler_ram_schedule,
	scheduler_ram_remove,
	scheduler_ram_suspend,
	scheduler_ram_resume,
	scheduler_ram_query,
};

static struct rq_queue	ramqueue;
static struct tree	updates;
static struct tree	holdqs[3]; /* delivery type */

static time_t		currtime;
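
/*
 * Backoff curve for retries: scheduler_backoff() yields
 * t0 + base * step^2, so with BACKOFF_TRANSFER (400s) the retry
 * steps land 400, 1600, 3600, ... seconds after creation.
 * scheduler_next() walks the curve to the first step that lies in
 * the future.
 */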
#define BACKOFF_TRANSFER	400
#define BACKOFF_DELIVERY	10
#define BACKOFF_OVERFLOW	3

static time_t
scheduler_backoff(time_t t0, time_t base, uint32_t step)
{
	return (t0 + base * step * step);
}

static time_t
scheduler_next(time_t t0, time_t base, uint32_t step)
{
	time_t t;

	/* XXX be more efficient */
	while ((t = scheduler_backoff(t0, base, step)) <= currtime)
		step++;

	return (t);
}

static int
scheduler_ram_init(const char *arg)
{
	rq_queue_init(&ramqueue);
	tree_init(&updates);
	tree_init(&holdqs[D_MDA]);
	tree_init(&holdqs[D_MTA]);
	tree_init(&holdqs[D_BOUNCE]);

	return (1);
}

static int
scheduler_ram_insert(struct scheduler_info *si)
{
	struct rq_queue		*update;
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);

	/* find/prepare a ramqueue update */
	if ((update = tree_get(&updates, msgid)) == NULL) {
		update = xcalloc(1, sizeof *update);
		stat_increment("scheduler.ramqueue.update", 1);
		rq_queue_init(update);
		tree_xset(&updates, msgid, update);
	}

	/* find/prepare the message in the ramqueue update */
	if ((message = tree_get(&update->messages, msgid)) == NULL) {
		message = xcalloc(1, sizeof *message);
		message->msgid = msgid;
		tree_init(&message->envelopes);
		tree_xset(&update->messages, msgid, message);
		stat_increment("scheduler.ramqueue.message", 1);
	}

	/* create envelope in ramqueue message */
	envelope = xcalloc(1, sizeof *envelope);
	envelope->evpid = si->evpid;
	envelope->type = si->type;
	envelope->message = message;
	envelope->ctime = si->creation;
	envelope->expire = si->creation + si->ttl;
	envelope->sched = scheduler_backoff(si->creation,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY,
	    si->retry);
	tree_xset(&message->envelopes, envelope->evpid, envelope);

	update->evpcount++;
	stat_increment("scheduler.ramqueue.envelope", 1);

	envelope->state = RQ_EVPSTATE_PENDING;
	TAILQ_INSERT_TAIL(&update->q_pending, envelope, entry);

	si->nexttry = envelope->sched;

	return (1);
}

static size_t
scheduler_ram_commit(uint32_t msgid)
{
	struct rq_queue	*update;
	size_t		 r;

	currtime = time(NULL);

	update = tree_xpop(&updates, msgid);
	r = update->evpcount;

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(update, "update to commit");

	rq_queue_merge(&ramqueue, update);

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "resulting queue");

	rq_queue_schedule(&ramqueue);

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static size_t
scheduler_ram_rollback(uint32_t msgid)
{
	struct rq_queue		*update;
	struct rq_envelope	*evp;
	size_t			 r;

	currtime = time(NULL);

	if ((update = tree_pop(&updates, msgid)) == NULL)
		return (0);
	r = update->evpcount;

	while ((evp = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, evp, entry);
		rq_envelope_delete(update, evp);
	}

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static int
scheduler_ram_update(struct scheduler_info *si)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, si->evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", si->evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/*
	 * If the envelope was removed while in flight, schedule it for
	 * removal immediately.
	 */
	if (evp->flags & RQ_ENVELOPE_REMOVED) {
		TAILQ_INSERT_TAIL(&ramqueue.q_removed, evp, entry);
		evp->state = RQ_EVPSTATE_SCHEDULED;
		evp->t_scheduled = currtime;
		return (1);
	}

	evp->sched = scheduler_next(evp->ctime,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY,
	    si->retry);

	evp->state = RQ_EVPSTATE_PENDING;
	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		sorted_insert(&ramqueue, evp);

	si->nexttry = evp->sched;

	return (1);
}
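
/*
 * scheduler_ram_delete() and scheduler_ram_hold(), like
 * scheduler_ram_update() above, are only ever called for envelopes
 * that are currently in flight, hence the fatalx() on any other
 * state.
 */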
static int
scheduler_ram_delete(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	rq_envelope_delete(&ramqueue, evp);

	return (1);
}

#define HOLDQ_MAXSIZE	1000

static int
scheduler_ram_hold(uint64_t evpid, uint64_t holdq)
{
	struct rq_holdq		*hq;
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/* If the envelope is suspended, just mark it as pending */
	if (evp->flags & RQ_ENVELOPE_SUSPEND) {
		evp->state = RQ_EVPSTATE_PENDING;
		return (0);
	}

	hq = tree_get(&holdqs[evp->type], holdq);
	if (hq == NULL) {
		hq = xcalloc(1, sizeof(*hq));
		TAILQ_INIT(&hq->q);
		tree_xset(&holdqs[evp->type], holdq, hq);
		stat_increment("scheduler.ramqueue.holdq", 1);
	}

	/* If the holdq is full, just "tempfail" the envelope */
	if (hq->count >= HOLDQ_MAXSIZE) {
		evp->state = RQ_EVPSTATE_PENDING;
		evp->flags |= RQ_ENVELOPE_UPDATE;
		evp->flags |= RQ_ENVELOPE_OVERFLOW;
		sorted_insert(&ramqueue, evp);
		stat_increment("scheduler.ramqueue.hold-overflow", 1);
		return (0);
	}

	evp->state = RQ_EVPSTATE_HELD;
	evp->holdq = holdq;
	/* This is an optimization: upon release, the envelopes will be
	 * inserted in the pending queue from the first element to the last.
	 * Since elements already in the queue were received first, they
	 * were scheduled first, so they will be reinserted before the
	 * current element.
	 */
	TAILQ_INSERT_HEAD(&hq->q, evp, entry);
	hq->count += 1;
	stat_increment("scheduler.ramqueue.hold", 1);

	return (1);
}
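
/*
 * Release up to n envelopes from a holdq back to the pending queue.
 * A count of -1 releases the whole holdq and flags the envelopes
 * for update, so that their schedule is recomputed on the next
 * batch.
 */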
static int
scheduler_ram_release(int type, uint64_t holdq, int n)
{
	struct rq_holdq		*hq;
	struct rq_envelope	*evp;
	int			 i, update;

	currtime = time(NULL);

	hq = tree_get(&holdqs[type], holdq);
	if (hq == NULL)
		return (0);

	if (n == -1) {
		n = 0;
		update = 1;
	}
	else
		update = 0;

	for (i = 0; n == 0 || i < n; i++) {
		evp = TAILQ_FIRST(&hq->q);
		if (evp == NULL)
			break;

		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		evp->holdq = 0;

		/* When released, all envelopes are put in the pending queue
		 * and will be rescheduled immediately. As an optimization,
		 * we could just schedule them directly.
		 */
		evp->state = RQ_EVPSTATE_PENDING;
		if (update)
			evp->flags |= RQ_ENVELOPE_UPDATE;
		sorted_insert(&ramqueue, evp);
	}

	if (TAILQ_EMPTY(&hq->q)) {
		tree_xpop(&holdqs[type], holdq);
		free(hq);
		stat_decrement("scheduler.ramqueue.holdq", 1);
	}
	stat_decrement("scheduler.ramqueue.hold", i);

	return (i);
}
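
/*
 * Build a batch of at most *count envelopes for the queue process.
 * Each round of the loop picks at most one envelope from every list
 * selected in the mask, so one class of work cannot starve the
 * others; the loop stops when the batch is full or a round makes no
 * progress.  If the batch comes out empty, *delay is set to the time
 * until the next pending event, or -1 if there is none.
 */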
static int
scheduler_ram_batch(int mask, int *delay, size_t *count, uint64_t *evpids,
    int *types)
{
	struct rq_envelope	*evp;
	size_t			 i, n;
	time_t			 t;

	currtime = time(NULL);

	rq_queue_schedule(&ramqueue);
	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "scheduler_ram_batch()");

	i = 0;
	n = 0;

	for (;;) {

		if (mask & SCHED_REMOVE &&
		    (evp = TAILQ_FIRST(&ramqueue.q_removed))) {
			TAILQ_REMOVE(&ramqueue.q_removed, evp, entry);
			types[i] = SCHED_REMOVE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_EXPIRE &&
		    (evp = TAILQ_FIRST(&ramqueue.q_expired))) {
			TAILQ_REMOVE(&ramqueue.q_expired, evp, entry);
			types[i] = SCHED_EXPIRE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_UPDATE &&
		    (evp = TAILQ_FIRST(&ramqueue.q_update))) {
			TAILQ_REMOVE(&ramqueue.q_update, evp, entry);
			types[i] = SCHED_UPDATE;
			evpids[i] = evp->evpid;

			if (evp->flags & RQ_ENVELOPE_OVERFLOW)
				t = BACKOFF_OVERFLOW;
			else if (evp->type == D_MTA)
				t = BACKOFF_TRANSFER;
			else
				t = BACKOFF_DELIVERY;

			evp->sched = scheduler_next(evp->ctime, t, 0);
			evp->flags &= ~(RQ_ENVELOPE_UPDATE|RQ_ENVELOPE_OVERFLOW);
			evp->state = RQ_EVPSTATE_PENDING;
			if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
				sorted_insert(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_BOUNCE &&
		    (evp = TAILQ_FIRST(&ramqueue.q_bounce))) {
			TAILQ_REMOVE(&ramqueue.q_bounce, evp, entry);
			types[i] = SCHED_BOUNCE;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MDA &&
		    (evp = TAILQ_FIRST(&ramqueue.q_mda))) {
			TAILQ_REMOVE(&ramqueue.q_mda, evp, entry);
			types[i] = SCHED_MDA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MTA &&
		    (evp = TAILQ_FIRST(&ramqueue.q_mta))) {
			TAILQ_REMOVE(&ramqueue.q_mta, evp, entry);
			types[i] = SCHED_MTA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		/* nothing seen this round */
		if (i == n)
			break;

		n = i;
	}

	if (i) {
		*count = i;
		return (1);
	}

	if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
		if (evp->sched < evp->expire)
			t = evp->sched;
		else
			t = evp->expire;
		*delay = (t < currtime) ? 0 : (t - currtime);
	}
	else
		*delay = -1;

	return (0);
}

static size_t
scheduler_ram_messages(uint32_t from, uint32_t *dst, size_t size)
{
	uint64_t	 id;
	size_t		 n;
	void		*i;

	for (n = 0, i = NULL; n < size; n++) {
		if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0)
			break;
		dst[n] = id;
	}

	return (n);
}

static size_t
scheduler_ram_envelopes(uint64_t from, struct evpstate *dst, size_t size)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	void			*i;
	size_t			 n;

	if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL)
		return (0);

	for (n = 0, i = NULL; n < size; ) {

		if (tree_iterfrom(&msg->envelopes, &i, from, NULL,
		    (void**)&evp) == 0)
			break;

		if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
			continue;

		dst[n].evpid = evp->evpid;
		dst[n].flags = 0;
		dst[n].retry = 0;
		dst[n].time = 0;

		if (evp->state == RQ_EVPSTATE_PENDING) {
			dst[n].time = evp->sched;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_SCHEDULED) {
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_INFLIGHT) {
			dst[n].time = evp->t_inflight;
			dst[n].flags = EF_INFLIGHT;
		}
		else if (evp->state == RQ_EVPSTATE_HELD) {
			/* same as scheduled */
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
			dst[n].flags |= EF_HOLD;
		}
		if (evp->flags & RQ_ENVELOPE_SUSPEND)
			dst[n].flags |= EF_SUSPEND;

		n++;
	}

	return (n);
}
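
/*
 * The control handlers below accept either a full 64-bit envelope
 * id or a bare 32-bit message id: a value that fits in 32 bits is
 * treated as a msgid and the operation is applied to every envelope
 * of that message.
 */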
static int
scheduler_ram_schedule(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (evp->state == RQ_EVPSTATE_INFLIGHT)
			return (0);
		rq_envelope_schedule(&ramqueue, evp);
		return (1);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) {
			if (evp->state == RQ_EVPSTATE_INFLIGHT)
				continue;
			rq_envelope_schedule(&ramqueue, evp);
			r++;
		}
		return (r);
	}
}

static int
scheduler_ram_remove(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_remove(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_remove(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_suspend(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_suspend(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_suspend(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_resume(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_resume(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_resume(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_query(uint64_t evpid)
{
	uint32_t	 msgid;

	if (evpid > 0xffffffff)
		msgid = evpid_to_msgid(evpid);
	else
		msgid = evpid;

	if (tree_get(&ramqueue.messages, msgid) == NULL)
		return (0);

	return (1);
}

static void
sorted_insert(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_envelope	*evp2;

	SPLAY_INSERT(prioqtree, &rq->q_priotree, evp);
	evp2 = SPLAY_NEXT(prioqtree, &rq->q_priotree, evp);
	if (evp2)
		TAILQ_INSERT_BEFORE(evp2, evp, entry);
	else
		TAILQ_INSERT_TAIL(&rq->q_pending, evp, entry);
}

static void
rq_queue_init(struct rq_queue *rq)
{
	memset(rq, 0, sizeof *rq);
	tree_init(&rq->messages);
	TAILQ_INIT(&rq->q_pending);
	TAILQ_INIT(&rq->q_inflight);
	TAILQ_INIT(&rq->q_mta);
	TAILQ_INIT(&rq->q_mda);
	TAILQ_INIT(&rq->q_bounce);
	TAILQ_INIT(&rq->q_update);
	TAILQ_INIT(&rq->q_expired);
	TAILQ_INIT(&rq->q_removed);
	SPLAY_INIT(&rq->q_priotree);
}
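
/*
 * Merge a committed update into the live ramqueue.  Message
 * structures are reused whenever possible; when the message already
 * exists, the update's envelopes must be re-pointed at the surviving
 * structure before the envelope trees are merged.
 */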
static void
rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
{
	struct rq_message	*message, *tomessage;
	struct rq_envelope	*envelope;
	uint64_t		 id;
	void			*i;

	while (tree_poproot(&update->messages, &id, (void*)&message)) {
		if ((tomessage = tree_get(&rq->messages, id)) == NULL) {
			/* message does not exist. re-use structure */
			tree_xset(&rq->messages, id, message);
			continue;
		}
		/* need to re-link all envelopes before merging them */
		i = NULL;
		while ((tree_iter(&message->envelopes, &i, &id,
		    (void*)&envelope)))
			envelope->message = tomessage;
		tree_merge(&tomessage->envelopes, &message->envelopes);
		free(message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	/* Sorted insert in the pending queue */
	while ((envelope = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, envelope, entry);
		sorted_insert(rq, envelope);
	}

	rq->evpcount += update->evpcount;
}

#define SCHEDULEMAX	1024

static void
rq_queue_schedule(struct rq_queue *rq)
{
	struct rq_envelope	*evp;
	size_t			 n;

	n = 0;
	while ((evp = TAILQ_FIRST(&rq->q_pending))) {
		if (evp->sched > currtime && evp->expire > currtime)
			break;

		if (n == SCHEDULEMAX)
			break;

		if (evp->state != RQ_EVPSTATE_PENDING)
			fatalx("evp:%016" PRIx64 " flags=0x%x", evp->evpid,
			    evp->flags);

		if (evp->expire <= currtime) {
			TAILQ_REMOVE(&rq->q_pending, evp, entry);
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
			TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
			evp->state = RQ_EVPSTATE_SCHEDULED;
			evp->flags |= RQ_ENVELOPE_EXPIRED;
			evp->t_scheduled = currtime;
			continue;
		}
		rq_envelope_schedule(rq, evp);
		n += 1;
	}
}

static struct evplist *
rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp)
{
	switch (evp->state) {
	case RQ_EVPSTATE_PENDING:
		return &rq->q_pending;

	case RQ_EVPSTATE_SCHEDULED:
		if (evp->flags & RQ_ENVELOPE_EXPIRED)
			return &rq->q_expired;
		if (evp->flags & RQ_ENVELOPE_REMOVED)
			return &rq->q_removed;
		if (evp->flags & RQ_ENVELOPE_UPDATE)
			return &rq->q_update;
		if (evp->type == D_MTA)
			return &rq->q_mta;
		if (evp->type == D_MDA)
			return &rq->q_mda;
		if (evp->type == D_BOUNCE)
			return &rq->q_bounce;
		fatalx("%016" PRIx64 " bad evp type %d", evp->evpid, evp->type);

	case RQ_EVPSTATE_INFLIGHT:
		return &rq->q_inflight;

	case RQ_EVPSTATE_HELD:
		return (NULL);
	}

	fatalx("%016" PRIx64 " bad state %d", evp->evpid, evp->state);
	return (NULL);
}

static void
rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*q = NULL;

	switch (evp->type) {
	case D_MTA:
		q = &rq->q_mta;
		break;
	case D_MDA:
		q = &rq->q_mda;
		break;
	case D_BOUNCE:
		q = &rq->q_bounce;
		break;
	}

	if (evp->flags & RQ_ENVELOPE_UPDATE)
		q = &rq->q_update;

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		TAILQ_REMOVE(&rq->q_pending, evp, entry);
		SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(q, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->t_scheduled = currtime;
}
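
/*
 * Note on linkage: a suspended envelope that is not in flight sits
 * on no list at all until it is resumed, which is why the helpers
 * below test RQ_ENVELOPE_SUSPEND before unlinking.
 */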
static int
rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
		return (0);
	/*
	 * If the envelope is in flight, just mark it for removal.
	 */
	if (evp->state == RQ_EVPSTATE_INFLIGHT) {
		evp->flags |= RQ_ENVELOPE_REMOVED;
		return (1);
	}

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->flags |= RQ_ENVELOPE_REMOVED;
	evp->t_scheduled = currtime;

	return (1);
}

static int
rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & RQ_ENVELOPE_SUSPEND)
		return (0);

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		evp->state = RQ_EVPSTATE_PENDING;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	evp->flags |= RQ_ENVELOPE_SUSPEND;

	return (1);
}

static int
rq_envelope_resume(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct evplist	*evl;

	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		return (0);

	if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		if (evl == &rq->q_pending)
			sorted_insert(rq, evp);
		else
			TAILQ_INSERT_TAIL(evl, evp, entry);
	}

	evp->flags &= ~RQ_ENVELOPE_SUSPEND;

	return (1);
}

static void
rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
{
	tree_xpop(&evp->message->envelopes, evp->evpid);
	if (tree_empty(&evp->message->envelopes)) {
		tree_xpop(&rq->messages, evp->message->msgid);
		free(evp->message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	free(evp);
	rq->evpcount--;
	stat_decrement("scheduler.ramqueue.envelope", 1);
}
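
/*
 * Format an envelope for the debug traces.  The result is held in a
 * static buffer that is overwritten on each call, so it is only
 * valid until the next invocation.
 */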
static const char *
rq_envelope_to_text(struct rq_envelope *e)
{
	static char	 buf[256];
	char		 t[64];

	(void)snprintf(buf, sizeof buf, "evp:%016" PRIx64 " [", e->evpid);

	if (e->type == D_BOUNCE)
		(void)strlcat(buf, "bounce", sizeof buf);
	else if (e->type == D_MDA)
		(void)strlcat(buf, "mda", sizeof buf);
	else if (e->type == D_MTA)
		(void)strlcat(buf, "mta", sizeof buf);

	(void)snprintf(t, sizeof t, ",expire=%s",
	    duration_to_text(e->expire - currtime));
	(void)strlcat(buf, t, sizeof buf);

	switch (e->state) {
	case RQ_EVPSTATE_PENDING:
		(void)snprintf(t, sizeof t, ",pending=%s",
		    duration_to_text(e->sched - currtime));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_SCHEDULED:
		(void)snprintf(t, sizeof t, ",scheduled=%s",
		    duration_to_text(currtime - e->t_scheduled));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_INFLIGHT:
		(void)snprintf(t, sizeof t, ",inflight=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_HELD:
		(void)snprintf(t, sizeof t, ",held=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;
	default:
		fatalx("%016" PRIx64 " bad state %d", e->evpid, e->state);
	}

	if (e->flags & RQ_ENVELOPE_REMOVED)
		(void)strlcat(buf, ",removed", sizeof buf);
	if (e->flags & RQ_ENVELOPE_EXPIRED)
		(void)strlcat(buf, ",expired", sizeof buf);
	if (e->flags & RQ_ENVELOPE_SUSPEND)
		(void)strlcat(buf, ",suspended", sizeof buf);

	(void)strlcat(buf, "]", sizeof buf);

	return (buf);
}

static void
rq_queue_dump(struct rq_queue *rq, const char *name)
{
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	void			*i, *j;
	uint64_t		 id;

	log_debug("debug: /--- ramqueue: %s", name);

	i = NULL;
	while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
		log_debug("debug: | msg:%08" PRIx32, message->msgid);
		j = NULL;
		while ((tree_iter(&message->envelopes, &j, &id,
		    (void*)&envelope)))
			log_debug("debug: |   %s",
			    rq_envelope_to_text(envelope));
	}
	log_debug("debug: \\---");
}

static int
rq_envelope_cmp(struct rq_envelope *e1, struct rq_envelope *e2)
{
	time_t	ref1, ref2;

	ref1 = (e1->sched < e1->expire) ? e1->sched : e1->expire;
	ref2 = (e2->sched < e2->expire) ? e2->sched : e2->expire;
	if (ref1 != ref2)
		return (ref1 < ref2) ? -1 : 1;

	if (e1->evpid != e2->evpid)
		return (e1->evpid < e2->evpid) ? -1 : 1;

	return 0;
}

SPLAY_GENERATE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);