1 /* $OpenBSD: mproc.c,v 1.8 2014/04/19 17:45:05 gilles Exp $ */
2
3 /*
4 * Copyright (c) 2012 Eric Faurot <eric@faurot.net>
5 *
6 * Permission to use, copy, modify, and distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18
19 #include "includes.h"
20
21 #include <sys/types.h>
22 #include <sys/socket.h>
23 #include <sys/tree.h>
24 #include <sys/queue.h>
25 #include <sys/uio.h>
26
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 #include <arpa/nameser.h>
30
31 #include <err.h>
32 #include <errno.h>
33 #include <event.h>
34 #include <fcntl.h>
35 #include <limits.h>
36 #include <imsg.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40 #include <unistd.h>
41
42 #include <smtpd-api.h>
43
44
45 /* from filter_api.c */
46 const char *proc_name(enum smtp_proc_type);
47 const char *imsg_to_str(int);
48
49 enum smtp_proc_type smtpd_process = PROC_PONY;
50
51 static void mproc_dispatch(int, short, void *);
52
53 static ssize_t msgbuf_write2(struct msgbuf *);
54
55 void
session_socket_blockmode(int fd,enum blockmodes bm)56 session_socket_blockmode(int fd, enum blockmodes bm)
57 {
58 int flags;
59
60 if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
61 fatal("fcntl F_GETFL");
62
63 if (bm == BM_NONBLOCK)
64 flags |= O_NONBLOCK;
65 else
66 flags &= ~O_NONBLOCK;
67
68 if ((flags = fcntl(fd, F_SETFL, flags)) == -1)
69 fatal("fcntl F_SETFL");
70 }
71
72 int
mproc_fork(struct mproc * p,const char * path,char * argv[])73 mproc_fork(struct mproc *p, const char *path, char *argv[])
74 {
75 int sp[2];
76
77 if (socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC, sp) < 0)
78 return (-1);
79
80 session_socket_blockmode(sp[0], BM_NONBLOCK);
81 session_socket_blockmode(sp[1], BM_NONBLOCK);
82
83 if ((p->pid = fork()) == -1)
84 goto err;
85
86 if (p->pid == 0) {
87 /* child process */
88 dup2(sp[0], STDIN_FILENO);
89 closefrom(STDERR_FILENO + 1);
90
91 execv(path, argv);
92 err(1, "execv: %s", path);
93 }
94
95 /* parent process */
96 close(sp[0]);
97 mproc_init(p, sp[1]);
98 return (0);
99
100 err:
101 log_warn("warn: Failed to start process %s, instance of %s", argv[0], path);
102 close(sp[0]);
103 close(sp[1]);
104 return (-1);
105 }
106
107 void
mproc_init(struct mproc * p,int fd)108 mproc_init(struct mproc *p, int fd)
109 {
110 imsg_init(&p->imsgbuf, fd);
111 }
112
113 void
mproc_clear(struct mproc * p)114 mproc_clear(struct mproc *p)
115 {
116 event_del(&p->ev);
117 close(p->imsgbuf.fd);
118 imsg_clear(&p->imsgbuf);
119 }
120
121 void
mproc_enable(struct mproc * p)122 mproc_enable(struct mproc *p)
123 {
124 if (p->enable == 0) {
125 log_trace(TRACE_MPROC, "mproc: %s -> %s: enabled",
126 proc_name(smtpd_process),
127 proc_name(p->proc));
128 p->enable = 1;
129 }
130 mproc_event_add(p);
131 }
132
133 void
mproc_disable(struct mproc * p)134 mproc_disable(struct mproc *p)
135 {
136 if (p->enable == 1) {
137 log_trace(TRACE_MPROC, "mproc: %s -> %s: disabled",
138 proc_name(smtpd_process),
139 proc_name(p->proc));
140 p->enable = 0;
141 }
142 mproc_event_add(p);
143 }
144
145 void
mproc_event_add(struct mproc * p)146 mproc_event_add(struct mproc *p)
147 {
148 short events;
149
150 if (p->enable)
151 events = EV_READ;
152 else
153 events = 0;
154
155 if (p->imsgbuf.w.queued)
156 events |= EV_WRITE;
157
158 if (p->events)
159 event_del(&p->ev);
160
161 p->events = events;
162 if (events) {
163 event_set(&p->ev, p->imsgbuf.fd, events, mproc_dispatch, p);
164 event_add(&p->ev, NULL);
165 }
166 }
167
168 static void
mproc_dispatch(int fd,short event,void * arg)169 mproc_dispatch(int fd, short event, void *arg)
170 {
171 struct mproc *p = arg;
172 struct imsg imsg;
173 ssize_t n;
174
175 p->events = 0;
176
177 if (event & EV_READ) {
178
179 if ((n = imsg_read(&p->imsgbuf)) == -1) {
180 log_warn("warn: %s -> %s: imsg_read",
181 proc_name(smtpd_process), p->name);
182 if (errno != EAGAIN)
183 fatal("exiting");
184 }
185 else if (n == 0) {
186 /* this pipe is dead, so remove the event handler */
187 if (smtpd_process != PROC_CONTROL ||
188 p->proc != PROC_CLIENT)
189 log_warnx("warn: %s -> %s: pipe closed",
190 proc_name(smtpd_process), p->name);
191 p->handler(p, NULL);
192 return;
193 }
194 else
195 p->bytes_in += n;
196 }
197
198 if (event & EV_WRITE) {
199 n = msgbuf_write2(&p->imsgbuf.w);
200 if (n == 0 || (n == -1 && errno != EAGAIN)) {
201 /* this pipe is dead, so remove the event handler */
202 if (smtpd_process != PROC_CONTROL ||
203 p->proc != PROC_CLIENT)
204 log_warnx("warn: %s -> %s: pipe closed",
205 proc_name(smtpd_process), p->name);
206 p->handler(p, NULL);
207 return;
208 } else if (n != -1) {
209 p->bytes_out += n;
210 p->bytes_queued -= n;
211 }
212 }
213
214 for (;;) {
215 if ((n = imsg_get(&p->imsgbuf, &imsg)) == -1) {
216 log_warn("fatal: %s: error in imsg_get for %s",
217 proc_name(smtpd_process), p->name);
218 fatalx(NULL);
219 }
220 if (n == 0)
221 break;
222
223 p->msg_in += 1;
224 p->handler(p, &imsg);
225
226 imsg_free(&imsg);
227 }
228
229 #if 0
230 if (smtpd_process == PROC_QUEUE)
231 queue_flow_control();
232 #endif
233
234 mproc_event_add(p);
235 }
236
237 /* XXX msgbuf_write() should return n ... */
238 static ssize_t
msgbuf_write2(struct msgbuf * msgbuf)239 msgbuf_write2(struct msgbuf *msgbuf)
240 {
241 struct iovec iov[IOV_MAX];
242 struct ibuf *buf;
243 unsigned int i = 0;
244 ssize_t n;
245 struct msghdr msg;
246 struct cmsghdr *cmsg;
247 union {
248 struct cmsghdr hdr;
249 char buf[CMSG_SPACE(sizeof(int))];
250 } cmsgbuf;
251
252 memset(&iov, 0, sizeof(iov));
253 memset(&msg, 0, sizeof(msg));
254 TAILQ_FOREACH(buf, &msgbuf->bufs, entry) {
255 if (i >= IOV_MAX)
256 break;
257 iov[i].iov_base = buf->buf + buf->rpos;
258 iov[i].iov_len = buf->wpos - buf->rpos;
259 i++;
260 if (buf->fd != -1)
261 break;
262 }
263
264 msg.msg_iov = iov;
265 msg.msg_iovlen = i;
266
267 if (buf != NULL && buf->fd != -1) {
268 msg.msg_control = (caddr_t)&cmsgbuf.buf;
269 msg.msg_controllen = sizeof(cmsgbuf.buf);
270 cmsg = CMSG_FIRSTHDR(&msg);
271 cmsg->cmsg_len = CMSG_LEN(sizeof(int));
272 cmsg->cmsg_level = SOL_SOCKET;
273 cmsg->cmsg_type = SCM_RIGHTS;
274 *(int *)CMSG_DATA(cmsg) = buf->fd;
275 }
276
277 again:
278 if ((n = sendmsg(msgbuf->fd, &msg, 0)) == -1) {
279 if (errno == EINTR)
280 goto again;
281 if (errno == ENOBUFS)
282 errno = EAGAIN;
283 return (-1);
284 }
285
286 if (n == 0) { /* connection closed */
287 errno = 0;
288 return (0);
289 }
290
291 /*
292 * assumption: fd got sent if sendmsg sent anything
293 * this works because fds are passed one at a time
294 */
295 if (buf != NULL && buf->fd != -1) {
296 close(buf->fd);
297 buf->fd = -1;
298 }
299
300 msgbuf_drain(msgbuf, n);
301
302 return (n);
303 }
304
305 void
m_forward(struct mproc * p,struct imsg * imsg)306 m_forward(struct mproc *p, struct imsg *imsg)
307 {
308 imsg_compose(&p->imsgbuf, imsg->hdr.type, imsg->hdr.peerid,
309 imsg->hdr.pid, imsg->fd, imsg->data,
310 imsg->hdr.len - sizeof(imsg->hdr));
311
312 log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s (forward)",
313 proc_name(smtpd_process),
314 proc_name(p->proc),
315 imsg->hdr.len - sizeof(imsg->hdr),
316 imsg_to_str(imsg->hdr.type));
317
318 p->msg_out += 1;
319 p->bytes_queued += imsg->hdr.len;
320 if (p->bytes_queued > p->bytes_queued_max)
321 p->bytes_queued_max = p->bytes_queued;
322
323 mproc_event_add(p);
324 }
325
326 void
m_compose(struct mproc * p,uint32_t type,uint32_t peerid,pid_t pid,int fd,void * data,size_t len)327 m_compose(struct mproc *p, uint32_t type, uint32_t peerid, pid_t pid, int fd,
328 void *data, size_t len)
329 {
330 imsg_compose(&p->imsgbuf, type, peerid, pid, fd, data, len);
331
332 log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s",
333 proc_name(smtpd_process),
334 proc_name(p->proc),
335 len,
336 imsg_to_str(type));
337
338 p->msg_out += 1;
339 p->bytes_queued += len + IMSG_HEADER_SIZE;
340 if (p->bytes_queued > p->bytes_queued_max)
341 p->bytes_queued_max = p->bytes_queued;
342
343 mproc_event_add(p);
344 }
345
346 void
m_composev(struct mproc * p,uint32_t type,uint32_t peerid,pid_t pid,int fd,const struct iovec * iov,int n)347 m_composev(struct mproc *p, uint32_t type, uint32_t peerid, pid_t pid,
348 int fd, const struct iovec *iov, int n)
349 {
350 size_t len;
351 int i;
352
353 imsg_composev(&p->imsgbuf, type, peerid, pid, fd, iov, n);
354
355 len = 0;
356 for (i = 0; i < n; i++)
357 len += iov[i].iov_len;
358
359 p->msg_out += 1;
360 p->bytes_queued += IMSG_HEADER_SIZE + len;
361 if (p->bytes_queued > p->bytes_queued_max)
362 p->bytes_queued_max = p->bytes_queued;
363
364 log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s",
365 proc_name(smtpd_process),
366 proc_name(p->proc),
367 len,
368 imsg_to_str(type));
369
370 mproc_event_add(p);
371 }
372
373 void
m_create(struct mproc * p,uint32_t type,uint32_t peerid,pid_t pid,int fd)374 m_create(struct mproc *p, uint32_t type, uint32_t peerid, pid_t pid, int fd)
375 {
376 if (p->m_buf == NULL) {
377 p->m_alloc = 128;
378 log_trace(TRACE_MPROC, "mproc: %s -> %s: allocating %zu",
379 proc_name(smtpd_process),
380 proc_name(p->proc),
381 p->m_alloc);
382 p->m_buf = malloc(p->m_alloc);
383 if (p->m_buf == NULL)
384 fatal("warn: m_create: malloc");
385 }
386
387 p->m_pos = 0;
388 p->m_type = type;
389 p->m_peerid = peerid;
390 p->m_pid = pid;
391 p->m_fd = fd;
392 }
393
394 void
m_add(struct mproc * p,const void * data,size_t len)395 m_add(struct mproc *p, const void *data, size_t len)
396 {
397 size_t alloc;
398 void *tmp;
399
400 if (p->m_pos + len + IMSG_HEADER_SIZE > MAX_IMSGSIZE) {
401 log_warnx("warn: message to large");
402 fatal(NULL);
403 }
404
405 alloc = p->m_alloc;
406 while (p->m_pos + len > alloc)
407 alloc *= 2;
408 if (alloc != p->m_alloc) {
409 log_trace(TRACE_MPROC, "mproc: %s -> %s: realloc %zu -> %zu",
410 proc_name(smtpd_process),
411 proc_name(p->proc),
412 p->m_alloc,
413 alloc);
414
415 tmp = realloc(p->m_buf, alloc);
416 if (tmp == NULL)
417 fatal("realloc");
418 p->m_alloc = alloc;
419 p->m_buf = tmp;
420 }
421
422 memmove(p->m_buf + p->m_pos, data, len);
423 p->m_pos += len;
424 }
425
426 void
m_close(struct mproc * p)427 m_close(struct mproc *p)
428 {
429 if (imsg_compose(&p->imsgbuf, p->m_type, p->m_peerid, p->m_pid, p->m_fd,
430 p->m_buf, p->m_pos) == -1)
431 fatal("imsg_compose");
432
433 log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s",
434 proc_name(smtpd_process),
435 proc_name(p->proc),
436 p->m_pos,
437 imsg_to_str(p->m_type));
438
439 p->msg_out += 1;
440 p->bytes_queued += p->m_pos + IMSG_HEADER_SIZE;
441 if (p->bytes_queued > p->bytes_queued_max)
442 p->bytes_queued_max = p->bytes_queued;
443
444 mproc_event_add(p);
445 }
446
447 void
m_flush(struct mproc * p)448 m_flush(struct mproc *p)
449 {
450 if (imsg_compose(&p->imsgbuf, p->m_type, p->m_peerid, p->m_pid, p->m_fd,
451 p->m_buf, p->m_pos) == -1)
452 fatal("imsg_compose");
453
454 log_trace(TRACE_MPROC, "mproc: %s -> %s : %zu %s (flush)",
455 proc_name(smtpd_process),
456 proc_name(p->proc),
457 p->m_pos,
458 imsg_to_str(p->m_type));
459
460 p->msg_out += 1;
461 p->m_pos = 0;
462
463 imsg_flush(&p->imsgbuf);
464 }
465
466 static struct imsg * current;
467
468 static void
m_error(const char * error)469 m_error(const char *error)
470 {
471 char buf[512];
472
473 (void)snprintf(buf, sizeof buf, "%s: %s: %s",
474 proc_name(smtpd_process),
475 imsg_to_str(current->hdr.type),
476 error);
477 fatalx("%s", buf);
478 }
479
480 void
m_msg(struct msg * m,struct imsg * imsg)481 m_msg(struct msg *m, struct imsg *imsg)
482 {
483 current = imsg;
484 m->pos = imsg->data;
485 m->end = m->pos + (imsg->hdr.len - sizeof(imsg->hdr));
486 }
487
488 void
m_end(struct msg * m)489 m_end(struct msg *m)
490 {
491 if (m->pos != m->end)
492 m_error("not at msg end");
493 }
494
495 int
m_is_eom(struct msg * m)496 m_is_eom(struct msg *m)
497 {
498 return (m->pos == m->end);
499 }
500
501 static inline void
m_get(struct msg * m,void * dst,size_t sz)502 m_get(struct msg *m, void *dst, size_t sz)
503 {
504 if (sz > MAX_IMSGSIZE ||
505 m->end - m->pos < (ssize_t)sz)
506 m_error("msg too short");
507
508 memmove(dst, m->pos, sz);
509 m->pos += sz;
510 }
511
/* Append the raw bytes of the int `v` to the message being built on `m`. */
void
m_add_int(struct mproc *m, int v)
{
	m_add(m, &v, sizeof(v));
}
517
/* Append the raw bytes of `u32` to the message being built on `m`. */
void
m_add_u32(struct mproc *m, uint32_t u32)
{
	m_add(m, &u32, sizeof(u32));
}
523
/* Append the raw bytes of the size_t `sz` to the message built on `m`. */
void
m_add_size(struct mproc *m, size_t sz)
{
	m_add(m, &sz, sizeof(sz));
}
529
/* Append the raw bytes of the time_t `v` to the message built on `m`. */
void
m_add_time(struct mproc *m, time_t v)
{
	m_add(m, &v, sizeof(v));
}
535
/*
 * Append the string `v`, including its terminating NUL, to the message
 * being built on `m`.
 */
void
m_add_string(struct mproc *m, const char *v)
{
	m_add(m, v, strlen(v) + 1);
}
541
/*
 * Append a length-prefixed blob to the message being built on `m`: first
 * `len` as a size_t, then the `len` bytes at `v`.
 */
void
m_add_data(struct mproc *m, const void *v, size_t len)
{
	m_add_size(m, len);
	m_add(m, v, len);
}
548
/* Append the raw bytes of the 64-bit id `v` to the message built on `m`. */
void
m_add_id(struct mproc *m, uint64_t v)
{
	m_add(m, &v, sizeof v);
}
554
/* Append the raw bytes of the 64-bit `v` to the message built on `m`. */
void
m_add_evpid(struct mproc *m, uint64_t v)
{
	m_add(m, &v, sizeof v);
}
560
/* Append the raw bytes of the 32-bit `v` to the message built on `m`. */
void
m_add_msgid(struct mproc *m, uint32_t v)
{
	m_add(m, &v, sizeof v);
}
566
/*
 * Append a socket address to the message being built on `m`: first its
 * SA_LEN() as a size_t, then that many bytes of `sa`.
 */
void
m_add_sockaddr(struct mproc *m, const struct sockaddr *sa)
{
	size_t	 salen = SA_LEN(sa);

	m_add_size(m, salen);
	m_add(m, sa, salen);
}
573
574 void
m_add_mailaddr(struct mproc * m,const struct mailaddr * maddr)575 m_add_mailaddr(struct mproc *m, const struct mailaddr *maddr)
576 {
577 m_add(m, maddr, sizeof(*maddr));
578 }
579
/* Read the next int from `m` into `*i`. */
void
m_get_int(struct msg *m, int *i)
{
	m_get(m, i, sizeof *i);
}
585
/* Read the next uint32_t from `m` into `*u32`. */
void
m_get_u32(struct msg *m, uint32_t *u32)
{
	m_get(m, u32, sizeof *u32);
}
591
/* Read the next size_t from `m` into `*sz`. */
void
m_get_size(struct msg *m, size_t *sz)
{
	m_get(m, sz, sizeof *sz);
}
597
/* Read the next time_t from `m` into `*t`. */
void
m_get_time(struct msg *m, time_t *t)
{
	m_get(m, t, sizeof *t);
}
603
604 void
m_get_string(struct msg * m,const char ** s)605 m_get_string(struct msg *m, const char **s)
606 {
607 uint8_t *end;
608
609 if (m->pos >= m->end)
610 m_error("msg too short");
611
612 end = memchr(m->pos, 0, m->end - m->pos);
613 if (end == NULL)
614 m_error("unterminated string");
615
616 *s = m->pos;
617 m->pos = end + 1;
618 }
619
620 void
m_get_data(struct msg * m,const void ** data,size_t * sz)621 m_get_data(struct msg *m, const void **data, size_t *sz)
622 {
623 m_get_size(m, sz);
624
625 if (m->pos + *sz > m->end)
626 m_error("msg too short");
627
628 *data = m->pos;
629 m->pos += *sz;
630 }
631
/* Read the next uint64_t from `m` into `*evpid`. */
void
m_get_evpid(struct msg *m, uint64_t *evpid)
{
	m_get(m, evpid, sizeof *evpid);
}
637
/* Read the next uint32_t from `m` into `*msgid`. */
void
m_get_msgid(struct msg *m, uint32_t *msgid)
{
	m_get(m, msgid, sizeof *msgid);
}
643
/* Read the next uint64_t from `m` into `*id`. */
void
m_get_id(struct msg *m, uint64_t *id)
{
	m_get(m, id, sizeof *id);
}
649
/*
 * Read a socket address from `m`: a size_t length followed by that many
 * bytes, which are copied into `sa`.
 *
 * The length comes from the message itself, so it is capped at
 * sizeof(struct sockaddr_storage), the largest socket address there is;
 * anything bigger is reported through m_error() instead of being copied
 * over the caller's buffer.
 * NOTE(review): callers are assumed to pass a buffer large enough for the
 * address family they expect -- confirm at the call sites.
 */
void
m_get_sockaddr(struct msg *m, struct sockaddr *sa)
{
	size_t	 len;

	m_get_size(m, &len);
	if (len > sizeof(struct sockaddr_storage))
		m_error("bad sockaddr length");
	m_get(m, sa, len);
}
658
659 void
m_get_mailaddr(struct msg * m,struct mailaddr * maddr)660 m_get_mailaddr(struct msg *m, struct mailaddr *maddr)
661 {
662 m_get(m, maddr, sizeof(*maddr));
663 }
664