1 /* $OpenBSD: scheduler.c,v 1.62 2021/06/14 17:58:16 eric Exp $ */
2
3 /*
4 * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
5 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
6 * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
7 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
8 *
9 * Permission to use, copy, modify, and distribute this software for any
10 * purpose with or without fee is hereby granted, provided that the above
11 * copyright notice and this permission notice appear in all copies.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
14 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
15 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
16 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
17 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
18 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
19 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20 */
21
22 #include <inttypes.h>
23 #include <pwd.h>
24 #include <signal.h>
25 #include <unistd.h>
26
27 #include "smtpd.h"
28 #include "log.h"
29
30 static void scheduler_imsg(struct mproc *, struct imsg *);
31 static void scheduler_shutdown(void);
32 static void scheduler_reset_events(void);
33 static void scheduler_timeout(int, short, void *);
34
35 static struct scheduler_backend *backend = NULL;
36 static struct event ev;
37 static size_t ninflight = 0;
38 static int *types;
39 static uint64_t *evpids;
40 static uint32_t *msgids;
41 static struct evpstate *state;
42
43 extern const char *backend_scheduler;
44
/*
 * Handle one imsg delivered by peer process p (control or queue).
 *
 * A NULL imsg means the channel to the peer was closed: shut the
 * scheduler down.  Otherwise each case decodes its payload, updates
 * the scheduler backend and the relevant stat counters, and — when
 * the set of runnable envelopes may have changed — rearms the
 * scheduler loop via scheduler_reset_events().  Unknown message
 * types are a fatal protocol error.
 */
void
scheduler_imsg(struct mproc *p, struct imsg *imsg)
{
	struct bounce_req_msg req;
	struct envelope evp;
	struct scheduler_info si;
	struct msg m;
	uint64_t evpid, id, holdq;
	uint32_t msgid;
	uint32_t inflight;
	size_t n, i;
	time_t timestamp;
	int v, r, type;

	if (imsg == NULL)
		scheduler_shutdown();

	switch (imsg->hdr.type) {

	case IMSG_QUEUE_ENVELOPE_SUBMIT:
		/* New envelope from queue: not committed yet, counts as "incoming". */
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: inserting evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		stat_increment("scheduler.envelope.incoming", 1);
		backend->insert(&si);
		return;

	case IMSG_QUEUE_MESSAGE_COMMIT:
		/* Message fully received: move its envelopes incoming -> scheduled. */
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: committing msg:%08" PRIx32, msgid);
		n = backend->commit(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		stat_increment("scheduler.envelope", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DISCOVER_EVPID:
		/*
		 * Queue walk (e.g. at startup) found an envelope on disk;
		 * insert it unless the backend already knows it.
		 */
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		r = backend->query(evp.id);
		if (r) {
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " already scheduled", evp.id);
			return;
		}
		log_trace(TRACE_SCHEDULER,
		    "scheduler: discovering evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		stat_increment("scheduler.envelope.incoming", 1);
		backend->insert(&si);
		return;

	case IMSG_QUEUE_DISCOVER_MSGID:
		/* End of discovery for a message: commit it unless already known. */
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		r = backend->query(msgid);
		if (r) {
			log_debug("debug: scheduler: msgid:%08" PRIx32
			    " already scheduled", msgid);
			return;
		}
		log_trace(TRACE_SCHEDULER,
		    "scheduler: committing msg:%08" PRIx32, msgid);
		n = backend->commit(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		stat_increment("scheduler.envelope", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_MESSAGE_ROLLBACK:
		/* Reception aborted: drop all incoming envelopes of this message. */
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER, "scheduler: aborting msg:%08" PRIx32,
		    msgid);
		n = backend->rollback(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_ENVELOPE_REMOVE:
		/*
		 * Queue asks to drop an envelope.  "inflight" tells whether
		 * it was already handed to an agent: remove() for idle
		 * envelopes, delete() (plus inflight bookkeeping) otherwise.
		 */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_get_u32(&m, &inflight);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: queue requested removal of evp:%016" PRIx64,
		    evpid);
		stat_decrement("scheduler.envelope", 1);
		if (!inflight)
			backend->remove(evpid);
		else {
			backend->delete(evpid);
			ninflight -= 1;
			stat_decrement("scheduler.envelope.inflight", 1);
		}

		scheduler_reset_events();
		return;

	case IMSG_QUEUE_ENVELOPE_ACK:
		/* Queue acknowledged a removal we initiated: just update counters. */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: queue ack removal of evp:%016" PRIx64,
		    evpid);
		ninflight -= 1;
		stat_decrement("scheduler.envelope.inflight", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_OK:
		/* Delivery succeeded: envelope leaves the scheduler for good. */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (ok)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.ok", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_TEMPFAIL:
		/*
		 * Temporary failure: reschedule via update(), then check the
		 * configured bounce-warn intervals to decide whether a
		 * delayed-delivery warning bounce must be requested now.
		 */
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: updating evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		backend->update(&si);
		ninflight -= 1;
		stat_increment("scheduler.delivery.tempfail", 1);
		stat_decrement("scheduler.envelope.inflight", 1);

		for (i = 0; i < MAX_BOUNCE_WARN; i++) {
			if (env->sc_bounce_warn[i] == 0)
				break;
			timestamp = si.creation + env->sc_bounce_warn[i];
			/*
			 * Warn only once per interval: the next try lies past
			 * the warning time and no bounce was sent since.
			 */
			if (si.nexttry >= timestamp &&
			    si.lastbounce < timestamp) {
				req.evpid = evp.id;
				req.timestamp = timestamp;
				req.bounce.type = B_DELAYED;
				req.bounce.delay = env->sc_bounce_warn[i];
				req.bounce.ttl = si.ttl;
				m_compose(p, IMSG_SCHED_ENVELOPE_BOUNCE, 0, 0, -1,
				    &req, sizeof req);
				break;
			}
		}
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_PERMFAIL:
		/* Permanent failure: drop the envelope. */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (fail)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.permfail", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_LOOP:
		/* Delivery loop detected: drop the envelope. */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (loop)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.loop", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_HOLDQ_HOLD:
		/* Park the envelope on the given holdqueue; no longer inflight. */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_get_id(&m, &holdq);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: holding evp:%016" PRIx64 " on %016" PRIx64,
		    evpid, holdq);
		backend->hold(evpid, holdq);
		ninflight -= 1;
		stat_decrement("scheduler.envelope.inflight", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_HOLDQ_RELEASE:
		/* Release up to r envelopes from a holdqueue of the given type. */
		m_msg(&m, imsg);
		m_get_int(&m, &type);
		m_get_id(&m, &holdq);
		m_get_int(&m, &r);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: releasing %d on holdq (%d, %016" PRIx64 ")",
		    r, type, holdq);
		backend->release(type, holdq, r);
		scheduler_reset_events();
		return;

	case IMSG_CTL_PAUSE_MDA:
		log_trace(TRACE_SCHEDULER, "scheduler: pausing mda");
		env->sc_flags |= SMTPD_MDA_PAUSED;
		return;

	case IMSG_CTL_RESUME_MDA:
		log_trace(TRACE_SCHEDULER, "scheduler: resuming mda");
		env->sc_flags &= ~SMTPD_MDA_PAUSED;
		scheduler_reset_events();
		return;

	case IMSG_CTL_PAUSE_MTA:
		log_trace(TRACE_SCHEDULER, "scheduler: pausing mta");
		env->sc_flags |= SMTPD_MTA_PAUSED;
		return;

	case IMSG_CTL_RESUME_MTA:
		log_trace(TRACE_SCHEDULER, "scheduler: resuming mta");
		env->sc_flags &= ~SMTPD_MTA_PAUSED;
		scheduler_reset_events();
		return;

	case IMSG_CTL_VERBOSE:
		m_msg(&m, imsg);
		m_get_int(&m, &v);
		m_end(&m);
		log_setverbose(v);
		return;

	case IMSG_CTL_PROFILE:
		m_msg(&m, imsg);
		m_get_int(&m, &v);
		m_end(&m);
		profiling = v;
		return;

	case IMSG_CTL_LIST_MESSAGES:
		/*
		 * Paginated listing: the payload is the msgid to start after;
		 * reply with one batch of msgids in the preallocated buffer.
		 */
		msgid = *(uint32_t *)(imsg->data);
		n = backend->messages(msgid, msgids, env->sc_scheduler_max_msg_batch_size);
		m_compose(p, IMSG_CTL_LIST_MESSAGES, imsg->hdr.peerid, 0, -1,
		    msgids, n * sizeof (*msgids));
		return;

	case IMSG_CTL_LIST_ENVELOPES:
		/*
		 * Paginated envelope listing, answered through the queue
		 * process; an empty message marks the end of the batch.
		 */
		id = *(uint64_t *)(imsg->data);
		n = backend->envelopes(id, state, env->sc_scheduler_max_evp_batch_size);
		for (i = 0; i < n; i++) {
			m_create(p_queue, IMSG_CTL_LIST_ENVELOPES,
			    imsg->hdr.peerid, 0, -1);
			m_add_evpid(p_queue, state[i].evpid);
			m_add_int(p_queue, state[i].flags);
			m_add_time(p_queue, state[i].time);
			m_close(p_queue);
		}
		m_compose(p_queue, IMSG_CTL_LIST_ENVELOPES,
		    imsg->hdr.peerid, 0, -1, NULL, 0);
		return;

	case IMSG_CTL_SCHEDULE:
		/* An id that fits in 32 bits designates a whole message. */
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "scheduling msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "scheduling evp:%016" PRIx64, id);
		r = backend->schedule(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_QUEUE_ENVELOPE_SCHEDULE:
		/* Same as above but from queue: no reply expected. */
		id = *(uint64_t *)(imsg->data);
		backend->schedule(id);
		scheduler_reset_events();
		return;

	case IMSG_CTL_REMOVE:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "removing msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "removing evp:%016" PRIx64, id);
		r = backend->remove(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_CTL_PAUSE_EVP:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "suspending msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "suspending evp:%016" PRIx64, id);
		r = backend->suspend(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_CTL_RESUME_EVP:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "resuming msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "resuming evp:%016" PRIx64, id);
		r = backend->resume(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;
	}

	fatalx("scheduler_imsg: unexpected %s imsg",
	    imsg_to_str(imsg->hdr.type));
}
389
/*
 * Terminate the scheduler process.  Called when the imsg channel to
 * the parent closes.  Uses _exit(2) so no atexit/stdio cleanup runs.
 */
static void
scheduler_shutdown(void)
{
	log_debug("debug: scheduler agent exiting");
	_exit(0);
}
396
397 static void
scheduler_reset_events(void)398 scheduler_reset_events(void)
399 {
400 struct timeval tv;
401
402 evtimer_del(&ev);
403 tv.tv_sec = 0;
404 tv.tv_usec = 0;
405 evtimer_add(&ev, &tv);
406 }
407
/*
 * Entry point of the scheduler process.  Performs one-time setup —
 * backend selection and init, chroot, privilege drop, batch-buffer
 * allocation, imsg peers, event loop — then dispatches events forever.
 * The statement order is deliberate and security-relevant: the backend
 * is initialized before chroot (it may need filesystem access), chroot
 * happens before the uid/gid drop, and pledge() comes last.
 * Only returns (0) notionally; event_dispatch() never returns.
 */
int
scheduler(void)
{
	struct passwd *pw;

	backend = scheduler_backend_lookup(backend_scheduler);
	if (backend == NULL)
		fatalx("cannot find scheduler backend \"%s\"",
		    backend_scheduler);

	purge_config(PURGE_EVERYTHING & ~PURGE_DISPATCHERS);

	if ((pw = getpwnam(SMTPD_USER)) == NULL)
		fatalx("unknown user " SMTPD_USER);

	config_process(PROC_SCHEDULER);

	/* Backend init runs while we still have full privileges. */
	backend->init(backend_scheduler);

	if (chroot(PATH_CHROOT) == -1)
		fatal("scheduler: chroot");
	if (chdir("/") == -1)
		fatal("scheduler: chdir(\"/\")");

	/* Drop group privileges before user privileges. */
	if (setgroups(1, &pw->pw_gid) ||
	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
		fatal("scheduler: cannot drop privileges");

	/* Preallocate the batch/listing buffers used by the imsg handlers. */
	evpids = xcalloc(env->sc_scheduler_max_schedule, sizeof *evpids);
	types = xcalloc(env->sc_scheduler_max_schedule, sizeof *types);
	msgids = xcalloc(env->sc_scheduler_max_msg_batch_size, sizeof *msgids);
	state = xcalloc(env->sc_scheduler_max_evp_batch_size, sizeof *state);

	imsg_callback = scheduler_imsg;
	event_init();

	signal(SIGINT, SIG_IGN);
	signal(SIGTERM, SIG_IGN);
	signal(SIGPIPE, SIG_IGN);
	signal(SIGHUP, SIG_IGN);

	config_peer(PROC_CONTROL);
	config_peer(PROC_QUEUE);

	/* Arm the scheduler loop so the first pass runs immediately. */
	evtimer_set(&ev, scheduler_timeout, NULL);
	scheduler_reset_events();

	if (pledge("stdio", NULL) == -1)
		fatal("pledge");

	event_dispatch();
	fatalx("exited event loop");

	return (0);
}
464
/*
 * Main scheduler pass, driven by the "ev" timer (libevent callback
 * signature; fd/event/p are unused).  Asks the backend for a batch of
 * schedulable envelopes and dispatches each to the queue process
 * according to its type, then updates the stat counters and rearms
 * the timer for another immediate pass.  When the backend has nothing
 * runnable it either sleeps indefinitely (delay == -1) or rearms the
 * timer for the backend-suggested delay.
 */
static void
scheduler_timeout(int fd, short event, void *p)
{
	struct timeval tv;
	size_t i;
	size_t d_inflight;
	size_t d_envelope;
	size_t d_removed;
	size_t d_expired;
	size_t d_updated;
	size_t count;
	int mask, r, delay;

	tv.tv_sec = 0;
	tv.tv_usec = 0;

	/*
	 * Build the mask of schedulable types: updates are always
	 * allowed; everything else only while we are below the
	 * inflight limit, and delivery types only when not paused.
	 */
	mask = SCHED_UPDATE;

	if (ninflight < env->sc_scheduler_max_inflight) {
		mask |= SCHED_EXPIRE | SCHED_REMOVE | SCHED_BOUNCE;
		if (!(env->sc_flags & SMTPD_MDA_PAUSED))
			mask |= SCHED_MDA;
		if (!(env->sc_flags & SMTPD_MTA_PAUSED))
			mask |= SCHED_MTA;
	}

	count = env->sc_scheduler_max_schedule;

	log_trace(TRACE_SCHEDULER, "scheduler: getting batch: mask=0x%x, count=%zu", mask, count);

	/* On return, count holds the number of entries in evpids/types. */
	r = backend->batch(mask, &delay, &count, evpids, types);

	log_trace(TRACE_SCHEDULER, "scheduler: got r=%i, delay=%i, count=%zu", r, delay, count);

	if (r < 0)
		fatalx("scheduler: error in batch handler");

	if (r == 0) {

		if (delay < -1)
			fatalx("scheduler: invalid delay %d", delay);

		/* delay == -1: nothing pending; wait for an imsg to rearm us. */
		if (delay == -1) {
			log_trace(TRACE_SCHEDULER, "scheduler: sleeping");
			return;
		}

		tv.tv_sec = delay;
		tv.tv_usec = 0;
		log_trace(TRACE_SCHEDULER,
		    "scheduler: waiting for %s", duration_to_text(tv.tv_sec));
		evtimer_add(&ev, &tv);
		return;
	}

	/* Per-pass deltas, applied to the stat counters after the loop. */
	d_inflight = 0;
	d_envelope = 0;
	d_removed = 0;
	d_expired = 0;
	d_updated = 0;

	for (i = 0; i < count; i++) {
		switch(types[i]) {
		case SCHED_REMOVE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " removed", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_REMOVE, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_envelope += 1;
			d_removed += 1;
			/* Inflight until queue acks with IMSG_QUEUE_ENVELOPE_ACK. */
			d_inflight += 1;
			break;

		case SCHED_EXPIRE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " expired", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_EXPIRE, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_envelope += 1;
			d_expired += 1;
			d_inflight += 1;
			break;

		case SCHED_UPDATE:
			/* Backend-internal reschedule; nothing to send. */
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (update)", evpids[i]);
			d_updated += 1;
			break;

		case SCHED_BOUNCE:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (bounce)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_INJECT, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;

		case SCHED_MDA:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (mda)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_DELIVER, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;

		case SCHED_MTA:
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " scheduled (mta)", evpids[i]);
			m_create(p_queue, IMSG_SCHED_ENVELOPE_TRANSFER, 0, 0, -1);
			m_add_evpid(p_queue, evpids[i]);
			m_close(p_queue);
			d_inflight += 1;
			break;
		}
	}

	stat_decrement("scheduler.envelope", d_envelope);
	stat_increment("scheduler.envelope.inflight", d_inflight);
	stat_increment("scheduler.envelope.expired", d_expired);
	stat_increment("scheduler.envelope.removed", d_removed);
	stat_increment("scheduler.envelope.updated", d_updated);

	ninflight += d_inflight;

	/* Rearm immediately: keep draining while the backend has work. */
	tv.tv_sec = 0;
	tv.tv_usec = 0;
	evtimer_add(&ev, &tv);
}
597