xref: /openbsd/usr.sbin/smtpd/queue_fs.c (revision 771fbea0)
1 /*	$OpenBSD: queue_fs.c,v 1.21 2021/05/26 18:08:55 eric Exp $	*/
2 
3 /*
4  * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/mount.h>
21 #include <sys/queue.h>
22 #include <sys/tree.h>
23 #include <sys/socket.h>
24 #include <sys/stat.h>
25 
26 #include <ctype.h>
27 #include <dirent.h>
28 #include <errno.h>
29 #include <event.h>
30 #include <fcntl.h>
31 #include <fts.h>
32 #include <imsg.h>
33 #include <inttypes.h>
34 #include <pwd.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 
41 #include "smtpd.h"
42 #include "log.h"
43 
/* directories used by this backend, and files that live inside them */
#define PATH_QUEUE		"/queue"
#define PATH_INCOMING		"/incoming"
#define PATH_MESSAGE		"/message"
/* scratch file used when an envelope is rewritten via rename(2) */
#define PATH_EVPTMP		PATH_INCOMING "/envelope.tmp"

/*
 * Minimum percentage of free space and of free inodes that must remain
 * for new messages to be accepted.
 */
#define	MINSPACE		5
#define	MININODES		5
52 
/*
 * State of one walk over the queue tree: the fts(3) stream being read
 * and the directory nesting level fsqueue_qwalk() has reached in it.
 */
struct qwalk {
	FTS	*fts;		/* stream opened by fsqueue_qwalk_new() */
	int	 depth;		/* +1 on FTS_D, -1 on FTS_DP and FTS_DNR */
};
57 
58 static int	fsqueue_check_space(void);
59 static void	fsqueue_envelope_path(uint64_t, char *, size_t);
60 static void	fsqueue_envelope_incoming_path(uint64_t, char *, size_t);
61 static int	fsqueue_envelope_dump(char *, const char *, size_t, int, int);
62 static void	fsqueue_message_path(uint32_t, char *, size_t);
63 static void	fsqueue_message_incoming_path(uint32_t, char *, size_t);
64 static void    *fsqueue_qwalk_new(void);
65 static int	fsqueue_qwalk(void *, uint64_t *);
66 static void	fsqueue_qwalk_close(void *);
67 
68 struct tree evpcount;
69 static struct timespec startup;
70 
/*
 * Base value of the envelope counters stored in evpcount.  A counter is
 * kept directly in the tree's pointer slot as REF + k for k envelopes,
 * so a stored entry is never NULL.  The pointer is only ever compared
 * and offset, never dereferenced.
 *
 * The replacement list is fully parenthesized so that REF behaves as a
 * single operand in any expression (sizeof, indexing, ...).
 */
#define REF	((int *)0xf00)
72 
73 static int
74 queue_fs_message_create(uint32_t *msgid)
75 {
76 	char		rootdir[PATH_MAX];
77 	struct stat	sb;
78 
79 	if (!fsqueue_check_space())
80 		return 0;
81 
82 again:
83 	*msgid = queue_generate_msgid();
84 
85 	/* prevent possible collision later when moving to Q_QUEUE */
86 	fsqueue_message_path(*msgid, rootdir, sizeof(rootdir));
87 	if (stat(rootdir, &sb) != -1)
88 		goto again;
89 
90 	/* we hit an unexpected error, temporarily fail */
91 	if (errno != ENOENT) {
92 		*msgid = 0;
93 		return 0;
94 	}
95 
96 	fsqueue_message_incoming_path(*msgid, rootdir, sizeof(rootdir));
97 	if (mkdir(rootdir, 0700) == -1) {
98 		if (errno == EEXIST)
99 			goto again;
100 
101 		if (errno == ENOSPC) {
102 			*msgid = 0;
103 			return 0;
104 		}
105 
106 		log_warn("warn: queue-fs: mkdir");
107 		*msgid = 0;
108 		return 0;
109 	}
110 
111 	return (1);
112 }
113 
114 static int
115 queue_fs_message_commit(uint32_t msgid, const char *path)
116 {
117 	char incomingdir[PATH_MAX];
118 	char queuedir[PATH_MAX];
119 	char msgdir[PATH_MAX];
120 	char msgpath[PATH_MAX];
121 
122 	/* before-first, move the message content in the incoming directory */
123 	fsqueue_message_incoming_path(msgid, msgpath, sizeof(msgpath));
124 	if (strlcat(msgpath, PATH_MESSAGE, sizeof(msgpath))
125 	    >= sizeof(msgpath))
126 		return (0);
127 	if (rename(path, msgpath) == -1)
128 		return (0);
129 
130 	fsqueue_message_incoming_path(msgid, incomingdir, sizeof(incomingdir));
131 	fsqueue_message_path(msgid, msgdir, sizeof(msgdir));
132 	if (strlcpy(queuedir, msgdir, sizeof(queuedir))
133 	    >= sizeof(queuedir))
134 		return (0);
135 
136 	/* first attempt to rename */
137 	if (rename(incomingdir, msgdir) == 0)
138 		return 1;
139 	if (errno == ENOSPC)
140 		return 0;
141 	if (errno != ENOENT) {
142 		log_warn("warn: queue-fs: rename");
143 		return 0;
144 	}
145 
146 	/* create the bucket */
147 	*strrchr(queuedir, '/') = '\0';
148 	if (mkdir(queuedir, 0700) == -1) {
149 		if (errno == ENOSPC)
150 			return 0;
151 		if (errno != EEXIST) {
152 			log_warn("warn: queue-fs: mkdir");
153 			return 0;
154 		}
155 	}
156 
157 	/* rename */
158 	if (rename(incomingdir, msgdir) == -1) {
159 		if (errno == ENOSPC)
160 			return 0;
161 		log_warn("warn: queue-fs: rename");
162 		return 0;
163 	}
164 
165 	return 1;
166 }
167 
168 static int
169 queue_fs_message_fd_r(uint32_t msgid)
170 {
171 	int fd;
172 	char path[PATH_MAX];
173 
174 	fsqueue_message_path(msgid, path, sizeof(path));
175 	if (strlcat(path, PATH_MESSAGE, sizeof(path))
176 	    >= sizeof(path))
177 		return -1;
178 
179 	if ((fd = open(path, O_RDONLY)) == -1) {
180 		log_warn("warn: queue-fs: open");
181 		return -1;
182 	}
183 
184 	return fd;
185 }
186 
187 static int
188 queue_fs_message_delete(uint32_t msgid)
189 {
190 	char		path[PATH_MAX];
191 	struct stat	sb;
192 
193 	fsqueue_message_incoming_path(msgid, path, sizeof(path));
194 	if (stat(path, &sb) == -1)
195 		fsqueue_message_path(msgid, path, sizeof(path));
196 
197 	if (rmtree(path, 0) == -1)
198 		log_warn("warn: queue-fs: rmtree");
199 
200 	tree_pop(&evpcount, msgid);
201 
202 	return 1;
203 }
204 
205 static int
206 queue_fs_envelope_create(uint32_t msgid, const char *buf, size_t len,
207     uint64_t *evpid)
208 {
209 	char		path[PATH_MAX];
210 	int		queued = 0, i, r = 0, *n;
211 	struct stat	sb;
212 
213 	if (msgid == 0) {
214 		log_warnx("warn: queue-fs: msgid=0, evpid=%016"PRIx64, *evpid);
215 		goto done;
216 	}
217 
218 	fsqueue_message_incoming_path(msgid, path, sizeof(path));
219 	if (stat(path, &sb) == -1)
220 		queued = 1;
221 
222 	for (i = 0; i < 20; i ++) {
223 		*evpid = queue_generate_evpid(msgid);
224 		if (queued)
225 			fsqueue_envelope_path(*evpid, path, sizeof(path));
226 		else
227 			fsqueue_envelope_incoming_path(*evpid, path,
228 			    sizeof(path));
229 
230 		if ((r = fsqueue_envelope_dump(path, buf, len, 0, 0)) != 0)
231 			goto done;
232 	}
233 	r = 0;
234 	log_warnx("warn: queue-fs: could not allocate evpid");
235 
236 done:
237 	if (r) {
238 		n = tree_pop(&evpcount, msgid);
239 		if (n == NULL)
240 			n = REF;
241 		n += 1;
242 		tree_xset(&evpcount, msgid, n);
243 	}
244 	return (r);
245 }
246 
247 static int
248 queue_fs_envelope_load(uint64_t evpid, char *buf, size_t len)
249 {
250 	char	 pathname[PATH_MAX];
251 	FILE	*fp;
252 	size_t	 r;
253 
254 	fsqueue_envelope_path(evpid, pathname, sizeof(pathname));
255 
256 	fp = fopen(pathname, "r");
257 	if (fp == NULL) {
258 		if (errno != ENOENT && errno != ENFILE)
259 			log_warn("warn: queue-fs: fopen");
260 		return 0;
261 	}
262 
263 	r = fread(buf, 1, len, fp);
264 	if (r) {
265 		if (r == len) {
266 			log_warn("warn: queue-fs: too large");
267 			r = 0;
268 		}
269 		else
270 			buf[r] = '\0';
271 	}
272 	fclose(fp);
273 
274 	return (r);
275 }
276 
277 static int
278 queue_fs_envelope_update(uint64_t evpid, const char *buf, size_t len)
279 {
280 	char dest[PATH_MAX];
281 
282 	fsqueue_envelope_path(evpid, dest, sizeof(dest));
283 
284 	return (fsqueue_envelope_dump(dest, buf, len, 1, 1));
285 }
286 
287 static int
288 queue_fs_envelope_delete(uint64_t evpid)
289 {
290 	char		pathname[PATH_MAX];
291 	uint32_t	msgid;
292 	int		*n;
293 
294 	fsqueue_envelope_path(evpid, pathname, sizeof(pathname));
295 	if (unlink(pathname) == -1)
296 		if (errno != ENOENT)
297 			return 0;
298 
299 	msgid = evpid_to_msgid(evpid);
300 	n = tree_pop(&evpcount, msgid);
301 	n -= 1;
302 
303 	if (n - REF == 0)
304 		queue_fs_message_delete(msgid);
305 	else
306 		tree_xset(&evpcount, msgid, n);
307 
308 	return (1);
309 }
310 
311 static int
312 queue_fs_message_walk(uint64_t *evpid, char *buf, size_t len,
313     uint32_t msgid, int *done, void **data)
314 {
315 	struct dirent	*dp;
316 	DIR		*dir = *data;
317 	char		 path[PATH_MAX];
318 	char		 msgid_str[9];
319 	char		*tmp;
320 	int		 r, *n;
321 
322 	if (*done)
323 		return (-1);
324 
325 	if (!bsnprintf(path, sizeof path, "%s/%02x/%08x",
326 	    PATH_QUEUE, (msgid  & 0xff000000) >> 24, msgid))
327 		fatalx("queue_fs_message_walk: path does not fit buffer");
328 
329 	if (dir == NULL) {
330 		if ((dir = opendir(path)) == NULL) {
331 			log_warn("warn: queue_fs: opendir: %s", path);
332 			*done = 1;
333 			return (-1);
334 		}
335 
336 		*data = dir;
337 	}
338 
339 	(void)snprintf(msgid_str, sizeof msgid_str, "%08" PRIx32, msgid);
340 	while ((dp = readdir(dir)) != NULL) {
341 		if (dp->d_type != DT_REG)
342 			continue;
343 
344 		/* ignore files other than envelopes */
345 		if (strlen(dp->d_name) != 16 ||
346 		    strncmp(dp->d_name, msgid_str, 8))
347 			continue;
348 
349 		tmp = NULL;
350 		*evpid = strtoull(dp->d_name, &tmp, 16);
351 		if (tmp && *tmp !=  '\0') {
352 			log_debug("debug: fsqueue: bogus file %s", dp->d_name);
353 			continue;
354 		}
355 
356 		memset(buf, 0, len);
357 		r = queue_fs_envelope_load(*evpid, buf, len);
358 		if (r) {
359 			n = tree_pop(&evpcount, msgid);
360 			if (n == NULL)
361 				n = REF;
362 
363 			n += 1;
364 			tree_xset(&evpcount, msgid, n);
365 		}
366 
367 		return (r);
368 	}
369 
370 	(void)closedir(dir);
371 	*done = 1;
372 	return (-1);
373 }
374 
375 static int
376 queue_fs_envelope_walk(uint64_t *evpid, char *buf, size_t len)
377 {
378 	static int	 done = 0;
379 	static void	*hdl = NULL;
380 	int		 r, *n;
381 	uint32_t	 msgid;
382 
383 	if (done)
384 		return (-1);
385 
386 	if (hdl == NULL)
387 		hdl = fsqueue_qwalk_new();
388 
389 	if (fsqueue_qwalk(hdl, evpid)) {
390 		memset(buf, 0, len);
391 		r = queue_fs_envelope_load(*evpid, buf, len);
392 		if (r) {
393 			msgid = evpid_to_msgid(*evpid);
394 			n = tree_pop(&evpcount, msgid);
395 			if (n == NULL)
396 				n = REF;
397 			n += 1;
398 			tree_xset(&evpcount, msgid, n);
399 		}
400 		return (r);
401 	}
402 
403 	fsqueue_qwalk_close(hdl);
404 	done = 1;
405 	return (-1);
406 }
407 
408 static int
409 fsqueue_check_space(void)
410 {
411 	struct statfs	buf;
412 	uint64_t	used;
413 	uint64_t	total;
414 
415 	if (statfs(PATH_QUEUE, &buf) == -1) {
416 		log_warn("warn: queue-fs: statfs");
417 		return 0;
418 	}
419 
420 	/*
421 	 * f_bfree and f_ffree is not set on all filesystems.
422 	 * They could be signed or unsigned integers.
423 	 * Some systems will set them to 0, others will set them to -1.
424 	 */
425 	if (buf.f_bfree == 0 || buf.f_ffree == 0 ||
426 	    (int64_t)buf.f_bfree == -1 || (int64_t)buf.f_ffree == -1)
427 		return 1;
428 
429 	used = buf.f_blocks - buf.f_bfree;
430 	total = buf.f_bavail + used;
431 	if (total != 0)
432 		used = (float)used / (float)total * 100;
433 	else
434 		used = 100;
435 	if (100 - used < MINSPACE) {
436 		log_warnx("warn: not enough disk space: %llu%% left",
437 		    (unsigned long long) 100 - used);
438 		log_warnx("warn: temporarily rejecting messages");
439 		return 0;
440 	}
441 
442 	used = buf.f_files - buf.f_ffree;
443 	total = buf.f_favail + used;
444 	if (total != 0)
445 		used = (float)used / (float)total * 100;
446 	else
447 		used = 100;
448 	if (100 - used < MININODES) {
449 		log_warnx("warn: not enough inodes: %llu%% left",
450 		    (unsigned long long) 100 - used);
451 		log_warnx("warn: temporarily rejecting messages");
452 		return 0;
453 	}
454 
455 	return 1;
456 }
457 
458 static void
459 fsqueue_envelope_path(uint64_t evpid, char *buf, size_t len)
460 {
461 	if (!bsnprintf(buf, len, "%s/%02x/%08x/%016" PRIx64,
462 		PATH_QUEUE,
463 		(evpid_to_msgid(evpid) & 0xff000000) >> 24,
464 		evpid_to_msgid(evpid),
465 		evpid))
466 		fatalx("fsqueue_envelope_path: path does not fit buffer");
467 }
468 
469 static void
470 fsqueue_envelope_incoming_path(uint64_t evpid, char *buf, size_t len)
471 {
472 	if (!bsnprintf(buf, len, "%s/%08x/%016" PRIx64,
473 		PATH_INCOMING,
474 		evpid_to_msgid(evpid),
475 		evpid))
476 		fatalx("fsqueue_envelope_incoming_path: path does not fit buffer");
477 }
478 
479 static int
480 fsqueue_envelope_dump(char *dest, const char *evpbuf, size_t evplen,
481     int do_atomic, int do_sync)
482 {
483 	const char     *path = do_atomic ? PATH_EVPTMP : dest;
484 	FILE	       *fp = NULL;
485 	int		fd;
486 	size_t		w;
487 
488 	if ((fd = open(path, O_RDWR | O_CREAT | O_EXCL, 0600)) == -1) {
489 		log_warn("warn: queue-fs: open");
490 		goto tempfail;
491 	}
492 
493 	if ((fp = fdopen(fd, "w")) == NULL) {
494 		log_warn("warn: queue-fs: fdopen");
495 		goto tempfail;
496 	}
497 
498 	w = fwrite(evpbuf, 1, evplen, fp);
499 	if (w < evplen) {
500 		log_warn("warn: queue-fs: short write");
501 		goto tempfail;
502 	}
503 	if (fflush(fp)) {
504 		log_warn("warn: queue-fs: fflush");
505 		goto tempfail;
506 	}
507 	if (do_sync && fsync(fileno(fp))) {
508 		log_warn("warn: queue-fs: fsync");
509 		goto tempfail;
510 	}
511 	if (fclose(fp) != 0) {
512 		log_warn("warn: queue-fs: fclose");
513 		fp = NULL;
514 		goto tempfail;
515 	}
516 	fp = NULL;
517 	fd = -1;
518 
519 	if (do_atomic && rename(path, dest) == -1) {
520 		log_warn("warn: queue-fs: rename");
521 		goto tempfail;
522 	}
523 	return (1);
524 
525 tempfail:
526 	if (fp)
527 		fclose(fp);
528 	else if (fd != -1)
529 		close(fd);
530 	if (unlink(path) == -1)
531 		log_warn("warn: queue-fs: unlink");
532 	return (0);
533 }
534 
535 static void
536 fsqueue_message_path(uint32_t msgid, char *buf, size_t len)
537 {
538 	if (!bsnprintf(buf, len, "%s/%02x/%08x",
539 		PATH_QUEUE,
540 		(msgid & 0xff000000) >> 24,
541 		msgid))
542 		fatalx("fsqueue_message_path: path does not fit buffer");
543 }
544 
545 static void
546 fsqueue_message_incoming_path(uint32_t msgid, char *buf, size_t len)
547 {
548 	if (!bsnprintf(buf, len, "%s/%08x",
549 		PATH_INCOMING,
550 		msgid))
551 		fatalx("fsqueue_message_incoming_path: path does not fit buffer");
552 }
553 
554 static void *
555 fsqueue_qwalk_new(void)
556 {
557 	char		 path[PATH_MAX];
558 	char * const	 path_argv[] = { path, NULL };
559 	struct qwalk	*q;
560 
561 	q = xcalloc(1, sizeof(*q));
562 	(void)strlcpy(path, PATH_QUEUE, sizeof(path));
563 	q->fts = fts_open(path_argv,
564 	    FTS_PHYSICAL | FTS_NOCHDIR, NULL);
565 
566 	if (q->fts == NULL)
567 		fatal("fsqueue_qwalk_new: fts_open: %s", path);
568 
569 	return (q);
570 }
571 
572 static void
573 fsqueue_qwalk_close(void *hdl)
574 {
575 	struct qwalk	*q = hdl;
576 
577 	fts_close(q->fts);
578 
579 	free(q);
580 }
581 
582 static int
583 fsqueue_qwalk(void *hdl, uint64_t *evpid)
584 {
585 	struct qwalk	*q = hdl;
586 	FTSENT		*e;
587 	char		*tmp;
588 
589 	while ((e = fts_read(q->fts)) != NULL) {
590 		switch (e->fts_info) {
591 		case FTS_D:
592 			q->depth += 1;
593 			if (q->depth == 2 && e->fts_namelen != 2) {
594 				log_debug("debug: fsqueue: bogus directory %s",
595 				    e->fts_path);
596 				fts_set(q->fts, e, FTS_SKIP);
597 				break;
598 			}
599 			if (q->depth == 3 && e->fts_namelen != 8) {
600 				log_debug("debug: fsqueue: bogus directory %s",
601 				    e->fts_path);
602 				fts_set(q->fts, e, FTS_SKIP);
603 				break;
604 			}
605 			break;
606 
607 		case FTS_DP:
608 		case FTS_DNR:
609 			q->depth -= 1;
610 			break;
611 
612 		case FTS_F:
613 			if (q->depth != 3)
614 				break;
615 			if (e->fts_namelen != 16)
616 				break;
617 			if (timespeccmp(&e->fts_statp->st_mtim, &startup, >))
618 				break;
619 			tmp = NULL;
620 			*evpid = strtoull(e->fts_name, &tmp, 16);
621 			if (tmp && *tmp !=  '\0') {
622 				log_debug("debug: fsqueue: bogus file %s",
623 				    e->fts_path);
624 				break;
625 			}
626 			return (1);
627 		default:
628 			break;
629 		}
630 	}
631 
632 	return (0);
633 }
634 
635 static int
636 queue_fs_init(struct passwd *pw, int server, const char *conf)
637 {
638 	unsigned int	 n;
639 	char		*paths[] = { PATH_QUEUE, PATH_INCOMING };
640 	char		 path[PATH_MAX];
641 	int		 ret;
642 
643 	/* remove incoming/ if it exists */
644 	if (server)
645 		mvpurge(PATH_SPOOL PATH_INCOMING, PATH_SPOOL PATH_PURGE);
646 
647 	fsqueue_envelope_path(0, path, sizeof(path));
648 
649 	ret = 1;
650 	for (n = 0; n < nitems(paths); n++) {
651 		(void)strlcpy(path, PATH_SPOOL, sizeof(path));
652 		if (strlcat(path, paths[n], sizeof(path)) >= sizeof(path))
653 			fatalx("path too long %s%s", PATH_SPOOL, paths[n]);
654 		if (ckdir(path, 0700, pw->pw_uid, 0, server) == 0)
655 			ret = 0;
656 	}
657 
658 	if (clock_gettime(CLOCK_REALTIME, &startup))
659 		fatal("clock_gettime");
660 
661 	tree_init(&evpcount);
662 
663 	queue_api_on_message_create(queue_fs_message_create);
664 	queue_api_on_message_commit(queue_fs_message_commit);
665 	queue_api_on_message_delete(queue_fs_message_delete);
666 	queue_api_on_message_fd_r(queue_fs_message_fd_r);
667 	queue_api_on_envelope_create(queue_fs_envelope_create);
668 	queue_api_on_envelope_delete(queue_fs_envelope_delete);
669 	queue_api_on_envelope_update(queue_fs_envelope_update);
670 	queue_api_on_envelope_load(queue_fs_envelope_load);
671 	queue_api_on_envelope_walk(queue_fs_envelope_walk);
672 	queue_api_on_message_walk(queue_fs_message_walk);
673 
674 	return (ret);
675 }
676 
677 struct queue_backend	queue_backend_fs = {
678 	queue_fs_init,
679 };
680