1 /* $OpenBSD: queue_fs.c,v 1.9 2015/10/12 22:29:49 gilles Exp $ */ 2 3 /* 4 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/mount.h> 21 #include <sys/queue.h> 22 #include <sys/tree.h> 23 #include <sys/socket.h> 24 #include <sys/stat.h> 25 26 #include <ctype.h> 27 #include <err.h> 28 #include <errno.h> 29 #include <event.h> 30 #include <fcntl.h> 31 #include <fts.h> 32 #include <imsg.h> 33 #include <inttypes.h> 34 #include <libgen.h> 35 #include <pwd.h> 36 #include <stdio.h> 37 #include <stdlib.h> 38 #include <string.h> 39 #include <time.h> 40 #include <unistd.h> 41 42 #include "smtpd.h" 43 #include "log.h" 44 45 #define PATH_QUEUE "/queue" 46 #define PATH_CORRUPT "/corrupt" 47 #define PATH_INCOMING "/incoming" 48 #define PATH_EVPTMP PATH_INCOMING "/envelope.tmp" 49 #define PATH_MESSAGE "/message" 50 51 /* percentage of remaining space / inodes required to accept new messages */ 52 #define MINSPACE 5 53 #define MININODES 5 54 55 struct qwalk { 56 FTS *fts; 57 int depth; 58 }; 59 60 static int fsqueue_check_space(void); 61 static void fsqueue_envelope_path(uint64_t, char *, size_t); 62 static void fsqueue_envelope_incoming_path(uint64_t, char *, size_t); 63 static int fsqueue_envelope_dump(char *, const char *, size_t, int, int); 64 static void fsqueue_message_path(uint32_t, char *, size_t); 65 static void fsqueue_message_corrupt_path(uint32_t, char *, size_t); 66 static void fsqueue_message_incoming_path(uint32_t, char *, size_t); 67 static void *fsqueue_qwalk_new(void); 68 static int fsqueue_qwalk(void *, uint64_t *); 69 static void fsqueue_qwalk_close(void *); 70 71 struct tree evpcount; 72 static struct timespec startup; 73 74 #define REF (int*)0xf00 75 76 static int 77 queue_fs_message_create(uint32_t *msgid) 78 { 79 char rootdir[PATH_MAX]; 80 struct stat sb; 81 82 if (! fsqueue_check_space()) 83 return 0; 84 85 again: 86 *msgid = queue_generate_msgid(); 87 88 /* prevent possible collision later when moving to Q_QUEUE */ 89 fsqueue_message_path(*msgid, rootdir, sizeof(rootdir)); 90 if (stat(rootdir, &sb) != -1) 91 goto again; 92 93 /* we hit an unexpected error, temporarily fail */ 94 if (errno != ENOENT) { 95 *msgid = 0; 96 return 0; 97 } 98 99 fsqueue_message_incoming_path(*msgid, rootdir, sizeof(rootdir)); 100 if (mkdir(rootdir, 0700) == -1) { 101 if (errno == EEXIST) 102 goto again; 103 104 if (errno == ENOSPC) { 105 *msgid = 0; 106 return 0; 107 } 108 109 log_warn("warn: queue-fs: mkdir"); 110 *msgid = 0; 111 return 0; 112 } 113 114 return (1); 115 } 116 117 static int 118 queue_fs_message_commit(uint32_t msgid, const char *path) 119 { 120 char incomingdir[PATH_MAX]; 121 char queuedir[PATH_MAX]; 122 char msgdir[PATH_MAX]; 123 char msgpath[PATH_MAX]; 124 125 /* before-first, move the message content in the incoming directory */ 126 fsqueue_message_incoming_path(msgid, msgpath, sizeof(msgpath)); 127 if (strlcat(msgpath, PATH_MESSAGE, sizeof(msgpath)) 128 >= sizeof(msgpath)) 129 return (0); 130 if (rename(path, msgpath) == -1) 131 return (0); 132 133 fsqueue_message_incoming_path(msgid, incomingdir, sizeof(incomingdir)); 134 fsqueue_message_path(msgid, msgdir, sizeof(msgdir)); 135 if (strlcpy(queuedir, msgdir, sizeof(queuedir)) 136 >= sizeof(queuedir)) 137 return (0); 138 139 /* first attempt to rename */ 140 if (rename(incomingdir, msgdir) == 0) 141 return 1; 142 if (errno == ENOSPC) 143 return 0; 144 if (errno != ENOENT) { 145 log_warn("warn: queue-fs: rename"); 146 return 0; 147 } 148 149 /* create the bucket */ 150 *strrchr(queuedir, '/') = '\0'; 151 if (mkdir(queuedir, 0700) == -1) { 152 if (errno == ENOSPC) 153 return 0; 154 if (errno != EEXIST) { 155 log_warn("warn: queue-fs: mkdir"); 156 return 0; 157 } 158 } 159 160 /* rename */ 161 if (rename(incomingdir, msgdir) == -1) { 162 if (errno == ENOSPC) 163 return 0; 164 log_warn("warn: queue-fs: rename"); 165 return 0; 166 } 167 168 return 1; 169 } 170 171 static int 172 queue_fs_message_fd_r(uint32_t msgid) 173 { 174 int fd; 175 char path[PATH_MAX]; 176 177 fsqueue_message_path(msgid, path, sizeof(path)); 178 if (strlcat(path, PATH_MESSAGE, sizeof(path)) 179 >= sizeof(path)) 180 return -1; 181 182 if ((fd = open(path, O_RDONLY)) == -1) { 183 log_warn("warn: queue-fs: open"); 184 return -1; 185 } 186 187 return fd; 188 } 189 190 static int 191 queue_fs_message_delete(uint32_t msgid) 192 { 193 char path[PATH_MAX]; 194 struct stat sb; 195 196 fsqueue_message_incoming_path(msgid, path, sizeof(path)); 197 if (stat(path, &sb) == -1) 198 fsqueue_message_path(msgid, path, sizeof(path)); 199 200 if (rmtree(path, 0) == -1) 201 log_warn("warn: queue-fs: rmtree"); 202 203 tree_pop(&evpcount, msgid); 204 205 return 1; 206 } 207 208 static int 209 queue_fs_message_corrupt(uint32_t msgid) 210 { 211 struct stat sb; 212 char rootdir[PATH_MAX]; 213 char corruptdir[PATH_MAX]; 214 char buf[64]; 215 int retry = 0; 216 217 fsqueue_message_path(msgid, rootdir, sizeof(rootdir)); 218 fsqueue_message_corrupt_path(msgid, corruptdir, 219 sizeof(corruptdir)); 220 221 again: 222 if (stat(corruptdir, &sb) != -1 || errno != ENOENT) { 223 fsqueue_message_corrupt_path(msgid, corruptdir, 224 sizeof(corruptdir)); 225 (void)snprintf(buf, sizeof (buf), ".%d", retry++); 226 (void)strlcat(corruptdir, buf, sizeof(corruptdir)); 227 goto again; 228 } 229 230 if (rename(rootdir, corruptdir) == -1) { 231 log_warn("warn: queue-fs: rename"); 232 return 0; 233 } 234 235 tree_pop(&evpcount, msgid); 236 237 return 1; 238 } 239 240 static int 241 queue_fs_envelope_create(uint32_t msgid, const char *buf, size_t len, 242 uint64_t *evpid) 243 { 244 char path[PATH_MAX]; 245 int queued = 0, i, r = 0, *n; 246 struct stat sb; 247 248 if (msgid == 0) { 249 log_warnx("warn: queue-fs: msgid=0, evpid=%016"PRIx64, *evpid); 250 goto done; 251 } 252 253 fsqueue_message_incoming_path(msgid, path, sizeof(path)); 254 if (stat(path, &sb) == -1) 255 queued = 1; 256 257 for (i = 0; i < 20; i ++) { 258 *evpid = queue_generate_evpid(msgid); 259 if (queued) 260 fsqueue_envelope_path(*evpid, path, sizeof(path)); 261 else 262 fsqueue_envelope_incoming_path(*evpid, path, 263 sizeof(path)); 264 265 r = fsqueue_envelope_dump(path, buf, len, 0, 0); 266 if (r >= 0) 267 goto done; 268 } 269 r = 0; 270 log_warnx("warn: queue-fs: could not allocate evpid"); 271 272 done: 273 if (r) { 274 n = tree_pop(&evpcount, msgid); 275 if (n == NULL) 276 n = REF; 277 n += 1; 278 tree_xset(&evpcount, msgid, n); 279 } 280 return (r); 281 } 282 283 static int 284 queue_fs_envelope_load(uint64_t evpid, char *buf, size_t len) 285 { 286 char pathname[PATH_MAX]; 287 FILE *fp; 288 size_t r; 289 290 fsqueue_envelope_path(evpid, pathname, sizeof(pathname)); 291 292 fp = fopen(pathname, "r"); 293 if (fp == NULL) { 294 if (errno != ENOENT && errno != ENFILE) 295 log_warn("warn: queue-fs: fopen"); 296 return 0; 297 } 298 299 r = fread(buf, 1, len, fp); 300 if (r) { 301 if (r == len) { 302 log_warn("warn: queue-fs: too large"); 303 r = 0; 304 } 305 else 306 buf[r] = '\0'; 307 } 308 fclose(fp); 309 310 return (r); 311 } 312 313 static int 314 queue_fs_envelope_update(uint64_t evpid, const char *buf, size_t len) 315 { 316 char dest[PATH_MAX]; 317 318 fsqueue_envelope_path(evpid, dest, sizeof(dest)); 319 320 return (fsqueue_envelope_dump(dest, buf, len, 1, 1)); 321 } 322 323 static int 324 queue_fs_envelope_delete(uint64_t evpid) 325 { 326 char pathname[PATH_MAX]; 327 uint32_t msgid; 328 int *n; 329 330 fsqueue_envelope_path(evpid, pathname, sizeof(pathname)); 331 if (unlink(pathname) == -1) 332 if (errno != ENOENT) 333 return 0; 334 335 msgid = evpid_to_msgid(evpid); 336 n = tree_pop(&evpcount, msgid); 337 n -= 1; 338 339 if (n - REF == 0) 340 queue_fs_message_delete(msgid); 341 else 342 tree_xset(&evpcount, msgid, n); 343 344 return (1); 345 } 346 347 static int 348 queue_fs_envelope_walk(uint64_t *evpid, char *buf, size_t len) 349 { 350 static int done = 0; 351 static void *hdl = NULL; 352 int r, *n; 353 uint32_t msgid; 354 355 if (done) 356 return (-1); 357 358 if (hdl == NULL) 359 hdl = fsqueue_qwalk_new(); 360 361 if (fsqueue_qwalk(hdl, evpid)) { 362 memset(buf, 0, len); 363 r = queue_fs_envelope_load(*evpid, buf, len); 364 if (r) { 365 msgid = evpid_to_msgid(*evpid); 366 n = tree_pop(&evpcount, msgid); 367 if (n == NULL) 368 n = REF; 369 n += 1; 370 tree_xset(&evpcount, msgid, n); 371 } 372 return (r); 373 } 374 375 fsqueue_qwalk_close(hdl); 376 done = 1; 377 return (-1); 378 } 379 380 static int 381 fsqueue_check_space(void) 382 { 383 struct statfs buf; 384 uint64_t used; 385 uint64_t total; 386 387 if (statfs(PATH_QUEUE, &buf) < 0) { 388 log_warn("warn: queue-fs: statfs"); 389 return 0; 390 } 391 392 /* 393 * f_bfree and f_ffree is not set on all filesystems. 394 * They could be signed or unsigned integers. 395 * Some systems will set them to 0, others will set them to -1. 396 */ 397 if (buf.f_bfree == 0 || buf.f_ffree == 0 || 398 (int64_t)buf.f_bfree == -1 || (int64_t)buf.f_ffree == -1) 399 return 1; 400 401 used = buf.f_blocks - buf.f_bfree; 402 total = buf.f_bavail + used; 403 if (total != 0) 404 used = (float)used / (float)total * 100; 405 else 406 used = 100; 407 if (100 - used < MINSPACE) { 408 log_warnx("warn: not enough disk space: %llu%% left", 409 (unsigned long long) 100 - used); 410 log_warnx("warn: temporarily rejecting messages"); 411 return 0; 412 } 413 414 used = buf.f_files - buf.f_ffree; 415 total = buf.f_favail + used; 416 if (total != 0) 417 used = (float)used / (float)total * 100; 418 else 419 used = 100; 420 if (100 - used < MININODES) { 421 log_warnx("warn: not enough inodes: %llu%% left", 422 (unsigned long long) 100 - used); 423 log_warnx("warn: temporarily rejecting messages"); 424 return 0; 425 } 426 427 return 1; 428 } 429 430 static void 431 fsqueue_envelope_path(uint64_t evpid, char *buf, size_t len) 432 { 433 if (! bsnprintf(buf, len, "%s/%02x/%08x/%016" PRIx64, 434 PATH_QUEUE, 435 (evpid_to_msgid(evpid) & 0xff000000) >> 24, 436 evpid_to_msgid(evpid), 437 evpid)) 438 fatalx("fsqueue_envelope_path: path does not fit buffer"); 439 } 440 441 static void 442 fsqueue_envelope_incoming_path(uint64_t evpid, char *buf, size_t len) 443 { 444 if (! bsnprintf(buf, len, "%s/%08x/%016" PRIx64, 445 PATH_INCOMING, 446 evpid_to_msgid(evpid), 447 evpid)) 448 fatalx("fsqueue_envelope_incoming_path: path does not fit buffer"); 449 } 450 451 static int 452 fsqueue_envelope_dump(char *dest, const char *evpbuf, size_t evplen, 453 int do_atomic, int do_sync) 454 { 455 const char *path = do_atomic ? PATH_EVPTMP : dest; 456 FILE *fp = NULL; 457 int fd; 458 size_t w; 459 460 if ((fd = open(path, O_RDWR | O_CREAT | O_EXCL, 0600)) == -1) { 461 log_warn("warn: queue-fs: open"); 462 goto tempfail; 463 } 464 465 if ((fp = fdopen(fd, "w")) == NULL) { 466 log_warn("warn: queue-fs: fdopen"); 467 goto tempfail; 468 } 469 470 w = fwrite(evpbuf, 1, evplen, fp); 471 if (w < evplen) { 472 log_warn("warn: queue-fs: short write"); 473 goto tempfail; 474 } 475 if (fflush(fp)) { 476 log_warn("warn: queue-fs: fflush"); 477 goto tempfail; 478 } 479 if (do_sync && fsync(fileno(fp))) { 480 log_warn("warn: queue-fs: fsync"); 481 goto tempfail; 482 } 483 if (fclose(fp) != 0) { 484 log_warn("warn: queue-fs: fclose"); 485 fp = NULL; 486 goto tempfail; 487 } 488 fp = NULL; 489 fd = -1; 490 491 if (do_atomic && rename(path, dest) == -1) { 492 log_warn("warn: queue-fs: rename"); 493 goto tempfail; 494 } 495 return (1); 496 497 tempfail: 498 if (fp) 499 fclose(fp); 500 else if (fd != -1) 501 close(fd); 502 if (unlink(path) == -1) 503 log_warn("warn: queue-fs: unlink"); 504 return (0); 505 } 506 507 static void 508 fsqueue_message_path(uint32_t msgid, char *buf, size_t len) 509 { 510 if (! bsnprintf(buf, len, "%s/%02x/%08x", 511 PATH_QUEUE, 512 (msgid & 0xff000000) >> 24, 513 msgid)) 514 fatalx("fsqueue_message_path: path does not fit buffer"); 515 } 516 517 static void 518 fsqueue_message_corrupt_path(uint32_t msgid, char *buf, size_t len) 519 { 520 if (! bsnprintf(buf, len, "%s/%08x", 521 PATH_CORRUPT, 522 msgid)) 523 fatalx("fsqueue_message_corrupt_path: path does not fit buffer"); 524 } 525 526 static void 527 fsqueue_message_incoming_path(uint32_t msgid, char *buf, size_t len) 528 { 529 if (! bsnprintf(buf, len, "%s/%08x", 530 PATH_INCOMING, 531 msgid)) 532 fatalx("fsqueue_message_incoming_path: path does not fit buffer"); 533 } 534 535 static void * 536 fsqueue_qwalk_new(void) 537 { 538 char path[PATH_MAX]; 539 char * const path_argv[] = { path, NULL }; 540 struct qwalk *q; 541 542 q = xcalloc(1, sizeof(*q), "fsqueue_qwalk_new"); 543 (void)strlcpy(path, PATH_QUEUE, sizeof(path)); 544 q->fts = fts_open(path_argv, 545 FTS_PHYSICAL | FTS_NOCHDIR, NULL); 546 547 if (q->fts == NULL) 548 err(1, "fsqueue_qwalk_new: fts_open: %s", path); 549 550 return (q); 551 } 552 553 static void 554 fsqueue_qwalk_close(void *hdl) 555 { 556 struct qwalk *q = hdl; 557 558 fts_close(q->fts); 559 560 free(q); 561 } 562 563 static int 564 fsqueue_qwalk(void *hdl, uint64_t *evpid) 565 { 566 struct qwalk *q = hdl; 567 FTSENT *e; 568 char *tmp; 569 570 while ((e = fts_read(q->fts)) != NULL) { 571 switch (e->fts_info) { 572 case FTS_D: 573 q->depth += 1; 574 if (q->depth == 2 && e->fts_namelen != 2) { 575 log_debug("debug: fsqueue: bogus directory %s", 576 e->fts_path); 577 fts_set(q->fts, e, FTS_SKIP); 578 break; 579 } 580 if (q->depth == 3 && e->fts_namelen != 8) { 581 log_debug("debug: fsqueue: bogus directory %s", 582 e->fts_path); 583 fts_set(q->fts, e, FTS_SKIP); 584 break; 585 } 586 break; 587 588 case FTS_DP: 589 case FTS_DNR: 590 q->depth -= 1; 591 break; 592 593 case FTS_F: 594 if (q->depth != 3) 595 break; 596 if (e->fts_namelen != 16) 597 break; 598 if (timespeccmp(&e->fts_statp->st_mtim, &startup, >)) 599 break; 600 tmp = NULL; 601 *evpid = strtoull(e->fts_name, &tmp, 16); 602 if (tmp && *tmp != '\0') { 603 log_debug("debug: fsqueue: bogus file %s", 604 e->fts_path); 605 break; 606 } 607 return (1); 608 default: 609 break; 610 } 611 } 612 613 return (0); 614 } 615 616 static int 617 queue_fs_init(struct passwd *pw, int server, const char *conf) 618 { 619 unsigned int n; 620 char *paths[] = { PATH_QUEUE, PATH_CORRUPT, PATH_INCOMING }; 621 char path[PATH_MAX]; 622 int ret; 623 struct timeval tv; 624 625 /* remove incoming/ if it exists */ 626 if (server) 627 mvpurge(PATH_SPOOL PATH_INCOMING, PATH_SPOOL PATH_PURGE); 628 629 fsqueue_envelope_path(0, path, sizeof(path)); 630 631 ret = 1; 632 for (n = 0; n < nitems(paths); n++) { 633 (void)strlcpy(path, PATH_SPOOL, sizeof(path)); 634 if (strlcat(path, paths[n], sizeof(path)) >= sizeof(path)) 635 errx(1, "path too long %s%s", PATH_SPOOL, paths[n]); 636 if (ckdir(path, 0700, pw->pw_uid, 0, server) == 0) 637 ret = 0; 638 } 639 640 if (gettimeofday(&tv, NULL) == -1) 641 err(1, "gettimeofday"); 642 TIMEVAL_TO_TIMESPEC(&tv, &startup); 643 644 tree_init(&evpcount); 645 646 queue_api_on_message_create(queue_fs_message_create); 647 queue_api_on_message_commit(queue_fs_message_commit); 648 queue_api_on_message_delete(queue_fs_message_delete); 649 queue_api_on_message_fd_r(queue_fs_message_fd_r); 650 queue_api_on_message_corrupt(queue_fs_message_corrupt); 651 queue_api_on_envelope_create(queue_fs_envelope_create); 652 queue_api_on_envelope_delete(queue_fs_envelope_delete); 653 queue_api_on_envelope_update(queue_fs_envelope_update); 654 queue_api_on_envelope_load(queue_fs_envelope_load); 655 queue_api_on_envelope_walk(queue_fs_envelope_walk); 656 657 return (ret); 658 } 659 660 struct queue_backend queue_backend_fs = { 661 queue_fs_init, 662 }; 663