xref: /openbsd/usr.sbin/smtpd/queue_fs.c (revision 98f67d16)
1 /*	$OpenBSD: queue_fs.c,v 1.6 2014/07/08 15:45:32 eric Exp $	*/
2 
3 /*
4  * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/queue.h>
21 #include <sys/tree.h>
22 #include <sys/socket.h>
23 #include <sys/stat.h>
24 #include <sys/param.h>
25 #include <sys/mount.h>
26 
27 #include <ctype.h>
28 #include <err.h>
29 #include <errno.h>
30 #include <event.h>
31 #include <fcntl.h>
32 #include <fts.h>
33 #include <imsg.h>
34 #include <inttypes.h>
35 #include <libgen.h>
36 #include <pwd.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40 #include <time.h>
41 #include <unistd.h>
42 
43 #include "smtpd.h"
44 #include "log.h"
45 
46 #define PATH_QUEUE		"/queue"
47 #define PATH_CORRUPT		"/corrupt"
48 #define PATH_INCOMING		"/incoming"
49 #define PATH_EVPTMP		PATH_INCOMING "/envelope.tmp"
50 #define PATH_MESSAGE		"/message"
51 
52 /* percentage of remaining space / inodes required to accept new messages */
53 #define	MINSPACE		5
54 #define	MININODES		5
55 
56 struct qwalk {
57 	FTS	*fts;
58 	int	 depth;
59 };
60 
61 static int	fsqueue_check_space(void);
62 static void	fsqueue_envelope_path(uint64_t, char *, size_t);
63 static void	fsqueue_envelope_incoming_path(uint64_t, char *, size_t);
64 static int	fsqueue_envelope_dump(char *, const char *, size_t, int, int);
65 static void	fsqueue_message_path(uint32_t, char *, size_t);
66 static void	fsqueue_message_corrupt_path(uint32_t, char *, size_t);
67 static void	fsqueue_message_incoming_path(uint32_t, char *, size_t);
68 static void    *fsqueue_qwalk_new(void);
69 static int	fsqueue_qwalk(void *, uint64_t *);
70 static void	fsqueue_qwalk_close(void *);
71 
72 struct tree evpcount;
73 static struct timespec startup;
74 
75 #define REF	(int*)0xf00
76 
77 static int
78 queue_fs_message_create(uint32_t *msgid)
79 {
80 	char		rootdir[SMTPD_MAXPATHLEN];
81 	struct stat	sb;
82 
83 	if (! fsqueue_check_space())
84 		return 0;
85 
86 again:
87 	*msgid = queue_generate_msgid();
88 
89 	/* prevent possible collision later when moving to Q_QUEUE */
90 	fsqueue_message_path(*msgid, rootdir, sizeof(rootdir));
91 	if (stat(rootdir, &sb) != -1)
92 		goto again;
93 
94 	/* we hit an unexpected error, temporarily fail */
95 	if (errno != ENOENT) {
96 		*msgid = 0;
97 		return 0;
98 	}
99 
100 	fsqueue_message_incoming_path(*msgid, rootdir, sizeof(rootdir));
101 	if (mkdir(rootdir, 0700) == -1) {
102 		if (errno == EEXIST)
103 			goto again;
104 
105 		if (errno == ENOSPC) {
106 			*msgid = 0;
107 			return 0;
108 		}
109 
110 		log_warn("warn: queue-fs: mkdir");
111 		*msgid = 0;
112 		return 0;
113 	}
114 
115 	return (1);
116 }
117 
118 static int
119 queue_fs_message_commit(uint32_t msgid, const char *path)
120 {
121 	char incomingdir[SMTPD_MAXPATHLEN];
122 	char queuedir[SMTPD_MAXPATHLEN];
123 	char msgdir[SMTPD_MAXPATHLEN];
124 	char msgpath[SMTPD_MAXPATHLEN];
125 
126 	/* before-first, move the message content in the incoming directory */
127 	fsqueue_message_incoming_path(msgid, msgpath, sizeof(msgpath));
128 	if (strlcat(msgpath, PATH_MESSAGE, sizeof(msgpath))
129 	    >= sizeof(msgpath))
130 		return (0);
131 	if (rename(path, msgpath) == -1)
132 		return (0);
133 
134 	fsqueue_message_incoming_path(msgid, incomingdir, sizeof(incomingdir));
135 	fsqueue_message_path(msgid, msgdir, sizeof(msgdir));
136 	if (strlcpy(queuedir, msgdir, sizeof(queuedir))
137 	    >= sizeof(queuedir))
138 		return (0);
139 
140 	/* first attempt to rename */
141 	if (rename(incomingdir, msgdir) == 0)
142 		return 1;
143 	if (errno == ENOSPC)
144 		return 0;
145 	if (errno != ENOENT) {
146 		log_warn("warn: queue-fs: rename");
147 		return 0;
148 	}
149 
150 	/* create the bucket */
151 	*strrchr(queuedir, '/') = '\0';
152 	if (mkdir(queuedir, 0700) == -1) {
153 		if (errno == ENOSPC)
154 			return 0;
155 		if (errno != EEXIST) {
156 			log_warn("warn: queue-fs: mkdir");
157 			return 0;
158 		}
159 	}
160 
161 	/* rename */
162 	if (rename(incomingdir, msgdir) == -1) {
163 		if (errno == ENOSPC)
164 			return 0;
165 		log_warn("warn: queue-fs: rename");
166 		return 0;
167 	}
168 
169 	/* best effort */
170 	sync();
171 
172 	return 1;
173 }
174 
175 static int
176 queue_fs_message_fd_r(uint32_t msgid)
177 {
178 	int fd;
179 	char path[SMTPD_MAXPATHLEN];
180 
181 	fsqueue_message_path(msgid, path, sizeof(path));
182 	if (strlcat(path, PATH_MESSAGE, sizeof(path))
183 	    >= sizeof(path))
184 		return -1;
185 
186 	if ((fd = open(path, O_RDONLY)) == -1) {
187 		log_warn("warn: queue-fs: open");
188 		return -1;
189 	}
190 
191 	return fd;
192 }
193 
194 static int
195 queue_fs_message_delete(uint32_t msgid)
196 {
197 	char		path[SMTPD_MAXPATHLEN];
198 	struct stat	sb;
199 
200 	fsqueue_message_incoming_path(msgid, path, sizeof(path));
201 	if (stat(path, &sb) == -1)
202 		fsqueue_message_path(msgid, path, sizeof(path));
203 
204 	if (rmtree(path, 0) == -1)
205 		log_warn("warn: queue-fs: rmtree");
206 
207 	tree_pop(&evpcount, msgid);
208 
209 	return 1;
210 }
211 
212 static int
213 queue_fs_message_corrupt(uint32_t msgid)
214 {
215 	struct stat sb;
216 	char rootdir[SMTPD_MAXPATHLEN];
217 	char corruptdir[SMTPD_MAXPATHLEN];
218 	char buf[64];
219 	int  retry = 0;
220 
221 	fsqueue_message_path(msgid, rootdir, sizeof(rootdir));
222 	fsqueue_message_corrupt_path(msgid, corruptdir,
223 	    sizeof(corruptdir));
224 
225 again:
226 	if (stat(corruptdir, &sb) != -1 || errno != ENOENT) {
227 		fsqueue_message_corrupt_path(msgid, corruptdir,
228 		    sizeof(corruptdir));
229 		(void)snprintf(buf, sizeof (buf), ".%d", retry++);
230 		(void)strlcat(corruptdir, buf, sizeof(corruptdir));
231 		goto again;
232 	}
233 
234 	if (rename(rootdir, corruptdir) == -1) {
235 		log_warn("warn: queue-fs: rename");
236 		return 0;
237 	}
238 
239 	tree_pop(&evpcount, msgid);
240 
241 	return 1;
242 }
243 
244 static int
245 queue_fs_envelope_create(uint32_t msgid, const char *buf, size_t len,
246     uint64_t *evpid)
247 {
248 	char		path[SMTPD_MAXPATHLEN];
249 	int		queued = 0, i, r = 0, *n;
250 	struct stat	sb;
251 
252 	if (msgid == 0) {
253 		log_warnx("warn: queue-fs: msgid=0, evpid=%016"PRIx64, *evpid);
254 		goto done;
255 	}
256 
257 	fsqueue_message_incoming_path(msgid, path, sizeof(path));
258 	if (stat(path, &sb) == -1)
259 		queued = 1;
260 
261 	for (i = 0; i < 20; i ++) {
262 		*evpid = queue_generate_evpid(msgid);
263 		if (queued)
264 			fsqueue_envelope_path(*evpid, path, sizeof(path));
265 		else
266 			fsqueue_envelope_incoming_path(*evpid, path,
267 			    sizeof(path));
268 
269 		r = fsqueue_envelope_dump(path, buf, len, 0, 0);
270 		if (r >= 0)
271 			goto done;
272 	}
273 	r = 0;
274 	log_warnx("warn: queue-fs: could not allocate evpid");
275 
276 done:
277 	if (r) {
278 		n = tree_pop(&evpcount, msgid);
279 		if (n == NULL)
280 			n = REF;
281 		n += 1;
282 		tree_xset(&evpcount, msgid, n);
283 	}
284 	return (r);
285 }
286 
287 static int
288 queue_fs_envelope_load(uint64_t evpid, char *buf, size_t len)
289 {
290 	char	 pathname[SMTPD_MAXPATHLEN];
291 	FILE	*fp;
292 	size_t	 r;
293 
294 	fsqueue_envelope_path(evpid, pathname, sizeof(pathname));
295 
296 	fp = fopen(pathname, "r");
297 	if (fp == NULL) {
298 		if (errno != ENOENT && errno != ENFILE)
299 			log_warn("warn: queue-fs: fopen");
300 		return 0;
301 	}
302 
303 	r = fread(buf, 1, len, fp);
304 	if (r) {
305 		if (r == len) {
306 			log_warn("warn: queue-fs: too large");
307 			r = 0;
308 		}
309 		else
310 			buf[r] = '\0';
311 	}
312 	fclose(fp);
313 
314 	return (r);
315 }
316 
317 static int
318 queue_fs_envelope_update(uint64_t evpid, const char *buf, size_t len)
319 {
320 	char dest[SMTPD_MAXPATHLEN];
321 
322 	fsqueue_envelope_path(evpid, dest, sizeof(dest));
323 
324 	return (fsqueue_envelope_dump(dest, buf, len, 1, 1));
325 }
326 
327 static int
328 queue_fs_envelope_delete(uint64_t evpid)
329 {
330 	char		pathname[SMTPD_MAXPATHLEN];
331 	uint32_t	msgid;
332 	int		*n;
333 
334 	fsqueue_envelope_path(evpid, pathname, sizeof(pathname));
335 	if (unlink(pathname) == -1)
336 		if (errno != ENOENT)
337 			return 0;
338 
339 	msgid = evpid_to_msgid(evpid);
340 	n = tree_pop(&evpcount, msgid);
341 	n -= 1;
342 
343 	if (n - REF == 0)
344 		queue_fs_message_delete(msgid);
345 	else
346 		tree_xset(&evpcount, msgid, n);
347 
348 	return (1);
349 }
350 
351 static int
352 queue_fs_envelope_walk(uint64_t *evpid, char *buf, size_t len)
353 {
354 	static int	 done = 0;
355 	static void	*hdl = NULL;
356 	int		 r, *n;
357 	uint32_t	 msgid;
358 
359 	if (done)
360 		return (-1);
361 
362 	if (hdl == NULL)
363 		hdl = fsqueue_qwalk_new();
364 
365 	if (fsqueue_qwalk(hdl, evpid)) {
366 		memset(buf, 0, len);
367 		r = queue_fs_envelope_load(*evpid, buf, len);
368 		if (r) {
369 			msgid = evpid_to_msgid(*evpid);
370 			n = tree_pop(&evpcount, msgid);
371 			if (n == NULL)
372 				n = REF;
373 			n += 1;
374 			tree_xset(&evpcount, msgid, n);
375 		}
376 		return (r);
377 	}
378 
379 	fsqueue_qwalk_close(hdl);
380 	done = 1;
381 	return (-1);
382 }
383 
384 static int
385 fsqueue_check_space(void)
386 {
387 	struct statfs	buf;
388 	uint64_t	used;
389 	uint64_t	total;
390 
391 	if (statfs(PATH_QUEUE, &buf) < 0) {
392 		log_warn("warn: queue-fs: statfs");
393 		return 0;
394 	}
395 
396 	/*
397 	 * f_bfree and f_ffree is not set on all filesystems.
398 	 * They could be signed or unsigned integers.
399 	 * Some systems will set them to 0, others will set them to -1.
400 	 */
401 	if (buf.f_bfree == 0 || buf.f_ffree == 0 ||
402 	    (int64_t)buf.f_bfree == -1 || (int64_t)buf.f_ffree == -1)
403 		return 1;
404 
405 	used = buf.f_blocks - buf.f_bfree;
406 	total = buf.f_bavail + used;
407 	if (total != 0)
408 		used = (float)used / (float)total * 100;
409 	else
410 		used = 100;
411 	if (100 - used < MINSPACE) {
412 		log_warnx("warn: not enough disk space: %llu%% left",
413 		    (unsigned long long) 100 - used);
414 		log_warnx("warn: temporarily rejecting messages");
415 		return 0;
416 	}
417 
418 	used = buf.f_files - buf.f_ffree;
419 	total = buf.f_favail + used;
420 	if (total != 0)
421 		used = (float)used / (float)total * 100;
422 	else
423 		used = 100;
424 	if (100 - used < MININODES) {
425 		log_warnx("warn: not enough inodes: %llu%% left",
426 		    (unsigned long long) 100 - used);
427 		log_warnx("warn: temporarily rejecting messages");
428 		return 0;
429 	}
430 
431 	return 1;
432 }
433 
434 static void
435 fsqueue_envelope_path(uint64_t evpid, char *buf, size_t len)
436 {
437 	if (! bsnprintf(buf, len, "%s/%02x/%08x/%016" PRIx64,
438 		PATH_QUEUE,
439 		(evpid_to_msgid(evpid) & 0xff000000) >> 24,
440 		evpid_to_msgid(evpid),
441 		evpid))
442 		fatalx("fsqueue_envelope_path: path does not fit buffer");
443 }
444 
445 static void
446 fsqueue_envelope_incoming_path(uint64_t evpid, char *buf, size_t len)
447 {
448 	if (! bsnprintf(buf, len, "%s/%08x/%016" PRIx64,
449 		PATH_INCOMING,
450 		evpid_to_msgid(evpid),
451 		evpid))
452 		fatalx("fsqueue_envelope_incoming_path: path does not fit buffer");
453 }
454 
455 static int
456 fsqueue_envelope_dump(char *dest, const char *evpbuf, size_t evplen,
457     int do_atomic, int do_sync)
458 {
459 	const char     *path = do_atomic ? PATH_EVPTMP : dest;
460 	FILE	       *fp = NULL;
461 	int		fd;
462 	size_t		w;
463 
464 	if ((fd = open(path, O_RDWR | O_CREAT | O_EXCL, 0600)) == -1) {
465 		log_warn("warn: queue-fs: open");
466 		goto tempfail;
467 	}
468 
469 	if ((fp = fdopen(fd, "w")) == NULL) {
470 		log_warn("warn: queue-fs: fdopen");
471 		goto tempfail;
472 	}
473 
474 	w = fwrite(evpbuf, 1, evplen, fp);
475 	if (w < evplen) {
476 		log_warn("warn: queue-fs: short write");
477 		goto tempfail;
478 	}
479 	if (fflush(fp)) {
480 		log_warn("warn: queue-fs: fflush");
481 		goto tempfail;
482 	}
483 	if (do_sync && fsync(fileno(fp))) {
484 		log_warn("warn: queue-fs: fsync");
485 		goto tempfail;
486 	}
487 	if (fclose(fp) != 0) {
488 		log_warn("warn: queue-fs: fclose");
489 		fp = NULL;
490 		goto tempfail;
491 	}
492 	fp = NULL;
493 	fd = -1;
494 
495 	if (do_atomic && rename(path, dest) == -1) {
496 		log_warn("warn: queue-fs: rename");
497 		goto tempfail;
498 	}
499 	return (1);
500 
501 tempfail:
502 	if (fp)
503 		fclose(fp);
504 	else if (fd != -1)
505 		close(fd);
506 	if (unlink(path) == -1)
507 		log_warn("warn: queue-fs: unlink");
508 	return (0);
509 }
510 
511 static void
512 fsqueue_message_path(uint32_t msgid, char *buf, size_t len)
513 {
514 	if (! bsnprintf(buf, len, "%s/%02x/%08x",
515 		PATH_QUEUE,
516 		(msgid & 0xff000000) >> 24,
517 		msgid))
518 		fatalx("fsqueue_message_path: path does not fit buffer");
519 }
520 
521 static void
522 fsqueue_message_corrupt_path(uint32_t msgid, char *buf, size_t len)
523 {
524 	if (! bsnprintf(buf, len, "%s/%08x",
525 		PATH_CORRUPT,
526 		msgid))
527 		fatalx("fsqueue_message_corrupt_path: path does not fit buffer");
528 }
529 
530 static void
531 fsqueue_message_incoming_path(uint32_t msgid, char *buf, size_t len)
532 {
533 	if (! bsnprintf(buf, len, "%s/%08x",
534 		PATH_INCOMING,
535 		msgid))
536 		fatalx("fsqueue_message_incoming_path: path does not fit buffer");
537 }
538 
539 static void *
540 fsqueue_qwalk_new(void)
541 {
542 	char		 path[SMTPD_MAXPATHLEN];
543 	char * const	 path_argv[] = { path, NULL };
544 	struct qwalk	*q;
545 
546 	q = xcalloc(1, sizeof(*q), "fsqueue_qwalk_new");
547 	(void)strlcpy(path, PATH_QUEUE, sizeof(path));
548 	q->fts = fts_open(path_argv,
549 	    FTS_PHYSICAL | FTS_NOCHDIR, NULL);
550 
551 	if (q->fts == NULL)
552 		err(1, "fsqueue_qwalk_new: fts_open: %s", path);
553 
554 	return (q);
555 }
556 
557 static void
558 fsqueue_qwalk_close(void *hdl)
559 {
560 	struct qwalk	*q = hdl;
561 
562 	fts_close(q->fts);
563 
564 	free(q);
565 }
566 
567 static int
568 fsqueue_qwalk(void *hdl, uint64_t *evpid)
569 {
570 	struct qwalk	*q = hdl;
571 	FTSENT		*e;
572 	char		*tmp;
573 
574 	while ((e = fts_read(q->fts)) != NULL) {
575 		switch (e->fts_info) {
576 		case FTS_D:
577 			q->depth += 1;
578 			if (q->depth == 2 && e->fts_namelen != 2) {
579 				log_debug("debug: fsqueue: bogus directory %s",
580 				    e->fts_path);
581 				fts_set(q->fts, e, FTS_SKIP);
582 				break;
583 			}
584 			if (q->depth == 3 && e->fts_namelen != 8) {
585 				log_debug("debug: fsqueue: bogus directory %s",
586 				    e->fts_path);
587 				fts_set(q->fts, e, FTS_SKIP);
588 				break;
589 			}
590 			break;
591 
592 		case FTS_DP:
593 		case FTS_DNR:
594 			q->depth -= 1;
595 			break;
596 
597 		case FTS_F:
598 			if (q->depth != 3)
599 				break;
600 			if (e->fts_namelen != 16)
601 				break;
602 			if (timespeccmp(&e->fts_statp->st_mtim, &startup, >))
603 				break;
604 			tmp = NULL;
605 			*evpid = strtoull(e->fts_name, &tmp, 16);
606 			if (tmp && *tmp !=  '\0') {
607 				log_debug("debug: fsqueue: bogus file %s",
608 				    e->fts_path);
609 				break;
610 			}
611 			return (1);
612 		default:
613 			break;
614 		}
615 	}
616 
617 	return (0);
618 }
619 
620 static int
621 queue_fs_init(struct passwd *pw, int server, const char *conf)
622 {
623 	unsigned int	 n;
624 	char		*paths[] = { PATH_QUEUE, PATH_CORRUPT, PATH_INCOMING };
625 	char		 path[SMTPD_MAXPATHLEN];
626 	int		 ret;
627 	struct timeval	 tv;
628 
629 	/* remove incoming/ if it exists */
630 	if (server)
631 		mvpurge(PATH_SPOOL PATH_INCOMING, PATH_SPOOL PATH_PURGE);
632 
633 	fsqueue_envelope_path(0, path, sizeof(path));
634 
635 	ret = 1;
636 	for (n = 0; n < nitems(paths); n++) {
637 		(void)strlcpy(path, PATH_SPOOL, sizeof(path));
638 		if (strlcat(path, paths[n], sizeof(path)) >= sizeof(path))
639 			errx(1, "path too long %s%s", PATH_SPOOL, paths[n]);
640 		if (ckdir(path, 0700, pw->pw_uid, 0, server) == 0)
641 			ret = 0;
642 	}
643 
644 	if (gettimeofday(&tv, NULL) == -1)
645 		err(1, "gettimeofday");
646 	TIMEVAL_TO_TIMESPEC(&tv, &startup);
647 
648 	tree_init(&evpcount);
649 
650 	queue_api_on_message_create(queue_fs_message_create);
651 	queue_api_on_message_commit(queue_fs_message_commit);
652 	queue_api_on_message_delete(queue_fs_message_delete);
653 	queue_api_on_message_fd_r(queue_fs_message_fd_r);
654 	queue_api_on_message_corrupt(queue_fs_message_corrupt);
655 	queue_api_on_envelope_create(queue_fs_envelope_create);
656 	queue_api_on_envelope_delete(queue_fs_envelope_delete);
657 	queue_api_on_envelope_update(queue_fs_envelope_update);
658 	queue_api_on_envelope_load(queue_fs_envelope_load);
659 	queue_api_on_envelope_walk(queue_fs_envelope_walk);
660 
661 	return (ret);
662 }
663 
664 struct queue_backend	queue_backend_fs = {
665 	queue_fs_init,
666 };
667