1 /*
2  * Copyright (c) 2007-2012, Vsevolod Stakhov
3  * All rights reserved.
4 
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  * Redistributions of source code must retain the above copyright notice, this
8  * list of conditions and the following disclaimer. Redistributions in binary form
9  * must reproduce the above copyright notice, this list of conditions and the
10  * following disclaimer in the documentation and/or other materials provided with
11  * the distribution. Neither the name of the author nor the names of its
12  * contributors may be used to endorse or promote products derived from this
13  * software without specific prior written permission.
14 
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
16  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18  * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
19  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
21  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
22  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
23  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
24  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25  */
26 
27 #include "config.h"
28 
29 #include "utlist.h"
30 #include "cfg_file.h"
31 #include "rmilter.h"
32 #include "libspamd.h"
33 #include "mfapi.h"
34 #include "ucl.h"
35 #include "http_parser.h"
36 #include "sds.h"
37 #include "contrib/zstd/zstd.h"
38 #include <math.h>
39 #include <sys/mman.h>
40 
/*
 * Tuning constants for marking a spamd server inactive.
 * NOTE(review): none of the three macros below is referenced in the code
 * reviewed here; spamdscan() passes cfg->spamd_error_time,
 * cfg->spamd_dead_time and cfg->spamd_maxerrors to get_random_upstream()
 * instead -- confirm whether these are still needed.
 */

/* Maximum time in seconds during which spamd server is marked inactive after scan error */
#define INACTIVE_INTERVAL 60.0
/* Maximum number of failed attempts before marking server as inactive */
#define MAX_FAILED 5
/* Maximum inactive timeout in seconds (20 min) */
#define MAX_TIMEOUT 1200.0

/*
 * Global mutexes
 *
 * Defined only when _THREAD_SAFE is set and statically initialised.
 * NOTE(review): the name suggests it serialises writes towards spamd, but it
 * is not locked anywhere in the code reviewed here -- confirm its users.
 */

#ifdef _THREAD_SAFE
pthread_mutex_t mx_spamd_write = PTHREAD_MUTEX_INITIALIZER;
#endif
53 
54 static int
rmilter_spamd_parser_on_body(http_parser * parser,const char * at,size_t length)55 rmilter_spamd_parser_on_body (http_parser * parser, const char *at, size_t length)
56 {
57 	struct rspamd_metric_result *res = parser->data;
58 	struct mlfi_priv *priv;
59 	struct ucl_parser *up;
60 	ucl_object_t *obj;
61 	ucl_object_iter_t it = NULL;
62 	struct rspamd_symbol *sym;
63 	const ucl_object_t *metric, *elt, *sym_elt;
64 	const char *act_str;
65 
66 	priv = res->priv;
67 	up = ucl_parser_new (0);
68 
69 	if (res->compressed) {
70 		/* Decompress first */
71 		ZSTD_DStream *zstream;
72 		ZSTD_inBuffer zin;
73 		ZSTD_outBuffer zout;
74 		char *out;
75 		size_t outlen, r;
76 
77 		zstream = ZSTD_createDStream ();
78 		ZSTD_initDStream (zstream);
79 
80 		zin.pos = 0;
81 		zin.src = at;
82 		zin.size = length;
83 
84 		if ((outlen = ZSTD_getDecompressedSize (zin.src, zin.size)) == 0) {
85 			outlen = ZSTD_DStreamOutSize ();
86 		}
87 
88 		out = malloc (outlen);
89 
90 		if (out == NULL) {
91 			msg_err ("<%s>; malloc error: %s", priv->mlfi_id,
92 					strerror (errno));
93 			ucl_parser_free (up);
94 
95 			return -1;
96 		}
97 
98 		zout.dst = out;
99 		zout.pos = 0;
100 		zout.size = outlen;
101 
102 		while (zin.pos < zin.size) {
103 			r = ZSTD_decompressStream (zstream, &zout, &zin);
104 
105 			if (ZSTD_isError (r)) {
106 				msg_err ("<%s>; decompression error: %s", priv->mlfi_id,
107 						ZSTD_getErrorName (r));
108 				ZSTD_freeDStream (zstream);
109 				free (out);
110 				ucl_parser_free (up);
111 
112 				return -1;
113 			}
114 
115 			if (zout.pos == zout.size) {
116 				/* We need to extend output buffer */
117 				zout.size = zout.size * 1.5 + 1.0;
118 				out = realloc (zout.dst, zout.size);
119 
120 				if (out == NULL) {
121 					msg_err ("<%s>; malloc error: %s", priv->mlfi_id,
122 							strerror (errno));
123 					ucl_parser_free (up);
124 					free (zout.dst);
125 
126 					return -1;
127 				}
128 				else {
129 					zout.dst = out;
130 				}
131 			}
132 		}
133 
134 		ZSTD_freeDStream (zstream);
135 
136 		if (!ucl_parser_add_chunk (up, out, zout.pos)) {
137 			msg_err ("<%s>; cannot parse reply from rspamd: %s",
138 					priv->mlfi_id, ucl_parser_get_error (up));
139 			ucl_parser_free (up);
140 			free (out);
141 
142 			return -1;
143 		}
144 
145 		free (out);
146 	}
147 	else {
148 		if (!ucl_parser_add_chunk (up, at, length)) {
149 			msg_err ("<%s>; cannot parse reply from rspamd: %s",
150 					priv->mlfi_id, ucl_parser_get_error (up));
151 			ucl_parser_free (up);
152 
153 			return -1;
154 		}
155 	}
156 
157 	obj = ucl_parser_get_object (up);
158 	ucl_parser_free (up);
159 	res->obj = obj;
160 
161 	if (obj == NULL || ucl_object_type (obj) != UCL_OBJECT) {
162 		msg_err ("<%s>; cannot parse reply from rspamd: bad top object", priv->mlfi_id);
163 		ucl_object_unref (obj);
164 
165 		return -1;
166 	}
167 
168 	metric = ucl_object_lookup (obj, "default");
169 
170 	if (metric == NULL || ucl_object_type (metric) != UCL_OBJECT) {
171 		msg_err ("<%s>; cannot parse reply from rspamd: no default metric result", priv->mlfi_id);
172 		ucl_object_unref (obj);
173 
174 		return -1;
175 	}
176 
177 	res->parsed = true;
178 	elt = ucl_object_lookup (metric, "score");
179 	res->score = ucl_object_todouble (elt);
180 
181 	elt = ucl_object_lookup (metric, "required_score");
182 	res->required_score = ucl_object_todouble (elt);
183 
184 	elt = ucl_object_lookup (metric, "action");
185 	act_str = ucl_object_tostring (elt);
186 
187 	elt = ucl_object_lookup (metric, "subject");
188 	res->subject = ucl_object_tostring (elt);
189 
190 	elt = ucl_object_lookup (obj, "message-id");
191 	res->message_id = ucl_object_tostring (elt);
192 
193 	elt = ucl_object_lookup (obj, "dkim-signature");
194 	if (elt) {
195 		res->dkim_signature = ucl_object_tostring (elt);
196 	}
197 
198 	if (act_str) {
199 		if (strcmp (act_str, "reject") == 0) {
200 			res->action = METRIC_ACTION_REJECT;
201 		}
202 		else if (strcmp (act_str, "greylist") == 0) {
203 			res->action = METRIC_ACTION_GREYLIST;
204 		}
205 		else if (strcmp (act_str, "add header") == 0) {
206 			res->action = METRIC_ACTION_ADD_HEADER;
207 		}
208 		else if (strcmp (act_str, "rewrite subject") == 0) {
209 			res->action = METRIC_ACTION_REWRITE_SUBJECT;
210 		}
211 		else if (strcmp (act_str, "soft reject") == 0) {
212 			res->action = METRIC_ACTION_SOFT_REJECT;
213 		}
214 		else {
215 			res->action = METRIC_ACTION_NOACTION;
216 		}
217 	}
218 	else {
219 		msg_warn ("<%s>; invalid reply from rspamd: no action found, assume no action", priv->mlfi_id);
220 		res->action = METRIC_ACTION_NOACTION;
221 	}
222 
223 	while ((sym_elt = ucl_object_iterate (metric, &it, true)) != NULL) {
224 		/* Here we assume that all objects found here are symbols */
225 		if (ucl_object_type (sym_elt) == UCL_OBJECT) {
226 			sym = malloc (sizeof (*sym));
227 
228 			if (sym != NULL) {
229 				sym->symbol = ucl_object_key (sym_elt);
230 				elt = ucl_object_lookup (sym_elt, "score");
231 				sym->score = ucl_object_todouble (elt);
232 				sym->options = ucl_object_lookup (sym_elt, "options");
233 
234 				DL_APPEND (res->symbols, sym);
235 			}
236 		}
237 	}
238 
239 	return 0;
240 }
241 
242 static int
rmilter_spamd_symcmp(struct rspamd_symbol * s1,struct rspamd_symbol * s2)243 rmilter_spamd_symcmp (struct rspamd_symbol *s1, struct rspamd_symbol *s2)
244 {
245 	return fabs (s2->score) - fabs (s1->score);
246 }
247 
248 /*
249  * rspamdscan_socket() - send file to specified host. See spamdscan() for
250  * load-balanced wrapper.
251  *
252  * returns 0 when spam not found, 1 when spam found, -1 on some error during scan (try another server), -2
253  * on unexpected error (probably clamd died on our file, fallback to another
254  * host not recommended)
255  */
256 
257 static int
rspamdscan_socket(SMFICTX * ctx,struct mlfi_priv * priv,const struct spamd_server * srv,struct config_file * cfg,struct rspamd_metric_result * res,int dkim_only)258 rspamdscan_socket(SMFICTX *ctx, struct mlfi_priv *priv,
259 		const struct spamd_server *srv, struct config_file *cfg,
260 		struct rspamd_metric_result *res,
261 		int dkim_only)
262 {
263 	sds buf = NULL;
264 	char *io_buf = NULL;
265 	struct sockaddr_un server_un;
266 	struct sockaddr_in server_in;
267 	int s = -1, fd = -1, ofl, size = 0, ret = -1;
268 	struct stat sb;
269 	struct rspamd_metric_result *cur = NULL;
270 	struct rcpt *rcpt;
271 	struct rspamd_symbol *cur_symbol;
272 	struct http_parser parser;
273 	struct http_parser_settings ps;
274 	void *map = NULL;
275 	const size_t iobuf_len = 16384;
276 	uint64_t r;
277 
278 	/* somebody doesn't need reply... */
279 	if (!srv) {
280 		return -1;
281 	}
282 
283 	s = rmilter_connect_addr (srv->name, srv->port, cfg->spamd_connect_timeout, priv);
284 
285 	if (s == -1) {
286 		msg_warn("<%s>; rspamd: cannot connect to %s: %s",  priv->mlfi_id,
287 				srv->name, strerror (errno));
288 		goto err;
289 	}
290 
291 	if (rmilter_poll_fd (s, cfg->spamd_connect_timeout, POLLOUT) < 1) {
292 		msg_warn("<%s>; rspamd: timeout waiting writing, %s",  priv->mlfi_id, srv->name);
293 		errno = ETIMEDOUT;
294 		goto err;
295 	}
296 
297 	if (priv->file[0] != '\0') {
298 		fd = open (priv->file, O_RDONLY);
299 
300 		if (fd == -1) {
301 			msg_warn("<%s>; rspamd: open (%s), %s", priv->mlfi_id, srv->name,
302 					strerror (errno));
303 			goto err;
304 		}
305 
306 		if (fstat (fd, &sb) == -1) {
307 			msg_warn ("<%s>; rspamd: stat (%s), %s", priv->mlfi_id, srv->name,
308 					strerror (errno));
309 			goto err;
310 		}
311 	}
312 	else {
313 		memset (&sb, 0, sizeof (sb));
314 	}
315 
316 	/* Set blocking again */
317 	ofl = fcntl (s, F_GETFL, 0);
318 	fcntl (s, F_SETFL, ofl & (~O_NONBLOCK));
319 
320 	buf = sdsnewlen (NULL, 512);
321 	sdsclear (buf);
322 	buf = sdscatfmt (buf,
323 			"POST /symbols HTTP/1.0\r\n",
324 			(uint64_t)sb.st_size);
325 
326 	DL_FOREACH (priv->rcpts, rcpt)
327 	{
328 		buf = sdscatfmt (buf, "Rcpt: %s\r\n", rcpt->r_addr);
329 	}
330 
331 	if (priv->priv_from[0] != '\0') {
332 		buf = sdscatfmt (buf, "From: %s\r\n", priv->priv_from);
333 	}
334 
335 	if (priv->priv_helo[0] != '\0') {
336 		buf = sdscatfmt (buf, "Helo: %s\r\n", priv->priv_helo);
337 	}
338 
339 	if (priv->priv_hostname[0] != '\0'
340 			&& memcmp (priv->priv_hostname, "unknown", 8) != 0) {
341 		buf = sdscatfmt (buf, "Hostname: %s\r\n",
342 				priv->priv_hostname);
343 	}
344 
345 	if (priv->priv_ip[0] != '\0') {
346 		buf = sdscatfmt (buf, "IP: %s\r\n", priv->priv_ip);
347 	}
348 
349 	if (priv->priv_user[0] != '\0') {
350 		buf = sdscatfmt (buf, "User: %s\r\n", priv->priv_user);
351 	}
352 
353 	buf = sdscatfmt (buf, "Queue-ID: %s\r\n", priv->queue_id);
354 
355 	if (cfg->spamd_settings_id) {
356 		buf = sdscatfmt (buf, "Settings-ID: %s\r\n", cfg->spamd_settings_id);
357 	}
358 
359 	if (priv->mta_tag[0] != '\0') {
360 		 buf = sdscatfmt (buf, "MTA-Tag: %s\r\n", priv->mta_tag);
361 	}
362 
363 	if (dkim_only) {
364 		/* Add specific settings to enable merely DKIM module */
365 		buf = sdscatfmt (buf, "Settings: {\"groups_enabled\":[\"dkim\"]}\r\n");
366 	}
367 
368 	if (priv->file[0] != '\0') {
369 		if (cfg->compression_enable) {
370 			unsigned char *out;
371 			size_t outlen;
372 
373 			map = mmap (NULL, sb.st_size, PROT_READ, MAP_SHARED, fd, 0);
374 
375 			if (map == MAP_FAILED) {
376 				map = NULL;
377 				msg_warn ("<%s>; rspamd: mmap (%s), %s", priv->mlfi_id, srv->name,
378 						strerror (errno));
379 				goto err;
380 			}
381 
382 			outlen = ZSTD_compressBound (sb.st_size);
383 			out = malloc (outlen);
384 
385 			if (out == NULL) {
386 				msg_warn ("<%s>; rspamd: malloc (%s), %s", priv->mlfi_id, srv->name,
387 						strerror (errno));
388 				goto err;
389 			}
390 
391 			r = ZSTD_compress (out, outlen, map, sb.st_size, 1);
392 			msg_info ("<%s>; rspamd: compressed message, %lu bytes to %lu bytes",
393 					priv->mlfi_id, (unsigned long)sb.st_size, (unsigned long)r);
394 			munmap (map, sb.st_size);
395 			map = NULL;
396 
397 			if (ZSTD_isError (r)) {
398 				msg_warn ("<%s>; rspamd: zstd compress (%s), %s",
399 						priv->mlfi_id, srv->name,
400 						ZSTD_getErrorName (r));
401 				free (out);
402 				goto err;
403 			}
404 
405 			buf = sdscatfmt (buf, "Compression: zstd\r\n");
406 			buf = sdscatfmt (buf, "Content-Type: application/x-compressed\r\n"
407 					"Content-Length: %U\r\n\r\n", r);
408 
409 			if (rmilter_atomic_write (s, buf, sdslen (buf)) == -1) {
410 				msg_warn("<%s>; rspamd: write (%s), %s", priv->mlfi_id, srv->name,
411 						strerror (errno));
412 				goto err;
413 			}
414 
415 			if (rmilter_atomic_write (s, out, r) == -1) {
416 				free (out);
417 				msg_warn ("<%s>; rspamd: write (%s), %s", priv->mlfi_id, srv->name,
418 						strerror (errno));
419 				goto err;
420 			}
421 
422 			free (out);
423 		}
424 		else {
425 			(void)map;
426 			r = sb.st_size;
427 			buf = sdscatfmt (buf, "Content-Type: text/plain\r\n"
428 					"Content-Length: %U\r\n\r\n", r);
429 
430 			if (rmilter_atomic_write (s, buf, sdslen (buf)) == -1) {
431 				msg_warn("<%s>; rspamd: write (%s), %s", priv->mlfi_id, srv->name,
432 						strerror (errno));
433 				goto err;
434 			}
435 
436 #if defined(FREEBSD) && defined(HAVE_SENDFILE)
437 			if (sendfile(fd, s, 0, 0, 0, 0, 0) != 0) {
438 				msg_warn("<%s>; rspamd: sendfile (%s), %s", priv->mlfi_id, srv->name, strerror (errno));
439 				goto err;
440 			}
441 #elif defined(LINUX) && defined(HAVE_SENDFILE)
442 			off_t off = 0;
443 			if (sendfile(s, fd, &off, sb.st_size) == -1) {
444 				msg_warn("<%s>; rspamd: sendfile (%s), %s", priv->mlfi_id, srv->name, strerror (errno));
445 				goto err;
446 			}
447 #else
448 			map = mmap (NULL, sb.st_size, PROT_READ, MAP_SHARED, fd, 0);
449 
450 			if (map == MAP_FAILED) {
451 				map = NULL;
452 				msg_warn ("<%s>; rspamd: mmap (%s), %s", priv->mlfi_id, srv->name,
453 						strerror (errno));
454 				goto err;
455 			}
456 
457 			if (rmilter_atomic_write (s, map, sb.st_size) == -1) {
458 				msg_warn ("<%s>; rspamd: write (%s), %s", priv->mlfi_id, srv->name,
459 						strerror (errno));
460 				goto err;
461 			}
462 
463 			munmap (map, sb.st_size);
464 #endif
465 		}
466 	}
467 	else {
468 		buf = sdscatfmt (buf, "Content-Length: 0\r\n\r\n");
469 
470 		if (rmilter_atomic_write (s, buf, sdslen (buf)) == -1) {
471 			msg_warn("<%s>; rspamd: write (%s), %s", priv->mlfi_id, srv->name,
472 					strerror (errno));
473 			goto err;
474 		}
475 	}
476 
477 	fcntl (s, F_SETFL, ofl|O_NONBLOCK);
478 
479 	/*
480 	 * read results
481 	 */
482 	sdsclear (buf);
483 
484 	for (;;) {
485 		ssize_t r;
486 
487 		if (io_buf == NULL) {
488 			io_buf = malloc (iobuf_len);
489 		}
490 
491 		if (io_buf == NULL) {
492 			msg_err ("<%s>; rspamd: malloc (%s), %s", priv->mlfi_id, srv->name,
493 					strerror (errno));
494 			goto err;
495 		}
496 
497 		if (rmilter_poll_fd (s, cfg->spamd_results_timeout, POLLIN) < 1) {
498 			msg_warn("<%s>; rspamd: timeout waiting results %s", priv->mlfi_id,
499 					srv->name);
500 			goto err;
501 		}
502 
503 		r = read (s, io_buf, iobuf_len);
504 
505 		if (r == -1) {
506 			if (errno == EAGAIN || errno == EINTR) {
507 				continue;
508 			}
509 			else {
510 				msg_warn("<%s>; rspamd: read, %s, %s", priv->mlfi_id,  srv->name,
511 						strerror (errno));
512 				goto err;
513 			}
514 		}
515 		else if (r == 0) {
516 			break;
517 		}
518 		else {
519 			buf = sdscatlen (buf, io_buf, r);
520 		}
521 	}
522 
523 	size = sdslen (buf);
524 
525 	if (size == 0) {
526 		msg_err ("<%s>; rspamd; got empty reply from %s",
527 				priv->mlfi_id, srv->name);
528 		goto err;
529 	}
530 
531 	/* Now we need to parse HTTP reply */
532 	memset (&parser, 0, sizeof (parser));
533 	http_parser_init (&parser, HTTP_RESPONSE);
534 
535 	memset (&ps, 0, sizeof (ps));
536 	res->priv = priv;
537 	res->compressed = cfg->compression_enable;
538 	ps.on_body = rmilter_spamd_parser_on_body;
539 	parser.data = res;
540 	parser.content_length = size;
541 
542 	if (http_parser_execute (&parser, &ps, buf, size) != (size_t)size) {
543 		msg_err ("<%s>; rspamd; HTTP parser error: %s when rspamd reply",
544 				priv->mlfi_id, http_errno_description (parser.http_errno));
545 		goto err;
546 	}
547 
548 	if (!res->parsed) {
549 		if (parser.status_code != 200) {
550 			msg_err ("<%s>; rspamd; HTTP error: bad status code: %d",
551 					priv->mlfi_id, (int)parser.status_code);
552 		}
553 		else {
554 			msg_err ("<%s>; rspamd; HTTP error: cannot parse reply",
555 					priv->mlfi_id);
556 		}
557 
558 		goto err;
559 	}
560 
561 	ret = 0;
562 
563 err:
564 	if (fd != -1) {
565 		close (fd);
566 	}
567 
568 	if (s != -1) {
569 		close (s);
570 	}
571 
572 	if (buf) {
573 		sdsfree (buf);
574 	}
575 
576 	if (map != NULL) {
577 		munmap (map, sb.st_size);
578 	}
579 
580 	if (io_buf) {
581 		free (io_buf);
582 	}
583 
584 	return ret;
585 }
586 #undef TEST_WORD
587 
588 static void
rmiler_process_rspamd_block(const ucl_object_t * obj,SMFICTX * ctx)589 rmiler_process_rspamd_block (const ucl_object_t *obj, SMFICTX *ctx)
590 {
591 	const ucl_object_t *elt, *cur, *cur_elt;
592 	ucl_object_iter_t it;
593 	int nhdr;
594 
595 	if (obj && ucl_object_type (obj) == UCL_OBJECT) {
596 		elt = ucl_object_lookup (obj, "remove_headers");
597 		/*
598 		 * remove_headers:  {"name": 1, ... }
599 		 * where number is the header's position starting from '1'
600 		 */
601 		if (elt && ucl_object_type (elt) == UCL_OBJECT) {
602 			it = NULL;
603 
604 			while ((cur = ucl_object_iterate (elt, &it, true)) != NULL) {
605 				if (ucl_object_type (cur) == UCL_INT) {
606 					nhdr = ucl_object_toint (cur);
607 
608 					if (nhdr >= 1) {
609 						smfi_chgheader (ctx, (char *)ucl_object_key (cur),
610 								nhdr, NULL);
611 					}
612 				}
613 			}
614 		}
615 
616 		elt = ucl_object_lookup (obj, "add_headers");
617 		/*
618 		 * add_headers: {"name": "value", ... }
619 		 * name could have multiple values
620 		 */
621 		if (elt && ucl_object_type (elt) == UCL_OBJECT) {
622 			it = NULL;
623 
624 			while ((cur = ucl_object_iterate (elt, &it, true)) != NULL) {
625 				LL_FOREACH (cur, cur_elt) {
626 					if (ucl_object_type (cur_elt) == UCL_STRING) {
627 						smfi_addheader (ctx, (char *)ucl_object_key (cur),
628 								(char *)ucl_object_tostring (cur_elt));
629 					}
630 				}
631 			}
632 		}
633 	}
634 }
635 
636 /*
637  * spamdscan() - send file to one of remote spamd, with pseudo load-balancing
638  * (select one random server, fallback to others in case of errors).
639  *
640  * returns 0 if file scanned and spam not found,
641  * 1 if file scanned and spam found ,
642  * 2 if file scanned and this is probably spam,
643  * -1 when retry limit exceeded, -2 on unexpected error, e.g. unexpected reply from
644  * server (suppose scanned message killed spamd...)
645  */
646 
647 struct rspamd_metric_result*
spamdscan(void * _ctx,struct mlfi_priv * priv,struct config_file * cfg,int extra,int dkim_only)648 spamdscan (void *_ctx, struct mlfi_priv *priv, struct config_file *cfg, int extra,
649 		int dkim_only)
650 {
651 	static const int max_syslog_len = 900;
652 	int retry, r = -2, to_trace = 0, i, j, ret;
653 	struct timeval t;
654 	double ts, tf;
655 	struct spamd_server *selected = NULL;
656 	char bar_buf[128], hdrbuf[256];
657 	char *prefix = "s", *c;
658 	struct rspamd_metric_result *res;
659 	struct rspamd_symbol *cur_symbol, *tmp_symbol;
660 	struct timespec sleep_ts;
661 	SMFICTX *ctx = _ctx;
662 	sds optbuf, logbuf, headerbuf;
663 	const ucl_object_t *obj;
664 	struct rcpt *rcpt;
665 	bool extended_options = true, print_symbols = true, extended_headers = false;
666 
667 	gettimeofday (&t, NULL);
668 	ts = t.tv_sec + t.tv_usec / 1000000.0;
669 	retry = cfg->spamd_retry_count;
670 	sleep_ts.tv_sec = cfg->spamd_retry_timeout / 1000;
671 	sleep_ts.tv_nsec = (cfg->spamd_retry_timeout % 1000) * 1000000ULL;
672 
673 	res = malloc (sizeof (*res));
674 
675 	if (res == NULL) {
676 		msg_err("<%s>; spamdscan: malloc falied, %s", priv->mlfi_id,
677 				strerror (errno));
678 		return NULL;
679 	}
680 
681 	memset (res, 0, sizeof (*res));
682 
683 	/* try to scan with available servers */
684 	while (1) {
685 		if (extra) {
686 			selected = (struct spamd_server *) get_random_upstream (
687 					(void *) cfg->extra_spamd_servers,
688 					cfg->extra_spamd_servers_num, sizeof(struct spamd_server),
689 					t.tv_sec, cfg->spamd_error_time, cfg->spamd_dead_time,
690 					cfg->spamd_maxerrors, priv);
691 		}
692 		else {
693 			selected = (struct spamd_server *) get_random_upstream (
694 					(void *) cfg->spamd_servers, cfg->spamd_servers_num,
695 					sizeof(struct spamd_server), t.tv_sec,
696 					cfg->spamd_error_time, cfg->spamd_dead_time,
697 					cfg->spamd_maxerrors, priv);
698 		}
699 		if (selected == NULL) {
700 			msg_err("<%s>; spamdscan: upstream get error, %s", priv->mlfi_id,
701 					priv->file);
702 			free (res);
703 
704 			return NULL;
705 		}
706 
707 		msg_info ("<%s>; spamdscan: start scanning message on %s", priv->mlfi_id,
708 				selected->name);
709 
710 		prefix = "rs";
711 		r = rspamdscan_socket (ctx, priv, selected, cfg, res, dkim_only);
712 
713 		msg_info("<%s>; spamdscan: finish scanning message on %s", priv->mlfi_id,
714 				selected->name);
715 
716 		if (r == 0 || r == 1) {
717 			upstream_ok (&selected->up, t.tv_sec);
718 			break;
719 		}
720 		upstream_fail (&selected->up, t.tv_sec);
721 		if (r == -2) {
722 			msg_warn("<%s>; %spamdscan: unexpected problem, %s, %s",
723 					priv->mlfi_id, prefix, selected->name, priv->file);
724 			break;
725 		}
726 		if (--retry < 1) {
727 			msg_warn("<%s>; %spamdscan: retry limit exceeded, %s, %s",
728 					priv->mlfi_id, prefix, selected->name, priv->file);
729 			break;
730 		}
731 
732 		msg_warn("<%s>; %spamdscan: failed to scan, retry, %s, %s",
733 				priv->mlfi_id, prefix, selected->name, priv->file);
734 		nanosleep (&sleep_ts, NULL);
735 	}
736 
737 	if (r < 0) {
738 		free (res);
739 		return NULL;
740 	}
741 
742 	/*
743 	 * print scanning time, server and result
744 	 */
745 	gettimeofday (&t, NULL);
746 	tf = t.tv_sec + t.tv_usec / 1000000.0;
747 
748 	logbuf = sdsnewlen (NULL, 1024);
749 	headerbuf = sdsnewlen (NULL, 512);
750 
751 	if (res->symbols) {
752 		/* Sort symbols by scores from high to low */
753 		DL_SORT (res->symbols, rmilter_spamd_symcmp);
754 	}
755 
756 	if (res->message_id) {
757 		rmilter_strlcpy (priv->message_id, res->message_id,
758 				sizeof (priv->message_id));
759 	}
760 
761 	extended_headers = true;
762 	DL_FOREACH (priv->rcpts, rcpt) {
763 		if (!is_whitelisted_rcpt (&cfg->extended_rcpts, rcpt->r_addr)) {
764 			extended_headers = false;
765 		}
766 	}
767 
768 	if (!extended_headers && cfg->extended_spam_headers && !priv->authenticated) {
769 		extended_headers = true;
770 	}
771 
772 log_retry:
773 	sdsclear (logbuf);
774 	sdsclear (headerbuf);
775 
776 	/* Parse res tailq */
777 	if (extended_headers) {
778 		headerbuf = sdscatprintf (headerbuf, "%s: %s [%.2f / %.2f]%c",
779 				"default", res->score > res->required_score ? "True" : "False",
780 				res->score, res->required_score,
781 				res->symbols != NULL ? '\n' : ' ');
782 	}
783 
784 	logbuf = sdscatprintf (logbuf,
785 					"<%s>; spamdscan: scan, time: %.3f, server: %s, metric: "
786 					"default: [%.3f / %.3f], symbols: ",
787 					priv->mlfi_id, tf - ts, selected->name, res->score,
788 					res->required_score);
789 
790 	/* Write symbols */
791 	if (res->symbols == NULL) {
792 		logbuf = sdscatprintf (logbuf, "no symbols");
793 	}
794 	else {
795 		optbuf = sdsempty ();
796 
797 		DL_FOREACH_SAFE (res->symbols, cur_symbol, tmp_symbol) {
798 			sdsclear (optbuf);
799 
800 			if (cur_symbol->symbol) {
801 
802 				if (cur_symbol->options && extended_options) {
803 					ucl_object_iter_t it = NULL;
804 					const ucl_object_t *elt;
805 					bool first = true;
806 
807 					while ((elt = ucl_object_iterate (cur_symbol->options,
808 							&it, true)) != NULL) {
809 						if (ucl_object_type (elt) == UCL_STRING) {
810 							if (first) {
811 								optbuf = sdscat (optbuf,
812 										ucl_object_tostring (elt));
813 								first = false;
814 							}
815 							else {
816 								optbuf = sdscat (optbuf, ", ");
817 								optbuf = sdscat (optbuf,
818 										ucl_object_tostring (elt));
819 							}
820 						}
821 					}
822 				}
823 
824 				if (print_symbols) {
825 					if (cur_symbol->next) {
826 						logbuf = sdscatprintf (logbuf, "%s(%.2f)[%s], ",
827 								cur_symbol->symbol, cur_symbol->score, optbuf);
828 					}
829 					else {
830 						logbuf = sdscatprintf (logbuf, "%s(%.2f)[%s]",
831 								cur_symbol->symbol, cur_symbol->score, optbuf);
832 					}
833 					if (cfg->trace_symbol) {
834 						c = strchr (cur_symbol->symbol, '(');
835 						if (c != NULL) {
836 							*c = '\0';
837 						}
838 						if (!strcmp (cfg->trace_symbol, cur_symbol->symbol)) {
839 							to_trace++;
840 						}
841 					}
842 				}
843 
844 				if (extended_headers) {
845 					if (cur_symbol->next) {
846 						headerbuf = sdscatprintf (headerbuf,
847 								" %s(%.2f)[%s]\n", cur_symbol->symbol,
848 								cur_symbol->score, optbuf);
849 					}
850 					else {
851 						headerbuf = sdscatprintf (headerbuf,
852 								" %s(%.2f)[%s]",
853 								cur_symbol->symbol,
854 								cur_symbol->score, optbuf);
855 					}
856 				}
857 			}
858 		}
859 
860 		sdsfree (optbuf);
861 	}
862 
863 	if (sdslen (logbuf) > max_syslog_len) {
864 		if (extended_options) {
865 			/* Try to retry without options */
866 			extended_options = false;
867 			msg_info ("<%s>; spamdscan: too large reply: %d, skip options",
868 					priv->mlfi_id, (int)sdslen (logbuf));
869 			goto log_retry;
870 		}
871 		else if (print_symbols) {
872 			msg_info ("<%s>; spamdscan: too large reply: %d, skip symbols",
873 					priv->mlfi_id, (int)sdslen (logbuf));
874 			print_symbols = false;
875 			goto log_retry;
876 		}
877 		else {
878 			/* Truncate reply */
879 			msg_err ("<%s>; spamdscan: too large reply: %d, truncate reply",
880 					priv->mlfi_id, (int)sdslen (logbuf));
881 			sdsrange (logbuf, 0, max_syslog_len);
882 		}
883 	}
884 
885 	msg_info ("%s", logbuf);
886 	sdsfree (logbuf);
887 
888 	if (extended_headers) {
889 		if (extra) {
890 			smfi_addheader (ctx, "X-Spamd-Extra-Result", headerbuf);
891 		}
892 		else {
893 			smfi_addheader (ctx, "X-Spamd-Result", headerbuf);
894 		}
895 	}
896 
897 	sdsfree (headerbuf);
898 
899 	/* All other statistic headers */
900 	if (extended_headers) {
901 		if (extra) {
902 			smfi_addheader (ctx, "X-Rspamd-Extra-Server", selected->name);
903 			snprintf (hdrbuf, sizeof(hdrbuf), "%.2f", tf - ts);
904 			smfi_addheader (ctx, "X-Rspamd-Extra-Scan-Time", hdrbuf);
905 		}
906 		else {
907 			smfi_addheader (ctx, "X-Rspamd-Server", selected->name);
908 			snprintf (hdrbuf, sizeof (hdrbuf), "%.2f", tf - ts);
909 			smfi_addheader (ctx, "X-Rspamd-Scan-Time", hdrbuf);
910 			smfi_addheader (ctx, "X-Rspamd-Queue-ID", priv->queue_id);
911 		}
912 	}
913 
914 	if (res->dkim_signature) {
915 		/* Add dkim signature passed from rspamd */
916 
917 		/*
918 		 * According to milter docs, we need to be extra careful
919 		 * when folding headers:
920 		 * Neither the name nor the value of the header is checked for standards
921 		 * compliance. However, each line of the header must be under 2048
922 		 * characters and should be under 998 characters.
923 		 * If longer headers are needed, make them multi-line.
924 		 * To make a multi-line header, insert a line feed (ASCII 0x0a, or \n
925 		 * in C) followed by at least one whitespace character such as a
926 		 * space (ASCII 0x20) or tab (ASCII 0x09, or \t in C).
927 		 * The line feed should NOT be preceded by a carriage return (ASCII 0x0d);
928 		 * the MTA will add this automatically.
929 		 * It is the filter writer's responsibility to ensure that no s
930 		 * tandards are violated.
931 		 */
932 		char *dkim_buf = malloc (strlen (res->dkim_signature) + 1);
933 
934 		if (dkim_buf != NULL) {
935 			char *d;
936 			const char *s;
937 
938 			s = res->dkim_signature;
939 			d = dkim_buf;
940 
941 			while (*s) {
942 				size_t len = strcspn (s, "\r");
943 
944 				if (len > 0) {
945 					memcpy (d, s, len);
946 					d += len;
947 					s += len + strspn (s + len, "\r");
948 				}
949 				else {
950 					s ++;
951 				}
952 			}
953 
954 			*d = '\0';
955 
956 			smfi_addheader (ctx, "DKIM-Signature", dkim_buf);
957 
958 			free (dkim_buf);
959 		}
960 	}
961 
962 	obj = ucl_object_lookup (res->obj, "rmilter");
963 	if (obj) {
964 		rmiler_process_rspamd_block (obj, ctx);
965 	}
966 
967 	obj = ucl_object_lookup (res->obj, "messages");
968 	if (obj) {
969 		const ucl_object_t *smtp_res;
970 
971 		smtp_res = ucl_object_lookup (obj, "smtp_message");
972 
973 		if (smtp_res) {
974 			res->message = ucl_object_tostring (smtp_res);
975 		}
976 	}
977 
978 	/* Trace spam messages to specific addr */
979 	if (!extra && to_trace && cfg->trace_addr) {
980 		smfi_addrcpt (ctx, cfg->trace_addr);
981 		smfi_setpriv (ctx, priv);
982 	}
983 
984 	return res;
985 }
986 
987 void
spamd_free_result(struct rspamd_metric_result * mres)988 spamd_free_result (struct rspamd_metric_result *mres)
989 {
990 	struct rspamd_symbol *cur_symbol, *tmp_symbol;
991 
992 	if (mres) {
993 		DL_FOREACH_SAFE (mres->symbols, cur_symbol, tmp_symbol) {
994 			free (cur_symbol);
995 		}
996 
997 		ucl_object_unref (mres->obj);
998 		free (mres);
999 	}
1000 }
1001