xref: /openbsd/usr.sbin/smtpd/queue_fs.c (revision 1d4bec26)
1 /*	$OpenBSD: queue_fs.c,v 1.20 2020/02/25 17:03:13 millert Exp $	*/
2 
3 /*
4  * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/mount.h>
21 #include <sys/queue.h>
22 #include <sys/tree.h>
23 #include <sys/socket.h>
24 #include <sys/stat.h>
25 
26 #include <ctype.h>
27 #include <dirent.h>
28 #include <err.h>
29 #include <errno.h>
30 #include <event.h>
31 #include <fcntl.h>
32 #include <fts.h>
33 #include <imsg.h>
34 #include <inttypes.h>
35 #include <pwd.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <string.h>
39 #include <time.h>
40 #include <unistd.h>
41 
42 #include "smtpd.h"
43 #include "log.h"
44 
45 #define PATH_QUEUE		"/queue"
46 #define PATH_INCOMING		"/incoming"
47 #define PATH_EVPTMP		PATH_INCOMING "/envelope.tmp"
48 #define PATH_MESSAGE		"/message"
49 
50 /* percentage of remaining space / inodes required to accept new messages */
51 #define	MINSPACE		5
52 #define	MININODES		5
53 
54 struct qwalk {
55 	FTS	*fts;
56 	int	 depth;
57 };
58 
59 static int	fsqueue_check_space(void);
60 static void	fsqueue_envelope_path(uint64_t, char *, size_t);
61 static void	fsqueue_envelope_incoming_path(uint64_t, char *, size_t);
62 static int	fsqueue_envelope_dump(char *, const char *, size_t, int, int);
63 static void	fsqueue_message_path(uint32_t, char *, size_t);
64 static void	fsqueue_message_incoming_path(uint32_t, char *, size_t);
65 static void    *fsqueue_qwalk_new(void);
66 static int	fsqueue_qwalk(void *, uint64_t *);
67 static void	fsqueue_qwalk_close(void *);
68 
69 struct tree evpcount;
70 static struct timespec startup;
71 
72 #define REF	(int*)0xf00
73 
74 static int
75 queue_fs_message_create(uint32_t *msgid)
76 {
77 	char		rootdir[PATH_MAX];
78 	struct stat	sb;
79 
80 	if (!fsqueue_check_space())
81 		return 0;
82 
83 again:
84 	*msgid = queue_generate_msgid();
85 
86 	/* prevent possible collision later when moving to Q_QUEUE */
87 	fsqueue_message_path(*msgid, rootdir, sizeof(rootdir));
88 	if (stat(rootdir, &sb) != -1)
89 		goto again;
90 
91 	/* we hit an unexpected error, temporarily fail */
92 	if (errno != ENOENT) {
93 		*msgid = 0;
94 		return 0;
95 	}
96 
97 	fsqueue_message_incoming_path(*msgid, rootdir, sizeof(rootdir));
98 	if (mkdir(rootdir, 0700) == -1) {
99 		if (errno == EEXIST)
100 			goto again;
101 
102 		if (errno == ENOSPC) {
103 			*msgid = 0;
104 			return 0;
105 		}
106 
107 		log_warn("warn: queue-fs: mkdir");
108 		*msgid = 0;
109 		return 0;
110 	}
111 
112 	return (1);
113 }
114 
115 static int
116 queue_fs_message_commit(uint32_t msgid, const char *path)
117 {
118 	char incomingdir[PATH_MAX];
119 	char queuedir[PATH_MAX];
120 	char msgdir[PATH_MAX];
121 	char msgpath[PATH_MAX];
122 
123 	/* before-first, move the message content in the incoming directory */
124 	fsqueue_message_incoming_path(msgid, msgpath, sizeof(msgpath));
125 	if (strlcat(msgpath, PATH_MESSAGE, sizeof(msgpath))
126 	    >= sizeof(msgpath))
127 		return (0);
128 	if (rename(path, msgpath) == -1)
129 		return (0);
130 
131 	fsqueue_message_incoming_path(msgid, incomingdir, sizeof(incomingdir));
132 	fsqueue_message_path(msgid, msgdir, sizeof(msgdir));
133 	if (strlcpy(queuedir, msgdir, sizeof(queuedir))
134 	    >= sizeof(queuedir))
135 		return (0);
136 
137 	/* first attempt to rename */
138 	if (rename(incomingdir, msgdir) == 0)
139 		return 1;
140 	if (errno == ENOSPC)
141 		return 0;
142 	if (errno != ENOENT) {
143 		log_warn("warn: queue-fs: rename");
144 		return 0;
145 	}
146 
147 	/* create the bucket */
148 	*strrchr(queuedir, '/') = '\0';
149 	if (mkdir(queuedir, 0700) == -1) {
150 		if (errno == ENOSPC)
151 			return 0;
152 		if (errno != EEXIST) {
153 			log_warn("warn: queue-fs: mkdir");
154 			return 0;
155 		}
156 	}
157 
158 	/* rename */
159 	if (rename(incomingdir, msgdir) == -1) {
160 		if (errno == ENOSPC)
161 			return 0;
162 		log_warn("warn: queue-fs: rename");
163 		return 0;
164 	}
165 
166 	return 1;
167 }
168 
169 static int
170 queue_fs_message_fd_r(uint32_t msgid)
171 {
172 	int fd;
173 	char path[PATH_MAX];
174 
175 	fsqueue_message_path(msgid, path, sizeof(path));
176 	if (strlcat(path, PATH_MESSAGE, sizeof(path))
177 	    >= sizeof(path))
178 		return -1;
179 
180 	if ((fd = open(path, O_RDONLY)) == -1) {
181 		log_warn("warn: queue-fs: open");
182 		return -1;
183 	}
184 
185 	return fd;
186 }
187 
188 static int
189 queue_fs_message_delete(uint32_t msgid)
190 {
191 	char		path[PATH_MAX];
192 	struct stat	sb;
193 
194 	fsqueue_message_incoming_path(msgid, path, sizeof(path));
195 	if (stat(path, &sb) == -1)
196 		fsqueue_message_path(msgid, path, sizeof(path));
197 
198 	if (rmtree(path, 0) == -1)
199 		log_warn("warn: queue-fs: rmtree");
200 
201 	tree_pop(&evpcount, msgid);
202 
203 	return 1;
204 }
205 
206 static int
207 queue_fs_envelope_create(uint32_t msgid, const char *buf, size_t len,
208     uint64_t *evpid)
209 {
210 	char		path[PATH_MAX];
211 	int		queued = 0, i, r = 0, *n;
212 	struct stat	sb;
213 
214 	if (msgid == 0) {
215 		log_warnx("warn: queue-fs: msgid=0, evpid=%016"PRIx64, *evpid);
216 		goto done;
217 	}
218 
219 	fsqueue_message_incoming_path(msgid, path, sizeof(path));
220 	if (stat(path, &sb) == -1)
221 		queued = 1;
222 
223 	for (i = 0; i < 20; i ++) {
224 		*evpid = queue_generate_evpid(msgid);
225 		if (queued)
226 			fsqueue_envelope_path(*evpid, path, sizeof(path));
227 		else
228 			fsqueue_envelope_incoming_path(*evpid, path,
229 			    sizeof(path));
230 
231 		if ((r = fsqueue_envelope_dump(path, buf, len, 0, 0)) != 0)
232 			goto done;
233 	}
234 	r = 0;
235 	log_warnx("warn: queue-fs: could not allocate evpid");
236 
237 done:
238 	if (r) {
239 		n = tree_pop(&evpcount, msgid);
240 		if (n == NULL)
241 			n = REF;
242 		n += 1;
243 		tree_xset(&evpcount, msgid, n);
244 	}
245 	return (r);
246 }
247 
248 static int
249 queue_fs_envelope_load(uint64_t evpid, char *buf, size_t len)
250 {
251 	char	 pathname[PATH_MAX];
252 	FILE	*fp;
253 	size_t	 r;
254 
255 	fsqueue_envelope_path(evpid, pathname, sizeof(pathname));
256 
257 	fp = fopen(pathname, "r");
258 	if (fp == NULL) {
259 		if (errno != ENOENT && errno != ENFILE)
260 			log_warn("warn: queue-fs: fopen");
261 		return 0;
262 	}
263 
264 	r = fread(buf, 1, len, fp);
265 	if (r) {
266 		if (r == len) {
267 			log_warn("warn: queue-fs: too large");
268 			r = 0;
269 		}
270 		else
271 			buf[r] = '\0';
272 	}
273 	fclose(fp);
274 
275 	return (r);
276 }
277 
278 static int
279 queue_fs_envelope_update(uint64_t evpid, const char *buf, size_t len)
280 {
281 	char dest[PATH_MAX];
282 
283 	fsqueue_envelope_path(evpid, dest, sizeof(dest));
284 
285 	return (fsqueue_envelope_dump(dest, buf, len, 1, 1));
286 }
287 
288 static int
289 queue_fs_envelope_delete(uint64_t evpid)
290 {
291 	char		pathname[PATH_MAX];
292 	uint32_t	msgid;
293 	int		*n;
294 
295 	fsqueue_envelope_path(evpid, pathname, sizeof(pathname));
296 	if (unlink(pathname) == -1)
297 		if (errno != ENOENT)
298 			return 0;
299 
300 	msgid = evpid_to_msgid(evpid);
301 	n = tree_pop(&evpcount, msgid);
302 	n -= 1;
303 
304 	if (n - REF == 0)
305 		queue_fs_message_delete(msgid);
306 	else
307 		tree_xset(&evpcount, msgid, n);
308 
309 	return (1);
310 }
311 
312 static int
313 queue_fs_message_walk(uint64_t *evpid, char *buf, size_t len,
314     uint32_t msgid, int *done, void **data)
315 {
316 	struct dirent	*dp;
317 	DIR		*dir = *data;
318 	char		 path[PATH_MAX];
319 	char		 msgid_str[9];
320 	char		*tmp;
321 	int		 r, *n;
322 
323 	if (*done)
324 		return (-1);
325 
326 	if (!bsnprintf(path, sizeof path, "%s/%02x/%08x",
327 	    PATH_QUEUE, (msgid  & 0xff000000) >> 24, msgid))
328 		fatalx("queue_fs_message_walk: path does not fit buffer");
329 
330 	if (dir == NULL) {
331 		if ((dir = opendir(path)) == NULL) {
332 			log_warn("warn: queue_fs: opendir: %s", path);
333 			*done = 1;
334 			return (-1);
335 		}
336 
337 		*data = dir;
338 	}
339 
340 	(void)snprintf(msgid_str, sizeof msgid_str, "%08" PRIx32, msgid);
341 	while ((dp = readdir(dir)) != NULL) {
342 		if (dp->d_type != DT_REG)
343 			continue;
344 
345 		/* ignore files other than envelopes */
346 		if (strlen(dp->d_name) != 16 ||
347 		    strncmp(dp->d_name, msgid_str, 8))
348 			continue;
349 
350 		tmp = NULL;
351 		*evpid = strtoull(dp->d_name, &tmp, 16);
352 		if (tmp && *tmp !=  '\0') {
353 			log_debug("debug: fsqueue: bogus file %s", dp->d_name);
354 			continue;
355 		}
356 
357 		memset(buf, 0, len);
358 		r = queue_fs_envelope_load(*evpid, buf, len);
359 		if (r) {
360 			n = tree_pop(&evpcount, msgid);
361 			if (n == NULL)
362 				n = REF;
363 
364 			n += 1;
365 			tree_xset(&evpcount, msgid, n);
366 		}
367 
368 		return (r);
369 	}
370 
371 	(void)closedir(dir);
372 	*done = 1;
373 	return (-1);
374 }
375 
376 static int
377 queue_fs_envelope_walk(uint64_t *evpid, char *buf, size_t len)
378 {
379 	static int	 done = 0;
380 	static void	*hdl = NULL;
381 	int		 r, *n;
382 	uint32_t	 msgid;
383 
384 	if (done)
385 		return (-1);
386 
387 	if (hdl == NULL)
388 		hdl = fsqueue_qwalk_new();
389 
390 	if (fsqueue_qwalk(hdl, evpid)) {
391 		memset(buf, 0, len);
392 		r = queue_fs_envelope_load(*evpid, buf, len);
393 		if (r) {
394 			msgid = evpid_to_msgid(*evpid);
395 			n = tree_pop(&evpcount, msgid);
396 			if (n == NULL)
397 				n = REF;
398 			n += 1;
399 			tree_xset(&evpcount, msgid, n);
400 		}
401 		return (r);
402 	}
403 
404 	fsqueue_qwalk_close(hdl);
405 	done = 1;
406 	return (-1);
407 }
408 
409 static int
410 fsqueue_check_space(void)
411 {
412 	struct statfs	buf;
413 	uint64_t	used;
414 	uint64_t	total;
415 
416 	if (statfs(PATH_QUEUE, &buf) == -1) {
417 		log_warn("warn: queue-fs: statfs");
418 		return 0;
419 	}
420 
421 	/*
422 	 * f_bfree and f_ffree is not set on all filesystems.
423 	 * They could be signed or unsigned integers.
424 	 * Some systems will set them to 0, others will set them to -1.
425 	 */
426 	if (buf.f_bfree == 0 || buf.f_ffree == 0 ||
427 	    (int64_t)buf.f_bfree == -1 || (int64_t)buf.f_ffree == -1)
428 		return 1;
429 
430 	used = buf.f_blocks - buf.f_bfree;
431 	total = buf.f_bavail + used;
432 	if (total != 0)
433 		used = (float)used / (float)total * 100;
434 	else
435 		used = 100;
436 	if (100 - used < MINSPACE) {
437 		log_warnx("warn: not enough disk space: %llu%% left",
438 		    (unsigned long long) 100 - used);
439 		log_warnx("warn: temporarily rejecting messages");
440 		return 0;
441 	}
442 
443 	used = buf.f_files - buf.f_ffree;
444 	total = buf.f_favail + used;
445 	if (total != 0)
446 		used = (float)used / (float)total * 100;
447 	else
448 		used = 100;
449 	if (100 - used < MININODES) {
450 		log_warnx("warn: not enough inodes: %llu%% left",
451 		    (unsigned long long) 100 - used);
452 		log_warnx("warn: temporarily rejecting messages");
453 		return 0;
454 	}
455 
456 	return 1;
457 }
458 
459 static void
460 fsqueue_envelope_path(uint64_t evpid, char *buf, size_t len)
461 {
462 	if (!bsnprintf(buf, len, "%s/%02x/%08x/%016" PRIx64,
463 		PATH_QUEUE,
464 		(evpid_to_msgid(evpid) & 0xff000000) >> 24,
465 		evpid_to_msgid(evpid),
466 		evpid))
467 		fatalx("fsqueue_envelope_path: path does not fit buffer");
468 }
469 
470 static void
471 fsqueue_envelope_incoming_path(uint64_t evpid, char *buf, size_t len)
472 {
473 	if (!bsnprintf(buf, len, "%s/%08x/%016" PRIx64,
474 		PATH_INCOMING,
475 		evpid_to_msgid(evpid),
476 		evpid))
477 		fatalx("fsqueue_envelope_incoming_path: path does not fit buffer");
478 }
479 
480 static int
481 fsqueue_envelope_dump(char *dest, const char *evpbuf, size_t evplen,
482     int do_atomic, int do_sync)
483 {
484 	const char     *path = do_atomic ? PATH_EVPTMP : dest;
485 	FILE	       *fp = NULL;
486 	int		fd;
487 	size_t		w;
488 
489 	if ((fd = open(path, O_RDWR | O_CREAT | O_EXCL, 0600)) == -1) {
490 		log_warn("warn: queue-fs: open");
491 		goto tempfail;
492 	}
493 
494 	if ((fp = fdopen(fd, "w")) == NULL) {
495 		log_warn("warn: queue-fs: fdopen");
496 		goto tempfail;
497 	}
498 
499 	w = fwrite(evpbuf, 1, evplen, fp);
500 	if (w < evplen) {
501 		log_warn("warn: queue-fs: short write");
502 		goto tempfail;
503 	}
504 	if (fflush(fp)) {
505 		log_warn("warn: queue-fs: fflush");
506 		goto tempfail;
507 	}
508 	if (do_sync && fsync(fileno(fp))) {
509 		log_warn("warn: queue-fs: fsync");
510 		goto tempfail;
511 	}
512 	if (fclose(fp) != 0) {
513 		log_warn("warn: queue-fs: fclose");
514 		fp = NULL;
515 		goto tempfail;
516 	}
517 	fp = NULL;
518 	fd = -1;
519 
520 	if (do_atomic && rename(path, dest) == -1) {
521 		log_warn("warn: queue-fs: rename");
522 		goto tempfail;
523 	}
524 	return (1);
525 
526 tempfail:
527 	if (fp)
528 		fclose(fp);
529 	else if (fd != -1)
530 		close(fd);
531 	if (unlink(path) == -1)
532 		log_warn("warn: queue-fs: unlink");
533 	return (0);
534 }
535 
536 static void
537 fsqueue_message_path(uint32_t msgid, char *buf, size_t len)
538 {
539 	if (!bsnprintf(buf, len, "%s/%02x/%08x",
540 		PATH_QUEUE,
541 		(msgid & 0xff000000) >> 24,
542 		msgid))
543 		fatalx("fsqueue_message_path: path does not fit buffer");
544 }
545 
546 static void
547 fsqueue_message_incoming_path(uint32_t msgid, char *buf, size_t len)
548 {
549 	if (!bsnprintf(buf, len, "%s/%08x",
550 		PATH_INCOMING,
551 		msgid))
552 		fatalx("fsqueue_message_incoming_path: path does not fit buffer");
553 }
554 
555 static void *
556 fsqueue_qwalk_new(void)
557 {
558 	char		 path[PATH_MAX];
559 	char * const	 path_argv[] = { path, NULL };
560 	struct qwalk	*q;
561 
562 	q = xcalloc(1, sizeof(*q));
563 	(void)strlcpy(path, PATH_QUEUE, sizeof(path));
564 	q->fts = fts_open(path_argv,
565 	    FTS_PHYSICAL | FTS_NOCHDIR, NULL);
566 
567 	if (q->fts == NULL)
568 		err(1, "fsqueue_qwalk_new: fts_open: %s", path);
569 
570 	return (q);
571 }
572 
573 static void
574 fsqueue_qwalk_close(void *hdl)
575 {
576 	struct qwalk	*q = hdl;
577 
578 	fts_close(q->fts);
579 
580 	free(q);
581 }
582 
583 static int
584 fsqueue_qwalk(void *hdl, uint64_t *evpid)
585 {
586 	struct qwalk	*q = hdl;
587 	FTSENT		*e;
588 	char		*tmp;
589 
590 	while ((e = fts_read(q->fts)) != NULL) {
591 		switch (e->fts_info) {
592 		case FTS_D:
593 			q->depth += 1;
594 			if (q->depth == 2 && e->fts_namelen != 2) {
595 				log_debug("debug: fsqueue: bogus directory %s",
596 				    e->fts_path);
597 				fts_set(q->fts, e, FTS_SKIP);
598 				break;
599 			}
600 			if (q->depth == 3 && e->fts_namelen != 8) {
601 				log_debug("debug: fsqueue: bogus directory %s",
602 				    e->fts_path);
603 				fts_set(q->fts, e, FTS_SKIP);
604 				break;
605 			}
606 			break;
607 
608 		case FTS_DP:
609 		case FTS_DNR:
610 			q->depth -= 1;
611 			break;
612 
613 		case FTS_F:
614 			if (q->depth != 3)
615 				break;
616 			if (e->fts_namelen != 16)
617 				break;
618 			if (timespeccmp(&e->fts_statp->st_mtim, &startup, >))
619 				break;
620 			tmp = NULL;
621 			*evpid = strtoull(e->fts_name, &tmp, 16);
622 			if (tmp && *tmp !=  '\0') {
623 				log_debug("debug: fsqueue: bogus file %s",
624 				    e->fts_path);
625 				break;
626 			}
627 			return (1);
628 		default:
629 			break;
630 		}
631 	}
632 
633 	return (0);
634 }
635 
636 static int
637 queue_fs_init(struct passwd *pw, int server, const char *conf)
638 {
639 	unsigned int	 n;
640 	char		*paths[] = { PATH_QUEUE, PATH_INCOMING };
641 	char		 path[PATH_MAX];
642 	int		 ret;
643 
644 	/* remove incoming/ if it exists */
645 	if (server)
646 		mvpurge(PATH_SPOOL PATH_INCOMING, PATH_SPOOL PATH_PURGE);
647 
648 	fsqueue_envelope_path(0, path, sizeof(path));
649 
650 	ret = 1;
651 	for (n = 0; n < nitems(paths); n++) {
652 		(void)strlcpy(path, PATH_SPOOL, sizeof(path));
653 		if (strlcat(path, paths[n], sizeof(path)) >= sizeof(path))
654 			errx(1, "path too long %s%s", PATH_SPOOL, paths[n]);
655 		if (ckdir(path, 0700, pw->pw_uid, 0, server) == 0)
656 			ret = 0;
657 	}
658 
659 	if (clock_gettime(CLOCK_REALTIME, &startup))
660 		err(1, "clock_gettime");
661 
662 	tree_init(&evpcount);
663 
664 	queue_api_on_message_create(queue_fs_message_create);
665 	queue_api_on_message_commit(queue_fs_message_commit);
666 	queue_api_on_message_delete(queue_fs_message_delete);
667 	queue_api_on_message_fd_r(queue_fs_message_fd_r);
668 	queue_api_on_envelope_create(queue_fs_envelope_create);
669 	queue_api_on_envelope_delete(queue_fs_envelope_delete);
670 	queue_api_on_envelope_update(queue_fs_envelope_update);
671 	queue_api_on_envelope_load(queue_fs_envelope_load);
672 	queue_api_on_envelope_walk(queue_fs_envelope_walk);
673 	queue_api_on_message_walk(queue_fs_message_walk);
674 
675 	return (ret);
676 }
677 
678 struct queue_backend	queue_backend_fs = {
679 	queue_fs_init,
680 };
681