xref: /openbsd/usr.sbin/smtpd/scheduler.c (revision 4cfece93)
1 /*	$OpenBSD: scheduler.c,v 1.60 2018/12/30 23:09:58 guenther Exp $	*/
2 
3 /*
4  * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
5  * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
6  * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
7  * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
8  *
9  * Permission to use, copy, modify, and distribute this software for any
10  * purpose with or without fee is hereby granted, provided that the above
11  * copyright notice and this permission notice appear in all copies.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
14  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
15  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
16  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
17  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
18  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
19  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20  */
21 
22 #include <sys/types.h>
23 #include <sys/queue.h>
24 #include <sys/tree.h>
25 #include <sys/socket.h>
26 #include <sys/stat.h>
27 
28 #include <ctype.h>
29 #include <dirent.h>
30 #include <err.h>
31 #include <errno.h>
32 #include <event.h>
33 #include <imsg.h>
34 #include <inttypes.h>
35 #include <pwd.h>
36 #include <signal.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40 #include <time.h>
41 #include <unistd.h>
42 #include <limits.h>
43 
44 #include "smtpd.h"
45 #include "log.h"
46 
static void scheduler_imsg(struct mproc *, struct imsg *);
static void scheduler_shutdown(void);
static void scheduler_reset_events(void);
static void scheduler_timeout(int, short, void *);

/* Scheduler backend selected at startup via scheduler_backend_lookup(). */
static struct scheduler_backend *backend = NULL;
/* One-shot timer that drives scheduler_timeout(). */
static struct event		 ev;
/* Count of envelopes currently handed out for delivery/removal. */
static size_t			 ninflight = 0;
/*
 * Batch buffers, allocated once in scheduler() and sized from the
 * configuration (sc_scheduler_max_* limits).
 */
static int			*types;		/* SCHED_* type per batched evp */
static uint64_t			*evpids;	/* envelope ids in current batch */
static uint32_t			*msgids;	/* message-id listing buffer */
static struct evpstate		*state;		/* envelope listing buffer */

/* Backend name from the configuration (e.g. "ramqueue"). */
extern const char *backend_scheduler;
61 
62 void
63 scheduler_imsg(struct mproc *p, struct imsg *imsg)
64 {
65 	struct bounce_req_msg	 req;
66 	struct envelope		 evp;
67 	struct scheduler_info	 si;
68 	struct msg		 m;
69 	uint64_t		 evpid, id, holdq;
70 	uint32_t		 msgid;
71 	uint32_t       		 inflight;
72 	size_t			 n, i;
73 	time_t			 timestamp;
74 	int			 v, r, type;
75 
76 	if (imsg == NULL)
77 		scheduler_shutdown();
78 
79 	switch (imsg->hdr.type) {
80 
81 	case IMSG_QUEUE_ENVELOPE_SUBMIT:
82 		m_msg(&m, imsg);
83 		m_get_envelope(&m, &evp);
84 		m_end(&m);
85 		log_trace(TRACE_SCHEDULER,
86 		    "scheduler: inserting evp:%016" PRIx64, evp.id);
87 		scheduler_info(&si, &evp);
88 		stat_increment("scheduler.envelope.incoming", 1);
89 		backend->insert(&si);
90 		return;
91 
92 	case IMSG_QUEUE_MESSAGE_COMMIT:
93 		m_msg(&m, imsg);
94 		m_get_msgid(&m, &msgid);
95 		m_end(&m);
96 		log_trace(TRACE_SCHEDULER,
97 		    "scheduler: committing msg:%08" PRIx32, msgid);
98 		n = backend->commit(msgid);
99 		stat_decrement("scheduler.envelope.incoming", n);
100 		stat_increment("scheduler.envelope", n);
101 		scheduler_reset_events();
102 		return;
103 
104 	case IMSG_QUEUE_DISCOVER_EVPID:
105 		m_msg(&m, imsg);
106 		m_get_envelope(&m, &evp);
107 		m_end(&m);
108 		r = backend->query(evp.id);
109 		if (r) {
110 			log_debug("debug: scheduler: evp:%016" PRIx64
111 			    " already scheduled", evp.id);
112 			return;
113 		}
114 		log_trace(TRACE_SCHEDULER,
115 		    "scheduler: discovering evp:%016" PRIx64, evp.id);
116 		scheduler_info(&si, &evp);
117 		stat_increment("scheduler.envelope.incoming", 1);
118 		backend->insert(&si);
119 		return;
120 
121 	case IMSG_QUEUE_DISCOVER_MSGID:
122 		m_msg(&m, imsg);
123 		m_get_msgid(&m, &msgid);
124 		m_end(&m);
125 		r = backend->query(msgid);
126 		if (r) {
127 			log_debug("debug: scheduler: msgid:%08" PRIx32
128 			    " already scheduled", msgid);
129 			return;
130 		}
131 		log_trace(TRACE_SCHEDULER,
132 		    "scheduler: committing msg:%08" PRIx32, msgid);
133 		n = backend->commit(msgid);
134 		stat_decrement("scheduler.envelope.incoming", n);
135 		stat_increment("scheduler.envelope", n);
136 		scheduler_reset_events();
137 		return;
138 
139 	case IMSG_QUEUE_MESSAGE_ROLLBACK:
140 		m_msg(&m, imsg);
141 		m_get_msgid(&m, &msgid);
142 		m_end(&m);
143 		log_trace(TRACE_SCHEDULER, "scheduler: aborting msg:%08" PRIx32,
144 		    msgid);
145 		n = backend->rollback(msgid);
146 		stat_decrement("scheduler.envelope.incoming", n);
147 		scheduler_reset_events();
148 		return;
149 
150 	case IMSG_QUEUE_ENVELOPE_REMOVE:
151 		m_msg(&m, imsg);
152 		m_get_evpid(&m, &evpid);
153 		m_get_u32(&m, &inflight);
154 		m_end(&m);
155 		log_trace(TRACE_SCHEDULER,
156 		    "scheduler: queue requested removal of evp:%016" PRIx64,
157 		    evpid);
158 		stat_decrement("scheduler.envelope", 1);
159 		if (!inflight)
160 			backend->remove(evpid);
161 		else {
162 			backend->delete(evpid);
163 			ninflight -= 1;
164 			stat_decrement("scheduler.envelope.inflight", 1);
165 		}
166 
167 		scheduler_reset_events();
168 		return;
169 
170 	case IMSG_QUEUE_ENVELOPE_ACK:
171 		m_msg(&m, imsg);
172 		m_get_evpid(&m, &evpid);
173 		m_end(&m);
174 		log_trace(TRACE_SCHEDULER,
175 		    "scheduler: queue ack removal of evp:%016" PRIx64,
176 		    evpid);
177 		ninflight -= 1;
178 		stat_decrement("scheduler.envelope.inflight", 1);
179 		scheduler_reset_events();
180 		return;
181 
182 	case IMSG_QUEUE_DELIVERY_OK:
183 		m_msg(&m, imsg);
184 		m_get_evpid(&m, &evpid);
185 		m_end(&m);
186 		log_trace(TRACE_SCHEDULER,
187 		    "scheduler: deleting evp:%016" PRIx64 " (ok)", evpid);
188 		backend->delete(evpid);
189 		ninflight -= 1;
190 		stat_increment("scheduler.delivery.ok", 1);
191 		stat_decrement("scheduler.envelope.inflight", 1);
192 		stat_decrement("scheduler.envelope", 1);
193 		scheduler_reset_events();
194 		return;
195 
196 	case IMSG_QUEUE_DELIVERY_TEMPFAIL:
197 		m_msg(&m, imsg);
198 		m_get_envelope(&m, &evp);
199 		m_end(&m);
200 		log_trace(TRACE_SCHEDULER,
201 		    "scheduler: updating evp:%016" PRIx64, evp.id);
202 		scheduler_info(&si, &evp);
203 		backend->update(&si);
204 		ninflight -= 1;
205 		stat_increment("scheduler.delivery.tempfail", 1);
206 		stat_decrement("scheduler.envelope.inflight", 1);
207 
208 		for (i = 0; i < MAX_BOUNCE_WARN; i++) {
209 			if (env->sc_bounce_warn[i] == 0)
210 				break;
211 			timestamp = si.creation + env->sc_bounce_warn[i];
212 			if (si.nexttry >= timestamp &&
213 			    si.lastbounce < timestamp) {
214 	    			req.evpid = evp.id;
215 				req.timestamp = timestamp;
216 				req.bounce.type = B_DELAYED;
217 				req.bounce.delay = env->sc_bounce_warn[i];
218 				req.bounce.ttl = si.ttl;
219 				m_compose(p, IMSG_SCHED_ENVELOPE_BOUNCE, 0, 0, -1,
220 				    &req, sizeof req);
221 				break;
222 			}
223 		}
224 		scheduler_reset_events();
225 		return;
226 
227 	case IMSG_QUEUE_DELIVERY_PERMFAIL:
228 		m_msg(&m, imsg);
229 		m_get_evpid(&m, &evpid);
230 		m_end(&m);
231 		log_trace(TRACE_SCHEDULER,
232 		    "scheduler: deleting evp:%016" PRIx64 " (fail)", evpid);
233 		backend->delete(evpid);
234 		ninflight -= 1;
235 		stat_increment("scheduler.delivery.permfail", 1);
236 		stat_decrement("scheduler.envelope.inflight", 1);
237 		stat_decrement("scheduler.envelope", 1);
238 		scheduler_reset_events();
239 		return;
240 
241 	case IMSG_QUEUE_DELIVERY_LOOP:
242 		m_msg(&m, imsg);
243 		m_get_evpid(&m, &evpid);
244 		m_end(&m);
245 		log_trace(TRACE_SCHEDULER,
246 		    "scheduler: deleting evp:%016" PRIx64 " (loop)", evpid);
247 		backend->delete(evpid);
248 		ninflight -= 1;
249 		stat_increment("scheduler.delivery.loop", 1);
250 		stat_decrement("scheduler.envelope.inflight", 1);
251 		stat_decrement("scheduler.envelope", 1);
252 		scheduler_reset_events();
253 		return;
254 
255 	case IMSG_QUEUE_HOLDQ_HOLD:
256 		m_msg(&m, imsg);
257 		m_get_evpid(&m, &evpid);
258 		m_get_id(&m, &holdq);
259 		m_end(&m);
260 		log_trace(TRACE_SCHEDULER,
261 		    "scheduler: holding evp:%016" PRIx64 " on %016" PRIx64,
262 		    evpid, holdq);
263 		backend->hold(evpid, holdq);
264 		ninflight -= 1;
265 		stat_decrement("scheduler.envelope.inflight", 1);
266 		scheduler_reset_events();
267 		return;
268 
269 	case IMSG_QUEUE_HOLDQ_RELEASE:
270 		m_msg(&m, imsg);
271 		m_get_int(&m, &type);
272 		m_get_id(&m, &holdq);
273 		m_get_int(&m, &r);
274 		m_end(&m);
275 		log_trace(TRACE_SCHEDULER,
276 		    "scheduler: releasing %d on holdq (%d, %016" PRIx64 ")",
277 		    r, type, holdq);
278 		backend->release(type, holdq, r);
279 		scheduler_reset_events();
280 		return;
281 
282 	case IMSG_CTL_PAUSE_MDA:
283 		log_trace(TRACE_SCHEDULER, "scheduler: pausing mda");
284 		env->sc_flags |= SMTPD_MDA_PAUSED;
285 		return;
286 
287 	case IMSG_CTL_RESUME_MDA:
288 		log_trace(TRACE_SCHEDULER, "scheduler: resuming mda");
289 		env->sc_flags &= ~SMTPD_MDA_PAUSED;
290 		scheduler_reset_events();
291 		return;
292 
293 	case IMSG_CTL_PAUSE_MTA:
294 		log_trace(TRACE_SCHEDULER, "scheduler: pausing mta");
295 		env->sc_flags |= SMTPD_MTA_PAUSED;
296 		return;
297 
298 	case IMSG_CTL_RESUME_MTA:
299 		log_trace(TRACE_SCHEDULER, "scheduler: resuming mta");
300 		env->sc_flags &= ~SMTPD_MTA_PAUSED;
301 		scheduler_reset_events();
302 		return;
303 
304 	case IMSG_CTL_VERBOSE:
305 		m_msg(&m, imsg);
306 		m_get_int(&m, &v);
307 		m_end(&m);
308 		log_setverbose(v);
309 		return;
310 
311 	case IMSG_CTL_PROFILE:
312 		m_msg(&m, imsg);
313 		m_get_int(&m, &v);
314 		m_end(&m);
315 		profiling = v;
316 		return;
317 
318 	case IMSG_CTL_LIST_MESSAGES:
319 		msgid = *(uint32_t *)(imsg->data);
320 		n = backend->messages(msgid, msgids, env->sc_scheduler_max_msg_batch_size);
321 		m_compose(p, IMSG_CTL_LIST_MESSAGES, imsg->hdr.peerid, 0, -1,
322 		    msgids, n * sizeof (*msgids));
323 		return;
324 
325 	case IMSG_CTL_LIST_ENVELOPES:
326 		id = *(uint64_t *)(imsg->data);
327 		n = backend->envelopes(id, state, env->sc_scheduler_max_evp_batch_size);
328 		for (i = 0; i < n; i++) {
329 			m_create(p_queue, IMSG_CTL_LIST_ENVELOPES,
330 			    imsg->hdr.peerid, 0, -1);
331 			m_add_evpid(p_queue, state[i].evpid);
332 			m_add_int(p_queue, state[i].flags);
333 			m_add_time(p_queue, state[i].time);
334 			m_close(p_queue);
335 		}
336 		m_compose(p_queue, IMSG_CTL_LIST_ENVELOPES,
337 		    imsg->hdr.peerid, 0, -1, NULL, 0);
338 		return;
339 
340 	case IMSG_CTL_SCHEDULE:
341 		id = *(uint64_t *)(imsg->data);
342 		if (id <= 0xffffffffL)
343 			log_debug("debug: scheduler: "
344 			    "scheduling msg:%08" PRIx64, id);
345 		else
346 			log_debug("debug: scheduler: "
347 			    "scheduling evp:%016" PRIx64, id);
348 		r = backend->schedule(id);
349 		scheduler_reset_events();
350 		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
351 		    0, -1, NULL, 0);
352 		return;
353 
354 	case IMSG_QUEUE_ENVELOPE_SCHEDULE:
355 		id = *(uint64_t *)(imsg->data);
356 		backend->schedule(id);
357 		scheduler_reset_events();
358 		return;
359 
360 	case IMSG_CTL_REMOVE:
361 		id = *(uint64_t *)(imsg->data);
362 		if (id <= 0xffffffffL)
363 			log_debug("debug: scheduler: "
364 			    "removing msg:%08" PRIx64, id);
365 		else
366 			log_debug("debug: scheduler: "
367 			    "removing evp:%016" PRIx64, id);
368 		r = backend->remove(id);
369 		scheduler_reset_events();
370 		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
371 		    0, -1, NULL, 0);
372 		return;
373 
374 	case IMSG_CTL_PAUSE_EVP:
375 		id = *(uint64_t *)(imsg->data);
376 		if (id <= 0xffffffffL)
377 			log_debug("debug: scheduler: "
378 			    "suspending msg:%08" PRIx64, id);
379 		else
380 			log_debug("debug: scheduler: "
381 			    "suspending evp:%016" PRIx64, id);
382 		r = backend->suspend(id);
383 		scheduler_reset_events();
384 		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
385 		    0, -1, NULL, 0);
386 		return;
387 
388 	case IMSG_CTL_RESUME_EVP:
389 		id = *(uint64_t *)(imsg->data);
390 		if (id <= 0xffffffffL)
391 			log_debug("debug: scheduler: "
392 			    "resuming msg:%08" PRIx64, id);
393 		else
394 			log_debug("debug: scheduler: "
395 			    "resuming evp:%016" PRIx64, id);
396 		r = backend->resume(id);
397 		scheduler_reset_events();
398 		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
399 		    0, -1, NULL, 0);
400 		return;
401 	}
402 
403 	errx(1, "scheduler_imsg: unexpected %s imsg",
404 	    imsg_to_str(imsg->hdr.type));
405 }
406 
/*
 * Terminate the scheduler process.  Called from scheduler_imsg() when
 * the imsg channel to the parent is closed.  Uses _exit() so no
 * stdio/atexit handlers run in the child.
 */
static void
scheduler_shutdown(void)
{
	log_debug("debug: scheduler agent exiting");
	_exit(0);
}
413 
414 static void
415 scheduler_reset_events(void)
416 {
417 	struct timeval	 tv;
418 
419 	evtimer_del(&ev);
420 	tv.tv_sec = 0;
421 	tv.tv_usec = 0;
422 	evtimer_add(&ev, &tv);
423 }
424 
/*
 * Entry point of the scheduler process.
 *
 * Looks up and initializes the configured backend, chroots and drops
 * privileges, allocates the batch buffers, wires up the event loop and
 * the peer channels, then pledges down to "stdio" and dispatches
 * events forever.  Only returns on event-loop failure (fatalx).
 *
 * NOTE: the ordering below matters: backend->init() runs before
 * chroot(), privileges are dropped only after chroot()/chdir(), and
 * pledge() is last so that all setup syscalls are still allowed.
 */
int
scheduler(void)
{
	struct passwd	*pw;

	backend = scheduler_backend_lookup(backend_scheduler);
	if (backend == NULL)
		errx(1, "cannot find scheduler backend \"%s\"",
		    backend_scheduler);

	purge_config(PURGE_EVERYTHING & ~PURGE_DISPATCHERS);

	if ((pw = getpwnam(SMTPD_USER)) == NULL)
		fatalx("unknown user " SMTPD_USER);

	config_process(PROC_SCHEDULER);

	backend->init(backend_scheduler);

	if (chroot(PATH_CHROOT) == -1)
		fatal("scheduler: chroot");
	if (chdir("/") == -1)
		fatal("scheduler: chdir(\"/\")");

	if (setgroups(1, &pw->pw_gid) ||
	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
		fatal("scheduler: cannot drop privileges");

	/* Batch buffers, sized from the configured limits. */
	evpids = xcalloc(env->sc_scheduler_max_schedule, sizeof *evpids);
	types = xcalloc(env->sc_scheduler_max_schedule, sizeof *types);
	msgids = xcalloc(env->sc_scheduler_max_msg_batch_size, sizeof *msgids);
	state = xcalloc(env->sc_scheduler_max_evp_batch_size, sizeof *state);

	imsg_callback = scheduler_imsg;
	event_init();

	/* Termination is driven by the parent closing the imsg channel. */
	signal(SIGINT, SIG_IGN);
	signal(SIGTERM, SIG_IGN);
	signal(SIGPIPE, SIG_IGN);
	signal(SIGHUP, SIG_IGN);

	config_peer(PROC_CONTROL);
	config_peer(PROC_QUEUE);

	evtimer_set(&ev, scheduler_timeout, NULL);
	scheduler_reset_events();

	if (pledge("stdio", NULL) == -1)
		err(1, "pledge");

	event_dispatch();
	fatalx("exited event loop");

	return (0);
}
481 
482 static void
483 scheduler_timeout(int fd, short event, void *p)
484 {
485 	struct timeval		tv;
486 	size_t			i;
487 	size_t			d_inflight;
488 	size_t			d_envelope;
489 	size_t			d_removed;
490 	size_t			d_expired;
491 	size_t			d_updated;
492 	size_t			count;
493 	int			mask, r, delay;
494 
495 	tv.tv_sec = 0;
496 	tv.tv_usec = 0;
497 
498 	mask = SCHED_UPDATE;
499 
500 	if (ninflight <  env->sc_scheduler_max_inflight) {
501 		mask |= SCHED_EXPIRE | SCHED_REMOVE | SCHED_BOUNCE;
502 		if (!(env->sc_flags & SMTPD_MDA_PAUSED))
503 			mask |= SCHED_MDA;
504 		if (!(env->sc_flags & SMTPD_MTA_PAUSED))
505 			mask |= SCHED_MTA;
506 	}
507 
508 	count = env->sc_scheduler_max_schedule;
509 
510 	log_trace(TRACE_SCHEDULER, "scheduler: getting batch: mask=0x%x, count=%zu", mask, count);
511 
512 	r = backend->batch(mask, &delay, &count, evpids, types);
513 
514 	log_trace(TRACE_SCHEDULER, "scheduler: got r=%i, delay=%i, count=%zu", r, delay, count);
515 
516 	if (r < 0)
517 		fatalx("scheduler: error in batch handler");
518 
519 	if (r == 0) {
520 
521 		if (delay < -1)
522 			fatalx("scheduler: invalid delay %d", delay);
523 
524 		if (delay == -1) {
525 			log_trace(TRACE_SCHEDULER, "scheduler: sleeping");
526 			return;
527 		}
528 
529 		tv.tv_sec = delay;
530 		tv.tv_usec = 0;
531 		log_trace(TRACE_SCHEDULER,
532 		    "scheduler: waiting for %s", duration_to_text(tv.tv_sec));
533 		evtimer_add(&ev, &tv);
534 		return;
535 	}
536 
537 	d_inflight = 0;
538 	d_envelope = 0;
539 	d_removed = 0;
540 	d_expired = 0;
541 	d_updated = 0;
542 
543 	for (i = 0; i < count; i++) {
544 		switch(types[i]) {
545 		case SCHED_REMOVE:
546 			log_debug("debug: scheduler: evp:%016" PRIx64
547 			    " removed", evpids[i]);
548 			m_create(p_queue, IMSG_SCHED_ENVELOPE_REMOVE, 0, 0, -1);
549 			m_add_evpid(p_queue, evpids[i]);
550 			m_close(p_queue);
551 			d_envelope += 1;
552 			d_removed += 1;
553 			d_inflight += 1;
554 			break;
555 
556 		case SCHED_EXPIRE:
557 			log_debug("debug: scheduler: evp:%016" PRIx64
558 			    " expired", evpids[i]);
559 			m_create(p_queue, IMSG_SCHED_ENVELOPE_EXPIRE, 0, 0, -1);
560 			m_add_evpid(p_queue, evpids[i]);
561 			m_close(p_queue);
562 			d_envelope += 1;
563 			d_expired += 1;
564 			d_inflight += 1;
565 			break;
566 
567 		case SCHED_UPDATE:
568 			log_debug("debug: scheduler: evp:%016" PRIx64
569 			    " scheduled (update)", evpids[i]);
570 			d_updated += 1;
571 			break;
572 
573 		case SCHED_BOUNCE:
574 			log_debug("debug: scheduler: evp:%016" PRIx64
575 			    " scheduled (bounce)", evpids[i]);
576 			m_create(p_queue, IMSG_SCHED_ENVELOPE_INJECT, 0, 0, -1);
577 			m_add_evpid(p_queue, evpids[i]);
578 			m_close(p_queue);
579 			d_inflight += 1;
580 			break;
581 
582 		case SCHED_MDA:
583 			log_debug("debug: scheduler: evp:%016" PRIx64
584 			    " scheduled (mda)", evpids[i]);
585 			m_create(p_queue, IMSG_SCHED_ENVELOPE_DELIVER, 0, 0, -1);
586 			m_add_evpid(p_queue, evpids[i]);
587 			m_close(p_queue);
588 			d_inflight += 1;
589 			break;
590 
591 		case SCHED_MTA:
592 			log_debug("debug: scheduler: evp:%016" PRIx64
593 			    " scheduled (mta)", evpids[i]);
594 			m_create(p_queue, IMSG_SCHED_ENVELOPE_TRANSFER, 0, 0, -1);
595 			m_add_evpid(p_queue, evpids[i]);
596 			m_close(p_queue);
597 			d_inflight += 1;
598 			break;
599 		}
600 	}
601 
602 	stat_decrement("scheduler.envelope", d_envelope);
603 	stat_increment("scheduler.envelope.inflight", d_inflight);
604 	stat_increment("scheduler.envelope.expired", d_expired);
605 	stat_increment("scheduler.envelope.removed", d_removed);
606 	stat_increment("scheduler.envelope.updated", d_updated);
607 
608 	ninflight += d_inflight;
609 
610 	tv.tv_sec = 0;
611 	tv.tv_usec = 0;
612 	evtimer_add(&ev, &tv);
613 }
614