1 /* $OpenBSD: queue_fs.c,v 1.14 2015/12/30 11:40:30 jung Exp $ */ 2 3 /* 4 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/mount.h> 21 #include <sys/queue.h> 22 #include <sys/tree.h> 23 #include <sys/socket.h> 24 #include <sys/stat.h> 25 26 #include <ctype.h> 27 #include <dirent.h> 28 #include <err.h> 29 #include <errno.h> 30 #include <event.h> 31 #include <fcntl.h> 32 #include <fts.h> 33 #include <imsg.h> 34 #include <inttypes.h> 35 #include <libgen.h> 36 #include <pwd.h> 37 #include <stdio.h> 38 #include <stdlib.h> 39 #include <string.h> 40 #include <time.h> 41 #include <unistd.h> 42 43 #include "smtpd.h" 44 #include "log.h" 45 46 #define PATH_QUEUE "/queue" 47 #define PATH_CORRUPT "/corrupt" 48 #define PATH_INCOMING "/incoming" 49 #define PATH_EVPTMP PATH_INCOMING "/envelope.tmp" 50 #define PATH_MESSAGE "/message" 51 52 /* percentage of remaining space / inodes required to accept new messages */ 53 #define MINSPACE 5 54 #define MININODES 5 55 56 struct qwalk { 57 FTS *fts; 58 int depth; 59 }; 60 61 static int fsqueue_check_space(void); 62 static void fsqueue_envelope_path(uint64_t, char *, size_t); 63 static void fsqueue_envelope_incoming_path(uint64_t, char *, size_t); 64 static int fsqueue_envelope_dump(char *, const char *, size_t, int, int); 65 static void fsqueue_message_path(uint32_t, char *, size_t); 66 static void fsqueue_message_corrupt_path(uint32_t, char *, size_t); 67 static void fsqueue_message_incoming_path(uint32_t, char *, size_t); 68 static void *fsqueue_qwalk_new(void); 69 static int fsqueue_qwalk(void *, uint64_t *); 70 static void fsqueue_qwalk_close(void *); 71 72 struct tree evpcount; 73 static struct timespec startup; 74 75 #define REF (int*)0xf00 76 77 static int 78 queue_fs_message_create(uint32_t *msgid) 79 { 80 char rootdir[PATH_MAX]; 81 struct stat sb; 82 83 if (!fsqueue_check_space()) 84 return 0; 85 86 again: 87 *msgid = queue_generate_msgid(); 88 89 /* prevent possible collision later when moving to Q_QUEUE */ 90 fsqueue_message_path(*msgid, rootdir, sizeof(rootdir)); 91 if (stat(rootdir, &sb) != -1) 92 goto again; 93 94 /* we hit an unexpected error, temporarily fail */ 95 if (errno != ENOENT) { 96 *msgid = 0; 97 return 0; 98 } 99 100 fsqueue_message_incoming_path(*msgid, rootdir, sizeof(rootdir)); 101 if (mkdir(rootdir, 0700) == -1) { 102 if (errno == EEXIST) 103 goto again; 104 105 if (errno == ENOSPC) { 106 *msgid = 0; 107 return 0; 108 } 109 110 log_warn("warn: queue-fs: mkdir"); 111 *msgid = 0; 112 return 0; 113 } 114 115 return (1); 116 } 117 118 static int 119 queue_fs_message_commit(uint32_t msgid, const char *path) 120 { 121 char incomingdir[PATH_MAX]; 122 char queuedir[PATH_MAX]; 123 char msgdir[PATH_MAX]; 124 char msgpath[PATH_MAX]; 125 126 /* before-first, move the message content in the incoming directory */ 127 fsqueue_message_incoming_path(msgid, msgpath, sizeof(msgpath)); 128 if (strlcat(msgpath, PATH_MESSAGE, sizeof(msgpath)) 129 >= sizeof(msgpath)) 130 return (0); 131 if (rename(path, msgpath) == -1) 132 return (0); 133 134 fsqueue_message_incoming_path(msgid, incomingdir, sizeof(incomingdir)); 135 fsqueue_message_path(msgid, msgdir, sizeof(msgdir)); 136 if (strlcpy(queuedir, msgdir, sizeof(queuedir)) 137 >= sizeof(queuedir)) 138 return (0); 139 140 /* first attempt to rename */ 141 if (rename(incomingdir, msgdir) == 0) 142 return 1; 143 if (errno == ENOSPC) 144 return 0; 145 if (errno != ENOENT) { 146 log_warn("warn: queue-fs: rename"); 147 return 0; 148 } 149 150 /* create the bucket */ 151 *strrchr(queuedir, '/') = '\0'; 152 if (mkdir(queuedir, 0700) == -1) { 153 if (errno == ENOSPC) 154 return 0; 155 if (errno != EEXIST) { 156 log_warn("warn: queue-fs: mkdir"); 157 return 0; 158 } 159 } 160 161 /* rename */ 162 if (rename(incomingdir, msgdir) == -1) { 163 if (errno == ENOSPC) 164 return 0; 165 log_warn("warn: queue-fs: rename"); 166 return 0; 167 } 168 169 return 1; 170 } 171 172 static int 173 queue_fs_message_fd_r(uint32_t msgid) 174 { 175 int fd; 176 char path[PATH_MAX]; 177 178 fsqueue_message_path(msgid, path, sizeof(path)); 179 if (strlcat(path, PATH_MESSAGE, sizeof(path)) 180 >= sizeof(path)) 181 return -1; 182 183 if ((fd = open(path, O_RDONLY)) == -1) { 184 log_warn("warn: queue-fs: open"); 185 return -1; 186 } 187 188 return fd; 189 } 190 191 static int 192 queue_fs_message_delete(uint32_t msgid) 193 { 194 char path[PATH_MAX]; 195 struct stat sb; 196 197 fsqueue_message_incoming_path(msgid, path, sizeof(path)); 198 if (stat(path, &sb) == -1) 199 fsqueue_message_path(msgid, path, sizeof(path)); 200 201 if (rmtree(path, 0) == -1) 202 log_warn("warn: queue-fs: rmtree"); 203 204 tree_pop(&evpcount, msgid); 205 206 return 1; 207 } 208 209 static int 210 queue_fs_message_corrupt(uint32_t msgid) 211 { 212 struct stat sb; 213 char rootdir[PATH_MAX]; 214 char corruptdir[PATH_MAX]; 215 char buf[64]; 216 int retry = 0; 217 218 fsqueue_message_path(msgid, rootdir, sizeof(rootdir)); 219 fsqueue_message_corrupt_path(msgid, corruptdir, 220 sizeof(corruptdir)); 221 222 again: 223 if (stat(corruptdir, &sb) != -1 || errno != ENOENT) { 224 fsqueue_message_corrupt_path(msgid, corruptdir, 225 sizeof(corruptdir)); 226 (void)snprintf(buf, sizeof (buf), ".%d", retry++); 227 (void)strlcat(corruptdir, buf, sizeof(corruptdir)); 228 goto again; 229 } 230 231 if (rename(rootdir, corruptdir) == -1) { 232 log_warn("warn: queue-fs: rename"); 233 return 0; 234 } 235 236 tree_pop(&evpcount, msgid); 237 238 return 1; 239 } 240 241 static int 242 queue_fs_message_uncorrupt(uint32_t msgid) 243 { 244 struct stat sb; 245 char bucketdir[PATH_MAX]; 246 char queuedir[PATH_MAX]; 247 char corruptdir[PATH_MAX]; 248 249 fsqueue_message_corrupt_path(msgid, corruptdir, sizeof(corruptdir)); 250 if (stat(corruptdir, &sb) == -1) { 251 log_warnx("warn: queue-fs: stat %s failed", corruptdir); 252 return (0); 253 } 254 255 fsqueue_message_path(msgid, queuedir, sizeof(queuedir)); 256 if (stat(queuedir, &sb) == 0) { 257 log_warnx("warn: queue-fs: %s already exists", queuedir); 258 return (0); 259 } 260 261 if (!bsnprintf(bucketdir, sizeof bucketdir, "%s/%02x", PATH_QUEUE, 262 (msgid & 0xff000000) >> 24)) { 263 log_warnx("warn: queue-fs: path too long"); 264 return (0); 265 } 266 267 /* create the bucket */ 268 if (mkdir(bucketdir, 0700) == -1) { 269 if (errno == ENOSPC) 270 return (0); 271 if (errno != EEXIST) { 272 log_warn("warn: queue-fs: mkdir"); 273 return (0); 274 } 275 } 276 277 if (rename(corruptdir, queuedir) == -1) { 278 log_warn("warn: queue-fs: rename"); 279 return (0); 280 } 281 282 return (1); 283 } 284 285 static int 286 queue_fs_envelope_create(uint32_t msgid, const char *buf, size_t len, 287 uint64_t *evpid) 288 { 289 char path[PATH_MAX]; 290 int queued = 0, i, r = 0, *n; 291 struct stat sb; 292 293 if (msgid == 0) { 294 log_warnx("warn: queue-fs: msgid=0, evpid=%016"PRIx64, *evpid); 295 goto done; 296 } 297 298 fsqueue_message_incoming_path(msgid, path, sizeof(path)); 299 if (stat(path, &sb) == -1) 300 queued = 1; 301 302 for (i = 0; i < 20; i ++) { 303 *evpid = queue_generate_evpid(msgid); 304 if (queued) 305 fsqueue_envelope_path(*evpid, path, sizeof(path)); 306 else 307 fsqueue_envelope_incoming_path(*evpid, path, 308 sizeof(path)); 309 310 r = fsqueue_envelope_dump(path, buf, len, 0, 0); 311 if (r >= 0) 312 goto done; 313 } 314 r = 0; 315 log_warnx("warn: queue-fs: could not allocate evpid"); 316 317 done: 318 if (r) { 319 n = tree_pop(&evpcount, msgid); 320 if (n == NULL) 321 n = REF; 322 n += 1; 323 tree_xset(&evpcount, msgid, n); 324 } 325 return (r); 326 } 327 328 static int 329 queue_fs_envelope_load(uint64_t evpid, char *buf, size_t len) 330 { 331 char pathname[PATH_MAX]; 332 FILE *fp; 333 size_t r; 334 335 fsqueue_envelope_path(evpid, pathname, sizeof(pathname)); 336 337 fp = fopen(pathname, "r"); 338 if (fp == NULL) { 339 if (errno != ENOENT && errno != ENFILE) 340 log_warn("warn: queue-fs: fopen"); 341 return 0; 342 } 343 344 r = fread(buf, 1, len, fp); 345 if (r) { 346 if (r == len) { 347 log_warn("warn: queue-fs: too large"); 348 r = 0; 349 } 350 else 351 buf[r] = '\0'; 352 } 353 fclose(fp); 354 355 return (r); 356 } 357 358 static int 359 queue_fs_envelope_update(uint64_t evpid, const char *buf, size_t len) 360 { 361 char dest[PATH_MAX]; 362 363 fsqueue_envelope_path(evpid, dest, sizeof(dest)); 364 365 return (fsqueue_envelope_dump(dest, buf, len, 1, 1)); 366 } 367 368 static int 369 queue_fs_envelope_delete(uint64_t evpid) 370 { 371 char pathname[PATH_MAX]; 372 uint32_t msgid; 373 int *n; 374 375 fsqueue_envelope_path(evpid, pathname, sizeof(pathname)); 376 if (unlink(pathname) == -1) 377 if (errno != ENOENT) 378 return 0; 379 380 msgid = evpid_to_msgid(evpid); 381 n = tree_pop(&evpcount, msgid); 382 n -= 1; 383 384 if (n - REF == 0) 385 queue_fs_message_delete(msgid); 386 else 387 tree_xset(&evpcount, msgid, n); 388 389 return (1); 390 } 391 392 static int 393 queue_fs_message_walk(uint64_t *evpid, char *buf, size_t len, 394 uint32_t msgid, int *done, void **data) 395 { 396 struct dirent *dp; 397 DIR *dir = *data; 398 char path[PATH_MAX]; 399 char msgid_str[9]; 400 char *tmp; 401 int r, *n; 402 403 if (*done) 404 return (-1); 405 406 if (!bsnprintf(path, sizeof path, "%s/%02x/%08x", 407 PATH_QUEUE, (msgid & 0xff000000) >> 24, msgid)) 408 fatalx("queue_fs_message_walk: path does not fit buffer"); 409 410 if (dir == NULL) { 411 if ((dir = opendir(path)) == NULL) { 412 log_warn("warn: queue_fs: opendir: %s", path); 413 *done = 1; 414 return (-1); 415 } 416 417 *data = dir; 418 } 419 420 (void)snprintf(msgid_str, sizeof msgid_str, "%08" PRIx32, msgid); 421 while ((dp = readdir(dir)) != NULL) { 422 if (dp->d_type != DT_REG) 423 continue; 424 425 /* ignore files other than envelopes */ 426 if (strlen(dp->d_name) != 16 || 427 strncmp(dp->d_name, msgid_str, 8)) 428 continue; 429 430 tmp = NULL; 431 *evpid = strtoull(dp->d_name, &tmp, 16); 432 if (tmp && *tmp != '\0') { 433 log_debug("debug: fsqueue: bogus file %s", dp->d_name); 434 continue; 435 } 436 437 memset(buf, 0, len); 438 r = queue_fs_envelope_load(*evpid, buf, len); 439 if (r) { 440 n = tree_pop(&evpcount, msgid); 441 if (n == NULL) 442 n = REF; 443 444 n += 1; 445 tree_xset(&evpcount, msgid, n); 446 } 447 448 return (r); 449 } 450 451 (void)closedir(dir); 452 *done = 1; 453 return (-1); 454 } 455 456 static int 457 queue_fs_envelope_walk(uint64_t *evpid, char *buf, size_t len) 458 { 459 static int done = 0; 460 static void *hdl = NULL; 461 int r, *n; 462 uint32_t msgid; 463 464 if (done) 465 return (-1); 466 467 if (hdl == NULL) 468 hdl = fsqueue_qwalk_new(); 469 470 if (fsqueue_qwalk(hdl, evpid)) { 471 memset(buf, 0, len); 472 r = queue_fs_envelope_load(*evpid, buf, len); 473 if (r) { 474 msgid = evpid_to_msgid(*evpid); 475 n = tree_pop(&evpcount, msgid); 476 if (n == NULL) 477 n = REF; 478 n += 1; 479 tree_xset(&evpcount, msgid, n); 480 } 481 return (r); 482 } 483 484 fsqueue_qwalk_close(hdl); 485 done = 1; 486 return (-1); 487 } 488 489 static int 490 fsqueue_check_space(void) 491 { 492 struct statfs buf; 493 uint64_t used; 494 uint64_t total; 495 496 if (statfs(PATH_QUEUE, &buf) < 0) { 497 log_warn("warn: queue-fs: statfs"); 498 return 0; 499 } 500 501 /* 502 * f_bfree and f_ffree is not set on all filesystems. 503 * They could be signed or unsigned integers. 504 * Some systems will set them to 0, others will set them to -1. 505 */ 506 if (buf.f_bfree == 0 || buf.f_ffree == 0 || 507 (int64_t)buf.f_bfree == -1 || (int64_t)buf.f_ffree == -1) 508 return 1; 509 510 used = buf.f_blocks - buf.f_bfree; 511 total = buf.f_bavail + used; 512 if (total != 0) 513 used = (float)used / (float)total * 100; 514 else 515 used = 100; 516 if (100 - used < MINSPACE) { 517 log_warnx("warn: not enough disk space: %llu%% left", 518 (unsigned long long) 100 - used); 519 log_warnx("warn: temporarily rejecting messages"); 520 return 0; 521 } 522 523 used = buf.f_files - buf.f_ffree; 524 total = buf.f_favail + used; 525 if (total != 0) 526 used = (float)used / (float)total * 100; 527 else 528 used = 100; 529 if (100 - used < MININODES) { 530 log_warnx("warn: not enough inodes: %llu%% left", 531 (unsigned long long) 100 - used); 532 log_warnx("warn: temporarily rejecting messages"); 533 return 0; 534 } 535 536 return 1; 537 } 538 539 static void 540 fsqueue_envelope_path(uint64_t evpid, char *buf, size_t len) 541 { 542 if (!bsnprintf(buf, len, "%s/%02x/%08x/%016" PRIx64, 543 PATH_QUEUE, 544 (evpid_to_msgid(evpid) & 0xff000000) >> 24, 545 evpid_to_msgid(evpid), 546 evpid)) 547 fatalx("fsqueue_envelope_path: path does not fit buffer"); 548 } 549 550 static void 551 fsqueue_envelope_incoming_path(uint64_t evpid, char *buf, size_t len) 552 { 553 if (!bsnprintf(buf, len, "%s/%08x/%016" PRIx64, 554 PATH_INCOMING, 555 evpid_to_msgid(evpid), 556 evpid)) 557 fatalx("fsqueue_envelope_incoming_path: path does not fit buffer"); 558 } 559 560 static int 561 fsqueue_envelope_dump(char *dest, const char *evpbuf, size_t evplen, 562 int do_atomic, int do_sync) 563 { 564 const char *path = do_atomic ? PATH_EVPTMP : dest; 565 FILE *fp = NULL; 566 int fd; 567 size_t w; 568 569 if ((fd = open(path, O_RDWR | O_CREAT | O_EXCL, 0600)) == -1) { 570 log_warn("warn: queue-fs: open"); 571 goto tempfail; 572 } 573 574 if ((fp = fdopen(fd, "w")) == NULL) { 575 log_warn("warn: queue-fs: fdopen"); 576 goto tempfail; 577 } 578 579 w = fwrite(evpbuf, 1, evplen, fp); 580 if (w < evplen) { 581 log_warn("warn: queue-fs: short write"); 582 goto tempfail; 583 } 584 if (fflush(fp)) { 585 log_warn("warn: queue-fs: fflush"); 586 goto tempfail; 587 } 588 if (do_sync && fsync(fileno(fp))) { 589 log_warn("warn: queue-fs: fsync"); 590 goto tempfail; 591 } 592 if (fclose(fp) != 0) { 593 log_warn("warn: queue-fs: fclose"); 594 fp = NULL; 595 goto tempfail; 596 } 597 fp = NULL; 598 fd = -1; 599 600 if (do_atomic && rename(path, dest) == -1) { 601 log_warn("warn: queue-fs: rename"); 602 goto tempfail; 603 } 604 return (1); 605 606 tempfail: 607 if (fp) 608 fclose(fp); 609 else if (fd != -1) 610 close(fd); 611 if (unlink(path) == -1) 612 log_warn("warn: queue-fs: unlink"); 613 return (0); 614 } 615 616 static void 617 fsqueue_message_path(uint32_t msgid, char *buf, size_t len) 618 { 619 if (!bsnprintf(buf, len, "%s/%02x/%08x", 620 PATH_QUEUE, 621 (msgid & 0xff000000) >> 24, 622 msgid)) 623 fatalx("fsqueue_message_path: path does not fit buffer"); 624 } 625 626 static void 627 fsqueue_message_corrupt_path(uint32_t msgid, char *buf, size_t len) 628 { 629 if (!bsnprintf(buf, len, "%s/%08x", 630 PATH_CORRUPT, 631 msgid)) 632 fatalx("fsqueue_message_corrupt_path: path does not fit buffer"); 633 } 634 635 static void 636 fsqueue_message_incoming_path(uint32_t msgid, char *buf, size_t len) 637 { 638 if (!bsnprintf(buf, len, "%s/%08x", 639 PATH_INCOMING, 640 msgid)) 641 fatalx("fsqueue_message_incoming_path: path does not fit buffer"); 642 } 643 644 static void * 645 fsqueue_qwalk_new(void) 646 { 647 char path[PATH_MAX]; 648 char * const path_argv[] = { path, NULL }; 649 struct qwalk *q; 650 651 q = xcalloc(1, sizeof(*q), "fsqueue_qwalk_new"); 652 (void)strlcpy(path, PATH_QUEUE, sizeof(path)); 653 q->fts = fts_open(path_argv, 654 FTS_PHYSICAL | FTS_NOCHDIR, NULL); 655 656 if (q->fts == NULL) 657 err(1, "fsqueue_qwalk_new: fts_open: %s", path); 658 659 return (q); 660 } 661 662 static void 663 fsqueue_qwalk_close(void *hdl) 664 { 665 struct qwalk *q = hdl; 666 667 fts_close(q->fts); 668 669 free(q); 670 } 671 672 static int 673 fsqueue_qwalk(void *hdl, uint64_t *evpid) 674 { 675 struct qwalk *q = hdl; 676 FTSENT *e; 677 char *tmp; 678 679 while ((e = fts_read(q->fts)) != NULL) { 680 switch (e->fts_info) { 681 case FTS_D: 682 q->depth += 1; 683 if (q->depth == 2 && e->fts_namelen != 2) { 684 log_debug("debug: fsqueue: bogus directory %s", 685 e->fts_path); 686 fts_set(q->fts, e, FTS_SKIP); 687 break; 688 } 689 if (q->depth == 3 && e->fts_namelen != 8) { 690 log_debug("debug: fsqueue: bogus directory %s", 691 e->fts_path); 692 fts_set(q->fts, e, FTS_SKIP); 693 break; 694 } 695 break; 696 697 case FTS_DP: 698 case FTS_DNR: 699 q->depth -= 1; 700 break; 701 702 case FTS_F: 703 if (q->depth != 3) 704 break; 705 if (e->fts_namelen != 16) 706 break; 707 if (timespeccmp(&e->fts_statp->st_mtim, &startup, >)) 708 break; 709 tmp = NULL; 710 *evpid = strtoull(e->fts_name, &tmp, 16); 711 if (tmp && *tmp != '\0') { 712 log_debug("debug: fsqueue: bogus file %s", 713 e->fts_path); 714 break; 715 } 716 return (1); 717 default: 718 break; 719 } 720 } 721 722 return (0); 723 } 724 725 static int 726 queue_fs_init(struct passwd *pw, int server, const char *conf) 727 { 728 unsigned int n; 729 char *paths[] = { PATH_QUEUE, PATH_CORRUPT, PATH_INCOMING }; 730 char path[PATH_MAX]; 731 int ret; 732 struct timeval tv; 733 734 /* remove incoming/ if it exists */ 735 if (server) 736 mvpurge(PATH_SPOOL PATH_INCOMING, PATH_SPOOL PATH_PURGE); 737 738 fsqueue_envelope_path(0, path, sizeof(path)); 739 740 ret = 1; 741 for (n = 0; n < nitems(paths); n++) { 742 (void)strlcpy(path, PATH_SPOOL, sizeof(path)); 743 if (strlcat(path, paths[n], sizeof(path)) >= sizeof(path)) 744 errx(1, "path too long %s%s", PATH_SPOOL, paths[n]); 745 if (ckdir(path, 0700, pw->pw_uid, 0, server) == 0) 746 ret = 0; 747 } 748 749 if (gettimeofday(&tv, NULL) == -1) 750 err(1, "gettimeofday"); 751 TIMEVAL_TO_TIMESPEC(&tv, &startup); 752 753 tree_init(&evpcount); 754 755 queue_api_on_message_create(queue_fs_message_create); 756 queue_api_on_message_commit(queue_fs_message_commit); 757 queue_api_on_message_delete(queue_fs_message_delete); 758 queue_api_on_message_fd_r(queue_fs_message_fd_r); 759 queue_api_on_message_corrupt(queue_fs_message_corrupt); 760 queue_api_on_message_uncorrupt(queue_fs_message_uncorrupt); 761 queue_api_on_envelope_create(queue_fs_envelope_create); 762 queue_api_on_envelope_delete(queue_fs_envelope_delete); 763 queue_api_on_envelope_update(queue_fs_envelope_update); 764 queue_api_on_envelope_load(queue_fs_envelope_load); 765 queue_api_on_envelope_walk(queue_fs_envelope_walk); 766 queue_api_on_message_walk(queue_fs_message_walk); 767 768 return (ret); 769 } 770 771 struct queue_backend queue_backend_fs = { 772 queue_fs_init, 773 }; 774