xref: /openbsd/usr.sbin/smtpd/scheduler.c (revision d3140113)
1 /*	$OpenBSD: scheduler.c,v 1.62 2021/06/14 17:58:16 eric Exp $	*/
2 
3 /*
4  * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
5  * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
6  * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
7  * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
8  *
9  * Permission to use, copy, modify, and distribute this software for any
10  * purpose with or without fee is hereby granted, provided that the above
11  * copyright notice and this permission notice appear in all copies.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
14  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
15  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
16  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
17  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
18  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
19  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20  */
21 
22 #include <inttypes.h>
23 #include <pwd.h>
24 #include <signal.h>
25 #include <unistd.h>
26 
27 #include "smtpd.h"
28 #include "log.h"
29 
static void scheduler_imsg(struct mproc *, struct imsg *);
static void scheduler_shutdown(void);
static void scheduler_reset_events(void);
static void scheduler_timeout(int, short, void *);

/* Active scheduler backend, resolved once at startup in scheduler(). */
static struct scheduler_backend *backend = NULL;
/* One-shot timer driving scheduler_timeout(); re-armed after each batch. */
static struct event		 ev;
/* Envelopes currently handed to the queue process and not yet acked. */
static size_t			 ninflight = 0;
/* Scratch arrays for backend batch/listing calls, allocated in scheduler()
 * with sizes taken from the runtime configuration (env->sc_scheduler_*). */
static int			*types;
static uint64_t			*evpids;
static uint32_t			*msgids;
static struct evpstate		*state;

/* Name of the scheduler backend to use; defined elsewhere in smtpd. */
extern const char *backend_scheduler;
44 
/*
 * Dispatch one imsg received from a peer process (queue or control).
 *
 * p is the sending process, imsg the received message.  A NULL imsg
 * means the peer channel was closed: scheduler_shutdown() is called,
 * which _exit()s and does not return.  Unknown message types are fatal.
 */
void
scheduler_imsg(struct mproc *p, struct imsg *imsg)
{
	struct bounce_req_msg	 req;
	struct envelope		 evp;
	struct scheduler_info	 si;
	struct msg		 m;
	uint64_t		 evpid, id, holdq;
	uint32_t		 msgid;
	uint32_t       		 inflight;
	size_t			 n, i;
	time_t			 timestamp;
	int			 v, r, type;

	if (imsg == NULL)
		scheduler_shutdown();

	switch (imsg->hdr.type) {

	case IMSG_QUEUE_ENVELOPE_SUBMIT:
		/* New envelope for a not-yet-committed message. */
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: inserting evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		stat_increment("scheduler.envelope.incoming", 1);
		backend->insert(&si);
		return;

	case IMSG_QUEUE_MESSAGE_COMMIT:
		/* Message complete: move its envelopes from "incoming"
		 * to the schedulable set. */
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: committing msg:%08" PRIx32, msgid);
		n = backend->commit(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		stat_increment("scheduler.envelope", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DISCOVER_EVPID:
		/* Queue found an on-disk envelope (e.g. at startup);
		 * insert it unless the backend already knows it. */
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		r = backend->query(evp.id);
		if (r) {
			log_debug("debug: scheduler: evp:%016" PRIx64
			    " already scheduled", evp.id);
			return;
		}
		log_trace(TRACE_SCHEDULER,
		    "scheduler: discovering evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		stat_increment("scheduler.envelope.incoming", 1);
		backend->insert(&si);
		return;

	case IMSG_QUEUE_DISCOVER_MSGID:
		/* End of discovery for a message: commit it unless the
		 * backend already has it scheduled. */
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		r = backend->query(msgid);
		if (r) {
			log_debug("debug: scheduler: msgid:%08" PRIx32
			    " already scheduled", msgid);
			return;
		}
		log_trace(TRACE_SCHEDULER,
		    "scheduler: committing msg:%08" PRIx32, msgid);
		n = backend->commit(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		stat_increment("scheduler.envelope", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_MESSAGE_ROLLBACK:
		/* Message aborted: drop all of its incoming envelopes. */
		m_msg(&m, imsg);
		m_get_msgid(&m, &msgid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER, "scheduler: aborting msg:%08" PRIx32,
		    msgid);
		n = backend->rollback(msgid);
		stat_decrement("scheduler.envelope.incoming", n);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_ENVELOPE_REMOVE:
		/* Queue-side removal.  "inflight" tells whether we had
		 * already dispatched the envelope: if so, delete() it and
		 * drop the inflight accounting; otherwise just remove(). */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_get_u32(&m, &inflight);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: queue requested removal of evp:%016" PRIx64,
		    evpid);
		stat_decrement("scheduler.envelope", 1);
		if (!inflight)
			backend->remove(evpid);
		else {
			backend->delete(evpid);
			ninflight -= 1;
			stat_decrement("scheduler.envelope.inflight", 1);
		}

		scheduler_reset_events();
		return;

	case IMSG_QUEUE_ENVELOPE_ACK:
		/* Queue acknowledged a removal we initiated. */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: queue ack removal of evp:%016" PRIx64,
		    evpid);
		ninflight -= 1;
		stat_decrement("scheduler.envelope.inflight", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_OK:
		/* Delivery succeeded: forget the envelope. */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (ok)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.ok", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_TEMPFAIL:
		/* Temporary failure: feed the updated envelope back to
		 * the backend for a later retry. */
		m_msg(&m, imsg);
		m_get_envelope(&m, &evp);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: updating evp:%016" PRIx64, evp.id);
		scheduler_info(&si, &evp);
		backend->update(&si);
		ninflight -= 1;
		stat_increment("scheduler.delivery.tempfail", 1);
		stat_decrement("scheduler.envelope.inflight", 1);

		/*
		 * Walk the configured delay-warning intervals (terminated
		 * by a 0 entry): if the next retry crosses a warning
		 * threshold that has not been bounced for yet, ask the
		 * queue to generate a delayed-delivery bounce report.
		 */
		for (i = 0; i < MAX_BOUNCE_WARN; i++) {
			if (env->sc_bounce_warn[i] == 0)
				break;
			timestamp = si.creation + env->sc_bounce_warn[i];
			if (si.nexttry >= timestamp &&
			    si.lastbounce < timestamp) {
	    			req.evpid = evp.id;
				req.timestamp = timestamp;
				req.bounce.type = B_DELAYED;
				req.bounce.delay = env->sc_bounce_warn[i];
				req.bounce.ttl = si.ttl;
				m_compose(p, IMSG_SCHED_ENVELOPE_BOUNCE, 0, 0, -1,
				    &req, sizeof req);
				break;
			}
		}
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_PERMFAIL:
		/* Permanent failure: forget the envelope. */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (fail)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.permfail", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_DELIVERY_LOOP:
		/* Delivery loop detected: forget the envelope. */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: deleting evp:%016" PRIx64 " (loop)", evpid);
		backend->delete(evpid);
		ninflight -= 1;
		stat_increment("scheduler.delivery.loop", 1);
		stat_decrement("scheduler.envelope.inflight", 1);
		stat_decrement("scheduler.envelope", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_HOLDQ_HOLD:
		/* Park the envelope on the given holdqueue. */
		m_msg(&m, imsg);
		m_get_evpid(&m, &evpid);
		m_get_id(&m, &holdq);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: holding evp:%016" PRIx64 " on %016" PRIx64,
		    evpid, holdq);
		backend->hold(evpid, holdq);
		ninflight -= 1;
		stat_decrement("scheduler.envelope.inflight", 1);
		scheduler_reset_events();
		return;

	case IMSG_QUEUE_HOLDQ_RELEASE:
		/* Release up to r envelopes from a holdqueue. */
		m_msg(&m, imsg);
		m_get_int(&m, &type);
		m_get_id(&m, &holdq);
		m_get_int(&m, &r);
		m_end(&m);
		log_trace(TRACE_SCHEDULER,
		    "scheduler: releasing %d on holdq (%d, %016" PRIx64 ")",
		    r, type, holdq);
		backend->release(type, holdq, r);
		scheduler_reset_events();
		return;

	case IMSG_CTL_PAUSE_MDA:
		log_trace(TRACE_SCHEDULER, "scheduler: pausing mda");
		env->sc_flags |= SMTPD_MDA_PAUSED;
		return;

	case IMSG_CTL_RESUME_MDA:
		log_trace(TRACE_SCHEDULER, "scheduler: resuming mda");
		env->sc_flags &= ~SMTPD_MDA_PAUSED;
		scheduler_reset_events();
		return;

	case IMSG_CTL_PAUSE_MTA:
		log_trace(TRACE_SCHEDULER, "scheduler: pausing mta");
		env->sc_flags |= SMTPD_MTA_PAUSED;
		return;

	case IMSG_CTL_RESUME_MTA:
		log_trace(TRACE_SCHEDULER, "scheduler: resuming mta");
		env->sc_flags &= ~SMTPD_MTA_PAUSED;
		scheduler_reset_events();
		return;

	case IMSG_CTL_VERBOSE:
		m_msg(&m, imsg);
		m_get_int(&m, &v);
		m_end(&m);
		log_setverbose(v);
		return;

	case IMSG_CTL_PROFILE:
		m_msg(&m, imsg);
		m_get_int(&m, &v);
		m_end(&m);
		profiling = v;
		return;

	case IMSG_CTL_LIST_MESSAGES:
		/* Paged listing: msgid is the cursor to start from.
		 * NOTE(review): imsg->data is cast without checking the
		 * payload length — confirm the control peer always sends
		 * exactly a uint32_t. */
		msgid = *(uint32_t *)(imsg->data);
		n = backend->messages(msgid, msgids, env->sc_scheduler_max_msg_batch_size);
		m_compose(p, IMSG_CTL_LIST_MESSAGES, imsg->hdr.peerid, 0, -1,
		    msgids, n * sizeof (*msgids));
		return;

	case IMSG_CTL_LIST_ENVELOPES:
		/* Paged listing of envelope states, relayed through the
		 * queue process; an empty message terminates the batch. */
		id = *(uint64_t *)(imsg->data);
		n = backend->envelopes(id, state, env->sc_scheduler_max_evp_batch_size);
		for (i = 0; i < n; i++) {
			m_create(p_queue, IMSG_CTL_LIST_ENVELOPES,
			    imsg->hdr.peerid, 0, -1);
			m_add_evpid(p_queue, state[i].evpid);
			m_add_int(p_queue, state[i].flags);
			m_add_time(p_queue, state[i].time);
			m_close(p_queue);
		}
		m_compose(p_queue, IMSG_CTL_LIST_ENVELOPES,
		    imsg->hdr.peerid, 0, -1, NULL, 0);
		return;

	case IMSG_CTL_SCHEDULE:
		/* ids that fit in 32 bits denote a whole message (msgid),
		 * larger values a single envelope (evpid). */
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "scheduling msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "scheduling evp:%016" PRIx64, id);
		r = backend->schedule(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_QUEUE_ENVELOPE_SCHEDULE:
		/* Same as IMSG_CTL_SCHEDULE but with no reply expected. */
		id = *(uint64_t *)(imsg->data);
		backend->schedule(id);
		scheduler_reset_events();
		return;

	case IMSG_CTL_REMOVE:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "removing msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "removing evp:%016" PRIx64, id);
		r = backend->remove(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_CTL_PAUSE_EVP:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "suspending msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "suspending evp:%016" PRIx64, id);
		r = backend->suspend(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;

	case IMSG_CTL_RESUME_EVP:
		id = *(uint64_t *)(imsg->data);
		if (id <= 0xffffffffL)
			log_debug("debug: scheduler: "
			    "resuming msg:%08" PRIx64, id);
		else
			log_debug("debug: scheduler: "
			    "resuming evp:%016" PRIx64, id);
		r = backend->resume(id);
		scheduler_reset_events();
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
		    0, -1, NULL, 0);
		return;
	}

	fatalx("scheduler_imsg: unexpected %s imsg",
	    imsg_to_str(imsg->hdr.type));
}
389 
/* Terminate the scheduler process immediately, skipping atexit handlers. */
static void
scheduler_shutdown(void)
{
	log_debug("debug: scheduler agent exiting");
	_exit(0);
}
396 
397 static void
scheduler_reset_events(void)398 scheduler_reset_events(void)
399 {
400 	struct timeval	 tv;
401 
402 	evtimer_del(&ev);
403 	tv.tv_sec = 0;
404 	tv.tv_usec = 0;
405 	evtimer_add(&ev, &tv);
406 }
407 
/*
 * Entry point of the scheduler process: look up and initialize the
 * backend, chroot and drop privileges, allocate the batch buffers,
 * wire up the peer processes and the scheduling timer, pledge down
 * to "stdio", then run the event loop.  event_dispatch() is not
 * expected to return; doing so is fatal.
 */
int
scheduler(void)
{
	struct passwd	*pw;

	backend = scheduler_backend_lookup(backend_scheduler);
	if (backend == NULL)
		fatalx("cannot find scheduler backend \"%s\"",
		    backend_scheduler);

	/* Drop the parts of the parsed config this process does not use. */
	purge_config(PURGE_EVERYTHING & ~PURGE_DISPATCHERS);

	if ((pw = getpwnam(SMTPD_USER)) == NULL)
		fatalx("unknown user " SMTPD_USER);

	config_process(PROC_SCHEDULER);

	backend->init(backend_scheduler);

	/* chroot first, then drop group and user ids — order matters. */
	if (chroot(PATH_CHROOT) == -1)
		fatal("scheduler: chroot");
	if (chdir("/") == -1)
		fatal("scheduler: chdir(\"/\")");

	if (setgroups(1, &pw->pw_gid) ||
	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
		fatal("scheduler: cannot drop privileges");

	/* Scratch buffers for backend batch/listing calls, sized from
	 * the runtime configuration. */
	evpids = xcalloc(env->sc_scheduler_max_schedule, sizeof *evpids);
	types = xcalloc(env->sc_scheduler_max_schedule, sizeof *types);
	msgids = xcalloc(env->sc_scheduler_max_msg_batch_size, sizeof *msgids);
	state = xcalloc(env->sc_scheduler_max_evp_batch_size, sizeof *state);

	imsg_callback = scheduler_imsg;
	event_init();

	/* Ignore termination signals: shutdown is driven by the imsg
	 * channel closing (NULL imsg in scheduler_imsg). */
	signal(SIGINT, SIG_IGN);
	signal(SIGTERM, SIG_IGN);
	signal(SIGPIPE, SIG_IGN);
	signal(SIGHUP, SIG_IGN);

	config_peer(PROC_CONTROL);
	config_peer(PROC_QUEUE);

	evtimer_set(&ev, scheduler_timeout, NULL);
	scheduler_reset_events();

	/* Restrict the process to stdio-class syscalls from here on. */
	if (pledge("stdio", NULL) == -1)
		fatal("pledge");

	event_dispatch();
	fatalx("exited event loop");

	return (0);
}
464 
465 static void
scheduler_timeout(int fd,short event,void * p)466 scheduler_timeout(int fd, short event, void *p)
467 {
468 	struct timeval		tv;
469 	size_t			i;
470 	size_t			d_inflight;
471 	size_t			d_envelope;
472 	size_t			d_removed;
473 	size_t			d_expired;
474 	size_t			d_updated;
475 	size_t			count;
476 	int			mask, r, delay;
477 
478 	tv.tv_sec = 0;
479 	tv.tv_usec = 0;
480 
481 	mask = SCHED_UPDATE;
482 
483 	if (ninflight <  env->sc_scheduler_max_inflight) {
484 		mask |= SCHED_EXPIRE | SCHED_REMOVE | SCHED_BOUNCE;
485 		if (!(env->sc_flags & SMTPD_MDA_PAUSED))
486 			mask |= SCHED_MDA;
487 		if (!(env->sc_flags & SMTPD_MTA_PAUSED))
488 			mask |= SCHED_MTA;
489 	}
490 
491 	count = env->sc_scheduler_max_schedule;
492 
493 	log_trace(TRACE_SCHEDULER, "scheduler: getting batch: mask=0x%x, count=%zu", mask, count);
494 
495 	r = backend->batch(mask, &delay, &count, evpids, types);
496 
497 	log_trace(TRACE_SCHEDULER, "scheduler: got r=%i, delay=%i, count=%zu", r, delay, count);
498 
499 	if (r < 0)
500 		fatalx("scheduler: error in batch handler");
501 
502 	if (r == 0) {
503 
504 		if (delay < -1)
505 			fatalx("scheduler: invalid delay %d", delay);
506 
507 		if (delay == -1) {
508 			log_trace(TRACE_SCHEDULER, "scheduler: sleeping");
509 			return;
510 		}
511 
512 		tv.tv_sec = delay;
513 		tv.tv_usec = 0;
514 		log_trace(TRACE_SCHEDULER,
515 		    "scheduler: waiting for %s", duration_to_text(tv.tv_sec));
516 		evtimer_add(&ev, &tv);
517 		return;
518 	}
519 
520 	d_inflight = 0;
521 	d_envelope = 0;
522 	d_removed = 0;
523 	d_expired = 0;
524 	d_updated = 0;
525 
526 	for (i = 0; i < count; i++) {
527 		switch(types[i]) {
528 		case SCHED_REMOVE:
529 			log_debug("debug: scheduler: evp:%016" PRIx64
530 			    " removed", evpids[i]);
531 			m_create(p_queue, IMSG_SCHED_ENVELOPE_REMOVE, 0, 0, -1);
532 			m_add_evpid(p_queue, evpids[i]);
533 			m_close(p_queue);
534 			d_envelope += 1;
535 			d_removed += 1;
536 			d_inflight += 1;
537 			break;
538 
539 		case SCHED_EXPIRE:
540 			log_debug("debug: scheduler: evp:%016" PRIx64
541 			    " expired", evpids[i]);
542 			m_create(p_queue, IMSG_SCHED_ENVELOPE_EXPIRE, 0, 0, -1);
543 			m_add_evpid(p_queue, evpids[i]);
544 			m_close(p_queue);
545 			d_envelope += 1;
546 			d_expired += 1;
547 			d_inflight += 1;
548 			break;
549 
550 		case SCHED_UPDATE:
551 			log_debug("debug: scheduler: evp:%016" PRIx64
552 			    " scheduled (update)", evpids[i]);
553 			d_updated += 1;
554 			break;
555 
556 		case SCHED_BOUNCE:
557 			log_debug("debug: scheduler: evp:%016" PRIx64
558 			    " scheduled (bounce)", evpids[i]);
559 			m_create(p_queue, IMSG_SCHED_ENVELOPE_INJECT, 0, 0, -1);
560 			m_add_evpid(p_queue, evpids[i]);
561 			m_close(p_queue);
562 			d_inflight += 1;
563 			break;
564 
565 		case SCHED_MDA:
566 			log_debug("debug: scheduler: evp:%016" PRIx64
567 			    " scheduled (mda)", evpids[i]);
568 			m_create(p_queue, IMSG_SCHED_ENVELOPE_DELIVER, 0, 0, -1);
569 			m_add_evpid(p_queue, evpids[i]);
570 			m_close(p_queue);
571 			d_inflight += 1;
572 			break;
573 
574 		case SCHED_MTA:
575 			log_debug("debug: scheduler: evp:%016" PRIx64
576 			    " scheduled (mta)", evpids[i]);
577 			m_create(p_queue, IMSG_SCHED_ENVELOPE_TRANSFER, 0, 0, -1);
578 			m_add_evpid(p_queue, evpids[i]);
579 			m_close(p_queue);
580 			d_inflight += 1;
581 			break;
582 		}
583 	}
584 
585 	stat_decrement("scheduler.envelope", d_envelope);
586 	stat_increment("scheduler.envelope.inflight", d_inflight);
587 	stat_increment("scheduler.envelope.expired", d_expired);
588 	stat_increment("scheduler.envelope.removed", d_removed);
589 	stat_increment("scheduler.envelope.updated", d_updated);
590 
591 	ninflight += d_inflight;
592 
593 	tv.tv_sec = 0;
594 	tv.tv_usec = 0;
595 	evtimer_add(&ev, &tv);
596 }
597