xref: /openbsd/usr.sbin/smtpd/queue_fs.c (revision df67ff3e)
1 /*	$OpenBSD: queue_fs.c,v 1.9 2015/10/12 22:29:49 gilles Exp $	*/
2 
3 /*
4  * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/mount.h>
21 #include <sys/queue.h>
22 #include <sys/tree.h>
23 #include <sys/socket.h>
24 #include <sys/stat.h>
25 
26 #include <ctype.h>
27 #include <err.h>
28 #include <errno.h>
29 #include <event.h>
30 #include <fcntl.h>
31 #include <fts.h>
32 #include <imsg.h>
33 #include <inttypes.h>
34 #include <libgen.h>
35 #include <pwd.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <string.h>
39 #include <time.h>
40 #include <unistd.h>
41 
42 #include "smtpd.h"
43 #include "log.h"
44 
45 #define PATH_QUEUE		"/queue"
46 #define PATH_CORRUPT		"/corrupt"
47 #define PATH_INCOMING		"/incoming"
48 #define PATH_EVPTMP		PATH_INCOMING "/envelope.tmp"
49 #define PATH_MESSAGE		"/message"
50 
51 /* percentage of remaining space / inodes required to accept new messages */
52 #define	MINSPACE		5
53 #define	MININODES		5
54 
55 struct qwalk {
56 	FTS	*fts;
57 	int	 depth;
58 };
59 
60 static int	fsqueue_check_space(void);
61 static void	fsqueue_envelope_path(uint64_t, char *, size_t);
62 static void	fsqueue_envelope_incoming_path(uint64_t, char *, size_t);
63 static int	fsqueue_envelope_dump(char *, const char *, size_t, int, int);
64 static void	fsqueue_message_path(uint32_t, char *, size_t);
65 static void	fsqueue_message_corrupt_path(uint32_t, char *, size_t);
66 static void	fsqueue_message_incoming_path(uint32_t, char *, size_t);
67 static void    *fsqueue_qwalk_new(void);
68 static int	fsqueue_qwalk(void *, uint64_t *);
69 static void	fsqueue_qwalk_close(void *);
70 
71 struct tree evpcount;
72 static struct timespec startup;
73 
74 #define REF	(int*)0xf00
75 
76 static int
77 queue_fs_message_create(uint32_t *msgid)
78 {
79 	char		rootdir[PATH_MAX];
80 	struct stat	sb;
81 
82 	if (! fsqueue_check_space())
83 		return 0;
84 
85 again:
86 	*msgid = queue_generate_msgid();
87 
88 	/* prevent possible collision later when moving to Q_QUEUE */
89 	fsqueue_message_path(*msgid, rootdir, sizeof(rootdir));
90 	if (stat(rootdir, &sb) != -1)
91 		goto again;
92 
93 	/* we hit an unexpected error, temporarily fail */
94 	if (errno != ENOENT) {
95 		*msgid = 0;
96 		return 0;
97 	}
98 
99 	fsqueue_message_incoming_path(*msgid, rootdir, sizeof(rootdir));
100 	if (mkdir(rootdir, 0700) == -1) {
101 		if (errno == EEXIST)
102 			goto again;
103 
104 		if (errno == ENOSPC) {
105 			*msgid = 0;
106 			return 0;
107 		}
108 
109 		log_warn("warn: queue-fs: mkdir");
110 		*msgid = 0;
111 		return 0;
112 	}
113 
114 	return (1);
115 }
116 
117 static int
118 queue_fs_message_commit(uint32_t msgid, const char *path)
119 {
120 	char incomingdir[PATH_MAX];
121 	char queuedir[PATH_MAX];
122 	char msgdir[PATH_MAX];
123 	char msgpath[PATH_MAX];
124 
125 	/* before-first, move the message content in the incoming directory */
126 	fsqueue_message_incoming_path(msgid, msgpath, sizeof(msgpath));
127 	if (strlcat(msgpath, PATH_MESSAGE, sizeof(msgpath))
128 	    >= sizeof(msgpath))
129 		return (0);
130 	if (rename(path, msgpath) == -1)
131 		return (0);
132 
133 	fsqueue_message_incoming_path(msgid, incomingdir, sizeof(incomingdir));
134 	fsqueue_message_path(msgid, msgdir, sizeof(msgdir));
135 	if (strlcpy(queuedir, msgdir, sizeof(queuedir))
136 	    >= sizeof(queuedir))
137 		return (0);
138 
139 	/* first attempt to rename */
140 	if (rename(incomingdir, msgdir) == 0)
141 		return 1;
142 	if (errno == ENOSPC)
143 		return 0;
144 	if (errno != ENOENT) {
145 		log_warn("warn: queue-fs: rename");
146 		return 0;
147 	}
148 
149 	/* create the bucket */
150 	*strrchr(queuedir, '/') = '\0';
151 	if (mkdir(queuedir, 0700) == -1) {
152 		if (errno == ENOSPC)
153 			return 0;
154 		if (errno != EEXIST) {
155 			log_warn("warn: queue-fs: mkdir");
156 			return 0;
157 		}
158 	}
159 
160 	/* rename */
161 	if (rename(incomingdir, msgdir) == -1) {
162 		if (errno == ENOSPC)
163 			return 0;
164 		log_warn("warn: queue-fs: rename");
165 		return 0;
166 	}
167 
168 	return 1;
169 }
170 
171 static int
172 queue_fs_message_fd_r(uint32_t msgid)
173 {
174 	int fd;
175 	char path[PATH_MAX];
176 
177 	fsqueue_message_path(msgid, path, sizeof(path));
178 	if (strlcat(path, PATH_MESSAGE, sizeof(path))
179 	    >= sizeof(path))
180 		return -1;
181 
182 	if ((fd = open(path, O_RDONLY)) == -1) {
183 		log_warn("warn: queue-fs: open");
184 		return -1;
185 	}
186 
187 	return fd;
188 }
189 
190 static int
191 queue_fs_message_delete(uint32_t msgid)
192 {
193 	char		path[PATH_MAX];
194 	struct stat	sb;
195 
196 	fsqueue_message_incoming_path(msgid, path, sizeof(path));
197 	if (stat(path, &sb) == -1)
198 		fsqueue_message_path(msgid, path, sizeof(path));
199 
200 	if (rmtree(path, 0) == -1)
201 		log_warn("warn: queue-fs: rmtree");
202 
203 	tree_pop(&evpcount, msgid);
204 
205 	return 1;
206 }
207 
208 static int
209 queue_fs_message_corrupt(uint32_t msgid)
210 {
211 	struct stat sb;
212 	char rootdir[PATH_MAX];
213 	char corruptdir[PATH_MAX];
214 	char buf[64];
215 	int  retry = 0;
216 
217 	fsqueue_message_path(msgid, rootdir, sizeof(rootdir));
218 	fsqueue_message_corrupt_path(msgid, corruptdir,
219 	    sizeof(corruptdir));
220 
221 again:
222 	if (stat(corruptdir, &sb) != -1 || errno != ENOENT) {
223 		fsqueue_message_corrupt_path(msgid, corruptdir,
224 		    sizeof(corruptdir));
225 		(void)snprintf(buf, sizeof (buf), ".%d", retry++);
226 		(void)strlcat(corruptdir, buf, sizeof(corruptdir));
227 		goto again;
228 	}
229 
230 	if (rename(rootdir, corruptdir) == -1) {
231 		log_warn("warn: queue-fs: rename");
232 		return 0;
233 	}
234 
235 	tree_pop(&evpcount, msgid);
236 
237 	return 1;
238 }
239 
240 static int
241 queue_fs_envelope_create(uint32_t msgid, const char *buf, size_t len,
242     uint64_t *evpid)
243 {
244 	char		path[PATH_MAX];
245 	int		queued = 0, i, r = 0, *n;
246 	struct stat	sb;
247 
248 	if (msgid == 0) {
249 		log_warnx("warn: queue-fs: msgid=0, evpid=%016"PRIx64, *evpid);
250 		goto done;
251 	}
252 
253 	fsqueue_message_incoming_path(msgid, path, sizeof(path));
254 	if (stat(path, &sb) == -1)
255 		queued = 1;
256 
257 	for (i = 0; i < 20; i ++) {
258 		*evpid = queue_generate_evpid(msgid);
259 		if (queued)
260 			fsqueue_envelope_path(*evpid, path, sizeof(path));
261 		else
262 			fsqueue_envelope_incoming_path(*evpid, path,
263 			    sizeof(path));
264 
265 		r = fsqueue_envelope_dump(path, buf, len, 0, 0);
266 		if (r >= 0)
267 			goto done;
268 	}
269 	r = 0;
270 	log_warnx("warn: queue-fs: could not allocate evpid");
271 
272 done:
273 	if (r) {
274 		n = tree_pop(&evpcount, msgid);
275 		if (n == NULL)
276 			n = REF;
277 		n += 1;
278 		tree_xset(&evpcount, msgid, n);
279 	}
280 	return (r);
281 }
282 
283 static int
284 queue_fs_envelope_load(uint64_t evpid, char *buf, size_t len)
285 {
286 	char	 pathname[PATH_MAX];
287 	FILE	*fp;
288 	size_t	 r;
289 
290 	fsqueue_envelope_path(evpid, pathname, sizeof(pathname));
291 
292 	fp = fopen(pathname, "r");
293 	if (fp == NULL) {
294 		if (errno != ENOENT && errno != ENFILE)
295 			log_warn("warn: queue-fs: fopen");
296 		return 0;
297 	}
298 
299 	r = fread(buf, 1, len, fp);
300 	if (r) {
301 		if (r == len) {
302 			log_warn("warn: queue-fs: too large");
303 			r = 0;
304 		}
305 		else
306 			buf[r] = '\0';
307 	}
308 	fclose(fp);
309 
310 	return (r);
311 }
312 
313 static int
314 queue_fs_envelope_update(uint64_t evpid, const char *buf, size_t len)
315 {
316 	char dest[PATH_MAX];
317 
318 	fsqueue_envelope_path(evpid, dest, sizeof(dest));
319 
320 	return (fsqueue_envelope_dump(dest, buf, len, 1, 1));
321 }
322 
323 static int
324 queue_fs_envelope_delete(uint64_t evpid)
325 {
326 	char		pathname[PATH_MAX];
327 	uint32_t	msgid;
328 	int		*n;
329 
330 	fsqueue_envelope_path(evpid, pathname, sizeof(pathname));
331 	if (unlink(pathname) == -1)
332 		if (errno != ENOENT)
333 			return 0;
334 
335 	msgid = evpid_to_msgid(evpid);
336 	n = tree_pop(&evpcount, msgid);
337 	n -= 1;
338 
339 	if (n - REF == 0)
340 		queue_fs_message_delete(msgid);
341 	else
342 		tree_xset(&evpcount, msgid, n);
343 
344 	return (1);
345 }
346 
347 static int
348 queue_fs_envelope_walk(uint64_t *evpid, char *buf, size_t len)
349 {
350 	static int	 done = 0;
351 	static void	*hdl = NULL;
352 	int		 r, *n;
353 	uint32_t	 msgid;
354 
355 	if (done)
356 		return (-1);
357 
358 	if (hdl == NULL)
359 		hdl = fsqueue_qwalk_new();
360 
361 	if (fsqueue_qwalk(hdl, evpid)) {
362 		memset(buf, 0, len);
363 		r = queue_fs_envelope_load(*evpid, buf, len);
364 		if (r) {
365 			msgid = evpid_to_msgid(*evpid);
366 			n = tree_pop(&evpcount, msgid);
367 			if (n == NULL)
368 				n = REF;
369 			n += 1;
370 			tree_xset(&evpcount, msgid, n);
371 		}
372 		return (r);
373 	}
374 
375 	fsqueue_qwalk_close(hdl);
376 	done = 1;
377 	return (-1);
378 }
379 
380 static int
381 fsqueue_check_space(void)
382 {
383 	struct statfs	buf;
384 	uint64_t	used;
385 	uint64_t	total;
386 
387 	if (statfs(PATH_QUEUE, &buf) < 0) {
388 		log_warn("warn: queue-fs: statfs");
389 		return 0;
390 	}
391 
392 	/*
393 	 * f_bfree and f_ffree is not set on all filesystems.
394 	 * They could be signed or unsigned integers.
395 	 * Some systems will set them to 0, others will set them to -1.
396 	 */
397 	if (buf.f_bfree == 0 || buf.f_ffree == 0 ||
398 	    (int64_t)buf.f_bfree == -1 || (int64_t)buf.f_ffree == -1)
399 		return 1;
400 
401 	used = buf.f_blocks - buf.f_bfree;
402 	total = buf.f_bavail + used;
403 	if (total != 0)
404 		used = (float)used / (float)total * 100;
405 	else
406 		used = 100;
407 	if (100 - used < MINSPACE) {
408 		log_warnx("warn: not enough disk space: %llu%% left",
409 		    (unsigned long long) 100 - used);
410 		log_warnx("warn: temporarily rejecting messages");
411 		return 0;
412 	}
413 
414 	used = buf.f_files - buf.f_ffree;
415 	total = buf.f_favail + used;
416 	if (total != 0)
417 		used = (float)used / (float)total * 100;
418 	else
419 		used = 100;
420 	if (100 - used < MININODES) {
421 		log_warnx("warn: not enough inodes: %llu%% left",
422 		    (unsigned long long) 100 - used);
423 		log_warnx("warn: temporarily rejecting messages");
424 		return 0;
425 	}
426 
427 	return 1;
428 }
429 
430 static void
431 fsqueue_envelope_path(uint64_t evpid, char *buf, size_t len)
432 {
433 	if (! bsnprintf(buf, len, "%s/%02x/%08x/%016" PRIx64,
434 		PATH_QUEUE,
435 		(evpid_to_msgid(evpid) & 0xff000000) >> 24,
436 		evpid_to_msgid(evpid),
437 		evpid))
438 		fatalx("fsqueue_envelope_path: path does not fit buffer");
439 }
440 
441 static void
442 fsqueue_envelope_incoming_path(uint64_t evpid, char *buf, size_t len)
443 {
444 	if (! bsnprintf(buf, len, "%s/%08x/%016" PRIx64,
445 		PATH_INCOMING,
446 		evpid_to_msgid(evpid),
447 		evpid))
448 		fatalx("fsqueue_envelope_incoming_path: path does not fit buffer");
449 }
450 
451 static int
452 fsqueue_envelope_dump(char *dest, const char *evpbuf, size_t evplen,
453     int do_atomic, int do_sync)
454 {
455 	const char     *path = do_atomic ? PATH_EVPTMP : dest;
456 	FILE	       *fp = NULL;
457 	int		fd;
458 	size_t		w;
459 
460 	if ((fd = open(path, O_RDWR | O_CREAT | O_EXCL, 0600)) == -1) {
461 		log_warn("warn: queue-fs: open");
462 		goto tempfail;
463 	}
464 
465 	if ((fp = fdopen(fd, "w")) == NULL) {
466 		log_warn("warn: queue-fs: fdopen");
467 		goto tempfail;
468 	}
469 
470 	w = fwrite(evpbuf, 1, evplen, fp);
471 	if (w < evplen) {
472 		log_warn("warn: queue-fs: short write");
473 		goto tempfail;
474 	}
475 	if (fflush(fp)) {
476 		log_warn("warn: queue-fs: fflush");
477 		goto tempfail;
478 	}
479 	if (do_sync && fsync(fileno(fp))) {
480 		log_warn("warn: queue-fs: fsync");
481 		goto tempfail;
482 	}
483 	if (fclose(fp) != 0) {
484 		log_warn("warn: queue-fs: fclose");
485 		fp = NULL;
486 		goto tempfail;
487 	}
488 	fp = NULL;
489 	fd = -1;
490 
491 	if (do_atomic && rename(path, dest) == -1) {
492 		log_warn("warn: queue-fs: rename");
493 		goto tempfail;
494 	}
495 	return (1);
496 
497 tempfail:
498 	if (fp)
499 		fclose(fp);
500 	else if (fd != -1)
501 		close(fd);
502 	if (unlink(path) == -1)
503 		log_warn("warn: queue-fs: unlink");
504 	return (0);
505 }
506 
507 static void
508 fsqueue_message_path(uint32_t msgid, char *buf, size_t len)
509 {
510 	if (! bsnprintf(buf, len, "%s/%02x/%08x",
511 		PATH_QUEUE,
512 		(msgid & 0xff000000) >> 24,
513 		msgid))
514 		fatalx("fsqueue_message_path: path does not fit buffer");
515 }
516 
517 static void
518 fsqueue_message_corrupt_path(uint32_t msgid, char *buf, size_t len)
519 {
520 	if (! bsnprintf(buf, len, "%s/%08x",
521 		PATH_CORRUPT,
522 		msgid))
523 		fatalx("fsqueue_message_corrupt_path: path does not fit buffer");
524 }
525 
526 static void
527 fsqueue_message_incoming_path(uint32_t msgid, char *buf, size_t len)
528 {
529 	if (! bsnprintf(buf, len, "%s/%08x",
530 		PATH_INCOMING,
531 		msgid))
532 		fatalx("fsqueue_message_incoming_path: path does not fit buffer");
533 }
534 
535 static void *
536 fsqueue_qwalk_new(void)
537 {
538 	char		 path[PATH_MAX];
539 	char * const	 path_argv[] = { path, NULL };
540 	struct qwalk	*q;
541 
542 	q = xcalloc(1, sizeof(*q), "fsqueue_qwalk_new");
543 	(void)strlcpy(path, PATH_QUEUE, sizeof(path));
544 	q->fts = fts_open(path_argv,
545 	    FTS_PHYSICAL | FTS_NOCHDIR, NULL);
546 
547 	if (q->fts == NULL)
548 		err(1, "fsqueue_qwalk_new: fts_open: %s", path);
549 
550 	return (q);
551 }
552 
553 static void
554 fsqueue_qwalk_close(void *hdl)
555 {
556 	struct qwalk	*q = hdl;
557 
558 	fts_close(q->fts);
559 
560 	free(q);
561 }
562 
563 static int
564 fsqueue_qwalk(void *hdl, uint64_t *evpid)
565 {
566 	struct qwalk	*q = hdl;
567 	FTSENT		*e;
568 	char		*tmp;
569 
570 	while ((e = fts_read(q->fts)) != NULL) {
571 		switch (e->fts_info) {
572 		case FTS_D:
573 			q->depth += 1;
574 			if (q->depth == 2 && e->fts_namelen != 2) {
575 				log_debug("debug: fsqueue: bogus directory %s",
576 				    e->fts_path);
577 				fts_set(q->fts, e, FTS_SKIP);
578 				break;
579 			}
580 			if (q->depth == 3 && e->fts_namelen != 8) {
581 				log_debug("debug: fsqueue: bogus directory %s",
582 				    e->fts_path);
583 				fts_set(q->fts, e, FTS_SKIP);
584 				break;
585 			}
586 			break;
587 
588 		case FTS_DP:
589 		case FTS_DNR:
590 			q->depth -= 1;
591 			break;
592 
593 		case FTS_F:
594 			if (q->depth != 3)
595 				break;
596 			if (e->fts_namelen != 16)
597 				break;
598 			if (timespeccmp(&e->fts_statp->st_mtim, &startup, >))
599 				break;
600 			tmp = NULL;
601 			*evpid = strtoull(e->fts_name, &tmp, 16);
602 			if (tmp && *tmp !=  '\0') {
603 				log_debug("debug: fsqueue: bogus file %s",
604 				    e->fts_path);
605 				break;
606 			}
607 			return (1);
608 		default:
609 			break;
610 		}
611 	}
612 
613 	return (0);
614 }
615 
616 static int
617 queue_fs_init(struct passwd *pw, int server, const char *conf)
618 {
619 	unsigned int	 n;
620 	char		*paths[] = { PATH_QUEUE, PATH_CORRUPT, PATH_INCOMING };
621 	char		 path[PATH_MAX];
622 	int		 ret;
623 	struct timeval	 tv;
624 
625 	/* remove incoming/ if it exists */
626 	if (server)
627 		mvpurge(PATH_SPOOL PATH_INCOMING, PATH_SPOOL PATH_PURGE);
628 
629 	fsqueue_envelope_path(0, path, sizeof(path));
630 
631 	ret = 1;
632 	for (n = 0; n < nitems(paths); n++) {
633 		(void)strlcpy(path, PATH_SPOOL, sizeof(path));
634 		if (strlcat(path, paths[n], sizeof(path)) >= sizeof(path))
635 			errx(1, "path too long %s%s", PATH_SPOOL, paths[n]);
636 		if (ckdir(path, 0700, pw->pw_uid, 0, server) == 0)
637 			ret = 0;
638 	}
639 
640 	if (gettimeofday(&tv, NULL) == -1)
641 		err(1, "gettimeofday");
642 	TIMEVAL_TO_TIMESPEC(&tv, &startup);
643 
644 	tree_init(&evpcount);
645 
646 	queue_api_on_message_create(queue_fs_message_create);
647 	queue_api_on_message_commit(queue_fs_message_commit);
648 	queue_api_on_message_delete(queue_fs_message_delete);
649 	queue_api_on_message_fd_r(queue_fs_message_fd_r);
650 	queue_api_on_message_corrupt(queue_fs_message_corrupt);
651 	queue_api_on_envelope_create(queue_fs_envelope_create);
652 	queue_api_on_envelope_delete(queue_fs_envelope_delete);
653 	queue_api_on_envelope_update(queue_fs_envelope_update);
654 	queue_api_on_envelope_load(queue_fs_envelope_load);
655 	queue_api_on_envelope_walk(queue_fs_envelope_walk);
656 
657 	return (ret);
658 }
659 
660 struct queue_backend	queue_backend_fs = {
661 	queue_fs_init,
662 };
663