1 /* $OpenBSD: queue_fs.c,v 1.23 2023/05/31 16:51:46 op Exp $ */
2
3 /*
4 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5 *
6 * Permission to use, copy, modify, and distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18
19 #include <sys/types.h>
20 #include <sys/mount.h>
21 #include <sys/stat.h>
22
23 #include <dirent.h>
24 #include <errno.h>
25 #include <fcntl.h>
26 #include <fts.h>
27 #include <inttypes.h>
28 #include <pwd.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <time.h>
32 #include <unistd.h>
33
34 #include "smtpd.h"
35 #include "log.h"
36
37 #define PATH_QUEUE "/queue"
38 #define PATH_INCOMING "/incoming"
39 #define PATH_EVPTMP PATH_INCOMING "/envelope.tmp"
40 #define PATH_MESSAGE "/message"
41
42 /* percentage of remaining space / inodes required to accept new messages */
43 #define MINSPACE 5
44 #define MININODES 5
45
46 struct qwalk {
47 FTS *fts;
48 int depth;
49 };
50
51 static int fsqueue_check_space(void);
52 static void fsqueue_envelope_path(uint64_t, char *, size_t);
53 static void fsqueue_envelope_incoming_path(uint64_t, char *, size_t);
54 static int fsqueue_envelope_dump(char *, const char *, size_t, int, int);
55 static void fsqueue_message_path(uint32_t, char *, size_t);
56 static void fsqueue_message_incoming_path(uint32_t, char *, size_t);
57 static void *fsqueue_qwalk_new(void);
58 static int fsqueue_qwalk(void *, uint64_t *);
59 static void fsqueue_qwalk_close(void *);
60
61 struct tree evpcount;
62 static struct timespec startup;
63
64 #define REF (int*)0xf00
65
66 static int
queue_fs_message_create(uint32_t * msgid)67 queue_fs_message_create(uint32_t *msgid)
68 {
69 char rootdir[PATH_MAX];
70 struct stat sb;
71
72 if (!fsqueue_check_space())
73 return 0;
74
75 again:
76 *msgid = queue_generate_msgid();
77
78 /* prevent possible collision later when moving to Q_QUEUE */
79 fsqueue_message_path(*msgid, rootdir, sizeof(rootdir));
80 if (stat(rootdir, &sb) != -1)
81 goto again;
82
83 /* we hit an unexpected error, temporarily fail */
84 if (errno != ENOENT) {
85 *msgid = 0;
86 return 0;
87 }
88
89 fsqueue_message_incoming_path(*msgid, rootdir, sizeof(rootdir));
90 if (mkdir(rootdir, 0700) == -1) {
91 if (errno == EEXIST)
92 goto again;
93
94 if (errno == ENOSPC) {
95 *msgid = 0;
96 return 0;
97 }
98
99 log_warn("warn: queue-fs: mkdir");
100 *msgid = 0;
101 return 0;
102 }
103
104 return (1);
105 }
106
107 static int
queue_fs_message_commit(uint32_t msgid,const char * path)108 queue_fs_message_commit(uint32_t msgid, const char *path)
109 {
110 char incomingdir[PATH_MAX];
111 char queuedir[PATH_MAX];
112 char msgdir[PATH_MAX];
113 char msgpath[PATH_MAX];
114
115 /* before-first, move the message content in the incoming directory */
116 fsqueue_message_incoming_path(msgid, msgpath, sizeof(msgpath));
117 if (strlcat(msgpath, PATH_MESSAGE, sizeof(msgpath))
118 >= sizeof(msgpath))
119 return (0);
120 if (rename(path, msgpath) == -1)
121 return (0);
122
123 fsqueue_message_incoming_path(msgid, incomingdir, sizeof(incomingdir));
124 fsqueue_message_path(msgid, msgdir, sizeof(msgdir));
125 if (strlcpy(queuedir, msgdir, sizeof(queuedir))
126 >= sizeof(queuedir))
127 return (0);
128
129 /* first attempt to rename */
130 if (rename(incomingdir, msgdir) == 0)
131 return 1;
132 if (errno == ENOSPC)
133 return 0;
134 if (errno != ENOENT) {
135 log_warn("warn: queue-fs: rename");
136 return 0;
137 }
138
139 /* create the bucket */
140 *strrchr(queuedir, '/') = '\0';
141 if (mkdir(queuedir, 0700) == -1) {
142 if (errno == ENOSPC)
143 return 0;
144 if (errno != EEXIST) {
145 log_warn("warn: queue-fs: mkdir");
146 return 0;
147 }
148 }
149
150 /* rename */
151 if (rename(incomingdir, msgdir) == -1) {
152 if (errno == ENOSPC)
153 return 0;
154 log_warn("warn: queue-fs: rename");
155 return 0;
156 }
157
158 return 1;
159 }
160
161 static int
queue_fs_message_fd_r(uint32_t msgid)162 queue_fs_message_fd_r(uint32_t msgid)
163 {
164 int fd;
165 char path[PATH_MAX];
166
167 fsqueue_message_path(msgid, path, sizeof(path));
168 if (strlcat(path, PATH_MESSAGE, sizeof(path))
169 >= sizeof(path))
170 return -1;
171
172 if ((fd = open(path, O_RDONLY)) == -1) {
173 log_warn("warn: queue-fs: open");
174 return -1;
175 }
176
177 return fd;
178 }
179
180 static int
queue_fs_message_delete(uint32_t msgid)181 queue_fs_message_delete(uint32_t msgid)
182 {
183 char path[PATH_MAX];
184 struct stat sb;
185
186 fsqueue_message_incoming_path(msgid, path, sizeof(path));
187 if (stat(path, &sb) == -1)
188 fsqueue_message_path(msgid, path, sizeof(path));
189
190 if (rmtree(path, 0) == -1)
191 log_warn("warn: queue-fs: rmtree");
192
193 tree_pop(&evpcount, msgid);
194
195 return 1;
196 }
197
198 static int
queue_fs_envelope_create(uint32_t msgid,const char * buf,size_t len,uint64_t * evpid)199 queue_fs_envelope_create(uint32_t msgid, const char *buf, size_t len,
200 uint64_t *evpid)
201 {
202 char path[PATH_MAX];
203 int queued = 0, i, r = 0, *n;
204 struct stat sb;
205
206 if (msgid == 0) {
207 log_warnx("warn: queue-fs: msgid=0, evpid=%016"PRIx64, *evpid);
208 goto done;
209 }
210
211 fsqueue_message_incoming_path(msgid, path, sizeof(path));
212 if (stat(path, &sb) == -1)
213 queued = 1;
214
215 for (i = 0; i < 20; i ++) {
216 *evpid = queue_generate_evpid(msgid);
217 if (queued)
218 fsqueue_envelope_path(*evpid, path, sizeof(path));
219 else
220 fsqueue_envelope_incoming_path(*evpid, path,
221 sizeof(path));
222
223 if ((r = fsqueue_envelope_dump(path, buf, len, 0, 0)) != 0)
224 goto done;
225 }
226 r = 0;
227 log_warnx("warn: queue-fs: could not allocate evpid");
228
229 done:
230 if (r) {
231 n = tree_pop(&evpcount, msgid);
232 if (n == NULL)
233 n = REF;
234 n += 1;
235 tree_xset(&evpcount, msgid, n);
236 }
237 return (r);
238 }
239
240 static int
queue_fs_envelope_load(uint64_t evpid,char * buf,size_t len)241 queue_fs_envelope_load(uint64_t evpid, char *buf, size_t len)
242 {
243 char pathname[PATH_MAX];
244 FILE *fp;
245 size_t r;
246
247 fsqueue_envelope_path(evpid, pathname, sizeof(pathname));
248
249 fp = fopen(pathname, "r");
250 if (fp == NULL) {
251 if (errno != ENOENT && errno != ENFILE)
252 log_warn("warn: queue-fs: fopen");
253 return 0;
254 }
255
256 r = fread(buf, 1, len, fp);
257 if (r) {
258 if (r == len) {
259 log_warn("warn: queue-fs: too large");
260 r = 0;
261 }
262 else
263 buf[r] = '\0';
264 }
265 fclose(fp);
266
267 return (r);
268 }
269
270 static int
queue_fs_envelope_update(uint64_t evpid,const char * buf,size_t len)271 queue_fs_envelope_update(uint64_t evpid, const char *buf, size_t len)
272 {
273 char dest[PATH_MAX];
274
275 fsqueue_envelope_path(evpid, dest, sizeof(dest));
276
277 return (fsqueue_envelope_dump(dest, buf, len, 1, 1));
278 }
279
280 static int
queue_fs_envelope_delete(uint64_t evpid)281 queue_fs_envelope_delete(uint64_t evpid)
282 {
283 char pathname[PATH_MAX];
284 uint32_t msgid;
285 int *n;
286
287 fsqueue_envelope_path(evpid, pathname, sizeof(pathname));
288 if (unlink(pathname) == -1)
289 if (errno != ENOENT)
290 return 0;
291
292 msgid = evpid_to_msgid(evpid);
293 n = tree_pop(&evpcount, msgid);
294 n -= 1;
295
296 if (n - REF == 0)
297 queue_fs_message_delete(msgid);
298 else
299 tree_xset(&evpcount, msgid, n);
300
301 return (1);
302 }
303
304 static int
queue_fs_message_walk(uint64_t * evpid,char * buf,size_t len,uint32_t msgid,int * done,void ** data)305 queue_fs_message_walk(uint64_t *evpid, char *buf, size_t len,
306 uint32_t msgid, int *done, void **data)
307 {
308 struct dirent *dp;
309 DIR *dir = *data;
310 char path[PATH_MAX];
311 char msgid_str[9];
312 char *tmp;
313 int r, *n;
314
315 if (*done)
316 return (-1);
317
318 if (!bsnprintf(path, sizeof path, "%s/%02x/%08x",
319 PATH_QUEUE, (msgid & 0xff000000) >> 24, msgid))
320 fatalx("queue_fs_message_walk: path does not fit buffer");
321
322 if (dir == NULL) {
323 if ((dir = opendir(path)) == NULL) {
324 log_warn("warn: queue_fs: opendir: %s", path);
325 *done = 1;
326 return (-1);
327 }
328
329 *data = dir;
330 }
331
332 (void)snprintf(msgid_str, sizeof msgid_str, "%08" PRIx32, msgid);
333 while ((dp = readdir(dir)) != NULL) {
334 if (dp->d_type != DT_REG)
335 continue;
336
337 /* ignore files other than envelopes */
338 if (strlen(dp->d_name) != 16 ||
339 strncmp(dp->d_name, msgid_str, 8))
340 continue;
341
342 tmp = NULL;
343 *evpid = strtoull(dp->d_name, &tmp, 16);
344 if (tmp && *tmp != '\0') {
345 log_debug("debug: fsqueue: bogus file %s", dp->d_name);
346 continue;
347 }
348
349 memset(buf, 0, len);
350 r = queue_fs_envelope_load(*evpid, buf, len);
351 if (r) {
352 n = tree_pop(&evpcount, msgid);
353 if (n == NULL)
354 n = REF;
355
356 n += 1;
357 tree_xset(&evpcount, msgid, n);
358 }
359
360 return (r);
361 }
362
363 (void)closedir(dir);
364 *done = 1;
365 return (-1);
366 }
367
368 static int
queue_fs_envelope_walk(uint64_t * evpid,char * buf,size_t len)369 queue_fs_envelope_walk(uint64_t *evpid, char *buf, size_t len)
370 {
371 static int done = 0;
372 static void *hdl = NULL;
373 int r, *n;
374 uint32_t msgid;
375
376 if (done)
377 return (-1);
378
379 if (hdl == NULL)
380 hdl = fsqueue_qwalk_new();
381
382 if (fsqueue_qwalk(hdl, evpid)) {
383 memset(buf, 0, len);
384 r = queue_fs_envelope_load(*evpid, buf, len);
385 if (r) {
386 msgid = evpid_to_msgid(*evpid);
387 n = tree_pop(&evpcount, msgid);
388 if (n == NULL)
389 n = REF;
390 n += 1;
391 tree_xset(&evpcount, msgid, n);
392 }
393 return (r);
394 }
395
396 fsqueue_qwalk_close(hdl);
397 done = 1;
398 return (-1);
399 }
400
401 static int
fsqueue_check_space(void)402 fsqueue_check_space(void)
403 {
404 struct statfs buf;
405 uint64_t used;
406 uint64_t total;
407
408 if (statfs(PATH_QUEUE, &buf) == -1) {
409 log_warn("warn: queue-fs: statfs");
410 return 0;
411 }
412
413 /*
414 * f_bfree and f_ffree is not set on all filesystems.
415 * They could be signed or unsigned integers.
416 * Some systems will set them to 0, others will set them to -1.
417 */
418 if (buf.f_bfree == 0 || buf.f_ffree == 0 ||
419 (int64_t)buf.f_bfree == -1 || (int64_t)buf.f_ffree == -1)
420 return 1;
421
422 used = buf.f_blocks - buf.f_bfree;
423 total = buf.f_bavail + used;
424 if (total != 0)
425 used = (float)used / (float)total * 100;
426 else
427 used = 100;
428 if (100 - used < MINSPACE) {
429 log_warnx("warn: not enough disk space: %llu%% left",
430 (unsigned long long) 100 - used);
431 log_warnx("warn: temporarily rejecting messages");
432 return 0;
433 }
434
435 used = buf.f_files - buf.f_ffree;
436 total = buf.f_favail + used;
437 if (total != 0)
438 used = (float)used / (float)total * 100;
439 else
440 used = 100;
441 if (100 - used < MININODES) {
442 log_warnx("warn: not enough inodes: %llu%% left",
443 (unsigned long long) 100 - used);
444 log_warnx("warn: temporarily rejecting messages");
445 return 0;
446 }
447
448 return 1;
449 }
450
451 static void
fsqueue_envelope_path(uint64_t evpid,char * buf,size_t len)452 fsqueue_envelope_path(uint64_t evpid, char *buf, size_t len)
453 {
454 if (!bsnprintf(buf, len, "%s/%02x/%08x/%016" PRIx64,
455 PATH_QUEUE,
456 (evpid_to_msgid(evpid) & 0xff000000) >> 24,
457 evpid_to_msgid(evpid),
458 evpid))
459 fatalx("fsqueue_envelope_path: path does not fit buffer");
460 }
461
462 static void
fsqueue_envelope_incoming_path(uint64_t evpid,char * buf,size_t len)463 fsqueue_envelope_incoming_path(uint64_t evpid, char *buf, size_t len)
464 {
465 if (!bsnprintf(buf, len, "%s/%08x/%016" PRIx64,
466 PATH_INCOMING,
467 evpid_to_msgid(evpid),
468 evpid))
469 fatalx("fsqueue_envelope_incoming_path: path does not fit buffer");
470 }
471
472 static int
fsqueue_envelope_dump(char * dest,const char * evpbuf,size_t evplen,int do_atomic,int do_sync)473 fsqueue_envelope_dump(char *dest, const char *evpbuf, size_t evplen,
474 int do_atomic, int do_sync)
475 {
476 const char *path = do_atomic ? PATH_EVPTMP : dest;
477 FILE *fp = NULL;
478 int fd;
479 size_t w;
480
481 if ((fd = open(path, O_RDWR | O_CREAT | O_EXCL, 0600)) == -1) {
482 log_warn("warn: queue-fs: open");
483 goto tempfail;
484 }
485
486 if ((fp = fdopen(fd, "w")) == NULL) {
487 log_warn("warn: queue-fs: fdopen");
488 goto tempfail;
489 }
490
491 w = fwrite(evpbuf, 1, evplen, fp);
492 if (w < evplen) {
493 log_warn("warn: queue-fs: short write");
494 goto tempfail;
495 }
496 if (fflush(fp)) {
497 log_warn("warn: queue-fs: fflush");
498 goto tempfail;
499 }
500 if (do_sync && fsync(fileno(fp))) {
501 log_warn("warn: queue-fs: fsync");
502 goto tempfail;
503 }
504 if (fclose(fp) != 0) {
505 log_warn("warn: queue-fs: fclose");
506 fp = NULL;
507 goto tempfail;
508 }
509 fp = NULL;
510 fd = -1;
511
512 if (do_atomic && rename(path, dest) == -1) {
513 log_warn("warn: queue-fs: rename");
514 goto tempfail;
515 }
516 return (1);
517
518 tempfail:
519 if (fp)
520 fclose(fp);
521 else if (fd != -1)
522 close(fd);
523 if (unlink(path) == -1)
524 log_warn("warn: queue-fs: unlink");
525 return (0);
526 }
527
528 static void
fsqueue_message_path(uint32_t msgid,char * buf,size_t len)529 fsqueue_message_path(uint32_t msgid, char *buf, size_t len)
530 {
531 if (!bsnprintf(buf, len, "%s/%02x/%08x",
532 PATH_QUEUE,
533 (msgid & 0xff000000) >> 24,
534 msgid))
535 fatalx("fsqueue_message_path: path does not fit buffer");
536 }
537
538 static void
fsqueue_message_incoming_path(uint32_t msgid,char * buf,size_t len)539 fsqueue_message_incoming_path(uint32_t msgid, char *buf, size_t len)
540 {
541 if (!bsnprintf(buf, len, "%s/%08x",
542 PATH_INCOMING,
543 msgid))
544 fatalx("fsqueue_message_incoming_path: path does not fit buffer");
545 }
546
547 static void *
fsqueue_qwalk_new(void)548 fsqueue_qwalk_new(void)
549 {
550 char path[PATH_MAX];
551 char * const path_argv[] = { path, NULL };
552 struct qwalk *q;
553
554 q = xcalloc(1, sizeof(*q));
555 (void)strlcpy(path, PATH_QUEUE, sizeof(path));
556 q->fts = fts_open(path_argv,
557 FTS_PHYSICAL | FTS_NOCHDIR, NULL);
558
559 if (q->fts == NULL)
560 fatal("fsqueue_qwalk_new: fts_open: %s", path);
561
562 return (q);
563 }
564
565 static void
fsqueue_qwalk_close(void * hdl)566 fsqueue_qwalk_close(void *hdl)
567 {
568 struct qwalk *q = hdl;
569
570 fts_close(q->fts);
571
572 free(q);
573 }
574
575 static int
fsqueue_qwalk(void * hdl,uint64_t * evpid)576 fsqueue_qwalk(void *hdl, uint64_t *evpid)
577 {
578 struct qwalk *q = hdl;
579 FTSENT *e;
580 char *tmp;
581
582 while ((e = fts_read(q->fts)) != NULL) {
583 switch (e->fts_info) {
584 case FTS_D:
585 q->depth += 1;
586 if (q->depth == 2 && e->fts_namelen != 2) {
587 log_debug("debug: fsqueue: bogus directory %s",
588 e->fts_path);
589 fts_set(q->fts, e, FTS_SKIP);
590 break;
591 }
592 if (q->depth == 3 && e->fts_namelen != 8) {
593 log_debug("debug: fsqueue: bogus directory %s",
594 e->fts_path);
595 fts_set(q->fts, e, FTS_SKIP);
596 break;
597 }
598 break;
599
600 case FTS_DP:
601 case FTS_DNR:
602 q->depth -= 1;
603 break;
604
605 case FTS_F:
606 if (q->depth != 3)
607 break;
608 if (e->fts_namelen != 16)
609 break;
610 if (timespeccmp(&e->fts_statp->st_mtim, &startup, >))
611 break;
612 tmp = NULL;
613 *evpid = strtoull(e->fts_name, &tmp, 16);
614 if (tmp && *tmp != '\0') {
615 log_debug("debug: fsqueue: bogus file %s",
616 e->fts_path);
617 break;
618 }
619 return (1);
620 default:
621 break;
622 }
623 }
624
625 return (0);
626 }
627
628 static int
queue_fs_init(struct passwd * pw,int server,const char * conf)629 queue_fs_init(struct passwd *pw, int server, const char *conf)
630 {
631 unsigned int n;
632 char *paths[] = { PATH_QUEUE, PATH_INCOMING };
633 char path[PATH_MAX];
634 int ret;
635
636 /* remove incoming/ if it exists */
637 if (server)
638 mvpurge(PATH_SPOOL PATH_INCOMING, PATH_SPOOL PATH_PURGE);
639
640 fsqueue_envelope_path(0, path, sizeof(path));
641
642 ret = 1;
643 for (n = 0; n < nitems(paths); n++) {
644 (void)strlcpy(path, PATH_SPOOL, sizeof(path));
645 if (strlcat(path, paths[n], sizeof(path)) >= sizeof(path))
646 fatalx("path too long %s%s", PATH_SPOOL, paths[n]);
647 if (ckdir(path, 0700, pw->pw_uid, 0, server) == 0)
648 ret = 0;
649 }
650
651 if (clock_gettime(CLOCK_REALTIME, &startup))
652 fatal("clock_gettime");
653
654 tree_init(&evpcount);
655
656 queue_api_on_message_create(queue_fs_message_create);
657 queue_api_on_message_commit(queue_fs_message_commit);
658 queue_api_on_message_delete(queue_fs_message_delete);
659 queue_api_on_message_fd_r(queue_fs_message_fd_r);
660 queue_api_on_envelope_create(queue_fs_envelope_create);
661 queue_api_on_envelope_delete(queue_fs_envelope_delete);
662 queue_api_on_envelope_update(queue_fs_envelope_update);
663 queue_api_on_envelope_load(queue_fs_envelope_load);
664 queue_api_on_envelope_walk(queue_fs_envelope_walk);
665 queue_api_on_message_walk(queue_fs_message_walk);
666
667 return (ret);
668 }
669
670 struct queue_backend queue_backend_fs = {
671 queue_fs_init,
672 };
673