1 /*
2 * Copyright (c) 2007-2012, Vsevolod Stakhov
3 * All rights reserved.
4
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * Redistributions of source code must retain the above copyright notice, this
8 * list of conditions and the following disclaimer. Redistributions in binary form
9 * must reproduce the above copyright notice, this list of conditions and the
10 * following disclaimer in the documentation and/or other materials provided with
11 * the distribution. Neither the name of the author nor the names of its
12 * contributors may be used to endorse or promote products derived from this
13 * software without specific prior written permission.
14
15 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
16 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
19 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
21 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
22 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
23 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
24 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25 */
26
27 #include "config.h"
28
29 #include "utlist.h"
30 #include "cfg_file.h"
31 #include "rmilter.h"
32 #include "libspamd.h"
33 #include "mfapi.h"
34 #include "ucl.h"
35 #include "http_parser.h"
36 #include "sds.h"
37 #include "contrib/zstd/zstd.h"
38 #include <math.h>
39 #include <sys/mman.h>
40
41 /* Maximum time in seconds during which spamd server is marked inactive after scan error */
42 #define INACTIVE_INTERVAL 60.0
43 /* Maximum number of failed attempts before marking server as inactive */
44 #define MAX_FAILED 5
45 /* Maximum inactive timeout (20 min) */
46 #define MAX_TIMEOUT 1200.0
47
48 /* Global mutexes */
49
50 #ifdef _THREAD_SAFE
51 pthread_mutex_t mx_spamd_write = PTHREAD_MUTEX_INITIALIZER;
52 #endif
53
54 static int
rmilter_spamd_parser_on_body(http_parser * parser,const char * at,size_t length)55 rmilter_spamd_parser_on_body (http_parser * parser, const char *at, size_t length)
56 {
57 struct rspamd_metric_result *res = parser->data;
58 struct mlfi_priv *priv;
59 struct ucl_parser *up;
60 ucl_object_t *obj;
61 ucl_object_iter_t it = NULL;
62 struct rspamd_symbol *sym;
63 const ucl_object_t *metric, *elt, *sym_elt;
64 const char *act_str;
65
66 priv = res->priv;
67 up = ucl_parser_new (0);
68
69 if (res->compressed) {
70 /* Decompress first */
71 ZSTD_DStream *zstream;
72 ZSTD_inBuffer zin;
73 ZSTD_outBuffer zout;
74 char *out;
75 size_t outlen, r;
76
77 zstream = ZSTD_createDStream ();
78 ZSTD_initDStream (zstream);
79
80 zin.pos = 0;
81 zin.src = at;
82 zin.size = length;
83
84 if ((outlen = ZSTD_getDecompressedSize (zin.src, zin.size)) == 0) {
85 outlen = ZSTD_DStreamOutSize ();
86 }
87
88 out = malloc (outlen);
89
90 if (out == NULL) {
91 msg_err ("<%s>; malloc error: %s", priv->mlfi_id,
92 strerror (errno));
93 ucl_parser_free (up);
94
95 return -1;
96 }
97
98 zout.dst = out;
99 zout.pos = 0;
100 zout.size = outlen;
101
102 while (zin.pos < zin.size) {
103 r = ZSTD_decompressStream (zstream, &zout, &zin);
104
105 if (ZSTD_isError (r)) {
106 msg_err ("<%s>; decompression error: %s", priv->mlfi_id,
107 ZSTD_getErrorName (r));
108 ZSTD_freeDStream (zstream);
109 free (out);
110 ucl_parser_free (up);
111
112 return -1;
113 }
114
115 if (zout.pos == zout.size) {
116 /* We need to extend output buffer */
117 zout.size = zout.size * 1.5 + 1.0;
118 out = realloc (zout.dst, zout.size);
119
120 if (out == NULL) {
121 msg_err ("<%s>; malloc error: %s", priv->mlfi_id,
122 strerror (errno));
123 ucl_parser_free (up);
124 free (zout.dst);
125
126 return -1;
127 }
128 else {
129 zout.dst = out;
130 }
131 }
132 }
133
134 ZSTD_freeDStream (zstream);
135
136 if (!ucl_parser_add_chunk (up, out, zout.pos)) {
137 msg_err ("<%s>; cannot parse reply from rspamd: %s",
138 priv->mlfi_id, ucl_parser_get_error (up));
139 ucl_parser_free (up);
140 free (out);
141
142 return -1;
143 }
144
145 free (out);
146 }
147 else {
148 if (!ucl_parser_add_chunk (up, at, length)) {
149 msg_err ("<%s>; cannot parse reply from rspamd: %s",
150 priv->mlfi_id, ucl_parser_get_error (up));
151 ucl_parser_free (up);
152
153 return -1;
154 }
155 }
156
157 obj = ucl_parser_get_object (up);
158 ucl_parser_free (up);
159 res->obj = obj;
160
161 if (obj == NULL || ucl_object_type (obj) != UCL_OBJECT) {
162 msg_err ("<%s>; cannot parse reply from rspamd: bad top object", priv->mlfi_id);
163 ucl_object_unref (obj);
164
165 return -1;
166 }
167
168 metric = ucl_object_lookup (obj, "default");
169
170 if (metric == NULL || ucl_object_type (metric) != UCL_OBJECT) {
171 msg_err ("<%s>; cannot parse reply from rspamd: no default metric result", priv->mlfi_id);
172 ucl_object_unref (obj);
173
174 return -1;
175 }
176
177 res->parsed = true;
178 elt = ucl_object_lookup (metric, "score");
179 res->score = ucl_object_todouble (elt);
180
181 elt = ucl_object_lookup (metric, "required_score");
182 res->required_score = ucl_object_todouble (elt);
183
184 elt = ucl_object_lookup (metric, "action");
185 act_str = ucl_object_tostring (elt);
186
187 elt = ucl_object_lookup (metric, "subject");
188 res->subject = ucl_object_tostring (elt);
189
190 elt = ucl_object_lookup (obj, "message-id");
191 res->message_id = ucl_object_tostring (elt);
192
193 elt = ucl_object_lookup (obj, "dkim-signature");
194 if (elt) {
195 res->dkim_signature = ucl_object_tostring (elt);
196 }
197
198 if (act_str) {
199 if (strcmp (act_str, "reject") == 0) {
200 res->action = METRIC_ACTION_REJECT;
201 }
202 else if (strcmp (act_str, "greylist") == 0) {
203 res->action = METRIC_ACTION_GREYLIST;
204 }
205 else if (strcmp (act_str, "add header") == 0) {
206 res->action = METRIC_ACTION_ADD_HEADER;
207 }
208 else if (strcmp (act_str, "rewrite subject") == 0) {
209 res->action = METRIC_ACTION_REWRITE_SUBJECT;
210 }
211 else if (strcmp (act_str, "soft reject") == 0) {
212 res->action = METRIC_ACTION_SOFT_REJECT;
213 }
214 else {
215 res->action = METRIC_ACTION_NOACTION;
216 }
217 }
218 else {
219 msg_warn ("<%s>; invalid reply from rspamd: no action found, assume no action", priv->mlfi_id);
220 res->action = METRIC_ACTION_NOACTION;
221 }
222
223 while ((sym_elt = ucl_object_iterate (metric, &it, true)) != NULL) {
224 /* Here we assume that all objects found here are symbols */
225 if (ucl_object_type (sym_elt) == UCL_OBJECT) {
226 sym = malloc (sizeof (*sym));
227
228 if (sym != NULL) {
229 sym->symbol = ucl_object_key (sym_elt);
230 elt = ucl_object_lookup (sym_elt, "score");
231 sym->score = ucl_object_todouble (elt);
232 sym->options = ucl_object_lookup (sym_elt, "options");
233
234 DL_APPEND (res->symbols, sym);
235 }
236 }
237 }
238
239 return 0;
240 }
241
242 static int
rmilter_spamd_symcmp(struct rspamd_symbol * s1,struct rspamd_symbol * s2)243 rmilter_spamd_symcmp (struct rspamd_symbol *s1, struct rspamd_symbol *s2)
244 {
245 return fabs (s2->score) - fabs (s1->score);
246 }
247
248 /*
249 * rspamdscan_socket() - send file to specified host. See spamdscan() for
250 * load-balanced wrapper.
251 *
252 * returns 0 when spam not found, 1 when spam found, -1 on some error during scan (try another server), -2
253 * on unexpected error (probably clamd died on our file, fallback to another
254 * host not recommended)
255 */
256
257 static int
rspamdscan_socket(SMFICTX * ctx,struct mlfi_priv * priv,const struct spamd_server * srv,struct config_file * cfg,struct rspamd_metric_result * res,int dkim_only)258 rspamdscan_socket(SMFICTX *ctx, struct mlfi_priv *priv,
259 const struct spamd_server *srv, struct config_file *cfg,
260 struct rspamd_metric_result *res,
261 int dkim_only)
262 {
263 sds buf = NULL;
264 char *io_buf = NULL;
265 struct sockaddr_un server_un;
266 struct sockaddr_in server_in;
267 int s = -1, fd = -1, ofl, size = 0, ret = -1;
268 struct stat sb;
269 struct rspamd_metric_result *cur = NULL;
270 struct rcpt *rcpt;
271 struct rspamd_symbol *cur_symbol;
272 struct http_parser parser;
273 struct http_parser_settings ps;
274 void *map = NULL;
275 const size_t iobuf_len = 16384;
276 uint64_t r;
277
278 /* somebody doesn't need reply... */
279 if (!srv) {
280 return -1;
281 }
282
283 s = rmilter_connect_addr (srv->name, srv->port, cfg->spamd_connect_timeout, priv);
284
285 if (s == -1) {
286 msg_warn("<%s>; rspamd: cannot connect to %s: %s", priv->mlfi_id,
287 srv->name, strerror (errno));
288 goto err;
289 }
290
291 if (rmilter_poll_fd (s, cfg->spamd_connect_timeout, POLLOUT) < 1) {
292 msg_warn("<%s>; rspamd: timeout waiting writing, %s", priv->mlfi_id, srv->name);
293 errno = ETIMEDOUT;
294 goto err;
295 }
296
297 if (priv->file[0] != '\0') {
298 fd = open (priv->file, O_RDONLY);
299
300 if (fd == -1) {
301 msg_warn("<%s>; rspamd: open (%s), %s", priv->mlfi_id, srv->name,
302 strerror (errno));
303 goto err;
304 }
305
306 if (fstat (fd, &sb) == -1) {
307 msg_warn ("<%s>; rspamd: stat (%s), %s", priv->mlfi_id, srv->name,
308 strerror (errno));
309 goto err;
310 }
311 }
312 else {
313 memset (&sb, 0, sizeof (sb));
314 }
315
316 /* Set blocking again */
317 ofl = fcntl (s, F_GETFL, 0);
318 fcntl (s, F_SETFL, ofl & (~O_NONBLOCK));
319
320 buf = sdsnewlen (NULL, 512);
321 sdsclear (buf);
322 buf = sdscatfmt (buf,
323 "POST /symbols HTTP/1.0\r\n",
324 (uint64_t)sb.st_size);
325
326 DL_FOREACH (priv->rcpts, rcpt)
327 {
328 buf = sdscatfmt (buf, "Rcpt: %s\r\n", rcpt->r_addr);
329 }
330
331 if (priv->priv_from[0] != '\0') {
332 buf = sdscatfmt (buf, "From: %s\r\n", priv->priv_from);
333 }
334
335 if (priv->priv_helo[0] != '\0') {
336 buf = sdscatfmt (buf, "Helo: %s\r\n", priv->priv_helo);
337 }
338
339 if (priv->priv_hostname[0] != '\0'
340 && memcmp (priv->priv_hostname, "unknown", 8) != 0) {
341 buf = sdscatfmt (buf, "Hostname: %s\r\n",
342 priv->priv_hostname);
343 }
344
345 if (priv->priv_ip[0] != '\0') {
346 buf = sdscatfmt (buf, "IP: %s\r\n", priv->priv_ip);
347 }
348
349 if (priv->priv_user[0] != '\0') {
350 buf = sdscatfmt (buf, "User: %s\r\n", priv->priv_user);
351 }
352
353 buf = sdscatfmt (buf, "Queue-ID: %s\r\n", priv->queue_id);
354
355 if (cfg->spamd_settings_id) {
356 buf = sdscatfmt (buf, "Settings-ID: %s\r\n", cfg->spamd_settings_id);
357 }
358
359 if (priv->mta_tag[0] != '\0') {
360 buf = sdscatfmt (buf, "MTA-Tag: %s\r\n", priv->mta_tag);
361 }
362
363 if (dkim_only) {
364 /* Add specific settings to enable merely DKIM module */
365 buf = sdscatfmt (buf, "Settings: {\"groups_enabled\":[\"dkim\"]}\r\n");
366 }
367
368 if (priv->file[0] != '\0') {
369 if (cfg->compression_enable) {
370 unsigned char *out;
371 size_t outlen;
372
373 map = mmap (NULL, sb.st_size, PROT_READ, MAP_SHARED, fd, 0);
374
375 if (map == MAP_FAILED) {
376 map = NULL;
377 msg_warn ("<%s>; rspamd: mmap (%s), %s", priv->mlfi_id, srv->name,
378 strerror (errno));
379 goto err;
380 }
381
382 outlen = ZSTD_compressBound (sb.st_size);
383 out = malloc (outlen);
384
385 if (out == NULL) {
386 msg_warn ("<%s>; rspamd: malloc (%s), %s", priv->mlfi_id, srv->name,
387 strerror (errno));
388 goto err;
389 }
390
391 r = ZSTD_compress (out, outlen, map, sb.st_size, 1);
392 msg_info ("<%s>; rspamd: compressed message, %lu bytes to %lu bytes",
393 priv->mlfi_id, (unsigned long)sb.st_size, (unsigned long)r);
394 munmap (map, sb.st_size);
395 map = NULL;
396
397 if (ZSTD_isError (r)) {
398 msg_warn ("<%s>; rspamd: zstd compress (%s), %s",
399 priv->mlfi_id, srv->name,
400 ZSTD_getErrorName (r));
401 free (out);
402 goto err;
403 }
404
405 buf = sdscatfmt (buf, "Compression: zstd\r\n");
406 buf = sdscatfmt (buf, "Content-Type: application/x-compressed\r\n"
407 "Content-Length: %U\r\n\r\n", r);
408
409 if (rmilter_atomic_write (s, buf, sdslen (buf)) == -1) {
410 msg_warn("<%s>; rspamd: write (%s), %s", priv->mlfi_id, srv->name,
411 strerror (errno));
412 goto err;
413 }
414
415 if (rmilter_atomic_write (s, out, r) == -1) {
416 free (out);
417 msg_warn ("<%s>; rspamd: write (%s), %s", priv->mlfi_id, srv->name,
418 strerror (errno));
419 goto err;
420 }
421
422 free (out);
423 }
424 else {
425 (void)map;
426 r = sb.st_size;
427 buf = sdscatfmt (buf, "Content-Type: text/plain\r\n"
428 "Content-Length: %U\r\n\r\n", r);
429
430 if (rmilter_atomic_write (s, buf, sdslen (buf)) == -1) {
431 msg_warn("<%s>; rspamd: write (%s), %s", priv->mlfi_id, srv->name,
432 strerror (errno));
433 goto err;
434 }
435
436 #if defined(FREEBSD) && defined(HAVE_SENDFILE)
437 if (sendfile(fd, s, 0, 0, 0, 0, 0) != 0) {
438 msg_warn("<%s>; rspamd: sendfile (%s), %s", priv->mlfi_id, srv->name, strerror (errno));
439 goto err;
440 }
441 #elif defined(LINUX) && defined(HAVE_SENDFILE)
442 off_t off = 0;
443 if (sendfile(s, fd, &off, sb.st_size) == -1) {
444 msg_warn("<%s>; rspamd: sendfile (%s), %s", priv->mlfi_id, srv->name, strerror (errno));
445 goto err;
446 }
447 #else
448 map = mmap (NULL, sb.st_size, PROT_READ, MAP_SHARED, fd, 0);
449
450 if (map == MAP_FAILED) {
451 map = NULL;
452 msg_warn ("<%s>; rspamd: mmap (%s), %s", priv->mlfi_id, srv->name,
453 strerror (errno));
454 goto err;
455 }
456
457 if (rmilter_atomic_write (s, map, sb.st_size) == -1) {
458 msg_warn ("<%s>; rspamd: write (%s), %s", priv->mlfi_id, srv->name,
459 strerror (errno));
460 goto err;
461 }
462
463 munmap (map, sb.st_size);
464 #endif
465 }
466 }
467 else {
468 buf = sdscatfmt (buf, "Content-Length: 0\r\n\r\n");
469
470 if (rmilter_atomic_write (s, buf, sdslen (buf)) == -1) {
471 msg_warn("<%s>; rspamd: write (%s), %s", priv->mlfi_id, srv->name,
472 strerror (errno));
473 goto err;
474 }
475 }
476
477 fcntl (s, F_SETFL, ofl|O_NONBLOCK);
478
479 /*
480 * read results
481 */
482 sdsclear (buf);
483
484 for (;;) {
485 ssize_t r;
486
487 if (io_buf == NULL) {
488 io_buf = malloc (iobuf_len);
489 }
490
491 if (io_buf == NULL) {
492 msg_err ("<%s>; rspamd: malloc (%s), %s", priv->mlfi_id, srv->name,
493 strerror (errno));
494 goto err;
495 }
496
497 if (rmilter_poll_fd (s, cfg->spamd_results_timeout, POLLIN) < 1) {
498 msg_warn("<%s>; rspamd: timeout waiting results %s", priv->mlfi_id,
499 srv->name);
500 goto err;
501 }
502
503 r = read (s, io_buf, iobuf_len);
504
505 if (r == -1) {
506 if (errno == EAGAIN || errno == EINTR) {
507 continue;
508 }
509 else {
510 msg_warn("<%s>; rspamd: read, %s, %s", priv->mlfi_id, srv->name,
511 strerror (errno));
512 goto err;
513 }
514 }
515 else if (r == 0) {
516 break;
517 }
518 else {
519 buf = sdscatlen (buf, io_buf, r);
520 }
521 }
522
523 size = sdslen (buf);
524
525 if (size == 0) {
526 msg_err ("<%s>; rspamd; got empty reply from %s",
527 priv->mlfi_id, srv->name);
528 goto err;
529 }
530
531 /* Now we need to parse HTTP reply */
532 memset (&parser, 0, sizeof (parser));
533 http_parser_init (&parser, HTTP_RESPONSE);
534
535 memset (&ps, 0, sizeof (ps));
536 res->priv = priv;
537 res->compressed = cfg->compression_enable;
538 ps.on_body = rmilter_spamd_parser_on_body;
539 parser.data = res;
540 parser.content_length = size;
541
542 if (http_parser_execute (&parser, &ps, buf, size) != (size_t)size) {
543 msg_err ("<%s>; rspamd; HTTP parser error: %s when rspamd reply",
544 priv->mlfi_id, http_errno_description (parser.http_errno));
545 goto err;
546 }
547
548 if (!res->parsed) {
549 if (parser.status_code != 200) {
550 msg_err ("<%s>; rspamd; HTTP error: bad status code: %d",
551 priv->mlfi_id, (int)parser.status_code);
552 }
553 else {
554 msg_err ("<%s>; rspamd; HTTP error: cannot parse reply",
555 priv->mlfi_id);
556 }
557
558 goto err;
559 }
560
561 ret = 0;
562
563 err:
564 if (fd != -1) {
565 close (fd);
566 }
567
568 if (s != -1) {
569 close (s);
570 }
571
572 if (buf) {
573 sdsfree (buf);
574 }
575
576 if (map != NULL) {
577 munmap (map, sb.st_size);
578 }
579
580 if (io_buf) {
581 free (io_buf);
582 }
583
584 return ret;
585 }
586 #undef TEST_WORD
587
588 static void
rmiler_process_rspamd_block(const ucl_object_t * obj,SMFICTX * ctx)589 rmiler_process_rspamd_block (const ucl_object_t *obj, SMFICTX *ctx)
590 {
591 const ucl_object_t *elt, *cur, *cur_elt;
592 ucl_object_iter_t it;
593 int nhdr;
594
595 if (obj && ucl_object_type (obj) == UCL_OBJECT) {
596 elt = ucl_object_lookup (obj, "remove_headers");
597 /*
598 * remove_headers: {"name": 1, ... }
599 * where number is the header's position starting from '1'
600 */
601 if (elt && ucl_object_type (elt) == UCL_OBJECT) {
602 it = NULL;
603
604 while ((cur = ucl_object_iterate (elt, &it, true)) != NULL) {
605 if (ucl_object_type (cur) == UCL_INT) {
606 nhdr = ucl_object_toint (cur);
607
608 if (nhdr >= 1) {
609 smfi_chgheader (ctx, (char *)ucl_object_key (cur),
610 nhdr, NULL);
611 }
612 }
613 }
614 }
615
616 elt = ucl_object_lookup (obj, "add_headers");
617 /*
618 * add_headers: {"name": "value", ... }
619 * name could have multiple values
620 */
621 if (elt && ucl_object_type (elt) == UCL_OBJECT) {
622 it = NULL;
623
624 while ((cur = ucl_object_iterate (elt, &it, true)) != NULL) {
625 LL_FOREACH (cur, cur_elt) {
626 if (ucl_object_type (cur_elt) == UCL_STRING) {
627 smfi_addheader (ctx, (char *)ucl_object_key (cur),
628 (char *)ucl_object_tostring (cur_elt));
629 }
630 }
631 }
632 }
633 }
634 }
635
636 /*
637 * spamdscan() - send file to one of remote spamd, with pseudo load-balancing
638 * (select one random server, fallback to others in case of errors).
639 *
640 * returns 0 if file scanned and spam not found,
641 * 1 if file scanned and spam found ,
642 * 2 if file scanned and this is probably spam,
643 * -1 when retry limit exceeded, -2 on unexpected error, e.g. unexpected reply from
644 * server (suppose scanned message killed spamd...)
645 */
646
647 struct rspamd_metric_result*
spamdscan(void * _ctx,struct mlfi_priv * priv,struct config_file * cfg,int extra,int dkim_only)648 spamdscan (void *_ctx, struct mlfi_priv *priv, struct config_file *cfg, int extra,
649 int dkim_only)
650 {
651 static const int max_syslog_len = 900;
652 int retry, r = -2, to_trace = 0, i, j, ret;
653 struct timeval t;
654 double ts, tf;
655 struct spamd_server *selected = NULL;
656 char bar_buf[128], hdrbuf[256];
657 char *prefix = "s", *c;
658 struct rspamd_metric_result *res;
659 struct rspamd_symbol *cur_symbol, *tmp_symbol;
660 struct timespec sleep_ts;
661 SMFICTX *ctx = _ctx;
662 sds optbuf, logbuf, headerbuf;
663 const ucl_object_t *obj;
664 struct rcpt *rcpt;
665 bool extended_options = true, print_symbols = true, extended_headers = false;
666
667 gettimeofday (&t, NULL);
668 ts = t.tv_sec + t.tv_usec / 1000000.0;
669 retry = cfg->spamd_retry_count;
670 sleep_ts.tv_sec = cfg->spamd_retry_timeout / 1000;
671 sleep_ts.tv_nsec = (cfg->spamd_retry_timeout % 1000) * 1000000ULL;
672
673 res = malloc (sizeof (*res));
674
675 if (res == NULL) {
676 msg_err("<%s>; spamdscan: malloc falied, %s", priv->mlfi_id,
677 strerror (errno));
678 return NULL;
679 }
680
681 memset (res, 0, sizeof (*res));
682
683 /* try to scan with available servers */
684 while (1) {
685 if (extra) {
686 selected = (struct spamd_server *) get_random_upstream (
687 (void *) cfg->extra_spamd_servers,
688 cfg->extra_spamd_servers_num, sizeof(struct spamd_server),
689 t.tv_sec, cfg->spamd_error_time, cfg->spamd_dead_time,
690 cfg->spamd_maxerrors, priv);
691 }
692 else {
693 selected = (struct spamd_server *) get_random_upstream (
694 (void *) cfg->spamd_servers, cfg->spamd_servers_num,
695 sizeof(struct spamd_server), t.tv_sec,
696 cfg->spamd_error_time, cfg->spamd_dead_time,
697 cfg->spamd_maxerrors, priv);
698 }
699 if (selected == NULL) {
700 msg_err("<%s>; spamdscan: upstream get error, %s", priv->mlfi_id,
701 priv->file);
702 free (res);
703
704 return NULL;
705 }
706
707 msg_info ("<%s>; spamdscan: start scanning message on %s", priv->mlfi_id,
708 selected->name);
709
710 prefix = "rs";
711 r = rspamdscan_socket (ctx, priv, selected, cfg, res, dkim_only);
712
713 msg_info("<%s>; spamdscan: finish scanning message on %s", priv->mlfi_id,
714 selected->name);
715
716 if (r == 0 || r == 1) {
717 upstream_ok (&selected->up, t.tv_sec);
718 break;
719 }
720 upstream_fail (&selected->up, t.tv_sec);
721 if (r == -2) {
722 msg_warn("<%s>; %spamdscan: unexpected problem, %s, %s",
723 priv->mlfi_id, prefix, selected->name, priv->file);
724 break;
725 }
726 if (--retry < 1) {
727 msg_warn("<%s>; %spamdscan: retry limit exceeded, %s, %s",
728 priv->mlfi_id, prefix, selected->name, priv->file);
729 break;
730 }
731
732 msg_warn("<%s>; %spamdscan: failed to scan, retry, %s, %s",
733 priv->mlfi_id, prefix, selected->name, priv->file);
734 nanosleep (&sleep_ts, NULL);
735 }
736
737 if (r < 0) {
738 free (res);
739 return NULL;
740 }
741
742 /*
743 * print scanning time, server and result
744 */
745 gettimeofday (&t, NULL);
746 tf = t.tv_sec + t.tv_usec / 1000000.0;
747
748 logbuf = sdsnewlen (NULL, 1024);
749 headerbuf = sdsnewlen (NULL, 512);
750
751 if (res->symbols) {
752 /* Sort symbols by scores from high to low */
753 DL_SORT (res->symbols, rmilter_spamd_symcmp);
754 }
755
756 if (res->message_id) {
757 rmilter_strlcpy (priv->message_id, res->message_id,
758 sizeof (priv->message_id));
759 }
760
761 extended_headers = true;
762 DL_FOREACH (priv->rcpts, rcpt) {
763 if (!is_whitelisted_rcpt (&cfg->extended_rcpts, rcpt->r_addr)) {
764 extended_headers = false;
765 }
766 }
767
768 if (!extended_headers && cfg->extended_spam_headers && !priv->authenticated) {
769 extended_headers = true;
770 }
771
772 log_retry:
773 sdsclear (logbuf);
774 sdsclear (headerbuf);
775
776 /* Parse res tailq */
777 if (extended_headers) {
778 headerbuf = sdscatprintf (headerbuf, "%s: %s [%.2f / %.2f]%c",
779 "default", res->score > res->required_score ? "True" : "False",
780 res->score, res->required_score,
781 res->symbols != NULL ? '\n' : ' ');
782 }
783
784 logbuf = sdscatprintf (logbuf,
785 "<%s>; spamdscan: scan, time: %.3f, server: %s, metric: "
786 "default: [%.3f / %.3f], symbols: ",
787 priv->mlfi_id, tf - ts, selected->name, res->score,
788 res->required_score);
789
790 /* Write symbols */
791 if (res->symbols == NULL) {
792 logbuf = sdscatprintf (logbuf, "no symbols");
793 }
794 else {
795 optbuf = sdsempty ();
796
797 DL_FOREACH_SAFE (res->symbols, cur_symbol, tmp_symbol) {
798 sdsclear (optbuf);
799
800 if (cur_symbol->symbol) {
801
802 if (cur_symbol->options && extended_options) {
803 ucl_object_iter_t it = NULL;
804 const ucl_object_t *elt;
805 bool first = true;
806
807 while ((elt = ucl_object_iterate (cur_symbol->options,
808 &it, true)) != NULL) {
809 if (ucl_object_type (elt) == UCL_STRING) {
810 if (first) {
811 optbuf = sdscat (optbuf,
812 ucl_object_tostring (elt));
813 first = false;
814 }
815 else {
816 optbuf = sdscat (optbuf, ", ");
817 optbuf = sdscat (optbuf,
818 ucl_object_tostring (elt));
819 }
820 }
821 }
822 }
823
824 if (print_symbols) {
825 if (cur_symbol->next) {
826 logbuf = sdscatprintf (logbuf, "%s(%.2f)[%s], ",
827 cur_symbol->symbol, cur_symbol->score, optbuf);
828 }
829 else {
830 logbuf = sdscatprintf (logbuf, "%s(%.2f)[%s]",
831 cur_symbol->symbol, cur_symbol->score, optbuf);
832 }
833 if (cfg->trace_symbol) {
834 c = strchr (cur_symbol->symbol, '(');
835 if (c != NULL) {
836 *c = '\0';
837 }
838 if (!strcmp (cfg->trace_symbol, cur_symbol->symbol)) {
839 to_trace++;
840 }
841 }
842 }
843
844 if (extended_headers) {
845 if (cur_symbol->next) {
846 headerbuf = sdscatprintf (headerbuf,
847 " %s(%.2f)[%s]\n", cur_symbol->symbol,
848 cur_symbol->score, optbuf);
849 }
850 else {
851 headerbuf = sdscatprintf (headerbuf,
852 " %s(%.2f)[%s]",
853 cur_symbol->symbol,
854 cur_symbol->score, optbuf);
855 }
856 }
857 }
858 }
859
860 sdsfree (optbuf);
861 }
862
863 if (sdslen (logbuf) > max_syslog_len) {
864 if (extended_options) {
865 /* Try to retry without options */
866 extended_options = false;
867 msg_info ("<%s>; spamdscan: too large reply: %d, skip options",
868 priv->mlfi_id, (int)sdslen (logbuf));
869 goto log_retry;
870 }
871 else if (print_symbols) {
872 msg_info ("<%s>; spamdscan: too large reply: %d, skip symbols",
873 priv->mlfi_id, (int)sdslen (logbuf));
874 print_symbols = false;
875 goto log_retry;
876 }
877 else {
878 /* Truncate reply */
879 msg_err ("<%s>; spamdscan: too large reply: %d, truncate reply",
880 priv->mlfi_id, (int)sdslen (logbuf));
881 sdsrange (logbuf, 0, max_syslog_len);
882 }
883 }
884
885 msg_info ("%s", logbuf);
886 sdsfree (logbuf);
887
888 if (extended_headers) {
889 if (extra) {
890 smfi_addheader (ctx, "X-Spamd-Extra-Result", headerbuf);
891 }
892 else {
893 smfi_addheader (ctx, "X-Spamd-Result", headerbuf);
894 }
895 }
896
897 sdsfree (headerbuf);
898
899 /* All other statistic headers */
900 if (extended_headers) {
901 if (extra) {
902 smfi_addheader (ctx, "X-Rspamd-Extra-Server", selected->name);
903 snprintf (hdrbuf, sizeof(hdrbuf), "%.2f", tf - ts);
904 smfi_addheader (ctx, "X-Rspamd-Extra-Scan-Time", hdrbuf);
905 }
906 else {
907 smfi_addheader (ctx, "X-Rspamd-Server", selected->name);
908 snprintf (hdrbuf, sizeof (hdrbuf), "%.2f", tf - ts);
909 smfi_addheader (ctx, "X-Rspamd-Scan-Time", hdrbuf);
910 smfi_addheader (ctx, "X-Rspamd-Queue-ID", priv->queue_id);
911 }
912 }
913
914 if (res->dkim_signature) {
915 /* Add dkim signature passed from rspamd */
916
917 /*
918 * According to milter docs, we need to be extra careful
919 * when folding headers:
920 * Neither the name nor the value of the header is checked for standards
921 * compliance. However, each line of the header must be under 2048
922 * characters and should be under 998 characters.
923 * If longer headers are needed, make them multi-line.
924 * To make a multi-line header, insert a line feed (ASCII 0x0a, or \n
925 * in C) followed by at least one whitespace character such as a
926 * space (ASCII 0x20) or tab (ASCII 0x09, or \t in C).
927 * The line feed should NOT be preceded by a carriage return (ASCII 0x0d);
928 * the MTA will add this automatically.
929 * It is the filter writer's responsibility to ensure that no s
930 * tandards are violated.
931 */
932 char *dkim_buf = malloc (strlen (res->dkim_signature) + 1);
933
934 if (dkim_buf != NULL) {
935 char *d;
936 const char *s;
937
938 s = res->dkim_signature;
939 d = dkim_buf;
940
941 while (*s) {
942 size_t len = strcspn (s, "\r");
943
944 if (len > 0) {
945 memcpy (d, s, len);
946 d += len;
947 s += len + strspn (s + len, "\r");
948 }
949 else {
950 s ++;
951 }
952 }
953
954 *d = '\0';
955
956 smfi_addheader (ctx, "DKIM-Signature", dkim_buf);
957
958 free (dkim_buf);
959 }
960 }
961
962 obj = ucl_object_lookup (res->obj, "rmilter");
963 if (obj) {
964 rmiler_process_rspamd_block (obj, ctx);
965 }
966
967 obj = ucl_object_lookup (res->obj, "messages");
968 if (obj) {
969 const ucl_object_t *smtp_res;
970
971 smtp_res = ucl_object_lookup (obj, "smtp_message");
972
973 if (smtp_res) {
974 res->message = ucl_object_tostring (smtp_res);
975 }
976 }
977
978 /* Trace spam messages to specific addr */
979 if (!extra && to_trace && cfg->trace_addr) {
980 smfi_addrcpt (ctx, cfg->trace_addr);
981 smfi_setpriv (ctx, priv);
982 }
983
984 return res;
985 }
986
987 void
spamd_free_result(struct rspamd_metric_result * mres)988 spamd_free_result (struct rspamd_metric_result *mres)
989 {
990 struct rspamd_symbol *cur_symbol, *tmp_symbol;
991
992 if (mres) {
993 DL_FOREACH_SAFE (mres->symbols, cur_symbol, tmp_symbol) {
994 free (cur_symbol);
995 }
996
997 ucl_object_unref (mres->obj);
998 free (mres);
999 }
1000 }
1001