xref: /openbsd/usr.sbin/smtpd/lka_filter.c (revision 510d2225)
1 /*	$OpenBSD: lka_filter.c,v 1.74 2023/11/03 13:38:28 op Exp $	*/
2 
3 /*
4  * Copyright (c) 2018 Gilles Chehade <gilles@poolp.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <errno.h>
20 #include <inttypes.h>
21 #include <stdlib.h>
22 #include <string.h>
23 
24 #include "smtpd.h"
25 #include "log.h"
26 
/* Version string sent as the second field of every "filter|" query line. */
#define	PROTOCOL_VERSION	"0.7"

/* Both structures are defined further down; prototypes only need pointers. */
struct filter;
struct filter_session;
/* Protocol phases: chain walker, entry point, resume on reply, query writer. */
static void	filter_protocol_internal(struct filter_session *, uint64_t *, uint64_t, enum filter_phase, const char *);
static void	filter_protocol(uint64_t, enum filter_phase, const char *);
static void	filter_protocol_next(uint64_t, uint64_t, enum filter_phase);
static void	filter_protocol_query(struct filter *, uint64_t, uint64_t, const char *, const char *);

/* Message lines (data-line phase): chain walker, entry point, resume, query writer. */
static void	filter_data_internal(struct filter_session *, uint64_t, uint64_t, const char *);
static void	filter_data(uint64_t, const char *);
static void	filter_data_next(uint64_t, uint64_t, const char *);
static void	filter_data_query(struct filter *, uint64_t, uint64_t, const char *);

/* Builtin matchers referenced from the filter_execs table. */
static int	filter_builtins_notimpl(struct filter_session *, struct filter *, uint64_t, const char *);
static int	filter_builtins_connect(struct filter_session *, struct filter *, uint64_t, const char *);
static int	filter_builtins_helo(struct filter_session *, struct filter *, uint64_t, const char *);
static int	filter_builtins_mail_from(struct filter_session *, struct filter *, uint64_t, const char *);
static int	filter_builtins_rcpt_to(struct filter_session *, struct filter *, uint64_t, const char *);
static int	filter_builtins_data(struct filter_session *, struct filter *, uint64_t, const char *);
static int	filter_builtins_commit(struct filter_session *, struct filter *, uint64_t, const char *);

/* Verdicts relayed to the dispatcher process through imsg. */
static void	filter_result_proceed(uint64_t);
static void	filter_result_junk(uint64_t);
static void	filter_result_rewrite(uint64_t, const char *);
static void	filter_result_reject(uint64_t, const char *);
static void	filter_result_disconnect(uint64_t, const char *);

/* I/O callback installed on the per-session data channel. */
static void	filter_session_io(struct io *, int, void *);
/* Handles "filter-result|" and "filter-dataline|" lines read from a processor. */
void		lka_filter_process_response(const char *, const char *);
57 
58 
/*
 * Per-session filtering state, stored in the `sessions' tree under the
 * request id handed to lka_filter_begin().
 */
struct filter_session {
	/* request id; also the key in the `sessions' tree */
	uint64_t	id;
	/* data channel: created by lka_filter_data_begin(), freed by lka_filter_data_end() */
	struct io	*io;

	/* copy of the last parameter seen by filter_protocol(), replayed on resume */
	char *lastparam;

	/* name of the filter attached to the session; key into `filters' and `filter_chains' */
	char *filter_name;
	/* NOTE(review): the next four fields are not assigned in the code visible here */
	struct sockaddr_storage ss_src;
	struct sockaddr_storage ss_dest;
	/* sent as an extra field of the "connect" query; freed in lka_filter_end() */
	char *rdns;
	int fcrdns;

	/* copy of the last HELO/EHLO parameter */
	char *helo;
	/* freed in lka_filter_end(); not assigned in the code visible here */
	char *username;
	/* MAIL FROM parameter with the surrounding angle brackets removed */
	char *mail_from;

	/* phase currently being filtered; index into filter_execs and chain[] */
	enum filter_phase	phase;
};
77 
/*
 * One entry per filter phase: the phase value, the name used on the wire
 * (matched by lka_filter_register_hook() and sent in queries), and the
 * builtin matcher run for filters that have no processor attached.
 *
 * The table is indexed directly by phase in filter_protocol_internal().
 * NOTE(review): that presumes entry order follows enum filter_phase --
 * confirm against smtpd.h.
 */
static struct filter_exec {
	enum filter_phase	phase;
	const char	       *phase_name;
	int		       (*func)(struct filter_session *, struct filter *, uint64_t, const char *);
} filter_execs[FILTER_PHASES_COUNT] = {
	{ FILTER_CONNECT,	"connect",	filter_builtins_connect },
	{ FILTER_HELO,		"helo",		filter_builtins_helo },
	{ FILTER_EHLO,		"ehlo",		filter_builtins_helo },
	{ FILTER_STARTTLS,     	"starttls",	filter_builtins_notimpl },
	{ FILTER_AUTH,     	"auth",		filter_builtins_notimpl },
	{ FILTER_MAIL_FROM,    	"mail-from",	filter_builtins_mail_from },
	{ FILTER_RCPT_TO,    	"rcpt-to",	filter_builtins_rcpt_to },
	{ FILTER_DATA,    	"data",		filter_builtins_data },
	{ FILTER_DATA_LINE,    	"data-line",   	filter_builtins_notimpl },
	{ FILTER_RSET,    	"rset",		filter_builtins_notimpl },
	{ FILTER_QUIT,    	"quit",		filter_builtins_notimpl },
	{ FILTER_NOOP,    	"noop",		filter_builtins_notimpl },
	{ FILTER_HELP,    	"help",		filter_builtins_notimpl },
	{ FILTER_WIZ,    	"wiz",		filter_builtins_notimpl },
	{ FILTER_COMMIT,    	"commit",      	filter_builtins_commit },
};
99 
/*
 * Runtime view of one configured filter (builtin, processor-backed or
 * chain), built by lka_filter_init().
 */
struct filter {
	/* not assigned in the code visible here */
	uint64_t		id;
	/* bitmask of hooked phases: bit (1 << phase) */
	uint32_t		phases;
	/* key of the filter in env->sc_filters_dict */
	const char	       *name;
	/* processor name for processor-backed filters, unset otherwise */
	const char	       *proc;
	/* resolved members, for chain filters only */
	struct filter  	      **chain;
	size_t 			chain_size;
	/* configuration entry this filter was built from */
	struct filter_config   *config;
};
/* filter name -> struct filter */
static struct dict filters;
110 
/*
 * One step of a per-phase chain.  The id comes from generate_uid() and is
 * the token exchanged with processors; the name is looked up in `filters'.
 */
struct filter_entry {
	TAILQ_ENTRY(filter_entry)	entries;
	uint64_t			id;
	const char		       *name;
};
116 
/* Ordered list of filter entries for each phase; chain[] is indexed by phase. */
struct filter_chain {
	TAILQ_HEAD(, filter_entry)		chain[nitems(filter_execs)];
};
120 
/* request id -> struct filter_session; initialised lazily by lka_filter_begin() */
static struct tree	sessions;
/* set once `sessions' has been initialised */
static int		filters_inited;

/* filter name -> struct filter_chain, filled in by lka_filter_ready() */
static struct dict	filter_chains;
125 
/*
 * List node naming a reporting processor.
 * NOTE(review): neither this type nor the two dicts below are referenced
 * in the part of the file visible here -- confirm they are still needed.
 */
struct reporter_proc {
	TAILQ_ENTRY(reporter_proc)	entries;
	const char		       *name;
};
TAILQ_HEAD(reporters, reporter_proc);

static struct dict	report_smtp_in;
static struct dict	report_smtp_out;
134 
/*
 * Names of SMTP report events, grouped by prefix (link-, tx-, protocol-,
 * filter-, timeout).
 * NOTE(review): the table is not referenced in the part of the file
 * visible here -- confirm it is still used.
 */
static struct smtp_events {
	const char     *event;
} smtp_events[] = {
	{ "link-connect" },
	{ "link-disconnect" },
	{ "link-greeting" },
	{ "link-identify" },
	{ "link-tls" },
	{ "link-auth" },

	{ "tx-reset" },
	{ "tx-begin" },
	{ "tx-mail" },
	{ "tx-rcpt" },
	{ "tx-envelope" },
	{ "tx-data" },
	{ "tx-commit" },
	{ "tx-rollback" },

	{ "protocol-client" },
	{ "protocol-server" },

	{ "filter-report" },
	{ "filter-response" },

	{ "timeout" },
};
162 
/* set once `processors' has been initialised by lka_proc_forked() */
static int			processors_inited = 0;
/* processor name -> struct processor_instance */
static struct dict		processors;
165 
/* State kept for each forked processor, created by lka_proc_forked(). */
struct processor_instance {
	/* private copy of the processor name; also the argument of both io callbacks */
	char			*name;
	/* channel carrying config, queries and register/result/report lines */
	struct io		*io;
	/* channel set up by lka_proc_errfd(); its lines are logged with log_warnx() */
	struct io		*errfd;
	/* set to 1 when the processor sends "register|ready" */
	int			 ready;
	/* bitmask tested against FILTER_SUBSYSTEM_SMTP_IN / FILTER_SUBSYSTEM_SMTP_OUT */
	uint32_t		 subsystems;
};
173 
/* io callbacks for a processor's main channel and its error channel */
static void	processor_io(struct io *, int, void *);
static void	processor_errfd(struct io *, int, void *);
/* same prototype as declared earlier in this file */
void		lka_filter_process_response(const char *, const char *);
177 
178 int
179 lka_proc_ready(void)
180 {
181 	void	*iter;
182 	struct processor_instance	*pi;
183 
184 	iter = NULL;
185 	while (dict_iter(&processors, &iter, NULL, (void **)&pi))
186 		if (!pi->ready)
187 			return 0;
188 	return 1;
189 }
190 
191 static void
192 lka_proc_config(struct processor_instance *pi)
193 {
194 	io_printf(pi->io, "config|smtpd-version|%s\n", SMTPD_VERSION);
195 	io_printf(pi->io, "config|smtp-session-timeout|%d\n", SMTPD_SESSION_TIMEOUT);
196 	if (pi->subsystems & FILTER_SUBSYSTEM_SMTP_IN)
197 		io_printf(pi->io, "config|subsystem|smtp-in\n");
198 	if (pi->subsystems & FILTER_SUBSYSTEM_SMTP_OUT)
199 		io_printf(pi->io, "config|subsystem|smtp-out\n");
200 	io_printf(pi->io, "config|admd|%s\n",
201 	    env->sc_admd != NULL ? env->sc_admd : env->sc_hostname);
202 	io_printf(pi->io, "config|ready\n");
203 }
204 
205 void
206 lka_proc_forked(const char *name, uint32_t subsystems, int fd)
207 {
208 	struct processor_instance	*processor;
209 
210 	if (!processors_inited) {
211 		dict_init(&processors);
212 		processors_inited = 1;
213 	}
214 
215 	processor = xcalloc(1, sizeof *processor);
216 	processor->name = xstrdup(name);
217 	processor->io = io_new();
218 	processor->subsystems = subsystems;
219 
220 	io_set_nonblocking(fd);
221 
222 	io_set_fd(processor->io, fd);
223 	io_set_callback(processor->io, processor_io, processor->name);
224 	dict_xset(&processors, name, processor);
225 }
226 
227 void
228 lka_proc_errfd(const char *name, int fd)
229 {
230 	struct processor_instance	*processor;
231 
232 	processor = dict_xget(&processors, name);
233 
234 	io_set_nonblocking(fd);
235 
236 	processor->errfd = io_new();
237 	io_set_fd(processor->errfd, fd);
238 	io_set_callback(processor->errfd, processor_errfd, processor->name);
239 
240 	lka_proc_config(processor);
241 }
242 
243 struct io *
244 lka_proc_get_io(const char *name)
245 {
246 	struct processor_instance *processor;
247 
248 	processor = dict_xget(&processors, name);
249 
250 	return processor->io;
251 }
252 
253 static void
254 processor_register(const char *name, const char *line)
255 {
256 	struct processor_instance *processor;
257 
258 	processor = dict_xget(&processors, name);
259 
260 	if (strcmp(line, "register|ready") == 0) {
261 		processor->ready = 1;
262 		return;
263 	}
264 
265 	if (strncmp(line, "register|report|", 16) == 0) {
266 		lka_report_register_hook(name, line+16);
267 		return;
268 	}
269 
270 	if (strncmp(line, "register|filter|", 16) == 0) {
271 		lka_filter_register_hook(name, line+16);
272 		return;
273 	}
274 
275 	fatalx("Invalid register line received: %s", line);
276 }
277 
278 static void
279 processor_io(struct io *io, int evt, void *arg)
280 {
281 	struct processor_instance *processor;
282 	const char		*name = arg;
283 	char			*line = NULL;
284 	ssize_t			 len;
285 
286 	switch (evt) {
287 	case IO_DATAIN:
288 		while ((line = io_getline(io, &len)) != NULL) {
289 			if (strncmp("register|", line, 9) == 0) {
290 				processor_register(name, line);
291 				continue;
292 			}
293 
294 			processor = dict_xget(&processors, name);
295 			if (!processor->ready)
296 				fatalx("Non-register message before register|"
297 				    "ready: %s", line);
298 			else if (strncmp(line, "filter-result|", 14) == 0 ||
299 			    strncmp(line, "filter-dataline|", 16) == 0)
300 				lka_filter_process_response(name, line);
301 			else if (strncmp(line, "report|", 7) == 0)
302 				lka_report_proc(name, line);
303 			else
304 				fatalx("Invalid filter message type: %s", line);
305 		}
306 	}
307 }
308 
309 static void
310 processor_errfd(struct io *io, int evt, void *arg)
311 {
312 	const char	*name = arg;
313 	char		*line = NULL;
314 	ssize_t		 len;
315 
316 	switch (evt) {
317 	case IO_DATAIN:
318 		while ((line = io_getline(io, &len)) != NULL)
319 			log_warnx("%s: %s", name, line);
320 	}
321 }
322 
323 void
324 lka_filter_init(void)
325 {
326 	void		*iter;
327 	const char	*name;
328 	struct filter  	*filter;
329 	struct filter_config	*filter_config;
330 	size_t		i;
331 	char		 buffer[LINE_MAX];	/* for traces */
332 
333 	dict_init(&filters);
334 	dict_init(&filter_chains);
335 
336 	/* first pass, allocate and init individual filters */
337 	iter = NULL;
338 	while (dict_iter(env->sc_filters_dict, &iter, &name, (void **)&filter_config)) {
339 		switch (filter_config->filter_type) {
340 		case FILTER_TYPE_BUILTIN:
341 			filter = xcalloc(1, sizeof(*filter));
342 			filter->name = name;
343 			filter->phases |= (1<<filter_config->phase);
344 			filter->config = filter_config;
345 			dict_set(&filters, name, filter);
346 			log_trace(TRACE_FILTERS, "filters init type=builtin, name=%s, hooks=%08x",
347 			    name, filter->phases);
348 			break;
349 
350 		case FILTER_TYPE_PROC:
351 			filter = xcalloc(1, sizeof(*filter));
352 			filter->name = name;
353 			filter->proc = filter_config->proc;
354 			filter->config = filter_config;
355 			dict_set(&filters, name, filter);
356 			log_trace(TRACE_FILTERS, "filters init type=proc, name=%s, proc=%s",
357 			    name, filter_config->proc);
358 			break;
359 
360 		case FILTER_TYPE_CHAIN:
361 			break;
362 		}
363 	}
364 
365 	/* second pass, allocate and init filter chains but don't build yet */
366 	iter = NULL;
367 	while (dict_iter(env->sc_filters_dict, &iter, &name, (void **)&filter_config)) {
368 		switch (filter_config->filter_type) {
369 		case FILTER_TYPE_CHAIN:
370 			filter = xcalloc(1, sizeof(*filter));
371 			filter->name = name;
372 			filter->chain = xcalloc(filter_config->chain_size, sizeof(void **));
373 			filter->chain_size = filter_config->chain_size;
374 			filter->config = filter_config;
375 
376 			buffer[0] = '\0';
377 			for (i = 0; i < filter->chain_size; ++i) {
378 				filter->chain[i] = dict_xget(&filters, filter_config->chain[i]);
379 				if (i)
380 					(void)strlcat(buffer, ", ", sizeof buffer);
381 				(void)strlcat(buffer, filter->chain[i]->name, sizeof buffer);
382 			}
383 			log_trace(TRACE_FILTERS, "filters init type=chain, name=%s { %s }", name, buffer);
384 
385 			dict_set(&filters, name, filter);
386 			break;
387 
388 		case FILTER_TYPE_BUILTIN:
389 		case FILTER_TYPE_PROC:
390 			break;
391 		}
392 	}
393 }
394 
395 void
396 lka_filter_register_hook(const char *name, const char *hook)
397 {
398 	struct filter		*filter;
399 	const char	*filter_name;
400 	void		*iter;
401 	size_t	i;
402 
403 	if (strncasecmp(hook, "smtp-in|", 8) == 0) {
404 		hook += 8;
405 	}
406 	else
407 		fatalx("Invalid message direction: %s", hook);
408 
409 	for (i = 0; i < nitems(filter_execs); i++)
410 		if (strcmp(hook, filter_execs[i].phase_name) == 0)
411 			break;
412 	if (i == nitems(filter_execs))
413 		fatalx("Unrecognized report name: %s", hook);
414 
415 	iter = NULL;
416 	while (dict_iter(&filters, &iter, &filter_name, (void **)&filter))
417 		if (filter->proc && strcmp(name, filter->proc) == 0)
418 			filter->phases |= (1<<filter_execs[i].phase);
419 }
420 
421 void
422 lka_filter_ready(void)
423 {
424 	struct filter  	*filter;
425 	struct filter  	*subfilter;
426 	const char	*filter_name;
427 	struct filter_entry	*filter_entry;
428 	struct filter_chain	*filter_chain;
429 	void		*iter;
430 	size_t		i;
431 	size_t		j;
432 
433 	/* all filters are ready, actually build the filter chains */
434 	iter = NULL;
435 	while (dict_iter(&filters, &iter, &filter_name, (void **)&filter)) {
436 		filter_chain = xcalloc(1, sizeof *filter_chain);
437 		for (i = 0; i < nitems(filter_execs); i++)
438 			TAILQ_INIT(&filter_chain->chain[i]);
439 		dict_set(&filter_chains, filter_name, filter_chain);
440 
441 		if (filter->chain) {
442 			for (i = 0; i < filter->chain_size; i++) {
443 				subfilter = filter->chain[i];
444 				for (j = 0; j < nitems(filter_execs); ++j) {
445 					if (subfilter->phases & (1<<j)) {
446 						filter_entry = xcalloc(1, sizeof *filter_entry);
447 						filter_entry->id = generate_uid();
448 						filter_entry->name = subfilter->name;
449 						TAILQ_INSERT_TAIL(&filter_chain->chain[j],
450 						    filter_entry, entries);
451 					}
452 				}
453 			}
454 			continue;
455 		}
456 
457 		for (i = 0; i < nitems(filter_execs); ++i) {
458 			if (filter->phases & (1<<i)) {
459 				filter_entry = xcalloc(1, sizeof *filter_entry);
460 				filter_entry->id = generate_uid();
461 				filter_entry->name = filter_name;
462 				TAILQ_INSERT_TAIL(&filter_chain->chain[i],
463 				    filter_entry, entries);
464 			}
465 		}
466 	}
467 }
468 
469 int
470 lka_filter_proc_in_session(uint64_t reqid, const char *proc)
471 {
472 	struct filter_session	*fs;
473 	struct filter		*filter;
474 	size_t			 i;
475 
476 	if ((fs = tree_get(&sessions, reqid)) == NULL)
477 		return 0;
478 
479 	filter = dict_get(&filters, fs->filter_name);
480 	if (filter == NULL || (filter->proc == NULL && filter->chain == NULL))
481 		return 0;
482 
483 	if (filter->proc)
484 		return strcmp(filter->proc, proc) == 0 ? 1 : 0;
485 
486 	for (i = 0; i < filter->chain_size; i++)
487 		if (filter->chain[i]->proc &&
488 		    strcmp(filter->chain[i]->proc, proc) == 0)
489 			return 1;
490 
491 	return 0;
492 }
493 
494 void
495 lka_filter_begin(uint64_t reqid, const char *filter_name)
496 {
497 	struct filter_session	*fs;
498 
499 	if (!filters_inited) {
500 		tree_init(&sessions);
501 		filters_inited = 1;
502 	}
503 
504 	fs = xcalloc(1, sizeof (struct filter_session));
505 	fs->id = reqid;
506 	fs->filter_name = xstrdup(filter_name);
507 	tree_xset(&sessions, fs->id, fs);
508 
509 	log_trace(TRACE_FILTERS, "%016"PRIx64" filters session-begin", reqid);
510 }
511 
512 void
513 lka_filter_end(uint64_t reqid)
514 {
515 	struct filter_session	*fs;
516 
517 	fs = tree_xpop(&sessions, reqid);
518 	free(fs->rdns);
519 	free(fs->helo);
520 	free(fs->mail_from);
521 	free(fs->username);
522 	free(fs->lastparam);
523 	free(fs->filter_name);
524 	free(fs);
525 	log_trace(TRACE_FILTERS, "%016"PRIx64" filters session-end", reqid);
526 }
527 
528 void
529 lka_filter_data_begin(uint64_t reqid)
530 {
531 	struct filter_session  *fs;
532 	int	sp[2];
533 	int	fd = -1;
534 
535 	fs = tree_xget(&sessions, reqid);
536 
537 	if (socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC, sp) == -1)
538 		goto end;
539 	io_set_nonblocking(sp[0]);
540 	io_set_nonblocking(sp[1]);
541 	fd = sp[0];
542 	fs->io = io_new();
543 	io_set_fd(fs->io, sp[1]);
544 	io_set_callback(fs->io, filter_session_io, fs);
545 
546 end:
547 	m_create(p_dispatcher, IMSG_FILTER_SMTP_DATA_BEGIN, 0, 0, fd);
548 	m_add_id(p_dispatcher, reqid);
549 	m_add_int(p_dispatcher, fd != -1 ? 1 : 0);
550 	m_close(p_dispatcher);
551 	log_trace(TRACE_FILTERS, "%016"PRIx64" filters data-begin fd=%d", reqid, fd);
552 }
553 
554 void
555 lka_filter_data_end(uint64_t reqid)
556 {
557 	struct filter_session	*fs;
558 
559 	fs = tree_xget(&sessions, reqid);
560 	if (fs->io) {
561 		io_free(fs->io);
562 		fs->io = NULL;
563 	}
564 	log_trace(TRACE_FILTERS, "%016"PRIx64" filters data-end", reqid);
565 }
566 
567 static void
568 filter_session_io(struct io *io, int evt, void *arg)
569 {
570 	struct filter_session *fs = arg;
571 	char *line = NULL;
572 	ssize_t len;
573 
574 	log_trace(TRACE_IO, "filter session: %p: %s %s", fs, io_strevent(evt),
575 	    io_strio(io));
576 
577 	switch (evt) {
578 	case IO_DATAIN:
579 	nextline:
580 		line = io_getline(fs->io, &len);
581 		/* No complete line received */
582 		if (line == NULL)
583 			return;
584 
585 		filter_data(fs->id, line);
586 
587 		goto nextline;
588 	}
589 }
590 
591 void
592 lka_filter_process_response(const char *name, const char *line)
593 {
594 	uint64_t reqid;
595 	uint64_t token;
596 	char *ep = NULL;
597 	const char *kind = NULL;
598 	const char *qid = NULL;
599 	const char *response = NULL;
600 	const char *parameter = NULL;
601 	struct filter_session *fs;
602 
603 	kind = line;
604 
605 	if ((ep = strchr(kind, '|')) == NULL)
606 		fatalx("Missing token: %s", line);
607 	qid = ep+1;
608 
609 	errno = 0;
610 	reqid = strtoull(qid, &ep, 16);
611 	if (qid[0] == '\0' || *ep != '|')
612 		fatalx("Invalid reqid: %s", line);
613 	if (errno == ERANGE && reqid == ULLONG_MAX)
614 		fatal("Invalid reqid: %s", line);
615 
616 	qid = ep + 1;
617 	token = strtoull(qid, &ep, 16);
618 	if (qid[0] == '\0' || *ep != '|')
619 		fatalx("Invalid token: %s", line);
620 	if (errno == ERANGE && token == ULLONG_MAX)
621 		fatal("Invalid token: %s", line);
622 
623 	response = ep+1;
624 
625 	/* session can legitimately disappear on a resume */
626 	if ((fs = tree_get(&sessions, reqid)) == NULL)
627 		return;
628 
629 	if (strncmp(kind, "filter-dataline|", 16) == 0) {
630 		if (fs->phase != FILTER_DATA_LINE)
631 			fatalx("filter-dataline out of dataline phase");
632 		filter_data_next(token, reqid, response);
633 		return;
634 	}
635 	if (fs->phase == FILTER_DATA_LINE)
636 		fatalx("filter-result in dataline phase");
637 
638 	if ((ep = strchr(response, '|')) != NULL)
639 		parameter = ep + 1;
640 
641 	if (strcmp(response, "proceed") == 0) {
642 		filter_protocol_next(token, reqid, 0);
643 		return;
644 	} else if (strcmp(response, "junk") == 0) {
645 		if (fs->phase == FILTER_COMMIT)
646 			fatalx("filter-reponse junk after DATA");
647 		filter_result_junk(reqid);
648 		return;
649 	} else {
650 		if (parameter == NULL)
651 			fatalx("Missing parameter: %s", line);
652 
653 		if (strncmp(response, "rewrite|", 8) == 0)
654 			filter_result_rewrite(reqid, parameter);
655 		else if (strncmp(response, "reject|", 7) == 0)
656 			filter_result_reject(reqid, parameter);
657 		else if (strncmp(response, "disconnect|", 11) == 0)
658 			filter_result_disconnect(reqid, parameter);
659 		else
660 			fatalx("Invalid directive: %s", line);
661 	}
662 }
663 
/*
 * Public entry point for filtering one protocol phase of session `reqid';
 * simply forwards its arguments to filter_protocol().
 */
void
lka_filter_protocol(uint64_t reqid, enum filter_phase phase, const char *param)
{
	filter_protocol(reqid, phase, param);
}
669 
670 static void
671 filter_protocol_internal(struct filter_session *fs, uint64_t *token, uint64_t reqid, enum filter_phase phase, const char *param)
672 {
673 	struct filter_chain	*filter_chain;
674 	struct filter_entry	*filter_entry;
675 	struct filter		*filter;
676 	struct timeval		 tv;
677 	const char		*phase_name = filter_execs[phase].phase_name;
678 	int			 resume = 1;
679 
680 	if (!*token) {
681 		fs->phase = phase;
682 		resume = 0;
683 	}
684 
685 	/* XXX - this sanity check requires a protocol change, stub for now */
686 	phase = fs->phase;
687 	if (fs->phase != phase)
688 		fatalx("misbehaving filter");
689 
690 	/* based on token, identify the filter_entry we should apply  */
691 	filter_chain = dict_get(&filter_chains, fs->filter_name);
692 	filter_entry = TAILQ_FIRST(&filter_chain->chain[fs->phase]);
693 	if (*token) {
694 		TAILQ_FOREACH(filter_entry, &filter_chain->chain[fs->phase], entries)
695 		    if (filter_entry->id == *token)
696 			    break;
697 		if (filter_entry == NULL)
698 			fatalx("misbehaving filter");
699 		filter_entry = TAILQ_NEXT(filter_entry, entries);
700 	}
701 
702 	/* no filter_entry, we either had none or reached end of chain */
703 	if (filter_entry == NULL) {
704 		log_trace(TRACE_FILTERS, "%016"PRIx64" filters protocol phase=%s, resume=%s, "
705 		    "action=proceed",
706 		    fs->id, phase_name, resume ? "y" : "n");
707 		filter_result_proceed(reqid);
708 		return;
709 	}
710 
711 	/* process param with current filter_entry */
712 	*token = filter_entry->id;
713 	filter = dict_get(&filters, filter_entry->name);
714 	if (filter->proc) {
715 		log_trace(TRACE_FILTERS, "%016"PRIx64" filters protocol phase=%s, "
716 		    "resume=%s, action=deferred, filter=%s",
717 		    fs->id, phase_name, resume ? "y" : "n",
718 		    filter->name);
719 		filter_protocol_query(filter, filter_entry->id, reqid,
720 		    filter_execs[fs->phase].phase_name, param);
721 		return;	/* deferred response */
722 	}
723 
724 	if (filter_execs[fs->phase].func(fs, filter, reqid, param)) {
725 		if (filter->config->rewrite) {
726 			log_trace(TRACE_FILTERS, "%016"PRIx64" filters protocol phase=%s, "
727 			    "resume=%s, action=rewrite, filter=%s, query=%s, response=%s",
728 			    fs->id, phase_name, resume ? "y" : "n",
729 			    filter->name,
730 			    param,
731 			    filter->config->rewrite);
732 			    filter_result_rewrite(reqid, filter->config->rewrite);
733 			return;
734 		}
735 		else if (filter->config->disconnect) {
736 			log_trace(TRACE_FILTERS, "%016"PRIx64" filters protocol phase=%s, "
737 			    "resume=%s, action=disconnect, filter=%s, query=%s, response=%s",
738 			    fs->id, phase_name, resume ? "y" : "n",
739 			    filter->name,
740 			    param,
741 			    filter->config->disconnect);
742 			filter_result_disconnect(reqid, filter->config->disconnect);
743 			return;
744 		}
745 		else if (filter->config->junk) {
746 			log_trace(TRACE_FILTERS, "%016"PRIx64" filters protocol phase=%s, "
747 			    "resume=%s, action=junk, filter=%s, query=%s",
748 			    fs->id, phase_name, resume ? "y" : "n",
749 			    filter->name,
750 			    param);
751 			filter_result_junk(reqid);
752 			return;
753 		} else if (filter->config->report) {
754 			log_trace(TRACE_FILTERS, "%016"PRIx64" filters protocol phase=%s, "
755 			    "resume=%s, action=report, filter=%s, query=%s response=%s",
756 			    fs->id, phase_name, resume ? "y" : "n",
757 			    filter->name,
758 			    param, filter->config->report);
759 
760 			gettimeofday(&tv, NULL);
761 			lka_report_filter_report(fs->id, filter->name, 1,
762 			    "smtp-in", &tv, filter->config->report);
763 		} else if (filter->config->bypass) {
764 			log_trace(TRACE_FILTERS, "%016"PRIx64" filters protocol phase=%s, "
765 			    "resume=%s, action=bypass, filter=%s, query=%s",
766 			    fs->id, phase_name, resume ? "y" : "n",
767 			    filter->name,
768 			    param);
769 			filter_result_proceed(reqid);
770 			return;
771 		} else {
772 			log_trace(TRACE_FILTERS, "%016"PRIx64" filters protocol phase=%s, "
773 			    "resume=%s, action=reject, filter=%s, query=%s, response=%s",
774 			    fs->id, phase_name, resume ? "y" : "n",
775 			    filter->name,
776 			    param,
777 			    filter->config->reject);
778 			filter_result_reject(reqid, filter->config->reject);
779 			return;
780 		}
781 	}
782 
783 	log_trace(TRACE_FILTERS, "%016"PRIx64" filters protocol phase=%s, "
784 	    "resume=%s, action=proceed, filter=%s, query=%s",
785 	    fs->id, phase_name, resume ? "y" : "n",
786 	    filter->name,
787 	    param);
788 
789 	/* filter_entry resulted in proceed, try next filter */
790 	filter_protocol_internal(fs, token, reqid, phase, param);
791 	return;
792 }
793 
794 static void
795 filter_data_internal(struct filter_session *fs, uint64_t token, uint64_t reqid, const char *line)
796 {
797 	struct filter_chain	*filter_chain;
798 	struct filter_entry	*filter_entry;
799 	struct filter		*filter;
800 
801 	if (!token)
802 		fs->phase = FILTER_DATA_LINE;
803 	if (fs->phase != FILTER_DATA_LINE)
804 		fatalx("misbehaving filter");
805 
806 	/* based on token, identify the filter_entry we should apply  */
807 	filter_chain = dict_get(&filter_chains, fs->filter_name);
808 	filter_entry = TAILQ_FIRST(&filter_chain->chain[fs->phase]);
809 	if (token) {
810 		TAILQ_FOREACH(filter_entry, &filter_chain->chain[fs->phase], entries)
811 		    if (filter_entry->id == token)
812 			    break;
813 		if (filter_entry == NULL)
814 			fatalx("misbehaving filter");
815 		filter_entry = TAILQ_NEXT(filter_entry, entries);
816 	}
817 
818 	/* no filter_entry, we either had none or reached end of chain */
819 	if (filter_entry == NULL) {
820 		io_printf(fs->io, "%s\n", line);
821 		return;
822 	}
823 
824 	/* pass data to the filter */
825 	filter = dict_get(&filters, filter_entry->name);
826 	filter_data_query(filter, filter_entry->id, reqid, line);
827 }
828 
829 static void
830 filter_protocol(uint64_t reqid, enum filter_phase phase, const char *param)
831 {
832 	struct filter_session  *fs;
833 	uint64_t		token = 0;
834 	char		       *nparam = NULL;
835 
836 	fs = tree_xget(&sessions, reqid);
837 
838 	switch (phase) {
839 	case FILTER_HELO:
840 	case FILTER_EHLO:
841 		free(fs->helo);
842 		fs->helo = xstrdup(param);
843 		break;
844 	case FILTER_MAIL_FROM:
845 		free(fs->mail_from);
846 		fs->mail_from = xstrdup(param + 1);
847 		*strchr(fs->mail_from, '>') = '\0';
848 		param = fs->mail_from;
849 
850 		break;
851 	case FILTER_RCPT_TO:
852 		nparam = xstrdup(param + 1);
853 		*strchr(nparam, '>') = '\0';
854 		param = nparam;
855 		break;
856 	case FILTER_STARTTLS:
857 		/* TBD */
858 		break;
859 	default:
860 		break;
861 	}
862 
863 	free(fs->lastparam);
864 	fs->lastparam = xstrdup(param);
865 
866 	filter_protocol_internal(fs, &token, reqid, phase, param);
867 	if (nparam)
868 		free(nparam);
869 }
870 
871 static void
872 filter_protocol_next(uint64_t token, uint64_t reqid, enum filter_phase phase)
873 {
874 	struct filter_session  *fs;
875 
876 	/* session can legitimately disappear on a resume */
877 	if ((fs = tree_get(&sessions, reqid)) == NULL)
878 		return;
879 
880 	filter_protocol_internal(fs, &token, reqid, phase, fs->lastparam);
881 }
882 
883 static void
884 filter_data(uint64_t reqid, const char *line)
885 {
886 	struct filter_session  *fs;
887 
888 	fs = tree_xget(&sessions, reqid);
889 
890 	filter_data_internal(fs, 0, reqid, line);
891 }
892 
893 static void
894 filter_data_next(uint64_t token, uint64_t reqid, const char *line)
895 {
896 	struct filter_session  *fs;
897 
898 	/* session can legitimately disappear on a resume */
899 	if ((fs = tree_get(&sessions, reqid)) == NULL)
900 		return;
901 
902 	filter_data_internal(fs, token, reqid, line);
903 }
904 
905 static void
906 filter_protocol_query(struct filter *filter, uint64_t token, uint64_t reqid, const char *phase, const char *param)
907 {
908 	int	n;
909 	struct filter_session	*fs;
910 	struct timeval	tv;
911 
912 	gettimeofday(&tv, NULL);
913 
914 	fs = tree_xget(&sessions, reqid);
915 	if (strcmp(phase, "connect") == 0)
916 		n = io_printf(lka_proc_get_io(filter->proc),
917 		    "filter|%s|%lld.%06ld|smtp-in|%s|%016"PRIx64"|%016"PRIx64"|%s|%s\n",
918 		    PROTOCOL_VERSION,
919 		    (long long)tv.tv_sec, (long)tv.tv_usec,
920 		    phase, reqid, token, fs->rdns, param);
921 	else
922 		n = io_printf(lka_proc_get_io(filter->proc),
923 		    "filter|%s|%lld.%06ld|smtp-in|%s|%016"PRIx64"|%016"PRIx64"|%s\n",
924 		    PROTOCOL_VERSION,
925 		    (long long)tv.tv_sec, (long)tv.tv_usec,
926 		    phase, reqid, token, param);
927 	if (n == -1)
928 		fatalx("failed to write to processor");
929 }
930 
931 static void
932 filter_data_query(struct filter *filter, uint64_t token, uint64_t reqid, const char *line)
933 {
934 	int	n;
935 	struct timeval	tv;
936 
937 	gettimeofday(&tv, NULL);
938 
939 	n = io_printf(lka_proc_get_io(filter->proc),
940 	    "filter|%s|%lld.%06ld|smtp-in|data-line|"
941 	    "%016"PRIx64"|%016"PRIx64"|%s\n",
942 	    PROTOCOL_VERSION,
943 	    (long long)tv.tv_sec, (long)tv.tv_usec,
944 	    reqid, token, line);
945 	if (n == -1)
946 		fatalx("failed to write to processor");
947 }
948 
949 static void
950 filter_result_proceed(uint64_t reqid)
951 {
952 	m_create(p_dispatcher, IMSG_FILTER_SMTP_PROTOCOL, 0, 0, -1);
953 	m_add_id(p_dispatcher, reqid);
954 	m_add_int(p_dispatcher, FILTER_PROCEED);
955 	m_close(p_dispatcher);
956 }
957 
958 static void
959 filter_result_junk(uint64_t reqid)
960 {
961 	m_create(p_dispatcher, IMSG_FILTER_SMTP_PROTOCOL, 0, 0, -1);
962 	m_add_id(p_dispatcher, reqid);
963 	m_add_int(p_dispatcher, FILTER_JUNK);
964 	m_close(p_dispatcher);
965 }
966 
967 static void
968 filter_result_rewrite(uint64_t reqid, const char *param)
969 {
970 	m_create(p_dispatcher, IMSG_FILTER_SMTP_PROTOCOL, 0, 0, -1);
971 	m_add_id(p_dispatcher, reqid);
972 	m_add_int(p_dispatcher, FILTER_REWRITE);
973 	m_add_string(p_dispatcher, param);
974 	m_close(p_dispatcher);
975 }
976 
977 static void
978 filter_result_reject(uint64_t reqid, const char *message)
979 {
980 	m_create(p_dispatcher, IMSG_FILTER_SMTP_PROTOCOL, 0, 0, -1);
981 	m_add_id(p_dispatcher, reqid);
982 	m_add_int(p_dispatcher, FILTER_REJECT);
983 	m_add_string(p_dispatcher, message);
984 	m_close(p_dispatcher);
985 }
986 
987 static void
988 filter_result_disconnect(uint64_t reqid, const char *message)
989 {
990 	m_create(p_dispatcher, IMSG_FILTER_SMTP_PROTOCOL, 0, 0, -1);
991 	m_add_id(p_dispatcher, reqid);
992 	m_add_int(p_dispatcher, FILTER_DISCONNECT);
993 	m_add_string(p_dispatcher, message);
994 	m_close(p_dispatcher);
995 }
996 
997 
998 /* below is code for builtin filters */
999 
1000 static int
1001 filter_check_rdns_table(struct filter *filter, enum table_service kind, const char *key)
1002 {
1003 	int	ret = 0;
1004 
1005 	if (filter->config->rdns_table == NULL)
1006 		return 0;
1007 
1008 	if (table_match(filter->config->rdns_table, kind, key) > 0)
1009 		ret = 1;
1010 
1011 	return filter->config->not_rdns_table < 0 ? !ret : ret;
1012 }
1013 
1014 static int
1015 filter_check_rdns_regex(struct filter *filter, const char *key)
1016 {
1017 	int	ret = 0;
1018 
1019 	if (filter->config->rdns_regex == NULL)
1020 		return 0;
1021 
1022 	if (table_match(filter->config->rdns_regex, K_REGEX, key) > 0)
1023 		ret = 1;
1024 	return filter->config->not_rdns_regex < 0 ? !ret : ret;
1025 }
1026 
1027 static int
1028 filter_check_src_table(struct filter *filter, enum table_service kind, const char *key)
1029 {
1030 	int	ret = 0;
1031 
1032 	if (filter->config->src_table == NULL)
1033 		return 0;
1034 
1035 	if (table_match(filter->config->src_table, kind, key) > 0)
1036 		ret = 1;
1037 	return filter->config->not_src_table < 0 ? !ret : ret;
1038 }
1039 
1040 static int
1041 filter_check_src_regex(struct filter *filter, const char *key)
1042 {
1043 	int	ret = 0;
1044 
1045 	if (filter->config->src_regex == NULL)
1046 		return 0;
1047 
1048 	if (table_match(filter->config->src_regex, K_REGEX, key) > 0)
1049 		ret = 1;
1050 	return filter->config->not_src_regex < 0 ? !ret : ret;
1051 }
1052 
1053 static int
1054 filter_check_helo_table(struct filter *filter, enum table_service kind, const char *key)
1055 {
1056 	int	ret = 0;
1057 
1058 	if (filter->config->helo_table == NULL)
1059 		return 0;
1060 
1061 	if (table_match(filter->config->helo_table, kind, key) > 0)
1062 		ret = 1;
1063 	return filter->config->not_helo_table < 0 ? !ret : ret;
1064 }
1065 
1066 static int
1067 filter_check_helo_regex(struct filter *filter, const char *key)
1068 {
1069 	int	ret = 0;
1070 
1071 	if (filter->config->helo_regex == NULL)
1072 		return 0;
1073 
1074 	if (table_match(filter->config->helo_regex, K_REGEX, key) > 0)
1075 		ret = 1;
1076 	return filter->config->not_helo_regex < 0 ? !ret : ret;
1077 }
1078 
1079 static int
1080 filter_check_auth(struct filter *filter, const char *username)
1081 {
1082 	int ret = 0;
1083 
1084 	if (!filter->config->auth)
1085 		return 0;
1086 
1087 	ret = username ? 1 : 0;
1088 
1089 	return filter->config->not_auth < 0 ? !ret : ret;
1090 }
1091 
1092 static int
1093 filter_check_auth_table(struct filter *filter, enum table_service kind, const char *key)
1094 {
1095 	int	ret = 0;
1096 
1097 	if (filter->config->auth_table == NULL)
1098 		return 0;
1099 
1100 	if (key && table_match(filter->config->auth_table, kind, key) > 0)
1101 		ret = 1;
1102 
1103 	return filter->config->not_auth_table < 0 ? !ret : ret;
1104 }
1105 
1106 static int
1107 filter_check_auth_regex(struct filter *filter, const char *key)
1108 {
1109 	int	ret = 0;
1110 
1111 	if (filter->config->auth_regex == NULL)
1112 		return 0;
1113 
1114 	if (key && table_match(filter->config->auth_regex, K_REGEX, key) > 0)
1115 		ret = 1;
1116 	return filter->config->not_auth_regex < 0 ? !ret : ret;
1117 }
1118 
1119 
1120 static int
1121 filter_check_mail_from_table(struct filter *filter, enum table_service kind, const char *key)
1122 {
1123 	int	ret = 0;
1124 
1125 	if (filter->config->mail_from_table == NULL)
1126 		return 0;
1127 
1128 	if (table_match(filter->config->mail_from_table, kind, key) > 0)
1129 		ret = 1;
1130 	return filter->config->not_mail_from_table < 0 ? !ret : ret;
1131 }
1132 
1133 static int
1134 filter_check_mail_from_regex(struct filter *filter, const char *key)
1135 {
1136 	int	ret = 0;
1137 
1138 	if (filter->config->mail_from_regex == NULL)
1139 		return 0;
1140 
1141 	if (table_match(filter->config->mail_from_regex, K_REGEX, key) > 0)
1142 		ret = 1;
1143 	return filter->config->not_mail_from_regex < 0 ? !ret : ret;
1144 }
1145 
1146 static int
1147 filter_check_rcpt_to_table(struct filter *filter, enum table_service kind, const char *key)
1148 {
1149 	int	ret = 0;
1150 
1151 	if (filter->config->rcpt_to_table == NULL)
1152 		return 0;
1153 
1154 	if (table_match(filter->config->rcpt_to_table, kind, key) > 0)
1155 		ret = 1;
1156 	return filter->config->not_rcpt_to_table < 0 ? !ret : ret;
1157 }
1158 
1159 static int
1160 filter_check_rcpt_to_regex(struct filter *filter, const char *key)
1161 {
1162 	int	ret = 0;
1163 
1164 	if (filter->config->rcpt_to_regex == NULL)
1165 		return 0;
1166 
1167 	if (table_match(filter->config->rcpt_to_regex, K_REGEX, key) > 0)
1168 		ret = 1;
1169 	return filter->config->not_rcpt_to_regex < 0 ? !ret : ret;
1170 }
1171 
1172 static int
1173 filter_check_fcrdns(struct filter *filter, int fcrdns)
1174 {
1175 	int	ret = 0;
1176 
1177 	if (!filter->config->fcrdns)
1178 		return 0;
1179 
1180 	ret = fcrdns == 1;
1181 	return filter->config->not_fcrdns < 0 ? !ret : ret;
1182 }
1183 
1184 static int
1185 filter_check_rdns(struct filter *filter, const char *hostname)
1186 {
1187 	int	ret = 0;
1188 	struct netaddr	netaddr;
1189 
1190 	if (!filter->config->rdns)
1191 		return 0;
1192 
1193 	/* this is a hack until smtp session properly deals with lack of rdns */
1194 	ret = strcmp("<unknown>", hostname);
1195 	if (ret == 0)
1196 		return filter->config->not_rdns < 0 ? !ret : ret;
1197 
1198 	/* if text_to_netaddress succeeds,
1199 	 * we don't have an rDNS so the filter should match
1200 	 */
1201 	ret = !text_to_netaddr(&netaddr, hostname);
1202 	return filter->config->not_rdns < 0 ? !ret : ret;
1203 }
1204 
/*
 * Builtin handler that ignores all of its arguments and always reports
 * "no match" (0).
 */
static int
filter_builtins_notimpl(struct filter_session *fs, struct filter *filter, uint64_t reqid, const char *param)
{
	(void)fs;
	(void)filter;
	(void)reqid;
	(void)param;

	return 0;
}
1210 
1211 static int
1212 filter_builtins_global(struct filter_session *fs, struct filter *filter, uint64_t reqid)
1213 {
1214 	return filter_check_fcrdns(filter, fs->fcrdns) ||
1215 	    filter_check_rdns(filter, fs->rdns) ||
1216 	    filter_check_rdns_table(filter, K_DOMAIN, fs->rdns) ||
1217 	    filter_check_rdns_regex(filter, fs->rdns) ||
1218 	    filter_check_src_table(filter, K_NETADDR, ss_to_text(&fs->ss_src)) ||
1219 	    filter_check_src_regex(filter, ss_to_text(&fs->ss_src)) ||
1220 	    filter_check_helo_table(filter, K_DOMAIN, fs->helo) ||
1221 	    filter_check_helo_regex(filter, fs->helo) ||
1222 	    filter_check_auth(filter, fs->username) ||
1223 	    filter_check_auth_table(filter, K_STRING, fs->username) ||
1224 	    filter_check_auth_table(filter, K_CREDENTIALS, fs->username) ||
1225 	    filter_check_auth_regex(filter, fs->username) ||
1226 	    filter_check_mail_from_table(filter, K_MAILADDR, fs->mail_from) ||
1227 	    filter_check_mail_from_regex(filter, fs->mail_from);
1228 }
1229 
/*
 * Builtin handler for the connect phase: only the session-wide criteria
 * are evaluated; `param' is unused.
 */
static int
filter_builtins_connect(struct filter_session *fs, struct filter *filter, uint64_t reqid, const char *param)
{
	int	matched;

	matched = filter_builtins_global(fs, filter, reqid);
	return matched;
}
1235 
/*
 * Builtin handler for the helo phase: only the session-wide criteria are
 * evaluated; `param' is unused.
 */
static int
filter_builtins_helo(struct filter_session *fs, struct filter *filter, uint64_t reqid, const char *param)
{
	int	matched;

	matched = filter_builtins_global(fs, filter, reqid);
	return matched;
}
1241 
/*
 * Builtin handler for the mail-from phase: only the session-wide criteria
 * (which include the mail-from checks on fs->mail_from) are evaluated;
 * `param' is unused.
 */
static int
filter_builtins_mail_from(struct filter_session *fs, struct filter *filter, uint64_t reqid, const char *param)
{
	int	matched;

	matched = filter_builtins_global(fs, filter, reqid);
	return matched;
}
1247 
1248 static int
1249 filter_builtins_rcpt_to(struct filter_session *fs, struct filter *filter, uint64_t reqid, const char *param)
1250 {
1251 	return filter_builtins_global(fs, filter, reqid) ||
1252 	    filter_check_rcpt_to_table(filter, K_MAILADDR, param) ||
1253 	    filter_check_rcpt_to_regex(filter, param);
1254 }
1255 
/*
 * Builtin handler for the data phase: only the session-wide criteria are
 * evaluated; `param' is unused.
 */
static int
filter_builtins_data(struct filter_session *fs, struct filter *filter, uint64_t reqid, const char *param)
{
	int	matched;

	matched = filter_builtins_global(fs, filter, reqid);
	return matched;
}
1261 
/*
 * Builtin handler for the commit phase: only the session-wide criteria
 * are evaluated; `param' is unused.
 */
static int
filter_builtins_commit(struct filter_session *fs, struct filter *filter, uint64_t reqid, const char *param)
{
	int	matched;

	matched = filter_builtins_global(fs, filter, reqid);
	return matched;
}
1267 
/*
 * Forward declaration carrying the printf format attribute: parameter 5
 * is the format string and the variadic arguments begin at position 6.
 */
static void
report_smtp_broadcast(uint64_t, const char *, struct timeval *, const char *,
    const char *, ...) __attribute__((__format__ (printf, 5, 6)));
1271 
1272 void
1273 lka_report_init(void)
1274 {
1275 	struct reporters	*tailq;
1276 	size_t			 i;
1277 
1278 	dict_init(&report_smtp_in);
1279 	dict_init(&report_smtp_out);
1280 
1281 	for (i = 0; i < nitems(smtp_events); ++i) {
1282 		tailq = xcalloc(1, sizeof (struct reporters));
1283 		TAILQ_INIT(tailq);
1284 		dict_xset(&report_smtp_in, smtp_events[i].event, tailq);
1285 
1286 		tailq = xcalloc(1, sizeof (struct reporters));
1287 		TAILQ_INIT(tailq);
1288 		dict_xset(&report_smtp_out, smtp_events[i].event, tailq);
1289 	}
1290 }
1291 
1292 void
1293 lka_report_register_hook(const char *name, const char *hook)
1294 {
1295 	struct dict	*subsystem;
1296 	struct reporter_proc	*rp;
1297 	struct reporters	*tailq;
1298 	void *iter;
1299 	size_t	i;
1300 
1301 	if (strncmp(hook, "smtp-in|", 8) == 0) {
1302 		subsystem = &report_smtp_in;
1303 		hook += 8;
1304 	}
1305 	else if (strncmp(hook, "smtp-out|", 9) == 0) {
1306 		subsystem = &report_smtp_out;
1307 		hook += 9;
1308 	}
1309 	else
1310 		fatalx("Invalid message direction: %s", hook);
1311 
1312 	if (strcmp(hook, "*") == 0) {
1313 		iter = NULL;
1314 		while (dict_iter(subsystem, &iter, NULL, (void **)&tailq)) {
1315 			rp = xcalloc(1, sizeof *rp);
1316 			rp->name = xstrdup(name);
1317 			TAILQ_INSERT_TAIL(tailq, rp, entries);
1318 		}
1319 		return;
1320 	}
1321 
1322 	for (i = 0; i < nitems(smtp_events); i++)
1323 		if (strcmp(hook, smtp_events[i].event) == 0)
1324 			break;
1325 	if (i == nitems(smtp_events))
1326 		fatalx("Unrecognized report name: %s", hook);
1327 
1328 	tailq = dict_get(subsystem, hook);
1329 	rp = xcalloc(1, sizeof *rp);
1330 	rp->name = xstrdup(name);
1331 	TAILQ_INSERT_TAIL(tailq, rp, entries);
1332 }
1333 
1334 static void
1335 report_smtp_broadcast(uint64_t reqid, const char *direction, struct timeval *tv, const char *event,
1336     const char *format, ...)
1337 {
1338 	va_list		ap;
1339 	struct dict	*d;
1340 	struct reporters	*tailq;
1341 	struct reporter_proc	*rp;
1342 
1343 	if (strcmp("smtp-in", direction) == 0)
1344 		d = &report_smtp_in;
1345 
1346 	else if (strcmp("smtp-out", direction) == 0)
1347 		d = &report_smtp_out;
1348 
1349 	else
1350 		fatalx("unexpected direction: %s", direction);
1351 
1352 	tailq = dict_xget(d, event);
1353 	TAILQ_FOREACH(rp, tailq, entries) {
1354 		if (!lka_filter_proc_in_session(reqid, rp->name))
1355 			continue;
1356 
1357 		va_start(ap, format);
1358 		if (io_printf(lka_proc_get_io(rp->name),
1359 		    "report|%s|%lld.%06ld|%s|%s|%016"PRIx64"%s",
1360 		    PROTOCOL_VERSION, (long long)tv->tv_sec, (long)tv->tv_usec,
1361 		    direction, event, reqid,
1362 		    format[0] != '\n' ? "|" : "") == -1 ||
1363 		    io_vprintf(lka_proc_get_io(rp->name), format, ap) == -1)
1364 			fatalx("failed to write to processor");
1365 		va_end(ap);
1366 	}
1367 }
1368 
1369 void
1370 lka_report_smtp_link_connect(const char *direction, struct timeval *tv, uint64_t reqid, const char *rdns,
1371     int fcrdns,
1372     const struct sockaddr_storage *ss_src,
1373     const struct sockaddr_storage *ss_dest)
1374 {
1375 	struct filter_session *fs;
1376 	char	src[NI_MAXHOST + 5];
1377 	char	dest[NI_MAXHOST + 5];
1378 	uint16_t	src_port = 0;
1379 	uint16_t	dest_port = 0;
1380 	const char     *fcrdns_str;
1381 
1382 	if (ss_src->ss_family == AF_INET)
1383 		src_port = ntohs(((const struct sockaddr_in *)ss_src)->sin_port);
1384 	else if (ss_src->ss_family == AF_INET6)
1385 		src_port = ntohs(((const struct sockaddr_in6 *)ss_src)->sin6_port);
1386 
1387 	if (ss_dest->ss_family == AF_INET)
1388 		dest_port = ntohs(((const struct sockaddr_in *)ss_dest)->sin_port);
1389 	else if (ss_dest->ss_family == AF_INET6)
1390 		dest_port = ntohs(((const struct sockaddr_in6 *)ss_dest)->sin6_port);
1391 
1392 	if (strcmp(ss_to_text(ss_src), "local") == 0) {
1393 		(void)snprintf(src, sizeof src, "unix:%s", SMTPD_SOCKET);
1394 		(void)snprintf(dest, sizeof dest, "unix:%s", SMTPD_SOCKET);
1395 	} else {
1396 		(void)snprintf(src, sizeof src, "%s:%d", ss_to_text(ss_src), src_port);
1397 		(void)snprintf(dest, sizeof dest, "%s:%d", ss_to_text(ss_dest), dest_port);
1398 	}
1399 
1400 	switch (fcrdns) {
1401 	case 1:
1402 		fcrdns_str = "pass";
1403 		break;
1404 	case 0:
1405 		fcrdns_str = "fail";
1406 		break;
1407 	default:
1408 		fcrdns_str = "error";
1409 		break;
1410 	}
1411 
1412 	fs = tree_xget(&sessions, reqid);
1413 	fs->rdns = xstrdup(rdns);
1414 	fs->fcrdns = fcrdns;
1415 	fs->ss_src = *ss_src;
1416 	fs->ss_dest = *ss_dest;
1417 
1418 	report_smtp_broadcast(reqid, direction, tv, "link-connect",
1419 	    "%s|%s|%s|%s\n", rdns, fcrdns_str, src, dest);
1420 }
1421 
/*
 * Broadcast a "link-disconnect" report for `reqid'; the event has no
 * payload.
 */
void
lka_report_smtp_link_disconnect(const char *direction, struct timeval *tv, uint64_t reqid)
{
	const char *event = "link-disconnect";

	report_smtp_broadcast(reqid, direction, tv, event, "\n");
}
1427 
/*
 * Broadcast a "link-greeting" report for `reqid' whose payload is
 * `domain'.
 */
void
lka_report_smtp_link_greeting(const char *direction, uint64_t reqid,
    struct timeval *tv, const char *domain)
{
	const char *event = "link-greeting";

	report_smtp_broadcast(reqid, direction, tv, event, "%s\n", domain);
}
1435 
1436 void
1437 lka_report_smtp_link_auth(const char *direction, struct timeval *tv, uint64_t reqid,
1438     const char *username, const char *result)
1439 {
1440 	struct filter_session *fs;
1441 
1442 	if (strcmp(result, "pass") == 0) {
1443 		fs = tree_xget(&sessions, reqid);
1444 		fs->username = xstrdup(username);
1445 	}
1446 	report_smtp_broadcast(reqid, direction, tv, "link-auth", "%s|%s\n",
1447 	    result, username);
1448 }
1449 
/*
 * Broadcast a "link-identify" report for `reqid' with payload
 * method|heloname.
 */
void
lka_report_smtp_link_identify(const char *direction, struct timeval *tv,
    uint64_t reqid, const char *method, const char *heloname)
{
	const char *event = "link-identify";

	report_smtp_broadcast(reqid, direction, tv, event,
	    "%s|%s\n", method, heloname);
}
1457 
/*
 * Broadcast a "link-tls" report for `reqid' whose payload is `ciphers'.
 */
void
lka_report_smtp_link_tls(const char *direction, struct timeval *tv, uint64_t reqid, const char *ciphers)
{
	const char *event = "link-tls";

	report_smtp_broadcast(reqid, direction, tv, event, "%s\n", ciphers);
}
1464 
/*
 * Broadcast a "tx-reset" report for `reqid'; the payload is `msgid' as
 * eight hexadecimal digits.
 */
void
lka_report_smtp_tx_reset(const char *direction, struct timeval *tv, uint64_t reqid, uint32_t msgid)
{
	const char *event = "tx-reset";

	report_smtp_broadcast(reqid, direction, tv, event, "%08x\n", msgid);
}
1471 
/*
 * Broadcast a "tx-begin" report for `reqid'; the payload is `msgid' as
 * eight hexadecimal digits.
 */
void
lka_report_smtp_tx_begin(const char *direction, struct timeval *tv, uint64_t reqid, uint32_t msgid)
{
	const char *event = "tx-begin";

	report_smtp_broadcast(reqid, direction, tv, event, "%08x\n", msgid);
}
1478 
/*
 * Broadcast a "tx-mail" report with payload msgid|result|address.
 *
 * `ok' is rendered as "ok" (1), "permfail" (0) or "tempfail" (any other
 * value).
 */
void
lka_report_smtp_tx_mail(const char *direction, struct timeval *tv, uint64_t reqid, uint32_t msgid, const char *address, int ok)
{
	const char *result;

	if (ok == 1)
		result = "ok";
	else if (ok == 0)
		result = "permfail";
	else
		result = "tempfail";

	report_smtp_broadcast(reqid, direction, tv, "tx-mail",
	    "%08x|%s|%s\n", msgid, result, address);
}
1498 
/*
 * Broadcast a "tx-rcpt" report with payload msgid|result|address.
 *
 * `ok' is rendered as "ok" (1), "permfail" (0) or "tempfail" (any other
 * value).
 */
void
lka_report_smtp_tx_rcpt(const char *direction, struct timeval *tv, uint64_t reqid, uint32_t msgid, const char *address, int ok)
{
	const char *result;

	result = (ok == 1) ? "ok" :
	    (ok == 0) ? "permfail" : "tempfail";

	report_smtp_broadcast(reqid, direction, tv, "tx-rcpt",
	    "%08x|%s|%s\n", msgid, result, address);
}
1518 
/*
 * Broadcast a "tx-envelope" report with payload msgid|evpid, printed as
 * 8 and 16 hexadecimal digits respectively.
 */
void
lka_report_smtp_tx_envelope(const char *direction, struct timeval *tv, uint64_t reqid, uint32_t msgid, uint64_t evpid)
{
	const char *event = "tx-envelope";

	report_smtp_broadcast(reqid, direction, tv, event,
	    "%08x|%016"PRIx64"\n", msgid, evpid);
}
1525 
/*
 * Broadcast a "tx-data" report with payload msgid|result.
 *
 * `ok' is rendered as "ok" (1), "permfail" (0) or "tempfail" (any other
 * value).
 */
void
lka_report_smtp_tx_data(const char *direction, struct timeval *tv, uint64_t reqid, uint32_t msgid, int ok)
{
	const char *result = "tempfail";

	if (ok == 1)
		result = "ok";
	else if (ok == 0)
		result = "permfail";

	report_smtp_broadcast(reqid, direction, tv, "tx-data",
	    "%08x|%s\n", msgid, result);
}
1545 
/*
 * Broadcast a "tx-commit" report with payload msgid|msgsz: `msgid' as
 * eight hexadecimal digits and `msgsz' in decimal.
 */
void
lka_report_smtp_tx_commit(const char *direction, struct timeval *tv, uint64_t reqid, uint32_t msgid, size_t msgsz)
{
	/* msgsz is a size_t, so it needs the unsigned %zu conversion */
	report_smtp_broadcast(reqid, direction, tv, "tx-commit", "%08x|%zu\n",
	    msgid, msgsz);
}
1552 
/*
 * Broadcast a "tx-rollback" report for `reqid'; the payload is `msgid' as
 * eight hexadecimal digits.
 */
void
lka_report_smtp_tx_rollback(const char *direction, struct timeval *tv, uint64_t reqid, uint32_t msgid)
{
	const char *event = "tx-rollback";

	report_smtp_broadcast(reqid, direction, tv, event, "%08x\n", msgid);
}
1559 
/*
 * Broadcast a "protocol-client" report for `reqid' whose payload is
 * `command'.
 */
void
lka_report_smtp_protocol_client(const char *direction, struct timeval *tv, uint64_t reqid, const char *command)
{
	const char *event = "protocol-client";

	report_smtp_broadcast(reqid, direction, tv, event, "%s\n", command);
}
1566 
/*
 * Broadcast a "protocol-server" report for `reqid' whose payload is
 * `response'.
 */
void
lka_report_smtp_protocol_server(const char *direction, struct timeval *tv, uint64_t reqid, const char *response)
{
	const char *event = "protocol-server";

	report_smtp_broadcast(reqid, direction, tv, event, "%s\n", response);
}
1573 
1574 void
1575 lka_report_smtp_filter_response(const char *direction, struct timeval *tv, uint64_t reqid,
1576     int phase, int response, const char *param)
1577 {
1578 	const char *phase_name;
1579 	const char *response_name;
1580 
1581 	switch (phase) {
1582 	case FILTER_CONNECT:
1583 		phase_name = "connected";
1584 		break;
1585 	case FILTER_HELO:
1586 		phase_name = "helo";
1587 		break;
1588 	case FILTER_EHLO:
1589 		phase_name = "ehlo";
1590 		break;
1591 	case FILTER_STARTTLS:
1592 		phase_name = "tls";
1593 		break;
1594 	case FILTER_AUTH:
1595 		phase_name = "auth";
1596 		break;
1597 	case FILTER_MAIL_FROM:
1598 		phase_name = "mail-from";
1599 		break;
1600 	case FILTER_RCPT_TO:
1601 		phase_name = "rcpt-to";
1602 		break;
1603 	case FILTER_DATA:
1604 		phase_name = "data";
1605 		break;
1606 	case FILTER_DATA_LINE:
1607 		phase_name = "data-line";
1608 		break;
1609 	case FILTER_RSET:
1610 		phase_name = "rset";
1611 		break;
1612 	case FILTER_QUIT:
1613 		phase_name = "quit";
1614 		break;
1615 	case FILTER_NOOP:
1616 		phase_name = "noop";
1617 		break;
1618 	case FILTER_HELP:
1619 		phase_name = "help";
1620 		break;
1621 	case FILTER_WIZ:
1622 		phase_name = "wiz";
1623 		break;
1624 	case FILTER_COMMIT:
1625 		phase_name = "commit";
1626 		break;
1627 	default:
1628 		phase_name = "";
1629 	}
1630 
1631 	switch (response) {
1632 	case FILTER_PROCEED:
1633 		response_name = "proceed";
1634 		break;
1635 	case FILTER_JUNK:
1636 		response_name = "junk";
1637 		break;
1638 	case FILTER_REWRITE:
1639 		response_name = "rewrite";
1640 		break;
1641 	case FILTER_REJECT:
1642 		response_name = "reject";
1643 		break;
1644 	case FILTER_DISCONNECT:
1645 		response_name = "disconnect";
1646 		break;
1647 	default:
1648 		response_name = "";
1649 	}
1650 
1651 	report_smtp_broadcast(reqid, direction, tv, "filter-response",
1652 	    "%s|%s%s%s\n", phase_name, response_name, param ? "|" : "",
1653 	    param ? param : "");
1654 }
1655 
/*
 * Broadcast a "timeout" report for `reqid'; the event has no payload.
 */
void
lka_report_smtp_timeout(const char *direction, struct timeval *tv, uint64_t reqid)
{
	const char *event = "timeout";

	report_smtp_broadcast(reqid, direction, tv, event, "\n");
}
1661 
/*
 * Broadcast a "filter-report" event with payload kind|name|message, where
 * kind is "builtin" when `builtin' is non-zero and "proc" otherwise.
 */
void
lka_report_filter_report(uint64_t reqid, const char *name, int builtin,
    const char *direction, struct timeval *tv, const char *message)
{
	const char *kind;

	if (builtin)
		kind = "builtin";
	else
		kind = "proc";

	report_smtp_broadcast(reqid, direction, tv, "filter-report",
	    "%s|%s|%s\n", kind, name, message);
}
1670 
1671 void
1672 lka_report_proc(const char *name, const char *line)
1673 {
1674 	char buffer[LINE_MAX];
1675 	struct timeval tv;
1676 	char *ep, *sp, *direction;
1677 	uint64_t reqid;
1678 
1679 	if (strlcpy(buffer, line + 7, sizeof(buffer)) >= sizeof(buffer))
1680 		fatalx("Invalid report: line too long: %s", line);
1681 
1682 	errno = 0;
1683 	tv.tv_sec = strtoll(buffer, &ep, 10);
1684 	if (ep[0] != '.' || errno != 0)
1685 		fatalx("Invalid report: invalid time: %s", line);
1686 	sp = ep + 1;
1687 	tv.tv_usec = strtol(sp, &ep, 10);
1688 	if (ep[0] != '|' || errno != 0)
1689 		fatalx("Invalid report: invalid time: %s", line);
1690 	if (ep - sp != 6)
1691 		fatalx("Invalid report: invalid time: %s", line);
1692 
1693 	direction = ep + 1;
1694 	if (strncmp(direction, "smtp-in|", 8) == 0) {
1695 		direction[7] = '\0';
1696 		direction += 7;
1697 #if 0
1698 	} else if (strncmp(direction, "smtp-out|", 9) == 0) {
1699 		direction[8] = '\0';
1700 		direction += 8;
1701 #endif
1702 	} else
1703 		fatalx("Invalid report: invalid direction: %s", line);
1704 
1705 	reqid = strtoull(sp, &ep, 16);
1706 	if (ep[0] != '|' || errno != 0)
1707 		fatalx("Invalid report: invalid reqid: %s", line);
1708 	sp = ep + 1;
1709 
1710 	lka_report_filter_report(reqid, name, 0, direction, &tv, sp);
1711 }
1712