1 /*-
2 * Copyright 2019 Vsevolod Stakhov
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 /*
17 * Implementation of map files handling
18 */
19
20 #include "config.h"
21 #include "map.h"
22 #include "map_private.h"
23 #include "libserver/http/http_connection.h"
24 #include "libserver/http/http_private.h"
25 #include "rspamd.h"
26 #include "contrib/libev/ev.h"
27 #include "contrib/uthash/utlist.h"
28
29 #ifdef SYS_ZSTD
30 # include "zstd.h"
31 #else
32 # include "contrib/zstd/zstd.h"
33 #endif
34
/*
 * Reference-counting helpers for map-related objects.
 *
 * MAP_DEBUG_REFS is explicitly undefined right below, so by default
 * MAP_RETAIN/MAP_RELEASE are plain aliases of REF_RETAIN/REF_RELEASE and the
 * tag argument `t` is ignored. Changing the #undef into a #define enables a
 * traced variant that logs the calling function, the tag, the object pointer
 * and the refcount transition before each retain/release.
 * Note: `t` must be a string literal, since the traced variant concatenates
 * it into the format string.
 */
#undef MAP_DEBUG_REFS
#ifdef MAP_DEBUG_REFS
#define MAP_RETAIN(x, t) do { \
	msg_err (G_GNUC_PRETTY_FUNCTION ": " t ": retain ref %p, refcount: %d -> %d", (x), (x)->ref.refcount, (x)->ref.refcount + 1); \
	REF_RETAIN(x); \
} while (0)

#define MAP_RELEASE(x, t) do { \
	msg_err (G_GNUC_PRETTY_FUNCTION ": " t ": release ref %p, refcount: %d -> %d", (x), (x)->ref.refcount, (x)->ref.refcount - 1); \
	REF_RELEASE(x); \
} while (0)
#else
#define MAP_RETAIN(x, t) REF_RETAIN(x)
#define MAP_RELEASE(x, t) REF_RELEASE(x)
#endif
50
/*
 * Bit flags passed as `how` to rspamd_map_schedule_periodic() describing the
 * circumstances of the (re)scheduling; they select the delay multiplier.
 */
enum rspamd_map_periodic_opts {
	RSPAMD_MAP_SCHEDULE_NORMAL = 0,      /* ordinary reschedule */
	RSPAMD_MAP_SCHEDULE_ERROR = 0x1u,    /* previous check failed: back off */
	RSPAMD_MAP_SCHEDULE_LOCKED = 0x2u,   /* map was found locked: retry sooner */
	RSPAMD_MAP_SCHEDULE_INIT = 0x4u,     /* first scheduling of the map */
};
57
/* Forward declarations of file-local helpers used before their definitions */
static void free_http_cbdata_common (struct http_callback_data *cbd,
		gboolean plan_new);
static void free_http_cbdata_dtor (gpointer p);
static void free_http_cbdata (struct http_callback_data *cbd);
static void rspamd_map_process_periodic (struct map_periodic_cbdata *cbd);
/* `how` is a bitmask of enum rspamd_map_periodic_opts values */
static void rspamd_map_schedule_periodic (struct rspamd_map *map, int how);
static gboolean read_map_file_chunks (struct rspamd_map *map,
		struct map_cb_data *cbdata,
		const gchar *fname,
		gsize len,
		goffset off);
static gboolean rspamd_map_save_http_cached_file (struct rspamd_map *map,
		struct rspamd_map_backend *bk,
		struct http_map_data *htdata,
		const guchar *data,
		gsize len);
static gboolean rspamd_map_update_http_cached_file (struct rspamd_map *map,
		struct rspamd_map_backend *bk,
		struct http_map_data *htdata);
77
78 guint rspamd_map_log_id = (guint)-1;
RSPAMD_CONSTRUCTOR(rspamd_map_log_init)79 RSPAMD_CONSTRUCTOR(rspamd_map_log_init)
80 {
81 rspamd_map_log_id = rspamd_logger_add_debug_module("map");
82 }
83
84 /**
85 * Write HTTP request
86 */
87 static void
write_http_request(struct http_callback_data * cbd)88 write_http_request (struct http_callback_data *cbd)
89 {
90 gchar datebuf[128];
91 struct rspamd_http_message *msg;
92
93 msg = rspamd_http_new_message (HTTP_REQUEST);
94
95 if (cbd->bk->protocol == MAP_PROTO_HTTPS) {
96 msg->flags |= RSPAMD_HTTP_FLAG_SSL;
97 }
98
99 if (cbd->check) {
100 msg->method = HTTP_HEAD;
101 }
102
103 msg->url = rspamd_fstring_append (msg->url,
104 cbd->data->path, strlen (cbd->data->path));
105
106 if (cbd->check) {
107 if (cbd->data->last_modified != 0) {
108 rspamd_http_date_format (datebuf, sizeof (datebuf),
109 cbd->data->last_modified);
110 rspamd_http_message_add_header (msg, "If-Modified-Since",
111 datebuf);
112 }
113 if (cbd->data->etag) {
114 rspamd_http_message_add_header_len (msg, "If-None-Match",
115 cbd->data->etag->str, cbd->data->etag->len);
116 }
117 }
118
119 msg->url = rspamd_fstring_append (msg->url, cbd->data->rest,
120 strlen (cbd->data->rest));
121
122 if (cbd->data->userinfo) {
123 rspamd_http_message_add_header (msg, "Authorization",
124 cbd->data->userinfo);
125 }
126
127 MAP_RETAIN (cbd, "http_callback_data");
128 rspamd_http_connection_write_message (cbd->conn,
129 msg,
130 cbd->data->host,
131 NULL,
132 cbd,
133 cbd->timeout);
134 }
135
136 /**
137 * Callback for destroying HTTP callback data
138 */
139 static void
free_http_cbdata_common(struct http_callback_data * cbd,gboolean plan_new)140 free_http_cbdata_common (struct http_callback_data *cbd, gboolean plan_new)
141 {
142 struct map_periodic_cbdata *periodic = cbd->periodic;
143
144 if (cbd->shmem_data) {
145 rspamd_http_message_shmem_unref (cbd->shmem_data);
146 }
147
148 if (cbd->pk) {
149 rspamd_pubkey_unref (cbd->pk);
150 }
151
152 if (cbd->conn) {
153 rspamd_http_connection_unref (cbd->conn);
154 cbd->conn = NULL;
155 }
156
157 if (cbd->addrs) {
158 rspamd_inet_addr_t *addr;
159 guint i;
160
161 PTR_ARRAY_FOREACH (cbd->addrs, i, addr) {
162 rspamd_inet_address_free (addr);
163 }
164
165 g_ptr_array_free (cbd->addrs, TRUE);
166 }
167
168
169 MAP_RELEASE (cbd->bk, "rspamd_map_backend");
170
171 if (periodic) {
172 /* Detached in case of HTTP error */
173 MAP_RELEASE (periodic, "periodic");
174 }
175
176 g_free (cbd);
177 }
178
179 static void
free_http_cbdata(struct http_callback_data * cbd)180 free_http_cbdata (struct http_callback_data *cbd)
181 {
182 cbd->map->tmp_dtor = NULL;
183 cbd->map->tmp_dtor_data = NULL;
184
185 free_http_cbdata_common (cbd, TRUE);
186 }
187
188 static void
free_http_cbdata_dtor(gpointer p)189 free_http_cbdata_dtor (gpointer p)
190 {
191 struct http_callback_data *cbd = p;
192 struct rspamd_map *map;
193
194 map = cbd->map;
195 if (cbd->stage == http_map_http_conn) {
196 REF_RELEASE (cbd);
197 }
198 else {
199 /* We cannot terminate DNS requests sent */
200 cbd->stage = http_map_terminated;
201 }
202
203 msg_warn_map ("%s: "
204 "connection with http server is terminated: worker is stopping",
205 map->name);
206 }
207
208 /*
209 * HTTP callbacks
210 */
211 static void
http_map_error(struct rspamd_http_connection * conn,GError * err)212 http_map_error (struct rspamd_http_connection *conn,
213 GError *err)
214 {
215 struct http_callback_data *cbd = conn->ud;
216 struct rspamd_map *map;
217
218 map = cbd->map;
219
220 if (cbd->periodic) {
221 cbd->periodic->errored = TRUE;
222 msg_err_map ("error reading %s(%s): "
223 "connection with http server terminated incorrectly: %e",
224 cbd->bk->uri,
225 cbd->addr ? rspamd_inet_address_to_string_pretty (cbd->addr) : "",
226 err);
227
228 rspamd_map_process_periodic (cbd->periodic);
229 }
230
231 MAP_RELEASE (cbd, "http_callback_data");
232 }
233
234 static void
rspamd_map_cache_cb(struct ev_loop * loop,ev_timer * w,int revents)235 rspamd_map_cache_cb (struct ev_loop *loop, ev_timer *w, int revents)
236 {
237 struct rspamd_http_map_cached_cbdata *cache_cbd = (struct rspamd_http_map_cached_cbdata *)
238 w->data;
239 struct rspamd_map *map;
240 struct http_map_data *data;
241
242 map = cache_cbd->map;
243 data = cache_cbd->data;
244
245 if (cache_cbd->gen != cache_cbd->data->gen) {
246 /* We have another update, so this cache element is obviously expired */
247 /*
248 * Important!: we do not set cache availability to zero here, as there
249 * might be fresh cache
250 */
251 msg_info_map ("cached data is now expired (gen mismatch %L != %L) for %s",
252 cache_cbd->gen, cache_cbd->data->gen, map->name);
253 MAP_RELEASE (cache_cbd->shm, "rspamd_http_map_cached_cbdata");
254 ev_timer_stop (loop, &cache_cbd->timeout);
255 g_free (cache_cbd);
256 }
257 else if (cache_cbd->data->last_checked >= cache_cbd->last_checked) {
258 /*
259 * We checked map but we have not found anything more recent,
260 * reschedule cache check
261 */
262 if (cache_cbd->map->poll_timeout >
263 rspamd_get_calendar_ticks () - cache_cbd->data->last_checked) {
264 w->repeat = cache_cbd->map->poll_timeout -
265 (rspamd_get_calendar_ticks () - cache_cbd->data->last_checked);
266 }
267 else {
268 w->repeat = cache_cbd->map->poll_timeout;
269 }
270
271 if (w->repeat < 0) {
272 msg_info_map ("cached data for %s has skewed check time: %d last checked, %d poll timeout, %.2f diff",
273 map->name, (int)cache_cbd->data->last_checked,
274 (int)cache_cbd->map->poll_timeout,
275 (rspamd_get_calendar_ticks () - cache_cbd->data->last_checked));
276 w->repeat = 0.0;
277 }
278
279 cache_cbd->last_checked = cache_cbd->data->last_checked;
280 msg_debug_map ("cached data is up to date for %s", map->name);
281 ev_timer_again (loop, &cache_cbd->timeout);
282 }
283 else {
284 data->cur_cache_cbd = NULL;
285 g_atomic_int_set (&data->cache->available, 0);
286 MAP_RELEASE (cache_cbd->shm, "rspamd_http_map_cached_cbdata");
287 msg_info_map ("cached data is now expired for %s", map->name);
288 ev_timer_stop (loop, &cache_cbd->timeout);
289 g_free (cache_cbd);
290 }
291 }
292
293 static int
http_map_finish(struct rspamd_http_connection * conn,struct rspamd_http_message * msg)294 http_map_finish (struct rspamd_http_connection *conn,
295 struct rspamd_http_message *msg)
296 {
297 struct http_callback_data *cbd = conn->ud;
298 struct rspamd_map *map;
299 struct rspamd_map_backend *bk;
300 struct http_map_data *data;
301 struct rspamd_http_map_cached_cbdata *cache_cbd;
302 const rspamd_ftok_t *expires_hdr, *etag_hdr;
303 char next_check_date[128];
304 guchar *in = NULL;
305 gsize dlen = 0;
306
307 map = cbd->map;
308 bk = cbd->bk;
309 data = bk->data.hd;
310
311 if (msg->code == 200) {
312
313 if (cbd->check) {
314 msg_info_map ("need to reread map from %s", cbd->bk->uri);
315 cbd->periodic->need_modify = TRUE;
316 /* Reset the whole chain */
317 cbd->periodic->cur_backend = 0;
318 /* Reset cache, old cached data will be cleaned on timeout */
319 g_atomic_int_set (&data->cache->available, 0);
320 data->cur_cache_cbd = NULL;
321
322 rspamd_map_process_periodic (cbd->periodic);
323 MAP_RELEASE (cbd, "http_callback_data");
324
325 return 0;
326 }
327
328 cbd->data->last_checked = msg->date;
329
330 if (msg->last_modified) {
331 cbd->data->last_modified = msg->last_modified;
332 }
333 else {
334 cbd->data->last_modified = msg->date;
335 }
336
337
338 /* Unsigned version - just open file */
339 cbd->shmem_data = rspamd_http_message_shmem_ref (msg);
340 cbd->data_len = msg->body_buf.len;
341
342 if (cbd->data_len == 0) {
343 msg_err_map ("cannot read empty map");
344 goto err;
345 }
346
347 g_assert (cbd->shmem_data != NULL);
348
349 in = rspamd_shmem_xmap (cbd->shmem_data->shm_name, PROT_READ, &dlen);
350
351 if (in == NULL) {
352 msg_err_map ("cannot read tempfile %s: %s",
353 cbd->shmem_data->shm_name,
354 strerror (errno));
355 goto err;
356 }
357
358 /* Check for expires */
359 double cached_timeout = map->poll_timeout * 2;
360
361 expires_hdr = rspamd_http_message_find_header (msg, "Expires");
362
363 if (expires_hdr) {
364 time_t hdate;
365
366 hdate = rspamd_http_parse_date (expires_hdr->begin, expires_hdr->len);
367
368 if (hdate != (time_t)-1 && hdate > msg->date) {
369 cached_timeout = map->next_check - msg->date +
370 map->poll_timeout * 2;
371
372 map->next_check = hdate;
373 }
374 else {
375 msg_info_map ("invalid expires header: %T, ignore it", expires_hdr);
376 map->next_check = 0;
377 }
378 }
379
380 /* Check for etag */
381 etag_hdr = rspamd_http_message_find_header (msg, "ETag");
382
383 if (etag_hdr) {
384 if (cbd->data->etag) {
385 /* Remove old etag */
386 rspamd_fstring_free (cbd->data->etag);
387 }
388
389 cbd->data->etag = rspamd_fstring_new_init (etag_hdr->begin,
390 etag_hdr->len);
391 }
392 else {
393 if (cbd->data->etag) {
394 /* Remove and clear old etag */
395 rspamd_fstring_free (cbd->data->etag);
396 cbd->data->etag = NULL;
397 }
398 }
399
400 MAP_RETAIN (cbd->shmem_data, "shmem_data");
401 cbd->data->gen ++;
402 /*
403 * We know that a map is in the locked state
404 */
405 g_atomic_int_set (&data->cache->available, 1);
406 /* Store cached data */
407 rspamd_strlcpy (data->cache->shmem_name, cbd->shmem_data->shm_name,
408 sizeof (data->cache->shmem_name));
409 data->cache->len = cbd->data_len;
410 data->cache->last_modified = cbd->data->last_modified;
411 cache_cbd = g_malloc0 (sizeof (*cache_cbd));
412 cache_cbd->shm = cbd->shmem_data;
413 cache_cbd->event_loop = cbd->event_loop;
414 cache_cbd->map = map;
415 cache_cbd->data = cbd->data;
416 cache_cbd->last_checked = cbd->data->last_checked;
417 cache_cbd->gen = cbd->data->gen;
418 MAP_RETAIN (cache_cbd->shm, "shmem_data");
419
420 ev_timer_init (&cache_cbd->timeout, rspamd_map_cache_cb, cached_timeout,
421 0.0);
422 ev_timer_start (cbd->event_loop, &cache_cbd->timeout);
423 cache_cbd->timeout.data = cache_cbd;
424 data->cur_cache_cbd = cache_cbd;
425
426 if (map->next_check) {
427 rspamd_http_date_format (next_check_date, sizeof (next_check_date),
428 map->next_check);
429 }
430 else {
431 rspamd_http_date_format (next_check_date, sizeof (next_check_date),
432 rspamd_get_calendar_ticks () + map->poll_timeout);
433 }
434
435
436 if (cbd->bk->is_compressed) {
437 ZSTD_DStream *zstream;
438 ZSTD_inBuffer zin;
439 ZSTD_outBuffer zout;
440 guchar *out;
441 gsize outlen, r;
442
443 zstream = ZSTD_createDStream ();
444 ZSTD_initDStream (zstream);
445
446 zin.pos = 0;
447 zin.src = in;
448 zin.size = dlen;
449
450 if ((outlen = ZSTD_getDecompressedSize (zin.src, zin.size)) == 0) {
451 outlen = ZSTD_DStreamOutSize ();
452 }
453
454 out = g_malloc (outlen);
455
456 zout.dst = out;
457 zout.pos = 0;
458 zout.size = outlen;
459
460 while (zin.pos < zin.size) {
461 r = ZSTD_decompressStream (zstream, &zout, &zin);
462
463 if (ZSTD_isError (r)) {
464 msg_err_map ("%s(%s): cannot decompress data: %s",
465 cbd->bk->uri,
466 rspamd_inet_address_to_string_pretty (cbd->addr),
467 ZSTD_getErrorName (r));
468 ZSTD_freeDStream (zstream);
469 g_free (out);
470 MAP_RELEASE (cbd->shmem_data, "shmem_data");
471 goto err;
472 }
473
474 if (zout.pos == zout.size) {
475 /* We need to extend output buffer */
476 zout.size = zout.size * 2 + 1.0;
477 out = g_realloc (zout.dst, zout.size);
478 zout.dst = out;
479 }
480 }
481
482 ZSTD_freeDStream (zstream);
483 msg_info_map ("%s(%s): read map data %z bytes compressed, "
484 "%z uncompressed, next check at %s",
485 cbd->bk->uri,
486 rspamd_inet_address_to_string_pretty (cbd->addr),
487 dlen, zout.pos, next_check_date);
488 map->read_callback (out, zout.pos, &cbd->periodic->cbdata, TRUE);
489 rspamd_map_save_http_cached_file (map, bk, cbd->data, out, zout.pos);
490 g_free (out);
491 }
492 else {
493 msg_info_map ("%s(%s): read map data %z bytes, next check at %s",
494 cbd->bk->uri,
495 rspamd_inet_address_to_string_pretty (cbd->addr),
496 dlen, next_check_date);
497 rspamd_map_save_http_cached_file (map, bk, cbd->data, in, cbd->data_len);
498 map->read_callback (in, cbd->data_len, &cbd->periodic->cbdata, TRUE);
499 }
500
501 MAP_RELEASE (cbd->shmem_data, "shmem_data");
502
503 cbd->periodic->cur_backend ++;
504 munmap (in, dlen);
505 rspamd_map_process_periodic (cbd->periodic);
506 }
507 else if (msg->code == 304 && cbd->check) {
508 cbd->data->last_checked = msg->date;
509
510 if (msg->last_modified) {
511 cbd->data->last_modified = msg->last_modified;
512 }
513 else {
514 cbd->data->last_modified = msg->date;
515 }
516
517 expires_hdr = rspamd_http_message_find_header (msg, "Expires");
518
519 if (expires_hdr) {
520 time_t hdate;
521
522 hdate = rspamd_http_parse_date (expires_hdr->begin, expires_hdr->len);
523 if (hdate != (time_t)-1 && hdate > msg->date) {
524 map->next_check = hdate;
525 }
526 else {
527 msg_info_map ("invalid expires header: %T, ignore it", expires_hdr);
528 map->next_check = 0;
529 }
530 }
531
532 etag_hdr = rspamd_http_message_find_header (msg, "ETag");
533
534 if (etag_hdr) {
535 if (cbd->data->etag) {
536 /* Remove old etag */
537 rspamd_fstring_free (cbd->data->etag);
538 cbd->data->etag = rspamd_fstring_new_init (etag_hdr->begin,
539 etag_hdr->len);
540 }
541 }
542
543 if (map->next_check) {
544 rspamd_http_date_format (next_check_date, sizeof (next_check_date),
545 map->next_check);
546 msg_info_map ("data is not modified for server %s, next check at %s "
547 "(http cache based: %T)",
548 cbd->data->host, next_check_date, expires_hdr);
549 }
550 else {
551 rspamd_http_date_format (next_check_date, sizeof (next_check_date),
552 rspamd_get_calendar_ticks () + map->poll_timeout);
553 msg_info_map ("data is not modified for server %s, next check at %s "
554 "(timer based)",
555 cbd->data->host, next_check_date);
556 }
557
558 rspamd_map_update_http_cached_file (map, bk, cbd->data);
559 cbd->periodic->cur_backend ++;
560 rspamd_map_process_periodic (cbd->periodic);
561 }
562 else {
563 msg_info_map ("cannot load map %s from %s: HTTP error %d",
564 bk->uri, cbd->data->host, msg->code);
565 goto err;
566 }
567
568 MAP_RELEASE (cbd, "http_callback_data");
569 return 0;
570
571 err:
572 cbd->periodic->errored = 1;
573 rspamd_map_process_periodic (cbd->periodic);
574 MAP_RELEASE (cbd, "http_callback_data");
575
576 return 0;
577 }
578
/*
 * Reads a map file in chunks and feeds them to map->read_callback.
 *
 * The file is opened with rspamd_file_xopen, positioned at `off` and read
 * into a heap buffer of 1 MiB (or `len` bytes, whichever is smaller). After
 * each read() the callback receives the whole buffered region and returns a
 * pointer to the first unconsumed byte. If that pointer lies strictly inside
 * the region, the tail is moved to the start of the buffer and kept for the
 * next read; any other return value (NULL, the buffer start or the end)
 * makes the next read overwrite the buffer from its start.
 * The last callback argument is TRUE when this read() returned exactly the
 * number of bytes still expected (`len`, decremented after every read).
 *
 * NOTE(review): because the retained tail is always shorter than buflen,
 * `avail` stays >= 1 and the realloc branch below looks unreachable; a record
 * the callback cannot consume (returning `bytes`) is therefore dropped rather
 * than grown into a bigger buffer — confirm this matches the callback contract.
 *
 * Returns FALSE if the file cannot be opened, seeked or read; TRUE otherwise.
 */
static gboolean
read_map_file_chunks (struct rspamd_map *map, struct map_cb_data *cbdata,
		const gchar *fname, gsize len, goffset off)
{
	gint fd;
	gssize r, avail;
	gsize buflen = 1024 * 1024;
	gchar *pos, *bytes;

	fd = rspamd_file_xopen (fname, O_RDONLY, 0, TRUE);

	if (fd == -1) {
		msg_err_map ("can't open map for buffered reading %s: %s",
				fname, strerror (errno));
		return FALSE;
	}

	if (lseek (fd, off, SEEK_SET) == -1) {
		msg_err_map ("can't seek in map to pos %d for buffered reading %s: %s",
				(gint)off, fname, strerror (errno));
		close (fd);

		return FALSE;
	}

	/* Never allocate more than the data we expect to read */
	buflen = MIN (len, buflen);
	bytes = g_malloc (buflen);
	avail = buflen;
	pos = bytes;

	while ((r = read (fd, pos, avail)) > 0) {
		/* end: one past the last valid byte (retained tail + new data) */
		gchar *end = bytes + (pos - bytes) + r;
		msg_debug_map ("%s: read map chunk, %z bytes", fname,
				r);
		pos = map->read_callback (bytes, end - bytes, cbdata, r == len);

		if (pos && pos > bytes && pos < end) {
			guint remain = end - pos;

			memmove (bytes, pos, remain);
			pos = bytes + remain;
			/* Need to preserve the remain */
			avail = ((gssize)buflen) - remain;

			if (avail <= 0) {
				/* Try realloc, too large element */
				g_assert (buflen >= remain);
				bytes = g_realloc (bytes, buflen * 2);

				pos = bytes + remain; /* Adjust */
				avail += buflen;
				buflen *= 2;
			}
		}
		else {
			/* Nothing to retain: next read starts at the buffer head */
			avail = buflen;
			pos = bytes;
		}

		len -= r;
	}

	if (r == -1) {
		msg_err_map ("can't read from map %s: %s", fname, strerror (errno));
		close (fd);
		g_free (bytes);

		return FALSE;
	}

	close (fd);
	g_free (bytes);

	return TRUE;
}
654
655 static gboolean
rspamd_map_check_sig_pk_mem(const guchar * sig,gsize siglen,struct rspamd_map * map,const guchar * input,gsize inlen,struct rspamd_cryptobox_pubkey * pk)656 rspamd_map_check_sig_pk_mem (const guchar *sig,
657 gsize siglen,
658 struct rspamd_map *map,
659 const guchar *input,
660 gsize inlen,
661 struct rspamd_cryptobox_pubkey *pk)
662 {
663 GString *b32_key;
664 gboolean ret = TRUE;
665
666 if (siglen != rspamd_cryptobox_signature_bytes (RSPAMD_CRYPTOBOX_MODE_25519)) {
667 msg_err_map ("can't open signature for %s: invalid size: %z", map->name, siglen);
668
669 ret = FALSE;
670 }
671
672 if (ret && !rspamd_cryptobox_verify (sig, siglen, input, inlen,
673 rspamd_pubkey_get_pk (pk, NULL), RSPAMD_CRYPTOBOX_MODE_25519)) {
674 msg_err_map ("can't verify signature for %s: incorrect signature", map->name);
675
676 ret = FALSE;
677 }
678
679 if (ret) {
680 b32_key = rspamd_pubkey_print (pk,
681 RSPAMD_KEYPAIR_BASE32 | RSPAMD_KEYPAIR_PUBKEY);
682 msg_info_map ("verified signature for %s using trusted key %v",
683 map->name, b32_key);
684 g_string_free (b32_key, TRUE);
685 }
686
687 return ret;
688 }
689
690 static gboolean
rspamd_map_check_file_sig(const char * fname,struct rspamd_map * map,struct rspamd_map_backend * bk,const guchar * input,gsize inlen)691 rspamd_map_check_file_sig (const char *fname,
692 struct rspamd_map *map,
693 struct rspamd_map_backend *bk,
694 const guchar *input,
695 gsize inlen) {
696 guchar *data;
697 struct rspamd_cryptobox_pubkey *pk = NULL;
698 GString *b32_key;
699 gboolean ret = TRUE;
700 gsize len = 0;
701 gchar fpath[PATH_MAX];
702
703 if (bk->trusted_pubkey == NULL) {
704 /* Try to load and check pubkey */
705 rspamd_snprintf (fpath, sizeof (fpath), "%s.pub", fname);
706 data = rspamd_file_xmap (fpath, PROT_READ, &len, TRUE);
707
708 if (data == NULL) {
709 msg_err_map ("can't open pubkey %s: %s", fpath, strerror (errno));
710 return FALSE;
711 }
712
713 pk = rspamd_pubkey_from_base32 (data, len, RSPAMD_KEYPAIR_SIGN,
714 RSPAMD_CRYPTOBOX_MODE_25519);
715 munmap (data, len);
716
717 if (pk == NULL) {
718 msg_err_map ("can't load pubkey %s", fpath);
719 return FALSE;
720 }
721
722 /* We just check pk against the trusted db of keys */
723 b32_key = rspamd_pubkey_print (pk,
724 RSPAMD_KEYPAIR_BASE32 | RSPAMD_KEYPAIR_PUBKEY);
725 g_assert (b32_key != NULL);
726
727 if (g_hash_table_lookup (map->cfg->trusted_keys, b32_key->str) == NULL) {
728 msg_err_map ("pubkey loaded from %s is untrusted: %v", fpath,
729 b32_key);
730 g_string_free (b32_key, TRUE);
731 rspamd_pubkey_unref (pk);
732
733 return FALSE;
734 }
735
736 g_string_free (b32_key, TRUE);
737 }
738 else {
739 pk = rspamd_pubkey_ref (bk->trusted_pubkey);
740 }
741
742 rspamd_snprintf (fpath, sizeof (fpath), "%s.sig", fname);
743 data = rspamd_shmem_xmap (fpath, PROT_READ, &len);
744
745 if (data == NULL) {
746 msg_err_map ("can't open signature %s: %s", fpath, strerror (errno));
747 ret = FALSE;
748 }
749
750 if (ret) {
751 ret = rspamd_map_check_sig_pk_mem (data, len, map, input, inlen, pk);
752 munmap (data, len);
753 }
754
755 rspamd_pubkey_unref (pk);
756
757 return ret;
758 }
759
760 /**
761 * Callback for reading data from file
762 */
763 static gboolean
read_map_file(struct rspamd_map * map,struct file_map_data * data,struct rspamd_map_backend * bk,struct map_periodic_cbdata * periodic)764 read_map_file (struct rspamd_map *map, struct file_map_data *data,
765 struct rspamd_map_backend *bk, struct map_periodic_cbdata *periodic)
766 {
767 gchar *bytes;
768 gsize len;
769 struct stat st;
770
771 if (map->read_callback == NULL || map->fin_callback == NULL) {
772 msg_err_map ("%s: bad callback for reading map file",
773 data->filename);
774 return FALSE;
775 }
776
777 if (stat (data->filename, &st) == -1) {
778 /* File does not exist, skipping */
779 if (errno != ENOENT) {
780 msg_err_map ("%s: map file is unavailable for reading: %s",
781 data->filename, strerror (errno));
782
783 return FALSE;
784 }
785 else {
786 msg_info_map ("%s: map file is not found; "
787 "it will be read automatically if created",
788 data->filename);
789 return TRUE;
790 }
791 }
792
793 ev_stat_stat (map->event_loop, &data->st_ev);
794 len = st.st_size;
795
796 if (bk->is_signed) {
797 bytes = rspamd_file_xmap (data->filename, PROT_READ, &len, TRUE);
798
799 if (bytes == NULL) {
800 msg_err_map ("can't open map %s: %s", data->filename, strerror (errno));
801 return FALSE;
802 }
803
804 if (!rspamd_map_check_file_sig (data->filename, map, bk, bytes, len)) {
805 munmap (bytes, len);
806
807 return FALSE;
808 }
809
810 munmap (bytes, len);
811 }
812
813 if (len > 0) {
814 if (map->no_file_read) {
815 /* We just call read callback with backend name */
816 map->read_callback (data->filename, strlen (data->filename),
817 &periodic->cbdata, TRUE);
818 }
819 else {
820 if (bk->is_compressed) {
821 bytes = rspamd_file_xmap (data->filename, PROT_READ, &len, TRUE);
822
823 if (bytes == NULL) {
824 msg_err_map ("can't open map %s: %s", data->filename, strerror (errno));
825 return FALSE;
826 }
827
828 ZSTD_DStream *zstream;
829 ZSTD_inBuffer zin;
830 ZSTD_outBuffer zout;
831 guchar *out;
832 gsize outlen, r;
833
834 zstream = ZSTD_createDStream ();
835 ZSTD_initDStream (zstream);
836
837 zin.pos = 0;
838 zin.src = bytes;
839 zin.size = len;
840
841 if ((outlen = ZSTD_getDecompressedSize (zin.src, zin.size)) == 0) {
842 outlen = ZSTD_DStreamOutSize ();
843 }
844
845 out = g_malloc (outlen);
846
847 zout.dst = out;
848 zout.pos = 0;
849 zout.size = outlen;
850
851 while (zin.pos < zin.size) {
852 r = ZSTD_decompressStream (zstream, &zout, &zin);
853
854 if (ZSTD_isError (r)) {
855 msg_err_map ("%s: cannot decompress data: %s",
856 data->filename,
857 ZSTD_getErrorName (r));
858 ZSTD_freeDStream (zstream);
859 g_free (out);
860 munmap (bytes, len);
861 return FALSE;
862 }
863
864 if (zout.pos == zout.size) {
865 /* We need to extend output buffer */
866 zout.size = zout.size * 2 + 1;
867 out = g_realloc (zout.dst, zout.size);
868 zout.dst = out;
869 }
870 }
871
872 ZSTD_freeDStream (zstream);
873 msg_info_map ("%s: read map data, %z bytes compressed, "
874 "%z uncompressed)", data->filename,
875 len, zout.pos);
876 map->read_callback (out, zout.pos, &periodic->cbdata, TRUE);
877 g_free (out);
878
879 munmap (bytes, len);
880 }
881 else {
882 /* Perform buffered read: fail-safe */
883 if (!read_map_file_chunks (map, &periodic->cbdata, data->filename,
884 len, 0)) {
885 return FALSE;
886 }
887 }
888 }
889 }
890 else {
891 /* Empty map */
892 map->read_callback (NULL, 0, &periodic->cbdata, TRUE);
893 }
894
895 return TRUE;
896 }
897
898 static gboolean
read_map_static(struct rspamd_map * map,struct static_map_data * data,struct rspamd_map_backend * bk,struct map_periodic_cbdata * periodic)899 read_map_static (struct rspamd_map *map, struct static_map_data *data,
900 struct rspamd_map_backend *bk, struct map_periodic_cbdata *periodic)
901 {
902 guchar *bytes;
903 gsize len;
904
905 if (map->read_callback == NULL || map->fin_callback == NULL) {
906 msg_err_map ("%s: bad callback for reading map file", map->name);
907 data->processed = TRUE;
908 return FALSE;
909 }
910
911 bytes = data->data;
912 len = data->len;
913
914 if (len > 0) {
915 if (bk->is_compressed) {
916 ZSTD_DStream *zstream;
917 ZSTD_inBuffer zin;
918 ZSTD_outBuffer zout;
919 guchar *out;
920 gsize outlen, r;
921
922 zstream = ZSTD_createDStream ();
923 ZSTD_initDStream (zstream);
924
925 zin.pos = 0;
926 zin.src = bytes;
927 zin.size = len;
928
929 if ((outlen = ZSTD_getDecompressedSize (zin.src, zin.size)) == 0) {
930 outlen = ZSTD_DStreamOutSize ();
931 }
932
933 out = g_malloc (outlen);
934
935 zout.dst = out;
936 zout.pos = 0;
937 zout.size = outlen;
938
939 while (zin.pos < zin.size) {
940 r = ZSTD_decompressStream (zstream, &zout, &zin);
941
942 if (ZSTD_isError (r)) {
943 msg_err_map ("%s: cannot decompress data: %s",
944 map->name,
945 ZSTD_getErrorName (r));
946 ZSTD_freeDStream (zstream);
947 g_free (out);
948
949 return FALSE;
950 }
951
952 if (zout.pos == zout.size) {
953 /* We need to extend output buffer */
954 zout.size = zout.size * 2 + 1;
955 out = g_realloc (zout.dst, zout.size);
956 zout.dst = out;
957 }
958 }
959
960 ZSTD_freeDStream (zstream);
961 msg_info_map ("%s: read map data, %z bytes compressed, "
962 "%z uncompressed)",
963 map->name,
964 len, zout.pos);
965 map->read_callback (out, zout.pos, &periodic->cbdata, TRUE);
966 g_free (out);
967 }
968 else {
969 msg_info_map ("%s: read map data, %z bytes",
970 map->name, len);
971 map->read_callback (bytes, len, &periodic->cbdata, TRUE);
972 }
973 }
974 else {
975 map->read_callback (NULL, 0, &periodic->cbdata, TRUE);
976 }
977
978 data->processed = TRUE;
979
980 return TRUE;
981 }
982
983 static void
rspamd_map_periodic_dtor(struct map_periodic_cbdata * periodic)984 rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic)
985 {
986 struct rspamd_map *map;
987
988 map = periodic->map;
989 msg_debug_map ("periodic dtor %p", periodic);
990
991 if (periodic->need_modify) {
992 /* We are done */
993 periodic->map->fin_callback (&periodic->cbdata, periodic->map->user_data);
994 }
995 else {
996 /* Not modified */
997 }
998
999 if (periodic->locked) {
1000 g_atomic_int_set (periodic->map->locked, 0);
1001 msg_debug_map ("unlocked map %s", periodic->map->name);
1002
1003 if (periodic->map->wrk->state == rspamd_worker_state_running) {
1004 rspamd_map_schedule_periodic (periodic->map,
1005 RSPAMD_SYMBOL_RESULT_NORMAL);
1006 }
1007 else {
1008 msg_debug_map ("stop scheduling periodics for %s; terminating state",
1009 periodic->map->name);
1010 }
1011 }
1012
1013 g_free (periodic);
1014 }
1015
1016 /* Called on timer execution */
1017 static void
rspamd_map_periodic_callback(struct ev_loop * loop,ev_timer * w,int revents)1018 rspamd_map_periodic_callback (struct ev_loop *loop, ev_timer *w, int revents)
1019 {
1020 struct map_periodic_cbdata *cbd = (struct map_periodic_cbdata *)w->data;
1021
1022 MAP_RETAIN (cbd, "periodic");
1023 ev_timer_stop (loop, w);
1024 rspamd_map_process_periodic (cbd);
1025 MAP_RELEASE (cbd, "periodic");
1026 }
1027
/*
 * Plans the next periodic check of `map` by allocating a new periodic
 * (stored in map->scheduled_check) and arming a one-shot ev_timer that
 * fires rspamd_map_periodic_callback.
 *
 * Nothing is scheduled when a check is already pending, when the map has a
 * worker that is no longer running, or (unless `how` has
 * RSPAMD_MAP_SCHEDULE_INIT) for static-only maps.
 *
 * Delay selection:
 *  - non-trivial maps with a known next_check (e.g. set from an HTTP Expires
 *    header) derive the delay from next_check, which is consumed (reset to 0);
 *  - otherwise poll_timeout is used, multiplied by error_mult after an error
 *    or lock_mult after a lock, or a near-zero delay for the initial check;
 *    the value is passed through rspamd_time_jitter().
 * For non-initial checks the delay is then clamped to at least
 * min_timer_interval (jittered); non-trivial maps that already hold elements
 * get 3 * min_timer_interval when the delay is below 2 * min_timer_interval.
 *
 * @param map  map to schedule
 * @param how  bitmask of enum rspamd_map_periodic_opts
 */
static void
rspamd_map_schedule_periodic (struct rspamd_map *map, int how)
{
	const gdouble error_mult = 20.0, lock_mult = 0.1;
	static const gdouble min_timer_interval = 2.0;
	const gchar *reason = "unknown reason";
	gdouble jittered_sec;
	gdouble timeout;
	struct map_periodic_cbdata *cbd;

	if (map->scheduled_check || (map->wrk &&
			map->wrk->state != rspamd_worker_state_running)) {
		/*
		 * Do not schedule check if some check is already scheduled or
		 * if worker is going to die
		 */
		return;
	}

	if (!(how & RSPAMD_MAP_SCHEDULE_INIT) && map->static_only) {
		/* No need to schedule anything for static maps */
		return;
	}

	if (map->non_trivial && map->next_check != 0) {
		/* Seconds until the externally requested check time */
		timeout = map->next_check - rspamd_get_calendar_ticks ();
		/* next_check is single-use */
		map->next_check = 0;

		if (timeout > 0 && timeout < map->poll_timeout) {
			/* Early check case, jitter */
			gdouble poll_timeout = map->poll_timeout;

			if (how & RSPAMD_MAP_SCHEDULE_ERROR) {
				poll_timeout = map->poll_timeout * error_mult;
				reason = "early active non-trivial check (after error)";
			}
			else if (how & RSPAMD_MAP_SCHEDULE_LOCKED) {
				poll_timeout = map->poll_timeout * lock_mult;
				reason = "early active non-trivial check (after being locked)";
			}
			else {
				reason = "early active non-trivial check";
			}

			jittered_sec = MIN (timeout, poll_timeout);

		}
		else if (timeout <= 0) {
			/* Data is already expired, need to check */
			if (how & RSPAMD_MAP_SCHEDULE_ERROR) {
				/* In case of error we still need to increase delay */
				jittered_sec = map->poll_timeout * error_mult;
				reason = "expired non-trivial data (after error)";
			}
			else {
				jittered_sec = 0.0;
				reason = "expired non-trivial data";
			}
		}
		else {
			/* No need to check now, wait till next_check */
			jittered_sec = timeout;
			reason = "valid non-trivial data";
		}
	}
	else {
		/* No valid information when to check a map, plan a timer based check */
		timeout = map->poll_timeout;

		if (how & RSPAMD_MAP_SCHEDULE_INIT) {
			if (map->active_http) {
				/* Spill maps load to get better chances to hit ssl cache */
				timeout = rspamd_time_jitter (0.0, 2.0);
			}
			else {
				timeout = 0.0;
			}

			reason = "init scheduled check";
		}
		else {
			if (how & RSPAMD_MAP_SCHEDULE_ERROR) {
				timeout = map->poll_timeout * error_mult;
				reason = "errored scheduled check";
			}
			else if (how & RSPAMD_MAP_SCHEDULE_LOCKED) {
				timeout = map->poll_timeout * lock_mult;
				reason = "locked scheduled check";
			}
			else {
				reason = "normal scheduled check";
			}
		}

		jittered_sec = rspamd_time_jitter (timeout, 0);
	}

	/* Now, we do some sanity checks for jittered seconds */
	if (!(how & RSPAMD_MAP_SCHEDULE_INIT)) {
		/* Never allow too low interval between timer checks, it is expensive */
		if (jittered_sec < min_timer_interval) {
			jittered_sec = rspamd_time_jitter (min_timer_interval, 0);
		}

		if (map->non_trivial) {
			/*
			 * Even if we are reported that we need to reload cache often, we
			 * still want to be sane in terms of events...
			 */
			if (jittered_sec < min_timer_interval * 2.0) {
				if (map->nelts > 0) {
					jittered_sec = min_timer_interval * 3.0;
				}
			}
		}
	}

	/* New periodic: starts from the map's current data as prev_data */
	cbd = g_malloc0 (sizeof (*cbd));
	cbd->cbdata.state = 0;
	cbd->cbdata.prev_data = *map->user_data;
	cbd->cbdata.cur_data = NULL;
	cbd->cbdata.map = map;
	cbd->map = map;
	map->scheduled_check = cbd;
	REF_INIT_RETAIN (cbd, rspamd_map_periodic_dtor);

	/* One-shot timer (repeat = 0.0) */
	cbd->ev.data = cbd;
	ev_timer_init (&cbd->ev, rspamd_map_periodic_callback, jittered_sec, 0.0);
	ev_timer_start (map->event_loop, &cbd->ev);

	msg_debug_map ("schedule new periodic event %p in %.3f seconds for %s; reason: %s",
			cbd, jittered_sec, map->name, reason);
}
1161
1162 static gint
rspamd_map_af_to_weight(const rspamd_inet_addr_t * addr)1163 rspamd_map_af_to_weight (const rspamd_inet_addr_t *addr)
1164 {
1165 int ret;
1166
1167 switch (rspamd_inet_address_get_af (addr)) {
1168 case AF_UNIX:
1169 ret = 2;
1170 break;
1171 case AF_INET:
1172 ret = 1;
1173 break;
1174 default:
1175 ret = 0;
1176 break;
1177 }
1178
1179 return ret;
1180 }
1181
1182 static gint
rspamd_map_dns_address_sort_func(gconstpointer a,gconstpointer b)1183 rspamd_map_dns_address_sort_func (gconstpointer a, gconstpointer b)
1184 {
1185 const rspamd_inet_addr_t *ip1 = *(const rspamd_inet_addr_t **)a,
1186 *ip2 = *(const rspamd_inet_addr_t **)b;
1187 gint w1, w2;
1188
1189 w1 = rspamd_map_af_to_weight (ip1);
1190 w2 = rspamd_map_af_to_weight (ip2);
1191
1192 /* Inverse order */
1193 return w2 - w1;
1194 }
1195
1196 static void
rspamd_map_dns_callback(struct rdns_reply * reply,void * arg)1197 rspamd_map_dns_callback (struct rdns_reply *reply, void *arg)
1198 {
1199 struct http_callback_data *cbd = arg;
1200 struct rdns_reply_entry *cur_rep;
1201 struct rspamd_map *map;
1202 guint flags = RSPAMD_HTTP_CLIENT_SIMPLE|RSPAMD_HTTP_CLIENT_SHARED;
1203
1204 map = cbd->map;
1205
1206 msg_debug_map ("got dns reply with code %s on stage %d",
1207 rdns_strerror (reply->code), cbd->stage);
1208
1209 if (cbd->stage == http_map_terminated) {
1210 MAP_RELEASE (cbd, "http_callback_data");
1211 return;
1212 }
1213
1214 if (reply->code == RDNS_RC_NOERROR) {
1215 DL_FOREACH (reply->entries, cur_rep) {
1216 rspamd_inet_addr_t *addr;
1217 addr = rspamd_inet_address_from_rnds (reply->entries);
1218
1219 if (addr != NULL) {
1220 rspamd_inet_address_set_port (addr, cbd->data->port);
1221 g_ptr_array_add (cbd->addrs, (void *)addr);
1222 }
1223 }
1224
1225 if (cbd->stage == http_map_resolve_host2) {
1226 /* We have still one request pending */
1227 cbd->stage = http_map_resolve_host1;
1228 }
1229 else if (cbd->stage == http_map_resolve_host1) {
1230 cbd->stage = http_map_http_conn;
1231 }
1232 }
1233 else if (cbd->stage < http_map_http_conn) {
1234 if (cbd->stage == http_map_resolve_host2) {
1235 /* We have still one request pending */
1236 cbd->stage = http_map_resolve_host1;
1237 }
1238 else if (cbd->addrs->len == 0) {
1239 /* We could not resolve host, so cowardly fail here */
1240 msg_err_map ("cannot resolve %s: %s", cbd->data->host,
1241 rdns_strerror (reply->code));
1242 cbd->periodic->errored = 1;
1243 rspamd_map_process_periodic (cbd->periodic);
1244 }
1245 else {
1246 /* We have at least one address, so we can continue... */
1247 cbd->stage = http_map_http_conn;
1248 }
1249 }
1250
1251 if (cbd->stage == http_map_http_conn && cbd->addrs->len > 0) {
1252 rspamd_ptr_array_shuffle (cbd->addrs);
1253 gint idx = 0;
1254 /*
1255 * For the existing addr we can just select any address as we have
1256 * data available
1257 */
1258 if (cbd->map->nelts > 0 && rspamd_random_double_fast () > 0.5) {
1259 /* Already shuffled, use whatever is the first */
1260 cbd->addr = (rspamd_inet_addr_t *) g_ptr_array_index (cbd->addrs, idx);
1261 }
1262 else {
1263 /* Always prefer IPv4 as IPv6 is almost all the time broken */
1264 g_ptr_array_sort (cbd->addrs, rspamd_map_dns_address_sort_func);
1265 cbd->addr = (rspamd_inet_addr_t *) g_ptr_array_index (cbd->addrs, idx);
1266 }
1267
1268 retry:
1269 msg_debug_map ("try open http connection to %s",
1270 rspamd_inet_address_to_string_pretty (cbd->addr));
1271 cbd->conn = rspamd_http_connection_new_client (NULL,
1272 NULL,
1273 http_map_error,
1274 http_map_finish,
1275 flags,
1276 cbd->addr);
1277
1278 if (cbd->conn != NULL) {
1279 write_http_request (cbd);
1280 }
1281 else {
1282 if (idx < cbd->addrs->len - 1) {
1283 /* We can retry */
1284 idx++;
1285 rspamd_inet_addr_t *prev_addr = cbd->addr;
1286 cbd->addr = (rspamd_inet_addr_t *) g_ptr_array_index (cbd->addrs, idx);
1287 msg_info_map ("cannot connect to %s to get data for %s: %s, retry with %s (%d of %d)",
1288 rspamd_inet_address_to_string_pretty (prev_addr),
1289 cbd->bk->uri,
1290 strerror (errno),
1291 rspamd_inet_address_to_string_pretty (cbd->addr),
1292 idx + 1, cbd->addrs->len);
1293 goto retry;
1294 }
1295 else {
1296 /* Nothing else left */
1297 cbd->periodic->errored = TRUE;
1298 msg_err_map ("error reading %s(%s): "
1299 "connection with http server terminated incorrectly: %s",
1300 cbd->bk->uri,
1301 cbd->addr ? rspamd_inet_address_to_string_pretty (cbd->addr) : "",
1302 strerror (errno));
1303
1304 rspamd_map_process_periodic (cbd->periodic);
1305 }
1306 }
1307 }
1308
1309 MAP_RELEASE (cbd, "http_callback_data");
1310 }
1311
1312 static gboolean
rspamd_map_read_cached(struct rspamd_map * map,struct rspamd_map_backend * bk,struct map_periodic_cbdata * periodic,const gchar * host)1313 rspamd_map_read_cached (struct rspamd_map *map, struct rspamd_map_backend *bk,
1314 struct map_periodic_cbdata *periodic, const gchar *host)
1315 {
1316 gsize len;
1317 gpointer in;
1318 struct http_map_data *data;
1319
1320 data = bk->data.hd;
1321
1322 in = rspamd_shmem_xmap (data->cache->shmem_name, PROT_READ, &len);
1323
1324 if (in == NULL) {
1325 msg_err ("cannot map cache from %s: %s", data->cache->shmem_name,
1326 strerror (errno));
1327 return FALSE;
1328 }
1329
1330 if (len < data->cache->len) {
1331 msg_err ("cannot map cache from %s: bad length %z, %z expected",
1332 data->cache->shmem_name,
1333 len, data->cache->len);
1334 munmap (in, len);
1335
1336 return FALSE;
1337 }
1338
1339 if (bk->is_compressed) {
1340 ZSTD_DStream *zstream;
1341 ZSTD_inBuffer zin;
1342 ZSTD_outBuffer zout;
1343 guchar *out;
1344 gsize outlen, r;
1345
1346 zstream = ZSTD_createDStream ();
1347 ZSTD_initDStream (zstream);
1348
1349 zin.pos = 0;
1350 zin.src = in;
1351 zin.size = len;
1352
1353 if ((outlen = ZSTD_getDecompressedSize (zin.src, zin.size)) == 0) {
1354 outlen = ZSTD_DStreamOutSize ();
1355 }
1356
1357 out = g_malloc (outlen);
1358
1359 zout.dst = out;
1360 zout.pos = 0;
1361 zout.size = outlen;
1362
1363 while (zin.pos < zin.size) {
1364 r = ZSTD_decompressStream (zstream, &zout, &zin);
1365
1366 if (ZSTD_isError (r)) {
1367 msg_err_map ("%s: cannot decompress data: %s",
1368 bk->uri,
1369 ZSTD_getErrorName (r));
1370 ZSTD_freeDStream (zstream);
1371 g_free (out);
1372 munmap (in, len);
1373 return FALSE;
1374 }
1375
1376 if (zout.pos == zout.size) {
1377 /* We need to extend output buffer */
1378 zout.size = zout.size * 2 + 1;
1379 out = g_realloc (zout.dst, zout.size);
1380 zout.dst = out;
1381 }
1382 }
1383
1384 ZSTD_freeDStream (zstream);
1385 msg_info_map ("%s: read map data cached %z bytes compressed, "
1386 "%z uncompressed", bk->uri,
1387 len, zout.pos);
1388 map->read_callback (out, zout.pos, &periodic->cbdata, TRUE);
1389 g_free (out);
1390 }
1391 else {
1392 msg_info_map ("%s: read map data cached %z bytes", bk->uri,
1393 len);
1394 map->read_callback (in, len, &periodic->cbdata, TRUE);
1395 }
1396
1397 munmap (in, len);
1398
1399 return TRUE;
1400 }
1401
1402 static gboolean
rspamd_map_has_http_cached_file(struct rspamd_map * map,struct rspamd_map_backend * bk)1403 rspamd_map_has_http_cached_file (struct rspamd_map *map,
1404 struct rspamd_map_backend *bk)
1405 {
1406 gchar path[PATH_MAX];
1407 guchar digest[rspamd_cryptobox_HASHBYTES];
1408 struct rspamd_config *cfg = map->cfg;
1409 struct stat st;
1410
1411 if (cfg->maps_cache_dir == NULL || cfg->maps_cache_dir[0] == '\0') {
1412 return FALSE;
1413 }
1414
1415 rspamd_cryptobox_hash (digest, bk->uri, strlen (bk->uri), NULL, 0);
1416 rspamd_snprintf (path, sizeof (path), "%s%c%*xs.map", cfg->maps_cache_dir,
1417 G_DIR_SEPARATOR, 20, digest);
1418
1419 if (stat (path, &st) != -1 && st.st_size >
1420 sizeof (struct rspamd_http_file_data)) {
1421 return TRUE;
1422 }
1423
1424 return FALSE;
1425 }
1426
1427 static gboolean
rspamd_map_save_http_cached_file(struct rspamd_map * map,struct rspamd_map_backend * bk,struct http_map_data * htdata,const guchar * data,gsize len)1428 rspamd_map_save_http_cached_file (struct rspamd_map *map,
1429 struct rspamd_map_backend *bk,
1430 struct http_map_data *htdata,
1431 const guchar *data,
1432 gsize len)
1433 {
1434 gchar path[PATH_MAX];
1435 guchar digest[rspamd_cryptobox_HASHBYTES];
1436 struct rspamd_config *cfg = map->cfg;
1437 gint fd;
1438 struct rspamd_http_file_data header;
1439
1440 if (cfg->maps_cache_dir == NULL || cfg->maps_cache_dir[0] == '\0') {
1441 return FALSE;
1442 }
1443
1444 rspamd_cryptobox_hash (digest, bk->uri, strlen (bk->uri), NULL, 0);
1445 rspamd_snprintf (path, sizeof (path), "%s%c%*xs.map", cfg->maps_cache_dir,
1446 G_DIR_SEPARATOR, 20, digest);
1447
1448 fd = rspamd_file_xopen (path, O_WRONLY | O_TRUNC | O_CREAT,
1449 00600, FALSE);
1450
1451 if (fd == -1) {
1452 return FALSE;
1453 }
1454
1455 if (!rspamd_file_lock (fd, FALSE)) {
1456 msg_err_map ("cannot lock file %s: %s", path, strerror (errno));
1457 close (fd);
1458
1459 return FALSE;
1460 }
1461
1462 memcpy (header.magic, rspamd_http_file_magic, sizeof (rspamd_http_file_magic));
1463 header.mtime = htdata->last_modified;
1464 header.next_check = map->next_check;
1465 header.data_off = sizeof (header);
1466
1467 if (htdata->etag) {
1468 header.data_off += RSPAMD_FSTRING_LEN (htdata->etag);
1469 header.etag_len = RSPAMD_FSTRING_LEN (htdata->etag);
1470 }
1471 else {
1472 header.etag_len = 0;
1473 }
1474
1475 if (write (fd, &header, sizeof (header)) != sizeof (header)) {
1476 msg_err_map ("cannot write file %s (header stage): %s", path, strerror (errno));
1477 rspamd_file_unlock (fd, FALSE);
1478 close (fd);
1479
1480 return FALSE;
1481 }
1482
1483 if (header.etag_len > 0) {
1484 if (write (fd, RSPAMD_FSTRING_DATA (htdata->etag), header.etag_len) !=
1485 header.etag_len) {
1486 msg_err_map ("cannot write file %s (etag stage): %s", path, strerror (errno));
1487 rspamd_file_unlock (fd, FALSE);
1488 close (fd);
1489
1490 return FALSE;
1491 }
1492 }
1493
1494 /* Now write the rest */
1495 if (write (fd, data, len) != len) {
1496 msg_err_map ("cannot write file %s (data stage): %s", path, strerror (errno));
1497 rspamd_file_unlock (fd, FALSE);
1498 close (fd);
1499
1500 return FALSE;
1501 }
1502
1503 rspamd_file_unlock (fd, FALSE);
1504 close (fd);
1505
1506 msg_info_map ("saved data from %s in %s, %uz bytes", bk->uri, path, len +
1507 sizeof (header) + header.etag_len);
1508
1509 return TRUE;
1510 }
1511
1512 static gboolean
rspamd_map_update_http_cached_file(struct rspamd_map * map,struct rspamd_map_backend * bk,struct http_map_data * htdata)1513 rspamd_map_update_http_cached_file (struct rspamd_map *map,
1514 struct rspamd_map_backend *bk,
1515 struct http_map_data *htdata)
1516 {
1517 gchar path[PATH_MAX];
1518 guchar digest[rspamd_cryptobox_HASHBYTES];
1519 struct rspamd_config *cfg = map->cfg;
1520 gint fd;
1521 struct rspamd_http_file_data header;
1522
1523 if (!rspamd_map_has_http_cached_file (map, bk)) {
1524 return FALSE;
1525 }
1526
1527 rspamd_cryptobox_hash (digest, bk->uri, strlen (bk->uri), NULL, 0);
1528 rspamd_snprintf (path, sizeof (path), "%s%c%*xs.map", cfg->maps_cache_dir,
1529 G_DIR_SEPARATOR, 20, digest);
1530
1531 fd = rspamd_file_xopen (path, O_WRONLY,
1532 00600, FALSE);
1533
1534 if (fd == -1) {
1535 return FALSE;
1536 }
1537
1538 if (!rspamd_file_lock (fd, FALSE)) {
1539 msg_err_map ("cannot lock file %s: %s", path, strerror (errno));
1540 close (fd);
1541
1542 return FALSE;
1543 }
1544
1545 memcpy (header.magic, rspamd_http_file_magic, sizeof (rspamd_http_file_magic));
1546 header.mtime = htdata->last_modified;
1547 header.next_check = map->next_check;
1548 header.data_off = sizeof (header);
1549
1550 if (htdata->etag) {
1551 header.data_off += RSPAMD_FSTRING_LEN (htdata->etag);
1552 header.etag_len = RSPAMD_FSTRING_LEN (htdata->etag);
1553 }
1554 else {
1555 header.etag_len = 0;
1556 }
1557
1558 if (write (fd, &header, sizeof (header)) != sizeof (header)) {
1559 msg_err_map ("cannot update file %s (header stage): %s", path, strerror (errno));
1560 rspamd_file_unlock (fd, FALSE);
1561 close (fd);
1562
1563 return FALSE;
1564 }
1565
1566 if (header.etag_len > 0) {
1567 if (write (fd, RSPAMD_FSTRING_DATA (htdata->etag), header.etag_len) !=
1568 header.etag_len) {
1569 msg_err_map ("cannot update file %s (etag stage): %s", path, strerror (errno));
1570 rspamd_file_unlock (fd, FALSE);
1571 close (fd);
1572
1573 return FALSE;
1574 }
1575 }
1576
1577 rspamd_file_unlock (fd, FALSE);
1578 close (fd);
1579
1580 return TRUE;
1581 }
1582
1583
1584 static gboolean
rspamd_map_read_http_cached_file(struct rspamd_map * map,struct rspamd_map_backend * bk,struct http_map_data * htdata,struct map_cb_data * cbdata)1585 rspamd_map_read_http_cached_file (struct rspamd_map *map,
1586 struct rspamd_map_backend *bk,
1587 struct http_map_data *htdata,
1588 struct map_cb_data *cbdata)
1589 {
1590 gchar path[PATH_MAX];
1591 guchar digest[rspamd_cryptobox_HASHBYTES];
1592 struct rspamd_config *cfg = map->cfg;
1593 gint fd;
1594 struct stat st;
1595 struct rspamd_http_file_data header;
1596
1597 if (cfg->maps_cache_dir == NULL || cfg->maps_cache_dir[0] == '\0') {
1598 return FALSE;
1599 }
1600
1601 rspamd_cryptobox_hash (digest, bk->uri, strlen (bk->uri), NULL, 0);
1602 rspamd_snprintf (path, sizeof (path), "%s%c%*xs.map", cfg->maps_cache_dir,
1603 G_DIR_SEPARATOR, 20, digest);
1604
1605 fd = rspamd_file_xopen (path, O_RDONLY, 00600, FALSE);
1606
1607 if (fd == -1) {
1608 return FALSE;
1609 }
1610
1611 if (!rspamd_file_lock (fd, FALSE)) {
1612 msg_err_map ("cannot lock file %s: %s", path, strerror (errno));
1613 close (fd);
1614
1615 return FALSE;
1616 }
1617
1618 (void)fstat (fd, &st);
1619
1620 if (read (fd, &header, sizeof (header)) != sizeof (header)) {
1621 msg_err_map ("cannot read file %s (header stage): %s", path, strerror (errno));
1622 rspamd_file_unlock (fd, FALSE);
1623 close (fd);
1624
1625 return FALSE;
1626 }
1627
1628 if (memcmp (header.magic, rspamd_http_file_magic,
1629 sizeof (rspamd_http_file_magic)) != 0) {
1630 msg_warn_map ("invalid or old version magic in file %s; ignore it", path);
1631 rspamd_file_unlock (fd, FALSE);
1632 close (fd);
1633
1634 return FALSE;
1635 }
1636
1637 double now = rspamd_get_calendar_ticks ();
1638
1639 if (header.next_check > now) {
1640 map->next_check = header.next_check;
1641 }
1642 else {
1643 map->next_check = now;
1644 }
1645
1646 htdata->last_modified = header.mtime;
1647
1648 if (header.etag_len > 0) {
1649 rspamd_fstring_t *etag = rspamd_fstring_sized_new (header.etag_len);
1650
1651 if (read (fd, RSPAMD_FSTRING_DATA (etag), header.etag_len) != header.etag_len) {
1652 msg_err_map ("cannot read file %s (etag stage): %s", path,
1653 strerror (errno));
1654 rspamd_file_unlock (fd, FALSE);
1655 rspamd_fstring_free (etag);
1656 close (fd);
1657
1658 return FALSE;
1659 }
1660
1661 etag->len = header.etag_len;
1662
1663 if (htdata->etag) {
1664 /* FIXME: should be dealt somehow better */
1665 msg_warn_map ("etag is already defined as %V; cached is %V; ignore cached",
1666 htdata->etag, etag);
1667 rspamd_fstring_free (etag);
1668 }
1669 else {
1670 htdata->etag = etag;
1671 }
1672 }
1673
1674 rspamd_file_unlock (fd, FALSE);
1675 close (fd);
1676
1677 /* Now read file data */
1678 /* Perform buffered read: fail-safe */
1679 if (!read_map_file_chunks (map, cbdata, path,
1680 st.st_size - header.data_off, header.data_off)) {
1681 return FALSE;
1682 }
1683
1684 struct tm tm;
1685 gchar ncheck_buf[32], lm_buf[32];
1686
1687 rspamd_localtime (map->next_check, &tm);
1688 strftime (ncheck_buf, sizeof (ncheck_buf) - 1, "%Y-%m-%d %H:%M:%S", &tm);
1689 rspamd_localtime (htdata->last_modified, &tm);
1690 strftime (lm_buf, sizeof (lm_buf) - 1, "%Y-%m-%d %H:%M:%S", &tm);
1691
1692 msg_info_map ("read cached data for %s from %s, %uz bytes; next check at: %s;"
1693 " last modified on: %s; etag: %V",
1694 bk->uri,
1695 path,
1696 (size_t)(st.st_size - header.data_off),
1697 ncheck_buf,
1698 lm_buf,
1699 htdata->etag);
1700
1701 return TRUE;
1702 }
1703
1704 /**
1705 * Async HTTP callback
1706 */
1707 static void
rspamd_map_common_http_callback(struct rspamd_map * map,struct rspamd_map_backend * bk,struct map_periodic_cbdata * periodic,gboolean check)1708 rspamd_map_common_http_callback (struct rspamd_map *map,
1709 struct rspamd_map_backend *bk,
1710 struct map_periodic_cbdata *periodic,
1711 gboolean check)
1712 {
1713 struct http_map_data *data;
1714 struct http_callback_data *cbd;
1715 guint flags = RSPAMD_HTTP_CLIENT_SIMPLE|RSPAMD_HTTP_CLIENT_SHARED;
1716
1717 data = bk->data.hd;
1718
1719 if (g_atomic_int_get (&data->cache->available) == 1) {
1720 /* Read cached data */
1721 if (check) {
1722 if (data->last_modified < data->cache->last_modified) {
1723 msg_info_map ("need to reread cached map triggered by %s "
1724 "(%d our modify time, %d cached modify time)",
1725 bk->uri,
1726 (int)data->last_modified,
1727 (int)data->cache->last_modified);
1728 periodic->need_modify = TRUE;
1729 /* Reset the whole chain */
1730 periodic->cur_backend = 0;
1731 rspamd_map_process_periodic (periodic);
1732 }
1733 else {
1734 if (map->active_http) {
1735 /* Check even if there is a cached version */
1736 goto check;
1737 }
1738 else {
1739 /* Switch to the next backend */
1740 periodic->cur_backend++;
1741 rspamd_map_process_periodic (periodic);
1742 }
1743 }
1744
1745 return;
1746 }
1747 else {
1748 if (map->active_http &&
1749 data->last_modified > data->cache->last_modified) {
1750 goto check;
1751 }
1752 else if (rspamd_map_read_cached (map, bk, periodic, data->host)) {
1753 /* Switch to the next backend */
1754 periodic->cur_backend++;
1755 data->last_modified = data->cache->last_modified;
1756 rspamd_map_process_periodic (periodic);
1757
1758 return;
1759 }
1760 }
1761 }
1762 else if (!map->active_http) {
1763 /* Switch to the next backend */
1764 periodic->cur_backend ++;
1765 rspamd_map_process_periodic (periodic);
1766
1767 return;
1768 }
1769
1770 check:
1771 cbd = g_malloc0 (sizeof (struct http_callback_data));
1772
1773 cbd->event_loop = map->event_loop;
1774 cbd->addrs = g_ptr_array_sized_new (4);
1775 cbd->map = map;
1776 cbd->data = data;
1777 cbd->check = check;
1778 cbd->periodic = periodic;
1779 MAP_RETAIN (periodic, "periodic");
1780 cbd->bk = bk;
1781 MAP_RETAIN (bk, "rspamd_map_backend");
1782 cbd->stage = http_map_terminated;
1783 REF_INIT_RETAIN (cbd, free_http_cbdata);
1784
1785 msg_debug_map ("%s map data from %s", check ? "checking" : "reading",
1786 data->host);
1787
1788 /* Try address */
1789 rspamd_inet_addr_t *addr = NULL;
1790
1791 if (rspamd_parse_inet_address (&addr, data->host,
1792 strlen (data->host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) {
1793 rspamd_inet_address_set_port (addr, cbd->data->port);
1794 g_ptr_array_add (cbd->addrs, (void *)addr);
1795 cbd->conn = rspamd_http_connection_new_client (
1796 NULL,
1797 NULL,
1798 http_map_error,
1799 http_map_finish,
1800 flags,
1801 addr);
1802
1803 if (cbd->conn != NULL) {
1804 cbd->stage = http_map_http_conn;
1805 write_http_request (cbd);
1806 cbd->addr = addr;
1807 MAP_RELEASE (cbd, "http_callback_data");
1808 }
1809 else {
1810 msg_warn_map ("cannot load map: cannot connect to %s: %s",
1811 data->host, strerror (errno));
1812 MAP_RELEASE (cbd, "http_callback_data");
1813 }
1814
1815 return;
1816 }
1817 else if (map->r->r) {
1818 /* Send both A and AAAA requests */
1819 guint nreq = 0;
1820
1821 if (rdns_make_request_full (map->r->r, rspamd_map_dns_callback, cbd,
1822 map->cfg->dns_timeout, map->cfg->dns_retransmits, 1,
1823 data->host, RDNS_REQUEST_A)) {
1824 MAP_RETAIN (cbd, "http_callback_data");
1825 nreq ++;
1826 }
1827 if (rdns_make_request_full (map->r->r, rspamd_map_dns_callback, cbd,
1828 map->cfg->dns_timeout, map->cfg->dns_retransmits, 1,
1829 data->host, RDNS_REQUEST_AAAA)) {
1830 MAP_RETAIN (cbd, "http_callback_data");
1831 nreq ++;
1832 }
1833
1834 if (nreq == 2) {
1835 cbd->stage = http_map_resolve_host2;
1836 }
1837 else if (nreq == 1) {
1838 cbd->stage = http_map_resolve_host1;
1839 }
1840
1841 map->tmp_dtor = free_http_cbdata_dtor;
1842 map->tmp_dtor_data = cbd;
1843 }
1844 else {
1845 msg_warn_map ("cannot load map: DNS resolver is not initialized");
1846 cbd->periodic->errored = TRUE;
1847 }
1848
1849 MAP_RELEASE (cbd, "http_callback_data");
1850 }
1851
1852 static void
rspamd_map_http_check_callback(struct map_periodic_cbdata * cbd)1853 rspamd_map_http_check_callback (struct map_periodic_cbdata *cbd)
1854 {
1855 struct rspamd_map *map;
1856 struct rspamd_map_backend *bk;
1857
1858 map = cbd->map;
1859 bk = g_ptr_array_index (cbd->map->backends, cbd->cur_backend);
1860
1861 rspamd_map_common_http_callback (map, bk, cbd, TRUE);
1862 }
1863
1864 static void
rspamd_map_http_read_callback(struct map_periodic_cbdata * cbd)1865 rspamd_map_http_read_callback (struct map_periodic_cbdata *cbd)
1866 {
1867 struct rspamd_map *map;
1868 struct rspamd_map_backend *bk;
1869
1870 map = cbd->map;
1871 bk = g_ptr_array_index (cbd->map->backends, cbd->cur_backend);
1872 rspamd_map_common_http_callback (map, bk, cbd, FALSE);
1873 }
1874
1875 static void
rspamd_map_file_check_callback(struct map_periodic_cbdata * periodic)1876 rspamd_map_file_check_callback (struct map_periodic_cbdata *periodic)
1877 {
1878 struct rspamd_map *map;
1879 struct file_map_data *data;
1880 struct rspamd_map_backend *bk;
1881
1882 map = periodic->map;
1883 bk = g_ptr_array_index (map->backends, periodic->cur_backend);
1884 data = bk->data.fd;
1885
1886 if (data->need_modify) {
1887 periodic->need_modify = TRUE;
1888 periodic->cur_backend = 0;
1889 data->need_modify = FALSE;
1890
1891 rspamd_map_process_periodic (periodic);
1892
1893 return;
1894 }
1895
1896 map = periodic->map;
1897 /* Switch to the next backend as the rest is handled by ev_stat */
1898 periodic->cur_backend ++;
1899 rspamd_map_process_periodic (periodic);
1900 }
1901
1902 static void
rspamd_map_static_check_callback(struct map_periodic_cbdata * periodic)1903 rspamd_map_static_check_callback (struct map_periodic_cbdata *periodic)
1904 {
1905 struct rspamd_map *map;
1906 struct static_map_data *data;
1907 struct rspamd_map_backend *bk;
1908
1909 map = periodic->map;
1910 bk = g_ptr_array_index (map->backends, periodic->cur_backend);
1911 data = bk->data.sd;
1912
1913 if (!data->processed) {
1914 periodic->need_modify = TRUE;
1915 periodic->cur_backend = 0;
1916
1917 rspamd_map_process_periodic (periodic);
1918
1919 return;
1920 }
1921
1922 /* Switch to the next backend */
1923 periodic->cur_backend ++;
1924 rspamd_map_process_periodic (periodic);
1925 }
1926
1927 static void
rspamd_map_file_read_callback(struct map_periodic_cbdata * periodic)1928 rspamd_map_file_read_callback (struct map_periodic_cbdata *periodic)
1929 {
1930 struct rspamd_map *map;
1931 struct file_map_data *data;
1932 struct rspamd_map_backend *bk;
1933
1934 map = periodic->map;
1935
1936 bk = g_ptr_array_index (map->backends, periodic->cur_backend);
1937 data = bk->data.fd;
1938
1939 msg_info_map ("rereading map file %s", data->filename);
1940
1941 if (!read_map_file (map, data, bk, periodic)) {
1942 periodic->errored = TRUE;
1943 }
1944
1945 /* Switch to the next backend */
1946 periodic->cur_backend ++;
1947 rspamd_map_process_periodic (periodic);
1948 }
1949
1950 static void
rspamd_map_static_read_callback(struct map_periodic_cbdata * periodic)1951 rspamd_map_static_read_callback (struct map_periodic_cbdata *periodic)
1952 {
1953 struct rspamd_map *map;
1954 struct static_map_data *data;
1955 struct rspamd_map_backend *bk;
1956
1957 map = periodic->map;
1958
1959 bk = g_ptr_array_index (map->backends, periodic->cur_backend);
1960 data = bk->data.sd;
1961
1962 msg_info_map ("rereading static map");
1963
1964 if (!read_map_static (map, data, bk, periodic)) {
1965 periodic->errored = TRUE;
1966 }
1967
1968 /* Switch to the next backend */
1969 periodic->cur_backend ++;
1970 rspamd_map_process_periodic (periodic);
1971 }
1972
1973 static void
rspamd_map_process_periodic(struct map_periodic_cbdata * cbd)1974 rspamd_map_process_periodic (struct map_periodic_cbdata *cbd)
1975 {
1976 struct rspamd_map_backend *bk;
1977 struct rspamd_map *map;
1978
1979 map = cbd->map;
1980 map->scheduled_check = NULL;
1981
1982 if (!map->file_only && !cbd->locked) {
1983 if (!g_atomic_int_compare_and_exchange (cbd->map->locked,
1984 0, 1)) {
1985 msg_debug_map (
1986 "don't try to reread map %s as it is locked by other process, "
1987 "will reread it later", cbd->map->name);
1988 rspamd_map_schedule_periodic (map, RSPAMD_MAP_SCHEDULE_LOCKED);
1989 MAP_RELEASE (cbd, "periodic");
1990
1991 return;
1992 }
1993 else {
1994 msg_debug_map ("locked map %s", cbd->map->name);
1995 cbd->locked = TRUE;
1996 }
1997 }
1998
1999 if (cbd->errored) {
2000 /* We should not check other backends if some backend has failed */
2001 rspamd_map_schedule_periodic (cbd->map, RSPAMD_MAP_SCHEDULE_ERROR);
2002
2003 if (cbd->locked) {
2004 g_atomic_int_set (cbd->map->locked, 0);
2005 cbd->locked = FALSE;
2006 }
2007
2008 msg_debug_map ("unlocked map %s, refcount=%d", cbd->map->name,
2009 cbd->ref.refcount);
2010 MAP_RELEASE (cbd, "periodic");
2011
2012 return;
2013 }
2014
2015 /* For each backend we need to check for modifications */
2016 if (cbd->cur_backend >= cbd->map->backends->len) {
2017 /* Last backend */
2018 msg_debug_map ("finished map: %d of %d", cbd->cur_backend,
2019 cbd->map->backends->len);
2020 MAP_RELEASE (cbd, "periodic");
2021
2022 return;
2023 }
2024
2025 if (cbd->map->wrk && cbd->map->wrk->state == rspamd_worker_state_running) {
2026 bk = g_ptr_array_index (cbd->map->backends, cbd->cur_backend);
2027 g_assert (bk != NULL);
2028
2029 if (cbd->need_modify) {
2030 /* Load data from the next backend */
2031 switch (bk->protocol) {
2032 case MAP_PROTO_HTTP:
2033 case MAP_PROTO_HTTPS:
2034 rspamd_map_http_read_callback (cbd);
2035 break;
2036 case MAP_PROTO_FILE:
2037 rspamd_map_file_read_callback (cbd);
2038 break;
2039 case MAP_PROTO_STATIC:
2040 rspamd_map_static_read_callback (cbd);
2041 break;
2042 }
2043 } else {
2044 /* Check the next backend */
2045 switch (bk->protocol) {
2046 case MAP_PROTO_HTTP:
2047 case MAP_PROTO_HTTPS:
2048 rspamd_map_http_check_callback (cbd);
2049 break;
2050 case MAP_PROTO_FILE:
2051 rspamd_map_file_check_callback (cbd);
2052 break;
2053 case MAP_PROTO_STATIC:
2054 rspamd_map_static_check_callback (cbd);
2055 break;
2056 }
2057 }
2058 }
2059 }
2060
2061 static void
rspamd_map_on_stat(struct ev_loop * loop,ev_stat * w,int revents)2062 rspamd_map_on_stat (struct ev_loop *loop, ev_stat *w, int revents)
2063 {
2064 struct rspamd_map *map = (struct rspamd_map *)w->data;
2065
2066 if (w->attr.st_nlink > 0) {
2067 msg_info_map ("old mtime is %t (size = %Hz), "
2068 "new mtime is %t (size = %Hz) for map file %s",
2069 w->prev.st_mtime, (gsize)w->prev.st_size,
2070 w->attr.st_mtime, (gsize)w->attr.st_size,
2071 w->path);
2072
2073 /* Fire need modify flag */
2074 struct rspamd_map_backend *bk;
2075 guint i;
2076
2077 PTR_ARRAY_FOREACH (map->backends, i, bk) {
2078 if (bk->protocol == MAP_PROTO_FILE) {
2079 bk->data.fd->need_modify = TRUE;
2080 }
2081 }
2082
2083 map->next_check = 0;
2084
2085 if (map->scheduled_check) {
2086 ev_timer_stop (map->event_loop, &map->scheduled_check->ev);
2087 MAP_RELEASE (map->scheduled_check, "rspamd_map_on_stat");
2088 map->scheduled_check = NULL;
2089 }
2090
2091 rspamd_map_schedule_periodic (map, RSPAMD_MAP_SCHEDULE_INIT);
2092 }
2093 }
2094
2095 /* Start watching event for all maps */
2096 void
rspamd_map_watch(struct rspamd_config * cfg,struct ev_loop * event_loop,struct rspamd_dns_resolver * resolver,struct rspamd_worker * worker,enum rspamd_map_watch_type how)2097 rspamd_map_watch (struct rspamd_config *cfg,
2098 struct ev_loop *event_loop,
2099 struct rspamd_dns_resolver *resolver,
2100 struct rspamd_worker *worker,
2101 enum rspamd_map_watch_type how)
2102 {
2103 GList *cur = cfg->maps;
2104 struct rspamd_map *map;
2105 struct rspamd_map_backend *bk;
2106 guint i;
2107
2108 g_assert (how > RSPAMD_MAP_WATCH_MIN && how < RSPAMD_MAP_WATCH_MAX);
2109
2110 /* First of all do synced read of data */
2111 while (cur) {
2112 map = cur->data;
2113 map->event_loop = event_loop;
2114 map->r = resolver;
2115
2116 if (map->wrk == NULL && how != RSPAMD_MAP_WATCH_WORKER) {
2117 /* Generic scanner map */
2118 map->wrk = worker;
2119
2120 if (how == RSPAMD_MAP_WATCH_PRIMARY_CONTROLLER) {
2121 map->active_http = TRUE;
2122 }
2123 else {
2124 map->active_http = FALSE;
2125 }
2126 }
2127 else if (map->wrk != NULL && map->wrk == worker) {
2128 /* Map is bound to a specific worker */
2129 map->active_http = TRUE;
2130 }
2131 else {
2132 /* Skip map for this worker as irrelevant */
2133 cur = g_list_next (cur);
2134 continue;
2135 }
2136
2137 if (!map->active_http) {
2138 /* Check cached version more frequently as it is cheap */
2139
2140 if (map->poll_timeout >= cfg->map_timeout &&
2141 cfg->map_file_watch_multiplier < 1.0) {
2142 map->poll_timeout =
2143 map->poll_timeout * cfg->map_file_watch_multiplier;
2144 }
2145 }
2146
2147 map->file_only = TRUE;
2148 map->static_only = TRUE;
2149
2150 PTR_ARRAY_FOREACH (map->backends, i, bk) {
2151 bk->event_loop = event_loop;
2152
2153 if (bk->protocol == MAP_PROTO_FILE) {
2154 struct file_map_data *data;
2155
2156 data = bk->data.fd;
2157
2158 ev_stat_init (&data->st_ev, rspamd_map_on_stat,
2159 data->filename, map->poll_timeout * cfg->map_file_watch_multiplier);
2160 data->st_ev.data = map;
2161 ev_stat_start (event_loop, &data->st_ev);
2162 map->static_only = FALSE;
2163 }
2164 else if ((bk->protocol == MAP_PROTO_HTTP ||
2165 bk->protocol == MAP_PROTO_HTTPS)) {
2166 if (map->active_http) {
2167 map->non_trivial = TRUE;
2168 }
2169
2170 map->static_only = FALSE;
2171 map->file_only = FALSE;
2172 }
2173 }
2174
2175 rspamd_map_schedule_periodic (map, RSPAMD_MAP_SCHEDULE_INIT);
2176
2177 cur = g_list_next (cur);
2178 }
2179 }
2180
2181 void
rspamd_map_preload(struct rspamd_config * cfg)2182 rspamd_map_preload (struct rspamd_config *cfg)
2183 {
2184 GList *cur = cfg->maps;
2185 struct rspamd_map *map;
2186 struct rspamd_map_backend *bk;
2187 guint i;
2188 gboolean map_ok;
2189
2190 /* First of all do synced read of data */
2191 while (cur) {
2192 map = cur->data;
2193 map_ok = TRUE;
2194
2195 PTR_ARRAY_FOREACH (map->backends, i, bk) {
2196 if (!(bk->protocol == MAP_PROTO_FILE ||
2197 bk->protocol == MAP_PROTO_STATIC)) {
2198
2199 if (bk->protocol == MAP_PROTO_HTTP ||
2200 bk->protocol == MAP_PROTO_HTTPS) {
2201 if (!rspamd_map_has_http_cached_file (map, bk)) {
2202
2203 if (!map->fallback_backend) {
2204 map_ok = FALSE;
2205 }
2206 break;
2207 }
2208 else {
2209 continue; /* We are yet fine */
2210 }
2211 }
2212 map_ok = FALSE;
2213 break;
2214 }
2215 }
2216
2217 if (map_ok) {
2218 struct map_periodic_cbdata fake_cbd;
2219 gboolean succeed = TRUE;
2220
2221 memset (&fake_cbd, 0, sizeof (fake_cbd));
2222 fake_cbd.cbdata.state = 0;
2223 fake_cbd.cbdata.prev_data = *map->user_data;
2224 fake_cbd.cbdata.cur_data = NULL;
2225 fake_cbd.cbdata.map = map;
2226 fake_cbd.map = map;
2227
2228 PTR_ARRAY_FOREACH (map->backends, i, bk) {
2229 fake_cbd.cur_backend = i;
2230
2231 if (bk->protocol == MAP_PROTO_FILE) {
2232 if (!read_map_file (map, bk->data.fd, bk, &fake_cbd)) {
2233 succeed = FALSE;
2234 break;
2235 }
2236 }
2237 else if (bk->protocol == MAP_PROTO_STATIC) {
2238 if (!read_map_static (map, bk->data.sd, bk, &fake_cbd)) {
2239 succeed = FALSE;
2240 break;
2241 }
2242 }
2243 else if (bk->protocol == MAP_PROTO_HTTP ||
2244 bk->protocol == MAP_PROTO_HTTPS) {
2245 if (!rspamd_map_read_http_cached_file (map, bk, bk->data.hd,
2246 &fake_cbd.cbdata)) {
2247
2248 if (map->fallback_backend) {
2249 /* Try fallback */
2250 g_assert (map->fallback_backend->protocol ==
2251 MAP_PROTO_FILE);
2252 if (!read_map_file (map,
2253 map->fallback_backend->data.fd,
2254 map->fallback_backend, &fake_cbd)) {
2255 succeed = FALSE;
2256 break;
2257 }
2258 }
2259 else {
2260 succeed = FALSE;
2261 break;
2262 }
2263 }
2264 }
2265 else {
2266 g_assert_not_reached ();
2267 }
2268 }
2269
2270 if (succeed) {
2271 map->fin_callback (&fake_cbd.cbdata, map->user_data);
2272 }
2273 else {
2274 msg_info_map ("preload of %s failed", map->name);
2275 }
2276
2277 }
2278
2279 cur = g_list_next (cur);
2280 }
2281 }
2282
2283 void
rspamd_map_remove_all(struct rspamd_config * cfg)2284 rspamd_map_remove_all (struct rspamd_config *cfg)
2285 {
2286 struct rspamd_map *map;
2287 GList *cur;
2288 struct rspamd_map_backend *bk;
2289 struct map_cb_data cbdata;
2290 guint i;
2291
2292 for (cur = cfg->maps; cur != NULL; cur = g_list_next (cur)) {
2293 map = cur->data;
2294
2295 if (map->tmp_dtor) {
2296 map->tmp_dtor (map->tmp_dtor_data);
2297 }
2298
2299 if (map->dtor) {
2300 cbdata.prev_data = NULL;
2301 cbdata.map = map;
2302 cbdata.cur_data = *map->user_data;
2303
2304 map->dtor (&cbdata);
2305 *map->user_data = NULL;
2306 }
2307
2308 for (i = 0; i < map->backends->len; i ++) {
2309 bk = g_ptr_array_index (map->backends, i);
2310
2311 MAP_RELEASE (bk, "rspamd_map_backend");
2312 }
2313
2314 if (map->fallback_backend) {
2315 MAP_RELEASE (map->fallback_backend, "rspamd_map_backend");
2316 }
2317 }
2318
2319 g_list_free (cfg->maps);
2320 cfg->maps = NULL;
2321 }
2322
2323 static const gchar *
rspamd_map_check_proto(struct rspamd_config * cfg,const gchar * map_line,struct rspamd_map_backend * bk)2324 rspamd_map_check_proto (struct rspamd_config *cfg,
2325 const gchar *map_line, struct rspamd_map_backend *bk)
2326 {
2327 const gchar *pos = map_line, *end, *end_key;
2328
2329 g_assert (bk != NULL);
2330 g_assert (pos != NULL);
2331
2332 end = pos + strlen (pos);
2333
2334 /* Static check */
2335 if (g_ascii_strcasecmp (pos, "static") == 0) {
2336 bk->protocol = MAP_PROTO_STATIC;
2337 bk->uri = g_strdup (pos);
2338
2339 return pos;
2340 }
2341 else if (g_ascii_strcasecmp (pos, "zst+static") == 0) {
2342 bk->protocol = MAP_PROTO_STATIC;
2343 bk->uri = g_strdup (pos + 4);
2344 bk->is_compressed = TRUE;
2345
2346 return pos + 4;
2347 }
2348
2349 for (;;) {
2350 if (g_ascii_strncasecmp (pos, "sign+", sizeof ("sign+") - 1) == 0) {
2351 bk->is_signed = TRUE;
2352 pos += sizeof ("sign+") - 1;
2353 }
2354 else if (g_ascii_strncasecmp (pos, "fallback+", sizeof ("fallback+") - 1) == 0) {
2355 bk->is_fallback = TRUE;
2356 pos += sizeof ("fallback+") - 1;
2357 }
2358 else if (g_ascii_strncasecmp (pos, "key=", sizeof ("key=") - 1) == 0) {
2359 pos += sizeof ("key=") - 1;
2360 end_key = memchr (pos, '+', end - pos);
2361
2362 if (end_key != NULL) {
2363 bk->trusted_pubkey = rspamd_pubkey_from_base32 (pos, end_key - pos,
2364 RSPAMD_KEYPAIR_SIGN, RSPAMD_CRYPTOBOX_MODE_25519);
2365
2366 if (bk->trusted_pubkey == NULL) {
2367 msg_err_config ("cannot read pubkey from map: %s",
2368 map_line);
2369 return NULL;
2370 }
2371 pos = end_key + 1;
2372 } else if (end - pos > 64) {
2373 /* Try hex encoding */
2374 bk->trusted_pubkey = rspamd_pubkey_from_hex (pos, 64,
2375 RSPAMD_KEYPAIR_SIGN, RSPAMD_CRYPTOBOX_MODE_25519);
2376
2377 if (bk->trusted_pubkey == NULL) {
2378 msg_err_config ("cannot read pubkey from map: %s",
2379 map_line);
2380 return NULL;
2381 }
2382 pos += 64;
2383 } else {
2384 msg_err_config ("cannot read pubkey from map: %s",
2385 map_line);
2386 return NULL;
2387 }
2388
2389 if (*pos == '+' || *pos == ':') {
2390 pos++;
2391 }
2392 }
2393 else {
2394 /* No known flags */
2395 break;
2396 }
2397 }
2398
2399 bk->protocol = MAP_PROTO_FILE;
2400
2401 if (g_ascii_strncasecmp (pos, "http://", sizeof ("http://") - 1) == 0) {
2402 bk->protocol = MAP_PROTO_HTTP;
2403 /* Include http:// */
2404 bk->uri = g_strdup (pos);
2405 pos += sizeof ("http://") - 1;
2406 }
2407 else if (g_ascii_strncasecmp (pos, "https://", sizeof ("https://") - 1) == 0) {
2408 bk->protocol = MAP_PROTO_HTTPS;
2409 /* Include https:// */
2410 bk->uri = g_strdup (pos);
2411 pos += sizeof ("https://") - 1;
2412 }
2413 else if (g_ascii_strncasecmp (pos, "file://", sizeof ("file://") - 1) == 0) {
2414 pos += sizeof ("file://") - 1;
2415 /* Exclude file:// */
2416 bk->uri = g_strdup (pos);
2417 }
2418 else if (*pos == '/') {
2419 /* Trivial file case */
2420 bk->uri = g_strdup (pos);
2421 }
2422 else {
2423 msg_err_config ("invalid map fetching protocol: %s", map_line);
2424
2425 return NULL;
2426 }
2427
2428 if (bk->protocol != MAP_PROTO_FILE && bk->is_signed) {
2429 msg_err_config ("signed maps are no longer supported for HTTP(s): %s", map_line);
2430 }
2431
2432 return pos;
2433 }
2434
2435 gboolean
rspamd_map_is_map(const gchar * map_line)2436 rspamd_map_is_map (const gchar *map_line)
2437 {
2438 gboolean ret = FALSE;
2439
2440 g_assert (map_line != NULL);
2441
2442 if (map_line[0] == '/') {
2443 ret = TRUE;
2444 }
2445 else if (g_ascii_strncasecmp (map_line, "sign+", sizeof ("sign+") - 1) == 0) {
2446 ret = TRUE;
2447 }
2448 else if (g_ascii_strncasecmp (map_line, "fallback+", sizeof ("fallback+") - 1) == 0) {
2449 ret = TRUE;
2450 }
2451 else if (g_ascii_strncasecmp (map_line, "file://", sizeof ("file://") - 1) == 0) {
2452 ret = TRUE;
2453 }
2454 else if (g_ascii_strncasecmp (map_line, "http://", sizeof ("http://") - 1) == 0) {
2455 ret = TRUE;
2456 }
2457 else if (g_ascii_strncasecmp (map_line, "https://", sizeof ("https://") - 1) == 0) {
2458 ret = TRUE;
2459 }
2460
2461 return ret;
2462 }
2463
2464 static void
rspamd_map_backend_dtor(struct rspamd_map_backend * bk)2465 rspamd_map_backend_dtor (struct rspamd_map_backend *bk)
2466 {
2467 g_free (bk->uri);
2468
2469 switch (bk->protocol) {
2470 case MAP_PROTO_FILE:
2471 if (bk->data.fd) {
2472 ev_stat_stop (bk->event_loop, &bk->data.fd->st_ev);
2473 g_free (bk->data.fd->filename);
2474 g_free (bk->data.fd);
2475 }
2476 break;
2477 case MAP_PROTO_STATIC:
2478 if (bk->data.sd) {
2479 if (bk->data.sd->data) {
2480 g_free (bk->data.sd->data);
2481 }
2482
2483 g_free (bk->data.sd);
2484 }
2485 break;
2486 case MAP_PROTO_HTTP:
2487 case MAP_PROTO_HTTPS:
2488 if (bk->data.hd) {
2489 struct http_map_data *data = bk->data.hd;
2490
2491 g_free (data->host);
2492 g_free (data->path);
2493 g_free (data->rest);
2494
2495 if (data->userinfo) {
2496 g_free (data->userinfo);
2497 }
2498
2499 if (data->etag) {
2500 rspamd_fstring_free (data->etag);
2501 }
2502
2503 if (g_atomic_int_compare_and_exchange (&data->cache->available, 1, 0)) {
2504 if (data->cur_cache_cbd) {
2505 MAP_RELEASE (data->cur_cache_cbd->shm,
2506 "rspamd_http_map_cached_cbdata");
2507 ev_timer_stop (data->cur_cache_cbd->event_loop,
2508 &data->cur_cache_cbd->timeout);
2509 g_free (data->cur_cache_cbd);
2510 data->cur_cache_cbd = NULL;
2511 }
2512
2513 unlink (data->cache->shmem_name);
2514 }
2515
2516 g_free (bk->data.hd);
2517 }
2518 break;
2519 }
2520
2521 if (bk->trusted_pubkey) {
2522 rspamd_pubkey_unref (bk->trusted_pubkey);
2523 }
2524
2525 g_free (bk);
2526 }
2527
2528 static struct rspamd_map_backend *
rspamd_map_parse_backend(struct rspamd_config * cfg,const gchar * map_line)2529 rspamd_map_parse_backend (struct rspamd_config *cfg, const gchar *map_line)
2530 {
2531 struct rspamd_map_backend *bk;
2532 struct file_map_data *fdata = NULL;
2533 struct http_map_data *hdata = NULL;
2534 struct static_map_data *sdata = NULL;
2535 struct http_parser_url up;
2536 const gchar *end, *p;
2537 rspamd_ftok_t tok;
2538
2539 bk = g_malloc0 (sizeof (*bk));
2540 REF_INIT_RETAIN (bk, rspamd_map_backend_dtor);
2541
2542 if (!rspamd_map_check_proto (cfg, map_line, bk)) {
2543 goto err;
2544 }
2545
2546 if (bk->is_fallback && bk->protocol != MAP_PROTO_FILE) {
2547 msg_err_config ("fallback backend must be file for %s", bk->uri);
2548
2549 goto err;
2550 }
2551
2552 end = map_line + strlen (map_line);
2553 if (end - map_line > 5) {
2554 p = end - 5;
2555 if (g_ascii_strcasecmp (p, ".zstd") == 0) {
2556 bk->is_compressed = TRUE;
2557 }
2558 p = end - 4;
2559 if (g_ascii_strcasecmp (p, ".zst") == 0) {
2560 bk->is_compressed = TRUE;
2561 }
2562 }
2563
2564 /* Now check for each proto separately */
2565 if (bk->protocol == MAP_PROTO_FILE) {
2566 fdata = g_malloc0 (sizeof (struct file_map_data));
2567
2568 if (access (bk->uri, R_OK) == -1) {
2569 if (errno != ENOENT) {
2570 msg_err_config ("cannot open file '%s': %s", bk->uri, strerror (errno));
2571 goto err;
2572 }
2573
2574 msg_info_config (
2575 "map '%s' is not found, but it can be loaded automatically later",
2576 bk->uri);
2577 }
2578
2579 fdata->filename = g_strdup (bk->uri);
2580 bk->data.fd = fdata;
2581 }
2582 else if (bk->protocol == MAP_PROTO_HTTP || bk->protocol == MAP_PROTO_HTTPS) {
2583 hdata = g_malloc0 (sizeof (struct http_map_data));
2584
2585 memset (&up, 0, sizeof (up));
2586 if (http_parser_parse_url (bk->uri, strlen (bk->uri), FALSE,
2587 &up) != 0) {
2588 msg_err_config ("cannot parse HTTP url: %s", bk->uri);
2589 goto err;
2590 }
2591 else {
2592 if (!(up.field_set & 1u << UF_HOST)) {
2593 msg_err_config ("cannot parse HTTP url: %s: no host", bk->uri);
2594 goto err;
2595 }
2596
2597 tok.begin = bk->uri + up.field_data[UF_HOST].off;
2598 tok.len = up.field_data[UF_HOST].len;
2599 hdata->host = rspamd_ftokdup (&tok);
2600
2601 if (up.field_set & (1u << UF_PORT)) {
2602 hdata->port = up.port;
2603 }
2604 else {
2605 if (bk->protocol == MAP_PROTO_HTTP) {
2606 hdata->port = 80;
2607 }
2608 else {
2609 hdata->port = 443;
2610 }
2611 }
2612
2613 if (up.field_set & (1u << UF_PATH)) {
2614 tok.begin = bk->uri + up.field_data[UF_PATH].off;
2615 tok.len = up.field_data[UF_PATH].len;
2616
2617 hdata->path = rspamd_ftokdup (&tok);
2618
2619 /* We also need to check query + fragment */
2620 if (up.field_set & ((1u << UF_QUERY) | (1u << UF_FRAGMENT))) {
2621 tok.begin = bk->uri + up.field_data[UF_PATH].off +
2622 up.field_data[UF_PATH].len;
2623 tok.len = strlen (tok.begin);
2624 hdata->rest = rspamd_ftokdup (&tok);
2625 }
2626 else {
2627 hdata->rest = g_strdup ("");
2628 }
2629 }
2630
2631 if (up.field_set & (1u << UF_USERINFO)) {
2632 /* Create authorisation header for basic auth */
2633 guint len = sizeof ("Basic ") +
2634 up.field_data[UF_USERINFO].len * 8 / 5 + 4;
2635 hdata->userinfo = g_malloc (len);
2636 rspamd_snprintf (hdata->userinfo, len, "Basic %*Bs",
2637 (int)up.field_data[UF_USERINFO].len,
2638 bk->uri + up.field_data[UF_USERINFO].off);
2639 }
2640 }
2641
2642 hdata->cache = rspamd_mempool_alloc0_shared (cfg->cfg_pool,
2643 sizeof (*hdata->cache));
2644
2645 bk->data.hd = hdata;
2646 }
2647 else if (bk->protocol == MAP_PROTO_STATIC) {
2648 sdata = g_malloc0 (sizeof (*sdata));
2649 bk->data.sd = sdata;
2650 }
2651
2652 bk->id = rspamd_cryptobox_fast_hash_specific (RSPAMD_CRYPTOBOX_T1HA,
2653 bk->uri, strlen (bk->uri), 0xdeadbabe);
2654
2655 return bk;
2656
2657 err:
2658 MAP_RELEASE (bk, "rspamd_map_backend");
2659
2660 if (hdata) {
2661 g_free (hdata);
2662 }
2663
2664 if (fdata) {
2665 g_free (fdata);
2666 }
2667
2668 if (sdata) {
2669 g_free (sdata);
2670 }
2671
2672 return NULL;
2673 }
2674
2675 static void
rspamd_map_calculate_hash(struct rspamd_map * map)2676 rspamd_map_calculate_hash (struct rspamd_map *map)
2677 {
2678 struct rspamd_map_backend *bk;
2679 guint i;
2680 rspamd_cryptobox_hash_state_t st;
2681 gchar *cksum_encoded, cksum[rspamd_cryptobox_HASHBYTES];
2682
2683 rspamd_cryptobox_hash_init (&st, NULL, 0);
2684
2685 for (i = 0; i < map->backends->len; i ++) {
2686 bk = g_ptr_array_index (map->backends, i);
2687 rspamd_cryptobox_hash_update (&st, bk->uri, strlen (bk->uri));
2688 }
2689
2690 rspamd_cryptobox_hash_final (&st, cksum);
2691 cksum_encoded = rspamd_encode_base32 (cksum, sizeof (cksum), RSPAMD_BASE32_DEFAULT);
2692 rspamd_strlcpy (map->tag, cksum_encoded, sizeof (map->tag));
2693 g_free (cksum_encoded);
2694 }
2695
2696 static gboolean
rspamd_map_add_static_string(struct rspamd_config * cfg,const ucl_object_t * elt,GString * target)2697 rspamd_map_add_static_string (struct rspamd_config *cfg,
2698 const ucl_object_t *elt,
2699 GString *target)
2700 {
2701 gsize sz;
2702 const gchar *dline;
2703
2704 if (ucl_object_type (elt) != UCL_STRING) {
2705 msg_err_config ("map has static backend but `data` is "
2706 "not string like: %s",
2707 ucl_object_type_to_string (elt->type));
2708 return FALSE;
2709 }
2710
2711 /* Otherwise, we copy data to the backend */
2712 dline = ucl_object_tolstring (elt, &sz);
2713
2714 if (sz == 0) {
2715 msg_err_config ("map has static backend but empty no data");
2716 return FALSE;
2717 }
2718
2719 g_string_append_len (target, dline, sz);
2720 g_string_append_c (target, '\n');
2721
2722 return TRUE;
2723 }
2724
2725 struct rspamd_map *
rspamd_map_add(struct rspamd_config * cfg,const gchar * map_line,const gchar * description,map_cb_t read_callback,map_fin_cb_t fin_callback,map_dtor_t dtor,void ** user_data,struct rspamd_worker * worker,int flags)2726 rspamd_map_add (struct rspamd_config *cfg,
2727 const gchar *map_line,
2728 const gchar *description,
2729 map_cb_t read_callback,
2730 map_fin_cb_t fin_callback,
2731 map_dtor_t dtor,
2732 void **user_data,
2733 struct rspamd_worker *worker,
2734 int flags)
2735 {
2736 struct rspamd_map *map;
2737 struct rspamd_map_backend *bk;
2738
2739 bk = rspamd_map_parse_backend (cfg, map_line);
2740 if (bk == NULL) {
2741 return NULL;
2742 }
2743
2744 if (bk->is_fallback) {
2745 msg_err_config ("cannot add map with fallback only backend: %s", bk->uri);
2746 REF_RELEASE (bk);
2747
2748 return NULL;
2749 }
2750
2751 map = rspamd_mempool_alloc0 (cfg->cfg_pool, sizeof (struct rspamd_map));
2752 map->read_callback = read_callback;
2753 map->fin_callback = fin_callback;
2754 map->dtor = dtor;
2755 map->user_data = user_data;
2756 map->cfg = cfg;
2757 map->id = rspamd_random_uint64_fast ();
2758 map->locked =
2759 rspamd_mempool_alloc0_shared (cfg->cfg_pool, sizeof (gint));
2760 map->backends = g_ptr_array_sized_new (1);
2761 map->wrk = worker;
2762 rspamd_mempool_add_destructor (cfg->cfg_pool, rspamd_ptr_array_free_hard,
2763 map->backends);
2764 g_ptr_array_add (map->backends, bk);
2765 map->name = rspamd_mempool_strdup (cfg->cfg_pool, map_line);
2766 map->no_file_read = (flags & RSPAMD_MAP_FILE_NO_READ);
2767
2768 if (bk->protocol == MAP_PROTO_FILE) {
2769 map->poll_timeout = (cfg->map_timeout * cfg->map_file_watch_multiplier);
2770 } else {
2771 map->poll_timeout = cfg->map_timeout;
2772 }
2773
2774 if (description != NULL) {
2775 map->description = rspamd_mempool_strdup (cfg->cfg_pool, description);
2776 }
2777
2778 rspamd_map_calculate_hash (map);
2779 msg_info_map ("added map %s", bk->uri);
2780
2781 cfg->maps = g_list_prepend (cfg->maps, map);
2782
2783 return map;
2784 }
2785
2786 struct rspamd_map *
rspamd_map_add_fake(struct rspamd_config * cfg,const gchar * description,const gchar * name)2787 rspamd_map_add_fake (struct rspamd_config *cfg,
2788 const gchar *description,
2789 const gchar *name)
2790 {
2791 struct rspamd_map *map;
2792
2793 map = rspamd_mempool_alloc0 (cfg->cfg_pool, sizeof (struct rspamd_map));
2794 map->cfg = cfg;
2795 map->id = rspamd_random_uint64_fast ();
2796 map->name = rspamd_mempool_strdup (cfg->cfg_pool, name);
2797 map->user_data = (void **)↦ /* to prevent null pointer dereferencing */
2798
2799 if (description != NULL) {
2800 map->description = rspamd_mempool_strdup (cfg->cfg_pool, description);
2801 }
2802
2803 return map;
2804 }
2805
2806 static inline void
rspamd_map_add_backend(struct rspamd_map * map,struct rspamd_map_backend * bk)2807 rspamd_map_add_backend (struct rspamd_map *map, struct rspamd_map_backend *bk)
2808 {
2809 if (bk->is_fallback) {
2810 if (map->fallback_backend) {
2811 msg_warn_map ("redefining fallback backend from %s to %s",
2812 map->fallback_backend->uri, bk->uri);
2813 }
2814
2815 map->fallback_backend = bk;
2816 }
2817 else {
2818 g_ptr_array_add (map->backends, bk);
2819 }
2820 }
2821
2822 struct rspamd_map*
rspamd_map_add_from_ucl(struct rspamd_config * cfg,const ucl_object_t * obj,const gchar * description,map_cb_t read_callback,map_fin_cb_t fin_callback,map_dtor_t dtor,void ** user_data,struct rspamd_worker * worker,gint flags)2823 rspamd_map_add_from_ucl (struct rspamd_config *cfg,
2824 const ucl_object_t *obj,
2825 const gchar *description,
2826 map_cb_t read_callback,
2827 map_fin_cb_t fin_callback,
2828 map_dtor_t dtor,
2829 void **user_data,
2830 struct rspamd_worker *worker,
2831 gint flags)
2832 {
2833 ucl_object_iter_t it = NULL;
2834 const ucl_object_t *cur, *elt;
2835 struct rspamd_map *map;
2836 struct rspamd_map_backend *bk;
2837 guint i;
2838
2839 g_assert (obj != NULL);
2840
2841 if (ucl_object_type (obj) == UCL_STRING) {
2842 /* Just a plain string */
2843 return rspamd_map_add (cfg, ucl_object_tostring (obj), description,
2844 read_callback, fin_callback, dtor, user_data, worker, flags);
2845 }
2846
2847 map = rspamd_mempool_alloc0 (cfg->cfg_pool, sizeof (struct rspamd_map));
2848 map->read_callback = read_callback;
2849 map->fin_callback = fin_callback;
2850 map->dtor = dtor;
2851 map->user_data = user_data;
2852 map->cfg = cfg;
2853 map->id = rspamd_random_uint64_fast ();
2854 map->locked =
2855 rspamd_mempool_alloc0_shared (cfg->cfg_pool, sizeof (gint));
2856 map->backends = g_ptr_array_new ();
2857 map->wrk = worker;
2858 map->no_file_read = (flags & RSPAMD_MAP_FILE_NO_READ);
2859 rspamd_mempool_add_destructor (cfg->cfg_pool, rspamd_ptr_array_free_hard,
2860 map->backends);
2861 map->poll_timeout = cfg->map_timeout;
2862
2863 if (description) {
2864 map->description = rspamd_mempool_strdup (cfg->cfg_pool, description);
2865 }
2866
2867 if (ucl_object_type (obj) == UCL_ARRAY) {
2868 /* Add array of maps as multiple backends */
2869 while ((cur = ucl_object_iterate (obj, &it, true)) != NULL) {
2870 if (ucl_object_type (cur) == UCL_STRING) {
2871 bk = rspamd_map_parse_backend (cfg, ucl_object_tostring (cur));
2872
2873 if (bk != NULL) {
2874 rspamd_map_add_backend (map, bk);
2875
2876 if (!map->name) {
2877 map->name = rspamd_mempool_strdup (cfg->cfg_pool,
2878 ucl_object_tostring (cur));
2879 }
2880 }
2881 }
2882 else {
2883 msg_err_config ("bad map element type: %s",
2884 ucl_object_type_to_string (ucl_object_type (cur)));
2885 }
2886 }
2887
2888 if (map->backends->len == 0) {
2889 msg_err_config ("map has no urls to be loaded: empty list");
2890 goto err;
2891 }
2892 }
2893 else if (ucl_object_type (obj) == UCL_OBJECT) {
2894 elt = ucl_object_lookup (obj, "name");
2895 if (elt && ucl_object_type (elt) == UCL_STRING) {
2896 map->name = rspamd_mempool_strdup (cfg->cfg_pool,
2897 ucl_object_tostring (elt));
2898 }
2899
2900 elt = ucl_object_lookup (obj, "description");
2901 if (elt && ucl_object_type (elt) == UCL_STRING) {
2902 map->description = rspamd_mempool_strdup (cfg->cfg_pool,
2903 ucl_object_tostring (elt));
2904 }
2905
2906 elt = ucl_object_lookup_any (obj, "timeout", "poll", "poll_time",
2907 "watch_interval", NULL);
2908 if (elt) {
2909 map->poll_timeout = ucl_object_todouble (elt);
2910 }
2911
2912 elt = ucl_object_lookup_any (obj, "upstreams", "url", "urls", NULL);
2913 if (elt == NULL) {
2914 msg_err_config ("map has no urls to be loaded: no elt");
2915 goto err;
2916 }
2917
2918 if (ucl_object_type (elt) == UCL_ARRAY) {
2919 /* Add array of maps as multiple backends */
2920 it = ucl_object_iterate_new (elt);
2921
2922 while ((cur = ucl_object_iterate_safe (it, true)) != NULL) {
2923 if (ucl_object_type (cur) == UCL_STRING) {
2924 bk = rspamd_map_parse_backend (cfg, ucl_object_tostring (cur));
2925
2926 if (bk != NULL) {
2927 rspamd_map_add_backend (map, bk);
2928
2929 if (!map->name) {
2930 map->name = rspamd_mempool_strdup (cfg->cfg_pool,
2931 ucl_object_tostring (cur));
2932 }
2933 }
2934 }
2935 else {
2936 msg_err_config ("bad map element type: %s",
2937 ucl_object_type_to_string (ucl_object_type (cur)));
2938 ucl_object_iterate_free (it);
2939 goto err;
2940 }
2941 }
2942
2943 ucl_object_iterate_free (it);
2944
2945 if (map->backends->len == 0) {
2946 msg_err_config ("map has no urls to be loaded: empty object list");
2947 goto err;
2948 }
2949 }
2950 else if (ucl_object_type (elt) == UCL_STRING) {
2951 bk = rspamd_map_parse_backend (cfg, ucl_object_tostring (elt));
2952
2953 if (bk != NULL) {
2954 rspamd_map_add_backend (map, bk);
2955
2956 if (!map->name) {
2957 map->name = rspamd_mempool_strdup (cfg->cfg_pool,
2958 ucl_object_tostring (elt));
2959 }
2960 }
2961 }
2962
2963 if (!map->backends || map->backends->len == 0) {
2964 msg_err_config ("map has no urls to be loaded: no valid backends");
2965 goto err;
2966 }
2967 }
2968 else {
2969 msg_err_config ("map has invalid type for value: %s",
2970 ucl_object_type_to_string (ucl_object_type (obj)));
2971 goto err;
2972 }
2973
2974 gboolean all_local = TRUE;
2975
2976 PTR_ARRAY_FOREACH (map->backends, i, bk) {
2977 if (bk->protocol == MAP_PROTO_STATIC) {
2978 GString *map_data;
2979 /* We need data field in ucl */
2980 elt = ucl_object_lookup (obj, "data");
2981
2982 if (elt == NULL) {
2983 msg_err_config ("map has static backend but no `data` field");
2984 goto err;
2985 }
2986
2987
2988 if (ucl_object_type (elt) == UCL_STRING) {
2989 map_data = g_string_sized_new (32);
2990
2991 if (rspamd_map_add_static_string (cfg, elt, map_data)) {
2992 bk->data.sd->data = map_data->str;
2993 bk->data.sd->len = map_data->len;
2994 g_string_free (map_data, FALSE);
2995 }
2996 else {
2997 g_string_free (map_data, TRUE);
2998 msg_err_config ("map has static backend with invalid `data` field");
2999 goto err;
3000 }
3001 }
3002 else if (ucl_object_type (elt) == UCL_ARRAY) {
3003 map_data = g_string_sized_new (32);
3004 it = ucl_object_iterate_new (elt);
3005
3006 while ((cur = ucl_object_iterate_safe (it, true))) {
3007 if (!rspamd_map_add_static_string (cfg, cur, map_data)) {
3008 g_string_free (map_data, TRUE);
3009 msg_err_config ("map has static backend with invalid "
3010 "`data` field");
3011 ucl_object_iterate_free (it);
3012 goto err;
3013 }
3014 }
3015
3016 ucl_object_iterate_free (it);
3017 bk->data.sd->data = map_data->str;
3018 bk->data.sd->len = map_data->len;
3019 g_string_free (map_data, FALSE);
3020 }
3021 }
3022 else if (bk->protocol != MAP_PROTO_FILE) {
3023 all_local = FALSE;
3024 }
3025 }
3026
3027 if (all_local) {
3028 map->poll_timeout = (map->poll_timeout *
3029 cfg->map_file_watch_multiplier);
3030 }
3031
3032 rspamd_map_calculate_hash (map);
3033 msg_debug_map ("added map from ucl");
3034
3035 cfg->maps = g_list_prepend (cfg->maps, map);
3036
3037 return map;
3038
3039 err:
3040
3041 if (map) {
3042 PTR_ARRAY_FOREACH (map->backends, i, bk) {
3043 MAP_RELEASE (bk, "rspamd_map_backend");
3044 }
3045 }
3046
3047 return NULL;
3048 }
3049
3050 rspamd_map_traverse_function
rspamd_map_get_traverse_function(struct rspamd_map * map)3051 rspamd_map_get_traverse_function (struct rspamd_map *map)
3052 {
3053 if (map) {
3054 return map->traverse_function;
3055 }
3056
3057 return NULL;
3058 }
3059
3060 void
rspamd_map_traverse(struct rspamd_map * map,rspamd_map_traverse_cb cb,gpointer cbdata,gboolean reset_hits)3061 rspamd_map_traverse (struct rspamd_map *map, rspamd_map_traverse_cb cb,
3062 gpointer cbdata, gboolean reset_hits)
3063 {
3064 if (*map->user_data && map->traverse_function) {
3065 map->traverse_function (*map->user_data, cb, cbdata, reset_hits);
3066 }
3067 }
3068