xref: /openbsd/usr.sbin/smtpd/scheduler.c (revision 9b7c3dbb)
1 /*	$OpenBSD: scheduler.c,v 1.53 2016/09/01 10:54:25 eric Exp $	*/
2 
3 /*
4  * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
5  * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
6  * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
7  * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
8  *
9  * Permission to use, copy, modify, and distribute this software for any
10  * purpose with or without fee is hereby granted, provided that the above
11  * copyright notice and this permission notice appear in all copies.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
14  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
15  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
16  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
17  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
18  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
19  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20  */
21 
22 #include <sys/types.h>
23 #include <sys/queue.h>
24 #include <sys/tree.h>
25 #include <sys/socket.h>
26 #include <sys/stat.h>
27 
28 #include <ctype.h>
29 #include <dirent.h>
30 #include <err.h>
31 #include <errno.h>
32 #include <event.h>
33 #include <imsg.h>
34 #include <inttypes.h>
35 #include <libgen.h>
36 #include <pwd.h>
37 #include <signal.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <string.h>
41 #include <time.h>
42 #include <unistd.h>
43 #include <limits.h>
44 
45 #include "smtpd.h"
46 #include "log.h"
47 
48 static void scheduler_imsg(struct mproc *, struct imsg *);
49 static void scheduler_shutdown(void);
50 static void scheduler_sig_handler(int, short, void *);
51 static void scheduler_reset_events(void);
52 static void scheduler_timeout(int, short, void *);
53 
54 static struct scheduler_backend *backend = NULL;
55 static struct event		 ev;
56 static size_t			 ninflight = 0;
57 static int			*types;
58 static uint64_t			*evpids;
59 static uint32_t			*msgids;
60 static struct evpstate		*state;
61 
62 extern const char *backend_scheduler;
63 
64 void
65 scheduler_imsg(struct mproc *p, struct imsg *imsg)
66 {
67 	struct bounce_req_msg	 req;
68 	struct envelope		 evp;
69 	struct scheduler_info	 si;
70 	struct msg		 m;
71 	uint64_t		 evpid, id, holdq;
72 	uint32_t		 msgid;
73 	uint32_t       		 inflight;
74 	size_t			 n, i;
75 	time_t			 timestamp;
76 	int			 v, r, type;
77 
78 	switch (imsg->hdr.type) {
79 
80 	case IMSG_QUEUE_ENVELOPE_SUBMIT:
81 		m_msg(&m, imsg);
82 		m_get_envelope(&m, &evp);
83 		m_end(&m);
84 		log_trace(TRACE_SCHEDULER,
85 		    "scheduler: inserting evp:%016" PRIx64, evp.id);
86 		scheduler_info(&si, &evp);
87 		stat_increment("scheduler.envelope.incoming", 1);
88 		backend->insert(&si);
89 		return;
90 
91 	case IMSG_QUEUE_MESSAGE_COMMIT:
92 		m_msg(&m, imsg);
93 		m_get_msgid(&m, &msgid);
94 		m_end(&m);
95 		log_trace(TRACE_SCHEDULER,
96 		    "scheduler: committing msg:%08" PRIx32, msgid);
97 		n = backend->commit(msgid);
98 		stat_decrement("scheduler.envelope.incoming", n);
99 		stat_increment("scheduler.envelope", n);
100 		scheduler_reset_events();
101 		return;
102 
103 	case IMSG_QUEUE_DISCOVER_EVPID:
104 		m_msg(&m, imsg);
105 		m_get_envelope(&m, &evp);
106 		m_end(&m);
107 		r = backend->query(evp.id);
108 		if (r) {
109 			log_debug("debug: scheduler: evp:%016" PRIx64
110 			    " already scheduled", evp.id);
111 			return;
112 		}
113 		log_trace(TRACE_SCHEDULER,
114 		    "scheduler: discovering evp:%016" PRIx64, evp.id);
115 		scheduler_info(&si, &evp);
116 		stat_increment("scheduler.envelope.incoming", 1);
117 		backend->insert(&si);
118 		return;
119 
120 	case IMSG_QUEUE_DISCOVER_MSGID:
121 		m_msg(&m, imsg);
122 		m_get_msgid(&m, &msgid);
123 		m_end(&m);
124 		r = backend->query(msgid);
125 		if (r) {
126 			log_debug("debug: scheduler: msgid:%08" PRIx32
127 			    " already scheduled", msgid);
128 			return;
129 		}
130 		log_trace(TRACE_SCHEDULER,
131 		    "scheduler: committing msg:%08" PRIx32, msgid);
132 		n = backend->commit(msgid);
133 		stat_decrement("scheduler.envelope.incoming", n);
134 		stat_increment("scheduler.envelope", n);
135 		scheduler_reset_events();
136 		return;
137 
138 	case IMSG_QUEUE_MESSAGE_ROLLBACK:
139 		m_msg(&m, imsg);
140 		m_get_msgid(&m, &msgid);
141 		m_end(&m);
142 		log_trace(TRACE_SCHEDULER, "scheduler: aborting msg:%08" PRIx32,
143 		    msgid);
144 		n = backend->rollback(msgid);
145 		stat_decrement("scheduler.envelope.incoming", n);
146 		scheduler_reset_events();
147 		return;
148 
149 	case IMSG_QUEUE_ENVELOPE_REMOVE:
150 		m_msg(&m, imsg);
151 		m_get_evpid(&m, &evpid);
152 		m_get_u32(&m, &inflight);
153 		m_end(&m);
154 		log_trace(TRACE_SCHEDULER,
155 		    "scheduler: queue requested removal of evp:%016" PRIx64,
156 		    evpid);
157 		stat_decrement("scheduler.envelope", 1);
158 		if (!inflight)
159 			backend->remove(evpid);
160 		else {
161 			backend->delete(evpid);
162 			ninflight -= 1;
163 			stat_decrement("scheduler.envelope.inflight", 1);
164 		}
165 
166 		scheduler_reset_events();
167 		return;
168 
169 	case IMSG_QUEUE_ENVELOPE_ACK:
170 		m_msg(&m, imsg);
171 		m_get_evpid(&m, &evpid);
172 		m_end(&m);
173 		log_trace(TRACE_SCHEDULER,
174 		    "scheduler: queue ack removal of evp:%016" PRIx64,
175 		    evpid);
176 		ninflight -= 1;
177 		stat_decrement("scheduler.envelope.inflight", 1);
178 		scheduler_reset_events();
179 		return;
180 
181 	case IMSG_QUEUE_DELIVERY_OK:
182 		m_msg(&m, imsg);
183 		m_get_evpid(&m, &evpid);
184 		m_end(&m);
185 		log_trace(TRACE_SCHEDULER,
186 		    "scheduler: deleting evp:%016" PRIx64 " (ok)", evpid);
187 		backend->delete(evpid);
188 		ninflight -= 1;
189 		stat_increment("scheduler.delivery.ok", 1);
190 		stat_decrement("scheduler.envelope.inflight", 1);
191 		stat_decrement("scheduler.envelope", 1);
192 		scheduler_reset_events();
193 		return;
194 
195 	case IMSG_QUEUE_DELIVERY_TEMPFAIL:
196 		m_msg(&m, imsg);
197 		m_get_envelope(&m, &evp);
198 		m_end(&m);
199 		log_trace(TRACE_SCHEDULER,
200 		    "scheduler: updating evp:%016" PRIx64, evp.id);
201 		scheduler_info(&si, &evp);
202 		backend->update(&si);
203 		ninflight -= 1;
204 		stat_increment("scheduler.delivery.tempfail", 1);
205 		stat_decrement("scheduler.envelope.inflight", 1);
206 
207 		for (i = 0; i < MAX_BOUNCE_WARN; i++) {
208 			if (env->sc_bounce_warn[i] == 0)
209 				break;
210 			timestamp = si.creation + env->sc_bounce_warn[i];
211 			if (si.nexttry >= timestamp &&
212 			    si.lastbounce < timestamp) {
213 	    			req.evpid = evp.id;
214 				req.timestamp = timestamp;
215 				req.bounce.type = B_WARNING;
216 				req.bounce.delay = env->sc_bounce_warn[i];
217 				req.bounce.expire = si.expire;
218 				m_compose(p, IMSG_SCHED_ENVELOPE_BOUNCE, 0, 0, -1,
219 				    &req, sizeof req);
220 				break;
221 			}
222 		}
223 		scheduler_reset_events();
224 		return;
225 
226 	case IMSG_QUEUE_DELIVERY_PERMFAIL:
227 		m_msg(&m, imsg);
228 		m_get_evpid(&m, &evpid);
229 		m_end(&m);
230 		log_trace(TRACE_SCHEDULER,
231 		    "scheduler: deleting evp:%016" PRIx64 " (fail)", evpid);
232 		backend->delete(evpid);
233 		ninflight -= 1;
234 		stat_increment("scheduler.delivery.permfail", 1);
235 		stat_decrement("scheduler.envelope.inflight", 1);
236 		stat_decrement("scheduler.envelope", 1);
237 		scheduler_reset_events();
238 		return;
239 
240 	case IMSG_QUEUE_DELIVERY_LOOP:
241 		m_msg(&m, imsg);
242 		m_get_evpid(&m, &evpid);
243 		m_end(&m);
244 		log_trace(TRACE_SCHEDULER,
245 		    "scheduler: deleting evp:%016" PRIx64 " (loop)", evpid);
246 		backend->delete(evpid);
247 		ninflight -= 1;
248 		stat_increment("scheduler.delivery.loop", 1);
249 		stat_decrement("scheduler.envelope.inflight", 1);
250 		stat_decrement("scheduler.envelope", 1);
251 		scheduler_reset_events();
252 		return;
253 
254 	case IMSG_QUEUE_HOLDQ_HOLD:
255 		m_msg(&m, imsg);
256 		m_get_evpid(&m, &evpid);
257 		m_get_id(&m, &holdq);
258 		m_end(&m);
259 		log_trace(TRACE_SCHEDULER,
260 		    "scheduler: holding evp:%016" PRIx64 " on %016" PRIx64,
261 		    evpid, holdq);
262 		backend->hold(evpid, holdq);
263 		ninflight -= 1;
264 		stat_decrement("scheduler.envelope.inflight", 1);
265 		scheduler_reset_events();
266 		return;
267 
268 	case IMSG_QUEUE_HOLDQ_RELEASE:
269 		m_msg(&m, imsg);
270 		m_get_int(&m, &type);
271 		m_get_id(&m, &holdq);
272 		m_get_int(&m, &r);
273 		m_end(&m);
274 		log_trace(TRACE_SCHEDULER,
275 		    "scheduler: releasing %d on holdq (%d, %016" PRIx64 ")",
276 		    r, type, holdq);
277 		backend->release(type, holdq, r);
278 		scheduler_reset_events();
279 		return;
280 
281 	case IMSG_CTL_PAUSE_MDA:
282 		log_trace(TRACE_SCHEDULER, "scheduler: pausing mda");
283 		env->sc_flags |= SMTPD_MDA_PAUSED;
284 		return;
285 
286 	case IMSG_CTL_RESUME_MDA:
287 		log_trace(TRACE_SCHEDULER, "scheduler: resuming mda");
288 		env->sc_flags &= ~SMTPD_MDA_PAUSED;
289 		scheduler_reset_events();
290 		return;
291 
292 	case IMSG_CTL_PAUSE_MTA:
293 		log_trace(TRACE_SCHEDULER, "scheduler: pausing mta");
294 		env->sc_flags |= SMTPD_MTA_PAUSED;
295 		return;
296 
297 	case IMSG_CTL_RESUME_MTA:
298 		log_trace(TRACE_SCHEDULER, "scheduler: resuming mta");
299 		env->sc_flags &= ~SMTPD_MTA_PAUSED;
300 		scheduler_reset_events();
301 		return;
302 
303 	case IMSG_CTL_VERBOSE:
304 		m_msg(&m, imsg);
305 		m_get_int(&m, &v);
306 		m_end(&m);
307 		log_verbose(v);
308 		return;
309 
310 	case IMSG_CTL_PROFILE:
311 		m_msg(&m, imsg);
312 		m_get_int(&m, &v);
313 		m_end(&m);
314 		profiling = v;
315 		return;
316 
317 	case IMSG_CTL_LIST_MESSAGES:
318 		msgid = *(uint32_t *)(imsg->data);
319 		n = backend->messages(msgid, msgids, env->sc_scheduler_max_msg_batch_size);
320 		m_compose(p, IMSG_CTL_LIST_MESSAGES, imsg->hdr.peerid, 0, -1,
321 		    msgids, n * sizeof (*msgids));
322 		return;
323 
324 	case IMSG_CTL_LIST_ENVELOPES:
325 		id = *(uint64_t *)(imsg->data);
326 		n = backend->envelopes(id, state, env->sc_scheduler_max_evp_batch_size);
327 		for (i = 0; i < n; i++) {
328 			m_create(p_queue, IMSG_CTL_LIST_ENVELOPES,
329 			    imsg->hdr.peerid, 0, -1);
330 			m_add_evpid(p_queue, state[i].evpid);
331 			m_add_int(p_queue, state[i].flags);
332 			m_add_time(p_queue, state[i].time);
333 			m_close(p_queue);
334 		}
335 		m_compose(p_queue, IMSG_CTL_LIST_ENVELOPES,
336 		    imsg->hdr.peerid, 0, -1, NULL, 0);
337 		return;
338 
339 	case IMSG_CTL_SCHEDULE:
340 		id = *(uint64_t *)(imsg->data);
341 		if (id <= 0xffffffffL)
342 			log_debug("debug: scheduler: "
343 			    "scheduling msg:%08" PRIx64, id);
344 		else
345 			log_debug("debug: scheduler: "
346 			    "scheduling evp:%016" PRIx64, id);
347 		r = backend->schedule(id);
348 		scheduler_reset_events();
349 		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
350 		    0, -1, NULL, 0);
351 		return;
352 
353 	case IMSG_QUEUE_ENVELOPE_SCHEDULE:
354 		id = *(uint64_t *)(imsg->data);
355 		backend->schedule(id);
356 		scheduler_reset_events();
357 		return;
358 
359 	case IMSG_CTL_REMOVE:
360 		id = *(uint64_t *)(imsg->data);
361 		if (id <= 0xffffffffL)
362 			log_debug("debug: scheduler: "
363 			    "removing msg:%08" PRIx64, id);
364 		else
365 			log_debug("debug: scheduler: "
366 			    "removing evp:%016" PRIx64, id);
367 		r = backend->remove(id);
368 		scheduler_reset_events();
369 		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
370 		    0, -1, NULL, 0);
371 		return;
372 
373 	case IMSG_CTL_PAUSE_EVP:
374 		id = *(uint64_t *)(imsg->data);
375 		if (id <= 0xffffffffL)
376 			log_debug("debug: scheduler: "
377 			    "suspending msg:%08" PRIx64, id);
378 		else
379 			log_debug("debug: scheduler: "
380 			    "suspending evp:%016" PRIx64, id);
381 		r = backend->suspend(id);
382 		scheduler_reset_events();
383 		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
384 		    0, -1, NULL, 0);
385 		return;
386 
387 	case IMSG_CTL_RESUME_EVP:
388 		id = *(uint64_t *)(imsg->data);
389 		if (id <= 0xffffffffL)
390 			log_debug("debug: scheduler: "
391 			    "resuming msg:%08" PRIx64, id);
392 		else
393 			log_debug("debug: scheduler: "
394 			    "resuming evp:%016" PRIx64, id);
395 		r = backend->resume(id);
396 		scheduler_reset_events();
397 		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
398 		    0, -1, NULL, 0);
399 		return;
400 	}
401 
402 	errx(1, "scheduler_imsg: unexpected %s imsg",
403 	    imsg_to_str(imsg->hdr.type));
404 }
405 
/*
 * Signal callback installed with signal_set() for SIGINT and SIGTERM.
 * Either of them shuts the scheduler down; any other signal number is
 * treated as a fatal error.  The event and p arguments are not used.
 */
static void
scheduler_sig_handler(int sig, short event, void *p)
{
	if (sig == SIGINT || sig == SIGTERM)
		scheduler_shutdown();
	else
		fatalx("scheduler_sig_handler: unexpected signal");
}
418 
/*
 * Log that the scheduler is going away and terminate the process
 * immediately with status 0.  This function does not return.
 */
static void
scheduler_shutdown(void)
{
	log_info("info: scheduler handler exiting");
	_exit(0);
}
425 
426 static void
427 scheduler_reset_events(void)
428 {
429 	struct timeval	 tv;
430 
431 	evtimer_del(&ev);
432 	tv.tv_sec = 0;
433 	tv.tv_usec = 0;
434 	evtimer_add(&ev, &tv);
435 }
436 
437 int
438 scheduler(void)
439 {
440 	struct passwd	*pw;
441 	struct event	 ev_sigint;
442 	struct event	 ev_sigterm;
443 
444 	backend = scheduler_backend_lookup(backend_scheduler);
445 	if (backend == NULL)
446 		errx(1, "cannot find scheduler backend \"%s\"",
447 		    backend_scheduler);
448 
449 	purge_config(PURGE_EVERYTHING);
450 
451 	if ((pw = getpwnam(SMTPD_USER)) == NULL)
452 		fatalx("unknown user " SMTPD_USER);
453 
454 	config_process(PROC_SCHEDULER);
455 
456 	backend->init(backend_scheduler);
457 
458 	if (chroot(PATH_CHROOT) == -1)
459 		fatal("scheduler: chroot");
460 	if (chdir("/") == -1)
461 		fatal("scheduler: chdir(\"/\")");
462 
463 	if (setgroups(1, &pw->pw_gid) ||
464 	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
465 	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
466 		fatal("scheduler: cannot drop privileges");
467 
468 	evpids = xcalloc(env->sc_scheduler_max_schedule, sizeof *evpids, "scheduler: init evpids");
469 	types = xcalloc(env->sc_scheduler_max_schedule, sizeof *types, "scheduler: init types");
470 	msgids = xcalloc(env->sc_scheduler_max_msg_batch_size, sizeof *msgids, "scheduler: list msg");
471 	state = xcalloc(env->sc_scheduler_max_evp_batch_size, sizeof *state, "scheduler: list evp");
472 
473 	imsg_callback = scheduler_imsg;
474 	event_init();
475 
476 	signal_set(&ev_sigint, SIGINT, scheduler_sig_handler, NULL);
477 	signal_set(&ev_sigterm, SIGTERM, scheduler_sig_handler, NULL);
478 	signal_add(&ev_sigint, NULL);
479 	signal_add(&ev_sigterm, NULL);
480 	signal(SIGPIPE, SIG_IGN);
481 	signal(SIGHUP, SIG_IGN);
482 
483 	config_peer(PROC_CONTROL);
484 	config_peer(PROC_QUEUE);
485 
486 	evtimer_set(&ev, scheduler_timeout, NULL);
487 	scheduler_reset_events();
488 
489 	if (pledge("stdio", NULL) == -1)
490 		err(1, "pledge");
491 
492 	if (event_dispatch() < 0)
493 		fatal("event_dispatch");
494 	scheduler_shutdown();
495 
496 	return (0);
497 }
498 
499 static void
500 scheduler_timeout(int fd, short event, void *p)
501 {
502 	struct timeval		tv;
503 	size_t			i;
504 	size_t			d_inflight;
505 	size_t			d_envelope;
506 	size_t			d_removed;
507 	size_t			d_expired;
508 	size_t			d_updated;
509 	size_t			count;
510 	int			mask, r, delay;
511 
512 	tv.tv_sec = 0;
513 	tv.tv_usec = 0;
514 
515 	mask = SCHED_UPDATE;
516 
517 	if (ninflight <  env->sc_scheduler_max_inflight) {
518 		mask |= SCHED_EXPIRE | SCHED_REMOVE | SCHED_BOUNCE;
519 		if (!(env->sc_flags & SMTPD_MDA_PAUSED))
520 			mask |= SCHED_MDA;
521 		if (!(env->sc_flags & SMTPD_MTA_PAUSED))
522 			mask |= SCHED_MTA;
523 	}
524 
525 	count = env->sc_scheduler_max_schedule;
526 
527 	log_trace(TRACE_SCHEDULER, "scheduler: getting batch: mask=0x%x, count=%zu", mask, count);
528 
529 	r = backend->batch(mask, &delay, &count, evpids, types);
530 
531 	log_trace(TRACE_SCHEDULER, "scheduler: got r=%i, delay=%i, count=%zu", r, delay, count);
532 
533 	if (r < 0)
534 		fatalx("scheduler: error in batch handler");
535 
536 	if (r == 0) {
537 
538 		if (delay < -1)
539 			fatalx("scheduler: invalid delay %d", delay);
540 
541 		if (delay == -1) {
542 			log_trace(TRACE_SCHEDULER, "scheduler: sleeping");
543 			return;
544 		}
545 
546 		tv.tv_sec = delay;
547 		tv.tv_usec = 0;
548 		log_trace(TRACE_SCHEDULER,
549 		    "scheduler: waiting for %s", duration_to_text(tv.tv_sec));
550 		evtimer_add(&ev, &tv);
551 		return;
552 	}
553 
554 	d_inflight = 0;
555 	d_envelope = 0;
556 	d_removed = 0;
557 	d_expired = 0;
558 	d_updated = 0;
559 
560 	for (i = 0; i < count; i++) {
561 		switch(types[i]) {
562 		case SCHED_REMOVE:
563 			log_debug("debug: scheduler: evp:%016" PRIx64
564 			    " removed", evpids[i]);
565 			m_create(p_queue, IMSG_SCHED_ENVELOPE_REMOVE, 0, 0, -1);
566 			m_add_evpid(p_queue, evpids[i]);
567 			m_close(p_queue);
568 			d_envelope += 1;
569 			d_removed += 1;
570 			d_inflight += 1;
571 			break;
572 
573 		case SCHED_EXPIRE:
574 			log_debug("debug: scheduler: evp:%016" PRIx64
575 			    " expired", evpids[i]);
576 			m_create(p_queue, IMSG_SCHED_ENVELOPE_EXPIRE, 0, 0, -1);
577 			m_add_evpid(p_queue, evpids[i]);
578 			m_close(p_queue);
579 			d_envelope += 1;
580 			d_expired += 1;
581 			d_inflight += 1;
582 			break;
583 
584 		case SCHED_UPDATE:
585 			log_debug("debug: scheduler: evp:%016" PRIx64
586 			    " scheduled (update)", evpids[i]);
587 			d_updated += 1;
588 			break;
589 
590 		case SCHED_BOUNCE:
591 			log_debug("debug: scheduler: evp:%016" PRIx64
592 			    " scheduled (bounce)", evpids[i]);
593 			m_create(p_queue, IMSG_SCHED_ENVELOPE_INJECT, 0, 0, -1);
594 			m_add_evpid(p_queue, evpids[i]);
595 			m_close(p_queue);
596 			d_inflight += 1;
597 			break;
598 
599 		case SCHED_MDA:
600 			log_debug("debug: scheduler: evp:%016" PRIx64
601 			    " scheduled (mda)", evpids[i]);
602 			m_create(p_queue, IMSG_SCHED_ENVELOPE_DELIVER, 0, 0, -1);
603 			m_add_evpid(p_queue, evpids[i]);
604 			m_close(p_queue);
605 			d_inflight += 1;
606 			break;
607 
608 		case SCHED_MTA:
609 			log_debug("debug: scheduler: evp:%016" PRIx64
610 			    " scheduled (mta)", evpids[i]);
611 			m_create(p_queue, IMSG_SCHED_ENVELOPE_TRANSFER, 0, 0, -1);
612 			m_add_evpid(p_queue, evpids[i]);
613 			m_close(p_queue);
614 			d_inflight += 1;
615 			break;
616 		}
617 	}
618 
619 	stat_decrement("scheduler.envelope", d_envelope);
620 	stat_increment("scheduler.envelope.inflight", d_inflight);
621 	stat_increment("scheduler.envelope.expired", d_expired);
622 	stat_increment("scheduler.envelope.removed", d_removed);
623 	stat_increment("scheduler.envelope.updated", d_updated);
624 
625 	ninflight += d_inflight;
626 
627 	tv.tv_sec = 0;
628 	tv.tv_usec = 0;
629 	evtimer_add(&ev, &tv);
630 }
631