/*	$OpenBSD: scheduler.c,v 1.62 2021/06/14 17:58:16 eric Exp $	*/

/*
 * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
 * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

/*
 * Scheduler process of smtpd: receives envelope events from the queue
 * and control processes over imsg, feeds them to a pluggable scheduler
 * backend, and periodically asks the backend for batches of envelopes
 * to remove, expire, bounce, or hand to the MDA/MTA for delivery.
 */

#include <inttypes.h>
#include <pwd.h>
#include <signal.h>
#include <unistd.h>

#include "smtpd.h"
#include "log.h"

static void scheduler_imsg(struct mproc *, struct imsg *);
static void scheduler_shutdown(void);
static void scheduler_reset_events(void);
static void scheduler_timeout(int, short, void *);

/* Active scheduler backend, resolved from backend_scheduler at startup. */
static struct scheduler_backend *backend = NULL;
/* Single timer event driving scheduler_timeout(). */
static struct event	 ev;
/* Number of envelopes currently handed out for delivery and not yet
 * acknowledged back by the queue process. */
static size_t		 ninflight = 0;
/*
 * Scratch arrays sized from the runtime configuration in scheduler();
 * reused for every backend->batch()/messages()/envelopes() call.
 */
static int		*types;
static uint64_t		*evpids;
static uint32_t		*msgids;
static struct evpstate	*state;

extern const char	*backend_scheduler;

/*
 * imsg dispatch callback: handle one message from the queue or control
 * process.  A NULL imsg means the peer closed and the process must exit
 * (scheduler_shutdown() does not return).  Unknown message types are
 * fatal.  Most cases end with scheduler_reset_events() so the next
 * scheduling pass runs immediately.
 */
void
scheduler_imsg(struct mproc *p, struct imsg *imsg)
{
	struct bounce_req_msg	 req;
	struct envelope		 evp;
	struct scheduler_info	 si;
	struct msg		 m;
	uint64_t		 evpid, id, holdq;
	uint32_t		 msgid;
	uint32_t		 inflight;
	size_t			 n, i;
	time_t			 timestamp;
	int			 v, r, type;

	if (imsg == NULL)
		scheduler_shutdown();

	switch (imsg->hdr.type) {

	case IMSG_QUEUE_ENVELOPE_SUBMIT:
		/* New envelope from an incoming (uncommitted) message. */
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: inserting evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		stat_increment("scheduler.envelope.incoming", 1);
		backend->insert(&si);
		return;

	case IMSG_QUEUE_MESSAGE_COMMIT:
		/* Message fully received: move its envelopes from
		 * "incoming" to schedulable. */
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: committing msg:%08" PRIx32, msgid);
		n = backend->commit(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		stat_increment("scheduler.envelope", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DISCOVER_EVPID:
		/* Envelope found on disk at startup/rescan; skip it if
		 * the backend already knows about it. */
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		r = backend->query(evp.id);
		if (r) {
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " already scheduled", evp.id);
			return;
		}
		log_trace(TRACE_SCHEDULER,
		    "scheduler: discovering evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		stat_increment("scheduler.envelope.incoming", 1);
		backend->insert(&si);
		return;

	case IMSG_QUEUE_DISCOVER_MSGID:
		/* All envelopes of a rediscovered message have been
		 * submitted; commit them unless already scheduled. */
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		r = backend->query(msgid);
		if (r) {
			log_debug("debug: scheduler: msgid:%08" PRIx32
			    " already scheduled", msgid);
			return;
		}
		log_trace(TRACE_SCHEDULER,
		    "scheduler: committing msg:%08" PRIx32, msgid);
		n = backend->commit(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		stat_increment("scheduler.envelope", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_MESSAGE_ROLLBACK:
		/* Incoming message aborted: drop its envelopes. */
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER, "scheduler: aborting msg:%08" PRIx32,
		    msgid);
		n = backend->rollback(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_ENVELOPE_REMOVE:
		/* Administrative removal.  "inflight" tells whether the
		 * envelope is currently out for delivery: remove() drops
		 * a pending envelope, delete() reaps an in-flight one. */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_get_u32(&m, &inflight);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: queue requested removal of evp:%016" PRIx64,
		    evpid);
		stat_decrement("scheduler.envelope", 1);
		if (!inflight)
			backend->remove(evpid);
		else {
			backend->delete(evpid);
			ninflight -= 1;
			stat_decrement("scheduler.envelope.inflight", 1);
		}

		scheduler_reset_events();
		return;

	case IMSG_QUEUE_ENVELOPE_ACK:
		/* Queue acknowledged a removal we initiated; the envelope
		 * is no longer in flight. */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: queue ack removal of evp:%016" PRIx64,
		    evpid);
		ninflight -= 1;
		stat_decrement("scheduler.envelope.inflight", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_OK:
		/* Delivery succeeded: envelope is done. */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (ok)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.ok", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_TEMPFAIL:
		/* Temporary failure: update retry state, then check the
		 * configured bounce-warn intervals and request a delayed
		 * bounce for the first threshold that the next retry will
		 * cross and that has not been bounced on yet. */
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: updating evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		backend->update(&si);
		ninflight -= 1;
		stat_increment("scheduler.delivery.tempfail", 1);
		stat_decrement("scheduler.envelope.inflight", 1);

		for (i = 0; i < MAX_BOUNCE_WARN; i++) {
			/* A zero entry terminates the warn-interval list. */
			if (env->sc_bounce_warn[i] == 0)
				break;
			timestamp = si.creation + env->sc_bounce_warn[i];
			if (si.nexttry >= timestamp &&
			    si.lastbounce < timestamp) {
				req.evpid = evp.id;
				req.timestamp = timestamp;
				req.bounce.type = B_DELAYED;
				req.bounce.delay = env->sc_bounce_warn[i];
				req.bounce.ttl = si.ttl;
				m_compose(p, IMSG_SCHED_ENVELOPE_BOUNCE, 0, 0, -1,
				    &req, sizeof req);
				break;
			}
		}
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_PERMFAIL:
		/* Permanent failure: envelope is done (bounce is handled
		 * elsewhere). */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (fail)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.permfail", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_LOOP:
		/* Delivery loop detected: drop the envelope. */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (loop)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.loop", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_HOLDQ_HOLD:
		/* Park the envelope on the given holdq; it leaves the
		 * in-flight set until released. */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_get_id(&m, &holdq);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: holding evp:%016" PRIx64 " on %016" PRIx64,
		    evpid, holdq);
		backend->hold(evpid, holdq);
		ninflight -= 1;
		stat_decrement("scheduler.envelope.inflight", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_HOLDQ_RELEASE:
		/* Release up to r envelopes of the given type from a
		 * holdq back into scheduling. */
		m_msg(&m, imsg);
		m_get_int(&m, &type);
		m_get_id(&m, &holdq);
		m_get_int(&m, &r);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: releasing %d on holdq (%d, %016" PRIx64 ")",
		    r, type, holdq);
		backend->release(type, holdq, r);
		scheduler_reset_events();
		return;

	case IMSG_CTL_PAUSE_MDA:
		log_trace(TRACE_SCHEDULER, "scheduler: pausing mda");
		env->sc_flags |= SMTPD_MDA_PAUSED;
		return;

	case IMSG_CTL_RESUME_MDA:
		log_trace(TRACE_SCHEDULER, "scheduler: resuming mda");
		env->sc_flags &= ~SMTPD_MDA_PAUSED;
		scheduler_reset_events();
		return;

	case IMSG_CTL_PAUSE_MTA:
		log_trace(TRACE_SCHEDULER, "scheduler: pausing mta");
		env->sc_flags |= SMTPD_MTA_PAUSED;
		return;

	case IMSG_CTL_RESUME_MTA:
		log_trace(TRACE_SCHEDULER, "scheduler: resuming mta");
		env->sc_flags &= ~SMTPD_MTA_PAUSED;
		scheduler_reset_events();
		return;

	case IMSG_CTL_VERBOSE:
		m_msg(&m, imsg);
		m_get_int(&m, &v);
		m_end(&m);
		log_setverbose(v);
		return;

	case IMSG_CTL_PROFILE:
		m_msg(&m, imsg);
		m_get_int(&m, &v);
		m_end(&m);
		profiling = v;
		return;

	case IMSG_CTL_LIST_MESSAGES:
		/* Page through message ids starting after "msgid", reply
		 * with one batch in a single imsg. */
		msgid = *(uint32_t *)(imsg->data);
		n = backend->messages(msgid, msgids, env->sc_scheduler_max_msg_batch_size);
		m_compose(p, IMSG_CTL_LIST_MESSAGES, imsg->hdr.peerid, 0, -1,
		    msgids, n * sizeof (*msgids));
		return;

	case IMSG_CTL_LIST_ENVELOPES:
		/* Page through envelope states starting after "id": one
		 * imsg per envelope, then an empty imsg as terminator. */
		id = *(uint64_t *)(imsg->data);
		n = backend->envelopes(id, state, env->sc_scheduler_max_evp_batch_size);
		for (i = 0; i < n; i++) {
			m_create(p_queue, IMSG_CTL_LIST_ENVELOPES,
			    imsg->hdr.peerid, 0, -1);
			m_add_evpid(p_queue, state[i].evpid);
			m_add_int(p_queue, state[i].flags);
			m_add_time(p_queue, state[i].time);
			m_close(p_queue);
		}
		m_compose(p_queue, IMSG_CTL_LIST_ENVELOPES,
		    imsg->hdr.peerid, 0, -1, NULL, 0);
		return;

	case IMSG_CTL_SCHEDULE:
		/* Force-schedule a message (id fits in 32 bits) or a
		 * single envelope, and report success/failure. */
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "scheduling msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "scheduling evp:%016" PRIx64, id);
		r = backend->schedule(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_QUEUE_ENVELOPE_SCHEDULE:
		/* Same as above, but from the queue and without a reply. */
		id = *(uint64_t *)(imsg->data);
		backend->schedule(id);
		scheduler_reset_events();
		return;

	case IMSG_CTL_REMOVE:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "removing msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "removing evp:%016" PRIx64, id);
		r = backend->remove(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_CTL_PAUSE_EVP:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "suspending msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "suspending evp:%016" PRIx64, id);
		r = backend->suspend(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_CTL_RESUME_EVP:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "resuming msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "resuming evp:%016" PRIx64, id);
		r = backend->resume(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;
	}

	fatalx("scheduler_imsg: unexpected %s imsg",
	    imsg_to_str(imsg->hdr.type));
}

/* Terminate the scheduler process; called when the parent closes. */
static void
scheduler_shutdown(void)
{
	log_debug("debug: scheduler agent exiting");
	_exit(0);
}

/*
 * Re-arm the scheduling timer to fire immediately, so the next pass
 * of scheduler_timeout() runs on the next event-loop iteration.
 */
static void
scheduler_reset_events(void)
{
	struct timeval	 tv;

	evtimer_del(&ev);
	tv.tv_sec = 0;
	tv.tv_usec = 0;
	evtimer_add(&ev, &tv);
}

/*
 * Process entry point: look up and initialize the scheduler backend,
 * chroot and drop privileges, allocate the batch buffers, wire up the
 * imsg peers and the scheduling timer, pledge, and run the event loop.
 * Does not return except on fatal error.
 */
int
scheduler(void)
{
	struct passwd	*pw;

	backend = scheduler_backend_lookup(backend_scheduler);
	if (backend == NULL)
		fatalx("cannot find scheduler backend \"%s\"",
		    backend_scheduler);

	purge_config(PURGE_EVERYTHING & ~PURGE_DISPATCHERS);

	/* Resolve the unprivileged user before chroot(), while the
	 * password database is still reachable. */
	if ((pw = getpwnam(SMTPD_USER)) == NULL)
		fatalx("unknown user " SMTPD_USER);

	config_process(PROC_SCHEDULER);

	/* Backend init runs before chroot; presumably it may need
	 * filesystem access -- confirm per backend. */
	backend->init(backend_scheduler);

	if (chroot(PATH_CHROOT) == -1)
		fatal("scheduler: chroot");
	if (chdir("/") == -1)
		fatal("scheduler: chdir(\"/\")");

	if (setgroups(1, &pw->pw_gid) ||
	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
		fatal("scheduler: cannot drop privileges");

	/* Batch buffers, sized from the loaded configuration. */
	evpids = xcalloc(env->sc_scheduler_max_schedule, sizeof *evpids);
	types = xcalloc(env->sc_scheduler_max_schedule, sizeof *types);
	msgids = xcalloc(env->sc_scheduler_max_msg_batch_size, sizeof *msgids);
	state = xcalloc(env->sc_scheduler_max_evp_batch_size, sizeof *state);

	imsg_callback = scheduler_imsg;
	event_init();

	signal(SIGINT, SIG_IGN);
	signal(SIGTERM, SIG_IGN);
	signal(SIGPIPE, SIG_IGN);
	signal(SIGHUP, SIG_IGN);

	config_peer(PROC_CONTROL);
	config_peer(PROC_QUEUE);

	evtimer_set(&ev, scheduler_timeout, NULL);
	scheduler_reset_events();

	if (pledge("stdio", NULL) == -1)
		fatal("pledge");

	event_dispatch();
	fatalx("exited event loop");

	return (0);
}

/*
 * Timer callback: ask the backend for a batch of schedulable envelopes
 * and dispatch each one to the queue process according to its type.
 * If the backend has nothing to do it either sleeps (delay == -1,
 * waiting for an imsg to re-arm the timer) or re-arms the timer for
 * the indicated delay; otherwise the timer is re-armed immediately to
 * keep draining.
 */
static void
scheduler_timeout(int fd, short event, void *p)
{
	struct timeval	 tv;
	size_t		 i;
	size_t		 d_inflight;
	size_t		 d_envelope;
	size_t		 d_removed;
	size_t		 d_expired;
	size_t		 d_updated;
	size_t		 count;
	int		 mask, r, delay;

	tv.tv_sec = 0;
	tv.tv_usec = 0;

	/* Updates are always allowed; deliveries only while under the
	 * in-flight limit, and per-agent only when not paused. */
	mask = SCHED_UPDATE;

	if (ninflight <  env->sc_scheduler_max_inflight) {
		mask |= SCHED_EXPIRE | SCHED_REMOVE | SCHED_BOUNCE;
		if (!(env->sc_flags & SMTPD_MDA_PAUSED))
			mask |= SCHED_MDA;
		if (!(env->sc_flags & SMTPD_MTA_PAUSED))
			mask |= SCHED_MTA;
	}

	count = env->sc_scheduler_max_schedule;

	log_trace(TRACE_SCHEDULER, "scheduler: getting batch: mask=0x%x, count=%zu", mask, count);

	r = backend->batch(mask, &delay, &count, evpids, types);

	log_trace(TRACE_SCHEDULER, "scheduler: got r=%i, delay=%i, count=%zu", r, delay, count);

	if (r < 0)
		fatalx("scheduler: error in batch handler");

	if (r == 0) {

		if (delay < -1)
			fatalx("scheduler: invalid delay %d", delay);

		if (delay == -1) {
			/* Nothing pending at all: sleep until an imsg
			 * calls scheduler_reset_events(). */
			log_trace(TRACE_SCHEDULER, "scheduler: sleeping");
			return;
		}

		tv.tv_sec = delay;
		tv.tv_usec = 0;
		log_trace(TRACE_SCHEDULER,
		    "scheduler: waiting for %s", duration_to_text(tv.tv_sec));
		evtimer_add(&ev, &tv);
		return;
	}

	/* Per-pass deltas, applied to the stat counters in one go below. */
	d_inflight = 0;
	d_envelope = 0;
	d_removed = 0;
	d_expired = 0;
	d_updated = 0;

	for (i = 0; i < count; i++) {
		switch(types[i]) {
		case SCHED_REMOVE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " removed", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_REMOVE, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_envelope += 1;
			d_removed += 1;
			/* Counted in-flight until the queue acks. */
			d_inflight += 1;
			break;

		case SCHED_EXPIRE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " expired", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_EXPIRE, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_envelope += 1;
			d_expired += 1;
			d_inflight += 1;
			break;

		case SCHED_UPDATE:
			/* Backend-internal state change; nothing to send. */
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (update)", evpids[i]);
			d_updated += 1;
			break;

		case SCHED_BOUNCE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (bounce)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_INJECT, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;

		case SCHED_MDA:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (mda)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_DELIVER, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;

		case SCHED_MTA:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (mta)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_TRANSFER, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;
		}
	}

	stat_decrement("scheduler.envelope", d_envelope);
	stat_increment("scheduler.envelope.inflight", d_inflight);
	stat_increment("scheduler.envelope.expired", d_expired);
	stat_increment("scheduler.envelope.removed", d_removed);
	stat_increment("scheduler.envelope.updated", d_updated);

	ninflight += d_inflight;

	/* Batch was non-empty: poll again immediately. */
	tv.tv_sec = 0;
	tv.tv_usec = 0;
	evtimer_add(&ev, &tv);
}