/*	$OpenBSD: scheduler.c,v 1.53 2016/09/01 10:54:25 eric Exp $	*/

/*
 * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
 * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/types.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <sys/socket.h>
#include <sys/stat.h>

#include <ctype.h>
#include <dirent.h>
#include <err.h>
#include <errno.h>
#include <event.h>
#include <imsg.h>
#include <inttypes.h>
#include <libgen.h>
#include <limits.h>
#include <pwd.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include "smtpd.h"
#include "log.h"

static void scheduler_imsg(struct mproc *, struct imsg *);
static void scheduler_shutdown(void);
static void scheduler_sig_handler(int, short, void *);
static void scheduler_reset_events(void);
static void scheduler_timeout(int, short, void *);

static struct scheduler_backend	*backend = NULL;
static struct event		 ev;
static size_t			 ninflight = 0;
static int			*types;
static uint64_t			*evpids;
static uint32_t			*msgids;
static struct evpstate		*state;

extern const char *backend_scheduler;

void
scheduler_imsg(struct mproc *p, struct imsg *imsg)
{
	struct bounce_req_msg	 req;
	struct envelope		 evp;
	struct scheduler_info	 si;
	struct msg		 m;
	uint64_t		 evpid, id, holdq;
	uint32_t		 msgid;
	uint32_t		 inflight;
	size_t			 n, i;
	time_t			 timestamp;
	int			 v, r, type;

	switch (imsg->hdr.type) {

	case IMSG_QUEUE_ENVELOPE_SUBMIT:
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: inserting evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		stat_increment("scheduler.envelope.incoming", 1);
		backend->insert(&si);
		return;

	case IMSG_QUEUE_MESSAGE_COMMIT:
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: committing msg:%08" PRIx32, msgid);
		n = backend->commit(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		stat_increment("scheduler.envelope", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DISCOVER_EVPID:
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		r = backend->query(evp.id);
		if (r) {
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " already scheduled", evp.id);
			return;
		}
		log_trace(TRACE_SCHEDULER,
		    "scheduler: discovering evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		stat_increment("scheduler.envelope.incoming", 1);
		backend->insert(&si);
		return;
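	/*
	 * The DISCOVER imsgs appear to re-announce envelopes and messages
	 * that already exist on disk (presumably while the queue is being
	 * walked at startup); the backend->query() guard keeps these
	 * handlers idempotent when an id has already been seen.
	 */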
	case IMSG_QUEUE_DISCOVER_MSGID:
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		r = backend->query(msgid);
		if (r) {
			log_debug("debug: scheduler: msgid:%08" PRIx32
			    " already scheduled", msgid);
			return;
		}
		log_trace(TRACE_SCHEDULER,
		    "scheduler: committing msg:%08" PRIx32, msgid);
		n = backend->commit(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		stat_increment("scheduler.envelope", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_MESSAGE_ROLLBACK:
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER, "scheduler: aborting msg:%08" PRIx32,
		    msgid);
		n = backend->rollback(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_ENVELOPE_REMOVE:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_get_u32(&m, &inflight);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: queue requested removal of evp:%016" PRIx64,
		    evpid);
		stat_decrement("scheduler.envelope", 1);
		if (!inflight)
			backend->remove(evpid);
		else {
			backend->delete(evpid);
			ninflight -= 1;
			stat_decrement("scheduler.envelope.inflight", 1);
		}

		scheduler_reset_events();
		return;

	case IMSG_QUEUE_ENVELOPE_ACK:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: queue ack removal of evp:%016" PRIx64,
		    evpid);
		ninflight -= 1;
		stat_decrement("scheduler.envelope.inflight", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_OK:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (ok)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.ok", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_TEMPFAIL:
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: updating evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		backend->update(&si);
		ninflight -= 1;
		stat_increment("scheduler.delivery.tempfail", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
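		/*
		 * Walk the configured bounce-warn delays: if the next try
		 * falls past creation + delay but no bounce has been sent
		 * for that point yet, ask the queue to inject a
		 * delay-warning bounce for this envelope.
		 */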
		for (i = 0; i < MAX_BOUNCE_WARN; i++) {
			if (env->sc_bounce_warn[i] == 0)
				break;
			timestamp = si.creation + env->sc_bounce_warn[i];
			if (si.nexttry >= timestamp &&
			    si.lastbounce < timestamp) {
				req.evpid = evp.id;
				req.timestamp = timestamp;
				req.bounce.type = B_WARNING;
				req.bounce.delay = env->sc_bounce_warn[i];
				req.bounce.expire = si.expire;
				m_compose(p, IMSG_SCHED_ENVELOPE_BOUNCE, 0, 0, -1,
				    &req, sizeof req);
				break;
			}
		}
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_PERMFAIL:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (fail)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.permfail", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_LOOP:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (loop)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.loop", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_HOLDQ_HOLD:
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_get_id(&m, &holdq);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: holding evp:%016" PRIx64 " on %016" PRIx64,
		    evpid, holdq);
		backend->hold(evpid, holdq);
		ninflight -= 1;
		stat_decrement("scheduler.envelope.inflight", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_HOLDQ_RELEASE:
		m_msg(&m, imsg);
		m_get_int(&m, &type);
		m_get_id(&m, &holdq);
		m_get_int(&m, &r);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: releasing %d on holdq (%d, %016" PRIx64 ")",
		    r, type, holdq);
		backend->release(type, holdq, r);
		scheduler_reset_events();
		return;

	case IMSG_CTL_PAUSE_MDA:
		log_trace(TRACE_SCHEDULER, "scheduler: pausing mda");
		env->sc_flags |= SMTPD_MDA_PAUSED;
		return;

	case IMSG_CTL_RESUME_MDA:
		log_trace(TRACE_SCHEDULER, "scheduler: resuming mda");
		env->sc_flags &= ~SMTPD_MDA_PAUSED;
		scheduler_reset_events();
		return;

	case IMSG_CTL_PAUSE_MTA:
		log_trace(TRACE_SCHEDULER, "scheduler: pausing mta");
		env->sc_flags |= SMTPD_MTA_PAUSED;
		return;

	case IMSG_CTL_RESUME_MTA:
		log_trace(TRACE_SCHEDULER, "scheduler: resuming mta");
		env->sc_flags &= ~SMTPD_MTA_PAUSED;
		scheduler_reset_events();
		return;

	case IMSG_CTL_VERBOSE:
		m_msg(&m, imsg);
		m_get_int(&m, &v);
		m_end(&m);
		log_verbose(v);
		return;

	case IMSG_CTL_PROFILE:
		m_msg(&m, imsg);
		m_get_int(&m, &v);
		m_end(&m);
		profiling = v;
		return;

	case IMSG_CTL_LIST_MESSAGES:
		msgid = *(uint32_t *)(imsg->data);
		n = backend->messages(msgid, msgids,
		    env->sc_scheduler_max_msg_batch_size);
		m_compose(p, IMSG_CTL_LIST_MESSAGES, imsg->hdr.peerid, 0, -1,
		    msgids, n * sizeof (*msgids));
		return;

	case IMSG_CTL_LIST_ENVELOPES:
		id = *(uint64_t *)(imsg->data);
		n = backend->envelopes(id, state,
		    env->sc_scheduler_max_evp_batch_size);
		for (i = 0; i < n; i++) {
			m_create(p_queue, IMSG_CTL_LIST_ENVELOPES,
			    imsg->hdr.peerid, 0, -1);
			m_add_evpid(p_queue, state[i].evpid);
			m_add_int(p_queue, state[i].flags);
			m_add_time(p_queue, state[i].time);
			m_close(p_queue);
		}
		m_compose(p_queue, IMSG_CTL_LIST_ENVELOPES,
		    imsg->hdr.peerid, 0, -1, NULL, 0);
		return;
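	/*
	 * In the control cases below, an id that fits in 32 bits is taken
	 * to be a message id and anything larger an envelope id; an evpid
	 * carries its msgid in the upper 32 bits, which is what makes the
	 * single comparison sufficient.
	 */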
	case IMSG_CTL_SCHEDULE:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "scheduling msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "scheduling evp:%016" PRIx64, id);
		r = backend->schedule(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_QUEUE_ENVELOPE_SCHEDULE:
		id = *(uint64_t *)(imsg->data);
		backend->schedule(id);
		scheduler_reset_events();
		return;

	case IMSG_CTL_REMOVE:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "removing msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "removing evp:%016" PRIx64, id);
		r = backend->remove(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_CTL_PAUSE_EVP:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "suspending msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "suspending evp:%016" PRIx64, id);
		r = backend->suspend(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_CTL_RESUME_EVP:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "resuming msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "resuming evp:%016" PRIx64, id);
		r = backend->resume(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;
	}

	errx(1, "scheduler_imsg: unexpected %s imsg",
	    imsg_to_str(imsg->hdr.type));
}

static void
scheduler_sig_handler(int sig, short event, void *p)
{
	switch (sig) {
	case SIGINT:
	case SIGTERM:
		scheduler_shutdown();
		break;
	default:
		fatalx("scheduler_sig_handler: unexpected signal");
	}
}

static void
scheduler_shutdown(void)
{
	log_info("info: scheduler handler exiting");
	_exit(0);
}

/*
 * Force the scheduling timeout to fire on the next event-loop pass by
 * re-arming it with a zero timeout.
 */
static void
scheduler_reset_events(void)
{
	struct timeval	 tv;

	evtimer_del(&ev);
	tv.tv_sec = 0;
	tv.tv_usec = 0;
	evtimer_add(&ev, &tv);
}
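/*
 * Scheduler process entry point: look up the configured backend, purge
 * inherited configuration, chroot and drop privileges, size the batch
 * arrays from the runtime limits, wire up signals and the control and
 * queue peers, then run the event loop under pledge("stdio").
 */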
int
scheduler(void)
{
	struct passwd	*pw;
	struct event	 ev_sigint;
	struct event	 ev_sigterm;

	backend = scheduler_backend_lookup(backend_scheduler);
	if (backend == NULL)
		errx(1, "cannot find scheduler backend \"%s\"",
		    backend_scheduler);

	purge_config(PURGE_EVERYTHING);

	if ((pw = getpwnam(SMTPD_USER)) == NULL)
		fatalx("unknown user " SMTPD_USER);

	config_process(PROC_SCHEDULER);

	backend->init(backend_scheduler);

	if (chroot(PATH_CHROOT) == -1)
		fatal("scheduler: chroot");
	if (chdir("/") == -1)
		fatal("scheduler: chdir(\"/\")");

	if (setgroups(1, &pw->pw_gid) ||
	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
		fatal("scheduler: cannot drop privileges");

	evpids = xcalloc(env->sc_scheduler_max_schedule, sizeof *evpids,
	    "scheduler: init evpids");
	types = xcalloc(env->sc_scheduler_max_schedule, sizeof *types,
	    "scheduler: init types");
	msgids = xcalloc(env->sc_scheduler_max_msg_batch_size, sizeof *msgids,
	    "scheduler: list msg");
	state = xcalloc(env->sc_scheduler_max_evp_batch_size, sizeof *state,
	    "scheduler: list evp");

	imsg_callback = scheduler_imsg;
	event_init();

	signal_set(&ev_sigint, SIGINT, scheduler_sig_handler, NULL);
	signal_set(&ev_sigterm, SIGTERM, scheduler_sig_handler, NULL);
	signal_add(&ev_sigint, NULL);
	signal_add(&ev_sigterm, NULL);
	signal(SIGPIPE, SIG_IGN);
	signal(SIGHUP, SIG_IGN);

	config_peer(PROC_CONTROL);
	config_peer(PROC_QUEUE);

	evtimer_set(&ev, scheduler_timeout, NULL);
	scheduler_reset_events();

	if (pledge("stdio", NULL) == -1)
		err(1, "pledge");

	if (event_dispatch() < 0)
		fatal("event_dispatch");
	scheduler_shutdown();

	return (0);
}
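/*
 * One scheduling pass, run off the event timer.  Build the mask of
 * actions the backend may return (updates always; expire/remove/bounce
 * and mda/mta dispatch only while under the inflight limit and while
 * the corresponding agent is not paused), then let the backend fill
 * evpids[]/types[].  A return of 0 means nothing is runnable: a delay
 * of -1 sleeps until an imsg wakes us, otherwise the timer is re-armed
 * for that many seconds.  Otherwise every entry is forwarded to the
 * queue process and the timer is re-armed to fire immediately.
 */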
static void
scheduler_timeout(int fd, short event, void *p)
{
	struct timeval	 tv;
	size_t		 i;
	size_t		 d_inflight;
	size_t		 d_envelope;
	size_t		 d_removed;
	size_t		 d_expired;
	size_t		 d_updated;
	size_t		 count;
	int		 mask, r, delay;

	tv.tv_sec = 0;
	tv.tv_usec = 0;

	mask = SCHED_UPDATE;

	if (ninflight < env->sc_scheduler_max_inflight) {
		mask |= SCHED_EXPIRE | SCHED_REMOVE | SCHED_BOUNCE;
		if (!(env->sc_flags & SMTPD_MDA_PAUSED))
			mask |= SCHED_MDA;
		if (!(env->sc_flags & SMTPD_MTA_PAUSED))
			mask |= SCHED_MTA;
	}

	count = env->sc_scheduler_max_schedule;

	log_trace(TRACE_SCHEDULER, "scheduler: getting batch: mask=0x%x, "
	    "count=%zu", mask, count);

	r = backend->batch(mask, &delay, &count, evpids, types);

	log_trace(TRACE_SCHEDULER, "scheduler: got r=%i, delay=%i, count=%zu",
	    r, delay, count);

	if (r < 0)
		fatalx("scheduler: error in batch handler");

	if (r == 0) {

		if (delay < -1)
			fatalx("scheduler: invalid delay %d", delay);

		if (delay == -1) {
			log_trace(TRACE_SCHEDULER, "scheduler: sleeping");
			return;
		}

		tv.tv_sec = delay;
		tv.tv_usec = 0;
		log_trace(TRACE_SCHEDULER,
		    "scheduler: waiting for %s", duration_to_text(tv.tv_sec));
		evtimer_add(&ev, &tv);
		return;
	}

	d_inflight = 0;
	d_envelope = 0;
	d_removed = 0;
	d_expired = 0;
	d_updated = 0;

	for (i = 0; i < count; i++) {
		switch (types[i]) {
		case SCHED_REMOVE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " removed", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_REMOVE, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_envelope += 1;
			d_removed += 1;
			d_inflight += 1;
			break;

		case SCHED_EXPIRE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " expired", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_EXPIRE, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_envelope += 1;
			d_expired += 1;
			d_inflight += 1;
			break;

		case SCHED_UPDATE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (update)", evpids[i]);
			d_updated += 1;
			break;

		case SCHED_BOUNCE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (bounce)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_INJECT, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;

		case SCHED_MDA:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (mda)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_DELIVER, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;

		case SCHED_MTA:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (mta)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_TRANSFER, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;
		}
	}

	stat_decrement("scheduler.envelope", d_envelope);
	stat_increment("scheduler.envelope.inflight", d_inflight);
	stat_increment("scheduler.envelope.expired", d_expired);
	stat_increment("scheduler.envelope.removed", d_removed);
	stat_increment("scheduler.envelope.updated", d_updated);

	ninflight += d_inflight;

	tv.tv_sec = 0;
	tv.tv_usec = 0;
	evtimer_add(&ev, &tv);
}