1 /* $OpenBSD: queue_backend.c,v 1.55 2015/01/20 17:37:54 deraadt Exp $ */ 2 3 /* 4 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/queue.h> 21 #include <sys/tree.h> 22 #include <sys/socket.h> 23 #include <sys/stat.h> 24 25 #include <ctype.h> 26 #include <err.h> 27 #include <errno.h> 28 #include <event.h> 29 #include <fcntl.h> 30 #include <imsg.h> 31 #include <limits.h> 32 #include <inttypes.h> 33 #include <libgen.h> 34 #include <pwd.h> 35 #include <stdio.h> 36 #include <stdlib.h> 37 #include <string.h> 38 #include <time.h> 39 #include <unistd.h> 40 41 #include "smtpd.h" 42 #include "log.h" 43 44 static const char* envelope_validate(struct envelope *); 45 46 extern struct queue_backend queue_backend_fs; 47 extern struct queue_backend queue_backend_null; 48 extern struct queue_backend queue_backend_proc; 49 extern struct queue_backend queue_backend_ram; 50 51 static void queue_envelope_cache_add(struct envelope *); 52 static void queue_envelope_cache_update(struct envelope *); 53 static void queue_envelope_cache_del(uint64_t evpid); 54 55 TAILQ_HEAD(evplst, envelope); 56 57 static struct tree evpcache_tree; 58 static struct evplst evpcache_list; 59 static struct queue_backend *backend; 60 61 static int (*handler_close)(void); 62 static int (*handler_message_create)(uint32_t *); 63 static int (*handler_message_commit)(uint32_t, const char*); 64 static int (*handler_message_delete)(uint32_t); 65 static int (*handler_message_fd_r)(uint32_t); 66 static int (*handler_message_corrupt)(uint32_t); 67 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *); 68 static int (*handler_envelope_delete)(uint64_t); 69 static int (*handler_envelope_update)(uint64_t, const char *, size_t); 70 static int (*handler_envelope_load)(uint64_t, char *, size_t); 71 static int (*handler_envelope_walk)(uint64_t *, char *, size_t); 72 73 #ifdef QUEUE_PROFILING 74 75 static struct { 76 struct timespec t0; 77 const char *name; 78 } profile; 79 80 static inline void profile_enter(const char *name) 81 { 82 if ((profiling & PROFILE_QUEUE) == 0) 83 return; 84 85 profile.name = name; 86 clock_gettime(CLOCK_MONOTONIC, &profile.t0); 87 } 88 89 static inline void profile_leave(void) 90 { 91 struct timespec t1, dt; 92 93 if ((profiling & PROFILE_QUEUE) == 0) 94 return; 95 96 clock_gettime(CLOCK_MONOTONIC, &t1); 97 timespecsub(&t1, &profile.t0, &dt); 98 log_debug("profile-queue: %s %lld.%09ld", profile.name, 99 (long long)dt.tv_sec, dt.tv_nsec); 100 } 101 #else 102 #define profile_enter(x) do {} while (0) 103 #define profile_leave() do {} while (0) 104 #endif 105 106 static int 107 queue_message_path(uint32_t msgid, char *buf, size_t len) 108 { 109 return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid); 110 } 111 112 int 113 queue_init(const char *name, int server) 114 { 115 struct passwd *pwq; 116 int r; 117 118 pwq = getpwnam(SMTPD_QUEUE_USER); 119 if (pwq == NULL) 120 errx(1, "unknown user %s", SMTPD_QUEUE_USER); 121 122 tree_init(&evpcache_tree); 123 TAILQ_INIT(&evpcache_list); 124 125 if (!strcmp(name, "fs")) 126 backend = &queue_backend_fs; 127 else if (!strcmp(name, "null")) 128 backend = &queue_backend_null; 129 else if (!strcmp(name, "ram")) 130 backend = &queue_backend_ram; 131 else 132 backend = &queue_backend_proc; 133 134 if (server) { 135 if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0) 136 errx(1, "error in spool directory setup"); 137 if (ckdir(PATH_SPOOL PATH_OFFLINE, 01777, 0, 0, 1) == 0) 138 errx(1, "error in offline directory setup"); 139 if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0) 140 errx(1, "error in purge directory setup"); 141 142 mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE); 143 144 if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0) 145 errx(1, "error in purge directory setup"); 146 } 147 148 r = backend->init(pwq, server, name); 149 150 log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r); 151 152 return (r); 153 } 154 155 int 156 queue_close(void) 157 { 158 if (handler_close) 159 return (handler_close()); 160 161 return (1); 162 } 163 164 int 165 queue_message_create(uint32_t *msgid) 166 { 167 int r; 168 169 profile_enter("queue_message_create"); 170 r = handler_message_create(msgid); 171 profile_leave(); 172 173 log_trace(TRACE_QUEUE, 174 "queue-backend: queue_message_create() -> %d (%08"PRIx32")", 175 r, *msgid); 176 177 return (r); 178 } 179 180 int 181 queue_message_delete(uint32_t msgid) 182 { 183 char msgpath[PATH_MAX]; 184 int r; 185 186 profile_enter("queue_message_delete"); 187 r = handler_message_delete(msgid); 188 profile_leave(); 189 190 /* in case the message is incoming */ 191 queue_message_path(msgid, msgpath, sizeof(msgpath)); 192 unlink(msgpath); 193 194 log_trace(TRACE_QUEUE, 195 "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r); 196 197 return (r); 198 } 199 200 int 201 queue_message_commit(uint32_t msgid) 202 { 203 int r; 204 char msgpath[PATH_MAX]; 205 char tmppath[PATH_MAX]; 206 FILE *ifp = NULL; 207 FILE *ofp = NULL; 208 209 profile_enter("queue_message_commit"); 210 211 queue_message_path(msgid, msgpath, sizeof(msgpath)); 212 213 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 214 bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath); 215 ifp = fopen(msgpath, "r"); 216 ofp = fopen(tmppath, "w+"); 217 if (ifp == NULL || ofp == NULL) 218 goto err; 219 if (! compress_file(ifp, ofp)) 220 goto err; 221 fclose(ifp); 222 fclose(ofp); 223 ifp = NULL; 224 ofp = NULL; 225 226 if (rename(tmppath, msgpath) == -1) { 227 if (errno == ENOSPC) 228 return (0); 229 unlink(tmppath); 230 log_warn("rename"); 231 return (0); 232 } 233 } 234 235 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 236 bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath); 237 ifp = fopen(msgpath, "r"); 238 ofp = fopen(tmppath, "w+"); 239 if (ifp == NULL || ofp == NULL) 240 goto err; 241 if (! crypto_encrypt_file(ifp, ofp)) 242 goto err; 243 fclose(ifp); 244 fclose(ofp); 245 ifp = NULL; 246 ofp = NULL; 247 248 if (rename(tmppath, msgpath) == -1) { 249 if (errno == ENOSPC) 250 return (0); 251 unlink(tmppath); 252 log_warn("rename"); 253 return (0); 254 } 255 } 256 257 r = handler_message_commit(msgid, msgpath); 258 profile_leave(); 259 260 /* in case it's not done by the backend */ 261 unlink(msgpath); 262 263 log_trace(TRACE_QUEUE, 264 "queue-backend: queue_message_commit(%08"PRIx32") -> %d", 265 msgid, r); 266 267 return (r); 268 269 err: 270 if (ifp) 271 fclose(ifp); 272 if (ofp) 273 fclose(ofp); 274 return 0; 275 } 276 277 int 278 queue_message_corrupt(uint32_t msgid) 279 { 280 int r; 281 282 profile_enter("queue_message_corrupt"); 283 r = handler_message_corrupt(msgid); 284 profile_leave(); 285 286 log_trace(TRACE_QUEUE, 287 "queue-backend: queue_message_corrupt(%08"PRIx32") -> %d", msgid, r); 288 289 return (r); 290 } 291 292 int 293 queue_message_fd_r(uint32_t msgid) 294 { 295 int fdin = -1, fdout = -1, fd = -1; 296 FILE *ifp = NULL; 297 FILE *ofp = NULL; 298 299 profile_enter("queue_message_fd_r"); 300 fdin = handler_message_fd_r(msgid); 301 profile_leave(); 302 303 log_trace(TRACE_QUEUE, 304 "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin); 305 306 if (fdin == -1) 307 return (-1); 308 309 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 310 if ((fdout = mktmpfile()) == -1) 311 goto err; 312 if ((fd = dup(fdout)) == -1) 313 goto err; 314 if ((ifp = fdopen(fdin, "r")) == NULL) 315 goto err; 316 fdin = fd; 317 fd = -1; 318 if ((ofp = fdopen(fdout, "w+")) == NULL) 319 goto err; 320 321 if (! crypto_decrypt_file(ifp, ofp)) 322 goto err; 323 324 fclose(ifp); 325 ifp = NULL; 326 fclose(ofp); 327 ofp = NULL; 328 lseek(fdin, SEEK_SET, 0); 329 } 330 331 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 332 if ((fdout = mktmpfile()) == -1) 333 goto err; 334 if ((fd = dup(fdout)) == -1) 335 goto err; 336 if ((ifp = fdopen(fdin, "r")) == NULL) 337 goto err; 338 fdin = fd; 339 fd = -1; 340 if ((ofp = fdopen(fdout, "w+")) == NULL) 341 goto err; 342 343 if (! uncompress_file(ifp, ofp)) 344 goto err; 345 346 fclose(ifp); 347 ifp = NULL; 348 fclose(ofp); 349 ofp = NULL; 350 lseek(fdin, SEEK_SET, 0); 351 } 352 353 return (fdin); 354 355 err: 356 if (fd != -1) 357 close(fd); 358 if (fdin != -1) 359 close(fdin); 360 if (fdout != -1) 361 close(fdout); 362 if (ifp) 363 fclose(ifp); 364 if (ofp) 365 fclose(ofp); 366 return -1; 367 } 368 369 int 370 queue_message_fd_rw(uint32_t msgid) 371 { 372 char buf[PATH_MAX]; 373 374 queue_message_path(msgid, buf, sizeof(buf)); 375 376 return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600); 377 } 378 379 static int 380 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 381 { 382 char *evp; 383 size_t evplen; 384 size_t complen; 385 char compbuf[sizeof(struct envelope)]; 386 size_t enclen; 387 char encbuf[sizeof(struct envelope)]; 388 389 evp = evpbuf; 390 evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize); 391 if (evplen == 0) 392 return (0); 393 394 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 395 complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf); 396 if (complen == 0) 397 return (0); 398 evp = compbuf; 399 evplen = complen; 400 } 401 402 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 403 enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 404 if (enclen == 0) 405 return (0); 406 evp = encbuf; 407 evplen = enclen; 408 } 409 410 memmove(evpbuf, evp, evplen); 411 412 return (evplen); 413 } 414 415 static int 416 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 417 { 418 char *evp; 419 size_t evplen; 420 char compbuf[sizeof(struct envelope)]; 421 size_t complen; 422 char encbuf[sizeof(struct envelope)]; 423 size_t enclen; 424 425 evp = evpbuf; 426 evplen = evpbufsize; 427 428 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 429 enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 430 if (enclen == 0) 431 return (0); 432 evp = encbuf; 433 evplen = enclen; 434 } 435 436 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 437 complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf); 438 if (complen == 0) 439 return (0); 440 evp = compbuf; 441 evplen = complen; 442 } 443 444 return (envelope_load_buffer(ep, evp, evplen)); 445 } 446 447 static void 448 queue_envelope_cache_add(struct envelope *e) 449 { 450 struct envelope *cached; 451 452 while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size) 453 queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id); 454 455 cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add"); 456 *cached = *e; 457 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 458 tree_xset(&evpcache_tree, e->id, cached); 459 stat_increment("queue.evpcache.size", 1); 460 } 461 462 static void 463 queue_envelope_cache_update(struct envelope *e) 464 { 465 struct envelope *cached; 466 467 if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) { 468 queue_envelope_cache_add(e); 469 stat_increment("queue.evpcache.update.missed", 1); 470 } else { 471 TAILQ_REMOVE(&evpcache_list, cached, entry); 472 *cached = *e; 473 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 474 stat_increment("queue.evpcache.update.hit", 1); 475 } 476 } 477 478 static void 479 queue_envelope_cache_del(uint64_t evpid) 480 { 481 struct envelope *cached; 482 483 if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL) 484 return; 485 486 TAILQ_REMOVE(&evpcache_list, cached, entry); 487 free(cached); 488 stat_decrement("queue.evpcache.size", 1); 489 } 490 491 int 492 queue_envelope_create(struct envelope *ep) 493 { 494 int r; 495 char evpbuf[sizeof(struct envelope)]; 496 size_t evplen; 497 uint64_t evpid; 498 uint32_t msgid; 499 500 ep->creation = time(NULL); 501 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 502 if (evplen == 0) 503 return (0); 504 505 evpid = ep->id; 506 msgid = evpid_to_msgid(evpid); 507 508 profile_enter("queue_envelope_create"); 509 r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id); 510 profile_leave(); 511 512 log_trace(TRACE_QUEUE, 513 "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")", 514 evpid, evplen, r, ep->id); 515 516 if (!r) { 517 ep->creation = 0; 518 ep->id = 0; 519 } 520 521 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 522 queue_envelope_cache_add(ep); 523 524 return (r); 525 } 526 527 int 528 queue_envelope_delete(uint64_t evpid) 529 { 530 int r; 531 532 if (env->sc_queue_flags & QUEUE_EVPCACHE) 533 queue_envelope_cache_del(evpid); 534 535 profile_enter("queue_envelope_delete"); 536 r = handler_envelope_delete(evpid); 537 profile_leave(); 538 539 log_trace(TRACE_QUEUE, 540 "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d", 541 evpid, r); 542 543 return (r); 544 } 545 546 int 547 queue_envelope_load(uint64_t evpid, struct envelope *ep) 548 { 549 const char *e; 550 char evpbuf[sizeof(struct envelope)]; 551 size_t evplen; 552 struct envelope *cached; 553 554 if ((env->sc_queue_flags & QUEUE_EVPCACHE) && 555 (cached = tree_get(&evpcache_tree, evpid))) { 556 *ep = *cached; 557 stat_increment("queue.evpcache.load.hit", 1); 558 return (1); 559 } 560 561 ep->id = evpid; 562 profile_enter("queue_envelope_load"); 563 evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf); 564 profile_leave(); 565 566 log_trace(TRACE_QUEUE, 567 "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu", 568 evpid, evplen); 569 570 if (evplen == 0) 571 return (0); 572 573 if (queue_envelope_load_buffer(ep, evpbuf, evplen)) { 574 if ((e = envelope_validate(ep)) == NULL) { 575 ep->id = evpid; 576 if (env->sc_queue_flags & QUEUE_EVPCACHE) { 577 queue_envelope_cache_add(ep); 578 stat_increment("queue.evpcache.load.missed", 1); 579 } 580 return (1); 581 } 582 log_debug("debug: invalid envelope %016" PRIx64 ": %s", 583 ep->id, e); 584 } 585 586 (void)queue_message_corrupt(evpid_to_msgid(evpid)); 587 return (0); 588 } 589 590 int 591 queue_envelope_update(struct envelope *ep) 592 { 593 char evpbuf[sizeof(struct envelope)]; 594 size_t evplen; 595 int r; 596 597 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 598 if (evplen == 0) 599 return (0); 600 601 profile_enter("queue_envelope_update"); 602 r = handler_envelope_update(ep->id, evpbuf, evplen); 603 profile_leave(); 604 605 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 606 queue_envelope_cache_update(ep); 607 608 log_trace(TRACE_QUEUE, 609 "queue-backend: queue_envelope_update(%016"PRIx64") -> %d", 610 ep->id, r); 611 612 return (r); 613 } 614 615 int 616 queue_envelope_walk(struct envelope *ep) 617 { 618 const char *e; 619 uint64_t evpid; 620 char evpbuf[sizeof(struct envelope)]; 621 int r; 622 623 profile_enter("queue_envelope_walk"); 624 r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf); 625 profile_leave(); 626 627 log_trace(TRACE_QUEUE, 628 "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")", 629 r, evpid); 630 631 if (r == -1) 632 return (r); 633 634 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { 635 if ((e = envelope_validate(ep)) == NULL) { 636 ep->id = evpid; 637 if (env->sc_queue_flags & QUEUE_EVPCACHE) 638 queue_envelope_cache_add(ep); 639 return (1); 640 } 641 log_debug("debug: invalid envelope %016" PRIx64 ": %s", 642 ep->id, e); 643 (void)queue_message_corrupt(evpid_to_msgid(evpid)); 644 } 645 646 return (0); 647 } 648 649 uint32_t 650 queue_generate_msgid(void) 651 { 652 uint32_t msgid; 653 654 while ((msgid = arc4random()) == 0) 655 ; 656 657 return msgid; 658 } 659 660 uint64_t 661 queue_generate_evpid(uint32_t msgid) 662 { 663 uint32_t rnd; 664 uint64_t evpid; 665 666 while ((rnd = arc4random()) == 0) 667 ; 668 669 evpid = msgid; 670 evpid <<= 32; 671 evpid |= rnd; 672 673 return evpid; 674 } 675 676 static const char* 677 envelope_validate(struct envelope *ep) 678 { 679 if (ep->version != SMTPD_ENVELOPE_VERSION) 680 return "version mismatch"; 681 682 if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL) 683 return "invalid helo"; 684 if (ep->helo[0] == '\0') 685 return "empty helo"; 686 687 if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL) 688 return "invalid hostname"; 689 if (ep->hostname[0] == '\0') 690 return "empty hostname"; 691 692 if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL) 693 return "invalid error line"; 694 695 return NULL; 696 } 697 698 void 699 queue_api_on_close(int(*cb)(void)) 700 { 701 handler_close = cb; 702 } 703 704 void 705 queue_api_on_message_create(int(*cb)(uint32_t *)) 706 { 707 handler_message_create = cb; 708 } 709 710 void 711 queue_api_on_message_commit(int(*cb)(uint32_t, const char *)) 712 { 713 handler_message_commit = cb; 714 } 715 716 void 717 queue_api_on_message_delete(int(*cb)(uint32_t)) 718 { 719 handler_message_delete = cb; 720 } 721 722 void 723 queue_api_on_message_fd_r(int(*cb)(uint32_t)) 724 { 725 handler_message_fd_r = cb; 726 } 727 728 void 729 queue_api_on_message_corrupt(int(*cb)(uint32_t)) 730 { 731 handler_message_corrupt = cb; 732 } 733 734 void 735 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *)) 736 { 737 handler_envelope_create = cb; 738 } 739 740 void 741 queue_api_on_envelope_delete(int(*cb)(uint64_t)) 742 { 743 handler_envelope_delete = cb; 744 } 745 746 void 747 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t)) 748 { 749 handler_envelope_update = cb; 750 } 751 752 void 753 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t)) 754 { 755 handler_envelope_load = cb; 756 } 757 758 void 759 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t)) 760 { 761 handler_envelope_walk = cb; 762 } 763