/*	$OpenBSD: queue_backend.c,v 1.69 2023/05/31 16:51:46 op Exp $	*/

/*
 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <errno.h>
#include <fcntl.h>
#include <grp.h>
#include <inttypes.h>
#include <pwd.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include "smtpd.h"
#include "log.h"

static const char* envelope_validate(struct envelope *);

extern struct queue_backend	queue_backend_fs;
extern struct queue_backend	queue_backend_null;
extern struct queue_backend	queue_backend_proc;
extern struct queue_backend	queue_backend_ram;

static void queue_envelope_cache_add(struct envelope *);
static void queue_envelope_cache_update(struct envelope *);
static void queue_envelope_cache_del(uint64_t evpid);

TAILQ_HEAD(evplst, envelope);

static struct tree		evpcache_tree;
static struct evplst		evpcache_list;
static struct queue_backend	*backend;

static int (*handler_close)(void);
static int (*handler_message_create)(uint32_t *);
static int (*handler_message_commit)(uint32_t, const char*);
static int (*handler_message_delete)(uint32_t);
static int (*handler_message_fd_r)(uint32_t);
static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
static int (*handler_envelope_delete)(uint64_t);
static int (*handler_envelope_update)(uint64_t, const char *, size_t);
static int (*handler_envelope_load)(uint64_t, char *, size_t);
static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
static int (*handler_message_walk)(uint64_t *, char *, size_t,
    uint32_t, int *, void **);

#ifdef QUEUE_PROFILING

static struct {
	struct timespec	 t0;
	const char	*name;
} profile;

static inline void profile_enter(const char *name)
{
	if ((profiling & PROFILE_QUEUE) == 0)
		return;

	profile.name = name;
	clock_gettime(CLOCK_MONOTONIC, &profile.t0);
}

static inline void profile_leave(void)
{
	struct timespec	 t1, dt;

	if ((profiling & PROFILE_QUEUE) == 0)
		return;

	clock_gettime(CLOCK_MONOTONIC, &t1);
	timespecsub(&t1, &profile.t0, &dt);
	log_debug("profile-queue: %s %lld.%09ld", profile.name,
	    (long long)dt.tv_sec, dt.tv_nsec);
}
#else
#define profile_enter(x)	do {} while (0)
#define profile_leave()		do {} while (0)
#endif

static int
queue_message_path(uint32_t msgid, char *buf, size_t len)
{
	return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
}

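/*
 * Select the queue backend by name ("fs", "null" or "ram"; anything else
 * is handled through the proc backend) and, when running as the server,
 * make sure the spool directories exist with the expected owners and
 * modes before handing control to the backend's init function.
 */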
int
queue_init(const char *name, int server)
{
	struct passwd	*pwq;
	struct group	*gr;
	int		 r;

	pwq = getpwnam(SMTPD_QUEUE_USER);
	if (pwq == NULL)
		fatalx("unknown user %s", SMTPD_QUEUE_USER);

	gr = getgrnam(SMTPD_QUEUE_GROUP);
	if (gr == NULL)
		fatalx("unknown group %s", SMTPD_QUEUE_GROUP);

	tree_init(&evpcache_tree);
	TAILQ_INIT(&evpcache_list);

	if (!strcmp(name, "fs"))
		backend = &queue_backend_fs;
	else if (!strcmp(name, "null"))
		backend = &queue_backend_null;
	else if (!strcmp(name, "ram"))
		backend = &queue_backend_ram;
	else
		backend = &queue_backend_proc;

	if (server) {
		if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
			fatalx("error in spool directory setup");
		if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0)
			fatalx("error in offline directory setup");
		if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
			fatalx("error in purge directory setup");

		mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);

		if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
			fatalx("error in temporary directory setup");
	}

	r = backend->init(pwq, server, name);

	log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);

	return (r);
}

int
queue_close(void)
{
	if (handler_close)
		return (handler_close());

	return (1);
}

int
queue_message_create(uint32_t *msgid)
{
	int	r;

	profile_enter("queue_message_create");
	r = handler_message_create(msgid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
	    r, *msgid);

	return (r);
}

int
queue_message_delete(uint32_t msgid)
{
	char	msgpath[PATH_MAX];
	uint64_t evpid;
	void   *iter;
	int	r;

	profile_enter("queue_message_delete");
	r = handler_message_delete(msgid);
	profile_leave();

	/* in case the message is incoming */
	queue_message_path(msgid, msgpath, sizeof(msgpath));
	unlink(msgpath);

	/* remove remaining envelopes from the cache if any (on rollback) */
	evpid = msgid_to_evpid(msgid);
	for (;;) {
		iter = NULL;
		if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL))
			break;
		if (evpid_to_msgid(evpid) != msgid)
			break;
		queue_envelope_cache_del(evpid);
	}

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);

	return (r);
}

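/*
 * Commit an incoming message: optionally compress and/or encrypt the
 * temporary spool file in place (through a ".comp"/".enc" sibling that
 * is renamed over the original), then let the backend take ownership of
 * the file.  Returns 0 on failure.
 */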
int
queue_message_commit(uint32_t msgid)
{
	int	r;
	char	msgpath[PATH_MAX];
	char	tmppath[PATH_MAX];
	FILE	*ifp = NULL;
	FILE	*ofp = NULL;

	profile_enter("queue_message_commit");

	queue_message_path(msgid, msgpath, sizeof(msgpath));

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
		ifp = fopen(msgpath, "r");
		ofp = fopen(tmppath, "w+");
		if (ifp == NULL || ofp == NULL)
			goto err;
		if (!compress_file(ifp, ofp))
			goto err;
		fclose(ifp);
		fclose(ofp);
		ifp = NULL;
		ofp = NULL;

		if (rename(tmppath, msgpath) == -1) {
			if (errno == ENOSPC)
				return (0);
			unlink(tmppath);
			log_warn("rename");
			return (0);
		}
	}

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
		ifp = fopen(msgpath, "r");
		ofp = fopen(tmppath, "w+");
		if (ifp == NULL || ofp == NULL)
			goto err;
		if (!crypto_encrypt_file(ifp, ofp))
			goto err;
		fclose(ifp);
		fclose(ofp);
		ifp = NULL;
		ofp = NULL;

		if (rename(tmppath, msgpath) == -1) {
			if (errno == ENOSPC)
				return (0);
			unlink(tmppath);
			log_warn("rename");
			return (0);
		}
	}

	r = handler_message_commit(msgid, msgpath);
	profile_leave();

	/* in case it's not done by the backend */
	unlink(msgpath);

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
	    msgid, r);

	return (r);

err:
	if (ifp)
		fclose(ifp);
	if (ofp)
		fclose(ofp);
	return 0;
}

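/*
 * Return a read-only descriptor for a queued message.  If the queue is
 * encrypted and/or compressed, the backend's file is first decrypted and
 * then uncompressed into temporary files created with mktmpfile(), and a
 * descriptor to the resulting plaintext copy is returned instead.
 */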
int
queue_message_fd_r(uint32_t msgid)
{
	int	fdin = -1, fdout = -1, fd = -1;
	FILE	*ifp = NULL;
	FILE	*ofp = NULL;

	profile_enter("queue_message_fd_r");
	fdin = handler_message_fd_r(msgid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);

	if (fdin == -1)
		return (-1);

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		if ((fdout = mktmpfile()) == -1)
			goto err;
		if ((fd = dup(fdout)) == -1)
			goto err;
		if ((ifp = fdopen(fdin, "r")) == NULL)
			goto err;
		fdin = fd;
		fd = -1;
		if ((ofp = fdopen(fdout, "w+")) == NULL)
			goto err;

		if (!crypto_decrypt_file(ifp, ofp))
			goto err;

		fclose(ifp);
		ifp = NULL;
		fclose(ofp);
		ofp = NULL;
		lseek(fdin, 0, SEEK_SET);
	}

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		if ((fdout = mktmpfile()) == -1)
			goto err;
		if ((fd = dup(fdout)) == -1)
			goto err;
		if ((ifp = fdopen(fdin, "r")) == NULL)
			goto err;
		fdin = fd;
		fd = -1;
		if ((ofp = fdopen(fdout, "w+")) == NULL)
			goto err;

		if (!uncompress_file(ifp, ofp))
			goto err;

		fclose(ifp);
		ifp = NULL;
		fclose(ofp);
		ofp = NULL;
		lseek(fdin, 0, SEEK_SET);
	}

	return (fdin);

err:
	if (fd != -1)
		close(fd);
	if (fdin != -1)
		close(fdin);
	if (fdout != -1)
		close(fdout);
	if (ifp)
		fclose(ifp);
	if (ofp)
		fclose(ofp);
	return -1;
}

int
queue_message_fd_rw(uint32_t msgid)
{
	char buf[PATH_MAX];

	queue_message_path(msgid, buf, sizeof(buf));

	return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
}

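/*
 * Serialize an envelope into evpbuf, applying compression and then
 * encryption when the corresponding queue flags are set.  The loader
 * below undoes the transformations in reverse order: decrypt first,
 * then uncompress.  Both return 0 on failure.
 */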
static int
queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
{
	char   *evp;
	size_t	evplen;
	size_t	complen;
	char	compbuf[sizeof(struct envelope)];
	size_t	enclen;
	char	encbuf[sizeof(struct envelope)];

	evp = evpbuf;
	evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
	if (evplen == 0)
		return (0);

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
		if (complen == 0)
			return (0);
		evp = compbuf;
		evplen = complen;
	}

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
		if (enclen == 0)
			return (0);
		evp = encbuf;
		evplen = enclen;
	}

	memmove(evpbuf, evp, evplen);

	return (evplen);
}

static int
queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
{
	char		*evp;
	size_t		 evplen;
	char		 compbuf[sizeof(struct envelope)];
	size_t		 complen;
	char		 encbuf[sizeof(struct envelope)];
	size_t		 enclen;

	evp = evpbuf;
	evplen = evpbufsize;

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
		if (enclen == 0)
			return (0);
		evp = encbuf;
		evplen = enclen;
	}

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
		if (complen == 0)
			return (0);
		evp = compbuf;
		evplen = complen;
	}

	return (envelope_load_buffer(ep, evp, evplen));
}

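/*
 * Envelope cache helpers.  The cache is a tree indexed by evpid plus a
 * list with the most recently added or updated entries at its head;
 * when it grows beyond sc_queue_evpcache_size, entries are evicted from
 * the tail of the list.
 */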
static void
queue_envelope_cache_add(struct envelope *e)
{
	struct envelope *cached;

	while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
		queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);

	cached = xcalloc(1, sizeof *cached);
	*cached = *e;
	TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
	tree_xset(&evpcache_tree, e->id, cached);
	stat_increment("queue.evpcache.size", 1);
}

static void
queue_envelope_cache_update(struct envelope *e)
{
	struct envelope *cached;

	if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
		queue_envelope_cache_add(e);
		stat_increment("queue.evpcache.update.missed", 1);
	} else {
		TAILQ_REMOVE(&evpcache_list, cached, entry);
		*cached = *e;
		TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
		stat_increment("queue.evpcache.update.hit", 1);
	}
}

static void
queue_envelope_cache_del(uint64_t evpid)
{
	struct envelope *cached;

	if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
		return;

	TAILQ_REMOVE(&evpcache_list, cached, entry);
	free(cached);
	stat_decrement("queue.evpcache.size", 1);
}

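/*
 * Store a new envelope.  The creation time is set here and the backend
 * reports the final envelope id through ep->id; on success the envelope
 * is also added to the cache when QUEUE_EVPCACHE is enabled, on failure
 * the id and creation time are reset.
 */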
int
queue_envelope_create(struct envelope *ep)
{
	int		 r;
	char		 evpbuf[sizeof(struct envelope)];
	size_t		 evplen;
	uint64_t	 evpid;
	uint32_t	 msgid;

	ep->creation = time(NULL);
	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
	if (evplen == 0)
		return (0);

	evpid = ep->id;
	msgid = evpid_to_msgid(evpid);

	profile_enter("queue_envelope_create");
	r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
	    evpid, evplen, r, ep->id);

	if (!r) {
		ep->creation = 0;
		ep->id = 0;
	}

	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
		queue_envelope_cache_add(ep);

	return (r);
}

int
queue_envelope_delete(uint64_t evpid)
{
	int	r;

	if (env->sc_queue_flags & QUEUE_EVPCACHE)
		queue_envelope_cache_del(evpid);

	profile_enter("queue_envelope_delete");
	r = handler_envelope_delete(evpid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
	    evpid, r);

	return (r);
}

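/*
 * Load an envelope by id, answering from the cache when possible.  A
 * cache miss goes to the backend; the result is validated and, when
 * QUEUE_EVPCACHE is enabled, cached before being returned.
 */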
int
queue_envelope_load(uint64_t evpid, struct envelope *ep)
{
	const char	*e;
	char		 evpbuf[sizeof(struct envelope)];
	size_t		 evplen;
	struct envelope	*cached;

	if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
	    (cached = tree_get(&evpcache_tree, evpid))) {
		*ep = *cached;
		stat_increment("queue.evpcache.load.hit", 1);
		return (1);
	}

	ep->id = evpid;
	profile_enter("queue_envelope_load");
	evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
	    evpid, evplen);

	if (evplen == 0)
		return (0);

	if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
		if ((e = envelope_validate(ep)) == NULL) {
			ep->id = evpid;
			if (env->sc_queue_flags & QUEUE_EVPCACHE) {
				queue_envelope_cache_add(ep);
				stat_increment("queue.evpcache.load.missed", 1);
			}
			return (1);
		}
		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
		    evpid, e);
	}
	return (0);
}

int
queue_envelope_update(struct envelope *ep)
{
	char	evpbuf[sizeof(struct envelope)];
	size_t	evplen;
	int	r;

	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
	if (evplen == 0)
		return (0);

	profile_enter("queue_envelope_update");
	r = handler_envelope_update(ep->id, evpbuf, evplen);
	profile_leave();

	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
		queue_envelope_cache_update(ep);

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
	    ep->id, r);

	return (r);
}

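/*
 * Iterate over the envelopes of a single message (queue_message_walk)
 * or over the whole queue (queue_envelope_walk below).  A handler return
 * of -1 ends the walk; a positive return is treated as the length of the
 * serialized envelope, which is then deserialized and validated.
 */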
int
queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data)
{
	char		 evpbuf[sizeof(struct envelope)];
	uint64_t	 evpid;
	int		 r;
	const char	*e;

	profile_enter("queue_message_walk");
	r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf,
	    msgid, done, data);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_walk() -> %d (%016"PRIx64")",
	    r, evpid);

	if (r == -1)
		return (r);

	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
		if ((e = envelope_validate(ep)) == NULL) {
			ep->id = evpid;
			/*
			 * Do not cache the envelope here: a walk may
			 * rediscover envelopes that are already scheduled,
			 * and re-adding them would trip the strict checks
			 * in the cache.  An envelope that is not cached
			 * can always be loaded from the backend later.
			 */
			return (1);
		}
		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
		    evpid, e);
	}
	return (0);
}

int
queue_envelope_walk(struct envelope *ep)
{
	const char	*e;
	uint64_t	 evpid;
	char		 evpbuf[sizeof(struct envelope)];
	int		 r;

	profile_enter("queue_envelope_walk");
	r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
	    r, evpid);

	if (r == -1)
		return (r);

	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
		if ((e = envelope_validate(ep)) == NULL) {
			ep->id = evpid;
			if (env->sc_queue_flags & QUEUE_EVPCACHE)
				queue_envelope_cache_add(ep);
			return (1);
		}
		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
		    evpid, e);
	}
	return (0);
}

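/*
 * Identifier generators: a msgid is a random nonzero 32-bit value, and
 * an evpid carries the msgid in its upper 32 bits with a random nonzero
 * value in the lower 32 bits, which is what lets evpid_to_msgid() and
 * msgid_to_evpid() convert between the two.
 */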
uint32_t
queue_generate_msgid(void)
{
	uint32_t msgid;

	while ((msgid = arc4random()) == 0)
		;

	return msgid;
}

uint64_t
queue_generate_evpid(uint32_t msgid)
{
	uint32_t rnd;
	uint64_t evpid;

	while ((rnd = arc4random()) == 0)
		;

	evpid = msgid;
	evpid <<= 32;
	evpid |= rnd;

	return evpid;
}

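/*
 * Sanity-check an envelope loaded from a backend: version, presence and
 * NUL-termination of the helo, hostname and error-line fields, and a
 * known dispatcher.  Returns NULL on success or an error string.
 */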
static const char*
envelope_validate(struct envelope *ep)
{
	if (ep->version != SMTPD_ENVELOPE_VERSION)
		return "version mismatch";

	if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
		return "invalid helo";
	if (ep->helo[0] == '\0')
		return "empty helo";

	if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
		return "invalid hostname";
	if (ep->hostname[0] == '\0')
		return "empty hostname";

	if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
		return "invalid error line";

	if (dict_get(env->sc_dispatchers, ep->dispatcher) == NULL)
		return "unknown dispatcher";

	return NULL;
}

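/*
 * Registration hooks used by queue backends to install their handlers
 * for the operations above.
 */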
void
queue_api_on_close(int(*cb)(void))
{
	handler_close = cb;
}

void
queue_api_on_message_create(int(*cb)(uint32_t *))
{
	handler_message_create = cb;
}

void
queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
{
	handler_message_commit = cb;
}

void
queue_api_on_message_delete(int(*cb)(uint32_t))
{
	handler_message_delete = cb;
}

void
queue_api_on_message_fd_r(int(*cb)(uint32_t))
{
	handler_message_fd_r = cb;
}

void
queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
{
	handler_envelope_create = cb;
}

void
queue_api_on_envelope_delete(int(*cb)(uint64_t))
{
	handler_envelope_delete = cb;
}

void
queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
{
	handler_envelope_update = cb;
}

void
queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
{
	handler_envelope_load = cb;
}

void
queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
{
	handler_envelope_walk = cb;
}

void
queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t,
    uint32_t, int *, void **))
{
	handler_message_walk = cb;
}

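/*
 * Illustrative sketch only, not part of the original source: a
 * hypothetical backend would typically install its handlers from its
 * init callback (the function invoked as backend->init(pwq, server,
 * name) above) using the registration hooks.  The example_* names below
 * are assumptions and do not exist in smtpd.
 */
#if 0
static int
example_init(struct passwd *pw, int server, const char *conf)
{
	/* register one handler per queue operation */
	queue_api_on_close(example_close);
	queue_api_on_message_create(example_message_create);
	queue_api_on_message_commit(example_message_commit);
	queue_api_on_message_delete(example_message_delete);
	queue_api_on_message_fd_r(example_message_fd_r);
	queue_api_on_envelope_create(example_envelope_create);
	queue_api_on_envelope_delete(example_envelope_delete);
	queue_api_on_envelope_update(example_envelope_update);
	queue_api_on_envelope_load(example_envelope_load);
	queue_api_on_envelope_walk(example_envelope_walk);
	queue_api_on_message_walk(example_message_walk);

	return (1);	/* assumed: nonzero indicates success */
}
#endif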