1 /* $OpenBSD: queue_fs.c,v 1.10 2015/10/29 10:25:36 sunil Exp $ */ 2 3 /* 4 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/mount.h> 21 #include <sys/queue.h> 22 #include <sys/tree.h> 23 #include <sys/socket.h> 24 #include <sys/stat.h> 25 26 #include <ctype.h> 27 #include <dirent.h> 28 #include <err.h> 29 #include <errno.h> 30 #include <event.h> 31 #include <fcntl.h> 32 #include <fts.h> 33 #include <imsg.h> 34 #include <inttypes.h> 35 #include <libgen.h> 36 #include <pwd.h> 37 #include <stdio.h> 38 #include <stdlib.h> 39 #include <string.h> 40 #include <time.h> 41 #include <unistd.h> 42 43 #include "smtpd.h" 44 #include "log.h" 45 46 #define PATH_QUEUE "/queue" 47 #define PATH_CORRUPT "/corrupt" 48 #define PATH_INCOMING "/incoming" 49 #define PATH_EVPTMP PATH_INCOMING "/envelope.tmp" 50 #define PATH_MESSAGE "/message" 51 52 /* percentage of remaining space / inodes required to accept new messages */ 53 #define MINSPACE 5 54 #define MININODES 5 55 56 struct qwalk { 57 FTS *fts; 58 int depth; 59 }; 60 61 static int fsqueue_check_space(void); 62 static void fsqueue_envelope_path(uint64_t, char *, size_t); 63 static void fsqueue_envelope_incoming_path(uint64_t, char *, size_t); 64 static int fsqueue_envelope_dump(char *, const char *, size_t, int, int); 65 static void fsqueue_message_path(uint32_t, char *, size_t); 66 static void fsqueue_message_corrupt_path(uint32_t, char *, size_t); 67 static void fsqueue_message_incoming_path(uint32_t, char *, size_t); 68 static void *fsqueue_qwalk_new(void); 69 static int fsqueue_qwalk(void *, uint64_t *); 70 static void fsqueue_qwalk_close(void *); 71 72 struct tree evpcount; 73 static struct timespec startup; 74 75 #define REF (int*)0xf00 76 77 static int 78 queue_fs_message_create(uint32_t *msgid) 79 { 80 char rootdir[PATH_MAX]; 81 struct stat sb; 82 83 if (! fsqueue_check_space()) 84 return 0; 85 86 again: 87 *msgid = queue_generate_msgid(); 88 89 /* prevent possible collision later when moving to Q_QUEUE */ 90 fsqueue_message_path(*msgid, rootdir, sizeof(rootdir)); 91 if (stat(rootdir, &sb) != -1) 92 goto again; 93 94 /* we hit an unexpected error, temporarily fail */ 95 if (errno != ENOENT) { 96 *msgid = 0; 97 return 0; 98 } 99 100 fsqueue_message_incoming_path(*msgid, rootdir, sizeof(rootdir)); 101 if (mkdir(rootdir, 0700) == -1) { 102 if (errno == EEXIST) 103 goto again; 104 105 if (errno == ENOSPC) { 106 *msgid = 0; 107 return 0; 108 } 109 110 log_warn("warn: queue-fs: mkdir"); 111 *msgid = 0; 112 return 0; 113 } 114 115 return (1); 116 } 117 118 static int 119 queue_fs_message_commit(uint32_t msgid, const char *path) 120 { 121 char incomingdir[PATH_MAX]; 122 char queuedir[PATH_MAX]; 123 char msgdir[PATH_MAX]; 124 char msgpath[PATH_MAX]; 125 126 /* before-first, move the message content in the incoming directory */ 127 fsqueue_message_incoming_path(msgid, msgpath, sizeof(msgpath)); 128 if (strlcat(msgpath, PATH_MESSAGE, sizeof(msgpath)) 129 >= sizeof(msgpath)) 130 return (0); 131 if (rename(path, msgpath) == -1) 132 return (0); 133 134 fsqueue_message_incoming_path(msgid, incomingdir, sizeof(incomingdir)); 135 fsqueue_message_path(msgid, msgdir, sizeof(msgdir)); 136 if (strlcpy(queuedir, msgdir, sizeof(queuedir)) 137 >= sizeof(queuedir)) 138 return (0); 139 140 /* first attempt to rename */ 141 if (rename(incomingdir, msgdir) == 0) 142 return 1; 143 if (errno == ENOSPC) 144 return 0; 145 if (errno != ENOENT) { 146 log_warn("warn: queue-fs: rename"); 147 return 0; 148 } 149 150 /* create the bucket */ 151 *strrchr(queuedir, '/') = '\0'; 152 if (mkdir(queuedir, 0700) == -1) { 153 if (errno == ENOSPC) 154 return 0; 155 if (errno != EEXIST) { 156 log_warn("warn: queue-fs: mkdir"); 157 return 0; 158 } 159 } 160 161 /* rename */ 162 if (rename(incomingdir, msgdir) == -1) { 163 if (errno == ENOSPC) 164 return 0; 165 log_warn("warn: queue-fs: rename"); 166 return 0; 167 } 168 169 return 1; 170 } 171 172 static int 173 queue_fs_message_fd_r(uint32_t msgid) 174 { 175 int fd; 176 char path[PATH_MAX]; 177 178 fsqueue_message_path(msgid, path, sizeof(path)); 179 if (strlcat(path, PATH_MESSAGE, sizeof(path)) 180 >= sizeof(path)) 181 return -1; 182 183 if ((fd = open(path, O_RDONLY)) == -1) { 184 log_warn("warn: queue-fs: open"); 185 return -1; 186 } 187 188 return fd; 189 } 190 191 static int 192 queue_fs_message_delete(uint32_t msgid) 193 { 194 char path[PATH_MAX]; 195 struct stat sb; 196 197 fsqueue_message_incoming_path(msgid, path, sizeof(path)); 198 if (stat(path, &sb) == -1) 199 fsqueue_message_path(msgid, path, sizeof(path)); 200 201 if (rmtree(path, 0) == -1) 202 log_warn("warn: queue-fs: rmtree"); 203 204 tree_pop(&evpcount, msgid); 205 206 return 1; 207 } 208 209 static int 210 queue_fs_message_corrupt(uint32_t msgid) 211 { 212 struct stat sb; 213 char rootdir[PATH_MAX]; 214 char corruptdir[PATH_MAX]; 215 char buf[64]; 216 int retry = 0; 217 218 fsqueue_message_path(msgid, rootdir, sizeof(rootdir)); 219 fsqueue_message_corrupt_path(msgid, corruptdir, 220 sizeof(corruptdir)); 221 222 again: 223 if (stat(corruptdir, &sb) != -1 || errno != ENOENT) { 224 fsqueue_message_corrupt_path(msgid, corruptdir, 225 sizeof(corruptdir)); 226 (void)snprintf(buf, sizeof (buf), ".%d", retry++); 227 (void)strlcat(corruptdir, buf, sizeof(corruptdir)); 228 goto again; 229 } 230 231 if (rename(rootdir, corruptdir) == -1) { 232 log_warn("warn: queue-fs: rename"); 233 return 0; 234 } 235 236 tree_pop(&evpcount, msgid); 237 238 return 1; 239 } 240 241 static int 242 queue_fs_envelope_create(uint32_t msgid, const char *buf, size_t len, 243 uint64_t *evpid) 244 { 245 char path[PATH_MAX]; 246 int queued = 0, i, r = 0, *n; 247 struct stat sb; 248 249 if (msgid == 0) { 250 log_warnx("warn: queue-fs: msgid=0, evpid=%016"PRIx64, *evpid); 251 goto done; 252 } 253 254 fsqueue_message_incoming_path(msgid, path, sizeof(path)); 255 if (stat(path, &sb) == -1) 256 queued = 1; 257 258 for (i = 0; i < 20; i ++) { 259 *evpid = queue_generate_evpid(msgid); 260 if (queued) 261 fsqueue_envelope_path(*evpid, path, sizeof(path)); 262 else 263 fsqueue_envelope_incoming_path(*evpid, path, 264 sizeof(path)); 265 266 r = fsqueue_envelope_dump(path, buf, len, 0, 0); 267 if (r >= 0) 268 goto done; 269 } 270 r = 0; 271 log_warnx("warn: queue-fs: could not allocate evpid"); 272 273 done: 274 if (r) { 275 n = tree_pop(&evpcount, msgid); 276 if (n == NULL) 277 n = REF; 278 n += 1; 279 tree_xset(&evpcount, msgid, n); 280 } 281 return (r); 282 } 283 284 static int 285 queue_fs_envelope_load(uint64_t evpid, char *buf, size_t len) 286 { 287 char pathname[PATH_MAX]; 288 FILE *fp; 289 size_t r; 290 291 fsqueue_envelope_path(evpid, pathname, sizeof(pathname)); 292 293 fp = fopen(pathname, "r"); 294 if (fp == NULL) { 295 if (errno != ENOENT && errno != ENFILE) 296 log_warn("warn: queue-fs: fopen"); 297 return 0; 298 } 299 300 r = fread(buf, 1, len, fp); 301 if (r) { 302 if (r == len) { 303 log_warn("warn: queue-fs: too large"); 304 r = 0; 305 } 306 else 307 buf[r] = '\0'; 308 } 309 fclose(fp); 310 311 return (r); 312 } 313 314 static int 315 queue_fs_envelope_update(uint64_t evpid, const char *buf, size_t len) 316 { 317 char dest[PATH_MAX]; 318 319 fsqueue_envelope_path(evpid, dest, sizeof(dest)); 320 321 return (fsqueue_envelope_dump(dest, buf, len, 1, 1)); 322 } 323 324 static int 325 queue_fs_envelope_delete(uint64_t evpid) 326 { 327 char pathname[PATH_MAX]; 328 uint32_t msgid; 329 int *n; 330 331 fsqueue_envelope_path(evpid, pathname, sizeof(pathname)); 332 if (unlink(pathname) == -1) 333 if (errno != ENOENT) 334 return 0; 335 336 msgid = evpid_to_msgid(evpid); 337 n = tree_pop(&evpcount, msgid); 338 n -= 1; 339 340 if (n - REF == 0) 341 queue_fs_message_delete(msgid); 342 else 343 tree_xset(&evpcount, msgid, n); 344 345 return (1); 346 } 347 348 static int 349 queue_fs_message_walk(uint64_t *evpid, char *buf, size_t len, 350 uint32_t msgid, int *done, void **data) 351 { 352 struct dirent *dp; 353 DIR *dir = *data; 354 char path[PATH_MAX]; 355 char msgid_str[9]; 356 char *tmp; 357 int r, *n; 358 359 if (*done) 360 return (-1); 361 362 if (! bsnprintf(path, sizeof path, "%s/%02x/%08x", 363 PATH_QUEUE, (msgid & 0xff000000) >> 24, msgid)) 364 fatalx("queue_fs_message_walk: path does not fit buffer"); 365 366 if (dir == NULL) { 367 if ((dir = opendir(path)) == NULL) { 368 log_warn("warn: queue_fs: opendir: %s", path); 369 *done = 1; 370 return (-1); 371 } 372 373 *data = dir; 374 } 375 376 (void)snprintf(msgid_str, sizeof msgid_str, "%08" PRIx32, msgid); 377 while ((dp = readdir(dir)) != NULL) { 378 if (dp->d_type != DT_REG) 379 continue; 380 381 /* ignore files other than envelopes */ 382 if (dp->d_namlen != 16 || strncmp(dp->d_name, msgid_str, 8)) 383 continue; 384 385 tmp = NULL; 386 *evpid = strtoull(dp->d_name, &tmp, 16); 387 if (tmp && *tmp != '\0') { 388 log_debug("debug: fsqueue: bogus file %s", dp->d_name); 389 continue; 390 } 391 392 memset(buf, 0, len); 393 r = queue_fs_envelope_load(*evpid, buf, len); 394 if (r) { 395 n = tree_pop(&evpcount, msgid); 396 if (n == NULL) 397 n = REF; 398 399 n += 1; 400 tree_xset(&evpcount, msgid, n); 401 } 402 403 return (r); 404 } 405 406 (void)closedir(dir); 407 *done = 1; 408 return (-1); 409 } 410 411 static int 412 queue_fs_envelope_walk(uint64_t *evpid, char *buf, size_t len) 413 { 414 static int done = 0; 415 static void *hdl = NULL; 416 int r, *n; 417 uint32_t msgid; 418 419 if (done) 420 return (-1); 421 422 if (hdl == NULL) 423 hdl = fsqueue_qwalk_new(); 424 425 if (fsqueue_qwalk(hdl, evpid)) { 426 memset(buf, 0, len); 427 r = queue_fs_envelope_load(*evpid, buf, len); 428 if (r) { 429 msgid = evpid_to_msgid(*evpid); 430 n = tree_pop(&evpcount, msgid); 431 if (n == NULL) 432 n = REF; 433 n += 1; 434 tree_xset(&evpcount, msgid, n); 435 } 436 return (r); 437 } 438 439 fsqueue_qwalk_close(hdl); 440 done = 1; 441 return (-1); 442 } 443 444 static int 445 fsqueue_check_space(void) 446 { 447 struct statfs buf; 448 uint64_t used; 449 uint64_t total; 450 451 if (statfs(PATH_QUEUE, &buf) < 0) { 452 log_warn("warn: queue-fs: statfs"); 453 return 0; 454 } 455 456 /* 457 * f_bfree and f_ffree is not set on all filesystems. 458 * They could be signed or unsigned integers. 459 * Some systems will set them to 0, others will set them to -1. 460 */ 461 if (buf.f_bfree == 0 || buf.f_ffree == 0 || 462 (int64_t)buf.f_bfree == -1 || (int64_t)buf.f_ffree == -1) 463 return 1; 464 465 used = buf.f_blocks - buf.f_bfree; 466 total = buf.f_bavail + used; 467 if (total != 0) 468 used = (float)used / (float)total * 100; 469 else 470 used = 100; 471 if (100 - used < MINSPACE) { 472 log_warnx("warn: not enough disk space: %llu%% left", 473 (unsigned long long) 100 - used); 474 log_warnx("warn: temporarily rejecting messages"); 475 return 0; 476 } 477 478 used = buf.f_files - buf.f_ffree; 479 total = buf.f_favail + used; 480 if (total != 0) 481 used = (float)used / (float)total * 100; 482 else 483 used = 100; 484 if (100 - used < MININODES) { 485 log_warnx("warn: not enough inodes: %llu%% left", 486 (unsigned long long) 100 - used); 487 log_warnx("warn: temporarily rejecting messages"); 488 return 0; 489 } 490 491 return 1; 492 } 493 494 static void 495 fsqueue_envelope_path(uint64_t evpid, char *buf, size_t len) 496 { 497 if (! bsnprintf(buf, len, "%s/%02x/%08x/%016" PRIx64, 498 PATH_QUEUE, 499 (evpid_to_msgid(evpid) & 0xff000000) >> 24, 500 evpid_to_msgid(evpid), 501 evpid)) 502 fatalx("fsqueue_envelope_path: path does not fit buffer"); 503 } 504 505 static void 506 fsqueue_envelope_incoming_path(uint64_t evpid, char *buf, size_t len) 507 { 508 if (! bsnprintf(buf, len, "%s/%08x/%016" PRIx64, 509 PATH_INCOMING, 510 evpid_to_msgid(evpid), 511 evpid)) 512 fatalx("fsqueue_envelope_incoming_path: path does not fit buffer"); 513 } 514 515 static int 516 fsqueue_envelope_dump(char *dest, const char *evpbuf, size_t evplen, 517 int do_atomic, int do_sync) 518 { 519 const char *path = do_atomic ? PATH_EVPTMP : dest; 520 FILE *fp = NULL; 521 int fd; 522 size_t w; 523 524 if ((fd = open(path, O_RDWR | O_CREAT | O_EXCL, 0600)) == -1) { 525 log_warn("warn: queue-fs: open"); 526 goto tempfail; 527 } 528 529 if ((fp = fdopen(fd, "w")) == NULL) { 530 log_warn("warn: queue-fs: fdopen"); 531 goto tempfail; 532 } 533 534 w = fwrite(evpbuf, 1, evplen, fp); 535 if (w < evplen) { 536 log_warn("warn: queue-fs: short write"); 537 goto tempfail; 538 } 539 if (fflush(fp)) { 540 log_warn("warn: queue-fs: fflush"); 541 goto tempfail; 542 } 543 if (do_sync && fsync(fileno(fp))) { 544 log_warn("warn: queue-fs: fsync"); 545 goto tempfail; 546 } 547 if (fclose(fp) != 0) { 548 log_warn("warn: queue-fs: fclose"); 549 fp = NULL; 550 goto tempfail; 551 } 552 fp = NULL; 553 fd = -1; 554 555 if (do_atomic && rename(path, dest) == -1) { 556 log_warn("warn: queue-fs: rename"); 557 goto tempfail; 558 } 559 return (1); 560 561 tempfail: 562 if (fp) 563 fclose(fp); 564 else if (fd != -1) 565 close(fd); 566 if (unlink(path) == -1) 567 log_warn("warn: queue-fs: unlink"); 568 return (0); 569 } 570 571 static void 572 fsqueue_message_path(uint32_t msgid, char *buf, size_t len) 573 { 574 if (! bsnprintf(buf, len, "%s/%02x/%08x", 575 PATH_QUEUE, 576 (msgid & 0xff000000) >> 24, 577 msgid)) 578 fatalx("fsqueue_message_path: path does not fit buffer"); 579 } 580 581 static void 582 fsqueue_message_corrupt_path(uint32_t msgid, char *buf, size_t len) 583 { 584 if (! bsnprintf(buf, len, "%s/%08x", 585 PATH_CORRUPT, 586 msgid)) 587 fatalx("fsqueue_message_corrupt_path: path does not fit buffer"); 588 } 589 590 static void 591 fsqueue_message_incoming_path(uint32_t msgid, char *buf, size_t len) 592 { 593 if (! bsnprintf(buf, len, "%s/%08x", 594 PATH_INCOMING, 595 msgid)) 596 fatalx("fsqueue_message_incoming_path: path does not fit buffer"); 597 } 598 599 static void * 600 fsqueue_qwalk_new(void) 601 { 602 char path[PATH_MAX]; 603 char * const path_argv[] = { path, NULL }; 604 struct qwalk *q; 605 606 q = xcalloc(1, sizeof(*q), "fsqueue_qwalk_new"); 607 (void)strlcpy(path, PATH_QUEUE, sizeof(path)); 608 q->fts = fts_open(path_argv, 609 FTS_PHYSICAL | FTS_NOCHDIR, NULL); 610 611 if (q->fts == NULL) 612 err(1, "fsqueue_qwalk_new: fts_open: %s", path); 613 614 return (q); 615 } 616 617 static void 618 fsqueue_qwalk_close(void *hdl) 619 { 620 struct qwalk *q = hdl; 621 622 fts_close(q->fts); 623 624 free(q); 625 } 626 627 static int 628 fsqueue_qwalk(void *hdl, uint64_t *evpid) 629 { 630 struct qwalk *q = hdl; 631 FTSENT *e; 632 char *tmp; 633 634 while ((e = fts_read(q->fts)) != NULL) { 635 switch (e->fts_info) { 636 case FTS_D: 637 q->depth += 1; 638 if (q->depth == 2 && e->fts_namelen != 2) { 639 log_debug("debug: fsqueue: bogus directory %s", 640 e->fts_path); 641 fts_set(q->fts, e, FTS_SKIP); 642 break; 643 } 644 if (q->depth == 3 && e->fts_namelen != 8) { 645 log_debug("debug: fsqueue: bogus directory %s", 646 e->fts_path); 647 fts_set(q->fts, e, FTS_SKIP); 648 break; 649 } 650 break; 651 652 case FTS_DP: 653 case FTS_DNR: 654 q->depth -= 1; 655 break; 656 657 case FTS_F: 658 if (q->depth != 3) 659 break; 660 if (e->fts_namelen != 16) 661 break; 662 if (timespeccmp(&e->fts_statp->st_mtim, &startup, >)) 663 break; 664 tmp = NULL; 665 *evpid = strtoull(e->fts_name, &tmp, 16); 666 if (tmp && *tmp != '\0') { 667 log_debug("debug: fsqueue: bogus file %s", 668 e->fts_path); 669 break; 670 } 671 return (1); 672 default: 673 break; 674 } 675 } 676 677 return (0); 678 } 679 680 static int 681 queue_fs_init(struct passwd *pw, int server, const char *conf) 682 { 683 unsigned int n; 684 char *paths[] = { PATH_QUEUE, PATH_CORRUPT, PATH_INCOMING }; 685 char path[PATH_MAX]; 686 int ret; 687 struct timeval tv; 688 689 /* remove incoming/ if it exists */ 690 if (server) 691 mvpurge(PATH_SPOOL PATH_INCOMING, PATH_SPOOL PATH_PURGE); 692 693 fsqueue_envelope_path(0, path, sizeof(path)); 694 695 ret = 1; 696 for (n = 0; n < nitems(paths); n++) { 697 (void)strlcpy(path, PATH_SPOOL, sizeof(path)); 698 if (strlcat(path, paths[n], sizeof(path)) >= sizeof(path)) 699 errx(1, "path too long %s%s", PATH_SPOOL, paths[n]); 700 if (ckdir(path, 0700, pw->pw_uid, 0, server) == 0) 701 ret = 0; 702 } 703 704 if (gettimeofday(&tv, NULL) == -1) 705 err(1, "gettimeofday"); 706 TIMEVAL_TO_TIMESPEC(&tv, &startup); 707 708 tree_init(&evpcount); 709 710 queue_api_on_message_create(queue_fs_message_create); 711 queue_api_on_message_commit(queue_fs_message_commit); 712 queue_api_on_message_delete(queue_fs_message_delete); 713 queue_api_on_message_fd_r(queue_fs_message_fd_r); 714 queue_api_on_message_corrupt(queue_fs_message_corrupt); 715 queue_api_on_envelope_create(queue_fs_envelope_create); 716 queue_api_on_envelope_delete(queue_fs_envelope_delete); 717 queue_api_on_envelope_update(queue_fs_envelope_update); 718 queue_api_on_envelope_load(queue_fs_envelope_load); 719 queue_api_on_envelope_walk(queue_fs_envelope_walk); 720 queue_api_on_message_walk(queue_fs_message_walk); 721 722 return (ret); 723 } 724 725 struct queue_backend queue_backend_fs = { 726 queue_fs_init, 727 }; 728