xref: /openbsd/usr.sbin/smtpd/queue_fs.c (revision b5be0396)
1 /*	$OpenBSD: queue_fs.c,v 1.5 2014/04/19 13:48:57 gilles Exp $	*/
2 
3 /*
4  * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/queue.h>
21 #include <sys/tree.h>
22 #include <sys/socket.h>
23 #include <sys/stat.h>
24 #include <sys/param.h>
25 #include <sys/mount.h>
26 
27 #include <ctype.h>
28 #include <err.h>
29 #include <errno.h>
30 #include <event.h>
31 #include <fcntl.h>
32 #include <fts.h>
33 #include <imsg.h>
34 #include <inttypes.h>
35 #include <libgen.h>
36 #include <pwd.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40 #include <time.h>
41 #include <unistd.h>
42 
43 #include "smtpd.h"
44 #include "log.h"
45 
46 #define PATH_QUEUE		"/queue"
47 #define PATH_CORRUPT		"/corrupt"
48 #define PATH_INCOMING		"/incoming"
49 #define PATH_EVPTMP		PATH_INCOMING "/envelope.tmp"
50 #define PATH_MESSAGE		"/message"
51 
52 /* percentage of remaining space / inodes required to accept new messages */
53 #define	MINSPACE		5
54 #define	MININODES		5
55 
56 struct qwalk {
57 	FTS	*fts;
58 	int	 depth;
59 };
60 
61 static int	fsqueue_check_space(void);
62 static void	fsqueue_envelope_path(uint64_t, char *, size_t);
63 static void	fsqueue_envelope_incoming_path(uint64_t, char *, size_t);
64 static int	fsqueue_envelope_dump(char *, const char *, size_t, int, int);
65 static void	fsqueue_message_path(uint32_t, char *, size_t);
66 static void	fsqueue_message_corrupt_path(uint32_t, char *, size_t);
67 static void	fsqueue_message_incoming_path(uint32_t, char *, size_t);
68 static void    *fsqueue_qwalk_new(void);
69 static int	fsqueue_qwalk(void *, uint64_t *);
70 static void	fsqueue_qwalk_close(void *);
71 
72 struct tree evpcount;
73 static struct timespec startup;
74 
75 #define REF	(int*)0xf00
76 
77 static int
78 queue_fs_message_create(uint32_t *msgid)
79 {
80 	char		rootdir[SMTPD_MAXPATHLEN];
81 	struct stat	sb;
82 
83 	if (! fsqueue_check_space())
84 		return 0;
85 
86 again:
87 	*msgid = queue_generate_msgid();
88 
89 	/* prevent possible collision later when moving to Q_QUEUE */
90 	fsqueue_message_path(*msgid, rootdir, sizeof(rootdir));
91 	if (stat(rootdir, &sb) != -1)
92 		goto again;
93 
94 	/* we hit an unexpected error, temporarily fail */
95 	if (errno != ENOENT) {
96 		*msgid = 0;
97 		return 0;
98 	}
99 
100 	fsqueue_message_incoming_path(*msgid, rootdir, sizeof(rootdir));
101 	if (mkdir(rootdir, 0700) == -1) {
102 		if (errno == EEXIST)
103 			goto again;
104 
105 		if (errno == ENOSPC) {
106 			*msgid = 0;
107 			return 0;
108 		}
109 
110 		log_warn("warn: queue-fs: mkdir");
111 		*msgid = 0;
112 		return 0;
113 	}
114 
115 	return (1);
116 }
117 
118 static int
119 queue_fs_message_commit(uint32_t msgid, const char *path)
120 {
121 	char incomingdir[SMTPD_MAXPATHLEN];
122 	char queuedir[SMTPD_MAXPATHLEN];
123 	char msgdir[SMTPD_MAXPATHLEN];
124 	char msgpath[SMTPD_MAXPATHLEN];
125 
126 	/* before-first, move the message content in the incoming directory */
127 	fsqueue_message_incoming_path(msgid, msgpath, sizeof(msgpath));
128 	if (strlcat(msgpath, PATH_MESSAGE, sizeof(msgpath))
129 	    >= sizeof(msgpath))
130 		return (0);
131 	if (rename(path, msgpath) == -1)
132 		return (0);
133 
134 	fsqueue_message_incoming_path(msgid, incomingdir, sizeof(incomingdir));
135 	fsqueue_message_path(msgid, msgdir, sizeof(msgdir));
136 	if (strlcpy(queuedir, msgdir, sizeof(queuedir))
137 	    >= sizeof(queuedir))
138 		return (0);
139 
140 	/* first attempt to rename */
141 	if (rename(incomingdir, msgdir) == 0)
142 		return 1;
143 	if (errno == ENOSPC)
144 		return 0;
145 	if (errno != ENOENT) {
146 		log_warn("warn: queue-fs: rename");
147 		return 0;
148 	}
149 
150 	/* create the bucket */
151 	*strrchr(queuedir, '/') = '\0';
152 	if (mkdir(queuedir, 0700) == -1) {
153 		if (errno == ENOSPC)
154 			return 0;
155 		if (errno != EEXIST) {
156 			log_warn("warn: queue-fs: mkdir");
157 			return 0;
158 		}
159 	}
160 
161 	/* rename */
162 	if (rename(incomingdir, msgdir) == -1) {
163 		if (errno == ENOSPC)
164 			return 0;
165 		log_warn("warn: queue-fs: rename");
166 		return 0;
167 	}
168 
169 	return 1;
170 }
171 
172 static int
173 queue_fs_message_fd_r(uint32_t msgid)
174 {
175 	int fd;
176 	char path[SMTPD_MAXPATHLEN];
177 
178 	fsqueue_message_path(msgid, path, sizeof(path));
179 	if (strlcat(path, PATH_MESSAGE, sizeof(path))
180 	    >= sizeof(path))
181 		return -1;
182 
183 	if ((fd = open(path, O_RDONLY)) == -1) {
184 		log_warn("warn: queue-fs: open");
185 		return -1;
186 	}
187 
188 	return fd;
189 }
190 
191 static int
192 queue_fs_message_delete(uint32_t msgid)
193 {
194 	char		path[SMTPD_MAXPATHLEN];
195 	struct stat	sb;
196 
197 	fsqueue_message_incoming_path(msgid, path, sizeof(path));
198 	if (stat(path, &sb) == -1)
199 		fsqueue_message_path(msgid, path, sizeof(path));
200 
201 	if (rmtree(path, 0) == -1)
202 		log_warn("warn: queue-fs: rmtree");
203 
204 	tree_pop(&evpcount, msgid);
205 
206 	return 1;
207 }
208 
209 static int
210 queue_fs_message_corrupt(uint32_t msgid)
211 {
212 	struct stat sb;
213 	char rootdir[SMTPD_MAXPATHLEN];
214 	char corruptdir[SMTPD_MAXPATHLEN];
215 	char buf[64];
216 	int  retry = 0;
217 
218 	fsqueue_message_path(msgid, rootdir, sizeof(rootdir));
219 	fsqueue_message_corrupt_path(msgid, corruptdir,
220 	    sizeof(corruptdir));
221 
222 again:
223 	if (stat(corruptdir, &sb) != -1 || errno != ENOENT) {
224 		fsqueue_message_corrupt_path(msgid, corruptdir,
225 		    sizeof(corruptdir));
226 		(void)snprintf(buf, sizeof (buf), ".%d", retry++);
227 		(void)strlcat(corruptdir, buf, sizeof(corruptdir));
228 		goto again;
229 	}
230 
231 	if (rename(rootdir, corruptdir) == -1) {
232 		log_warn("warn: queue-fs: rename");
233 		return 0;
234 	}
235 
236 	tree_pop(&evpcount, msgid);
237 
238 	return 1;
239 }
240 
241 static int
242 queue_fs_envelope_create(uint32_t msgid, const char *buf, size_t len,
243     uint64_t *evpid)
244 {
245 	char		path[SMTPD_MAXPATHLEN];
246 	int		queued = 0, i, r = 0, *n;
247 	struct stat	sb;
248 
249 	if (msgid == 0) {
250 		log_warnx("warn: queue-fs: msgid=0, evpid=%016"PRIx64, *evpid);
251 		goto done;
252 	}
253 
254 	fsqueue_message_incoming_path(msgid, path, sizeof(path));
255 	if (stat(path, &sb) == -1)
256 		queued = 1;
257 
258 	for (i = 0; i < 20; i ++) {
259 		*evpid = queue_generate_evpid(msgid);
260 		if (queued)
261 			fsqueue_envelope_path(*evpid, path, sizeof(path));
262 		else
263 			fsqueue_envelope_incoming_path(*evpid, path,
264 			    sizeof(path));
265 
266 		r = fsqueue_envelope_dump(path, buf, len, 0, 1);
267 		if (r >= 0)
268 			goto done;
269 	}
270 	r = 0;
271 	log_warnx("warn: queue-fs: could not allocate evpid");
272 
273 done:
274 	if (r) {
275 		n = tree_pop(&evpcount, msgid);
276 		if (n == NULL)
277 			n = REF;
278 		n += 1;
279 		tree_xset(&evpcount, msgid, n);
280 	}
281 	return (r);
282 }
283 
284 static int
285 queue_fs_envelope_load(uint64_t evpid, char *buf, size_t len)
286 {
287 	char	 pathname[SMTPD_MAXPATHLEN];
288 	FILE	*fp;
289 	size_t	 r;
290 
291 	fsqueue_envelope_path(evpid, pathname, sizeof(pathname));
292 
293 	fp = fopen(pathname, "r");
294 	if (fp == NULL) {
295 		if (errno != ENOENT && errno != ENFILE)
296 			log_warn("warn: queue-fs: fopen");
297 		return 0;
298 	}
299 
300 	r = fread(buf, 1, len, fp);
301 	if (r) {
302 		if (r == len) {
303 			log_warn("warn: queue-fs: too large");
304 			r = 0;
305 		}
306 		else
307 			buf[r] = '\0';
308 	}
309 	fclose(fp);
310 
311 	return (r);
312 }
313 
314 static int
315 queue_fs_envelope_update(uint64_t evpid, const char *buf, size_t len)
316 {
317 	char dest[SMTPD_MAXPATHLEN];
318 
319 	fsqueue_envelope_path(evpid, dest, sizeof(dest));
320 
321 	return (fsqueue_envelope_dump(dest, buf, len, 1, 1));
322 }
323 
324 static int
325 queue_fs_envelope_delete(uint64_t evpid)
326 {
327 	char		pathname[SMTPD_MAXPATHLEN];
328 	uint32_t	msgid;
329 	int		*n;
330 
331 	fsqueue_envelope_path(evpid, pathname, sizeof(pathname));
332 	if (unlink(pathname) == -1)
333 		if (errno != ENOENT)
334 			return 0;
335 
336 	msgid = evpid_to_msgid(evpid);
337 	n = tree_pop(&evpcount, msgid);
338 	n -= 1;
339 
340 	if (n - REF == 0)
341 		queue_fs_message_delete(msgid);
342 	else
343 		tree_xset(&evpcount, msgid, n);
344 
345 	return (1);
346 }
347 
348 static int
349 queue_fs_envelope_walk(uint64_t *evpid, char *buf, size_t len)
350 {
351 	static int	 done = 0;
352 	static void	*hdl = NULL;
353 	int		 r, *n;
354 	uint32_t	 msgid;
355 
356 	if (done)
357 		return (-1);
358 
359 	if (hdl == NULL)
360 		hdl = fsqueue_qwalk_new();
361 
362 	if (fsqueue_qwalk(hdl, evpid)) {
363 		memset(buf, 0, len);
364 		r = queue_fs_envelope_load(*evpid, buf, len);
365 		if (r) {
366 			msgid = evpid_to_msgid(*evpid);
367 			n = tree_pop(&evpcount, msgid);
368 			if (n == NULL)
369 				n = REF;
370 			n += 1;
371 			tree_xset(&evpcount, msgid, n);
372 		}
373 		return (r);
374 	}
375 
376 	fsqueue_qwalk_close(hdl);
377 	done = 1;
378 	return (-1);
379 }
380 
381 static int
382 fsqueue_check_space(void)
383 {
384 	struct statfs	buf;
385 	uint64_t	used;
386 	uint64_t	total;
387 
388 	if (statfs(PATH_QUEUE, &buf) < 0) {
389 		log_warn("warn: queue-fs: statfs");
390 		return 0;
391 	}
392 
393 	/*
394 	 * f_bfree and f_ffree is not set on all filesystems.
395 	 * They could be signed or unsigned integers.
396 	 * Some systems will set them to 0, others will set them to -1.
397 	 */
398 	if (buf.f_bfree == 0 || buf.f_ffree == 0 ||
399 	    (int64_t)buf.f_bfree == -1 || (int64_t)buf.f_ffree == -1)
400 		return 1;
401 
402 	used = buf.f_blocks - buf.f_bfree;
403 	total = buf.f_bavail + used;
404 	if (total != 0)
405 		used = (float)used / (float)total * 100;
406 	else
407 		used = 100;
408 	if (100 - used < MINSPACE) {
409 		log_warnx("warn: not enough disk space: %llu%% left",
410 		    (unsigned long long) 100 - used);
411 		log_warnx("warn: temporarily rejecting messages");
412 		return 0;
413 	}
414 
415 	used = buf.f_files - buf.f_ffree;
416 	total = buf.f_favail + used;
417 	if (total != 0)
418 		used = (float)used / (float)total * 100;
419 	else
420 		used = 100;
421 	if (100 - used < MININODES) {
422 		log_warnx("warn: not enough inodes: %llu%% left",
423 		    (unsigned long long) 100 - used);
424 		log_warnx("warn: temporarily rejecting messages");
425 		return 0;
426 	}
427 
428 	return 1;
429 }
430 
431 static void
432 fsqueue_envelope_path(uint64_t evpid, char *buf, size_t len)
433 {
434 	if (! bsnprintf(buf, len, "%s/%02x/%08x/%016" PRIx64,
435 		PATH_QUEUE,
436 		(evpid_to_msgid(evpid) & 0xff000000) >> 24,
437 		evpid_to_msgid(evpid),
438 		evpid))
439 		fatalx("fsqueue_envelope_path: path does not fit buffer");
440 }
441 
442 static void
443 fsqueue_envelope_incoming_path(uint64_t evpid, char *buf, size_t len)
444 {
445 	if (! bsnprintf(buf, len, "%s/%08x/%016" PRIx64,
446 		PATH_INCOMING,
447 		evpid_to_msgid(evpid),
448 		evpid))
449 		fatalx("fsqueue_envelope_incoming_path: path does not fit buffer");
450 }
451 
452 static int
453 fsqueue_envelope_dump(char *dest, const char *evpbuf, size_t evplen,
454     int do_atomic, int do_sync)
455 {
456 	const char     *path = do_atomic ? PATH_EVPTMP : dest;
457 	FILE	       *fp = NULL;
458 	int		fd;
459 	size_t		w;
460 
461 	if ((fd = open(path, O_RDWR | O_CREAT | O_EXCL, 0600)) == -1) {
462 		log_warn("warn: queue-fs: open");
463 		goto tempfail;
464 	}
465 
466 	if ((fp = fdopen(fd, "w")) == NULL) {
467 		log_warn("warn: queue-fs: fdopen");
468 		goto tempfail;
469 	}
470 
471 	w = fwrite(evpbuf, 1, evplen, fp);
472 	if (w < evplen) {
473 		log_warn("warn: queue-fs: short write");
474 		goto tempfail;
475 	}
476 	if (fflush(fp)) {
477 		log_warn("warn: queue-fs: fflush");
478 		goto tempfail;
479 	}
480 	if (do_sync && fsync(fileno(fp))) {
481 		log_warn("warn: queue-fs: fsync");
482 		goto tempfail;
483 	}
484 	if (fclose(fp) != 0) {
485 		log_warn("warn: queue-fs: fclose");
486 		fp = NULL;
487 		goto tempfail;
488 	}
489 	fp = NULL;
490 	fd = -1;
491 
492 	if (do_atomic && rename(path, dest) == -1) {
493 		log_warn("warn: queue-fs: rename");
494 		goto tempfail;
495 	}
496 	return (1);
497 
498 tempfail:
499 	if (fp)
500 		fclose(fp);
501 	else if (fd != -1)
502 		close(fd);
503 	if (unlink(path) == -1)
504 		log_warn("warn: queue-fs: unlink");
505 	return (0);
506 }
507 
508 static void
509 fsqueue_message_path(uint32_t msgid, char *buf, size_t len)
510 {
511 	if (! bsnprintf(buf, len, "%s/%02x/%08x",
512 		PATH_QUEUE,
513 		(msgid & 0xff000000) >> 24,
514 		msgid))
515 		fatalx("fsqueue_message_path: path does not fit buffer");
516 }
517 
518 static void
519 fsqueue_message_corrupt_path(uint32_t msgid, char *buf, size_t len)
520 {
521 	if (! bsnprintf(buf, len, "%s/%08x",
522 		PATH_CORRUPT,
523 		msgid))
524 		fatalx("fsqueue_message_corrupt_path: path does not fit buffer");
525 }
526 
527 static void
528 fsqueue_message_incoming_path(uint32_t msgid, char *buf, size_t len)
529 {
530 	if (! bsnprintf(buf, len, "%s/%08x",
531 		PATH_INCOMING,
532 		msgid))
533 		fatalx("fsqueue_message_incoming_path: path does not fit buffer");
534 }
535 
536 static void *
537 fsqueue_qwalk_new(void)
538 {
539 	char		 path[SMTPD_MAXPATHLEN];
540 	char * const	 path_argv[] = { path, NULL };
541 	struct qwalk	*q;
542 
543 	q = xcalloc(1, sizeof(*q), "fsqueue_qwalk_new");
544 	(void)strlcpy(path, PATH_QUEUE, sizeof(path));
545 	q->fts = fts_open(path_argv,
546 	    FTS_PHYSICAL | FTS_NOCHDIR, NULL);
547 
548 	if (q->fts == NULL)
549 		err(1, "fsqueue_qwalk_new: fts_open: %s", path);
550 
551 	return (q);
552 }
553 
554 static void
555 fsqueue_qwalk_close(void *hdl)
556 {
557 	struct qwalk	*q = hdl;
558 
559 	fts_close(q->fts);
560 
561 	free(q);
562 }
563 
564 static int
565 fsqueue_qwalk(void *hdl, uint64_t *evpid)
566 {
567 	struct qwalk	*q = hdl;
568 	FTSENT		*e;
569 	char		*tmp;
570 
571 	while ((e = fts_read(q->fts)) != NULL) {
572 		switch (e->fts_info) {
573 		case FTS_D:
574 			q->depth += 1;
575 			if (q->depth == 2 && e->fts_namelen != 2) {
576 				log_debug("debug: fsqueue: bogus directory %s",
577 				    e->fts_path);
578 				fts_set(q->fts, e, FTS_SKIP);
579 				break;
580 			}
581 			if (q->depth == 3 && e->fts_namelen != 8) {
582 				log_debug("debug: fsqueue: bogus directory %s",
583 				    e->fts_path);
584 				fts_set(q->fts, e, FTS_SKIP);
585 				break;
586 			}
587 			break;
588 
589 		case FTS_DP:
590 		case FTS_DNR:
591 			q->depth -= 1;
592 			break;
593 
594 		case FTS_F:
595 			if (q->depth != 3)
596 				break;
597 			if (e->fts_namelen != 16)
598 				break;
599 			if (timespeccmp(&e->fts_statp->st_mtim, &startup, >))
600 				break;
601 			tmp = NULL;
602 			*evpid = strtoull(e->fts_name, &tmp, 16);
603 			if (tmp && *tmp !=  '\0') {
604 				log_debug("debug: fsqueue: bogus file %s",
605 				    e->fts_path);
606 				break;
607 			}
608 			return (1);
609 		default:
610 			break;
611 		}
612 	}
613 
614 	return (0);
615 }
616 
617 static int
618 queue_fs_init(struct passwd *pw, int server)
619 {
620 	unsigned int	 n;
621 	char		*paths[] = { PATH_QUEUE, PATH_CORRUPT, PATH_INCOMING };
622 	char		 path[SMTPD_MAXPATHLEN];
623 	int		 ret;
624 	struct timeval	 tv;
625 
626 	/* remove incoming/ if it exists */
627 	if (server)
628 		mvpurge(PATH_SPOOL PATH_INCOMING, PATH_SPOOL PATH_PURGE);
629 
630 	fsqueue_envelope_path(0, path, sizeof(path));
631 
632 	ret = 1;
633 	for (n = 0; n < nitems(paths); n++) {
634 		(void)strlcpy(path, PATH_SPOOL, sizeof(path));
635 		if (strlcat(path, paths[n], sizeof(path)) >= sizeof(path))
636 			errx(1, "path too long %s%s", PATH_SPOOL, paths[n]);
637 		if (ckdir(path, 0700, pw->pw_uid, 0, server) == 0)
638 			ret = 0;
639 	}
640 
641 	if (gettimeofday(&tv, NULL) == -1)
642 		err(1, "gettimeofday");
643 	TIMEVAL_TO_TIMESPEC(&tv, &startup);
644 
645 	tree_init(&evpcount);
646 
647 	queue_api_on_message_create(queue_fs_message_create);
648 	queue_api_on_message_commit(queue_fs_message_commit);
649 	queue_api_on_message_delete(queue_fs_message_delete);
650 	queue_api_on_message_fd_r(queue_fs_message_fd_r);
651 	queue_api_on_message_corrupt(queue_fs_message_corrupt);
652 	queue_api_on_envelope_create(queue_fs_envelope_create);
653 	queue_api_on_envelope_delete(queue_fs_envelope_delete);
654 	queue_api_on_envelope_update(queue_fs_envelope_update);
655 	queue_api_on_envelope_load(queue_fs_envelope_load);
656 	queue_api_on_envelope_walk(queue_fs_envelope_walk);
657 
658 	return (ret);
659 }
660 
661 struct queue_backend	queue_backend_fs = {
662 	queue_fs_init,
663 };
664