xref: /openbsd/usr.sbin/smtpd/queue_backend.c (revision 8529ddd3)
1 /*	$OpenBSD: queue_backend.c,v 1.55 2015/01/20 17:37:54 deraadt Exp $	*/
2 
3 /*
4  * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/queue.h>
21 #include <sys/tree.h>
22 #include <sys/socket.h>
23 #include <sys/stat.h>
24 
25 #include <ctype.h>
26 #include <err.h>
27 #include <errno.h>
28 #include <event.h>
29 #include <fcntl.h>
30 #include <imsg.h>
31 #include <limits.h>
32 #include <inttypes.h>
33 #include <libgen.h>
34 #include <pwd.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 
41 #include "smtpd.h"
42 #include "log.h"
43 
44 static const char* envelope_validate(struct envelope *);
45 
46 extern struct queue_backend	queue_backend_fs;
47 extern struct queue_backend	queue_backend_null;
48 extern struct queue_backend	queue_backend_proc;
49 extern struct queue_backend	queue_backend_ram;
50 
51 static void queue_envelope_cache_add(struct envelope *);
52 static void queue_envelope_cache_update(struct envelope *);
53 static void queue_envelope_cache_del(uint64_t evpid);
54 
55 TAILQ_HEAD(evplst, envelope);
56 
57 static struct tree		evpcache_tree;
58 static struct evplst		evpcache_list;
59 static struct queue_backend	*backend;
60 
/*
 * Backend entry points.  Each pointer is installed through the matching
 * queue_api_on_*() setter at the end of this file.
 */

/* whole-queue operations */
static int (*handler_close)(void);

/* message operations */
static int (*handler_message_create)(uint32_t *);
static int (*handler_message_commit)(uint32_t, const char *);
static int (*handler_message_delete)(uint32_t);
static int (*handler_message_fd_r)(uint32_t);
static int (*handler_message_corrupt)(uint32_t);

/* envelope operations */
static int (*handler_envelope_create)(uint32_t, const char *, size_t,
    uint64_t *);
static int (*handler_envelope_delete)(uint64_t);
static int (*handler_envelope_update)(uint64_t, const char *, size_t);
static int (*handler_envelope_load)(uint64_t, char *, size_t);
static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
72 
#ifdef QUEUE_PROFILING

/* Bookkeeping for the backend call currently being timed. */
static struct {
	struct timespec	 t0;	/* clock value taken by profile_enter() */
	const char	*name;	/* label printed by profile_leave() */
} profile;

/*
 * Remember `name' and the current monotonic time.  Does nothing unless
 * PROFILE_QUEUE is set in the profiling mask.
 */
static inline void
profile_enter(const char *name)
{
	if (profiling & PROFILE_QUEUE) {
		profile.name = name;
		clock_gettime(CLOCK_MONOTONIC, &profile.t0);
	}
}

/*
 * Log the time elapsed since the last profile_enter().  Does nothing
 * unless PROFILE_QUEUE is set in the profiling mask.
 */
static inline void
profile_leave(void)
{
	struct timespec	 now, elapsed;

	if (!(profiling & PROFILE_QUEUE))
		return;

	clock_gettime(CLOCK_MONOTONIC, &now);
	timespecsub(&now, &profile.t0, &elapsed);
	log_debug("profile-queue: %s %lld.%09ld", profile.name,
	    (long long)elapsed.tv_sec, elapsed.tv_nsec);
}
#else
/* Profiling compiled out: both hooks expand to empty statements. */
#define profile_enter(x)	do {} while (0)
#define profile_leave()		do {} while (0)
#endif
105 
106 static int
107 queue_message_path(uint32_t msgid, char *buf, size_t len)
108 {
109 	return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
110 }
111 
112 int
113 queue_init(const char *name, int server)
114 {
115 	struct passwd	*pwq;
116 	int		 r;
117 
118 	pwq = getpwnam(SMTPD_QUEUE_USER);
119 	if (pwq == NULL)
120 		errx(1, "unknown user %s", SMTPD_QUEUE_USER);
121 
122 	tree_init(&evpcache_tree);
123 	TAILQ_INIT(&evpcache_list);
124 
125 	if (!strcmp(name, "fs"))
126 		backend = &queue_backend_fs;
127 	else if (!strcmp(name, "null"))
128 		backend = &queue_backend_null;
129 	else if (!strcmp(name, "ram"))
130 		backend = &queue_backend_ram;
131 	else
132 		backend = &queue_backend_proc;
133 
134 	if (server) {
135 		if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
136 			errx(1, "error in spool directory setup");
137 		if (ckdir(PATH_SPOOL PATH_OFFLINE, 01777, 0, 0, 1) == 0)
138 			errx(1, "error in offline directory setup");
139 		if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
140 			errx(1, "error in purge directory setup");
141 
142 		mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);
143 
144 		if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
145 			errx(1, "error in purge directory setup");
146 	}
147 
148 	r = backend->init(pwq, server, name);
149 
150 	log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);
151 
152 	return (r);
153 }
154 
155 int
156 queue_close(void)
157 {
158 	if (handler_close)
159 		return (handler_close());
160 
161 	return (1);
162 }
163 
164 int
165 queue_message_create(uint32_t *msgid)
166 {
167 	int	r;
168 
169 	profile_enter("queue_message_create");
170 	r = handler_message_create(msgid);
171 	profile_leave();
172 
173 	log_trace(TRACE_QUEUE,
174 	    "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
175 	    r, *msgid);
176 
177 	return (r);
178 }
179 
180 int
181 queue_message_delete(uint32_t msgid)
182 {
183 	char	msgpath[PATH_MAX];
184 	int	r;
185 
186 	profile_enter("queue_message_delete");
187 	r = handler_message_delete(msgid);
188 	profile_leave();
189 
190 	/* in case the message is incoming */
191 	queue_message_path(msgid, msgpath, sizeof(msgpath));
192 	unlink(msgpath);
193 
194 	log_trace(TRACE_QUEUE,
195 	    "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);
196 
197 	return (r);
198 }
199 
200 int
201 queue_message_commit(uint32_t msgid)
202 {
203 	int	r;
204 	char	msgpath[PATH_MAX];
205 	char	tmppath[PATH_MAX];
206 	FILE	*ifp = NULL;
207 	FILE	*ofp = NULL;
208 
209 	profile_enter("queue_message_commit");
210 
211 	queue_message_path(msgid, msgpath, sizeof(msgpath));
212 
213 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
214 		bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
215 		ifp = fopen(msgpath, "r");
216 		ofp = fopen(tmppath, "w+");
217 		if (ifp == NULL || ofp == NULL)
218 			goto err;
219 		if (! compress_file(ifp, ofp))
220 			goto err;
221 		fclose(ifp);
222 		fclose(ofp);
223 		ifp = NULL;
224 		ofp = NULL;
225 
226 		if (rename(tmppath, msgpath) == -1) {
227 			if (errno == ENOSPC)
228 				return (0);
229 			unlink(tmppath);
230 			log_warn("rename");
231 			return (0);
232 		}
233 	}
234 
235 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
236 		bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
237 		ifp = fopen(msgpath, "r");
238 		ofp = fopen(tmppath, "w+");
239 		if (ifp == NULL || ofp == NULL)
240 			goto err;
241 		if (! crypto_encrypt_file(ifp, ofp))
242 			goto err;
243 		fclose(ifp);
244 		fclose(ofp);
245 		ifp = NULL;
246 		ofp = NULL;
247 
248 		if (rename(tmppath, msgpath) == -1) {
249 			if (errno == ENOSPC)
250 				return (0);
251 			unlink(tmppath);
252 			log_warn("rename");
253 			return (0);
254 		}
255 	}
256 
257 	r = handler_message_commit(msgid, msgpath);
258 	profile_leave();
259 
260 	/* in case it's not done by the backend */
261 	unlink(msgpath);
262 
263 	log_trace(TRACE_QUEUE,
264 	    "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
265 	    msgid, r);
266 
267 	return (r);
268 
269 err:
270 	if (ifp)
271 		fclose(ifp);
272 	if (ofp)
273 		fclose(ofp);
274 	return 0;
275 }
276 
277 int
278 queue_message_corrupt(uint32_t msgid)
279 {
280 	int	r;
281 
282 	profile_enter("queue_message_corrupt");
283 	r = handler_message_corrupt(msgid);
284 	profile_leave();
285 
286 	log_trace(TRACE_QUEUE,
287 	    "queue-backend: queue_message_corrupt(%08"PRIx32") -> %d", msgid, r);
288 
289 	return (r);
290 }
291 
292 int
293 queue_message_fd_r(uint32_t msgid)
294 {
295 	int	fdin = -1, fdout = -1, fd = -1;
296 	FILE	*ifp = NULL;
297 	FILE	*ofp = NULL;
298 
299 	profile_enter("queue_message_fd_r");
300 	fdin = handler_message_fd_r(msgid);
301 	profile_leave();
302 
303 	log_trace(TRACE_QUEUE,
304 	    "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);
305 
306 	if (fdin == -1)
307 		return (-1);
308 
309 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
310 		if ((fdout = mktmpfile()) == -1)
311 			goto err;
312 		if ((fd = dup(fdout)) == -1)
313 			goto err;
314 		if ((ifp = fdopen(fdin, "r")) == NULL)
315 			goto err;
316 		fdin = fd;
317 		fd = -1;
318 		if ((ofp = fdopen(fdout, "w+")) == NULL)
319 			goto err;
320 
321 		if (! crypto_decrypt_file(ifp, ofp))
322 			goto err;
323 
324 		fclose(ifp);
325 		ifp = NULL;
326 		fclose(ofp);
327 		ofp = NULL;
328 		lseek(fdin, SEEK_SET, 0);
329 	}
330 
331 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
332 		if ((fdout = mktmpfile()) == -1)
333 			goto err;
334 		if ((fd = dup(fdout)) == -1)
335 			goto err;
336 		if ((ifp = fdopen(fdin, "r")) == NULL)
337 			goto err;
338 		fdin = fd;
339 		fd = -1;
340 		if ((ofp = fdopen(fdout, "w+")) == NULL)
341 			goto err;
342 
343 		if (! uncompress_file(ifp, ofp))
344 			goto err;
345 
346 		fclose(ifp);
347 		ifp = NULL;
348 		fclose(ofp);
349 		ofp = NULL;
350 		lseek(fdin, SEEK_SET, 0);
351 	}
352 
353 	return (fdin);
354 
355 err:
356 	if (fd != -1)
357 		close(fd);
358 	if (fdin != -1)
359 		close(fdin);
360 	if (fdout != -1)
361 		close(fdout);
362 	if (ifp)
363 		fclose(ifp);
364 	if (ofp)
365 		fclose(ofp);
366 	return -1;
367 }
368 
369 int
370 queue_message_fd_rw(uint32_t msgid)
371 {
372 	char buf[PATH_MAX];
373 
374 	queue_message_path(msgid, buf, sizeof(buf));
375 
376 	return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
377 }
378 
379 static int
380 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
381 {
382 	char   *evp;
383 	size_t	evplen;
384 	size_t	complen;
385 	char	compbuf[sizeof(struct envelope)];
386 	size_t	enclen;
387 	char	encbuf[sizeof(struct envelope)];
388 
389 	evp = evpbuf;
390 	evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
391 	if (evplen == 0)
392 		return (0);
393 
394 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
395 		complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
396 		if (complen == 0)
397 			return (0);
398 		evp = compbuf;
399 		evplen = complen;
400 	}
401 
402 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
403 		enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
404 		if (enclen == 0)
405 			return (0);
406 		evp = encbuf;
407 		evplen = enclen;
408 	}
409 
410 	memmove(evpbuf, evp, evplen);
411 
412 	return (evplen);
413 }
414 
415 static int
416 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
417 {
418 	char		*evp;
419 	size_t		 evplen;
420 	char		 compbuf[sizeof(struct envelope)];
421 	size_t		 complen;
422 	char		 encbuf[sizeof(struct envelope)];
423 	size_t		 enclen;
424 
425 	evp = evpbuf;
426 	evplen = evpbufsize;
427 
428 	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
429 		enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
430 		if (enclen == 0)
431 			return (0);
432 		evp = encbuf;
433 		evplen = enclen;
434 	}
435 
436 	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
437 		complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
438 		if (complen == 0)
439 			return (0);
440 		evp = compbuf;
441 		evplen = complen;
442 	}
443 
444 	return (envelope_load_buffer(ep, evp, evplen));
445 }
446 
447 static void
448 queue_envelope_cache_add(struct envelope *e)
449 {
450 	struct envelope *cached;
451 
452 	while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
453 		queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);
454 
455 	cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add");
456 	*cached = *e;
457 	TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
458 	tree_xset(&evpcache_tree, e->id, cached);
459 	stat_increment("queue.evpcache.size", 1);
460 }
461 
462 static void
463 queue_envelope_cache_update(struct envelope *e)
464 {
465 	struct envelope *cached;
466 
467 	if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
468 		queue_envelope_cache_add(e);
469 		stat_increment("queue.evpcache.update.missed", 1);
470 	} else {
471 		TAILQ_REMOVE(&evpcache_list, cached, entry);
472 		*cached = *e;
473 		TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
474 		stat_increment("queue.evpcache.update.hit", 1);
475 	}
476 }
477 
478 static void
479 queue_envelope_cache_del(uint64_t evpid)
480 {
481 	struct envelope *cached;
482 
483 	if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
484 		return;
485 
486 	TAILQ_REMOVE(&evpcache_list, cached, entry);
487 	free(cached);
488 	stat_decrement("queue.evpcache.size", 1);
489 }
490 
491 int
492 queue_envelope_create(struct envelope *ep)
493 {
494 	int		 r;
495 	char		 evpbuf[sizeof(struct envelope)];
496 	size_t		 evplen;
497 	uint64_t	 evpid;
498 	uint32_t	 msgid;
499 
500 	ep->creation = time(NULL);
501 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
502 	if (evplen == 0)
503 		return (0);
504 
505 	evpid = ep->id;
506 	msgid = evpid_to_msgid(evpid);
507 
508 	profile_enter("queue_envelope_create");
509 	r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
510 	profile_leave();
511 
512 	log_trace(TRACE_QUEUE,
513 	    "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
514 	    evpid, evplen, r, ep->id);
515 
516 	if (!r) {
517 		ep->creation = 0;
518 		ep->id = 0;
519 	}
520 
521 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
522 		queue_envelope_cache_add(ep);
523 
524 	return (r);
525 }
526 
527 int
528 queue_envelope_delete(uint64_t evpid)
529 {
530 	int	r;
531 
532 	if (env->sc_queue_flags & QUEUE_EVPCACHE)
533 		queue_envelope_cache_del(evpid);
534 
535 	profile_enter("queue_envelope_delete");
536 	r = handler_envelope_delete(evpid);
537 	profile_leave();
538 
539 	log_trace(TRACE_QUEUE,
540 	    "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
541 	    evpid, r);
542 
543 	return (r);
544 }
545 
546 int
547 queue_envelope_load(uint64_t evpid, struct envelope *ep)
548 {
549 	const char	*e;
550 	char		 evpbuf[sizeof(struct envelope)];
551 	size_t		 evplen;
552 	struct envelope	*cached;
553 
554 	if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
555 	    (cached = tree_get(&evpcache_tree, evpid))) {
556 		*ep = *cached;
557 		stat_increment("queue.evpcache.load.hit", 1);
558 		return (1);
559 	}
560 
561 	ep->id = evpid;
562 	profile_enter("queue_envelope_load");
563 	evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
564 	profile_leave();
565 
566 	log_trace(TRACE_QUEUE,
567 	    "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
568 	    evpid, evplen);
569 
570 	if (evplen == 0)
571 		return (0);
572 
573 	if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
574 		if ((e = envelope_validate(ep)) == NULL) {
575 			ep->id = evpid;
576 			if (env->sc_queue_flags & QUEUE_EVPCACHE) {
577 				queue_envelope_cache_add(ep);
578 				stat_increment("queue.evpcache.load.missed", 1);
579 			}
580 			return (1);
581 		}
582 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
583 		    ep->id, e);
584 	}
585 
586 	(void)queue_message_corrupt(evpid_to_msgid(evpid));
587 	return (0);
588 }
589 
590 int
591 queue_envelope_update(struct envelope *ep)
592 {
593 	char	evpbuf[sizeof(struct envelope)];
594 	size_t	evplen;
595 	int	r;
596 
597 	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
598 	if (evplen == 0)
599 		return (0);
600 
601 	profile_enter("queue_envelope_update");
602 	r = handler_envelope_update(ep->id, evpbuf, evplen);
603 	profile_leave();
604 
605 	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
606 		queue_envelope_cache_update(ep);
607 
608 	log_trace(TRACE_QUEUE,
609 	    "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
610 	    ep->id, r);
611 
612 	return (r);
613 }
614 
615 int
616 queue_envelope_walk(struct envelope *ep)
617 {
618 	const char	*e;
619 	uint64_t	 evpid;
620 	char		 evpbuf[sizeof(struct envelope)];
621 	int		 r;
622 
623 	profile_enter("queue_envelope_walk");
624 	r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
625 	profile_leave();
626 
627 	log_trace(TRACE_QUEUE,
628 	    "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
629 	    r, evpid);
630 
631 	if (r == -1)
632 		return (r);
633 
634 	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
635 		if ((e = envelope_validate(ep)) == NULL) {
636 			ep->id = evpid;
637 			if (env->sc_queue_flags & QUEUE_EVPCACHE)
638 				queue_envelope_cache_add(ep);
639 			return (1);
640 		}
641 		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
642 		    ep->id, e);
643 		(void)queue_message_corrupt(evpid_to_msgid(evpid));
644 	}
645 
646 	return (0);
647 }
648 
649 uint32_t
650 queue_generate_msgid(void)
651 {
652 	uint32_t msgid;
653 
654 	while ((msgid = arc4random()) == 0)
655 		;
656 
657 	return msgid;
658 }
659 
660 uint64_t
661 queue_generate_evpid(uint32_t msgid)
662 {
663 	uint32_t rnd;
664 	uint64_t evpid;
665 
666 	while ((rnd = arc4random()) == 0)
667 		;
668 
669 	evpid = msgid;
670 	evpid <<= 32;
671 	evpid |= rnd;
672 
673 	return evpid;
674 }
675 
676 static const char*
677 envelope_validate(struct envelope *ep)
678 {
679 	if (ep->version != SMTPD_ENVELOPE_VERSION)
680 		return "version mismatch";
681 
682 	if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
683 		return "invalid helo";
684 	if (ep->helo[0] == '\0')
685 		return "empty helo";
686 
687 	if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
688 		return "invalid hostname";
689 	if (ep->hostname[0] == '\0')
690 		return "empty hostname";
691 
692 	if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
693 		return "invalid error line";
694 
695 	return NULL;
696 }
697 
698 void
699 queue_api_on_close(int(*cb)(void))
700 {
701 	handler_close = cb;
702 }
703 
704 void
705 queue_api_on_message_create(int(*cb)(uint32_t *))
706 {
707 	handler_message_create = cb;
708 }
709 
710 void
711 queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
712 {
713 	handler_message_commit = cb;
714 }
715 
716 void
717 queue_api_on_message_delete(int(*cb)(uint32_t))
718 {
719 	handler_message_delete = cb;
720 }
721 
722 void
723 queue_api_on_message_fd_r(int(*cb)(uint32_t))
724 {
725 	handler_message_fd_r = cb;
726 }
727 
728 void
729 queue_api_on_message_corrupt(int(*cb)(uint32_t))
730 {
731 	handler_message_corrupt = cb;
732 }
733 
734 void
735 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
736 {
737 	handler_envelope_create = cb;
738 }
739 
740 void
741 queue_api_on_envelope_delete(int(*cb)(uint64_t))
742 {
743 	handler_envelope_delete = cb;
744 }
745 
746 void
747 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
748 {
749 	handler_envelope_update = cb;
750 }
751 
752 void
753 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
754 {
755 	handler_envelope_load = cb;
756 }
757 
758 void
759 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
760 {
761 	handler_envelope_walk = cb;
762 }
763