1 /* $OpenBSD$ */
2
3 /*
4 * Copyright (c) 2013 Eric Faurot <eric@openbsd.org>
5 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
6 *
7 * Permission to use, copy, modify, and distribute this software for any
8 * purpose with or without fee is hereby granted, provided that the above
9 * copyright notice and this permission notice appear in all copies.
10 *
11 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
12 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
14 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
15 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
16 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
17 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18 */
19
20 #include "includes.h"
21
22 #include <sys/types.h>
23 #include <sys/queue.h>
24 #include <sys/uio.h>
25
26 #include <event.h>
27 #include <fcntl.h>
28 #include <imsg.h>
29 #include <inttypes.h>
30 #include <pwd.h>
31 #include <signal.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <unistd.h>
36 #include <grp.h>
37
38 #include <smtpd-api.h>
39
40 #define FILTER_HIWAT 65536
41
42 static struct tree queries;
43 static struct tree sessions;
44
45 struct filter_session {
46 uint64_t id;
47 uint64_t qid;
48 int qtype;
49 size_t datalen;
50
51 int tx;
52 struct {
53 int eom_called;
54
55 int error;
56 struct io iev;
57 struct iobuf ibuf;
58 size_t idatalen;
59 struct io oev;
60 struct iobuf obuf;
61 size_t odatalen;
62 } pipe;
63
64 struct {
65 int ready;
66 int status;
67 int code;
68 char *line;
69 } response;
70
71 void *usession;
72 void *utx;
73
74 void *data_buffer;
75 void (*data_buffer_cb)(uint64_t, FILE *, void *);
76
77 struct rfc2822_parser rfc2822_parser;
78 struct dict headers_replace;
79 struct dict headers_add;
80
81 };
82
83 struct filter_timer {
84 struct event ev;
85 uint64_t id;
86 void (*cb)(uint64_t, void *);
87 void *arg;
88 };
89
90 static int register_done;
91 static const char *filter_name;
92
93 static struct filter_internals {
94 struct mproc p;
95
96 uint32_t flags;
97
98 uid_t uid;
99 gid_t gid;
100 const char *rootpath;
101
102 struct {
103 int (*connect)(uint64_t, struct filter_connect *);
104 int (*helo)(uint64_t, const char *);
105 int (*mail)(uint64_t, struct mailaddr *);
106 int (*rcpt)(uint64_t, struct mailaddr *);
107 int (*data)(uint64_t);
108
109 void (*msg_line)(uint64_t, const char *);
110 void (*msg_start)(uint64_t);
111 int (*msg_end)(uint64_t, size_t);
112
113 void (*disconnect)(uint64_t);
114 void (*reset)(uint64_t);
115
116 void *(*session_alloc)(uint64_t);
117 void (*session_free)(void *);
118
119 void *(*tx_alloc)(uint64_t);
120 void (*tx_free)(void *);
121 void (*tx_begin)(uint64_t);
122 void (*tx_commit)(uint64_t);
123 void (*tx_rollback)(uint64_t);
124 } cb;
125
126 int data_buffered;
127 } fi;
128
129 static void filter_api_init(void);
130 static void filter_response(struct filter_session *, int, int, const char *);
131 static void filter_send_response(struct filter_session *);
132 static void filter_register_query(uint64_t, uint64_t, int);
133 static void filter_dispatch(struct mproc *, struct imsg *);
134 static void filter_dispatch_data(uint64_t);
135 static void filter_dispatch_msg_line(uint64_t, const char *);
136 static void filter_dispatch_msg_start(uint64_t);
137 static void filter_dispatch_msg_end(uint64_t, size_t);
138 static void filter_dispatch_connect(uint64_t, struct filter_connect *);
139 static void filter_dispatch_helo(uint64_t, const char *);
140 static void filter_dispatch_mail(uint64_t, struct mailaddr *);
141 static void filter_dispatch_rcpt(uint64_t, struct mailaddr *);
142 static void filter_dispatch_reset(uint64_t);
143 static void filter_dispatch_tx_begin(uint64_t);
144 static void filter_dispatch_tx_commit(uint64_t);
145 static void filter_dispatch_tx_rollback(uint64_t);
146 static void filter_dispatch_disconnect(uint64_t);
147
148 static void filter_trigger_eom(struct filter_session *);
149 static void filter_io_in(struct io *, int);
150 static void filter_io_out(struct io *, int);
151 static const char *filterimsg_to_str(int);
152 static const char *query_to_str(int);
153 static const char *event_to_str(int);
154
155 static void data_buffered_setup(struct filter_session *);
156 static void data_buffered_release(struct filter_session *);
157 static void data_buffered_stream_process(uint64_t, FILE *, void *);
158
159
160 static void
filter_response(struct filter_session * s,int status,int code,const char * line)161 filter_response(struct filter_session *s, int status, int code, const char *line)
162 {
163 log_trace(TRACE_FILTERS, "filter-api:%s %016"PRIx64" %s filter_response(%d, %d, %s)",
164 filter_name, s->id, query_to_str(s->qtype), status, code, line);
165
166 s->response.ready = 1;
167 s->response.status = status;
168 s->response.code = code;
169 if (line)
170 s->response.line = strdup(line);
171 else
172 s->response.line = NULL;
173
174 /* eom is special, as the reponse has to be deferred until the pipe is all flushed */
175 if (s->qtype == QUERY_EOM) {
176 /* wait for the obuf to drain */
177 if (iobuf_queued(&s->pipe.obuf))
178 return;
179
180 if (s->pipe.oev.sock != -1) {
181 io_clear(&s->pipe.oev);
182 iobuf_clear(&s->pipe.obuf);
183 }
184 filter_trigger_eom(s);
185 }
186 else
187 filter_send_response(s);
188 }
189
190 static void
filter_send_response(struct filter_session * s)191 filter_send_response(struct filter_session *s)
192 {
193 log_trace(TRACE_FILTERS, "filter-api:%s %016"PRIx64" %s filter_send_response() -> %d, %d, %s",
194 filter_name, s->id, query_to_str(s->qtype),
195 s->response.status,
196 s->response.code,
197 s->response.line);
198
199 tree_xpop(&queries, s->qid);
200
201 m_create(&fi.p, IMSG_FILTER_RESPONSE, 0, 0, -1);
202 m_add_id(&fi.p, s->qid);
203 m_add_int(&fi.p, s->qtype);
204 if (s->qtype == QUERY_EOM)
205 m_add_u32(&fi.p, s->datalen);
206 m_add_int(&fi.p, s->response.status);
207 m_add_int(&fi.p, s->response.code);
208 if (s->response.line) {
209 m_add_string(&fi.p, s->response.line);
210 free(s->response.line);
211 s->response.line = NULL;
212 }
213 m_close(&fi.p);
214
215 s->qid = 0;
216 s->response.ready = 0;
217 }
218
219 static void
filter_dispatch(struct mproc * p,struct imsg * imsg)220 filter_dispatch(struct mproc *p, struct imsg *imsg)
221 {
222 struct filter_session *s;
223 struct filter_connect q_connect;
224 struct mailaddr maddr;
225 struct msg m;
226 const char *line, *name;
227 uint32_t v, datalen;
228 uint64_t id, qid;
229 int type;
230 int fds[2], fdin, fdout;
231
232 #ifdef EXPERIMENTAL
233 log_warnx("filter is EXPERIMENTAL and NOT meant to be used in production.");
234 #endif
235
236 if (imsg == NULL) {
237 log_trace(TRACE_FILTERS, "filter-api:%s server closed", filter_name);
238 exit(0);
239 }
240
241 log_trace(TRACE_FILTERS, "filter-api:%s imsg %s", filter_name,
242 filterimsg_to_str(imsg->hdr.type));
243
244 switch (imsg->hdr.type) {
245 case IMSG_FILTER_REGISTER:
246 m_msg(&m, imsg);
247 m_get_u32(&m, &v);
248 m_get_string(&m, &name);
249 filter_name = strdup(name);
250 m_end(&m);
251 if (v != FILTER_API_VERSION) {
252 log_warnx("warn: filter-api:%s API mismatch", filter_name);
253 fatalx("filter-api: exiting");
254 }
255 m_create(p, IMSG_FILTER_REGISTER, 0, 0, -1);
256 /* all hooks for now */
257 m_add_int(p, ~0);
258 m_add_int(p, fi.flags);
259 m_close(p);
260 break;
261
262 case IMSG_FILTER_EVENT:
263 m_msg(&m, imsg);
264 m_get_id(&m, &id);
265 m_get_int(&m, &type);
266 m_end(&m);
267 switch (type) {
268 case EVENT_CONNECT:
269 s = calloc(1, sizeof(*s));
270 if (s == NULL)
271 fatal("filter_dispatch");
272 s->id = id;
273 s->pipe.iev.sock = -1;
274 s->pipe.oev.sock = -1;
275 tree_xset(&sessions, id, s);
276 if (fi.cb.session_alloc)
277 s->usession = fi.cb.session_alloc(id);
278 break;
279 case EVENT_DISCONNECT:
280 filter_dispatch_disconnect(id);
281 s = tree_xpop(&sessions, id);
282 if (fi.cb.session_free && s->usession)
283 fi.cb.session_free(s->usession);
284 free(s);
285 break;
286 case EVENT_RESET:
287 filter_dispatch_reset(id);
288 break;
289 case EVENT_TX_BEGIN:
290 filter_dispatch_tx_begin(id);
291 break;
292 case EVENT_TX_COMMIT:
293 filter_dispatch_tx_commit(id);
294 break;
295 case EVENT_TX_ROLLBACK:
296 filter_dispatch_tx_rollback(id);
297 break;
298 default:
299 log_warnx("warn: filter-api:%s bad event %d", filter_name, type);
300 fatalx("filter-api: exiting");
301 }
302 break;
303
304 case IMSG_FILTER_QUERY:
305 m_msg(&m, imsg);
306 m_get_id(&m, &id);
307 m_get_id(&m, &qid);
308 m_get_int(&m, &type);
309 switch(type) {
310 case QUERY_CONNECT:
311 m_get_sockaddr(&m, (struct sockaddr*)&q_connect.local);
312 m_get_sockaddr(&m, (struct sockaddr*)&q_connect.remote);
313 m_get_string(&m, &q_connect.hostname);
314 m_end(&m);
315 filter_register_query(id, qid, type);
316 filter_dispatch_connect(id, &q_connect);
317 break;
318 case QUERY_HELO:
319 m_get_string(&m, &line);
320 m_end(&m);
321 filter_register_query(id, qid, type);
322 filter_dispatch_helo(id, line);
323 break;
324 case QUERY_MAIL:
325 m_get_mailaddr(&m, &maddr);
326 m_end(&m);
327 filter_register_query(id, qid, type);
328 filter_dispatch_mail(id, &maddr);
329 break;
330 case QUERY_RCPT:
331 m_get_mailaddr(&m, &maddr);
332 m_end(&m);
333 filter_register_query(id, qid, type);
334 filter_dispatch_rcpt(id, &maddr);
335 break;
336 case QUERY_DATA:
337 m_end(&m);
338 filter_register_query(id, qid, type);
339 filter_dispatch_data(id);
340 break;
341 case QUERY_EOM:
342 m_get_u32(&m, &datalen);
343 m_end(&m);
344 filter_register_query(id, qid, type);
345 filter_dispatch_msg_end(id, datalen);
346 break;
347 default:
348 log_warnx("warn: filter-api:%s bad query %d", filter_name, type);
349 fatalx("filter-api: exiting");
350 }
351 break;
352
353 case IMSG_FILTER_PIPE:
354 m_msg(&m, imsg);
355 m_get_id(&m, &id);
356 m_end(&m);
357
358 fdout = imsg->fd;
359 fdin = -1;
360
361 if (fdout == -1) {
362 log_warnx("warn: %016"PRIx64" failed to receive pipe",
363 id);
364 }
365 else if (socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC, fds) == -1) {
366 log_warn("warn: filter-api:%s socketpair", filter_name);
367 close(fdout);
368 }
369 else {
370 s = tree_xget(&sessions, id);
371
372 s->pipe.eom_called = 0;
373 s->pipe.error = 0;
374 s->pipe.idatalen = 0;
375 s->pipe.odatalen = 0;
376
377 iobuf_init(&s->pipe.obuf, 0, 0);
378 io_init(&s->pipe.oev, fdout, s, filter_io_out, &s->pipe.obuf);
379 io_set_write(&s->pipe.oev);
380
381 iobuf_init(&s->pipe.ibuf, 0, 0);
382 io_init(&s->pipe.iev, fds[0], s, filter_io_in, &s->pipe.ibuf);
383 io_set_read(&s->pipe.iev);
384
385 fdin = fds[1];
386 }
387
388 log_trace(TRACE_FILTERS, "filter-api:%s %016"PRIx64" tx pipe %d -> %d",
389 filter_name, id, fdin, fdout);
390
391 m_create(&fi.p, IMSG_FILTER_PIPE, 0, 0, fdin);
392 m_add_id(&fi.p, id);
393 m_close(&fi.p);
394
395 if (fdin != -1)
396 filter_dispatch_msg_start(id);
397
398 break;
399 }
400 }
401
402 static void
filter_register_query(uint64_t id,uint64_t qid,int type)403 filter_register_query(uint64_t id, uint64_t qid, int type)
404 {
405 struct filter_session *s;
406
407 log_trace(TRACE_FILTERS, "filter-api:%s %016"PRIx64" %s", filter_name, id, query_to_str(type));
408
409 s = tree_xget(&sessions, id);
410 if (s->qid) {
411 log_warnx("warn: filter-api:%s query already in progress",
412 filter_name);
413 fatalx("filter-api: exiting");
414 }
415 s->qid = qid;
416 s->qtype = type;
417 s->response.ready = 0;
418
419 tree_xset(&queries, qid, s);
420 }
421
422 static void
filter_dispatch_connect(uint64_t id,struct filter_connect * conn)423 filter_dispatch_connect(uint64_t id, struct filter_connect *conn)
424 {
425 if (fi.cb.connect)
426 fi.cb.connect(id, conn);
427 else
428 filter_api_accept(id);
429 }
430
431 static void
filter_dispatch_helo(uint64_t id,const char * helo)432 filter_dispatch_helo(uint64_t id, const char *helo)
433 {
434 if (fi.cb.helo)
435 fi.cb.helo(id, helo);
436 else
437 filter_api_accept(id);
438 }
439
440 static void
filter_dispatch_mail(uint64_t id,struct mailaddr * mail)441 filter_dispatch_mail(uint64_t id, struct mailaddr *mail)
442 {
443 if (fi.cb.mail)
444 fi.cb.mail(id, mail);
445 else
446 filter_api_accept(id);
447 }
448
449 static void
filter_dispatch_rcpt(uint64_t id,struct mailaddr * rcpt)450 filter_dispatch_rcpt(uint64_t id, struct mailaddr *rcpt)
451 {
452 if (fi.cb.rcpt)
453 fi.cb.rcpt(id, rcpt);
454 else
455 filter_api_accept(id);
456 }
457
458 static void
filter_dispatch_data(uint64_t id)459 filter_dispatch_data(uint64_t id)
460 {
461 if (fi.cb.data)
462 fi.cb.data(id);
463 else
464 filter_api_accept(id);
465 }
466
467 static void
filter_dispatch_reset(uint64_t id)468 filter_dispatch_reset(uint64_t id)
469 {
470 if (fi.cb.reset)
471 fi.cb.reset(id);
472 }
473
474 static void
filter_dispatch_tx_begin(uint64_t id)475 filter_dispatch_tx_begin(uint64_t id)
476 {
477 struct filter_session *s;
478
479 s = tree_xget(&sessions, id);
480 if (s->tx)
481 fatalx("tx-begin: session %016"PRIx64" in transaction", id);
482
483 s->tx = 1;
484
485 if (fi.cb.tx_alloc)
486 s->utx = fi.cb.tx_alloc(id);
487
488 if (fi.cb.tx_begin)
489 fi.cb.tx_begin(id);
490 }
491
492 static void
filter_dispatch_tx_commit(uint64_t id)493 filter_dispatch_tx_commit(uint64_t id)
494 {
495 struct filter_session *s;
496
497 s = tree_xget(&sessions, id);
498 if (s->tx == 0)
499 fatalx("tx-commit: session %016"PRIx64" not in transaction", id);
500
501 s->tx = 0;
502 io_clear(&s->pipe.oev);
503 iobuf_clear(&s->pipe.obuf);
504 io_clear(&s->pipe.iev);
505 iobuf_clear(&s->pipe.ibuf);
506
507 if (fi.cb.tx_commit)
508 fi.cb.tx_commit(id);
509
510 if (fi.cb.tx_free && s->utx) {
511 fi.cb.tx_free(s->utx);
512 s->utx = NULL;
513 }
514
515 if (s->data_buffer)
516 data_buffered_release(s);
517 }
518
519 static void
filter_dispatch_tx_rollback(uint64_t id)520 filter_dispatch_tx_rollback(uint64_t id)
521 {
522 struct filter_session *s;
523
524 s = tree_xget(&sessions, id);
525 if (s->tx == 0)
526 fatalx("tx-rollback: session %016"PRIx64" not in transaction", id);
527
528 s->tx = 0;
529 io_clear(&s->pipe.oev);
530 iobuf_clear(&s->pipe.obuf);
531 io_clear(&s->pipe.iev);
532 iobuf_clear(&s->pipe.ibuf);
533
534 if (fi.cb.tx_rollback)
535 fi.cb.tx_rollback(id);
536
537 if (fi.cb.tx_free && s->utx) {
538 fi.cb.tx_free(s->utx);
539 s->utx = NULL;
540 }
541
542 if (s->data_buffer)
543 data_buffered_release(s);
544 }
545
546 static void
filter_dispatch_disconnect(uint64_t id)547 filter_dispatch_disconnect(uint64_t id)
548 {
549 if (fi.cb.disconnect)
550 fi.cb.disconnect(id);
551 }
552
553 static void
filter_dispatch_msg_line(uint64_t id,const char * data)554 filter_dispatch_msg_line(uint64_t id, const char *data)
555 {
556 if (fi.cb.msg_line)
557 fi.cb.msg_line(id, data);
558 else
559 filter_api_writeln(id, data);
560 }
561
562 static void
filter_dispatch_msg_start(uint64_t id)563 filter_dispatch_msg_start(uint64_t id)
564 {
565
566 struct filter_session *s;
567
568 if (fi.data_buffered) {
569 s = tree_xget(&sessions, id);
570 data_buffered_setup(s);
571 }
572
573 if (fi.cb.msg_start)
574 fi.cb.msg_start(id);
575 }
576
577 static void
filter_dispatch_msg_end(uint64_t id,size_t datalen)578 filter_dispatch_msg_end(uint64_t id, size_t datalen)
579 {
580 struct filter_session *s;
581
582 s = tree_xget(&sessions, id);
583 s->datalen = datalen;
584
585 filter_trigger_eom(s);
586 }
587
588 static void
filter_trigger_eom(struct filter_session * s)589 filter_trigger_eom(struct filter_session *s)
590 {
591 log_trace(TRACE_FILTERS, "filter-api:%s %016"PRIx64" filter_trigger_eom(%d, %d, %zu, %zu, %zu)",
592 filter_name, s->id, s->pipe.iev.sock, s->pipe.oev.sock,
593 s->datalen, s->pipe.idatalen, s->pipe.odatalen);
594
595 /* This is called when
596 * - EOM query is first received
597 * - input data is closed
598 * - output has been written
599 */
600
601 /* input not done yet, or EOM query not received */
602 if (s->pipe.iev.sock != -1 || s->qid == 0)
603 return;
604
605 if (s->pipe.error)
606 goto fail;
607
608 /* if size don't match, error out */
609 if (s->pipe.idatalen != s->datalen) {
610 log_trace(TRACE_FILTERS, "filter-api:%s tx datalen mismatch: %zu/%zu",
611 filter_name, s->pipe.idatalen, s->datalen);
612 s->pipe.error = 1;
613 goto fail;
614 }
615
616 /* if we didn't send the eom to the user do it now */
617 if (!s->pipe.eom_called) {
618 s->pipe.eom_called = 1;
619 if (fi.cb.msg_end)
620 fi.cb.msg_end(s->id, s->datalen);
621 else
622 filter_api_accept(s->id);
623 return;
624 }
625
626 if (s->pipe.error)
627 goto fail;
628
629 /* wait for the output socket to be closed */
630 if (s->pipe.oev.sock != -1)
631 return;
632
633 s->datalen = s->pipe.odatalen;
634 filter_send_response(s);
635
636 fail:
637 /* XXX */
638 return;
639 }
640
641 static void
filter_io_in(struct io * io,int evt)642 filter_io_in(struct io *io, int evt)
643 {
644 struct filter_session *s = io->arg;
645 char *line;
646 size_t len;
647
648 log_trace(TRACE_FILTERS, "filter-api:%s filter_io_in(%p, %s)",
649 filter_name, s, io_strevent(evt));
650
651 switch (evt) {
652 case IO_DATAIN:
653 nextline:
654 line = iobuf_getline(&s->pipe.ibuf, &len);
655 if ((line == NULL && iobuf_len(&s->pipe.ibuf) >= LINE_MAX) ||
656 (line && len >= LINE_MAX)) {
657 s->pipe.error = 1;
658 break;
659 }
660 /* No complete line received */
661 if (line == NULL) {
662 iobuf_normalize(&s->pipe.ibuf);
663 /* flow control */
664 if (iobuf_queued(&s->pipe.obuf) >= FILTER_HIWAT)
665 io_pause(&s->pipe.iev, IO_PAUSE_IN);
666 return;
667 }
668
669 s->pipe.idatalen += len + 1;
670 /* XXX warning: do not clear io from this call! */
671 if (s->data_buffer) {
672 /* XXX handle errors somehow */
673 fprintf(s->data_buffer, "%s\n", line);
674 }
675 filter_dispatch_msg_line(s->id, line);
676 goto nextline;
677
678 case IO_DISCONNECTED:
679 if (iobuf_len(&s->pipe.ibuf)) {
680 log_warn("warn: filter-api:%s %016"PRIx64" incomplete input",
681 filter_name, s->id);
682 }
683 log_trace(TRACE_FILTERS, "filter-api:%s %016"PRIx64" input done (%zu bytes)",
684 filter_name, s->id, s->pipe.idatalen);
685 break;
686
687 default:
688 log_warn("warn: filter-api:%s %016"PRIx64": unexpected io event %d on data pipe",
689 filter_name, s->id, evt);
690 s->pipe.error = 1;
691
692 }
693 if (s->pipe.error) {
694 io_clear(&s->pipe.oev);
695 iobuf_clear(&s->pipe.obuf);
696 }
697 io_clear(&s->pipe.iev);
698 iobuf_clear(&s->pipe.ibuf);
699 filter_trigger_eom(s);
700 }
701
702 static void
filter_io_out(struct io * io,int evt)703 filter_io_out(struct io *io, int evt)
704 {
705 struct filter_session *s = io->arg;
706
707 log_trace(TRACE_FILTERS, "filter-api:%s %016"PRIx64" filter_io_out(%s)",
708 filter_name, s->id, io_strevent(evt));
709
710 switch (evt) {
711 case IO_TIMEOUT:
712 case IO_DISCONNECTED:
713 case IO_ERROR:
714 log_trace(TRACE_FILTERS, "filter-api:%s %016"PRIx64" io error on output pipe",
715 filter_name, s->id);
716 s->pipe.error = 1;
717 break;
718
719 case IO_LOWAT:
720 /* flow control */
721 if (s->pipe.iev.sock != -1 && s->pipe.iev.flags & IO_PAUSE_IN) {
722 io_resume(&s->pipe.iev, IO_PAUSE_IN);
723 return;
724 }
725
726 /* if the input is done and there is a response we are done */
727 if (s->pipe.iev.sock == -1 && s->response.ready)
728 break;
729
730 /* just wait for more data to send or feed through callback */
731 if (s->data_buffer_cb)
732 s->data_buffer_cb(s->id, s->data_buffer, s);
733 return;
734
735 default:
736 fatalx("filter_io_out()");
737 }
738
739 io_clear(&s->pipe.oev);
740 iobuf_clear(&s->pipe.obuf);
741 if (s->pipe.error) {
742 io_clear(&s->pipe.iev);
743 iobuf_clear(&s->pipe.ibuf);
744 }
745 filter_trigger_eom(s);
746 }
747
748 #define CASE(x) case x : return #x
749
750 static const char *
filterimsg_to_str(int imsg)751 filterimsg_to_str(int imsg)
752 {
753 switch (imsg) {
754 CASE(IMSG_FILTER_REGISTER);
755 CASE(IMSG_FILTER_EVENT);
756 CASE(IMSG_FILTER_QUERY);
757 CASE(IMSG_FILTER_PIPE);
758 CASE(IMSG_FILTER_RESPONSE);
759 default:
760 return ("IMSG_FILTER_???");
761 }
762 }
763
764 static const char *
query_to_str(int query)765 query_to_str(int query)
766 {
767 switch (query) {
768 CASE(QUERY_CONNECT);
769 CASE(QUERY_HELO);
770 CASE(QUERY_MAIL);
771 CASE(QUERY_RCPT);
772 CASE(QUERY_DATA);
773 CASE(QUERY_EOM);
774 CASE(QUERY_DATALINE);
775 default:
776 return ("QUERY_???");
777 }
778 }
779
780 static const char *
event_to_str(int event)781 event_to_str(int event)
782 {
783 switch (event) {
784 CASE(EVENT_CONNECT);
785 CASE(EVENT_RESET);
786 CASE(EVENT_DISCONNECT);
787 CASE(EVENT_TX_BEGIN);
788 CASE(EVENT_TX_COMMIT);
789 CASE(EVENT_TX_ROLLBACK);
790 default:
791 return ("EVENT_???");
792 }
793 }
794
795 /*
796 * These functions are called from mproc.c
797 */
798
799 enum smtp_proc_type smtpd_process;
800
801 const char *
proc_name(enum smtp_proc_type proc)802 proc_name(enum smtp_proc_type proc)
803 {
804 if (proc == PROC_FILTER)
805 return (filter_name);
806 return ("filter");
807 }
808
809 const char *
imsg_to_str(int imsg)810 imsg_to_str(int imsg)
811 {
812 static char buf[32];
813
814 snprintf(buf, sizeof(buf), "%d", imsg);
815
816 return (buf);
817 }
818
819
820 /*
821 * These functions are callable by filters
822 */
823
824 void
filter_api_session_allocator(void * (* f)(uint64_t))825 filter_api_session_allocator(void *(*f)(uint64_t))
826 {
827 fi.cb.session_alloc = f;
828 }
829
830 void
filter_api_session_destructor(void (* f)(void *))831 filter_api_session_destructor(void (*f)(void *))
832 {
833 fi.cb.session_free = f;
834 }
835
836 void *
filter_api_session(uint64_t id)837 filter_api_session(uint64_t id)
838 {
839 struct filter_session *s;
840
841 s = tree_xget(&sessions, id);
842 return s->usession;
843 }
844
845 void
filter_api_transaction_allocator(void * (* f)(uint64_t))846 filter_api_transaction_allocator(void *(*f)(uint64_t))
847 {
848 fi.cb.tx_alloc = f;
849 }
850
851 void
filter_api_transaction_destructor(void (* f)(void *))852 filter_api_transaction_destructor(void (*f)(void *))
853 {
854 fi.cb.tx_free = f;
855 }
856
857 void *
filter_api_transaction(uint64_t id)858 filter_api_transaction(uint64_t id)
859 {
860 struct filter_session *s;
861
862 s = tree_xget(&sessions, id);
863 return s->utx;
864 }
865
866 void
filter_api_setugid(uid_t uid,gid_t gid)867 filter_api_setugid(uid_t uid, gid_t gid)
868 {
869 filter_api_init();
870
871 if (!uid) {
872 log_warn("warn: filter-api:%s can't set uid 0", filter_name);
873 fatalx("filter-api: exiting");
874 }
875 if (!gid) {
876 log_warn("warn: filter-api:%s can't set gid 0", filter_name);
877 fatalx("filter-api: exiting");
878 }
879 fi.uid = uid;
880 fi.gid = gid;
881 }
882
883 void
filter_api_no_chroot(void)884 filter_api_no_chroot(void)
885 {
886 filter_api_init();
887
888 fi.rootpath = NULL;
889 }
890
891 void
filter_api_set_chroot(const char * rootpath)892 filter_api_set_chroot(const char *rootpath)
893 {
894 filter_api_init();
895
896 fi.rootpath = rootpath;
897 }
898
899 static void
filter_api_init(void)900 filter_api_init(void)
901 {
902 extern const char *__progname;
903 struct passwd *pw;
904 static int init = 0;
905
906 if (init)
907 return;
908
909 init = 1;
910
911 smtpd_process = PROC_FILTER;
912 filter_name = __progname;
913
914 pw = getpwnam(SMTPD_USER);
915 if (pw == NULL) {
916 log_warn("warn: filter-api:%s getpwnam", filter_name);
917 fatalx("filter-api: exiting");
918 }
919
920 tree_init(&queries);
921 tree_init(&sessions);
922 event_init();
923
924 memset(&fi, 0, sizeof(fi));
925 fi.p.proc = PROC_PONY;
926 fi.p.name = "filter";
927 fi.p.handler = filter_dispatch;
928 fi.uid = pw->pw_uid;
929 fi.gid = pw->pw_gid;
930 fi.rootpath = PATH_CHROOT;
931
932 mproc_init(&fi.p, 0);
933 }
934
935 void
filter_api_on_connect(int (* cb)(uint64_t,struct filter_connect *))936 filter_api_on_connect(int(*cb)(uint64_t, struct filter_connect *))
937 {
938 filter_api_init();
939
940 fi.cb.connect = cb;
941 }
942
943 void
filter_api_on_helo(int (* cb)(uint64_t,const char *))944 filter_api_on_helo(int(*cb)(uint64_t, const char *))
945 {
946 filter_api_init();
947
948 fi.cb.helo = cb;
949 }
950
951 void
filter_api_on_mail(int (* cb)(uint64_t,struct mailaddr *))952 filter_api_on_mail(int(*cb)(uint64_t, struct mailaddr *))
953 {
954 filter_api_init();
955
956 fi.cb.mail = cb;
957 }
958
959 void
filter_api_on_rcpt(int (* cb)(uint64_t,struct mailaddr *))960 filter_api_on_rcpt(int(*cb)(uint64_t, struct mailaddr *))
961 {
962 filter_api_init();
963
964 fi.cb.rcpt = cb;
965 }
966
967 void
filter_api_on_data(int (* cb)(uint64_t))968 filter_api_on_data(int(*cb)(uint64_t))
969 {
970 filter_api_init();
971
972 fi.cb.data = cb;
973 }
974
975 void
filter_api_on_msg_line(void (* cb)(uint64_t,const char *))976 filter_api_on_msg_line(void(*cb)(uint64_t, const char *))
977 {
978 filter_api_init();
979
980 fi.cb.msg_line = cb;
981 }
982
983 void
filter_api_on_msg_start(void (* cb)(uint64_t))984 filter_api_on_msg_start(void(*cb)(uint64_t))
985 {
986 filter_api_init();
987
988 fi.cb.msg_start = cb;
989 }
990
991 void
filter_api_on_msg_end(int (* cb)(uint64_t,size_t))992 filter_api_on_msg_end(int(*cb)(uint64_t, size_t))
993 {
994 filter_api_init();
995
996 fi.cb.msg_end = cb;
997 }
998
999 void
filter_api_on_reset(void (* cb)(uint64_t))1000 filter_api_on_reset(void(*cb)(uint64_t))
1001 {
1002 filter_api_init();
1003
1004 fi.cb.reset = cb;
1005 }
1006
1007 void
filter_api_on_disconnect(void (* cb)(uint64_t))1008 filter_api_on_disconnect(void(*cb)(uint64_t))
1009 {
1010 filter_api_init();
1011
1012 fi.cb.disconnect = cb;
1013 }
1014
1015 void
filter_api_on_tx_begin(void (* cb)(uint64_t))1016 filter_api_on_tx_begin(void(*cb)(uint64_t))
1017 {
1018 filter_api_init();
1019
1020 fi.cb.tx_begin = cb;
1021 }
1022
1023 void
filter_api_on_tx_commit(void (* cb)(uint64_t))1024 filter_api_on_tx_commit(void(*cb)(uint64_t))
1025 {
1026 filter_api_init();
1027
1028 fi.cb.tx_commit = cb;
1029 }
1030
1031 void
filter_api_on_tx_rollback(void (* cb)(uint64_t))1032 filter_api_on_tx_rollback(void(*cb)(uint64_t))
1033 {
1034 filter_api_init();
1035
1036 fi.cb.tx_rollback = cb;
1037 }
1038
1039 void
filter_api_loop(void)1040 filter_api_loop(void)
1041 {
1042 if (register_done) {
1043 log_warnx("warn: filter-api:%s filter_api_loop() already called", filter_name);
1044 fatalx("filter-api: exiting");
1045 }
1046
1047 filter_api_init();
1048
1049 register_done = 1;
1050
1051 mproc_enable(&fi.p);
1052
1053 if (fi.rootpath) {
1054 if (chroot(fi.rootpath) == -1) {
1055 log_warn("warn: filter-api:%s chroot", filter_name);
1056 fatalx("filter-api: exiting");
1057 }
1058 if (chdir("/") == -1) {
1059 log_warn("warn: filter-api:%s chdir", filter_name);
1060 fatalx("filter-api: exiting");
1061 }
1062 }
1063
1064 if (setgroups(1, &fi.gid) ||
1065 setresgid(fi.gid, fi.gid, fi.gid) ||
1066 setresuid(fi.uid, fi.uid, fi.uid)) {
1067 log_warn("warn: filter-api:%s cannot drop privileges", filter_name);
1068 fatalx("filter-api: exiting");
1069 }
1070
1071 /* we must ignore SIGPIPE otherwise we might die when a data pipe goes away */
1072 signal(SIGPIPE, SIG_IGN);
1073
1074 if (event_dispatch() < 0) {
1075 log_warn("warn: filter-api:%s event_dispatch", filter_name);
1076 fatalx("filter-api: exiting");
1077 }
1078 }
1079
1080 int
filter_api_accept(uint64_t id)1081 filter_api_accept(uint64_t id)
1082 {
1083 struct filter_session *s;
1084
1085 log_trace(TRACE_FILTERS, "filter-api:%s %016"PRIx64" filter_api_accept()", filter_name, id);
1086
1087 s = tree_xget(&sessions, id);
1088 filter_response(s, FILTER_OK, 0, NULL);
1089
1090 return (1);
1091 }
1092
1093 int
filter_api_reject(uint64_t id,enum filter_status status)1094 filter_api_reject(uint64_t id, enum filter_status status)
1095 {
1096 struct filter_session *s;
1097
1098 log_trace(TRACE_FILTERS, "filter-api:%s %016"PRIx64" filter_api_reject(%d)",
1099 filter_name, id, status);
1100
1101 s = tree_xget(&sessions, id);
1102
1103 /* This is NOT an acceptable status for a failure */
1104 if (status == FILTER_OK)
1105 status = FILTER_FAIL;
1106
1107 filter_response(s, status, 0, NULL);
1108
1109 return (1);
1110 }
1111
1112 int
filter_api_reject_code(uint64_t id,enum filter_status status,uint32_t code,const char * line)1113 filter_api_reject_code(uint64_t id, enum filter_status status, uint32_t code,
1114 const char *line)
1115 {
1116 struct filter_session *s;
1117
1118 log_trace(TRACE_FILTERS, "filter-api:%s %016"PRIx64" filter_api_reject_code(%d, %u, %s)",
1119 filter_name, id, status, code, line);
1120
1121 s = tree_xget(&sessions, id);
1122
1123 /* This is NOT an acceptable status for a failure */
1124 if (status == FILTER_OK)
1125 status = FILTER_FAIL;
1126
1127 filter_response(s, status, code, line);
1128
1129 return (1);
1130 }
1131
1132 void
filter_api_writeln(uint64_t id,const char * line)1133 filter_api_writeln(uint64_t id, const char *line)
1134 {
1135 struct filter_session *s;
1136
1137 log_trace(TRACE_FILTERS, "filter-api:%s %016"PRIx64" filter_api_writeln(%s)", filter_name, id, line);
1138
1139 s = tree_xget(&sessions, id);
1140
1141 if (s->pipe.oev.sock == -1) {
1142 log_warnx("warn: session %016"PRIx64": write out of sequence", id);
1143 return;
1144 }
1145
1146 s->pipe.odatalen += strlen(line) + 1;
1147 iobuf_fqueue(&s->pipe.obuf, "%s\n", line);
1148 io_reload(&s->pipe.oev);
1149 }
1150
1151 void
filter_api_printf(uint64_t id,const char * fmt,...)1152 filter_api_printf(uint64_t id, const char *fmt, ...)
1153 {
1154 struct filter_session *s;
1155 va_list ap;
1156 int len;
1157
1158 log_trace(TRACE_FILTERS, "filter-api:%s %016"PRIx64" filter_api_printf(%s)",
1159 filter_name, id, fmt);
1160
1161 s = tree_xget(&sessions, id);
1162
1163 if (s->pipe.oev.sock == -1) {
1164 log_warnx("warn: session %016"PRIx64": write out of sequence", id);
1165 return;
1166 }
1167
1168 va_start(ap, fmt);
1169 len = iobuf_vfqueue(&s->pipe.obuf, fmt, ap);
1170 iobuf_fqueue(&s->pipe.obuf, "\n");
1171 va_end(ap);
1172 s->pipe.odatalen += len + 1;
1173 io_reload(&s->pipe.oev);
1174 }
1175
1176 static void
filter_api_timer_cb(int fd,short evt,void * arg)1177 filter_api_timer_cb(int fd, short evt, void *arg)
1178 {
1179 struct filter_timer *ft = arg;
1180
1181 ft->cb(ft->id, ft->arg);
1182 free(ft);
1183 }
1184
1185 void
filter_api_timer(uint64_t id,uint32_t tmo,void (* cb)(uint64_t,void *),void * arg)1186 filter_api_timer(uint64_t id, uint32_t tmo, void (*cb)(uint64_t, void *), void *arg)
1187 {
1188 struct filter_timer *ft = xcalloc(1, sizeof(struct filter_timer), "filter_api_timer");
1189 struct timeval tv = { tmo / 1000, (tmo % 1000) * 1000 };
1190
1191 ft->id = id;
1192 ft->cb = cb;
1193 ft->arg = arg;
1194 evtimer_set(&ft->ev, filter_api_timer_cb, ft);
1195 evtimer_add(&ft->ev, &tv);
1196 }
1197
1198 const char *
filter_api_sockaddr_to_text(const struct sockaddr * sa)1199 filter_api_sockaddr_to_text(const struct sockaddr *sa)
1200 {
1201 static char buf[NI_MAXHOST];
1202
1203 if (getnameinfo(sa, SA_LEN(sa), buf, sizeof(buf), NULL, 0,
1204 NI_NUMERICHOST))
1205 return ("(unknown)");
1206 else
1207 return (buf);
1208 }
1209
1210 const char *
filter_api_mailaddr_to_text(const struct mailaddr * maddr)1211 filter_api_mailaddr_to_text(const struct mailaddr *maddr)
1212 {
1213 static char buffer[LINE_MAX];
1214
1215 strlcpy(buffer, maddr->user, sizeof buffer);
1216 if (maddr->domain[0] == '\0')
1217 return (buffer);
1218 strlcat(buffer, "@", sizeof buffer);
1219 if (strlcat(buffer, maddr->domain, sizeof buffer) >= sizeof buffer)
1220 return (NULL);
1221
1222 return (buffer);
1223 }
1224
1225
1226 /* X X X */
1227 static void
data_buffered_stream_process(uint64_t id,FILE * fp,void * arg)1228 data_buffered_stream_process(uint64_t id, FILE *fp, void *arg)
1229 {
1230 struct filter_session *s;
1231 size_t sz;
1232 ssize_t len;
1233 char *line = NULL;
1234
1235 s = tree_xget(&sessions, id);
1236 errno = 0;
1237 if ((len = getline(&line, &sz, fp)) == -1) {
1238 if (errno) {
1239 filter_api_reject_code(id, FILTER_FAIL, 421,
1240 "Internal Server Error");
1241 return;
1242 }
1243 filter_api_accept(id);
1244 return;
1245 }
1246 line[strcspn(line, "\n")] = '\0';
1247 rfc2822_parser_feed(&s->rfc2822_parser, line);
1248 free(line);
1249
1250 /* XXX - should be driven by parser_feed */
1251 if (1)
1252 io_callback(&s->pipe.oev, IO_LOWAT);
1253 }
1254
1255 static void
default_header_callback(const struct rfc2822_header * hdr,void * arg)1256 default_header_callback(const struct rfc2822_header *hdr, void *arg)
1257 {
1258 struct filter_session *s = arg;
1259 struct rfc2822_line *l;
1260 int i = 0;
1261
1262 TAILQ_FOREACH(l, &hdr->lines, next) {
1263 if (i++ == 0) {
1264 filter_api_printf(s->id, "%s: %s", hdr->name, l->buffer + 1);
1265 continue;
1266 }
1267 filter_api_printf(s->id, "%s", l->buffer);
1268 }
1269 }
1270
1271 static void
default_body_callback(const char * line,void * arg)1272 default_body_callback(const char *line, void *arg)
1273 {
1274 struct filter_session *s = arg;
1275
1276 filter_api_writeln(s->id, line);
1277 }
1278
1279 static void
header_remove_callback(const struct rfc2822_header * hdr,void * arg)1280 header_remove_callback(const struct rfc2822_header *hdr, void *arg)
1281 {
1282 }
1283
1284 static void
header_replace_callback(const struct rfc2822_header * hdr,void * arg)1285 header_replace_callback(const struct rfc2822_header *hdr, void *arg)
1286 {
1287 struct filter_session *s = arg;
1288 char *value;
1289 char *key;
1290
1291 key = xstrdup(hdr->name, "header_replace_callback");
1292 lowercase(key, key, strlen(key)+1);
1293
1294 value = dict_xget(&s->headers_replace, key);
1295 filter_api_printf(s->id, "%s: %s", hdr->name, value);
1296 free(key);
1297 }
1298
1299 static void
header_eoh_callback(void * arg)1300 header_eoh_callback(void *arg)
1301 {
1302 struct filter_session *s = arg;
1303 void *iter;
1304 const char *key;
1305 void *data;
1306
1307 iter = NULL;
1308 while (dict_iter(&s->headers_add, &iter, &key, &data))
1309 filter_api_printf(s->id, "%s: %s", key, (char *)data);
1310 }
1311
1312 void
data_buffered_setup(struct filter_session * s)1313 data_buffered_setup(struct filter_session *s)
1314 {
1315 FILE *fp;
1316 int fd;
1317 char pathname[] = "/tmp/XXXXXXXXXX";
1318
1319 fd = mkstemp(pathname);
1320 if (fd == -1)
1321 return;
1322
1323 fp = fdopen(fd, "w+b");
1324 if (fp == NULL) {
1325 close(fd);
1326 return;
1327 }
1328 unlink(pathname);
1329
1330 s->data_buffer = fp;
1331 s->data_buffer_cb = data_buffered_stream_process;
1332
1333 rfc2822_parser_init(&s->rfc2822_parser);
1334 rfc2822_parser_reset(&s->rfc2822_parser);
1335 rfc2822_header_default_callback(&s->rfc2822_parser,
1336 default_header_callback, s);
1337 rfc2822_body_callback(&s->rfc2822_parser,
1338 default_body_callback, s);
1339 rfc2822_eoh_callback(&s->rfc2822_parser,
1340 header_eoh_callback, s);
1341
1342 dict_init(&s->headers_replace);
1343 dict_init(&s->headers_add);
1344 }
1345
1346 static void
data_buffered_release(struct filter_session * s)1347 data_buffered_release(struct filter_session *s)
1348 {
1349 void *data;
1350
1351 rfc2822_parser_release(&s->rfc2822_parser);
1352 if (s->data_buffer) {
1353 fclose(s->data_buffer);
1354 s->data_buffer = NULL;
1355 }
1356
1357 while (dict_poproot(&s->headers_replace, &data))
1358 free(data);
1359
1360 while (dict_poproot(&s->headers_add, &data))
1361 free(data);
1362 }
1363
1364 void
filter_api_data_buffered(void)1365 filter_api_data_buffered(void)
1366 {
1367 fi.data_buffered = 1;
1368 }
1369
1370 void
filter_api_data_buffered_stream(uint64_t id)1371 filter_api_data_buffered_stream(uint64_t id)
1372 {
1373 struct filter_session *s;
1374
1375 s = tree_xget(&sessions, id);
1376 if (s->data_buffer)
1377 fseek(s->data_buffer, 0, 0);
1378 io_callback(&s->pipe.oev, IO_LOWAT);
1379 }
1380
1381 void
filter_api_header_remove(uint64_t id,const char * header)1382 filter_api_header_remove(uint64_t id, const char *header)
1383 {
1384 struct filter_session *s;
1385
1386 s = tree_xget(&sessions, id);
1387 rfc2822_header_callback(&s->rfc2822_parser, header,
1388 header_remove_callback, s);
1389 }
1390
1391 void
filter_api_header_replace(uint64_t id,const char * header,const char * fmt,...)1392 filter_api_header_replace(uint64_t id, const char *header, const char *fmt, ...)
1393 {
1394 struct filter_session *s;
1395 char *key;
1396 char *buffer = NULL;
1397 va_list ap;
1398
1399 s = tree_xget(&sessions, id);
1400 va_start(ap, fmt);
1401 vasprintf(&buffer, fmt, ap);
1402 va_end(ap);
1403
1404 key = xstrdup(header, "filter_api_header_replace");
1405 lowercase(key, key, strlen(key)+1);
1406 dict_set(&s->headers_replace, header, buffer);
1407 free(key);
1408
1409 rfc2822_header_callback(&s->rfc2822_parser, header,
1410 header_replace_callback, s);
1411 }
1412
1413 void
filter_api_header_add(uint64_t id,const char * header,const char * fmt,...)1414 filter_api_header_add(uint64_t id, const char *header, const char *fmt, ...)
1415 {
1416 struct filter_session *s;
1417 char *key;
1418 char *buffer = NULL;
1419 va_list ap;
1420
1421 s = tree_xget(&sessions, id);
1422 va_start(ap, fmt);
1423 vasprintf(&buffer, fmt, ap);
1424 va_end(ap);
1425
1426 key = xstrdup(header, "filter_api_header_replace");
1427 lowercase(key, key, strlen(key)+1);
1428 dict_set(&s->headers_add, header, buffer);
1429 free(key);
1430 }
1431