1 /*	$OpenBSD: mproc.c,v 1.8 2014/04/19 17:45:05 gilles Exp $	*/
2 
3 /*
4  * Copyright (c) 2012 Eric Faurot <eric@faurot.net>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include "includes.h"
20 
21 #include <sys/types.h>
22 #include <sys/socket.h>
23 #include <sys/tree.h>
24 #include <sys/queue.h>
25 #include <sys/uio.h>
26 
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 #include <arpa/nameser.h>
30 
31 #include <err.h>
32 #include <errno.h>
33 #include <event.h>
34 #include <fcntl.h>
35 #include <limits.h>
36 #include <imsg.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40 #include <unistd.h>
41 
42 #include <smtpd-api.h>
43 
44 
45 /* from filter_api.c */
46 const char *proc_name(enum smtp_proc_type);
47 const char *imsg_to_str(int);
48 
49 enum smtp_proc_type	smtpd_process = PROC_PONY;
50 
51 static void mproc_dispatch(int, short, void *);
52 
53 static ssize_t msgbuf_write2(struct msgbuf *);
54 
55 void
session_socket_blockmode(int fd,enum blockmodes bm)56 session_socket_blockmode(int fd, enum blockmodes bm)
57 {
58 	int	flags;
59 
60 	if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
61 		fatal("fcntl F_GETFL");
62 
63 	if (bm == BM_NONBLOCK)
64 		flags |= O_NONBLOCK;
65 	else
66 		flags &= ~O_NONBLOCK;
67 
68 	if ((flags = fcntl(fd, F_SETFL, flags)) == -1)
69 		fatal("fcntl F_SETFL");
70 }
71 
72 int
mproc_fork(struct mproc * p,const char * path,char * argv[])73 mproc_fork(struct mproc *p, const char *path, char *argv[])
74 {
75 	int sp[2];
76 
77 	if (socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC, sp) < 0)
78 		return (-1);
79 
80 	session_socket_blockmode(sp[0], BM_NONBLOCK);
81 	session_socket_blockmode(sp[1], BM_NONBLOCK);
82 
83 	if ((p->pid = fork()) == -1)
84 		goto err;
85 
86 	if (p->pid == 0) {
87 		/* child process */
88 		dup2(sp[0], STDIN_FILENO);
89 		closefrom(STDERR_FILENO + 1);
90 
91 		execv(path, argv);
92 		err(1, "execv: %s", path);
93 	}
94 
95 	/* parent process */
96 	close(sp[0]);
97 	mproc_init(p, sp[1]);
98 	return (0);
99 
100 err:
101 	log_warn("warn: Failed to start process %s, instance of %s", argv[0], path);
102 	close(sp[0]);
103 	close(sp[1]);
104 	return (-1);
105 }
106 
107 void
mproc_init(struct mproc * p,int fd)108 mproc_init(struct mproc *p, int fd)
109 {
110 	imsg_init(&p->imsgbuf, fd);
111 }
112 
113 void
mproc_clear(struct mproc * p)114 mproc_clear(struct mproc *p)
115 {
116 	event_del(&p->ev);
117 	close(p->imsgbuf.fd);
118 	imsg_clear(&p->imsgbuf);
119 }
120 
121 void
mproc_enable(struct mproc * p)122 mproc_enable(struct mproc *p)
123 {
124 	if (p->enable == 0) {
125 		log_trace(TRACE_MPROC, "mproc: %s -> %s: enabled",
126 		    proc_name(smtpd_process),
127 		    proc_name(p->proc));
128 		p->enable = 1;
129 	}
130 	mproc_event_add(p);
131 }
132 
133 void
mproc_disable(struct mproc * p)134 mproc_disable(struct mproc *p)
135 {
136 	if (p->enable == 1) {
137 		log_trace(TRACE_MPROC, "mproc: %s -> %s: disabled",
138 		    proc_name(smtpd_process),
139 		    proc_name(p->proc));
140 		p->enable = 0;
141 	}
142 	mproc_event_add(p);
143 }
144 
145 void
mproc_event_add(struct mproc * p)146 mproc_event_add(struct mproc *p)
147 {
148 	short	events;
149 
150 	if (p->enable)
151 		events = EV_READ;
152 	else
153 		events = 0;
154 
155 	if (p->imsgbuf.w.queued)
156 		events |= EV_WRITE;
157 
158 	if (p->events)
159 		event_del(&p->ev);
160 
161 	p->events = events;
162 	if (events) {
163 		event_set(&p->ev, p->imsgbuf.fd, events, mproc_dispatch, p);
164 		event_add(&p->ev, NULL);
165 	}
166 }
167 
168 static void
mproc_dispatch(int fd,short event,void * arg)169 mproc_dispatch(int fd, short event, void *arg)
170 {
171 	struct mproc	*p = arg;
172 	struct imsg	 imsg;
173 	ssize_t		 n;
174 
175 	p->events = 0;
176 
177 	if (event & EV_READ) {
178 
179 		if ((n = imsg_read(&p->imsgbuf)) == -1) {
180 			log_warn("warn: %s -> %s: imsg_read",
181 			    proc_name(smtpd_process),  p->name);
182 			if (errno != EAGAIN)
183 				fatal("exiting");
184 		}
185 		else if (n == 0) {
186 			/* this pipe is dead, so remove the event handler */
187 			if (smtpd_process != PROC_CONTROL ||
188 			    p->proc != PROC_CLIENT)
189 				log_warnx("warn: %s -> %s: pipe closed",
190 				    proc_name(smtpd_process),  p->name);
191 			p->handler(p, NULL);
192 			return;
193 		}
194 		else
195 			p->bytes_in += n;
196 	}
197 
198 	if (event & EV_WRITE) {
199 		n = msgbuf_write2(&p->imsgbuf.w);
200 		if (n == 0 || (n == -1 && errno != EAGAIN)) {
201 			/* this pipe is dead, so remove the event handler */
202 			if (smtpd_process != PROC_CONTROL ||
203 			    p->proc != PROC_CLIENT)
204 				log_warnx("warn: %s -> %s: pipe closed",
205 				    proc_name(smtpd_process),  p->name);
206 			p->handler(p, NULL);
207 			return;
208 		} else if (n != -1) {
209 			p->bytes_out += n;
210 			p->bytes_queued -= n;
211 		}
212 	}
213 
214 	for (;;) {
215 		if ((n = imsg_get(&p->imsgbuf, &imsg)) == -1) {
216 			log_warn("fatal: %s: error in imsg_get for %s",
217 			    proc_name(smtpd_process),  p->name);
218 			fatalx(NULL);
219 		}
220 		if (n == 0)
221 			break;
222 
223 		p->msg_in += 1;
224 		p->handler(p, &imsg);
225 
226 		imsg_free(&imsg);
227 	}
228 
229 #if 0
230 	if (smtpd_process == PROC_QUEUE)
231 		queue_flow_control();
232 #endif
233 
234 	mproc_event_add(p);
235 }
236 
237 /* XXX msgbuf_write() should return n ... */
238 static ssize_t
msgbuf_write2(struct msgbuf * msgbuf)239 msgbuf_write2(struct msgbuf *msgbuf)
240 {
241 	struct iovec	 iov[IOV_MAX];
242 	struct ibuf	*buf;
243 	unsigned int	 i = 0;
244 	ssize_t		 n;
245 	struct msghdr	 msg;
246 	struct cmsghdr	*cmsg;
247 	union {
248 		struct cmsghdr	hdr;
249 		char		buf[CMSG_SPACE(sizeof(int))];
250 	} cmsgbuf;
251 
252 	memset(&iov, 0, sizeof(iov));
253 	memset(&msg, 0, sizeof(msg));
254 	TAILQ_FOREACH(buf, &msgbuf->bufs, entry) {
255 		if (i >= IOV_MAX)
256 			break;
257 		iov[i].iov_base = buf->buf + buf->rpos;
258 		iov[i].iov_len = buf->wpos - buf->rpos;
259 		i++;
260 		if (buf->fd != -1)
261 			break;
262 	}
263 
264 	msg.msg_iov = iov;
265 	msg.msg_iovlen = i;
266 
267 	if (buf != NULL && buf->fd != -1) {
268 		msg.msg_control = (caddr_t)&cmsgbuf.buf;
269 		msg.msg_controllen = sizeof(cmsgbuf.buf);
270 		cmsg = CMSG_FIRSTHDR(&msg);
271 		cmsg->cmsg_len = CMSG_LEN(sizeof(int));
272 		cmsg->cmsg_level = SOL_SOCKET;
273 		cmsg->cmsg_type = SCM_RIGHTS;
274 		*(int *)CMSG_DATA(cmsg) = buf->fd;
275 	}
276 
277 again:
278 	if ((n = sendmsg(msgbuf->fd, &msg, 0)) == -1) {
279 		if (errno == EINTR)
280 			goto again;
281 		if (errno == ENOBUFS)
282 			errno = EAGAIN;
283 		return (-1);
284 	}
285 
286 	if (n == 0) {			/* connection closed */
287 		errno = 0;
288 		return (0);
289 	}
290 
291 	/*
292 	 * assumption: fd got sent if sendmsg sent anything
293 	 * this works because fds are passed one at a time
294 	 */
295 	if (buf != NULL && buf->fd != -1) {
296 		close(buf->fd);
297 		buf->fd = -1;
298 	}
299 
300 	msgbuf_drain(msgbuf, n);
301 
302 	return (n);
303 }
304 
305 void
m_forward(struct mproc * p,struct imsg * imsg)306 m_forward(struct mproc *p, struct imsg *imsg)
307 {
308 	imsg_compose(&p->imsgbuf, imsg->hdr.type, imsg->hdr.peerid,
309 	    imsg->hdr.pid, imsg->fd, imsg->data,
310 	    imsg->hdr.len - sizeof(imsg->hdr));
311 
312 	log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s (forward)",
313 		    proc_name(smtpd_process),
314 		    proc_name(p->proc),
315 		    imsg->hdr.len - sizeof(imsg->hdr),
316 		    imsg_to_str(imsg->hdr.type));
317 
318 	p->msg_out += 1;
319 	p->bytes_queued += imsg->hdr.len;
320 	if (p->bytes_queued > p->bytes_queued_max)
321 		p->bytes_queued_max = p->bytes_queued;
322 
323 	mproc_event_add(p);
324 }
325 
326 void
m_compose(struct mproc * p,uint32_t type,uint32_t peerid,pid_t pid,int fd,void * data,size_t len)327 m_compose(struct mproc *p, uint32_t type, uint32_t peerid, pid_t pid, int fd,
328     void *data, size_t len)
329 {
330 	imsg_compose(&p->imsgbuf, type, peerid, pid, fd, data, len);
331 
332 	log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s",
333 		    proc_name(smtpd_process),
334 		    proc_name(p->proc),
335 		    len,
336 		    imsg_to_str(type));
337 
338 	p->msg_out += 1;
339 	p->bytes_queued += len + IMSG_HEADER_SIZE;
340 	if (p->bytes_queued > p->bytes_queued_max)
341 		p->bytes_queued_max = p->bytes_queued;
342 
343 	mproc_event_add(p);
344 }
345 
346 void
m_composev(struct mproc * p,uint32_t type,uint32_t peerid,pid_t pid,int fd,const struct iovec * iov,int n)347 m_composev(struct mproc *p, uint32_t type, uint32_t peerid, pid_t pid,
348     int fd, const struct iovec *iov, int n)
349 {
350 	size_t	len;
351 	int	i;
352 
353 	imsg_composev(&p->imsgbuf, type, peerid, pid, fd, iov, n);
354 
355 	len = 0;
356 	for (i = 0; i < n; i++)
357 		len += iov[i].iov_len;
358 
359 	p->msg_out += 1;
360 	p->bytes_queued += IMSG_HEADER_SIZE + len;
361 	if (p->bytes_queued > p->bytes_queued_max)
362 		p->bytes_queued_max = p->bytes_queued;
363 
364 	log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s",
365 		    proc_name(smtpd_process),
366 		    proc_name(p->proc),
367 		    len,
368 		    imsg_to_str(type));
369 
370 	mproc_event_add(p);
371 }
372 
373 void
m_create(struct mproc * p,uint32_t type,uint32_t peerid,pid_t pid,int fd)374 m_create(struct mproc *p, uint32_t type, uint32_t peerid, pid_t pid, int fd)
375 {
376 	if (p->m_buf == NULL) {
377 		p->m_alloc = 128;
378 		log_trace(TRACE_MPROC, "mproc: %s -> %s: allocating %zu",
379 		    proc_name(smtpd_process),
380 		    proc_name(p->proc),
381 		    p->m_alloc);
382 		p->m_buf = malloc(p->m_alloc);
383 		if (p->m_buf == NULL)
384 			fatal("warn: m_create: malloc");
385 	}
386 
387 	p->m_pos = 0;
388 	p->m_type = type;
389 	p->m_peerid = peerid;
390 	p->m_pid = pid;
391 	p->m_fd = fd;
392 }
393 
394 void
m_add(struct mproc * p,const void * data,size_t len)395 m_add(struct mproc *p, const void *data, size_t len)
396 {
397 	size_t	 alloc;
398 	void	*tmp;
399 
400 	if (p->m_pos + len + IMSG_HEADER_SIZE > MAX_IMSGSIZE) {
401 		log_warnx("warn: message to large");
402 		fatal(NULL);
403 	}
404 
405 	alloc = p->m_alloc;
406 	while (p->m_pos + len > alloc)
407 		alloc *= 2;
408 	if (alloc != p->m_alloc) {
409 		log_trace(TRACE_MPROC, "mproc: %s -> %s: realloc %zu -> %zu",
410 		    proc_name(smtpd_process),
411 		    proc_name(p->proc),
412 		    p->m_alloc,
413 		    alloc);
414 
415 		tmp = realloc(p->m_buf, alloc);
416 		if (tmp == NULL)
417 			fatal("realloc");
418 		p->m_alloc = alloc;
419 		p->m_buf = tmp;
420 	}
421 
422 	memmove(p->m_buf + p->m_pos, data, len);
423 	p->m_pos += len;
424 }
425 
426 void
m_close(struct mproc * p)427 m_close(struct mproc *p)
428 {
429 	if (imsg_compose(&p->imsgbuf, p->m_type, p->m_peerid, p->m_pid, p->m_fd,
430 	    p->m_buf, p->m_pos) == -1)
431 		fatal("imsg_compose");
432 
433 	log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s",
434 		    proc_name(smtpd_process),
435 		    proc_name(p->proc),
436 		    p->m_pos,
437 		    imsg_to_str(p->m_type));
438 
439 	p->msg_out += 1;
440 	p->bytes_queued += p->m_pos + IMSG_HEADER_SIZE;
441 	if (p->bytes_queued > p->bytes_queued_max)
442 		p->bytes_queued_max = p->bytes_queued;
443 
444 	mproc_event_add(p);
445 }
446 
447 void
m_flush(struct mproc * p)448 m_flush(struct mproc *p)
449 {
450 	if (imsg_compose(&p->imsgbuf, p->m_type, p->m_peerid, p->m_pid, p->m_fd,
451 	    p->m_buf, p->m_pos) == -1)
452 		fatal("imsg_compose");
453 
454 	log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s (flush)",
455 	    proc_name(smtpd_process),
456 	    proc_name(p->proc),
457 	    p->m_pos,
458 	    imsg_to_str(p->m_type));
459 
460 	p->msg_out += 1;
461 	p->m_pos = 0;
462 
463 	imsg_flush(&p->imsgbuf);
464 }
465 
466 static struct imsg * current;
467 
468 static void
m_error(const char * error)469 m_error(const char *error)
470 {
471 	char	buf[512];
472 
473 	(void)snprintf(buf, sizeof buf, "%s: %s: %s",
474 	    proc_name(smtpd_process),
475 	    imsg_to_str(current->hdr.type),
476 	    error);
477 	fatalx("%s", buf);
478 }
479 
480 void
m_msg(struct msg * m,struct imsg * imsg)481 m_msg(struct msg *m, struct imsg *imsg)
482 {
483 	current = imsg;
484 	m->pos = imsg->data;
485 	m->end = m->pos + (imsg->hdr.len - sizeof(imsg->hdr));
486 }
487 
488 void
m_end(struct msg * m)489 m_end(struct msg *m)
490 {
491 	if (m->pos != m->end)
492 		m_error("not at msg end");
493 }
494 
495 int
m_is_eom(struct msg * m)496 m_is_eom(struct msg *m)
497 {
498 	return (m->pos == m->end);
499 }
500 
501 static inline void
m_get(struct msg * m,void * dst,size_t sz)502 m_get(struct msg *m, void *dst, size_t sz)
503 {
504 	if (sz > MAX_IMSGSIZE ||
505 	    m->end - m->pos < (ssize_t)sz)
506 		m_error("msg too short");
507 
508 	memmove(dst, m->pos, sz);
509 	m->pos += sz;
510 }
511 
512 void
m_add_int(struct mproc * m,int v)513 m_add_int(struct mproc *m, int v)
514 {
515 	m_add(m, &v, sizeof(v));
516 };
517 
518 void
m_add_u32(struct mproc * m,uint32_t u32)519 m_add_u32(struct mproc *m, uint32_t u32)
520 {
521 	m_add(m, &u32, sizeof(u32));
522 };
523 
524 void
m_add_size(struct mproc * m,size_t sz)525 m_add_size(struct mproc *m, size_t sz)
526 {
527 	m_add(m, &sz, sizeof(sz));
528 };
529 
530 void
m_add_time(struct mproc * m,time_t v)531 m_add_time(struct mproc *m, time_t v)
532 {
533 	m_add(m, &v, sizeof(v));
534 };
535 
536 void
m_add_string(struct mproc * m,const char * v)537 m_add_string(struct mproc *m, const char *v)
538 {
539 	m_add(m, v, strlen(v) + 1);
540 };
541 
542 void
m_add_data(struct mproc * m,const void * v,size_t len)543 m_add_data(struct mproc *m, const void *v, size_t len)
544 {
545 	m_add_size(m, len);
546 	m_add(m, v, len);
547 };
548 
549 void
m_add_id(struct mproc * m,uint64_t v)550 m_add_id(struct mproc *m, uint64_t v)
551 {
552 	m_add(m, &v, sizeof(v));
553 }
554 
555 void
m_add_evpid(struct mproc * m,uint64_t v)556 m_add_evpid(struct mproc *m, uint64_t v)
557 {
558 	m_add(m, &v, sizeof(v));
559 }
560 
561 void
m_add_msgid(struct mproc * m,uint32_t v)562 m_add_msgid(struct mproc *m, uint32_t v)
563 {
564 	m_add(m, &v, sizeof(v));
565 }
566 
567 void
m_add_sockaddr(struct mproc * m,const struct sockaddr * sa)568 m_add_sockaddr(struct mproc *m, const struct sockaddr *sa)
569 {
570 	m_add_size(m, SA_LEN(sa));
571 	m_add(m, sa, SA_LEN(sa));
572 }
573 
574 void
m_add_mailaddr(struct mproc * m,const struct mailaddr * maddr)575 m_add_mailaddr(struct mproc *m, const struct mailaddr *maddr)
576 {
577 	m_add(m, maddr, sizeof(*maddr));
578 }
579 
580 void
m_get_int(struct msg * m,int * i)581 m_get_int(struct msg *m, int *i)
582 {
583 	m_get(m, i, sizeof(*i));
584 }
585 
586 void
m_get_u32(struct msg * m,uint32_t * u32)587 m_get_u32(struct msg *m, uint32_t *u32)
588 {
589 	m_get(m, u32, sizeof(*u32));
590 }
591 
592 void
m_get_size(struct msg * m,size_t * sz)593 m_get_size(struct msg *m, size_t *sz)
594 {
595 	m_get(m, sz, sizeof(*sz));
596 }
597 
598 void
m_get_time(struct msg * m,time_t * t)599 m_get_time(struct msg *m, time_t *t)
600 {
601 	m_get(m, t, sizeof(*t));
602 }
603 
604 void
m_get_string(struct msg * m,const char ** s)605 m_get_string(struct msg *m, const char **s)
606 {
607 	uint8_t	*end;
608 
609 	if (m->pos >= m->end)
610 		m_error("msg too short");
611 
612 	end = memchr(m->pos, 0, m->end - m->pos);
613 	if (end == NULL)
614 		m_error("unterminated string");
615 
616 	*s = m->pos;
617 	m->pos = end + 1;
618 }
619 
620 void
m_get_data(struct msg * m,const void ** data,size_t * sz)621 m_get_data(struct msg *m, const void **data, size_t *sz)
622 {
623 	m_get_size(m, sz);
624 
625 	if (m->pos + *sz > m->end)
626 		m_error("msg too short");
627 
628 	*data = m->pos;
629 	m->pos += *sz;
630 }
631 
632 void
m_get_evpid(struct msg * m,uint64_t * evpid)633 m_get_evpid(struct msg *m, uint64_t *evpid)
634 {
635 	m_get(m, evpid, sizeof(*evpid));
636 }
637 
638 void
m_get_msgid(struct msg * m,uint32_t * msgid)639 m_get_msgid(struct msg *m, uint32_t *msgid)
640 {
641 	m_get(m, msgid, sizeof(*msgid));
642 }
643 
644 void
m_get_id(struct msg * m,uint64_t * id)645 m_get_id(struct msg *m, uint64_t *id)
646 {
647 	m_get(m, id, sizeof(*id));
648 }
649 
650 void
m_get_sockaddr(struct msg * m,struct sockaddr * sa)651 m_get_sockaddr(struct msg *m, struct sockaddr *sa)
652 {
653 	size_t len;
654 
655 	m_get_size(m, &len);
656 	m_get(m, sa, len);
657 }
658 
659 void
m_get_mailaddr(struct msg * m,struct mailaddr * maddr)660 m_get_mailaddr(struct msg *m, struct mailaddr *maddr)
661 {
662 	m_get(m, maddr, sizeof(*maddr));
663 }
664