xref: /openbsd/usr.sbin/smtpd/queue_fs.c (revision a9835440)
1 /*	$OpenBSD: queue_fs.c,v 1.10 2015/10/29 10:25:36 sunil Exp $	*/
2 
3 /*
4  * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/mount.h>
21 #include <sys/queue.h>
22 #include <sys/tree.h>
23 #include <sys/socket.h>
24 #include <sys/stat.h>
25 
26 #include <ctype.h>
27 #include <dirent.h>
28 #include <err.h>
29 #include <errno.h>
30 #include <event.h>
31 #include <fcntl.h>
32 #include <fts.h>
33 #include <imsg.h>
34 #include <inttypes.h>
35 #include <libgen.h>
36 #include <pwd.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40 #include <time.h>
41 #include <unistd.h>
42 
43 #include "smtpd.h"
44 #include "log.h"
45 
/*
 * Directories used by this backend.  queue_fs_init() checks them with
 * PATH_SPOOL prepended; the rest of the file uses them as written here.
 */
#define PATH_QUEUE		"/queue"
#define PATH_CORRUPT		"/corrupt"
#define PATH_INCOMING		"/incoming"
/* scratch file used by fsqueue_envelope_dump() for atomic writes */
#define PATH_EVPTMP		PATH_INCOMING "/envelope.tmp"
/* name of the message content file, appended to a message directory */
#define PATH_MESSAGE		"/message"

/* percentage of remaining space / inodes required to accept new messages */
#define	MINSPACE		5
#define	MININODES		5
55 
/*
 * State of a walk over the queue directory, see fsqueue_qwalk_new(),
 * fsqueue_qwalk() and fsqueue_qwalk_close().
 */
struct qwalk {
	FTS	*fts;	/* traversal handle returned by fts_open() */
	int	 depth;	/* bumped on FTS_D, dropped on FTS_DP / FTS_DNR */
};
60 
static int	fsqueue_check_space(void);
static void	fsqueue_envelope_path(uint64_t, char *, size_t);
static void	fsqueue_envelope_incoming_path(uint64_t, char *, size_t);
static int	fsqueue_envelope_dump(char *, const char *, size_t, int, int);
static void	fsqueue_message_path(uint32_t, char *, size_t);
static void	fsqueue_message_corrupt_path(uint32_t, char *, size_t);
static void	fsqueue_message_incoming_path(uint32_t, char *, size_t);
static void    *fsqueue_qwalk_new(void);
static int	fsqueue_qwalk(void *, uint64_t *);
static void	fsqueue_qwalk_close(void *);

/*
 * Per-message envelope counters, keyed by msgid.  The stored "pointer" is
 * never dereferenced: it is REF plus the number of envelopes counted so far.
 */
struct tree evpcount;
/* set in queue_fs_init(); fsqueue_qwalk() skips files modified after it */
static struct timespec startup;

/* base value of the pointer-encoded counters kept in evpcount */
#define REF	(int*)0xf00
76 
77 static int
78 queue_fs_message_create(uint32_t *msgid)
79 {
80 	char		rootdir[PATH_MAX];
81 	struct stat	sb;
82 
83 	if (! fsqueue_check_space())
84 		return 0;
85 
86 again:
87 	*msgid = queue_generate_msgid();
88 
89 	/* prevent possible collision later when moving to Q_QUEUE */
90 	fsqueue_message_path(*msgid, rootdir, sizeof(rootdir));
91 	if (stat(rootdir, &sb) != -1)
92 		goto again;
93 
94 	/* we hit an unexpected error, temporarily fail */
95 	if (errno != ENOENT) {
96 		*msgid = 0;
97 		return 0;
98 	}
99 
100 	fsqueue_message_incoming_path(*msgid, rootdir, sizeof(rootdir));
101 	if (mkdir(rootdir, 0700) == -1) {
102 		if (errno == EEXIST)
103 			goto again;
104 
105 		if (errno == ENOSPC) {
106 			*msgid = 0;
107 			return 0;
108 		}
109 
110 		log_warn("warn: queue-fs: mkdir");
111 		*msgid = 0;
112 		return 0;
113 	}
114 
115 	return (1);
116 }
117 
118 static int
119 queue_fs_message_commit(uint32_t msgid, const char *path)
120 {
121 	char incomingdir[PATH_MAX];
122 	char queuedir[PATH_MAX];
123 	char msgdir[PATH_MAX];
124 	char msgpath[PATH_MAX];
125 
126 	/* before-first, move the message content in the incoming directory */
127 	fsqueue_message_incoming_path(msgid, msgpath, sizeof(msgpath));
128 	if (strlcat(msgpath, PATH_MESSAGE, sizeof(msgpath))
129 	    >= sizeof(msgpath))
130 		return (0);
131 	if (rename(path, msgpath) == -1)
132 		return (0);
133 
134 	fsqueue_message_incoming_path(msgid, incomingdir, sizeof(incomingdir));
135 	fsqueue_message_path(msgid, msgdir, sizeof(msgdir));
136 	if (strlcpy(queuedir, msgdir, sizeof(queuedir))
137 	    >= sizeof(queuedir))
138 		return (0);
139 
140 	/* first attempt to rename */
141 	if (rename(incomingdir, msgdir) == 0)
142 		return 1;
143 	if (errno == ENOSPC)
144 		return 0;
145 	if (errno != ENOENT) {
146 		log_warn("warn: queue-fs: rename");
147 		return 0;
148 	}
149 
150 	/* create the bucket */
151 	*strrchr(queuedir, '/') = '\0';
152 	if (mkdir(queuedir, 0700) == -1) {
153 		if (errno == ENOSPC)
154 			return 0;
155 		if (errno != EEXIST) {
156 			log_warn("warn: queue-fs: mkdir");
157 			return 0;
158 		}
159 	}
160 
161 	/* rename */
162 	if (rename(incomingdir, msgdir) == -1) {
163 		if (errno == ENOSPC)
164 			return 0;
165 		log_warn("warn: queue-fs: rename");
166 		return 0;
167 	}
168 
169 	return 1;
170 }
171 
172 static int
173 queue_fs_message_fd_r(uint32_t msgid)
174 {
175 	int fd;
176 	char path[PATH_MAX];
177 
178 	fsqueue_message_path(msgid, path, sizeof(path));
179 	if (strlcat(path, PATH_MESSAGE, sizeof(path))
180 	    >= sizeof(path))
181 		return -1;
182 
183 	if ((fd = open(path, O_RDONLY)) == -1) {
184 		log_warn("warn: queue-fs: open");
185 		return -1;
186 	}
187 
188 	return fd;
189 }
190 
191 static int
192 queue_fs_message_delete(uint32_t msgid)
193 {
194 	char		path[PATH_MAX];
195 	struct stat	sb;
196 
197 	fsqueue_message_incoming_path(msgid, path, sizeof(path));
198 	if (stat(path, &sb) == -1)
199 		fsqueue_message_path(msgid, path, sizeof(path));
200 
201 	if (rmtree(path, 0) == -1)
202 		log_warn("warn: queue-fs: rmtree");
203 
204 	tree_pop(&evpcount, msgid);
205 
206 	return 1;
207 }
208 
209 static int
210 queue_fs_message_corrupt(uint32_t msgid)
211 {
212 	struct stat sb;
213 	char rootdir[PATH_MAX];
214 	char corruptdir[PATH_MAX];
215 	char buf[64];
216 	int  retry = 0;
217 
218 	fsqueue_message_path(msgid, rootdir, sizeof(rootdir));
219 	fsqueue_message_corrupt_path(msgid, corruptdir,
220 	    sizeof(corruptdir));
221 
222 again:
223 	if (stat(corruptdir, &sb) != -1 || errno != ENOENT) {
224 		fsqueue_message_corrupt_path(msgid, corruptdir,
225 		    sizeof(corruptdir));
226 		(void)snprintf(buf, sizeof (buf), ".%d", retry++);
227 		(void)strlcat(corruptdir, buf, sizeof(corruptdir));
228 		goto again;
229 	}
230 
231 	if (rename(rootdir, corruptdir) == -1) {
232 		log_warn("warn: queue-fs: rename");
233 		return 0;
234 	}
235 
236 	tree_pop(&evpcount, msgid);
237 
238 	return 1;
239 }
240 
241 static int
242 queue_fs_envelope_create(uint32_t msgid, const char *buf, size_t len,
243     uint64_t *evpid)
244 {
245 	char		path[PATH_MAX];
246 	int		queued = 0, i, r = 0, *n;
247 	struct stat	sb;
248 
249 	if (msgid == 0) {
250 		log_warnx("warn: queue-fs: msgid=0, evpid=%016"PRIx64, *evpid);
251 		goto done;
252 	}
253 
254 	fsqueue_message_incoming_path(msgid, path, sizeof(path));
255 	if (stat(path, &sb) == -1)
256 		queued = 1;
257 
258 	for (i = 0; i < 20; i ++) {
259 		*evpid = queue_generate_evpid(msgid);
260 		if (queued)
261 			fsqueue_envelope_path(*evpid, path, sizeof(path));
262 		else
263 			fsqueue_envelope_incoming_path(*evpid, path,
264 			    sizeof(path));
265 
266 		r = fsqueue_envelope_dump(path, buf, len, 0, 0);
267 		if (r >= 0)
268 			goto done;
269 	}
270 	r = 0;
271 	log_warnx("warn: queue-fs: could not allocate evpid");
272 
273 done:
274 	if (r) {
275 		n = tree_pop(&evpcount, msgid);
276 		if (n == NULL)
277 			n = REF;
278 		n += 1;
279 		tree_xset(&evpcount, msgid, n);
280 	}
281 	return (r);
282 }
283 
284 static int
285 queue_fs_envelope_load(uint64_t evpid, char *buf, size_t len)
286 {
287 	char	 pathname[PATH_MAX];
288 	FILE	*fp;
289 	size_t	 r;
290 
291 	fsqueue_envelope_path(evpid, pathname, sizeof(pathname));
292 
293 	fp = fopen(pathname, "r");
294 	if (fp == NULL) {
295 		if (errno != ENOENT && errno != ENFILE)
296 			log_warn("warn: queue-fs: fopen");
297 		return 0;
298 	}
299 
300 	r = fread(buf, 1, len, fp);
301 	if (r) {
302 		if (r == len) {
303 			log_warn("warn: queue-fs: too large");
304 			r = 0;
305 		}
306 		else
307 			buf[r] = '\0';
308 	}
309 	fclose(fp);
310 
311 	return (r);
312 }
313 
314 static int
315 queue_fs_envelope_update(uint64_t evpid, const char *buf, size_t len)
316 {
317 	char dest[PATH_MAX];
318 
319 	fsqueue_envelope_path(evpid, dest, sizeof(dest));
320 
321 	return (fsqueue_envelope_dump(dest, buf, len, 1, 1));
322 }
323 
324 static int
325 queue_fs_envelope_delete(uint64_t evpid)
326 {
327 	char		pathname[PATH_MAX];
328 	uint32_t	msgid;
329 	int		*n;
330 
331 	fsqueue_envelope_path(evpid, pathname, sizeof(pathname));
332 	if (unlink(pathname) == -1)
333 		if (errno != ENOENT)
334 			return 0;
335 
336 	msgid = evpid_to_msgid(evpid);
337 	n = tree_pop(&evpcount, msgid);
338 	n -= 1;
339 
340 	if (n - REF == 0)
341 		queue_fs_message_delete(msgid);
342 	else
343 		tree_xset(&evpcount, msgid, n);
344 
345 	return (1);
346 }
347 
348 static int
349 queue_fs_message_walk(uint64_t *evpid, char *buf, size_t len,
350     uint32_t msgid, int *done, void **data)
351 {
352 	struct dirent	*dp;
353 	DIR		*dir = *data;
354 	char		 path[PATH_MAX];
355 	char		 msgid_str[9];
356 	char		*tmp;
357 	int		 r, *n;
358 
359 	if (*done)
360 		return (-1);
361 
362 	if (! bsnprintf(path, sizeof path, "%s/%02x/%08x",
363 	    PATH_QUEUE, (msgid  & 0xff000000) >> 24, msgid))
364 		fatalx("queue_fs_message_walk: path does not fit buffer");
365 
366 	if (dir == NULL) {
367 		if ((dir = opendir(path)) == NULL) {
368 			log_warn("warn: queue_fs: opendir: %s", path);
369 			*done = 1;
370 			return (-1);
371 		}
372 
373 		*data = dir;
374 	}
375 
376 	(void)snprintf(msgid_str, sizeof msgid_str, "%08" PRIx32, msgid);
377 	while ((dp = readdir(dir)) != NULL) {
378 		if (dp->d_type != DT_REG)
379 			continue;
380 
381 		/* ignore files other than envelopes */
382 		if (dp->d_namlen != 16 || strncmp(dp->d_name, msgid_str, 8))
383 			continue;
384 
385 		tmp = NULL;
386 		*evpid = strtoull(dp->d_name, &tmp, 16);
387 		if (tmp && *tmp !=  '\0') {
388 			log_debug("debug: fsqueue: bogus file %s", dp->d_name);
389 			continue;
390 		}
391 
392 		memset(buf, 0, len);
393 		r = queue_fs_envelope_load(*evpid, buf, len);
394 		if (r) {
395 			n = tree_pop(&evpcount, msgid);
396 			if (n == NULL)
397 				n = REF;
398 
399 			n += 1;
400 			tree_xset(&evpcount, msgid, n);
401 		}
402 
403 		return (r);
404 	}
405 
406 	(void)closedir(dir);
407 	*done = 1;
408 	return (-1);
409 }
410 
411 static int
412 queue_fs_envelope_walk(uint64_t *evpid, char *buf, size_t len)
413 {
414 	static int	 done = 0;
415 	static void	*hdl = NULL;
416 	int		 r, *n;
417 	uint32_t	 msgid;
418 
419 	if (done)
420 		return (-1);
421 
422 	if (hdl == NULL)
423 		hdl = fsqueue_qwalk_new();
424 
425 	if (fsqueue_qwalk(hdl, evpid)) {
426 		memset(buf, 0, len);
427 		r = queue_fs_envelope_load(*evpid, buf, len);
428 		if (r) {
429 			msgid = evpid_to_msgid(*evpid);
430 			n = tree_pop(&evpcount, msgid);
431 			if (n == NULL)
432 				n = REF;
433 			n += 1;
434 			tree_xset(&evpcount, msgid, n);
435 		}
436 		return (r);
437 	}
438 
439 	fsqueue_qwalk_close(hdl);
440 	done = 1;
441 	return (-1);
442 }
443 
444 static int
445 fsqueue_check_space(void)
446 {
447 	struct statfs	buf;
448 	uint64_t	used;
449 	uint64_t	total;
450 
451 	if (statfs(PATH_QUEUE, &buf) < 0) {
452 		log_warn("warn: queue-fs: statfs");
453 		return 0;
454 	}
455 
456 	/*
457 	 * f_bfree and f_ffree is not set on all filesystems.
458 	 * They could be signed or unsigned integers.
459 	 * Some systems will set them to 0, others will set them to -1.
460 	 */
461 	if (buf.f_bfree == 0 || buf.f_ffree == 0 ||
462 	    (int64_t)buf.f_bfree == -1 || (int64_t)buf.f_ffree == -1)
463 		return 1;
464 
465 	used = buf.f_blocks - buf.f_bfree;
466 	total = buf.f_bavail + used;
467 	if (total != 0)
468 		used = (float)used / (float)total * 100;
469 	else
470 		used = 100;
471 	if (100 - used < MINSPACE) {
472 		log_warnx("warn: not enough disk space: %llu%% left",
473 		    (unsigned long long) 100 - used);
474 		log_warnx("warn: temporarily rejecting messages");
475 		return 0;
476 	}
477 
478 	used = buf.f_files - buf.f_ffree;
479 	total = buf.f_favail + used;
480 	if (total != 0)
481 		used = (float)used / (float)total * 100;
482 	else
483 		used = 100;
484 	if (100 - used < MININODES) {
485 		log_warnx("warn: not enough inodes: %llu%% left",
486 		    (unsigned long long) 100 - used);
487 		log_warnx("warn: temporarily rejecting messages");
488 		return 0;
489 	}
490 
491 	return 1;
492 }
493 
494 static void
495 fsqueue_envelope_path(uint64_t evpid, char *buf, size_t len)
496 {
497 	if (! bsnprintf(buf, len, "%s/%02x/%08x/%016" PRIx64,
498 		PATH_QUEUE,
499 		(evpid_to_msgid(evpid) & 0xff000000) >> 24,
500 		evpid_to_msgid(evpid),
501 		evpid))
502 		fatalx("fsqueue_envelope_path: path does not fit buffer");
503 }
504 
505 static void
506 fsqueue_envelope_incoming_path(uint64_t evpid, char *buf, size_t len)
507 {
508 	if (! bsnprintf(buf, len, "%s/%08x/%016" PRIx64,
509 		PATH_INCOMING,
510 		evpid_to_msgid(evpid),
511 		evpid))
512 		fatalx("fsqueue_envelope_incoming_path: path does not fit buffer");
513 }
514 
515 static int
516 fsqueue_envelope_dump(char *dest, const char *evpbuf, size_t evplen,
517     int do_atomic, int do_sync)
518 {
519 	const char     *path = do_atomic ? PATH_EVPTMP : dest;
520 	FILE	       *fp = NULL;
521 	int		fd;
522 	size_t		w;
523 
524 	if ((fd = open(path, O_RDWR | O_CREAT | O_EXCL, 0600)) == -1) {
525 		log_warn("warn: queue-fs: open");
526 		goto tempfail;
527 	}
528 
529 	if ((fp = fdopen(fd, "w")) == NULL) {
530 		log_warn("warn: queue-fs: fdopen");
531 		goto tempfail;
532 	}
533 
534 	w = fwrite(evpbuf, 1, evplen, fp);
535 	if (w < evplen) {
536 		log_warn("warn: queue-fs: short write");
537 		goto tempfail;
538 	}
539 	if (fflush(fp)) {
540 		log_warn("warn: queue-fs: fflush");
541 		goto tempfail;
542 	}
543 	if (do_sync && fsync(fileno(fp))) {
544 		log_warn("warn: queue-fs: fsync");
545 		goto tempfail;
546 	}
547 	if (fclose(fp) != 0) {
548 		log_warn("warn: queue-fs: fclose");
549 		fp = NULL;
550 		goto tempfail;
551 	}
552 	fp = NULL;
553 	fd = -1;
554 
555 	if (do_atomic && rename(path, dest) == -1) {
556 		log_warn("warn: queue-fs: rename");
557 		goto tempfail;
558 	}
559 	return (1);
560 
561 tempfail:
562 	if (fp)
563 		fclose(fp);
564 	else if (fd != -1)
565 		close(fd);
566 	if (unlink(path) == -1)
567 		log_warn("warn: queue-fs: unlink");
568 	return (0);
569 }
570 
571 static void
572 fsqueue_message_path(uint32_t msgid, char *buf, size_t len)
573 {
574 	if (! bsnprintf(buf, len, "%s/%02x/%08x",
575 		PATH_QUEUE,
576 		(msgid & 0xff000000) >> 24,
577 		msgid))
578 		fatalx("fsqueue_message_path: path does not fit buffer");
579 }
580 
581 static void
582 fsqueue_message_corrupt_path(uint32_t msgid, char *buf, size_t len)
583 {
584 	if (! bsnprintf(buf, len, "%s/%08x",
585 		PATH_CORRUPT,
586 		msgid))
587 		fatalx("fsqueue_message_corrupt_path: path does not fit buffer");
588 }
589 
590 static void
591 fsqueue_message_incoming_path(uint32_t msgid, char *buf, size_t len)
592 {
593 	if (! bsnprintf(buf, len, "%s/%08x",
594 		PATH_INCOMING,
595 		msgid))
596 		fatalx("fsqueue_message_incoming_path: path does not fit buffer");
597 }
598 
599 static void *
600 fsqueue_qwalk_new(void)
601 {
602 	char		 path[PATH_MAX];
603 	char * const	 path_argv[] = { path, NULL };
604 	struct qwalk	*q;
605 
606 	q = xcalloc(1, sizeof(*q), "fsqueue_qwalk_new");
607 	(void)strlcpy(path, PATH_QUEUE, sizeof(path));
608 	q->fts = fts_open(path_argv,
609 	    FTS_PHYSICAL | FTS_NOCHDIR, NULL);
610 
611 	if (q->fts == NULL)
612 		err(1, "fsqueue_qwalk_new: fts_open: %s", path);
613 
614 	return (q);
615 }
616 
617 static void
618 fsqueue_qwalk_close(void *hdl)
619 {
620 	struct qwalk	*q = hdl;
621 
622 	fts_close(q->fts);
623 
624 	free(q);
625 }
626 
627 static int
628 fsqueue_qwalk(void *hdl, uint64_t *evpid)
629 {
630 	struct qwalk	*q = hdl;
631 	FTSENT		*e;
632 	char		*tmp;
633 
634 	while ((e = fts_read(q->fts)) != NULL) {
635 		switch (e->fts_info) {
636 		case FTS_D:
637 			q->depth += 1;
638 			if (q->depth == 2 && e->fts_namelen != 2) {
639 				log_debug("debug: fsqueue: bogus directory %s",
640 				    e->fts_path);
641 				fts_set(q->fts, e, FTS_SKIP);
642 				break;
643 			}
644 			if (q->depth == 3 && e->fts_namelen != 8) {
645 				log_debug("debug: fsqueue: bogus directory %s",
646 				    e->fts_path);
647 				fts_set(q->fts, e, FTS_SKIP);
648 				break;
649 			}
650 			break;
651 
652 		case FTS_DP:
653 		case FTS_DNR:
654 			q->depth -= 1;
655 			break;
656 
657 		case FTS_F:
658 			if (q->depth != 3)
659 				break;
660 			if (e->fts_namelen != 16)
661 				break;
662 			if (timespeccmp(&e->fts_statp->st_mtim, &startup, >))
663 				break;
664 			tmp = NULL;
665 			*evpid = strtoull(e->fts_name, &tmp, 16);
666 			if (tmp && *tmp !=  '\0') {
667 				log_debug("debug: fsqueue: bogus file %s",
668 				    e->fts_path);
669 				break;
670 			}
671 			return (1);
672 		default:
673 			break;
674 		}
675 	}
676 
677 	return (0);
678 }
679 
680 static int
681 queue_fs_init(struct passwd *pw, int server, const char *conf)
682 {
683 	unsigned int	 n;
684 	char		*paths[] = { PATH_QUEUE, PATH_CORRUPT, PATH_INCOMING };
685 	char		 path[PATH_MAX];
686 	int		 ret;
687 	struct timeval	 tv;
688 
689 	/* remove incoming/ if it exists */
690 	if (server)
691 		mvpurge(PATH_SPOOL PATH_INCOMING, PATH_SPOOL PATH_PURGE);
692 
693 	fsqueue_envelope_path(0, path, sizeof(path));
694 
695 	ret = 1;
696 	for (n = 0; n < nitems(paths); n++) {
697 		(void)strlcpy(path, PATH_SPOOL, sizeof(path));
698 		if (strlcat(path, paths[n], sizeof(path)) >= sizeof(path))
699 			errx(1, "path too long %s%s", PATH_SPOOL, paths[n]);
700 		if (ckdir(path, 0700, pw->pw_uid, 0, server) == 0)
701 			ret = 0;
702 	}
703 
704 	if (gettimeofday(&tv, NULL) == -1)
705 		err(1, "gettimeofday");
706 	TIMEVAL_TO_TIMESPEC(&tv, &startup);
707 
708 	tree_init(&evpcount);
709 
710 	queue_api_on_message_create(queue_fs_message_create);
711 	queue_api_on_message_commit(queue_fs_message_commit);
712 	queue_api_on_message_delete(queue_fs_message_delete);
713 	queue_api_on_message_fd_r(queue_fs_message_fd_r);
714 	queue_api_on_message_corrupt(queue_fs_message_corrupt);
715 	queue_api_on_envelope_create(queue_fs_envelope_create);
716 	queue_api_on_envelope_delete(queue_fs_envelope_delete);
717 	queue_api_on_envelope_update(queue_fs_envelope_update);
718 	queue_api_on_envelope_load(queue_fs_envelope_load);
719 	queue_api_on_envelope_walk(queue_fs_envelope_walk);
720 	queue_api_on_message_walk(queue_fs_message_walk);
721 
722 	return (ret);
723 }
724 
/*
 * Backend descriptor for the filesystem queue.  Its only initialiser is
 * queue_fs_init(), which registers all the other callbacks.
 */
struct queue_backend	queue_backend_fs = {
	queue_fs_init,
};
728