1 /* $OpenBSD: queue_fs.c,v 1.17 2018/05/31 21:06:12 gilles Exp $ */ 2 3 /* 4 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/mount.h> 21 #include <sys/queue.h> 22 #include <sys/tree.h> 23 #include <sys/socket.h> 24 #include <sys/stat.h> 25 26 #include <ctype.h> 27 #include <dirent.h> 28 #include <err.h> 29 #include <errno.h> 30 #include <event.h> 31 #include <fcntl.h> 32 #include <fts.h> 33 #include <imsg.h> 34 #include <inttypes.h> 35 #include <libgen.h> 36 #include <pwd.h> 37 #include <stdio.h> 38 #include <stdlib.h> 39 #include <string.h> 40 #include <time.h> 41 #include <unistd.h> 42 43 #include "smtpd.h" 44 #include "log.h" 45 46 #define PATH_QUEUE "/queue" 47 #define PATH_INCOMING "/incoming" 48 #define PATH_EVPTMP PATH_INCOMING "/envelope.tmp" 49 #define PATH_MESSAGE "/message" 50 51 /* percentage of remaining space / inodes required to accept new messages */ 52 #define MINSPACE 5 53 #define MININODES 5 54 55 struct qwalk { 56 FTS *fts; 57 int depth; 58 }; 59 60 static int fsqueue_check_space(void); 61 static void fsqueue_envelope_path(uint64_t, char *, size_t); 62 static void fsqueue_envelope_incoming_path(uint64_t, char *, size_t); 63 static int fsqueue_envelope_dump(char *, const char *, size_t, int, int); 64 static void fsqueue_message_path(uint32_t, char *, size_t); 65 static void fsqueue_message_incoming_path(uint32_t, char *, size_t); 66 static void *fsqueue_qwalk_new(void); 67 static int fsqueue_qwalk(void *, uint64_t *); 68 static void fsqueue_qwalk_close(void *); 69 70 struct tree evpcount; 71 static struct timespec startup; 72 73 #define REF (int*)0xf00 74 75 static int 76 queue_fs_message_create(uint32_t *msgid) 77 { 78 char rootdir[PATH_MAX]; 79 struct stat sb; 80 81 if (!fsqueue_check_space()) 82 return 0; 83 84 again: 85 *msgid = queue_generate_msgid(); 86 87 /* prevent possible collision later when moving to Q_QUEUE */ 88 fsqueue_message_path(*msgid, rootdir, sizeof(rootdir)); 89 if (stat(rootdir, &sb) != -1) 90 goto again; 91 92 /* we hit an unexpected error, temporarily fail */ 93 if (errno != ENOENT) { 94 *msgid = 0; 95 return 0; 96 } 97 98 fsqueue_message_incoming_path(*msgid, rootdir, sizeof(rootdir)); 99 if (mkdir(rootdir, 0700) == -1) { 100 if (errno == EEXIST) 101 goto again; 102 103 if (errno == ENOSPC) { 104 *msgid = 0; 105 return 0; 106 } 107 108 log_warn("warn: queue-fs: mkdir"); 109 *msgid = 0; 110 return 0; 111 } 112 113 return (1); 114 } 115 116 static int 117 queue_fs_message_commit(uint32_t msgid, const char *path) 118 { 119 char incomingdir[PATH_MAX]; 120 char queuedir[PATH_MAX]; 121 char msgdir[PATH_MAX]; 122 char msgpath[PATH_MAX]; 123 124 /* before-first, move the message content in the incoming directory */ 125 fsqueue_message_incoming_path(msgid, msgpath, sizeof(msgpath)); 126 if (strlcat(msgpath, PATH_MESSAGE, sizeof(msgpath)) 127 >= sizeof(msgpath)) 128 return (0); 129 if (rename(path, msgpath) == -1) 130 return (0); 131 132 fsqueue_message_incoming_path(msgid, incomingdir, sizeof(incomingdir)); 133 fsqueue_message_path(msgid, msgdir, sizeof(msgdir)); 134 if (strlcpy(queuedir, msgdir, sizeof(queuedir)) 135 >= sizeof(queuedir)) 136 return (0); 137 138 /* first attempt to rename */ 139 if (rename(incomingdir, msgdir) == 0) 140 return 1; 141 if (errno == ENOSPC) 142 return 0; 143 if (errno != ENOENT) { 144 log_warn("warn: queue-fs: rename"); 145 return 0; 146 } 147 148 /* create the bucket */ 149 *strrchr(queuedir, '/') = '\0'; 150 if (mkdir(queuedir, 0700) == -1) { 151 if (errno == ENOSPC) 152 return 0; 153 if (errno != EEXIST) { 154 log_warn("warn: queue-fs: mkdir"); 155 return 0; 156 } 157 } 158 159 /* rename */ 160 if (rename(incomingdir, msgdir) == -1) { 161 if (errno == ENOSPC) 162 return 0; 163 log_warn("warn: queue-fs: rename"); 164 return 0; 165 } 166 167 return 1; 168 } 169 170 static int 171 queue_fs_message_fd_r(uint32_t msgid) 172 { 173 int fd; 174 char path[PATH_MAX]; 175 176 fsqueue_message_path(msgid, path, sizeof(path)); 177 if (strlcat(path, PATH_MESSAGE, sizeof(path)) 178 >= sizeof(path)) 179 return -1; 180 181 if ((fd = open(path, O_RDONLY)) == -1) { 182 log_warn("warn: queue-fs: open"); 183 return -1; 184 } 185 186 return fd; 187 } 188 189 static int 190 queue_fs_message_delete(uint32_t msgid) 191 { 192 char path[PATH_MAX]; 193 struct stat sb; 194 195 fsqueue_message_incoming_path(msgid, path, sizeof(path)); 196 if (stat(path, &sb) == -1) 197 fsqueue_message_path(msgid, path, sizeof(path)); 198 199 if (rmtree(path, 0) == -1) 200 log_warn("warn: queue-fs: rmtree"); 201 202 tree_pop(&evpcount, msgid); 203 204 return 1; 205 } 206 207 static int 208 queue_fs_envelope_create(uint32_t msgid, const char *buf, size_t len, 209 uint64_t *evpid) 210 { 211 char path[PATH_MAX]; 212 int queued = 0, i, r = 0, *n; 213 struct stat sb; 214 215 if (msgid == 0) { 216 log_warnx("warn: queue-fs: msgid=0, evpid=%016"PRIx64, *evpid); 217 goto done; 218 } 219 220 fsqueue_message_incoming_path(msgid, path, sizeof(path)); 221 if (stat(path, &sb) == -1) 222 queued = 1; 223 224 for (i = 0; i < 20; i ++) { 225 *evpid = queue_generate_evpid(msgid); 226 if (queued) 227 fsqueue_envelope_path(*evpid, path, sizeof(path)); 228 else 229 fsqueue_envelope_incoming_path(*evpid, path, 230 sizeof(path)); 231 232 r = fsqueue_envelope_dump(path, buf, len, 0, 0); 233 if (r >= 0) 234 goto done; 235 } 236 r = 0; 237 log_warnx("warn: queue-fs: could not allocate evpid"); 238 239 done: 240 if (r) { 241 n = tree_pop(&evpcount, msgid); 242 if (n == NULL) 243 n = REF; 244 n += 1; 245 tree_xset(&evpcount, msgid, n); 246 } 247 return (r); 248 } 249 250 static int 251 queue_fs_envelope_load(uint64_t evpid, char *buf, size_t len) 252 { 253 char pathname[PATH_MAX]; 254 FILE *fp; 255 size_t r; 256 257 fsqueue_envelope_path(evpid, pathname, sizeof(pathname)); 258 259 fp = fopen(pathname, "r"); 260 if (fp == NULL) { 261 if (errno != ENOENT && errno != ENFILE) 262 log_warn("warn: queue-fs: fopen"); 263 return 0; 264 } 265 266 r = fread(buf, 1, len, fp); 267 if (r) { 268 if (r == len) { 269 log_warn("warn: queue-fs: too large"); 270 r = 0; 271 } 272 else 273 buf[r] = '\0'; 274 } 275 fclose(fp); 276 277 return (r); 278 } 279 280 static int 281 queue_fs_envelope_update(uint64_t evpid, const char *buf, size_t len) 282 { 283 char dest[PATH_MAX]; 284 285 fsqueue_envelope_path(evpid, dest, sizeof(dest)); 286 287 return (fsqueue_envelope_dump(dest, buf, len, 1, 1)); 288 } 289 290 static int 291 queue_fs_envelope_delete(uint64_t evpid) 292 { 293 char pathname[PATH_MAX]; 294 uint32_t msgid; 295 int *n; 296 297 fsqueue_envelope_path(evpid, pathname, sizeof(pathname)); 298 if (unlink(pathname) == -1) 299 if (errno != ENOENT) 300 return 0; 301 302 msgid = evpid_to_msgid(evpid); 303 n = tree_pop(&evpcount, msgid); 304 n -= 1; 305 306 if (n - REF == 0) 307 queue_fs_message_delete(msgid); 308 else 309 tree_xset(&evpcount, msgid, n); 310 311 return (1); 312 } 313 314 static int 315 queue_fs_message_walk(uint64_t *evpid, char *buf, size_t len, 316 uint32_t msgid, int *done, void **data) 317 { 318 struct dirent *dp; 319 DIR *dir = *data; 320 char path[PATH_MAX]; 321 char msgid_str[9]; 322 char *tmp; 323 int r, *n; 324 325 if (*done) 326 return (-1); 327 328 if (!bsnprintf(path, sizeof path, "%s/%02x/%08x", 329 PATH_QUEUE, (msgid & 0xff000000) >> 24, msgid)) 330 fatalx("queue_fs_message_walk: path does not fit buffer"); 331 332 if (dir == NULL) { 333 if ((dir = opendir(path)) == NULL) { 334 log_warn("warn: queue_fs: opendir: %s", path); 335 *done = 1; 336 return (-1); 337 } 338 339 *data = dir; 340 } 341 342 (void)snprintf(msgid_str, sizeof msgid_str, "%08" PRIx32, msgid); 343 while ((dp = readdir(dir)) != NULL) { 344 if (dp->d_type != DT_REG) 345 continue; 346 347 /* ignore files other than envelopes */ 348 if (strlen(dp->d_name) != 16 || 349 strncmp(dp->d_name, msgid_str, 8)) 350 continue; 351 352 tmp = NULL; 353 *evpid = strtoull(dp->d_name, &tmp, 16); 354 if (tmp && *tmp != '\0') { 355 log_debug("debug: fsqueue: bogus file %s", dp->d_name); 356 continue; 357 } 358 359 memset(buf, 0, len); 360 r = queue_fs_envelope_load(*evpid, buf, len); 361 if (r) { 362 n = tree_pop(&evpcount, msgid); 363 if (n == NULL) 364 n = REF; 365 366 n += 1; 367 tree_xset(&evpcount, msgid, n); 368 } 369 370 return (r); 371 } 372 373 (void)closedir(dir); 374 *done = 1; 375 return (-1); 376 } 377 378 static int 379 queue_fs_envelope_walk(uint64_t *evpid, char *buf, size_t len) 380 { 381 static int done = 0; 382 static void *hdl = NULL; 383 int r, *n; 384 uint32_t msgid; 385 386 if (done) 387 return (-1); 388 389 if (hdl == NULL) 390 hdl = fsqueue_qwalk_new(); 391 392 if (fsqueue_qwalk(hdl, evpid)) { 393 memset(buf, 0, len); 394 r = queue_fs_envelope_load(*evpid, buf, len); 395 if (r) { 396 msgid = evpid_to_msgid(*evpid); 397 n = tree_pop(&evpcount, msgid); 398 if (n == NULL) 399 n = REF; 400 n += 1; 401 tree_xset(&evpcount, msgid, n); 402 } 403 return (r); 404 } 405 406 fsqueue_qwalk_close(hdl); 407 done = 1; 408 return (-1); 409 } 410 411 static int 412 fsqueue_check_space(void) 413 { 414 struct statfs buf; 415 uint64_t used; 416 uint64_t total; 417 418 if (statfs(PATH_QUEUE, &buf) < 0) { 419 log_warn("warn: queue-fs: statfs"); 420 return 0; 421 } 422 423 /* 424 * f_bfree and f_ffree is not set on all filesystems. 425 * They could be signed or unsigned integers. 426 * Some systems will set them to 0, others will set them to -1. 427 */ 428 if (buf.f_bfree == 0 || buf.f_ffree == 0 || 429 (int64_t)buf.f_bfree == -1 || (int64_t)buf.f_ffree == -1) 430 return 1; 431 432 used = buf.f_blocks - buf.f_bfree; 433 total = buf.f_bavail + used; 434 if (total != 0) 435 used = (float)used / (float)total * 100; 436 else 437 used = 100; 438 if (100 - used < MINSPACE) { 439 log_warnx("warn: not enough disk space: %llu%% left", 440 (unsigned long long) 100 - used); 441 log_warnx("warn: temporarily rejecting messages"); 442 return 0; 443 } 444 445 used = buf.f_files - buf.f_ffree; 446 total = buf.f_favail + used; 447 if (total != 0) 448 used = (float)used / (float)total * 100; 449 else 450 used = 100; 451 if (100 - used < MININODES) { 452 log_warnx("warn: not enough inodes: %llu%% left", 453 (unsigned long long) 100 - used); 454 log_warnx("warn: temporarily rejecting messages"); 455 return 0; 456 } 457 458 return 1; 459 } 460 461 static void 462 fsqueue_envelope_path(uint64_t evpid, char *buf, size_t len) 463 { 464 if (!bsnprintf(buf, len, "%s/%02x/%08x/%016" PRIx64, 465 PATH_QUEUE, 466 (evpid_to_msgid(evpid) & 0xff000000) >> 24, 467 evpid_to_msgid(evpid), 468 evpid)) 469 fatalx("fsqueue_envelope_path: path does not fit buffer"); 470 } 471 472 static void 473 fsqueue_envelope_incoming_path(uint64_t evpid, char *buf, size_t len) 474 { 475 if (!bsnprintf(buf, len, "%s/%08x/%016" PRIx64, 476 PATH_INCOMING, 477 evpid_to_msgid(evpid), 478 evpid)) 479 fatalx("fsqueue_envelope_incoming_path: path does not fit buffer"); 480 } 481 482 static int 483 fsqueue_envelope_dump(char *dest, const char *evpbuf, size_t evplen, 484 int do_atomic, int do_sync) 485 { 486 const char *path = do_atomic ? PATH_EVPTMP : dest; 487 FILE *fp = NULL; 488 int fd; 489 size_t w; 490 491 if ((fd = open(path, O_RDWR | O_CREAT | O_EXCL, 0600)) == -1) { 492 log_warn("warn: queue-fs: open"); 493 goto tempfail; 494 } 495 496 if ((fp = fdopen(fd, "w")) == NULL) { 497 log_warn("warn: queue-fs: fdopen"); 498 goto tempfail; 499 } 500 501 w = fwrite(evpbuf, 1, evplen, fp); 502 if (w < evplen) { 503 log_warn("warn: queue-fs: short write"); 504 goto tempfail; 505 } 506 if (fflush(fp)) { 507 log_warn("warn: queue-fs: fflush"); 508 goto tempfail; 509 } 510 if (do_sync && fsync(fileno(fp))) { 511 log_warn("warn: queue-fs: fsync"); 512 goto tempfail; 513 } 514 if (fclose(fp) != 0) { 515 log_warn("warn: queue-fs: fclose"); 516 fp = NULL; 517 goto tempfail; 518 } 519 fp = NULL; 520 fd = -1; 521 522 if (do_atomic && rename(path, dest) == -1) { 523 log_warn("warn: queue-fs: rename"); 524 goto tempfail; 525 } 526 return (1); 527 528 tempfail: 529 if (fp) 530 fclose(fp); 531 else if (fd != -1) 532 close(fd); 533 if (unlink(path) == -1) 534 log_warn("warn: queue-fs: unlink"); 535 return (0); 536 } 537 538 static void 539 fsqueue_message_path(uint32_t msgid, char *buf, size_t len) 540 { 541 if (!bsnprintf(buf, len, "%s/%02x/%08x", 542 PATH_QUEUE, 543 (msgid & 0xff000000) >> 24, 544 msgid)) 545 fatalx("fsqueue_message_path: path does not fit buffer"); 546 } 547 548 static void 549 fsqueue_message_incoming_path(uint32_t msgid, char *buf, size_t len) 550 { 551 if (!bsnprintf(buf, len, "%s/%08x", 552 PATH_INCOMING, 553 msgid)) 554 fatalx("fsqueue_message_incoming_path: path does not fit buffer"); 555 } 556 557 static void * 558 fsqueue_qwalk_new(void) 559 { 560 char path[PATH_MAX]; 561 char * const path_argv[] = { path, NULL }; 562 struct qwalk *q; 563 564 q = xcalloc(1, sizeof(*q)); 565 (void)strlcpy(path, PATH_QUEUE, sizeof(path)); 566 q->fts = fts_open(path_argv, 567 FTS_PHYSICAL | FTS_NOCHDIR, NULL); 568 569 if (q->fts == NULL) 570 err(1, "fsqueue_qwalk_new: fts_open: %s", path); 571 572 return (q); 573 } 574 575 static void 576 fsqueue_qwalk_close(void *hdl) 577 { 578 struct qwalk *q = hdl; 579 580 fts_close(q->fts); 581 582 free(q); 583 } 584 585 static int 586 fsqueue_qwalk(void *hdl, uint64_t *evpid) 587 { 588 struct qwalk *q = hdl; 589 FTSENT *e; 590 char *tmp; 591 592 while ((e = fts_read(q->fts)) != NULL) { 593 switch (e->fts_info) { 594 case FTS_D: 595 q->depth += 1; 596 if (q->depth == 2 && e->fts_namelen != 2) { 597 log_debug("debug: fsqueue: bogus directory %s", 598 e->fts_path); 599 fts_set(q->fts, e, FTS_SKIP); 600 break; 601 } 602 if (q->depth == 3 && e->fts_namelen != 8) { 603 log_debug("debug: fsqueue: bogus directory %s", 604 e->fts_path); 605 fts_set(q->fts, e, FTS_SKIP); 606 break; 607 } 608 break; 609 610 case FTS_DP: 611 case FTS_DNR: 612 q->depth -= 1; 613 break; 614 615 case FTS_F: 616 if (q->depth != 3) 617 break; 618 if (e->fts_namelen != 16) 619 break; 620 if (timespeccmp(&e->fts_statp->st_mtim, &startup, >)) 621 break; 622 tmp = NULL; 623 *evpid = strtoull(e->fts_name, &tmp, 16); 624 if (tmp && *tmp != '\0') { 625 log_debug("debug: fsqueue: bogus file %s", 626 e->fts_path); 627 break; 628 } 629 return (1); 630 default: 631 break; 632 } 633 } 634 635 return (0); 636 } 637 638 static int 639 queue_fs_init(struct passwd *pw, int server, const char *conf) 640 { 641 unsigned int n; 642 char *paths[] = { PATH_QUEUE, PATH_INCOMING }; 643 char path[PATH_MAX]; 644 int ret; 645 646 /* remove incoming/ if it exists */ 647 if (server) 648 mvpurge(PATH_SPOOL PATH_INCOMING, PATH_SPOOL PATH_PURGE); 649 650 fsqueue_envelope_path(0, path, sizeof(path)); 651 652 ret = 1; 653 for (n = 0; n < nitems(paths); n++) { 654 (void)strlcpy(path, PATH_SPOOL, sizeof(path)); 655 if (strlcat(path, paths[n], sizeof(path)) >= sizeof(path)) 656 errx(1, "path too long %s%s", PATH_SPOOL, paths[n]); 657 if (ckdir(path, 0700, pw->pw_uid, 0, server) == 0) 658 ret = 0; 659 } 660 661 if (clock_gettime(CLOCK_REALTIME, &startup)) 662 err(1, "clock_gettime"); 663 664 tree_init(&evpcount); 665 666 queue_api_on_message_create(queue_fs_message_create); 667 queue_api_on_message_commit(queue_fs_message_commit); 668 queue_api_on_message_delete(queue_fs_message_delete); 669 queue_api_on_message_fd_r(queue_fs_message_fd_r); 670 queue_api_on_envelope_create(queue_fs_envelope_create); 671 queue_api_on_envelope_delete(queue_fs_envelope_delete); 672 queue_api_on_envelope_update(queue_fs_envelope_update); 673 queue_api_on_envelope_load(queue_fs_envelope_load); 674 queue_api_on_envelope_walk(queue_fs_envelope_walk); 675 queue_api_on_message_walk(queue_fs_message_walk); 676 677 return (ret); 678 } 679 680 struct queue_backend queue_backend_fs = { 681 queue_fs_init, 682 }; 683