/*	$OpenBSD: scheduler.c,v 1.60 2018/12/30 23:09:58 guenther Exp $	*/

/*
 * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
 * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

/*
 * Scheduler process of smtpd: decides when envelopes are handed to the
 * delivery agents.  It keeps its state through a pluggable backend
 * (struct scheduler_backend) and talks to the queue and control
 * processes over imsg.  The main loop is driven by a single libevent
 * timer (scheduler_timeout) that pulls batches of schedulable envelopes
 * from the backend and dispatches them to the queue process.
 */

#include <sys/types.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <sys/socket.h>
#include <sys/stat.h>

#include <ctype.h>
#include <dirent.h>
#include <err.h>
#include <errno.h>
#include <event.h>
#include <imsg.h>
#include <inttypes.h>
#include <pwd.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <limits.h>

#include "smtpd.h"
#include "log.h"

static void scheduler_imsg(struct mproc *, struct imsg *);
static void scheduler_shutdown(void);
static void scheduler_reset_events(void);
static void scheduler_timeout(int, short, void *);

/* Active scheduler backend, resolved once at startup in scheduler(). */
static struct scheduler_backend *backend = NULL;
/* The single timer event driving scheduler_timeout(). */
static struct event ev;
/*
 * Number of envelopes currently dispatched and not yet acknowledged or
 * resolved; incremented in scheduler_timeout(), decremented as results
 * come back in scheduler_imsg().
 */
static size_t ninflight = 0;
/* Scratch arrays sized from env limits in scheduler(), reused per batch. */
static int *types;
static uint64_t *evpids;
static uint32_t *msgids;
static struct evpstate *state;

extern const char *backend_scheduler;

/*
 * Handle one imsg from a peer process (queue or control).  A NULL imsg
 * means the peer channel closed and the process must exit.  Each case
 * unpacks its payload, updates the backend and the stat counters, and
 * usually re-arms the scheduling timer via scheduler_reset_events().
 * An unknown imsg type is fatal.
 */
void
scheduler_imsg(struct mproc *p, struct imsg *imsg)
{
	struct bounce_req_msg req;
	struct envelope evp;
	struct scheduler_info si;
	struct msg m;
	uint64_t evpid, id, holdq;
	uint32_t msgid;
	uint32_t inflight;
	size_t n, i;
	time_t timestamp;
	int v, r, type;

	if (imsg == NULL)
		scheduler_shutdown();	/* does not return (_exit) */

	switch (imsg->hdr.type) {

	case IMSG_QUEUE_ENVELOPE_SUBMIT:
		/* New envelope from the queue: insert as "incoming". */
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: inserting evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		stat_increment("scheduler.envelope.incoming", 1);
		backend->insert(&si);
		return;

	case IMSG_QUEUE_MESSAGE_COMMIT:
		/*
		 * Message fully received: commit all of its envelopes,
		 * moving them from "incoming" to schedulable.
		 */
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: committing msg:%08" PRIx32, msgid);
		n = backend->commit(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		stat_increment("scheduler.envelope", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DISCOVER_EVPID:
		/*
		 * Queue found an on-disk envelope (e.g. at startup);
		 * insert it unless the backend already knows it.
		 */
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		r = backend->query(evp.id);
		if (r) {
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " already scheduled", evp.id);
			return;
		}
		log_trace(TRACE_SCHEDULER,
		    "scheduler: discovering evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		stat_increment("scheduler.envelope.incoming", 1);
		backend->insert(&si);
		return;

	case IMSG_QUEUE_DISCOVER_MSGID:
		/* Same as above, but commit a whole discovered message. */
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		r = backend->query(msgid);
		if (r) {
			log_debug("debug: scheduler: msgid:%08" PRIx32
			    " already scheduled", msgid);
			return;
		}
		log_trace(TRACE_SCHEDULER,
		    "scheduler: committing msg:%08" PRIx32, msgid);
		n = backend->commit(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		stat_increment("scheduler.envelope", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_MESSAGE_ROLLBACK:
		/* Message reception aborted: drop its incoming envelopes. */
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER, "scheduler: aborting msg:%08" PRIx32,
		    msgid);
		n = backend->rollback(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_ENVELOPE_REMOVE:
		/*
		 * Administrative removal.  "inflight" tells whether the
		 * envelope was already dispatched: if so it is deleted
		 * (backend->delete) and the inflight accounting is
		 * reverted; otherwise it is simply removed.
		 */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_get_u32(&m, &inflight);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: queue requested removal of evp:%016" PRIx64,
		    evpid);
		stat_decrement("scheduler.envelope", 1);
		if (!inflight)
			backend->remove(evpid);
		else {
			backend->delete(evpid);
			ninflight -= 1;
			stat_decrement("scheduler.envelope.inflight", 1);
		}

		scheduler_reset_events();
		return;

	case IMSG_QUEUE_ENVELOPE_ACK:
		/* Queue acknowledged a removal of an inflight envelope. */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: queue ack removal of evp:%016" PRIx64,
		    evpid);
		ninflight -= 1;
		stat_decrement("scheduler.envelope.inflight", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_OK:
		/* Delivery succeeded: envelope is done, delete it. */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (ok)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.ok", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_TEMPFAIL:
		/*
		 * Temporary failure: update the envelope for a retry,
		 * then check whether a delayed-delivery warning bounce
		 * is due.  sc_bounce_warn[] holds the configured warning
		 * intervals (0 terminates the list); a bounce is
		 * requested for the first interval that falls between
		 * the last bounce and the next retry.
		 */
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: updating evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		backend->update(&si);
		ninflight -= 1;
		stat_increment("scheduler.delivery.tempfail", 1);
		stat_decrement("scheduler.envelope.inflight", 1);

		for (i = 0; i < MAX_BOUNCE_WARN; i++) {
			if (env->sc_bounce_warn[i] == 0)
				break;
			timestamp = si.creation + env->sc_bounce_warn[i];
			if (si.nexttry >= timestamp &&
			    si.lastbounce < timestamp) {
				req.evpid = evp.id;
				req.timestamp = timestamp;
				req.bounce.type = B_DELAYED;
				req.bounce.delay = env->sc_bounce_warn[i];
				req.bounce.ttl = si.ttl;
				m_compose(p, IMSG_SCHED_ENVELOPE_BOUNCE, 0, 0, -1,
				    &req, sizeof req);
				break;
			}
		}
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_PERMFAIL:
		/* Permanent failure: envelope is done, delete it. */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (fail)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.permfail", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_LOOP:
		/* Delivery loop detected: envelope is done, delete it. */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (loop)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.loop", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_HOLDQ_HOLD:
		/* Park the envelope on the given hold queue. */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_get_id(&m, &holdq);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: holding evp:%016" PRIx64 " on %016" PRIx64,
		    evpid, holdq);
		backend->hold(evpid, holdq);
		ninflight -= 1;
		stat_decrement("scheduler.envelope.inflight", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_HOLDQ_RELEASE:
		/* Release up to r envelopes of the given type from a holdq. */
		m_msg(&m, imsg);
		m_get_int(&m, &type);
		m_get_id(&m, &holdq);
		m_get_int(&m, &r);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: releasing %d on holdq (%d, %016" PRIx64 ")",
		    r, type, holdq);
		backend->release(type, holdq, r);
		scheduler_reset_events();
		return;

	case IMSG_CTL_PAUSE_MDA:
		log_trace(TRACE_SCHEDULER, "scheduler: pausing mda");
		env->sc_flags |= SMTPD_MDA_PAUSED;
		return;

	case IMSG_CTL_RESUME_MDA:
		log_trace(TRACE_SCHEDULER, "scheduler: resuming mda");
		env->sc_flags &= ~SMTPD_MDA_PAUSED;
		scheduler_reset_events();
		return;

	case IMSG_CTL_PAUSE_MTA:
		log_trace(TRACE_SCHEDULER, "scheduler: pausing mta");
		env->sc_flags |= SMTPD_MTA_PAUSED;
		return;

	case IMSG_CTL_RESUME_MTA:
		log_trace(TRACE_SCHEDULER, "scheduler: resuming mta");
		env->sc_flags &= ~SMTPD_MTA_PAUSED;
		scheduler_reset_events();
		return;

	case IMSG_CTL_VERBOSE:
		m_msg(&m, imsg);
		m_get_int(&m, &v);
		m_end(&m);
		log_setverbose(v);
		return;

	case IMSG_CTL_PROFILE:
		m_msg(&m, imsg);
		m_get_int(&m, &v);
		m_end(&m);
		profiling = v;
		return;

	case IMSG_CTL_LIST_MESSAGES:
		/* Page of message ids starting after msgid, for smtpctl. */
		msgid = *(uint32_t *)(imsg->data);
		n = backend->messages(msgid, msgids, env->sc_scheduler_max_msg_batch_size);
		m_compose(p, IMSG_CTL_LIST_MESSAGES, imsg->hdr.peerid, 0, -1,
		    msgids, n * sizeof (*msgids));
		return;

	case IMSG_CTL_LIST_ENVELOPES:
		/*
		 * Page of envelope states starting after id; one imsg per
		 * envelope to the queue, then an empty one as terminator.
		 */
		id = *(uint64_t *)(imsg->data);
		n = backend->envelopes(id, state, env->sc_scheduler_max_evp_batch_size);
		for (i = 0; i < n; i++) {
			m_create(p_queue, IMSG_CTL_LIST_ENVELOPES,
			    imsg->hdr.peerid, 0, -1);
			m_add_evpid(p_queue, state[i].evpid);
			m_add_int(p_queue, state[i].flags);
			m_add_time(p_queue, state[i].time);
			m_close(p_queue);
		}
		m_compose(p_queue, IMSG_CTL_LIST_ENVELOPES,
		    imsg->hdr.peerid, 0, -1, NULL, 0);
		return;

	case IMSG_CTL_SCHEDULE:
		/*
		 * An id that fits in 32 bits designates a whole message,
		 * otherwise a single envelope (same convention below).
		 */
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "scheduling msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "scheduling evp:%016" PRIx64, id);
		r = backend->schedule(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_QUEUE_ENVELOPE_SCHEDULE:
		/* Like IMSG_CTL_SCHEDULE, but no reply is expected. */
		id = *(uint64_t *)(imsg->data);
		backend->schedule(id);
		scheduler_reset_events();
		return;

	case IMSG_CTL_REMOVE:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "removing msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "removing evp:%016" PRIx64, id);
		r = backend->remove(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_CTL_PAUSE_EVP:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "suspending msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "suspending evp:%016" PRIx64, id);
		r = backend->suspend(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_CTL_RESUME_EVP:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "resuming msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "resuming evp:%016" PRIx64, id);
		r = backend->resume(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;
	}

	errx(1, "scheduler_imsg: unexpected %s imsg",
	    imsg_to_str(imsg->hdr.type));
}

/* Terminate the process; called when the peer imsg channel closes. */
static void
scheduler_shutdown(void)
{
	log_debug("debug: scheduler agent exiting");
	_exit(0);
}

/*
 * Re-arm the scheduling timer to fire immediately, so that
 * scheduler_timeout() re-evaluates what can be dispatched after any
 * state change.
 */
static void
scheduler_reset_events(void)
{
	struct timeval tv;

	evtimer_del(&ev);
	tv.tv_sec = 0;
	tv.tv_usec = 0;
	evtimer_add(&ev, &tv);
}

/*
 * Entry point of the scheduler process: look up the backend, drop
 * privileges into a chroot, allocate the per-batch scratch arrays,
 * set up imsg peers and the timer, pledge, and run the event loop.
 * Only returns (0) notionally; event_dispatch() is not expected to exit.
 */
int
scheduler(void)
{
	struct passwd *pw;

	backend = scheduler_backend_lookup(backend_scheduler);
	if (backend == NULL)
		errx(1, "cannot find scheduler backend \"%s\"",
		    backend_scheduler);

	purge_config(PURGE_EVERYTHING & ~PURGE_DISPATCHERS);

	if ((pw = getpwnam(SMTPD_USER)) == NULL)
		fatalx("unknown user " SMTPD_USER);

	config_process(PROC_SCHEDULER);

	/* Backend init runs before chroot, while the fs is still visible. */
	backend->init(backend_scheduler);

	if (chroot(PATH_CHROOT) == -1)
		fatal("scheduler: chroot");
	if (chdir("/") == -1)
		fatal("scheduler: chdir(\"/\")");

	if (setgroups(1, &pw->pw_gid) ||
	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
		fatal("scheduler: cannot drop privileges");

	/* Scratch buffers for batch dispatch and the listing imsgs. */
	evpids = xcalloc(env->sc_scheduler_max_schedule, sizeof *evpids);
	types = xcalloc(env->sc_scheduler_max_schedule, sizeof *types);
	msgids = xcalloc(env->sc_scheduler_max_msg_batch_size, sizeof *msgids);
	state = xcalloc(env->sc_scheduler_max_evp_batch_size, sizeof *state);

	imsg_callback = scheduler_imsg;
	event_init();

	signal(SIGINT, SIG_IGN);
	signal(SIGTERM, SIG_IGN);
	signal(SIGPIPE, SIG_IGN);
	signal(SIGHUP, SIG_IGN);

	config_peer(PROC_CONTROL);
	config_peer(PROC_QUEUE);

	evtimer_set(&ev, scheduler_timeout, NULL);
	scheduler_reset_events();

	if (pledge("stdio", NULL) == -1)
		err(1, "pledge");

	event_dispatch();
	fatalx("exited event loop");

	return (0);
}

/*
 * Timer callback and heart of the scheduler: ask the backend for a
 * batch of envelopes to act on, dispatch each one to the queue process
 * according to its type, update the stat counters, and re-arm the
 * timer.  When the backend has nothing ready it either reports a delay
 * until the next event (timer re-armed for then) or -1 (sleep until an
 * imsg triggers scheduler_reset_events()).
 */
static void
scheduler_timeout(int fd, short event, void *p)
{
	struct timeval tv;
	size_t i;
	size_t d_inflight;
	size_t d_envelope;
	size_t d_removed;
	size_t d_expired;
	size_t d_updated;
	size_t count;
	int mask, r, delay;

	tv.tv_sec = 0;
	tv.tv_usec = 0;

	/*
	 * Build the mask of batch types we are willing to process:
	 * updates are always allowed; everything else only while under
	 * the inflight limit, and mda/mta dispatch only when not paused.
	 */
	mask = SCHED_UPDATE;

	if (ninflight < env->sc_scheduler_max_inflight) {
		mask |= SCHED_EXPIRE | SCHED_REMOVE | SCHED_BOUNCE;
		if (!(env->sc_flags & SMTPD_MDA_PAUSED))
			mask |= SCHED_MDA;
		if (!(env->sc_flags & SMTPD_MTA_PAUSED))
			mask |= SCHED_MTA;
	}

	count = env->sc_scheduler_max_schedule;

	log_trace(TRACE_SCHEDULER, "scheduler: getting batch: mask=0x%x, count=%zu", mask, count);

	r = backend->batch(mask, &delay, &count, evpids, types);

	log_trace(TRACE_SCHEDULER, "scheduler: got r=%i, delay=%i, count=%zu", r, delay, count);

	if (r < 0)
		fatalx("scheduler: error in batch handler");

	if (r == 0) {

		if (delay < -1)
			fatalx("scheduler: invalid delay %d", delay);

		if (delay == -1) {
			/* Nothing pending at all: wait for an imsg. */
			log_trace(TRACE_SCHEDULER, "scheduler: sleeping");
			return;
		}

		/* Next envelope becomes due in "delay" seconds. */
		tv.tv_sec = delay;
		tv.tv_usec = 0;
		log_trace(TRACE_SCHEDULER,
		    "scheduler: waiting for %s", duration_to_text(tv.tv_sec));
		evtimer_add(&ev, &tv);
		return;
	}

	/* Deltas accumulated over the batch, applied to stats below. */
	d_inflight = 0;
	d_envelope = 0;
	d_removed = 0;
	d_expired = 0;
	d_updated = 0;

	for (i = 0; i < count; i++) {
		switch(types[i]) {
		case SCHED_REMOVE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " removed", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_REMOVE, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_envelope += 1;
			d_removed += 1;
			d_inflight += 1;
			break;

		case SCHED_EXPIRE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " expired", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_EXPIRE, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_envelope += 1;
			d_expired += 1;
			d_inflight += 1;
			break;

		case SCHED_UPDATE:
			/* Backend-internal state change; nothing to send. */
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (update)", evpids[i]);
			d_updated += 1;
			break;

		case SCHED_BOUNCE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (bounce)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_INJECT, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;

		case SCHED_MDA:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (mda)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_DELIVER, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;

		case SCHED_MTA:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (mta)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_TRANSFER, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;
		}
	}

	stat_decrement("scheduler.envelope", d_envelope);
	stat_increment("scheduler.envelope.inflight", d_inflight);
	stat_increment("scheduler.envelope.expired", d_expired);
	stat_increment("scheduler.envelope.removed", d_removed);
	stat_increment("scheduler.envelope.updated", d_updated);

	ninflight += d_inflight;

	/* Immediately look for more work; backend may have another batch. */
	tv.tv_sec = 0;
	tv.tv_usec = 0;
	evtimer_add(&ev, &tv);
}