1 /* $OpenBSD: queue_fs.c,v 1.6 2014/07/08 15:45:32 eric Exp $ */ 2 3 /* 4 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/queue.h> 21 #include <sys/tree.h> 22 #include <sys/socket.h> 23 #include <sys/stat.h> 24 #include <sys/param.h> 25 #include <sys/mount.h> 26 27 #include <ctype.h> 28 #include <err.h> 29 #include <errno.h> 30 #include <event.h> 31 #include <fcntl.h> 32 #include <fts.h> 33 #include <imsg.h> 34 #include <inttypes.h> 35 #include <libgen.h> 36 #include <pwd.h> 37 #include <stdio.h> 38 #include <stdlib.h> 39 #include <string.h> 40 #include <time.h> 41 #include <unistd.h> 42 43 #include "smtpd.h" 44 #include "log.h" 45 46 #define PATH_QUEUE "/queue" 47 #define PATH_CORRUPT "/corrupt" 48 #define PATH_INCOMING "/incoming" 49 #define PATH_EVPTMP PATH_INCOMING "/envelope.tmp" 50 #define PATH_MESSAGE "/message" 51 52 /* percentage of remaining space / inodes required to accept new messages */ 53 #define MINSPACE 5 54 #define MININODES 5 55 56 struct qwalk { 57 FTS *fts; 58 int depth; 59 }; 60 61 static int fsqueue_check_space(void); 62 static void fsqueue_envelope_path(uint64_t, char *, size_t); 63 static void fsqueue_envelope_incoming_path(uint64_t, char *, size_t); 64 static int fsqueue_envelope_dump(char *, const char *, size_t, int, int); 65 static void fsqueue_message_path(uint32_t, char *, size_t); 66 static void fsqueue_message_corrupt_path(uint32_t, char *, size_t); 67 static void fsqueue_message_incoming_path(uint32_t, char *, size_t); 68 static void *fsqueue_qwalk_new(void); 69 static int fsqueue_qwalk(void *, uint64_t *); 70 static void fsqueue_qwalk_close(void *); 71 72 struct tree evpcount; 73 static struct timespec startup; 74 75 #define REF (int*)0xf00 76 77 static int 78 queue_fs_message_create(uint32_t *msgid) 79 { 80 char rootdir[SMTPD_MAXPATHLEN]; 81 struct stat sb; 82 83 if (! fsqueue_check_space()) 84 return 0; 85 86 again: 87 *msgid = queue_generate_msgid(); 88 89 /* prevent possible collision later when moving to Q_QUEUE */ 90 fsqueue_message_path(*msgid, rootdir, sizeof(rootdir)); 91 if (stat(rootdir, &sb) != -1) 92 goto again; 93 94 /* we hit an unexpected error, temporarily fail */ 95 if (errno != ENOENT) { 96 *msgid = 0; 97 return 0; 98 } 99 100 fsqueue_message_incoming_path(*msgid, rootdir, sizeof(rootdir)); 101 if (mkdir(rootdir, 0700) == -1) { 102 if (errno == EEXIST) 103 goto again; 104 105 if (errno == ENOSPC) { 106 *msgid = 0; 107 return 0; 108 } 109 110 log_warn("warn: queue-fs: mkdir"); 111 *msgid = 0; 112 return 0; 113 } 114 115 return (1); 116 } 117 118 static int 119 queue_fs_message_commit(uint32_t msgid, const char *path) 120 { 121 char incomingdir[SMTPD_MAXPATHLEN]; 122 char queuedir[SMTPD_MAXPATHLEN]; 123 char msgdir[SMTPD_MAXPATHLEN]; 124 char msgpath[SMTPD_MAXPATHLEN]; 125 126 /* before-first, move the message content in the incoming directory */ 127 fsqueue_message_incoming_path(msgid, msgpath, sizeof(msgpath)); 128 if (strlcat(msgpath, PATH_MESSAGE, sizeof(msgpath)) 129 >= sizeof(msgpath)) 130 return (0); 131 if (rename(path, msgpath) == -1) 132 return (0); 133 134 fsqueue_message_incoming_path(msgid, incomingdir, sizeof(incomingdir)); 135 fsqueue_message_path(msgid, msgdir, sizeof(msgdir)); 136 if (strlcpy(queuedir, msgdir, sizeof(queuedir)) 137 >= sizeof(queuedir)) 138 return (0); 139 140 /* first attempt to rename */ 141 if (rename(incomingdir, msgdir) == 0) 142 return 1; 143 if (errno == ENOSPC) 144 return 0; 145 if (errno != ENOENT) { 146 log_warn("warn: queue-fs: rename"); 147 return 0; 148 } 149 150 /* create the bucket */ 151 *strrchr(queuedir, '/') = '\0'; 152 if (mkdir(queuedir, 0700) == -1) { 153 if (errno == ENOSPC) 154 return 0; 155 if (errno != EEXIST) { 156 log_warn("warn: queue-fs: mkdir"); 157 return 0; 158 } 159 } 160 161 /* rename */ 162 if (rename(incomingdir, msgdir) == -1) { 163 if (errno == ENOSPC) 164 return 0; 165 log_warn("warn: queue-fs: rename"); 166 return 0; 167 } 168 169 /* best effort */ 170 sync(); 171 172 return 1; 173 } 174 175 static int 176 queue_fs_message_fd_r(uint32_t msgid) 177 { 178 int fd; 179 char path[SMTPD_MAXPATHLEN]; 180 181 fsqueue_message_path(msgid, path, sizeof(path)); 182 if (strlcat(path, PATH_MESSAGE, sizeof(path)) 183 >= sizeof(path)) 184 return -1; 185 186 if ((fd = open(path, O_RDONLY)) == -1) { 187 log_warn("warn: queue-fs: open"); 188 return -1; 189 } 190 191 return fd; 192 } 193 194 static int 195 queue_fs_message_delete(uint32_t msgid) 196 { 197 char path[SMTPD_MAXPATHLEN]; 198 struct stat sb; 199 200 fsqueue_message_incoming_path(msgid, path, sizeof(path)); 201 if (stat(path, &sb) == -1) 202 fsqueue_message_path(msgid, path, sizeof(path)); 203 204 if (rmtree(path, 0) == -1) 205 log_warn("warn: queue-fs: rmtree"); 206 207 tree_pop(&evpcount, msgid); 208 209 return 1; 210 } 211 212 static int 213 queue_fs_message_corrupt(uint32_t msgid) 214 { 215 struct stat sb; 216 char rootdir[SMTPD_MAXPATHLEN]; 217 char corruptdir[SMTPD_MAXPATHLEN]; 218 char buf[64]; 219 int retry = 0; 220 221 fsqueue_message_path(msgid, rootdir, sizeof(rootdir)); 222 fsqueue_message_corrupt_path(msgid, corruptdir, 223 sizeof(corruptdir)); 224 225 again: 226 if (stat(corruptdir, &sb) != -1 || errno != ENOENT) { 227 fsqueue_message_corrupt_path(msgid, corruptdir, 228 sizeof(corruptdir)); 229 (void)snprintf(buf, sizeof (buf), ".%d", retry++); 230 (void)strlcat(corruptdir, buf, sizeof(corruptdir)); 231 goto again; 232 } 233 234 if (rename(rootdir, corruptdir) == -1) { 235 log_warn("warn: queue-fs: rename"); 236 return 0; 237 } 238 239 tree_pop(&evpcount, msgid); 240 241 return 1; 242 } 243 244 static int 245 queue_fs_envelope_create(uint32_t msgid, const char *buf, size_t len, 246 uint64_t *evpid) 247 { 248 char path[SMTPD_MAXPATHLEN]; 249 int queued = 0, i, r = 0, *n; 250 struct stat sb; 251 252 if (msgid == 0) { 253 log_warnx("warn: queue-fs: msgid=0, evpid=%016"PRIx64, *evpid); 254 goto done; 255 } 256 257 fsqueue_message_incoming_path(msgid, path, sizeof(path)); 258 if (stat(path, &sb) == -1) 259 queued = 1; 260 261 for (i = 0; i < 20; i ++) { 262 *evpid = queue_generate_evpid(msgid); 263 if (queued) 264 fsqueue_envelope_path(*evpid, path, sizeof(path)); 265 else 266 fsqueue_envelope_incoming_path(*evpid, path, 267 sizeof(path)); 268 269 r = fsqueue_envelope_dump(path, buf, len, 0, 0); 270 if (r >= 0) 271 goto done; 272 } 273 r = 0; 274 log_warnx("warn: queue-fs: could not allocate evpid"); 275 276 done: 277 if (r) { 278 n = tree_pop(&evpcount, msgid); 279 if (n == NULL) 280 n = REF; 281 n += 1; 282 tree_xset(&evpcount, msgid, n); 283 } 284 return (r); 285 } 286 287 static int 288 queue_fs_envelope_load(uint64_t evpid, char *buf, size_t len) 289 { 290 char pathname[SMTPD_MAXPATHLEN]; 291 FILE *fp; 292 size_t r; 293 294 fsqueue_envelope_path(evpid, pathname, sizeof(pathname)); 295 296 fp = fopen(pathname, "r"); 297 if (fp == NULL) { 298 if (errno != ENOENT && errno != ENFILE) 299 log_warn("warn: queue-fs: fopen"); 300 return 0; 301 } 302 303 r = fread(buf, 1, len, fp); 304 if (r) { 305 if (r == len) { 306 log_warn("warn: queue-fs: too large"); 307 r = 0; 308 } 309 else 310 buf[r] = '\0'; 311 } 312 fclose(fp); 313 314 return (r); 315 } 316 317 static int 318 queue_fs_envelope_update(uint64_t evpid, const char *buf, size_t len) 319 { 320 char dest[SMTPD_MAXPATHLEN]; 321 322 fsqueue_envelope_path(evpid, dest, sizeof(dest)); 323 324 return (fsqueue_envelope_dump(dest, buf, len, 1, 1)); 325 } 326 327 static int 328 queue_fs_envelope_delete(uint64_t evpid) 329 { 330 char pathname[SMTPD_MAXPATHLEN]; 331 uint32_t msgid; 332 int *n; 333 334 fsqueue_envelope_path(evpid, pathname, sizeof(pathname)); 335 if (unlink(pathname) == -1) 336 if (errno != ENOENT) 337 return 0; 338 339 msgid = evpid_to_msgid(evpid); 340 n = tree_pop(&evpcount, msgid); 341 n -= 1; 342 343 if (n - REF == 0) 344 queue_fs_message_delete(msgid); 345 else 346 tree_xset(&evpcount, msgid, n); 347 348 return (1); 349 } 350 351 static int 352 queue_fs_envelope_walk(uint64_t *evpid, char *buf, size_t len) 353 { 354 static int done = 0; 355 static void *hdl = NULL; 356 int r, *n; 357 uint32_t msgid; 358 359 if (done) 360 return (-1); 361 362 if (hdl == NULL) 363 hdl = fsqueue_qwalk_new(); 364 365 if (fsqueue_qwalk(hdl, evpid)) { 366 memset(buf, 0, len); 367 r = queue_fs_envelope_load(*evpid, buf, len); 368 if (r) { 369 msgid = evpid_to_msgid(*evpid); 370 n = tree_pop(&evpcount, msgid); 371 if (n == NULL) 372 n = REF; 373 n += 1; 374 tree_xset(&evpcount, msgid, n); 375 } 376 return (r); 377 } 378 379 fsqueue_qwalk_close(hdl); 380 done = 1; 381 return (-1); 382 } 383 384 static int 385 fsqueue_check_space(void) 386 { 387 struct statfs buf; 388 uint64_t used; 389 uint64_t total; 390 391 if (statfs(PATH_QUEUE, &buf) < 0) { 392 log_warn("warn: queue-fs: statfs"); 393 return 0; 394 } 395 396 /* 397 * f_bfree and f_ffree is not set on all filesystems. 398 * They could be signed or unsigned integers. 399 * Some systems will set them to 0, others will set them to -1. 400 */ 401 if (buf.f_bfree == 0 || buf.f_ffree == 0 || 402 (int64_t)buf.f_bfree == -1 || (int64_t)buf.f_ffree == -1) 403 return 1; 404 405 used = buf.f_blocks - buf.f_bfree; 406 total = buf.f_bavail + used; 407 if (total != 0) 408 used = (float)used / (float)total * 100; 409 else 410 used = 100; 411 if (100 - used < MINSPACE) { 412 log_warnx("warn: not enough disk space: %llu%% left", 413 (unsigned long long) 100 - used); 414 log_warnx("warn: temporarily rejecting messages"); 415 return 0; 416 } 417 418 used = buf.f_files - buf.f_ffree; 419 total = buf.f_favail + used; 420 if (total != 0) 421 used = (float)used / (float)total * 100; 422 else 423 used = 100; 424 if (100 - used < MININODES) { 425 log_warnx("warn: not enough inodes: %llu%% left", 426 (unsigned long long) 100 - used); 427 log_warnx("warn: temporarily rejecting messages"); 428 return 0; 429 } 430 431 return 1; 432 } 433 434 static void 435 fsqueue_envelope_path(uint64_t evpid, char *buf, size_t len) 436 { 437 if (! bsnprintf(buf, len, "%s/%02x/%08x/%016" PRIx64, 438 PATH_QUEUE, 439 (evpid_to_msgid(evpid) & 0xff000000) >> 24, 440 evpid_to_msgid(evpid), 441 evpid)) 442 fatalx("fsqueue_envelope_path: path does not fit buffer"); 443 } 444 445 static void 446 fsqueue_envelope_incoming_path(uint64_t evpid, char *buf, size_t len) 447 { 448 if (! bsnprintf(buf, len, "%s/%08x/%016" PRIx64, 449 PATH_INCOMING, 450 evpid_to_msgid(evpid), 451 evpid)) 452 fatalx("fsqueue_envelope_incoming_path: path does not fit buffer"); 453 } 454 455 static int 456 fsqueue_envelope_dump(char *dest, const char *evpbuf, size_t evplen, 457 int do_atomic, int do_sync) 458 { 459 const char *path = do_atomic ? PATH_EVPTMP : dest; 460 FILE *fp = NULL; 461 int fd; 462 size_t w; 463 464 if ((fd = open(path, O_RDWR | O_CREAT | O_EXCL, 0600)) == -1) { 465 log_warn("warn: queue-fs: open"); 466 goto tempfail; 467 } 468 469 if ((fp = fdopen(fd, "w")) == NULL) { 470 log_warn("warn: queue-fs: fdopen"); 471 goto tempfail; 472 } 473 474 w = fwrite(evpbuf, 1, evplen, fp); 475 if (w < evplen) { 476 log_warn("warn: queue-fs: short write"); 477 goto tempfail; 478 } 479 if (fflush(fp)) { 480 log_warn("warn: queue-fs: fflush"); 481 goto tempfail; 482 } 483 if (do_sync && fsync(fileno(fp))) { 484 log_warn("warn: queue-fs: fsync"); 485 goto tempfail; 486 } 487 if (fclose(fp) != 0) { 488 log_warn("warn: queue-fs: fclose"); 489 fp = NULL; 490 goto tempfail; 491 } 492 fp = NULL; 493 fd = -1; 494 495 if (do_atomic && rename(path, dest) == -1) { 496 log_warn("warn: queue-fs: rename"); 497 goto tempfail; 498 } 499 return (1); 500 501 tempfail: 502 if (fp) 503 fclose(fp); 504 else if (fd != -1) 505 close(fd); 506 if (unlink(path) == -1) 507 log_warn("warn: queue-fs: unlink"); 508 return (0); 509 } 510 511 static void 512 fsqueue_message_path(uint32_t msgid, char *buf, size_t len) 513 { 514 if (! bsnprintf(buf, len, "%s/%02x/%08x", 515 PATH_QUEUE, 516 (msgid & 0xff000000) >> 24, 517 msgid)) 518 fatalx("fsqueue_message_path: path does not fit buffer"); 519 } 520 521 static void 522 fsqueue_message_corrupt_path(uint32_t msgid, char *buf, size_t len) 523 { 524 if (! bsnprintf(buf, len, "%s/%08x", 525 PATH_CORRUPT, 526 msgid)) 527 fatalx("fsqueue_message_corrupt_path: path does not fit buffer"); 528 } 529 530 static void 531 fsqueue_message_incoming_path(uint32_t msgid, char *buf, size_t len) 532 { 533 if (! bsnprintf(buf, len, "%s/%08x", 534 PATH_INCOMING, 535 msgid)) 536 fatalx("fsqueue_message_incoming_path: path does not fit buffer"); 537 } 538 539 static void * 540 fsqueue_qwalk_new(void) 541 { 542 char path[SMTPD_MAXPATHLEN]; 543 char * const path_argv[] = { path, NULL }; 544 struct qwalk *q; 545 546 q = xcalloc(1, sizeof(*q), "fsqueue_qwalk_new"); 547 (void)strlcpy(path, PATH_QUEUE, sizeof(path)); 548 q->fts = fts_open(path_argv, 549 FTS_PHYSICAL | FTS_NOCHDIR, NULL); 550 551 if (q->fts == NULL) 552 err(1, "fsqueue_qwalk_new: fts_open: %s", path); 553 554 return (q); 555 } 556 557 static void 558 fsqueue_qwalk_close(void *hdl) 559 { 560 struct qwalk *q = hdl; 561 562 fts_close(q->fts); 563 564 free(q); 565 } 566 567 static int 568 fsqueue_qwalk(void *hdl, uint64_t *evpid) 569 { 570 struct qwalk *q = hdl; 571 FTSENT *e; 572 char *tmp; 573 574 while ((e = fts_read(q->fts)) != NULL) { 575 switch (e->fts_info) { 576 case FTS_D: 577 q->depth += 1; 578 if (q->depth == 2 && e->fts_namelen != 2) { 579 log_debug("debug: fsqueue: bogus directory %s", 580 e->fts_path); 581 fts_set(q->fts, e, FTS_SKIP); 582 break; 583 } 584 if (q->depth == 3 && e->fts_namelen != 8) { 585 log_debug("debug: fsqueue: bogus directory %s", 586 e->fts_path); 587 fts_set(q->fts, e, FTS_SKIP); 588 break; 589 } 590 break; 591 592 case FTS_DP: 593 case FTS_DNR: 594 q->depth -= 1; 595 break; 596 597 case FTS_F: 598 if (q->depth != 3) 599 break; 600 if (e->fts_namelen != 16) 601 break; 602 if (timespeccmp(&e->fts_statp->st_mtim, &startup, >)) 603 break; 604 tmp = NULL; 605 *evpid = strtoull(e->fts_name, &tmp, 16); 606 if (tmp && *tmp != '\0') { 607 log_debug("debug: fsqueue: bogus file %s", 608 e->fts_path); 609 break; 610 } 611 return (1); 612 default: 613 break; 614 } 615 } 616 617 return (0); 618 } 619 620 static int 621 queue_fs_init(struct passwd *pw, int server, const char *conf) 622 { 623 unsigned int n; 624 char *paths[] = { PATH_QUEUE, PATH_CORRUPT, PATH_INCOMING }; 625 char path[SMTPD_MAXPATHLEN]; 626 int ret; 627 struct timeval tv; 628 629 /* remove incoming/ if it exists */ 630 if (server) 631 mvpurge(PATH_SPOOL PATH_INCOMING, PATH_SPOOL PATH_PURGE); 632 633 fsqueue_envelope_path(0, path, sizeof(path)); 634 635 ret = 1; 636 for (n = 0; n < nitems(paths); n++) { 637 (void)strlcpy(path, PATH_SPOOL, sizeof(path)); 638 if (strlcat(path, paths[n], sizeof(path)) >= sizeof(path)) 639 errx(1, "path too long %s%s", PATH_SPOOL, paths[n]); 640 if (ckdir(path, 0700, pw->pw_uid, 0, server) == 0) 641 ret = 0; 642 } 643 644 if (gettimeofday(&tv, NULL) == -1) 645 err(1, "gettimeofday"); 646 TIMEVAL_TO_TIMESPEC(&tv, &startup); 647 648 tree_init(&evpcount); 649 650 queue_api_on_message_create(queue_fs_message_create); 651 queue_api_on_message_commit(queue_fs_message_commit); 652 queue_api_on_message_delete(queue_fs_message_delete); 653 queue_api_on_message_fd_r(queue_fs_message_fd_r); 654 queue_api_on_message_corrupt(queue_fs_message_corrupt); 655 queue_api_on_envelope_create(queue_fs_envelope_create); 656 queue_api_on_envelope_delete(queue_fs_envelope_delete); 657 queue_api_on_envelope_update(queue_fs_envelope_update); 658 queue_api_on_envelope_load(queue_fs_envelope_load); 659 queue_api_on_envelope_walk(queue_fs_envelope_walk); 660 661 return (ret); 662 } 663 664 struct queue_backend queue_backend_fs = { 665 queue_fs_init, 666 }; 667