1 /*	$OpenBSD: queue_backend.c,v 1.66 2020/04/22 11:35:34 eric Exp $	*/
2 
3 /*
4  * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include "includes.h"
20 
21 #include <sys/types.h>
22 #include <sys/queue.h>
23 #include <sys/tree.h>
24 #include <sys/socket.h>
25 #include <sys/stat.h>
26 
27 #include <ctype.h>
28 #include <err.h>
29 #include <errno.h>
30 #include <event.h>
31 #include <fcntl.h>
32 #include <grp.h>
33 #include <imsg.h>
34 #include <limits.h>
35 #include <inttypes.h>
36 #include <pwd.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40 #include <time.h>
41 #include <unistd.h>
42 
43 #include "smtpd.h"
44 #include "log.h"
45 
46 static const char* envelope_validate(struct envelope *);
47 
48 extern struct queue_backend	queue_backend_fs;
49 extern struct queue_backend	queue_backend_null;
50 extern struct queue_backend	queue_backend_proc;
51 extern struct queue_backend	queue_backend_ram;
52 
53 static void queue_envelope_cache_add(struct envelope *);
54 static void queue_envelope_cache_update(struct envelope *);
55 static void queue_envelope_cache_del(uint64_t evpid);
56 
57 TAILQ_HEAD(evplst, envelope);
58 
59 static struct tree		evpcache_tree;
60 static struct evplst		evpcache_list;
61 static struct queue_backend	*backend;
62 
63 static int (*handler_close)(void);
64 static int (*handler_message_create)(uint32_t *);
65 static int (*handler_message_commit)(uint32_t, const char*);
66 static int (*handler_message_delete)(uint32_t);
67 static int (*handler_message_fd_r)(uint32_t);
68 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
69 static int (*handler_envelope_delete)(uint64_t);
70 static int (*handler_envelope_update)(uint64_t, const char *, size_t);
71 static int (*handler_envelope_load)(uint64_t, char *, size_t);
72 static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
73 static int (*handler_message_walk)(uint64_t *, char *, size_t,
74     uint32_t, int *, void **);
75 
#ifdef QUEUE_PROFILING

/* Label and start time of the backend call currently being timed. */
static struct {
	struct timespec	 t0;
	const char	*name;
} profile;

/*
 * Remember `name' and the current monotonic time.  Does nothing unless
 * PROFILE_QUEUE is set in `profiling'.
 */
static inline void profile_enter(const char *name)
{
	if (profiling & PROFILE_QUEUE) {
		profile.name = name;
		clock_gettime(CLOCK_MONOTONIC, &profile.t0);
	}
}

/*
 * Log the time elapsed since the last profile_enter() together with the
 * label it recorded.  Does nothing unless PROFILE_QUEUE is set.
 */
static inline void profile_leave(void)
{
	struct timespec	 now, elapsed;

	if (!(profiling & PROFILE_QUEUE))
		return;

	clock_gettime(CLOCK_MONOTONIC, &now);
	timespecsub(&now, &profile.t0, &elapsed);
	log_debug("profile-queue: %s %lld.%09ld", profile.name,
	    (long long)elapsed.tv_sec, elapsed.tv_nsec);
}
#else
/* Profiling compiled out: both hooks expand to empty statements. */
#define profile_enter(x)	do {} while (0)
#define profile_leave()		do {} while (0)
#endif
108 
109 static int
queue_message_path(uint32_t msgid,char * buf,size_t len)110 queue_message_path(uint32_t msgid, char *buf, size_t len)
111 {
112 	return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
113 }
114 
115 int
queue_init(const char * name,int server)116 queue_init(const char *name, int server)
117 {
118 	struct passwd	*pwq;
119 	struct group	*gr;
120 	int		 r;
121 
122 	pwq = getpwnam(SMTPD_QUEUE_USER);
123 	if (pwq == NULL)
124 		errx(1, "unknown user %s", SMTPD_QUEUE_USER);
125 
126 	gr = getgrnam(SMTPD_QUEUE_GROUP);
127 	if (gr == NULL)
128 		errx(1, "unknown group %s", SMTPD_QUEUE_GROUP);
129 
130 	tree_init(&evpcache_tree);
131 	TAILQ_INIT(&evpcache_list);
132 
133 	if (!strcmp(name, "fs"))
134 		backend = &queue_backend_fs;
135 	else if (!strcmp(name, "null"))
136 		backend = &queue_backend_null;
137 	else if (!strcmp(name, "ram"))
138 		backend = &queue_backend_ram;
139 	else
140 		backend = &queue_backend_proc;
141 
142 	if (server) {
143 		if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
144 			errx(1, "error in spool directory setup");
145 		if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0)
146 			errx(1, "error in offline directory setup");
147 		if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
148 			errx(1, "error in purge directory setup");
149 
150 		mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);
151 
152 		if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
153 			errx(1, "error in purge directory setup");
154 	}
155 
156 	r = backend->init(pwq, server, name);
157 
158 	log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);
159 
160 	return (r);
161 }
162 
163 int
queue_close(void)164 queue_close(void)
165 {
166 	if (handler_close)
167 		return (handler_close());
168 
169 	return (1);
170 }
171 
172 int
queue_message_create(uint32_t * msgid)173 queue_message_create(uint32_t *msgid)
174 {
175 	int	r;
176 
177 	profile_enter("queue_message_create");
178 	r = handler_message_create(msgid);
179 	profile_leave();
180 
181 	log_trace(TRACE_QUEUE,
182 	    "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
183 	    r, *msgid);
184 
185 	return (r);
186 }
187 
188 int
queue_message_delete(uint32_t msgid)189 queue_message_delete(uint32_t msgid)
190 {
191 	char	msgpath[PATH_MAX];
192 	uint64_t evpid;
193 	void   *iter;
194 	int	r;
195 
196 	profile_enter("queue_message_delete");
197 	r = handler_message_delete(msgid);
198 	profile_leave();
199 
200 	/* in case the message is incoming */
201 	queue_message_path(msgid, msgpath, sizeof(msgpath));
202 	unlink(msgpath);
203 
204 	/* remove remaining envelopes from the cache if any (on rollback) */
205 	evpid = msgid_to_evpid(msgid);
206 	for (;;) {
207 		iter = NULL;
208 		if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL))
209 			break;
210 		if (evpid_to_msgid(evpid) != msgid)
211 			break;
212 		queue_envelope_cache_del(evpid);
213 	}
214 
215 	log_trace(TRACE_QUEUE,
216 	    "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);
217 
218 	return (r);
219 }
220 
221 int
queue_message_commit(uint32_t msgid)222 queue_message_commit(uint32_t msgid)
223 {
224 	int	r;
225 	char	msgpath[PATH_MAX];
226 	char	tmppath[PATH_MAX];
227 	FILE	*ifp = NULL;
228 	FILE	*ofp = NULL;
229 
230 	profile_enter("queue_message_commit");
231 
232 	queue_message_path(msgid, msgpath, sizeof(msgpath));
233 
234 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
235 		bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
236 		ifp = fopen(msgpath, "r");
237 		ofp = fopen(tmppath, "w+");
238 		if (ifp == NULL || ofp == NULL)
239 			goto err;
240 		if (!compress_file(ifp, ofp))
241 			goto err;
242 		fclose(ifp);
243 		fclose(ofp);
244 		ifp = NULL;
245 		ofp = NULL;
246 
247 		if (rename(tmppath, msgpath) == -1) {
248 			if (errno == ENOSPC)
249 				return (0);
250 			unlink(tmppath);
251 			log_warn("rename");
252 			return (0);
253 		}
254 	}
255 
256 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
257 		bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
258 		ifp = fopen(msgpath, "r");
259 		ofp = fopen(tmppath, "w+");
260 		if (ifp == NULL || ofp == NULL)
261 			goto err;
262 		if (!crypto_encrypt_file(ifp, ofp))
263 			goto err;
264 		fclose(ifp);
265 		fclose(ofp);
266 		ifp = NULL;
267 		ofp = NULL;
268 
269 		if (rename(tmppath, msgpath) == -1) {
270 			if (errno == ENOSPC)
271 				return (0);
272 			unlink(tmppath);
273 			log_warn("rename");
274 			return (0);
275 		}
276 	}
277 
278 	r = handler_message_commit(msgid, msgpath);
279 	profile_leave();
280 
281 	/* in case it's not done by the backend */
282 	unlink(msgpath);
283 
284 	log_trace(TRACE_QUEUE,
285 	    "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
286 	    msgid, r);
287 
288 	return (r);
289 
290 err:
291 	if (ifp)
292 		fclose(ifp);
293 	if (ofp)
294 		fclose(ofp);
295 	return 0;
296 }
297 
298 int
queue_message_fd_r(uint32_t msgid)299 queue_message_fd_r(uint32_t msgid)
300 {
301 	int	fdin = -1, fdout = -1, fd = -1;
302 	FILE	*ifp = NULL;
303 	FILE	*ofp = NULL;
304 
305 	profile_enter("queue_message_fd_r");
306 	fdin = handler_message_fd_r(msgid);
307 	profile_leave();
308 
309 	log_trace(TRACE_QUEUE,
310 	    "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);
311 
312 	if (fdin == -1)
313 		return (-1);
314 
315 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
316 		if ((fdout = mktmpfile()) == -1)
317 			goto err;
318 		if ((fd = dup(fdout)) == -1)
319 			goto err;
320 		if ((ifp = fdopen(fdin, "r")) == NULL)
321 			goto err;
322 		fdin = fd;
323 		fd = -1;
324 		if ((ofp = fdopen(fdout, "w+")) == NULL)
325 			goto err;
326 
327 		if (!crypto_decrypt_file(ifp, ofp))
328 			goto err;
329 
330 		fclose(ifp);
331 		ifp = NULL;
332 		fclose(ofp);
333 		ofp = NULL;
334 		lseek(fdin, SEEK_SET, 0);
335 	}
336 
337 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
338 		if ((fdout = mktmpfile()) == -1)
339 			goto err;
340 		if ((fd = dup(fdout)) == -1)
341 			goto err;
342 		if ((ifp = fdopen(fdin, "r")) == NULL)
343 			goto err;
344 		fdin = fd;
345 		fd = -1;
346 		if ((ofp = fdopen(fdout, "w+")) == NULL)
347 			goto err;
348 
349 		if (!uncompress_file(ifp, ofp))
350 			goto err;
351 
352 		fclose(ifp);
353 		ifp = NULL;
354 		fclose(ofp);
355 		ofp = NULL;
356 		lseek(fdin, SEEK_SET, 0);
357 	}
358 
359 	return (fdin);
360 
361 err:
362 	if (fd != -1)
363 		close(fd);
364 	if (fdin != -1)
365 		close(fdin);
366 	if (fdout != -1)
367 		close(fdout);
368 	if (ifp)
369 		fclose(ifp);
370 	if (ofp)
371 		fclose(ofp);
372 	return -1;
373 }
374 
375 int
queue_message_fd_rw(uint32_t msgid)376 queue_message_fd_rw(uint32_t msgid)
377 {
378 	char buf[PATH_MAX];
379 
380 	queue_message_path(msgid, buf, sizeof(buf));
381 
382 	return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
383 }
384 
385 static int
queue_envelope_dump_buffer(struct envelope * ep,char * evpbuf,size_t evpbufsize)386 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
387 {
388 	char   *evp;
389 	size_t	evplen;
390 	size_t	complen;
391 	char	compbuf[sizeof(struct envelope)];
392 	size_t	enclen;
393 	char	encbuf[sizeof(struct envelope)];
394 
395 	evp = evpbuf;
396 	evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
397 	if (evplen == 0)
398 		return (0);
399 
400 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
401 		complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
402 		if (complen == 0)
403 			return (0);
404 		evp = compbuf;
405 		evplen = complen;
406 	}
407 
408 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
409 		enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
410 		if (enclen == 0)
411 			return (0);
412 		evp = encbuf;
413 		evplen = enclen;
414 	}
415 
416 	memmove(evpbuf, evp, evplen);
417 
418 	return (evplen);
419 }
420 
421 static int
queue_envelope_load_buffer(struct envelope * ep,char * evpbuf,size_t evpbufsize)422 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
423 {
424 	char		*evp;
425 	size_t		 evplen;
426 	char		 compbuf[sizeof(struct envelope)];
427 	size_t		 complen;
428 	char		 encbuf[sizeof(struct envelope)];
429 	size_t		 enclen;
430 
431 	evp = evpbuf;
432 	evplen = evpbufsize;
433 
434 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
435 		enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
436 		if (enclen == 0)
437 			return (0);
438 		evp = encbuf;
439 		evplen = enclen;
440 	}
441 
442 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
443 		complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
444 		if (complen == 0)
445 			return (0);
446 		evp = compbuf;
447 		evplen = complen;
448 	}
449 
450 	return (envelope_load_buffer(ep, evp, evplen));
451 }
452 
453 static void
queue_envelope_cache_add(struct envelope * e)454 queue_envelope_cache_add(struct envelope *e)
455 {
456 	struct envelope *cached;
457 
458 	while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
459 		queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);
460 
461 	cached = xcalloc(1, sizeof *cached);
462 	*cached = *e;
463 	TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
464 	tree_xset(&evpcache_tree, e->id, cached);
465 	stat_increment("queue.evpcache.size", 1);
466 }
467 
468 static void
queue_envelope_cache_update(struct envelope * e)469 queue_envelope_cache_update(struct envelope *e)
470 {
471 	struct envelope *cached;
472 
473 	if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
474 		queue_envelope_cache_add(e);
475 		stat_increment("queue.evpcache.update.missed", 1);
476 	} else {
477 		TAILQ_REMOVE(&evpcache_list, cached, entry);
478 		*cached = *e;
479 		TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
480 		stat_increment("queue.evpcache.update.hit", 1);
481 	}
482 }
483 
484 static void
queue_envelope_cache_del(uint64_t evpid)485 queue_envelope_cache_del(uint64_t evpid)
486 {
487 	struct envelope *cached;
488 
489 	if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
490 		return;
491 
492 	TAILQ_REMOVE(&evpcache_list, cached, entry);
493 	free(cached);
494 	stat_decrement("queue.evpcache.size", 1);
495 }
496 
497 int
queue_envelope_create(struct envelope * ep)498 queue_envelope_create(struct envelope *ep)
499 {
500 	int		 r;
501 	char		 evpbuf[sizeof(struct envelope)];
502 	size_t		 evplen;
503 	uint64_t	 evpid;
504 	uint32_t	 msgid;
505 
506 	ep->creation = time(NULL);
507 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
508 	if (evplen == 0)
509 		return (0);
510 
511 	evpid = ep->id;
512 	msgid = evpid_to_msgid(evpid);
513 
514 	profile_enter("queue_envelope_create");
515 	r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
516 	profile_leave();
517 
518 	log_trace(TRACE_QUEUE,
519 	    "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
520 	    evpid, evplen, r, ep->id);
521 
522 	if (!r) {
523 		ep->creation = 0;
524 		ep->id = 0;
525 	}
526 
527 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
528 		queue_envelope_cache_add(ep);
529 
530 	return (r);
531 }
532 
533 int
queue_envelope_delete(uint64_t evpid)534 queue_envelope_delete(uint64_t evpid)
535 {
536 	int	r;
537 
538 	if (env->sc_queue_flags & QUEUE_EVPCACHE)
539 		queue_envelope_cache_del(evpid);
540 
541 	profile_enter("queue_envelope_delete");
542 	r = handler_envelope_delete(evpid);
543 	profile_leave();
544 
545 	log_trace(TRACE_QUEUE,
546 	    "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
547 	    evpid, r);
548 
549 	return (r);
550 }
551 
552 int
queue_envelope_load(uint64_t evpid,struct envelope * ep)553 queue_envelope_load(uint64_t evpid, struct envelope *ep)
554 {
555 	const char	*e;
556 	char		 evpbuf[sizeof(struct envelope)];
557 	size_t		 evplen;
558 	struct envelope	*cached;
559 
560 	if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
561 	    (cached = tree_get(&evpcache_tree, evpid))) {
562 		*ep = *cached;
563 		stat_increment("queue.evpcache.load.hit", 1);
564 		return (1);
565 	}
566 
567 	ep->id = evpid;
568 	profile_enter("queue_envelope_load");
569 	evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
570 	profile_leave();
571 
572 	log_trace(TRACE_QUEUE,
573 	    "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
574 	    evpid, evplen);
575 
576 	if (evplen == 0)
577 		return (0);
578 
579 	if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
580 		if ((e = envelope_validate(ep)) == NULL) {
581 			ep->id = evpid;
582 			if (env->sc_queue_flags & QUEUE_EVPCACHE) {
583 				queue_envelope_cache_add(ep);
584 				stat_increment("queue.evpcache.load.missed", 1);
585 			}
586 			return (1);
587 		}
588 		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
589 		    evpid, e);
590 	}
591 	return (0);
592 }
593 
594 int
queue_envelope_update(struct envelope * ep)595 queue_envelope_update(struct envelope *ep)
596 {
597 	char	evpbuf[sizeof(struct envelope)];
598 	size_t	evplen;
599 	int	r;
600 
601 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
602 	if (evplen == 0)
603 		return (0);
604 
605 	profile_enter("queue_envelope_update");
606 	r = handler_envelope_update(ep->id, evpbuf, evplen);
607 	profile_leave();
608 
609 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
610 		queue_envelope_cache_update(ep);
611 
612 	log_trace(TRACE_QUEUE,
613 	    "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
614 	    ep->id, r);
615 
616 	return (r);
617 }
618 
619 int
queue_message_walk(struct envelope * ep,uint32_t msgid,int * done,void ** data)620 queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data)
621 {
622 	char		 evpbuf[sizeof(struct envelope)];
623 	uint64_t	 evpid;
624 	int		 r;
625 	const char	*e;
626 
627 	profile_enter("queue_message_walk");
628 	r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf,
629 	    msgid, done, data);
630 	profile_leave();
631 
632 	log_trace(TRACE_QUEUE,
633 	    "queue-backend: queue_message_walk() -> %d (%016"PRIx64")",
634 	    r, evpid);
635 
636 	if (r == -1)
637 		return (r);
638 
639 	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
640 		if ((e = envelope_validate(ep)) == NULL) {
641 			ep->id = evpid;
642 			/*
643 			 * do not cache the envelope here, while discovering
644 			 * envelopes one could re-run discover on already
645 			 * scheduled envelopes which leads to triggering of
646 			 * strict checks in caching. Envelopes could anyway
647 			 * be loaded from backend if it isn't cached.
648 			 */
649 			return (1);
650 		}
651 		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
652 		    evpid, e);
653 	}
654 	return (0);
655 }
656 
657 int
queue_envelope_walk(struct envelope * ep)658 queue_envelope_walk(struct envelope *ep)
659 {
660 	const char	*e;
661 	uint64_t	 evpid;
662 	char		 evpbuf[sizeof(struct envelope)];
663 	int		 r;
664 
665 	profile_enter("queue_envelope_walk");
666 	r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
667 	profile_leave();
668 
669 	log_trace(TRACE_QUEUE,
670 	    "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
671 	    r, evpid);
672 
673 	if (r == -1)
674 		return (r);
675 
676 	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
677 		if ((e = envelope_validate(ep)) == NULL) {
678 			ep->id = evpid;
679 			if (env->sc_queue_flags & QUEUE_EVPCACHE)
680 				queue_envelope_cache_add(ep);
681 			return (1);
682 		}
683 		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
684 		    evpid, e);
685 	}
686 	return (0);
687 }
688 
689 uint32_t
queue_generate_msgid(void)690 queue_generate_msgid(void)
691 {
692 	uint32_t msgid;
693 
694 	while ((msgid = arc4random()) == 0)
695 		;
696 
697 	return msgid;
698 }
699 
700 uint64_t
queue_generate_evpid(uint32_t msgid)701 queue_generate_evpid(uint32_t msgid)
702 {
703 	uint32_t rnd;
704 	uint64_t evpid;
705 
706 	while ((rnd = arc4random()) == 0)
707 		;
708 
709 	evpid = msgid;
710 	evpid <<= 32;
711 	evpid |= rnd;
712 
713 	return evpid;
714 }
715 
716 static const char*
envelope_validate(struct envelope * ep)717 envelope_validate(struct envelope *ep)
718 {
719 	if (ep->version != SMTPD_ENVELOPE_VERSION)
720 		return "version mismatch";
721 
722 	if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
723 		return "invalid helo";
724 	if (ep->helo[0] == '\0')
725 		return "empty helo";
726 
727 	if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
728 		return "invalid hostname";
729 	if (ep->hostname[0] == '\0')
730 		return "empty hostname";
731 
732 	if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
733 		return "invalid error line";
734 
735 	if (dict_get(env->sc_dispatchers, ep->dispatcher) == NULL)
736 		return "unknown dispatcher";
737 
738 	return NULL;
739 }
740 
741 void
queue_api_on_close(int (* cb)(void))742 queue_api_on_close(int(*cb)(void))
743 {
744 	handler_close = cb;
745 }
746 
747 void
queue_api_on_message_create(int (* cb)(uint32_t *))748 queue_api_on_message_create(int(*cb)(uint32_t *))
749 {
750 	handler_message_create = cb;
751 }
752 
753 void
queue_api_on_message_commit(int (* cb)(uint32_t,const char *))754 queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
755 {
756 	handler_message_commit = cb;
757 }
758 
759 void
queue_api_on_message_delete(int (* cb)(uint32_t))760 queue_api_on_message_delete(int(*cb)(uint32_t))
761 {
762 	handler_message_delete = cb;
763 }
764 
765 void
queue_api_on_message_fd_r(int (* cb)(uint32_t))766 queue_api_on_message_fd_r(int(*cb)(uint32_t))
767 {
768 	handler_message_fd_r = cb;
769 }
770 
771 void
queue_api_on_envelope_create(int (* cb)(uint32_t,const char *,size_t,uint64_t *))772 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
773 {
774 	handler_envelope_create = cb;
775 }
776 
777 void
queue_api_on_envelope_delete(int (* cb)(uint64_t))778 queue_api_on_envelope_delete(int(*cb)(uint64_t))
779 {
780 	handler_envelope_delete = cb;
781 }
782 
783 void
queue_api_on_envelope_update(int (* cb)(uint64_t,const char *,size_t))784 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
785 {
786 	handler_envelope_update = cb;
787 }
788 
789 void
queue_api_on_envelope_load(int (* cb)(uint64_t,char *,size_t))790 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
791 {
792 	handler_envelope_load = cb;
793 }
794 
795 void
queue_api_on_envelope_walk(int (* cb)(uint64_t *,char *,size_t))796 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
797 {
798 	handler_envelope_walk = cb;
799 }
800 
801 void
queue_api_on_message_walk(int (* cb)(uint64_t *,char *,size_t,uint32_t,int *,void **))802 queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t,
803     uint32_t, int *, void **))
804 {
805 	handler_message_walk = cb;
806 }
807