xref: /openbsd/usr.sbin/smtpd/queue_fs.c (revision 118c16f3)
1 /*	$OpenBSD: queue_fs.c,v 1.17 2018/05/31 21:06:12 gilles Exp $	*/
2 
3 /*
4  * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/mount.h>
21 #include <sys/queue.h>
22 #include <sys/tree.h>
23 #include <sys/socket.h>
24 #include <sys/stat.h>
25 
26 #include <ctype.h>
27 #include <dirent.h>
28 #include <err.h>
29 #include <errno.h>
30 #include <event.h>
31 #include <fcntl.h>
32 #include <fts.h>
33 #include <imsg.h>
34 #include <inttypes.h>
35 #include <libgen.h>
36 #include <pwd.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40 #include <time.h>
41 #include <unistd.h>
42 
43 #include "smtpd.h"
44 #include "log.h"
45 
46 #define PATH_QUEUE		"/queue"
47 #define PATH_INCOMING		"/incoming"
48 #define PATH_EVPTMP		PATH_INCOMING "/envelope.tmp"
49 #define PATH_MESSAGE		"/message"
50 
51 /* percentage of remaining space / inodes required to accept new messages */
52 #define	MINSPACE		5
53 #define	MININODES		5
54 
55 struct qwalk {
56 	FTS	*fts;
57 	int	 depth;
58 };
59 
60 static int	fsqueue_check_space(void);
61 static void	fsqueue_envelope_path(uint64_t, char *, size_t);
62 static void	fsqueue_envelope_incoming_path(uint64_t, char *, size_t);
63 static int	fsqueue_envelope_dump(char *, const char *, size_t, int, int);
64 static void	fsqueue_message_path(uint32_t, char *, size_t);
65 static void	fsqueue_message_incoming_path(uint32_t, char *, size_t);
66 static void    *fsqueue_qwalk_new(void);
67 static int	fsqueue_qwalk(void *, uint64_t *);
68 static void	fsqueue_qwalk_close(void *);
69 
70 struct tree evpcount;
71 static struct timespec startup;
72 
73 #define REF	(int*)0xf00
74 
75 static int
76 queue_fs_message_create(uint32_t *msgid)
77 {
78 	char		rootdir[PATH_MAX];
79 	struct stat	sb;
80 
81 	if (!fsqueue_check_space())
82 		return 0;
83 
84 again:
85 	*msgid = queue_generate_msgid();
86 
87 	/* prevent possible collision later when moving to Q_QUEUE */
88 	fsqueue_message_path(*msgid, rootdir, sizeof(rootdir));
89 	if (stat(rootdir, &sb) != -1)
90 		goto again;
91 
92 	/* we hit an unexpected error, temporarily fail */
93 	if (errno != ENOENT) {
94 		*msgid = 0;
95 		return 0;
96 	}
97 
98 	fsqueue_message_incoming_path(*msgid, rootdir, sizeof(rootdir));
99 	if (mkdir(rootdir, 0700) == -1) {
100 		if (errno == EEXIST)
101 			goto again;
102 
103 		if (errno == ENOSPC) {
104 			*msgid = 0;
105 			return 0;
106 		}
107 
108 		log_warn("warn: queue-fs: mkdir");
109 		*msgid = 0;
110 		return 0;
111 	}
112 
113 	return (1);
114 }
115 
116 static int
117 queue_fs_message_commit(uint32_t msgid, const char *path)
118 {
119 	char incomingdir[PATH_MAX];
120 	char queuedir[PATH_MAX];
121 	char msgdir[PATH_MAX];
122 	char msgpath[PATH_MAX];
123 
124 	/* before-first, move the message content in the incoming directory */
125 	fsqueue_message_incoming_path(msgid, msgpath, sizeof(msgpath));
126 	if (strlcat(msgpath, PATH_MESSAGE, sizeof(msgpath))
127 	    >= sizeof(msgpath))
128 		return (0);
129 	if (rename(path, msgpath) == -1)
130 		return (0);
131 
132 	fsqueue_message_incoming_path(msgid, incomingdir, sizeof(incomingdir));
133 	fsqueue_message_path(msgid, msgdir, sizeof(msgdir));
134 	if (strlcpy(queuedir, msgdir, sizeof(queuedir))
135 	    >= sizeof(queuedir))
136 		return (0);
137 
138 	/* first attempt to rename */
139 	if (rename(incomingdir, msgdir) == 0)
140 		return 1;
141 	if (errno == ENOSPC)
142 		return 0;
143 	if (errno != ENOENT) {
144 		log_warn("warn: queue-fs: rename");
145 		return 0;
146 	}
147 
148 	/* create the bucket */
149 	*strrchr(queuedir, '/') = '\0';
150 	if (mkdir(queuedir, 0700) == -1) {
151 		if (errno == ENOSPC)
152 			return 0;
153 		if (errno != EEXIST) {
154 			log_warn("warn: queue-fs: mkdir");
155 			return 0;
156 		}
157 	}
158 
159 	/* rename */
160 	if (rename(incomingdir, msgdir) == -1) {
161 		if (errno == ENOSPC)
162 			return 0;
163 		log_warn("warn: queue-fs: rename");
164 		return 0;
165 	}
166 
167 	return 1;
168 }
169 
170 static int
171 queue_fs_message_fd_r(uint32_t msgid)
172 {
173 	int fd;
174 	char path[PATH_MAX];
175 
176 	fsqueue_message_path(msgid, path, sizeof(path));
177 	if (strlcat(path, PATH_MESSAGE, sizeof(path))
178 	    >= sizeof(path))
179 		return -1;
180 
181 	if ((fd = open(path, O_RDONLY)) == -1) {
182 		log_warn("warn: queue-fs: open");
183 		return -1;
184 	}
185 
186 	return fd;
187 }
188 
189 static int
190 queue_fs_message_delete(uint32_t msgid)
191 {
192 	char		path[PATH_MAX];
193 	struct stat	sb;
194 
195 	fsqueue_message_incoming_path(msgid, path, sizeof(path));
196 	if (stat(path, &sb) == -1)
197 		fsqueue_message_path(msgid, path, sizeof(path));
198 
199 	if (rmtree(path, 0) == -1)
200 		log_warn("warn: queue-fs: rmtree");
201 
202 	tree_pop(&evpcount, msgid);
203 
204 	return 1;
205 }
206 
207 static int
208 queue_fs_envelope_create(uint32_t msgid, const char *buf, size_t len,
209     uint64_t *evpid)
210 {
211 	char		path[PATH_MAX];
212 	int		queued = 0, i, r = 0, *n;
213 	struct stat	sb;
214 
215 	if (msgid == 0) {
216 		log_warnx("warn: queue-fs: msgid=0, evpid=%016"PRIx64, *evpid);
217 		goto done;
218 	}
219 
220 	fsqueue_message_incoming_path(msgid, path, sizeof(path));
221 	if (stat(path, &sb) == -1)
222 		queued = 1;
223 
224 	for (i = 0; i < 20; i ++) {
225 		*evpid = queue_generate_evpid(msgid);
226 		if (queued)
227 			fsqueue_envelope_path(*evpid, path, sizeof(path));
228 		else
229 			fsqueue_envelope_incoming_path(*evpid, path,
230 			    sizeof(path));
231 
232 		r = fsqueue_envelope_dump(path, buf, len, 0, 0);
233 		if (r >= 0)
234 			goto done;
235 	}
236 	r = 0;
237 	log_warnx("warn: queue-fs: could not allocate evpid");
238 
239 done:
240 	if (r) {
241 		n = tree_pop(&evpcount, msgid);
242 		if (n == NULL)
243 			n = REF;
244 		n += 1;
245 		tree_xset(&evpcount, msgid, n);
246 	}
247 	return (r);
248 }
249 
250 static int
251 queue_fs_envelope_load(uint64_t evpid, char *buf, size_t len)
252 {
253 	char	 pathname[PATH_MAX];
254 	FILE	*fp;
255 	size_t	 r;
256 
257 	fsqueue_envelope_path(evpid, pathname, sizeof(pathname));
258 
259 	fp = fopen(pathname, "r");
260 	if (fp == NULL) {
261 		if (errno != ENOENT && errno != ENFILE)
262 			log_warn("warn: queue-fs: fopen");
263 		return 0;
264 	}
265 
266 	r = fread(buf, 1, len, fp);
267 	if (r) {
268 		if (r == len) {
269 			log_warn("warn: queue-fs: too large");
270 			r = 0;
271 		}
272 		else
273 			buf[r] = '\0';
274 	}
275 	fclose(fp);
276 
277 	return (r);
278 }
279 
280 static int
281 queue_fs_envelope_update(uint64_t evpid, const char *buf, size_t len)
282 {
283 	char dest[PATH_MAX];
284 
285 	fsqueue_envelope_path(evpid, dest, sizeof(dest));
286 
287 	return (fsqueue_envelope_dump(dest, buf, len, 1, 1));
288 }
289 
290 static int
291 queue_fs_envelope_delete(uint64_t evpid)
292 {
293 	char		pathname[PATH_MAX];
294 	uint32_t	msgid;
295 	int		*n;
296 
297 	fsqueue_envelope_path(evpid, pathname, sizeof(pathname));
298 	if (unlink(pathname) == -1)
299 		if (errno != ENOENT)
300 			return 0;
301 
302 	msgid = evpid_to_msgid(evpid);
303 	n = tree_pop(&evpcount, msgid);
304 	n -= 1;
305 
306 	if (n - REF == 0)
307 		queue_fs_message_delete(msgid);
308 	else
309 		tree_xset(&evpcount, msgid, n);
310 
311 	return (1);
312 }
313 
314 static int
315 queue_fs_message_walk(uint64_t *evpid, char *buf, size_t len,
316     uint32_t msgid, int *done, void **data)
317 {
318 	struct dirent	*dp;
319 	DIR		*dir = *data;
320 	char		 path[PATH_MAX];
321 	char		 msgid_str[9];
322 	char		*tmp;
323 	int		 r, *n;
324 
325 	if (*done)
326 		return (-1);
327 
328 	if (!bsnprintf(path, sizeof path, "%s/%02x/%08x",
329 	    PATH_QUEUE, (msgid  & 0xff000000) >> 24, msgid))
330 		fatalx("queue_fs_message_walk: path does not fit buffer");
331 
332 	if (dir == NULL) {
333 		if ((dir = opendir(path)) == NULL) {
334 			log_warn("warn: queue_fs: opendir: %s", path);
335 			*done = 1;
336 			return (-1);
337 		}
338 
339 		*data = dir;
340 	}
341 
342 	(void)snprintf(msgid_str, sizeof msgid_str, "%08" PRIx32, msgid);
343 	while ((dp = readdir(dir)) != NULL) {
344 		if (dp->d_type != DT_REG)
345 			continue;
346 
347 		/* ignore files other than envelopes */
348 		if (strlen(dp->d_name) != 16 ||
349 		    strncmp(dp->d_name, msgid_str, 8))
350 			continue;
351 
352 		tmp = NULL;
353 		*evpid = strtoull(dp->d_name, &tmp, 16);
354 		if (tmp && *tmp !=  '\0') {
355 			log_debug("debug: fsqueue: bogus file %s", dp->d_name);
356 			continue;
357 		}
358 
359 		memset(buf, 0, len);
360 		r = queue_fs_envelope_load(*evpid, buf, len);
361 		if (r) {
362 			n = tree_pop(&evpcount, msgid);
363 			if (n == NULL)
364 				n = REF;
365 
366 			n += 1;
367 			tree_xset(&evpcount, msgid, n);
368 		}
369 
370 		return (r);
371 	}
372 
373 	(void)closedir(dir);
374 	*done = 1;
375 	return (-1);
376 }
377 
378 static int
379 queue_fs_envelope_walk(uint64_t *evpid, char *buf, size_t len)
380 {
381 	static int	 done = 0;
382 	static void	*hdl = NULL;
383 	int		 r, *n;
384 	uint32_t	 msgid;
385 
386 	if (done)
387 		return (-1);
388 
389 	if (hdl == NULL)
390 		hdl = fsqueue_qwalk_new();
391 
392 	if (fsqueue_qwalk(hdl, evpid)) {
393 		memset(buf, 0, len);
394 		r = queue_fs_envelope_load(*evpid, buf, len);
395 		if (r) {
396 			msgid = evpid_to_msgid(*evpid);
397 			n = tree_pop(&evpcount, msgid);
398 			if (n == NULL)
399 				n = REF;
400 			n += 1;
401 			tree_xset(&evpcount, msgid, n);
402 		}
403 		return (r);
404 	}
405 
406 	fsqueue_qwalk_close(hdl);
407 	done = 1;
408 	return (-1);
409 }
410 
411 static int
412 fsqueue_check_space(void)
413 {
414 	struct statfs	buf;
415 	uint64_t	used;
416 	uint64_t	total;
417 
418 	if (statfs(PATH_QUEUE, &buf) < 0) {
419 		log_warn("warn: queue-fs: statfs");
420 		return 0;
421 	}
422 
423 	/*
424 	 * f_bfree and f_ffree is not set on all filesystems.
425 	 * They could be signed or unsigned integers.
426 	 * Some systems will set them to 0, others will set them to -1.
427 	 */
428 	if (buf.f_bfree == 0 || buf.f_ffree == 0 ||
429 	    (int64_t)buf.f_bfree == -1 || (int64_t)buf.f_ffree == -1)
430 		return 1;
431 
432 	used = buf.f_blocks - buf.f_bfree;
433 	total = buf.f_bavail + used;
434 	if (total != 0)
435 		used = (float)used / (float)total * 100;
436 	else
437 		used = 100;
438 	if (100 - used < MINSPACE) {
439 		log_warnx("warn: not enough disk space: %llu%% left",
440 		    (unsigned long long) 100 - used);
441 		log_warnx("warn: temporarily rejecting messages");
442 		return 0;
443 	}
444 
445 	used = buf.f_files - buf.f_ffree;
446 	total = buf.f_favail + used;
447 	if (total != 0)
448 		used = (float)used / (float)total * 100;
449 	else
450 		used = 100;
451 	if (100 - used < MININODES) {
452 		log_warnx("warn: not enough inodes: %llu%% left",
453 		    (unsigned long long) 100 - used);
454 		log_warnx("warn: temporarily rejecting messages");
455 		return 0;
456 	}
457 
458 	return 1;
459 }
460 
461 static void
462 fsqueue_envelope_path(uint64_t evpid, char *buf, size_t len)
463 {
464 	if (!bsnprintf(buf, len, "%s/%02x/%08x/%016" PRIx64,
465 		PATH_QUEUE,
466 		(evpid_to_msgid(evpid) & 0xff000000) >> 24,
467 		evpid_to_msgid(evpid),
468 		evpid))
469 		fatalx("fsqueue_envelope_path: path does not fit buffer");
470 }
471 
472 static void
473 fsqueue_envelope_incoming_path(uint64_t evpid, char *buf, size_t len)
474 {
475 	if (!bsnprintf(buf, len, "%s/%08x/%016" PRIx64,
476 		PATH_INCOMING,
477 		evpid_to_msgid(evpid),
478 		evpid))
479 		fatalx("fsqueue_envelope_incoming_path: path does not fit buffer");
480 }
481 
482 static int
483 fsqueue_envelope_dump(char *dest, const char *evpbuf, size_t evplen,
484     int do_atomic, int do_sync)
485 {
486 	const char     *path = do_atomic ? PATH_EVPTMP : dest;
487 	FILE	       *fp = NULL;
488 	int		fd;
489 	size_t		w;
490 
491 	if ((fd = open(path, O_RDWR | O_CREAT | O_EXCL, 0600)) == -1) {
492 		log_warn("warn: queue-fs: open");
493 		goto tempfail;
494 	}
495 
496 	if ((fp = fdopen(fd, "w")) == NULL) {
497 		log_warn("warn: queue-fs: fdopen");
498 		goto tempfail;
499 	}
500 
501 	w = fwrite(evpbuf, 1, evplen, fp);
502 	if (w < evplen) {
503 		log_warn("warn: queue-fs: short write");
504 		goto tempfail;
505 	}
506 	if (fflush(fp)) {
507 		log_warn("warn: queue-fs: fflush");
508 		goto tempfail;
509 	}
510 	if (do_sync && fsync(fileno(fp))) {
511 		log_warn("warn: queue-fs: fsync");
512 		goto tempfail;
513 	}
514 	if (fclose(fp) != 0) {
515 		log_warn("warn: queue-fs: fclose");
516 		fp = NULL;
517 		goto tempfail;
518 	}
519 	fp = NULL;
520 	fd = -1;
521 
522 	if (do_atomic && rename(path, dest) == -1) {
523 		log_warn("warn: queue-fs: rename");
524 		goto tempfail;
525 	}
526 	return (1);
527 
528 tempfail:
529 	if (fp)
530 		fclose(fp);
531 	else if (fd != -1)
532 		close(fd);
533 	if (unlink(path) == -1)
534 		log_warn("warn: queue-fs: unlink");
535 	return (0);
536 }
537 
538 static void
539 fsqueue_message_path(uint32_t msgid, char *buf, size_t len)
540 {
541 	if (!bsnprintf(buf, len, "%s/%02x/%08x",
542 		PATH_QUEUE,
543 		(msgid & 0xff000000) >> 24,
544 		msgid))
545 		fatalx("fsqueue_message_path: path does not fit buffer");
546 }
547 
548 static void
549 fsqueue_message_incoming_path(uint32_t msgid, char *buf, size_t len)
550 {
551 	if (!bsnprintf(buf, len, "%s/%08x",
552 		PATH_INCOMING,
553 		msgid))
554 		fatalx("fsqueue_message_incoming_path: path does not fit buffer");
555 }
556 
557 static void *
558 fsqueue_qwalk_new(void)
559 {
560 	char		 path[PATH_MAX];
561 	char * const	 path_argv[] = { path, NULL };
562 	struct qwalk	*q;
563 
564 	q = xcalloc(1, sizeof(*q));
565 	(void)strlcpy(path, PATH_QUEUE, sizeof(path));
566 	q->fts = fts_open(path_argv,
567 	    FTS_PHYSICAL | FTS_NOCHDIR, NULL);
568 
569 	if (q->fts == NULL)
570 		err(1, "fsqueue_qwalk_new: fts_open: %s", path);
571 
572 	return (q);
573 }
574 
575 static void
576 fsqueue_qwalk_close(void *hdl)
577 {
578 	struct qwalk	*q = hdl;
579 
580 	fts_close(q->fts);
581 
582 	free(q);
583 }
584 
585 static int
586 fsqueue_qwalk(void *hdl, uint64_t *evpid)
587 {
588 	struct qwalk	*q = hdl;
589 	FTSENT		*e;
590 	char		*tmp;
591 
592 	while ((e = fts_read(q->fts)) != NULL) {
593 		switch (e->fts_info) {
594 		case FTS_D:
595 			q->depth += 1;
596 			if (q->depth == 2 && e->fts_namelen != 2) {
597 				log_debug("debug: fsqueue: bogus directory %s",
598 				    e->fts_path);
599 				fts_set(q->fts, e, FTS_SKIP);
600 				break;
601 			}
602 			if (q->depth == 3 && e->fts_namelen != 8) {
603 				log_debug("debug: fsqueue: bogus directory %s",
604 				    e->fts_path);
605 				fts_set(q->fts, e, FTS_SKIP);
606 				break;
607 			}
608 			break;
609 
610 		case FTS_DP:
611 		case FTS_DNR:
612 			q->depth -= 1;
613 			break;
614 
615 		case FTS_F:
616 			if (q->depth != 3)
617 				break;
618 			if (e->fts_namelen != 16)
619 				break;
620 			if (timespeccmp(&e->fts_statp->st_mtim, &startup, >))
621 				break;
622 			tmp = NULL;
623 			*evpid = strtoull(e->fts_name, &tmp, 16);
624 			if (tmp && *tmp !=  '\0') {
625 				log_debug("debug: fsqueue: bogus file %s",
626 				    e->fts_path);
627 				break;
628 			}
629 			return (1);
630 		default:
631 			break;
632 		}
633 	}
634 
635 	return (0);
636 }
637 
638 static int
639 queue_fs_init(struct passwd *pw, int server, const char *conf)
640 {
641 	unsigned int	 n;
642 	char		*paths[] = { PATH_QUEUE, PATH_INCOMING };
643 	char		 path[PATH_MAX];
644 	int		 ret;
645 
646 	/* remove incoming/ if it exists */
647 	if (server)
648 		mvpurge(PATH_SPOOL PATH_INCOMING, PATH_SPOOL PATH_PURGE);
649 
650 	fsqueue_envelope_path(0, path, sizeof(path));
651 
652 	ret = 1;
653 	for (n = 0; n < nitems(paths); n++) {
654 		(void)strlcpy(path, PATH_SPOOL, sizeof(path));
655 		if (strlcat(path, paths[n], sizeof(path)) >= sizeof(path))
656 			errx(1, "path too long %s%s", PATH_SPOOL, paths[n]);
657 		if (ckdir(path, 0700, pw->pw_uid, 0, server) == 0)
658 			ret = 0;
659 	}
660 
661 	if (clock_gettime(CLOCK_REALTIME, &startup))
662 		err(1, "clock_gettime");
663 
664 	tree_init(&evpcount);
665 
666 	queue_api_on_message_create(queue_fs_message_create);
667 	queue_api_on_message_commit(queue_fs_message_commit);
668 	queue_api_on_message_delete(queue_fs_message_delete);
669 	queue_api_on_message_fd_r(queue_fs_message_fd_r);
670 	queue_api_on_envelope_create(queue_fs_envelope_create);
671 	queue_api_on_envelope_delete(queue_fs_envelope_delete);
672 	queue_api_on_envelope_update(queue_fs_envelope_update);
673 	queue_api_on_envelope_load(queue_fs_envelope_load);
674 	queue_api_on_envelope_walk(queue_fs_envelope_walk);
675 	queue_api_on_message_walk(queue_fs_message_walk);
676 
677 	return (ret);
678 }
679 
680 struct queue_backend	queue_backend_fs = {
681 	queue_fs_init,
682 };
683