/*	$OpenBSD: queue_backend.c,v 1.66 2020/04/22 11:35:34 eric Exp $	*/

/*
 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/types.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <sys/socket.h>
#include <sys/stat.h>

#include <ctype.h>
#include <err.h>
#include <errno.h>
#include <event.h>
#include <fcntl.h>
#include <grp.h>
#include <imsg.h>
#include <limits.h>
#include <inttypes.h>
#include <pwd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include "smtpd.h"
#include "log.h"

static const char* envelope_validate(struct envelope *);

extern struct queue_backend	queue_backend_fs;
extern struct queue_backend	queue_backend_null;
extern struct queue_backend	queue_backend_proc;
extern struct queue_backend	queue_backend_ram;

static void queue_envelope_cache_add(struct envelope *);
static void queue_envelope_cache_update(struct envelope *);
static void queue_envelope_cache_del(uint64_t evpid);

TAILQ_HEAD(evplst, envelope);

static struct tree		evpcache_tree;
static struct evplst		evpcache_list;
static struct queue_backend	*backend;

static int (*handler_close)(void);
static int (*handler_message_create)(uint32_t *);
static int (*handler_message_commit)(uint32_t, const char*);
static int (*handler_message_delete)(uint32_t);
static int (*handler_message_fd_r)(uint32_t);
static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
static int (*handler_envelope_delete)(uint64_t);
static int (*handler_envelope_update)(uint64_t, const char *, size_t);
static int (*handler_envelope_load)(uint64_t, char *, size_t);
static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
static int (*handler_message_walk)(uint64_t *, char *, size_t,
    uint32_t, int *, void **);

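/*
 * Optional profiling helpers: when built with QUEUE_PROFILING and the
 * global profiling flags include PROFILE_QUEUE, profile_enter() and
 * profile_leave() bracket each backend call and log its duration using
 * a monotonic clock.  Otherwise they compile to no-ops.
 */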
#ifdef QUEUE_PROFILING

static struct {
	struct timespec	 t0;
	const char	*name;
} profile;

static inline void profile_enter(const char *name)
{
	if ((profiling & PROFILE_QUEUE) == 0)
		return;

	profile.name = name;
	clock_gettime(CLOCK_MONOTONIC, &profile.t0);
}

static inline void profile_leave(void)
{
	struct timespec	 t1, dt;

	if ((profiling & PROFILE_QUEUE) == 0)
		return;

	clock_gettime(CLOCK_MONOTONIC, &t1);
	timespecsub(&t1, &profile.t0, &dt);
	log_debug("profile-queue: %s %lld.%09ld", profile.name,
	    (long long)dt.tv_sec, dt.tv_nsec);
}
#else
#define profile_enter(x)	do {} while (0)
#define profile_leave()		do {} while (0)
#endif

static int
queue_message_path(uint32_t msgid, char *buf, size_t len)
{
	return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
}

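/*
 * Initialize the queue layer: look up the queue user and group, set up
 * the envelope cache, select the backend by name ("fs", "null", "ram";
 * anything else goes through the proc backend) and, when running as the
 * queue server, create the spool directories and move any leftover
 * temporary directory into the purge area before calling the backend's
 * init function.
 */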
int
queue_init(const char *name, int server)
{
	struct passwd	*pwq;
	struct group	*gr;
	int		 r;

	pwq = getpwnam(SMTPD_QUEUE_USER);
	if (pwq == NULL)
		errx(1, "unknown user %s", SMTPD_QUEUE_USER);

	gr = getgrnam(SMTPD_QUEUE_GROUP);
	if (gr == NULL)
		errx(1, "unknown group %s", SMTPD_QUEUE_GROUP);

	tree_init(&evpcache_tree);
	TAILQ_INIT(&evpcache_list);

	if (!strcmp(name, "fs"))
		backend = &queue_backend_fs;
	else if (!strcmp(name, "null"))
		backend = &queue_backend_null;
	else if (!strcmp(name, "ram"))
		backend = &queue_backend_ram;
	else
		backend = &queue_backend_proc;

	if (server) {
		if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
			errx(1, "error in spool directory setup");
		if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0)
			errx(1, "error in offline directory setup");
		if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
			errx(1, "error in purge directory setup");

		mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);

		if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
			errx(1, "error in temporary directory setup");
	}

	r = backend->init(pwq, server, name);

	log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);

	return (r);
}

int
queue_close(void)
{
	if (handler_close)
		return (handler_close());

	return (1);
}

int
queue_message_create(uint32_t *msgid)
{
	int	r;

	profile_enter("queue_message_create");
	r = handler_message_create(msgid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
	    r, *msgid);

	return (r);
}

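/*
 * Delete a message from the backend, remove its temporary file in case
 * the message was still incoming, and drop any of its envelopes that
 * remain in the envelope cache (e.g. after a rollback).
 */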
int
queue_message_delete(uint32_t msgid)
{
	char	msgpath[PATH_MAX];
	uint64_t evpid;
	void   *iter;
	int	r;

	profile_enter("queue_message_delete");
	r = handler_message_delete(msgid);
	profile_leave();

	/* in case the message is incoming */
	queue_message_path(msgid, msgpath, sizeof(msgpath));
	unlink(msgpath);

	/* remove remaining envelopes from the cache if any (on rollback) */
	evpid = msgid_to_evpid(msgid);
	for (;;) {
		iter = NULL;
		if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL))
			break;
		if (evpid_to_msgid(evpid) != msgid)
			break;
		queue_envelope_cache_del(evpid);
	}

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);

	return (r);
}

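/*
 * Commit a message: when compression and/or encryption is enabled, the
 * temporary message file is first rewritten through the corresponding
 * filter, then the result is handed to the backend.  Returns 0 if the
 * rewrite fails, otherwise the backend's result.
 */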
int
queue_message_commit(uint32_t msgid)
{
	int	r;
	char	msgpath[PATH_MAX];
	char	tmppath[PATH_MAX];
	FILE	*ifp = NULL;
	FILE	*ofp = NULL;

	profile_enter("queue_message_commit");

	queue_message_path(msgid, msgpath, sizeof(msgpath));

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
		ifp = fopen(msgpath, "r");
		ofp = fopen(tmppath, "w+");
		if (ifp == NULL || ofp == NULL)
			goto err;
		if (!compress_file(ifp, ofp))
			goto err;
		fclose(ifp);
		fclose(ofp);
		ifp = NULL;
		ofp = NULL;

		if (rename(tmppath, msgpath) == -1) {
			if (errno == ENOSPC)
				return (0);
			unlink(tmppath);
			log_warn("rename");
			return (0);
		}
	}

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
		ifp = fopen(msgpath, "r");
		ofp = fopen(tmppath, "w+");
		if (ifp == NULL || ofp == NULL)
			goto err;
		if (!crypto_encrypt_file(ifp, ofp))
			goto err;
		fclose(ifp);
		fclose(ofp);
		ifp = NULL;
		ofp = NULL;

		if (rename(tmppath, msgpath) == -1) {
			if (errno == ENOSPC)
				return (0);
			unlink(tmppath);
			log_warn("rename");
			return (0);
		}
	}

	r = handler_message_commit(msgid, msgpath);
	profile_leave();

	/* in case it's not done by the backend */
	unlink(msgpath);

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
	    msgid, r);

	return (r);

err:
	if (ifp)
		fclose(ifp);
	if (ofp)
		fclose(ofp);
	return 0;
}

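/*
 * Return a read-only file descriptor for a message.  If the queue
 * stores messages encrypted and/or compressed, the content is first
 * decrypted/uncompressed into a temporary file and a descriptor to
 * that plain copy is returned instead.  Returns -1 on error.
 */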
int
queue_message_fd_r(uint32_t msgid)
{
	int	fdin = -1, fdout = -1, fd = -1;
	FILE	*ifp = NULL;
	FILE	*ofp = NULL;

	profile_enter("queue_message_fd_r");
	fdin = handler_message_fd_r(msgid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);

	if (fdin == -1)
		return (-1);

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		if ((fdout = mktmpfile()) == -1)
			goto err;
		if ((fd = dup(fdout)) == -1)
			goto err;
		if ((ifp = fdopen(fdin, "r")) == NULL)
			goto err;
		fdin = fd;
		fd = -1;
		if ((ofp = fdopen(fdout, "w+")) == NULL)
			goto err;

		if (!crypto_decrypt_file(ifp, ofp))
			goto err;

		fclose(ifp);
		ifp = NULL;
		fclose(ofp);
		ofp = NULL;
		lseek(fdin, 0, SEEK_SET);
	}

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		if ((fdout = mktmpfile()) == -1)
			goto err;
		if ((fd = dup(fdout)) == -1)
			goto err;
		if ((ifp = fdopen(fdin, "r")) == NULL)
			goto err;
		fdin = fd;
		fd = -1;
		if ((ofp = fdopen(fdout, "w+")) == NULL)
			goto err;

		if (!uncompress_file(ifp, ofp))
			goto err;

		fclose(ifp);
		ifp = NULL;
		fclose(ofp);
		ofp = NULL;
		lseek(fdin, 0, SEEK_SET);
	}

	return (fdin);

err:
	if (fd != -1)
		close(fd);
	if (fdin != -1)
		close(fdin);
	if (fdout != -1)
		close(fdout);
	if (ifp)
		fclose(ifp);
	if (ofp)
		fclose(ofp);
	return -1;
}

int
queue_message_fd_rw(uint32_t msgid)
{
	char buf[PATH_MAX];

	queue_message_path(msgid, buf, sizeof(buf));

	return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
}

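/*
 * Serialize an envelope into evpbuf, applying compression and then
 * encryption when the corresponding queue flags are set.  Returns the
 * resulting length, or 0 on error.
 */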
static int
queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
{
	char   *evp;
	size_t	evplen;
	size_t	complen;
	char	compbuf[sizeof(struct envelope)];
	size_t	enclen;
	char	encbuf[sizeof(struct envelope)];

	evp = evpbuf;
	evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
	if (evplen == 0)
		return (0);

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
		if (complen == 0)
			return (0);
		evp = compbuf;
		evplen = complen;
	}

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
		if (enclen == 0)
			return (0);
		evp = encbuf;
		evplen = enclen;
	}

	memmove(evpbuf, evp, evplen);

	return (evplen);
}

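/*
 * Reverse of queue_envelope_dump_buffer(): decrypt and uncompress the
 * stored representation as needed, then parse it into *ep.
 */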
static int
queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
{
	char		*evp;
	size_t		 evplen;
	char		 compbuf[sizeof(struct envelope)];
	size_t		 complen;
	char		 encbuf[sizeof(struct envelope)];
	size_t		 enclen;

	evp = evpbuf;
	evplen = evpbufsize;

	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
		enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
		if (enclen == 0)
			return (0);
		evp = encbuf;
		evplen = enclen;
	}

	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
		complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
		if (complen == 0)
			return (0);
		evp = compbuf;
		evplen = complen;
	}

	return (envelope_load_buffer(ep, evp, evplen));
}

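/*
 * Envelope cache: a bounded cache keyed by evpid.  The tree provides
 * lookups and the list keeps entries in most-recently-used order, so
 * the oldest entry is evicted once the cache reaches
 * sc_queue_evpcache_size.
 */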
static void
queue_envelope_cache_add(struct envelope *e)
{
	struct envelope *cached;

	while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
		queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);

	cached = xcalloc(1, sizeof *cached);
	*cached = *e;
	TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
	tree_xset(&evpcache_tree, e->id, cached);
	stat_increment("queue.evpcache.size", 1);
}

static void
queue_envelope_cache_update(struct envelope *e)
{
	struct envelope *cached;

	if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
		queue_envelope_cache_add(e);
		stat_increment("queue.evpcache.update.missed", 1);
	} else {
		TAILQ_REMOVE(&evpcache_list, cached, entry);
		*cached = *e;
		TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
		stat_increment("queue.evpcache.update.hit", 1);
	}
}

static void
queue_envelope_cache_del(uint64_t evpid)
{
	struct envelope *cached;

	if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
		return;

	TAILQ_REMOVE(&evpcache_list, cached, entry);
	free(cached);
	stat_decrement("queue.evpcache.size", 1);
}

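/*
 * Create an envelope in the backend.  The backend assigns the final
 * envelope id (stored back into ep->id); on failure the creation time
 * and id are reset.  On success the envelope is added to the cache
 * when caching is enabled.
 */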
int
queue_envelope_create(struct envelope *ep)
{
	int		 r;
	char		 evpbuf[sizeof(struct envelope)];
	size_t		 evplen;
	uint64_t	 evpid;
	uint32_t	 msgid;

	ep->creation = time(NULL);
	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
	if (evplen == 0)
		return (0);

	evpid = ep->id;
	msgid = evpid_to_msgid(evpid);

	profile_enter("queue_envelope_create");
	r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
	    evpid, evplen, r, ep->id);

	if (!r) {
		ep->creation = 0;
		ep->id = 0;
	}

	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
		queue_envelope_cache_add(ep);

	return (r);
}

int
queue_envelope_delete(uint64_t evpid)
{
	int	r;

	if (env->sc_queue_flags & QUEUE_EVPCACHE)
		queue_envelope_cache_del(evpid);

	profile_enter("queue_envelope_delete");
	r = handler_envelope_delete(evpid);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
	    evpid, r);

	return (r);
}

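/*
 * Load an envelope by id, from the cache when possible, otherwise from
 * the backend.  A backend-loaded envelope is validated and, if caching
 * is enabled, inserted into the cache.  Returns 1 on success.
 */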
int
queue_envelope_load(uint64_t evpid, struct envelope *ep)
{
	const char	*e;
	char		 evpbuf[sizeof(struct envelope)];
	size_t		 evplen;
	struct envelope	*cached;

	if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
	    (cached = tree_get(&evpcache_tree, evpid))) {
		*ep = *cached;
		stat_increment("queue.evpcache.load.hit", 1);
		return (1);
	}

	ep->id = evpid;
	profile_enter("queue_envelope_load");
	evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
	    evpid, evplen);

	if (evplen == 0)
		return (0);

	if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
		if ((e = envelope_validate(ep)) == NULL) {
			ep->id = evpid;
			if (env->sc_queue_flags & QUEUE_EVPCACHE) {
				queue_envelope_cache_add(ep);
				stat_increment("queue.evpcache.load.missed", 1);
			}
			return (1);
		}
		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
		    evpid, e);
	}
	return (0);
}

int
queue_envelope_update(struct envelope *ep)
{
	char	evpbuf[sizeof(struct envelope)];
	size_t	evplen;
	int	r;

	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
	if (evplen == 0)
		return (0);

	profile_enter("queue_envelope_update");
	r = handler_envelope_update(ep->id, evpbuf, evplen);
	profile_leave();

	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
		queue_envelope_cache_update(ep);

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
	    ep->id, r);

	return (r);
}

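/*
 * Walk the envelopes of a single message through the backend's walk
 * handler.  On success the envelope is parsed and validated into *ep
 * and 1 is returned; invalid entries yield 0, and -1 is passed through
 * from the backend.
 */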
int
queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data)
{
	char		 evpbuf[sizeof(struct envelope)];
	uint64_t	 evpid;
	int		 r;
	const char	*e;

	profile_enter("queue_message_walk");
	r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf,
	    msgid, done, data);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_message_walk() -> %d (%016"PRIx64")",
	    r, evpid);

	if (r == -1)
		return (r);

	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
		if ((e = envelope_validate(ep)) == NULL) {
			ep->id = evpid;
			/*
			 * Do not cache the envelope here: a walk may visit
			 * envelopes that are already scheduled, and caching
			 * them again would trip the strict duplicate checks
			 * in the cache.  An uncached envelope can always be
			 * loaded from the backend later.
			 */
			return (1);
		}
		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
		    evpid, e);
	}
	return (0);
}

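/*
 * Walk all envelopes known to the backend, parsing and validating each
 * one into *ep.  Valid envelopes are added to the cache when caching
 * is enabled.
 */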
int
queue_envelope_walk(struct envelope *ep)
{
	const char	*e;
	uint64_t	 evpid;
	char		 evpbuf[sizeof(struct envelope)];
	int		 r;

	profile_enter("queue_envelope_walk");
	r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
	profile_leave();

	log_trace(TRACE_QUEUE,
	    "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
	    r, evpid);

	if (r == -1)
		return (r);

	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
		if ((e = envelope_validate(ep)) == NULL) {
			ep->id = evpid;
			if (env->sc_queue_flags & QUEUE_EVPCACHE)
				queue_envelope_cache_add(ep);
			return (1);
		}
		log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
		    evpid, e);
	}
	return (0);
}

uint32_t
queue_generate_msgid(void)
{
	uint32_t msgid;

	while ((msgid = arc4random()) == 0)
		;

	return msgid;
}

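/*
 * An envelope id carries the 32-bit message id in its upper half and a
 * non-zero random value in its lower half, so the message id can be
 * recovered with a shift.
 */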
uint64_t
queue_generate_evpid(uint32_t msgid)
{
	uint32_t rnd;
	uint64_t evpid;

	while ((rnd = arc4random()) == 0)
		;

	evpid = msgid;
	evpid <<= 32;
	evpid |= rnd;

	return evpid;
}

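/*
 * Sanity-check an envelope loaded from the queue: reject version
 * mismatches, unterminated or empty strings and unknown dispatchers.
 * Returns NULL when the envelope is valid, or an error string.
 */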
static const char*
envelope_validate(struct envelope *ep)
{
	if (ep->version != SMTPD_ENVELOPE_VERSION)
		return "version mismatch";

	if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
		return "invalid helo";
	if (ep->helo[0] == '\0')
		return "empty helo";

	if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
		return "invalid hostname";
	if (ep->hostname[0] == '\0')
		return "empty hostname";

	if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
		return "invalid error line";

	if (dict_get(env->sc_dispatchers, ep->dispatcher) == NULL)
		return "unknown dispatcher";

	return NULL;
}

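/*
 * Registration hooks through which a queue backend installs the
 * handlers invoked by the wrappers above.
 */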
void
queue_api_on_close(int(*cb)(void))
{
	handler_close = cb;
}

void
queue_api_on_message_create(int(*cb)(uint32_t *))
{
	handler_message_create = cb;
}

void
queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
{
	handler_message_commit = cb;
}

void
queue_api_on_message_delete(int(*cb)(uint32_t))
{
	handler_message_delete = cb;
}

void
queue_api_on_message_fd_r(int(*cb)(uint32_t))
{
	handler_message_fd_r = cb;
}

void
queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
{
	handler_envelope_create = cb;
}

void
queue_api_on_envelope_delete(int(*cb)(uint64_t))
{
	handler_envelope_delete = cb;
}

void
queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
{
	handler_envelope_update = cb;
}

void
queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
{
	handler_envelope_load = cb;
}

void
queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
{
	handler_envelope_walk = cb;
}

void
queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t,
    uint32_t, int *, void **))
{
	handler_message_walk = cb;
}
805