1 /*-
2 * Copyright 2016 Vsevolod Stakhov
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 /*
17 * Rspamd fuzzy storage server
18 */
19
20 #include "config.h"
21 #include "libserver/fuzzy_wire.h"
22 #include "util.h"
23 #include "rspamd.h"
24 #include "libserver/maps/map.h"
25 #include "libserver/maps/map_helpers.h"
26 #include "libserver/fuzzy_backend/fuzzy_backend.h"
27 #include "ottery.h"
28 #include "ref.h"
29 #include "xxhash.h"
30 #include "libserver/worker_util.h"
31 #include "libserver/rspamd_control.h"
32 #include "libcryptobox/cryptobox.h"
33 #include "libcryptobox/keypairs_cache.h"
34 #include "libcryptobox/keypair.h"
35 #include "libutil/hash.h"
36 #include "libserver/maps/map_private.h"
37 #include "contrib/uthash/utlist.h"
38 #include "lua/lua_common.h"
39 #include "unix-std.h"
40
41 #include <math.h>
42
/*
 * Compile-time defaults of the fuzzy storage worker.
 * NOTE(review): most of these are not referenced in this part of the file;
 * the per-constant notes below are inferred from the names - confirm against
 * the config parsing code.
 */
/* Resync value in seconds */
#define DEFAULT_SYNC_TIMEOUT 60.0
#define DEFAULT_KEYPAIR_CACHE_SIZE 512
#define DEFAULT_MASTER_TIMEOUT 10.0
#define DEFAULT_UPDATES_MAXFAIL 3
/* Size in bytes of the `cookie` array in struct rspamd_fuzzy_storage_ctx */
#define COOKIE_SIZE 128
/* Presumably defaults for the ratelimit (leaky bucket) settings - confirm */
#define DEFAULT_MAX_BUCKETS 2000
#define DEFAULT_BUCKET_TTL 3600
#define DEFAULT_BUCKET_MASK 24
52
53 static const gchar *local_db_name = "local";
54
/*
 * Logging helpers. Each macro expands to a call of
 * rspamd_default_log_function () that reads `session->uid` (and, for the
 * *_fuzzy_update family, `session->name`), so a variable called `session`
 * providing those members must be in scope wherever the macro is used.
 * The *_fuzzy_collection family uses the fixed module name
 * "fuzzy_collection" instead of `session->name`.
 */
#define msg_err_fuzzy_update(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
		session->name, session->uid, \
		G_STRFUNC, \
		__VA_ARGS__)
#define msg_warn_fuzzy_update(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
		session->name, session->uid, \
		G_STRFUNC, \
		__VA_ARGS__)
#define msg_info_fuzzy_update(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
		session->name, session->uid, \
		G_STRFUNC, \
		__VA_ARGS__)
#define msg_err_fuzzy_collection(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
		"fuzzy_collection", session->uid, \
		G_STRFUNC, \
		__VA_ARGS__)
#define msg_warn_fuzzy_collection(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
		"fuzzy_collection", session->uid, \
		G_STRFUNC, \
		__VA_ARGS__)
#define msg_info_fuzzy_collection(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
		"fuzzy_collection", session->uid, \
		G_STRFUNC, \
		__VA_ARGS__)
79
/* Init functions */
gpointer init_fuzzy (struct rspamd_config *cfg);
void start_fuzzy (struct rspamd_worker *worker);

/*
 * Worker descriptor that registers the fuzzy storage under the name "fuzzy"
 * with init_fuzzy () / start_fuzzy () as its entry points.
 */
worker_t fuzzy_worker = {
	"fuzzy",                    /* Name */
	init_fuzzy,                 /* Init function */
	start_fuzzy,                /* Start function */
	RSPAMD_WORKER_HAS_SOCKET,
	RSPAMD_WORKER_SOCKET_UDP,   /* UDP socket */
	RSPAMD_WORKER_VER           /* Version info */
};
92
/*
 * Storage-wide counters. Per-epoch arrays are indexed by the protocol epoch
 * of the request (see rspamd_fuzzy_update_stats ()).
 */
struct fuzzy_global_stat {
	guint64 fuzzy_hashes;
	/**< number of fuzzy hashes stored */
	guint64 fuzzy_hashes_expired;
	/**< number of fuzzy hashes expired */
	guint64 fuzzy_hashes_checked[RSPAMD_FUZZY_EPOCH_MAX];
	/**< amount of check requests for each epoch */
	guint64 fuzzy_shingles_checked[RSPAMD_FUZZY_EPOCH_MAX];
	/**< amount of shingle check requests for each epoch */
	guint64 fuzzy_hashes_found[RSPAMD_FUZZY_EPOCH_MAX];
	/**< amount of matched requests for each epoch */
	guint64 invalid_requests;
	/**< amount of invalid requests */
	guint64 delayed_hashes;
	/**< amount of delayed hashes found */
};
108
/*
 * Reference counted request counters. The same structure is used both for
 * per-key statistics and for the per-client-IP entries stored in `last_ips`.
 */
struct fuzzy_key_stat {
	guint64 checked;	/* successful FUZZY_CHECK requests */
	guint64 matched;	/* FUZZY_CHECK requests that matched */
	guint64 added;		/* FUZZY_WRITE requests */
	guint64 deleted;	/* FUZZY_DEL requests */
	guint64 errors;		/* unmatched requests with a non-zero reply code */
	rspamd_lru_hash_t *last_ips;	/* LRU of client address -> fuzzy_key_stat */
	ref_entry_t ref;	/* refcount; dtor is fuzzy_key_stat_dtor () */
};
118
/*
 * Description of a mirror peer.
 * NOTE(review): not referenced in this part of the file; field meanings
 * below are taken from the declared types only - confirm against the users.
 */
struct rspamd_fuzzy_mirror {
	gchar *name;	/* mirror name */
	struct upstream_list *u;	/* upstreams of the mirror */
	struct rspamd_cryptobox_pubkey *key;	/* public key of the mirror */
};
124
/*
 * One ratelimit bucket, stored in ctx->ratelimit_buckets and keyed by the
 * masked client address (see rspamd_fuzzy_check_ratelimit ()).
 */
struct rspamd_leaky_bucket_elt {
	rspamd_inet_addr_t *addr;	/* masked address, owned by the bucket */
	gdouble last;	/* event loop time of the last update */
	gdouble cur;	/* current fill level; NAN marks an exceeded bucket */
};
130
/* NOTE(review): presumably the expected value of ctx->magic - confirm */
static const guint64 rspamd_fuzzy_storage_magic = 0x291a3253eb1b3ea5ULL;

/*
 * Per-worker state of the fuzzy storage: configuration, access maps,
 * statistics, the storage backend and the queue of pending updates.
 */
struct rspamd_fuzzy_storage_ctx {
	guint64 magic;
	/* Events base */
	struct ev_loop *event_loop;
	/* DNS resolver */
	struct rspamd_dns_resolver *resolver;
	/* Config */
	struct rspamd_config *cfg;
	/* END OF COMMON PART */
	struct fuzzy_global_stat stat;
	gdouble expire;
	gdouble sync_timeout;
	/* Minimum hash age before it is reported; NAN disables the delay */
	gdouble delay;
	/* Clients allowed to modify the storage */
	struct rspamd_radix_map_helper *update_ips;
	/* Clients rejected by rspamd_fuzzy_check_client () */
	struct rspamd_radix_map_helper *blocked_ips;
	/* Clients that bypass ratelimits */
	struct rspamd_radix_map_helper *ratelimit_whitelist;
	/* Clients that bypass the hash delay */
	struct rspamd_radix_map_helper *delay_whitelist;

	const ucl_object_t *update_map;
	const ucl_object_t *delay_whitelist_map;
	const ucl_object_t *blocked_map;
	const ucl_object_t *ratelimit_whitelist_map;

	guint keypair_cache_size;
	/* Periodic timer, re-armed from fuzzy_stat_count_callback () */
	ev_timer stat_ev;
	ev_io peer_ev;

	/* Local keypair */
	struct rspamd_cryptobox_keypair *default_keypair; /* Bad clash, need for parse keypair */
	struct fuzzy_key *default_key;
	GHashTable *keys;
	/* Reject unencrypted commands with code 403 */
	gboolean encrypted_only;
	/* Never accept modifications nor refresh hashes */
	gboolean read_only;
	gboolean dedicated_update_worker;
	struct rspamd_keypair_cache *keypair_cache;
	struct rspamd_http_context *http_ctx;
	rspamd_lru_hash_t *errors_ips;
	/* Masked client address -> struct rspamd_leaky_bucket_elt */
	rspamd_lru_hash_t *ratelimit_buckets;
	struct rspamd_fuzzy_backend *backend;
	/* Array of struct fuzzy_peer_cmd waiting to be sent to the backend */
	GArray *updates_pending;
	/* Consecutive failed backend updates / limit before a batch is dropped */
	guint updates_failed;
	guint updates_maxfail;
	/* Used to send data between workers */
	gint peer_fd;

	/* Ratelimits */
	guint leaky_bucket_ttl;
	guint leaky_bucket_mask;
	guint max_buckets;
	/* Evaluate ratelimits but never reject a request because of them */
	gboolean ratelimit_log_only;
	gdouble leaky_bucket_burst;
	gdouble leaky_bucket_rate;

	struct rspamd_worker *worker;
	const ucl_object_t *skip_map;
	/* Hex digests that must not be written (reply code 401) */
	struct rspamd_hash_map_helper *skip_hashes;
	/* Lua registry references of the handlers; -1 when not set */
	gint lua_pre_handler_cbref;
	gint lua_post_handler_cbref;
	guchar cookie[COOKIE_SIZE];
};
193
/*
 * Wire form of a received command: plain or encrypted, with or without
 * shingles attached.
 */
enum fuzzy_cmd_type {
	CMD_NORMAL,
	CMD_SHINGLE,
	CMD_ENCRYPTED_NORMAL,
	CMD_ENCRYPTED_SHINGLE
};
200
/*
 * State of a single client request, reference counted because replies can
 * be produced asynchronously (backend callback, deferred socket write).
 */
struct fuzzy_session {
	struct rspamd_worker *worker;
	rspamd_inet_addr_t *addr;	/* client address, destination of the reply */
	struct rspamd_fuzzy_storage_ctx *ctx;

	struct rspamd_fuzzy_shingle_cmd cmd; /* Can handle both shingles and non-shingles */
	struct rspamd_fuzzy_encrypted_reply reply; /* Again: contains everything */
	struct fuzzy_key_stat *ip_stat;	/* per client IP counters, may be NULL */

	enum rspamd_fuzzy_epoch epoch;	/* protocol epoch, selects reply layout */
	enum fuzzy_cmd_type cmd_type;
	gint fd;	/* socket used to send the reply */
	guint64 time;
	struct ev_io io;	/* write watcher used when the reply must be retried */
	ref_entry_t ref;
	struct fuzzy_key_stat *key_stat;	/* per key counters, may be NULL */
	struct rspamd_fuzzy_cmd_extension *extensions;	/* linked list of extensions */
	guchar nm[rspamd_cryptobox_MAX_NMBYTES];	/* shared key used to encrypt the reply */
};
220
/*
 * Update command forwarded to the peer worker via ctx->peer_fd; `io_ev` is
 * used to retry the write once the descriptor becomes writable.
 */
struct fuzzy_peer_request {
	ev_io io_ev;
	struct fuzzy_peer_cmd cmd;
};
225
/*
 * Encryption key known to the storage together with its statistics.
 * fuzzy_key_dtor () drops the reference on `stat` and frees the structure;
 * it does not touch `key` or `pk`.
 */
struct fuzzy_key {
	struct rspamd_cryptobox_keypair *key;
	struct rspamd_cryptobox_pubkey *pk;
	struct fuzzy_key_stat *stat;
};
231
/*
 * Context of one batch of updates passed to the backend; allocated in
 * rspamd_fuzzy_process_updates_queue () and freed in
 * rspamd_fuzzy_updates_cb ().
 */
struct rspamd_updates_cbdata {
	GArray *updates_pending;	/* the batch being committed (owned) */
	struct rspamd_fuzzy_storage_ctx *ctx;
	gchar *source;	/* g_strdup'ed name of the update source (owned) */
	gboolean final;	/* stop the event loop once the batch is handled */
};
238
239
/* Forward declarations for functions used before their definitions */
static void rspamd_fuzzy_write_reply (struct fuzzy_session *session);
static gboolean rspamd_fuzzy_process_updates_queue (
		struct rspamd_fuzzy_storage_ctx *ctx,
		const gchar *source, gboolean final);
244
245 static gboolean
rspamd_fuzzy_check_ratelimit(struct fuzzy_session * session)246 rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session)
247 {
248 rspamd_inet_addr_t *masked;
249 struct rspamd_leaky_bucket_elt *elt;
250 ev_tstamp now;
251
252 if (session->ctx->ratelimit_whitelist != NULL) {
253 if (rspamd_match_radix_map_addr (session->ctx->ratelimit_whitelist,
254 session->addr) != NULL) {
255 return TRUE;
256 }
257 }
258
259 /*
260 if (rspamd_inet_address_is_local (session->addr, TRUE)) {
261 return TRUE;
262 }
263 */
264
265 masked = rspamd_inet_address_copy (session->addr);
266
267 if (rspamd_inet_address_get_af (masked) == AF_INET) {
268 rspamd_inet_address_apply_mask (masked,
269 MIN (session->ctx->leaky_bucket_mask, 32));
270 }
271 else {
272 /* Must be at least /64 */
273 rspamd_inet_address_apply_mask (masked,
274 MIN (MAX (session->ctx->leaky_bucket_mask * 4, 64), 128));
275 }
276
277 now = ev_now (session->ctx->event_loop);
278 elt = rspamd_lru_hash_lookup (session->ctx->ratelimit_buckets, masked,
279 now);
280
281 if (elt) {
282 gboolean ratelimited = FALSE;
283
284 if (isnan (elt->cur)) {
285 /* Ratelimit exceeded, preserve it for the whole ttl */
286 ratelimited = TRUE;
287 }
288 else {
289 /* Update bucket */
290 if (elt->last < now) {
291 elt->cur -= session->ctx->leaky_bucket_rate * (now - elt->last);
292 elt->last = now;
293
294 if (elt->cur < 0) {
295 elt->cur = 0;
296 }
297 }
298 else {
299 elt->last = now;
300 }
301
302 /* Check bucket */
303 if (elt->cur >= session->ctx->leaky_bucket_burst) {
304
305 msg_info ("ratelimiting %s (%s), %.1f max elts",
306 rspamd_inet_address_to_string (session->addr),
307 rspamd_inet_address_to_string (masked),
308 session->ctx->leaky_bucket_burst);
309 elt->cur = NAN;
310 }
311 else {
312 elt->cur ++; /* Allow one more request */
313 }
314 }
315
316 rspamd_inet_address_free (masked);
317
318 return !ratelimited;
319 }
320 else {
321 /* New bucket */
322 elt = g_malloc (sizeof (*elt));
323 elt->addr = masked; /* transfer ownership */
324 elt->cur = 1;
325 elt->last = now;
326
327 rspamd_lru_hash_insert (session->ctx->ratelimit_buckets,
328 masked,
329 elt,
330 now,
331 session->ctx->leaky_bucket_ttl);
332 }
333
334 return TRUE;
335 }
336
337 static gboolean
rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx * ctx,rspamd_inet_addr_t * addr)338 rspamd_fuzzy_check_client (struct rspamd_fuzzy_storage_ctx *ctx,
339 rspamd_inet_addr_t *addr)
340 {
341 if (ctx->blocked_ips != NULL) {
342 if (rspamd_match_radix_map_addr (ctx->blocked_ips,
343 addr) != NULL) {
344 return FALSE;
345 }
346 }
347
348 return TRUE;
349 }
350
351 static gboolean
rspamd_fuzzy_check_write(struct fuzzy_session * session)352 rspamd_fuzzy_check_write (struct fuzzy_session *session)
353 {
354 if (session->ctx->read_only) {
355 return FALSE;
356 }
357
358 if (session->ctx->update_ips != NULL) {
359 if (rspamd_match_radix_map_addr (session->ctx->update_ips,
360 session->addr) == NULL) {
361 return FALSE;
362 }
363 else {
364 return TRUE;
365 }
366 }
367
368 return FALSE;
369 }
370
371 static void
fuzzy_key_stat_dtor(gpointer p)372 fuzzy_key_stat_dtor (gpointer p)
373 {
374 struct fuzzy_key_stat *st = p;
375
376 if (st->last_ips) {
377 rspamd_lru_hash_destroy (st->last_ips);
378 }
379
380 g_free (st);
381 }
382
383 static void
fuzzy_key_stat_unref(gpointer p)384 fuzzy_key_stat_unref (gpointer p)
385 {
386 struct fuzzy_key_stat *st = p;
387
388 REF_RELEASE (st);
389 }
390
391 static void
fuzzy_key_dtor(gpointer p)392 fuzzy_key_dtor (gpointer p)
393 {
394 struct fuzzy_key *key = p;
395
396 if (key) {
397 if (key->stat) {
398 REF_RELEASE (key->stat);
399 }
400
401 g_free (key);
402 }
403 }
404
405 static void
fuzzy_count_callback(guint64 count,void * ud)406 fuzzy_count_callback (guint64 count, void *ud)
407 {
408 struct rspamd_fuzzy_storage_ctx *ctx = ud;
409
410 ctx->stat.fuzzy_hashes = count;
411 }
412
413 static void
fuzzy_rl_bucket_free(gpointer p)414 fuzzy_rl_bucket_free (gpointer p)
415 {
416 struct rspamd_leaky_bucket_elt *elt = (struct rspamd_leaky_bucket_elt *)p;
417
418 rspamd_inet_address_free (elt->addr);
419 g_free (elt);
420 }
421
422 static void
fuzzy_stat_count_callback(guint64 count,void * ud)423 fuzzy_stat_count_callback (guint64 count, void *ud)
424 {
425 struct rspamd_fuzzy_storage_ctx *ctx = ud;
426
427 ev_timer_again (ctx->event_loop, &ctx->stat_ev);
428 ctx->stat.fuzzy_hashes = count;
429 }
430
431 static void
rspamd_fuzzy_stat_callback(EV_P_ ev_timer * w,int revents)432 rspamd_fuzzy_stat_callback (EV_P_ ev_timer *w, int revents)
433 {
434 struct rspamd_fuzzy_storage_ctx *ctx =
435 (struct rspamd_fuzzy_storage_ctx *)w->data;
436 rspamd_fuzzy_backend_count (ctx->backend, fuzzy_stat_count_callback, ctx);
437 }
438
439
440 static void
fuzzy_update_version_callback(guint64 ver,void * ud)441 fuzzy_update_version_callback (guint64 ver, void *ud)
442 {
443 }
444
445 static void
rspamd_fuzzy_updates_cb(gboolean success,guint nadded,guint ndeleted,guint nextended,guint nignored,void * ud)446 rspamd_fuzzy_updates_cb (gboolean success,
447 guint nadded,
448 guint ndeleted,
449 guint nextended,
450 guint nignored,
451 void *ud)
452 {
453 struct rspamd_updates_cbdata *cbdata = ud;
454 struct rspamd_fuzzy_storage_ctx *ctx;
455 const gchar *source;
456
457 ctx = cbdata->ctx;
458 source = cbdata->source;
459
460 if (success) {
461 rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
462
463 msg_info ("successfully updated fuzzy storage %s: %d updates in queue; "
464 "%d pending currently; "
465 "%d added; %d deleted; %d extended; %d duplicates",
466 ctx->worker->cf->bind_conf ?
467 ctx->worker->cf->bind_conf->bind_line :
468 "unknown",
469 cbdata->updates_pending->len,
470 ctx->updates_pending->len,
471 nadded, ndeleted, nextended, nignored);
472 rspamd_fuzzy_backend_version (ctx->backend, source,
473 fuzzy_update_version_callback, NULL);
474 ctx->updates_failed = 0;
475
476 if (cbdata->final || ctx->worker->state != rspamd_worker_state_running) {
477 /* Plan exit */
478 ev_break (ctx->event_loop, EVBREAK_ALL);
479 }
480 }
481 else {
482 if (++ctx->updates_failed > ctx->updates_maxfail) {
483 msg_err ("cannot commit update transaction to fuzzy backend %s, discard "
484 "%ud updates after %d retries",
485 ctx->worker->cf->bind_conf ?
486 ctx->worker->cf->bind_conf->bind_line :
487 "unknown",
488 cbdata->updates_pending->len,
489 ctx->updates_maxfail);
490 ctx->updates_failed = 0;
491
492 if (cbdata->final || ctx->worker->state != rspamd_worker_state_running) {
493 /* Plan exit */
494 ev_break (ctx->event_loop, EVBREAK_ALL);
495 }
496 }
497 else {
498 msg_err ("cannot commit update transaction to fuzzy backend %s; "
499 "%ud updates are still left; %ud currently pending;"
500 " %d updates left",
501 ctx->worker->cf->bind_conf ?
502 ctx->worker->cf->bind_conf->bind_line :
503 "unknown",
504 cbdata->updates_pending->len,
505 ctx->updates_pending->len,
506 ctx->updates_maxfail - ctx->updates_failed);
507 /* Move the remaining updates to ctx queue */
508 g_array_append_vals (ctx->updates_pending,
509 cbdata->updates_pending->data,
510 cbdata->updates_pending->len);
511
512 if (cbdata->final) {
513 /* Try one more time */
514 rspamd_fuzzy_process_updates_queue (cbdata->ctx, cbdata->source,
515 cbdata->final);
516 }
517 }
518 }
519
520 g_array_free (cbdata->updates_pending, TRUE);
521 g_free (cbdata->source);
522 g_free (cbdata);
523 }
524
525 static gboolean
rspamd_fuzzy_process_updates_queue(struct rspamd_fuzzy_storage_ctx * ctx,const gchar * source,gboolean final)526 rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
527 const gchar *source, gboolean final)
528 {
529
530 struct rspamd_updates_cbdata *cbdata;
531
532 if (ctx->updates_pending->len > 0) {
533 cbdata = g_malloc (sizeof (*cbdata));
534 cbdata->ctx = ctx;
535 cbdata->final = final;
536 cbdata->updates_pending = ctx->updates_pending;
537 ctx->updates_pending = g_array_sized_new (FALSE, FALSE,
538 sizeof (struct fuzzy_peer_cmd),
539 MAX (cbdata->updates_pending->len, 1024));
540 cbdata->source = g_strdup (source);
541 rspamd_fuzzy_backend_process_updates (ctx->backend,
542 cbdata->updates_pending,
543 source, rspamd_fuzzy_updates_cb, cbdata);
544 return TRUE;
545 }
546 else if (final) {
547 /* No need to sync */
548 ev_break (ctx->event_loop, EVBREAK_ALL);
549 }
550
551 return FALSE;
552 }
553
554 static void
rspamd_fuzzy_reply_io(EV_P_ ev_io * w,int revents)555 rspamd_fuzzy_reply_io (EV_P_ ev_io *w, int revents)
556 {
557 struct fuzzy_session *session = (struct fuzzy_session *)w->data;
558
559 ev_io_stop (EV_A_ w);
560 rspamd_fuzzy_write_reply (session);
561 REF_RELEASE (session);
562 }
563
564 static void
rspamd_fuzzy_write_reply(struct fuzzy_session * session)565 rspamd_fuzzy_write_reply (struct fuzzy_session *session)
566 {
567 gssize r;
568 gsize len;
569 gconstpointer data;
570
571 if (session->cmd_type == CMD_ENCRYPTED_NORMAL ||
572 session->cmd_type == CMD_ENCRYPTED_SHINGLE) {
573 /* Encrypted reply */
574 data = &session->reply;
575
576 if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
577 len = sizeof (session->reply);
578 }
579 else {
580 len = sizeof (session->reply.hdr) + sizeof (session->reply.rep.v1);
581 }
582 }
583 else {
584 data = &session->reply.rep;
585
586 if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
587 len = sizeof (session->reply.rep);
588 }
589 else {
590 len = sizeof (session->reply.rep.v1);
591 }
592 }
593
594 r = rspamd_inet_address_sendto (session->fd, data, len, 0,
595 session->addr);
596
597 if (r == -1) {
598 if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
599 /* Grab reference to avoid early destruction */
600 REF_RETAIN (session);
601 session->io.data = session;
602 ev_io_init (&session->io,
603 rspamd_fuzzy_reply_io, session->fd, EV_WRITE);
604 ev_io_start (session->ctx->event_loop, &session->io);
605 }
606 else {
607 msg_err ("error while writing reply: %s", strerror (errno));
608 }
609 }
610 }
611
612 static void
rspamd_fuzzy_update_stats(struct rspamd_fuzzy_storage_ctx * ctx,enum rspamd_fuzzy_epoch epoch,gboolean matched,gboolean is_shingle,gboolean is_delayed,struct fuzzy_key_stat * key_stat,struct fuzzy_key_stat * ip_stat,guint cmd,guint reply)613 rspamd_fuzzy_update_stats (struct rspamd_fuzzy_storage_ctx *ctx,
614 enum rspamd_fuzzy_epoch epoch,
615 gboolean matched,
616 gboolean is_shingle,
617 gboolean is_delayed,
618 struct fuzzy_key_stat *key_stat,
619 struct fuzzy_key_stat *ip_stat,
620 guint cmd, guint reply)
621 {
622 ctx->stat.fuzzy_hashes_checked[epoch] ++;
623
624 if (matched) {
625 ctx->stat.fuzzy_hashes_found[epoch]++;
626 }
627 if (is_shingle) {
628 ctx->stat.fuzzy_shingles_checked[epoch]++;
629 }
630 if (is_delayed) {
631 ctx->stat.delayed_hashes ++;
632 }
633
634 if (key_stat) {
635 if (!matched && reply != 0) {
636 key_stat->errors ++;
637 }
638 else {
639 if (cmd == FUZZY_CHECK) {
640 key_stat->checked++;
641
642 if (matched) {
643 key_stat->matched ++;
644 }
645 }
646 else if (cmd == FUZZY_WRITE) {
647 key_stat->added++;
648 }
649 else if (cmd == FUZZY_DEL) {
650 key_stat->deleted++;
651 }
652 }
653 }
654
655 if (ip_stat) {
656 if (!matched && reply != 0) {
657 ip_stat->errors++;
658 }
659 else {
660 if (cmd == FUZZY_CHECK) {
661 ip_stat->checked++;
662
663 if (matched) {
664 ip_stat->matched++;
665 }
666 }
667 else if (cmd == FUZZY_WRITE) {
668 ip_stat->added++;
669 }
670 else if (cmd == FUZZY_DEL) {
671 ip_stat->deleted++;
672 }
673 }
674 }
675 }
676
/* Bit flags accepted by rspamd_fuzzy_make_reply () */
enum rspamd_fuzzy_reply_flags {
	RSPAMD_FUZZY_REPLY_ENCRYPTED = 0x1u << 0u,	/* encrypt the reply */
	RSPAMD_FUZZY_REPLY_SHINGLE = 0x1u << 1u,	/* request carried shingles */
	RSPAMD_FUZZY_REPLY_DELAY = 0x1u << 2u,	/* hash is too fresh, hide the match */
};
682
683 static void
rspamd_fuzzy_make_reply(struct rspamd_fuzzy_cmd * cmd,struct rspamd_fuzzy_reply * result,struct fuzzy_session * session,gint flags)684 rspamd_fuzzy_make_reply (struct rspamd_fuzzy_cmd *cmd,
685 struct rspamd_fuzzy_reply *result,
686 struct fuzzy_session *session,
687 gint flags)
688 {
689 gsize len;
690
691 if (cmd) {
692 result->v1.tag = cmd->tag;
693 memcpy (&session->reply.rep, result, sizeof (*result));
694
695 rspamd_fuzzy_update_stats (session->ctx,
696 session->epoch,
697 result->v1.prob > 0.5,
698 flags & RSPAMD_FUZZY_REPLY_SHINGLE,
699 flags & RSPAMD_FUZZY_REPLY_DELAY,
700 session->key_stat,
701 session->ip_stat,
702 cmd->cmd,
703 result->v1.value);
704
705 if (flags & RSPAMD_FUZZY_REPLY_DELAY) {
706 /* Hash is too fresh, need to delay it */
707 session->reply.rep.ts = 0;
708 session->reply.rep.v1.prob = 0.0;
709 session->reply.rep.v1.value = 0;
710 }
711
712 if (flags & RSPAMD_FUZZY_REPLY_ENCRYPTED) {
713 /* We need also to encrypt reply */
714 ottery_rand_bytes (session->reply.hdr.nonce,
715 sizeof (session->reply.hdr.nonce));
716
717 /*
718 * For old replies we need to encrypt just old part, otherwise
719 * decryption would fail due to mac verification mistake
720 */
721
722 if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
723 len = sizeof (session->reply.rep);
724 }
725 else {
726 len = sizeof (session->reply.rep.v1);
727 }
728
729 rspamd_cryptobox_encrypt_nm_inplace ((guchar *)&session->reply.rep,
730 len,
731 session->reply.hdr.nonce,
732 session->nm,
733 session->reply.hdr.mac,
734 RSPAMD_CRYPTOBOX_MODE_25519);
735 }
736 }
737
738 rspamd_fuzzy_write_reply (session);
739 }
740
741 static gboolean
fuzzy_peer_try_send(gint fd,struct fuzzy_peer_request * up_req)742 fuzzy_peer_try_send (gint fd, struct fuzzy_peer_request *up_req)
743 {
744 gssize r;
745
746 r = write (fd, &up_req->cmd, sizeof (up_req->cmd));
747
748 if (r != sizeof (up_req->cmd)) {
749 return FALSE;
750 }
751
752 return TRUE;
753 }
754
755 static void
fuzzy_peer_send_io(EV_P_ ev_io * w,int revents)756 fuzzy_peer_send_io (EV_P_ ev_io *w, int revents)
757 {
758 struct fuzzy_peer_request *up_req = (struct fuzzy_peer_request *)w->data;
759
760 if (!fuzzy_peer_try_send (w->fd, up_req)) {
761 msg_err ("cannot send update request to the peer: %s", strerror (errno));
762 }
763
764 ev_io_stop (EV_A_ w);
765 g_free (up_req);
766 }
767
768 static void
rspamd_fuzzy_extensions_tolua(lua_State * L,struct fuzzy_session * session)769 rspamd_fuzzy_extensions_tolua (lua_State *L,
770 struct fuzzy_session *session)
771 {
772 struct rspamd_fuzzy_cmd_extension *ext;
773 rspamd_inet_addr_t *addr;
774
775 lua_createtable (L, 0, 0);
776
777 LL_FOREACH (session->extensions, ext) {
778 switch (ext->ext) {
779 case RSPAMD_FUZZY_EXT_SOURCE_DOMAIN:
780 lua_pushlstring (L, ext->payload, ext->length);
781 lua_setfield (L, -2, "domain");
782 break;
783 case RSPAMD_FUZZY_EXT_SOURCE_IP4:
784 addr = rspamd_inet_address_new (AF_INET, ext->payload);
785 rspamd_lua_ip_push (L, addr);
786 rspamd_inet_address_free (addr);
787 lua_setfield (L, -2, "ip");
788 break;
789 case RSPAMD_FUZZY_EXT_SOURCE_IP6:
790 addr = rspamd_inet_address_new (AF_INET6, ext->payload);
791 rspamd_lua_ip_push (L, addr);
792 rspamd_inet_address_free (addr);
793 lua_setfield (L, -2, "ip");
794 break;
795 }
796 }
797 }
798
799 static void
rspamd_fuzzy_check_callback(struct rspamd_fuzzy_reply * result,void * ud)800 rspamd_fuzzy_check_callback (struct rspamd_fuzzy_reply *result, void *ud)
801 {
802 struct fuzzy_session *session = ud;
803 gboolean is_shingle = FALSE, __attribute__ ((unused)) encrypted = FALSE;
804 struct rspamd_fuzzy_cmd *cmd = NULL;
805 const struct rspamd_shingle *shingle = NULL;
806 struct rspamd_shingle sgl_cpy;
807 gint send_flags = 0;
808
809 switch (session->cmd_type) {
810 case CMD_ENCRYPTED_NORMAL:
811 encrypted = TRUE;
812 send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
813 /* Fallthrough */
814 case CMD_NORMAL:
815 cmd = &session->cmd.basic;
816 break;
817
818 case CMD_ENCRYPTED_SHINGLE:
819 encrypted = TRUE;
820 send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
821 /* Fallthrough */
822 case CMD_SHINGLE:
823 cmd = &session->cmd.basic;
824 memcpy (&sgl_cpy, &session->cmd.sgl, sizeof (sgl_cpy));
825 shingle = &sgl_cpy;
826 is_shingle = TRUE;
827 send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE;
828 break;
829 }
830
831 if (session->ctx->lua_post_handler_cbref != -1) {
832 /* Start lua post handler */
833 lua_State *L = session->ctx->cfg->lua_state;
834 gint err_idx, ret;
835
836 lua_pushcfunction (L, &rspamd_lua_traceback);
837 err_idx = lua_gettop (L);
838 /* Preallocate stack (small opt) */
839 lua_checkstack (L, err_idx + 9);
840 /* function */
841 lua_rawgeti (L, LUA_REGISTRYINDEX, session->ctx->lua_post_handler_cbref);
842 /* client IP */
843 rspamd_lua_ip_push (L, session->addr);
844 /* client command */
845 lua_pushinteger (L, cmd->cmd);
846 /* command value (push as rspamd_text) */
847 (void)lua_new_text (L, result->digest, sizeof (result->digest), FALSE);
848 /* is shingle */
849 lua_pushboolean (L, is_shingle);
850 /* result value */
851 lua_pushinteger (L, result->v1.value);
852 /* result probability */
853 lua_pushnumber (L, result->v1.prob);
854 /* result flag */
855 lua_pushinteger (L, result->v1.flag);
856 /* result timestamp */
857 lua_pushinteger (L, result->ts);
858 /* TODO: add additional data maybe (encryption, pubkey, etc) */
859 rspamd_fuzzy_extensions_tolua (L, session);
860
861 if ((ret = lua_pcall (L, 9, LUA_MULTRET, err_idx)) != 0) {
862 msg_err ("call to lua_post_handler lua "
863 "script failed (%d): %s", ret, lua_tostring (L, -1));
864 }
865 else {
866 /* Return values order:
867 * the first reply will be on err_idx + 1
868 * if it is true, then we need to read the former ones:
869 * 2-nd will be reply code
870 * 3-rd will be probability (or 0.0 if missing)
871 * 4-th value is flag (or default flag if missing)
872 */
873 ret = lua_toboolean (L, err_idx + 1);
874
875 if (ret) {
876 /* Artificial reply */
877 result->v1.value = lua_tointeger (L, err_idx + 2);
878
879 if (lua_isnumber (L, err_idx + 3)) {
880 result->v1.prob = lua_tonumber (L, err_idx + 3);
881 }
882 else {
883 result->v1.prob = 0.0f;
884 }
885
886 if (lua_isnumber (L, err_idx + 4)) {
887 result->v1.flag = lua_tointeger (L, err_idx + 4);
888 }
889
890 lua_settop (L, 0);
891 rspamd_fuzzy_make_reply (cmd, result, session, send_flags);
892 REF_RELEASE (session);
893
894 return;
895 }
896 }
897
898 lua_settop (L, 0);
899 }
900
901 if (!isnan (session->ctx->delay) &&
902 rspamd_match_radix_map_addr (session->ctx->delay_whitelist,
903 session->addr) == NULL) {
904 gdouble hash_age = rspamd_get_calendar_ticks () - result->ts;
905 gdouble jittered_age = rspamd_time_jitter (session->ctx->delay,
906 session->ctx->delay / 2.0);
907
908 if (hash_age < jittered_age) {
909 send_flags |= RSPAMD_FUZZY_REPLY_DELAY;
910 }
911 }
912
913 /* Refresh hash if found with strong confidence */
914 if (result->v1.prob > 0.9 && !session->ctx->read_only) {
915 struct fuzzy_peer_cmd up_cmd;
916 struct fuzzy_peer_request *up_req;
917
918 if (session->worker->index == 0) {
919 /* Just add to the queue */
920 memset (&up_cmd, 0, sizeof (up_cmd));
921 up_cmd.is_shingle = is_shingle;
922 memcpy (up_cmd.cmd.normal.digest, result->digest,
923 sizeof (up_cmd.cmd.normal.digest));
924 up_cmd.cmd.normal.flag = result->v1.flag;
925 up_cmd.cmd.normal.cmd = FUZZY_REFRESH;
926 up_cmd.cmd.normal.shingles_count = cmd->shingles_count;
927
928 if (is_shingle && shingle) {
929 memcpy (&up_cmd.cmd.shingle.sgl, shingle,
930 sizeof (up_cmd.cmd.shingle.sgl));
931 }
932
933 g_array_append_val (session->ctx->updates_pending, up_cmd);
934 }
935 else {
936 /* We need to send request to the peer */
937 up_req = g_malloc0 (sizeof (*up_req));
938 up_req->cmd.is_shingle = is_shingle;
939
940 memcpy (up_req->cmd.cmd.normal.digest, result->digest,
941 sizeof (up_req->cmd.cmd.normal.digest));
942 up_req->cmd.cmd.normal.flag = result->v1.flag;
943 up_req->cmd.cmd.normal.cmd = FUZZY_REFRESH;
944 up_req->cmd.cmd.normal.shingles_count = cmd->shingles_count;
945
946 if (is_shingle && shingle) {
947 memcpy (&up_req->cmd.cmd.shingle.sgl, shingle,
948 sizeof (up_req->cmd.cmd.shingle.sgl));
949 }
950
951 if (!fuzzy_peer_try_send (session->ctx->peer_fd, up_req)) {
952 up_req->io_ev.data = up_req;
953 ev_io_init (&up_req->io_ev, fuzzy_peer_send_io,
954 session->ctx->peer_fd, EV_WRITE);
955 ev_io_start (session->ctx->event_loop, &up_req->io_ev);
956 }
957 else {
958 g_free (up_req);
959 }
960 }
961 }
962
963 rspamd_fuzzy_make_reply (cmd, result, session, send_flags);
964
965 REF_RELEASE (session);
966 }
967
968 static void
rspamd_fuzzy_process_command(struct fuzzy_session * session)969 rspamd_fuzzy_process_command (struct fuzzy_session *session)
970 {
971 gboolean is_shingle = FALSE, __attribute__ ((unused)) encrypted = FALSE;
972 struct rspamd_fuzzy_cmd *cmd = NULL;
973 struct rspamd_fuzzy_reply result;
974 struct fuzzy_peer_cmd up_cmd;
975 struct fuzzy_peer_request *up_req;
976 struct fuzzy_key_stat *ip_stat = NULL;
977 gchar hexbuf[rspamd_cryptobox_HASHBYTES * 2 + 1];
978 rspamd_inet_addr_t *naddr;
979 gpointer ptr;
980 gsize up_len = 0;
981 gint send_flags = 0;
982
983 cmd = &session->cmd.basic;
984
985 switch (session->cmd_type) {
986 case CMD_NORMAL:
987 up_len = sizeof (session->cmd.basic);
988 break;
989 case CMD_SHINGLE:
990 up_len = sizeof (session->cmd);
991 is_shingle = TRUE;
992 send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE;
993 break;
994 case CMD_ENCRYPTED_NORMAL:
995 up_len = sizeof (session->cmd.basic);
996 encrypted = TRUE;
997 send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
998 break;
999 case CMD_ENCRYPTED_SHINGLE:
1000 up_len = sizeof (session->cmd);
1001 encrypted = TRUE;
1002 is_shingle = TRUE;
1003 send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE|RSPAMD_FUZZY_REPLY_ENCRYPTED;
1004 break;
1005 default:
1006 msg_err ("invalid command type: %d", session->cmd_type);
1007 return;
1008 }
1009
1010 memset (&result, 0, sizeof (result));
1011 memcpy (result.digest, cmd->digest, sizeof (result.digest));
1012 result.v1.flag = cmd->flag;
1013 result.v1.tag = cmd->tag;
1014
1015 if (session->ctx->lua_pre_handler_cbref != -1) {
1016 /* Start lua pre handler */
1017 lua_State *L = session->ctx->cfg->lua_state;
1018 gint err_idx, ret;
1019
1020 lua_pushcfunction (L, &rspamd_lua_traceback);
1021 err_idx = lua_gettop (L);
1022 /* Preallocate stack (small opt) */
1023 lua_checkstack (L, err_idx + 5);
1024 /* function */
1025 lua_rawgeti (L, LUA_REGISTRYINDEX, session->ctx->lua_pre_handler_cbref);
1026 /* client IP */
1027 rspamd_lua_ip_push (L, session->addr);
1028 /* client command */
1029 lua_pushinteger (L, cmd->cmd);
1030 /* command value (push as rspamd_text) */
1031 (void)lua_new_text (L, cmd->digest, sizeof (cmd->digest), FALSE);
1032 /* is shingle */
1033 lua_pushboolean (L, is_shingle);
1034 /* TODO: add additional data maybe (encryption, pubkey, etc) */
1035 rspamd_fuzzy_extensions_tolua (L, session);
1036
1037 if ((ret = lua_pcall (L, 5, LUA_MULTRET, err_idx)) != 0) {
1038 msg_err ("call to lua_pre_handler lua "
1039 "script failed (%d): %s", ret, lua_tostring (L, -1));
1040 }
1041 else {
1042 /* Return values order:
1043 * the first reply will be on err_idx + 1
1044 * if it is true, then we need to read the former ones:
1045 * 2-nd will be reply code
1046 * 3-rd will be probability (or 0.0 if missing)
1047 */
1048 ret = lua_toboolean (L, err_idx + 1);
1049
1050 if (ret) {
1051 /* Artificial reply */
1052 result.v1.value = lua_tointeger (L, err_idx + 2);
1053
1054 if (lua_isnumber (L, err_idx + 3)) {
1055 result.v1.prob = lua_tonumber (L, err_idx + 3);
1056 }
1057 else {
1058 result.v1.prob = 0.0f;
1059 }
1060
1061 lua_settop (L, 0);
1062 rspamd_fuzzy_make_reply (cmd, &result, session, send_flags);
1063
1064 return;
1065 }
1066 }
1067
1068 lua_settop (L, 0);
1069 }
1070
1071
1072 if (G_UNLIKELY (cmd == NULL || up_len == 0)) {
1073 result.v1.value = 500;
1074 result.v1.prob = 0.0f;
1075 rspamd_fuzzy_make_reply (cmd, &result, session, send_flags);
1076 return;
1077 }
1078
1079 if (session->ctx->encrypted_only && !encrypted) {
1080 /* Do not accept unencrypted commands */
1081 result.v1.value = 403;
1082 result.v1.prob = 0.0f;
1083 rspamd_fuzzy_make_reply (cmd, &result, session, send_flags);
1084 return;
1085 }
1086
1087 if (session->key_stat) {
1088 ip_stat = rspamd_lru_hash_lookup (session->key_stat->last_ips,
1089 session->addr, -1);
1090
1091 if (ip_stat == NULL) {
1092 naddr = rspamd_inet_address_copy (session->addr);
1093 ip_stat = g_malloc0 (sizeof (*ip_stat));
1094 REF_INIT_RETAIN (ip_stat, fuzzy_key_stat_dtor);
1095 rspamd_lru_hash_insert (session->key_stat->last_ips,
1096 naddr, ip_stat, -1, 0);
1097 }
1098
1099 REF_RETAIN (ip_stat);
1100 session->ip_stat = ip_stat;
1101 }
1102
1103 if (cmd->cmd == FUZZY_CHECK) {
1104 bool can_continue = true;
1105
1106 if (session->ctx->ratelimit_buckets) {
1107 if (session->ctx->ratelimit_log_only) {
1108 (void)rspamd_fuzzy_check_ratelimit (session); /* Check but ignore */
1109 }
1110 else {
1111 can_continue = rspamd_fuzzy_check_ratelimit (session);
1112 }
1113 }
1114
1115 if (can_continue) {
1116 REF_RETAIN (session);
1117 rspamd_fuzzy_backend_check (session->ctx->backend, cmd,
1118 rspamd_fuzzy_check_callback, session);
1119 }
1120 else {
1121 result.v1.value = 403;
1122 result.v1.prob = 0.0f;
1123 result.v1.flag = 0;
1124 rspamd_fuzzy_make_reply (cmd, &result, session, send_flags);
1125 }
1126 }
1127 else if (cmd->cmd == FUZZY_STAT) {
1128 result.v1.prob = 1.0f;
1129 result.v1.value = 0;
1130 result.v1.flag = session->ctx->stat.fuzzy_hashes;
1131 rspamd_fuzzy_make_reply (cmd, &result, session, send_flags);
1132 }
1133 else {
1134 if (rspamd_fuzzy_check_write (session)) {
1135 /* Check whitelist */
1136 if (session->ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) {
1137 rspamd_encode_hex_buf (cmd->digest, sizeof (cmd->digest),
1138 hexbuf, sizeof (hexbuf) - 1);
1139 hexbuf[sizeof (hexbuf) - 1] = '\0';
1140
1141 if (rspamd_match_hash_map (session->ctx->skip_hashes,
1142 hexbuf, sizeof (hexbuf) - 1)) {
1143 result.v1.value = 401;
1144 result.v1.prob = 0.0f;
1145
1146 goto reply;
1147 }
1148 }
1149
1150 if (session->worker->index == 0 || session->ctx->peer_fd == -1) {
1151 /* Just add to the queue */
1152 up_cmd.is_shingle = is_shingle;
1153 ptr = is_shingle ?
1154 (gpointer)&up_cmd.cmd.shingle :
1155 (gpointer)&up_cmd.cmd.normal;
1156 memcpy (ptr, cmd, up_len);
1157 g_array_append_val (session->ctx->updates_pending, up_cmd);
1158 }
1159 else {
1160 /* We need to send request to the peer */
1161 up_req = g_malloc0 (sizeof (*up_req));
1162 up_req->cmd.is_shingle = is_shingle;
1163 ptr = is_shingle ?
1164 (gpointer)&up_req->cmd.cmd.shingle :
1165 (gpointer)&up_req->cmd.cmd.normal;
1166 memcpy (ptr, cmd, up_len);
1167
1168 if (!fuzzy_peer_try_send (session->ctx->peer_fd, up_req)) {
1169 up_req->io_ev.data = up_req;
1170 ev_io_init (&up_req->io_ev, fuzzy_peer_send_io,
1171 session->ctx->peer_fd, EV_WRITE);
1172 ev_io_start (session->ctx->event_loop, &up_req->io_ev);
1173 }
1174 else {
1175 g_free (up_req);
1176 }
1177 }
1178
1179 result.v1.value = 0;
1180 result.v1.prob = 1.0f;
1181 }
1182 else {
1183 result.v1.value = 403;
1184 result.v1.prob = 0.0f;
1185 }
1186 reply:
1187 rspamd_fuzzy_make_reply (cmd, &result, session, send_flags);
1188 }
1189 }
1190
1191
1192 static enum rspamd_fuzzy_epoch
rspamd_fuzzy_command_valid(struct rspamd_fuzzy_cmd * cmd,gint r)1193 rspamd_fuzzy_command_valid (struct rspamd_fuzzy_cmd *cmd, gint r)
1194 {
1195 enum rspamd_fuzzy_epoch ret = RSPAMD_FUZZY_EPOCH_MAX;
1196
1197 switch (cmd->version) {
1198 case 4:
1199 if (cmd->shingles_count > 0) {
1200 if (r >= sizeof (struct rspamd_fuzzy_shingle_cmd)) {
1201 ret = RSPAMD_FUZZY_EPOCH11;
1202 }
1203 }
1204 else {
1205 if (r >= sizeof (*cmd)) {
1206 ret = RSPAMD_FUZZY_EPOCH11;
1207 }
1208 }
1209 break;
1210 case 3:
1211 if (cmd->shingles_count > 0) {
1212 if (r == sizeof (struct rspamd_fuzzy_shingle_cmd)) {
1213 ret = RSPAMD_FUZZY_EPOCH10;
1214 }
1215 }
1216 else {
1217 if (r == sizeof (*cmd)) {
1218 ret = RSPAMD_FUZZY_EPOCH10;
1219 }
1220 }
1221 break;
1222 case 2:
1223 /*
1224 * rspamd 0.8 has slightly different tokenizer then it might be not
1225 * 100% compatible
1226 */
1227 if (cmd->shingles_count > 0) {
1228 if (r == sizeof (struct rspamd_fuzzy_shingle_cmd)) {
1229 ret = RSPAMD_FUZZY_EPOCH8;
1230 }
1231 }
1232 else {
1233 ret = RSPAMD_FUZZY_EPOCH8;
1234 }
1235 break;
1236 default:
1237 break;
1238 }
1239
1240 return ret;
1241 }
1242
1243 static gboolean
rspamd_fuzzy_decrypt_command(struct fuzzy_session * s,guchar * buf,gsize buflen)1244 rspamd_fuzzy_decrypt_command (struct fuzzy_session *s, guchar *buf, gsize buflen)
1245 {
1246 struct rspamd_fuzzy_encrypted_req_hdr hdr;
1247 struct rspamd_cryptobox_pubkey *rk;
1248 struct fuzzy_key *key;
1249
1250 if (s->ctx->default_key == NULL) {
1251 msg_warn ("received encrypted request when encryption is not enabled");
1252 return FALSE;
1253 }
1254
1255 if (buflen < sizeof (hdr)) {
1256 msg_warn ("XXX: should not be reached");
1257 return FALSE;
1258 }
1259
1260 memcpy (&hdr, buf, sizeof (hdr));
1261 buf += sizeof (hdr);
1262 buflen -= sizeof (hdr);
1263
1264 /* Try to find the desired key */
1265 key = g_hash_table_lookup (s->ctx->keys, hdr.key_id);
1266
1267 if (key == NULL) {
1268 /* Unknown key, assume default one */
1269 key = s->ctx->default_key;
1270 }
1271
1272 s->key_stat = key->stat;
1273
1274 /* Now process keypair */
1275 rk = rspamd_pubkey_from_bin (hdr.pubkey, sizeof (hdr.pubkey),
1276 RSPAMD_KEYPAIR_KEX, RSPAMD_CRYPTOBOX_MODE_25519);
1277
1278 if (rk == NULL) {
1279 msg_err ("bad key; ip=%s",
1280 rspamd_inet_address_to_string (s->addr));
1281 return FALSE;
1282 }
1283
1284 rspamd_keypair_cache_process (s->ctx->keypair_cache, key->key, rk);
1285
1286 /* Now decrypt request */
1287 if (!rspamd_cryptobox_decrypt_nm_inplace (buf, buflen, hdr.nonce,
1288 rspamd_pubkey_get_nm (rk, key->key),
1289 hdr.mac, RSPAMD_CRYPTOBOX_MODE_25519)) {
1290 msg_err ("decryption failed; ip=%s",
1291 rspamd_inet_address_to_string (s->addr));
1292 rspamd_pubkey_unref (rk);
1293
1294 return FALSE;
1295 }
1296
1297 memcpy (s->nm, rspamd_pubkey_get_nm (rk, key->key), sizeof (s->nm));
1298 rspamd_pubkey_unref (rk);
1299
1300 return TRUE;
1301 }
1302
1303
1304 static gboolean
rspamd_fuzzy_extensions_from_wire(struct fuzzy_session * s,guchar * buf,gsize buflen)1305 rspamd_fuzzy_extensions_from_wire (struct fuzzy_session *s, guchar *buf, gsize buflen)
1306 {
1307 struct rspamd_fuzzy_cmd_extension *ext, *prev_ext;
1308 guchar *storage, *p = buf, *end = buf + buflen;
1309 gsize st_len = 0, n_ext = 0;
1310
1311 /* Read number of extensions to allocate array */
1312 while (p < end) {
1313 guchar cmd = *p++;
1314
1315 if (p < end) {
1316 if (cmd == RSPAMD_FUZZY_EXT_SOURCE_DOMAIN) {
1317 /* Next byte is buf length */
1318 guchar dom_len = *p++;
1319
1320 if (dom_len <= (end - p)) {
1321 st_len += dom_len;
1322 n_ext ++;
1323 }
1324 else {
1325 /* Truncation */
1326 return FALSE;
1327 }
1328
1329 p += dom_len;
1330 }
1331 else if (cmd == RSPAMD_FUZZY_EXT_SOURCE_IP4) {
1332 if (end - p >= sizeof (in_addr_t)) {
1333 n_ext ++;
1334 st_len += sizeof (in_addr_t);
1335 }
1336 else {
1337 /* Truncation */
1338 return FALSE;
1339 }
1340
1341 p += sizeof (in_addr_t);
1342 }
1343 else if (cmd == RSPAMD_FUZZY_EXT_SOURCE_IP6) {
1344 if (end - p >= sizeof (struct in6_addr)) {
1345 n_ext ++;
1346 st_len += sizeof (struct in6_addr);
1347 }
1348 else {
1349 /* Truncation */
1350 return FALSE;
1351 }
1352
1353 p += sizeof (struct in6_addr);
1354 }
1355 else {
1356 /* Invalid command */
1357 return FALSE;
1358 }
1359 }
1360 else {
1361 /* Truncated extension */
1362 return FALSE;
1363 }
1364 }
1365
1366 if (n_ext > 0) {
1367 p = buf;
1368 /*
1369 * Memory layout: n_ext of struct rspamd_fuzzy_cmd_extension
1370 * payload for each extension in a continious data segment
1371 */
1372 storage = g_malloc (n_ext * sizeof (struct rspamd_fuzzy_cmd_extension) +
1373 st_len);
1374
1375 guchar *data_buf = storage +
1376 n_ext * sizeof (struct rspamd_fuzzy_cmd_extension);
1377 ext = (struct rspamd_fuzzy_cmd_extension *)storage;
1378
1379 /* All validation has been done, so we can just go further */
1380 while (p < end) {
1381 prev_ext = ext;
1382 guchar cmd = *p++;
1383
1384 if (cmd == RSPAMD_FUZZY_EXT_SOURCE_DOMAIN) {
1385 /* Next byte is buf length */
1386 guchar dom_len = *p++;
1387 guchar *dest = data_buf;
1388
1389 ext->ext = RSPAMD_FUZZY_EXT_SOURCE_DOMAIN;
1390 ext->next = ext + 1;
1391 ext->length = dom_len;
1392 ext->payload = dest;
1393 memcpy (dest, p, dom_len);
1394 p += dom_len;
1395 data_buf += dom_len;
1396 ext = ext->next;
1397 }
1398 else if (cmd == RSPAMD_FUZZY_EXT_SOURCE_IP4) {
1399 guchar *dest = data_buf;
1400
1401 ext->ext = RSPAMD_FUZZY_EXT_SOURCE_IP4;
1402 ext->next = ext + 1;
1403 ext->length = sizeof (in_addr_t);
1404 ext->payload = dest;
1405 memcpy (dest, p, sizeof (in_addr_t));
1406 p += sizeof (in_addr_t);
1407 data_buf += sizeof (in_addr_t);
1408 ext = ext->next;
1409 }
1410 else if (cmd == RSPAMD_FUZZY_EXT_SOURCE_IP6) {
1411 guchar *dest = data_buf;
1412
1413 ext->ext = RSPAMD_FUZZY_EXT_SOURCE_IP6;
1414 ext->next = ext + 1;
1415 ext->length = sizeof (struct in6_addr);
1416 ext->payload = dest;
1417 memcpy (dest, p, sizeof (struct in6_addr));
1418 p += sizeof (struct in6_addr);
1419 data_buf += sizeof (struct in6_addr);
1420 ext = ext->next;
1421 }
1422 else {
1423 g_assert_not_reached ();
1424 }
1425 }
1426
1427 /* Last next should be NULL */
1428 prev_ext->next = NULL;
1429
1430 /* Rewind to the begin */
1431 ext = (struct rspamd_fuzzy_cmd_extension *)storage;
1432 s->extensions = ext;
1433 }
1434
1435 return TRUE;
1436 }
1437
1438 static gboolean
rspamd_fuzzy_cmd_from_wire(guchar * buf,guint buflen,struct fuzzy_session * s)1439 rspamd_fuzzy_cmd_from_wire (guchar *buf, guint buflen, struct fuzzy_session *s)
1440 {
1441 enum rspamd_fuzzy_epoch epoch;
1442 gboolean encrypted = FALSE;
1443
1444 if (buflen < sizeof (struct rspamd_fuzzy_cmd)) {
1445 msg_debug ("truncated fuzzy command of size %d received", buflen);
1446 return FALSE;
1447 }
1448
1449 /* Now check encryption */
1450
1451 if (buflen >= sizeof (struct rspamd_fuzzy_encrypted_cmd)) {
1452 if (memcmp (buf, fuzzy_encrypted_magic, sizeof (fuzzy_encrypted_magic)) == 0) {
1453 /* Encrypted command */
1454 encrypted = TRUE;
1455 }
1456 }
1457
1458 if (encrypted) {
1459 /* Decrypt first */
1460 if (!rspamd_fuzzy_decrypt_command (s, buf, buflen)) {
1461 return FALSE;
1462 }
1463 else {
1464 /*
1465 * Advance buffer to skip encrypted header.
1466 * Note that after rspamd_fuzzy_decrypt_command buf is unencrypted
1467 */
1468 buf += sizeof (struct rspamd_fuzzy_encrypted_req_hdr);
1469 buflen -= sizeof (struct rspamd_fuzzy_encrypted_req_hdr);
1470 }
1471 }
1472
1473 /* Fill the normal command */
1474 if (buflen < sizeof (s->cmd.basic)) {
1475 msg_debug ("truncated normal fuzzy command of size %d received", buflen);
1476 return FALSE;
1477 }
1478
1479 memcpy (&s->cmd.basic, buf, sizeof (s->cmd.basic));
1480 epoch = rspamd_fuzzy_command_valid (&s->cmd.basic, buflen);
1481
1482 if (epoch == RSPAMD_FUZZY_EPOCH_MAX) {
1483 msg_debug ("invalid fuzzy command of size %d received", buflen);
1484 return FALSE;
1485 }
1486
1487 s->epoch = epoch;
1488
1489 /* Advance buf */
1490 buf += sizeof (s->cmd.basic);
1491 buflen -= sizeof (s->cmd.basic);
1492
1493 if (s->cmd.basic.shingles_count > 0) {
1494 if (buflen >= sizeof (s->cmd.sgl)) {
1495 /* Copy the shingles part */
1496 memcpy (&s->cmd.sgl, buf, sizeof (s->cmd.sgl));
1497 }
1498 else {
1499 /* Truncated stuff */
1500 msg_debug ("truncated fuzzy shingles command of size %d received", buflen);
1501 return FALSE;
1502 }
1503
1504 buf += sizeof (s->cmd.sgl);
1505 buflen -= sizeof (s->cmd.sgl);
1506
1507 if (encrypted) {
1508 s->cmd_type = CMD_ENCRYPTED_SHINGLE;
1509 }
1510 else {
1511 s->cmd_type = CMD_SHINGLE;
1512 }
1513 }
1514 else {
1515 if (encrypted) {
1516 s->cmd_type = CMD_ENCRYPTED_NORMAL;
1517 }
1518 else {
1519 s->cmd_type = CMD_NORMAL;
1520 }
1521 }
1522
1523 if (buflen > 0) {
1524 /* Process possible extensions */
1525 if (!rspamd_fuzzy_extensions_from_wire (s, buf, buflen)) {
1526 msg_debug ("truncated fuzzy shingles command of size %d received", buflen);
1527 return FALSE;
1528 }
1529 }
1530
1531 return TRUE;
1532 }
1533
1534
1535 static void
fuzzy_session_destroy(gpointer d)1536 fuzzy_session_destroy (gpointer d)
1537 {
1538 struct fuzzy_session *session = d;
1539
1540 rspamd_inet_address_free (session->addr);
1541 rspamd_explicit_memzero (session->nm, sizeof (session->nm));
1542 session->worker->nconns--;
1543
1544 if (session->ip_stat) {
1545 REF_RELEASE (session->ip_stat);
1546 }
1547
1548 if (session->extensions) {
1549 g_free (session->extensions);
1550 }
1551
1552 g_free (session);
1553 }
1554
1555 #define FUZZY_INPUT_BUFLEN 1024
1556 #ifdef HAVE_RECVMMSG
1557 #define MSGVEC_LEN 16
1558 #else
1559 #define MSGVEC_LEN 1
1560 #endif
1561
1562 /*
1563 * Accept new connection and construct task
1564 */
1565 static void
accept_fuzzy_socket(EV_P_ ev_io * w,int revents)1566 accept_fuzzy_socket (EV_P_ ev_io *w, int revents)
1567 {
1568 struct rspamd_worker *worker = (struct rspamd_worker *)w->data;
1569 struct fuzzy_session *session;
1570 gssize r, msg_len;
1571 guint64 *nerrors;
1572 struct iovec iovs[MSGVEC_LEN];
1573 guint8 bufs[MSGVEC_LEN][FUZZY_INPUT_BUFLEN];
1574 struct sockaddr_storage peer_sa[MSGVEC_LEN];
1575 socklen_t salen = sizeof (peer_sa[0]);
1576 #ifdef HAVE_RECVMMSG
1577 #define MSG_FIELD(msg, field) msg.msg_hdr.field
1578 struct mmsghdr msg[MSGVEC_LEN];
1579 #else
1580 #define MSG_FIELD(msg, field) msg.field
1581 struct msghdr msg[MSGVEC_LEN];
1582 #endif
1583
1584 memset (msg, 0, sizeof (*msg) * MSGVEC_LEN);
1585
1586 /* Prepare messages to receive */
1587 for (int i = 0; i < MSGVEC_LEN; i ++) {
1588 /* Prepare msghdr structs */
1589 iovs[i].iov_base = bufs[i];
1590 iovs[i].iov_len = sizeof (bufs[i]);
1591 MSG_FIELD(msg[i], msg_name) = (void *)&peer_sa[i];
1592 MSG_FIELD(msg[i], msg_namelen) = salen;
1593 MSG_FIELD(msg[i], msg_iov) = &iovs[i];
1594 MSG_FIELD(msg[i], msg_iovlen) = 1;
1595 }
1596
1597 /* Got some data */
1598 if (revents == EV_READ) {
1599
1600 for (;;) {
1601 #ifdef HAVE_RECVMMSG
1602 r = recvmmsg (w->fd, msg, MSGVEC_LEN, 0, NULL);
1603 #else
1604 r = recvmsg (w->fd, msg, 0);
1605 #endif
1606
1607 if (r == -1) {
1608 if (errno == EINTR) {
1609 continue;
1610 }
1611 else if (errno == EAGAIN || errno == EWOULDBLOCK) {
1612
1613 return;
1614 }
1615
1616 msg_err ("got error while reading from socket: %d, %s",
1617 errno,
1618 strerror (errno));
1619 return;
1620 }
1621
1622 #ifndef HAVE_RECVMMSG
1623 msg_len = r; /* Save real length in bytes here */
1624 r = 1; /* Assume that we have received a single message */
1625 #endif
1626
1627 for (int i = 0; i < r; i ++) {
1628 rspamd_inet_addr_t *client_addr;
1629
1630 client_addr = rspamd_inet_address_from_sa (MSG_FIELD(msg[i], msg_name),
1631 MSG_FIELD(msg[i], msg_namelen));
1632
1633 if (!rspamd_fuzzy_check_client (worker->ctx, client_addr)) {
1634 /* Disallow forbidden clients silently */
1635 rspamd_inet_address_free (client_addr);
1636 continue;
1637 }
1638
1639 session = g_malloc0 (sizeof (*session));
1640 REF_INIT_RETAIN (session, fuzzy_session_destroy);
1641 session->worker = worker;
1642 session->fd = w->fd;
1643 session->ctx = worker->ctx;
1644 session->time = (guint64) time (NULL);
1645 session->addr = client_addr;
1646 worker->nconns++;
1647
1648 /* Each message can have its length in case of recvmmsg */
1649 #ifdef HAVE_RECVMMSG
1650 msg_len = msg[i].msg_len;
1651 #endif
1652
1653 if (rspamd_fuzzy_cmd_from_wire (iovs[i].iov_base,
1654 msg_len, session)) {
1655 /* Check shingles count sanity */
1656 rspamd_fuzzy_process_command (session);
1657 }
1658 else {
1659 /* Discard input */
1660 session->ctx->stat.invalid_requests ++;
1661 msg_debug ("invalid fuzzy command of size %z received", r);
1662
1663 nerrors = rspamd_lru_hash_lookup (session->ctx->errors_ips,
1664 session->addr, -1);
1665
1666 if (nerrors == NULL) {
1667 nerrors = g_malloc (sizeof (*nerrors));
1668 *nerrors = 1;
1669 rspamd_lru_hash_insert (session->ctx->errors_ips,
1670 rspamd_inet_address_copy (session->addr),
1671 nerrors, -1, -1);
1672 }
1673 else {
1674 *nerrors = *nerrors + 1;
1675 }
1676 }
1677
1678 REF_RELEASE (session);
1679 }
1680 #ifdef HAVE_RECVMMSG
1681 /* Stop reading as we are using recvmmsg instead of recvmsg */
1682 break;
1683 #endif
1684 }
1685 }
1686 }
1687
1688 static gboolean
rspamd_fuzzy_storage_periodic_callback(void * ud)1689 rspamd_fuzzy_storage_periodic_callback (void *ud)
1690 {
1691 struct rspamd_fuzzy_storage_ctx *ctx = ud;
1692
1693 if (ctx->updates_pending->len > 0) {
1694 rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
1695
1696 return TRUE;
1697 }
1698
1699 return FALSE;
1700 }
1701
1702 static gboolean
rspamd_fuzzy_storage_sync(struct rspamd_main * rspamd_main,struct rspamd_worker * worker,gint fd,gint attached_fd,struct rspamd_control_command * cmd,gpointer ud)1703 rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
1704 struct rspamd_worker *worker, gint fd,
1705 gint attached_fd,
1706 struct rspamd_control_command *cmd,
1707 gpointer ud)
1708 {
1709 struct rspamd_fuzzy_storage_ctx *ctx = ud;
1710 struct rspamd_control_reply rep;
1711
1712 rep.reply.fuzzy_sync.status = 0;
1713 rep.type = RSPAMD_CONTROL_FUZZY_SYNC;
1714
1715 if (ctx->backend && worker->index == 0) {
1716 rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
1717 rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
1718 rspamd_fuzzy_storage_periodic_callback, ctx);
1719 }
1720
1721 if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
1722 msg_err ("cannot write reply to the control socket: %s",
1723 strerror (errno));
1724 }
1725
1726 return TRUE;
1727 }
1728
1729 static gboolean
rspamd_fuzzy_storage_reload(struct rspamd_main * rspamd_main,struct rspamd_worker * worker,gint fd,gint attached_fd,struct rspamd_control_command * cmd,gpointer ud)1730 rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main,
1731 struct rspamd_worker *worker, gint fd,
1732 gint attached_fd,
1733 struct rspamd_control_command *cmd,
1734 gpointer ud)
1735 {
1736 struct rspamd_fuzzy_storage_ctx *ctx = ud;
1737 GError *err = NULL;
1738 struct rspamd_control_reply rep;
1739
1740 msg_info ("reloading fuzzy storage after receiving reload command");
1741
1742 if (ctx->backend) {
1743 /* Close backend and reopen it one more time */
1744 rspamd_fuzzy_backend_close (ctx->backend);
1745 }
1746
1747 memset (&rep, 0, sizeof (rep));
1748 rep.type = RSPAMD_CONTROL_RELOAD;
1749
1750 if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->event_loop,
1751 worker->cf->options, rspamd_main->cfg,
1752 &err)) == NULL) {
1753 msg_err ("cannot open backend after reload: %e", err);
1754 rep.reply.reload.status = err->code;
1755 g_error_free (err);
1756 }
1757 else {
1758 rep.reply.reload.status = 0;
1759 }
1760
1761 if (ctx->backend && worker->index == 0) {
1762 rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
1763 rspamd_fuzzy_storage_periodic_callback, ctx);
1764 }
1765
1766 if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
1767 msg_err ("cannot write reply to the control socket: %s",
1768 strerror (errno));
1769 }
1770
1771 return TRUE;
1772 }
1773
1774 static ucl_object_t *
rspamd_fuzzy_storage_stat_key(struct fuzzy_key_stat * key_stat)1775 rspamd_fuzzy_storage_stat_key (struct fuzzy_key_stat *key_stat)
1776 {
1777 ucl_object_t *res;
1778
1779 res = ucl_object_typed_new (UCL_OBJECT);
1780
1781 ucl_object_insert_key (res, ucl_object_fromint (key_stat->checked),
1782 "checked", 0, false);
1783 ucl_object_insert_key (res, ucl_object_fromint (key_stat->matched),
1784 "matched", 0, false);
1785 ucl_object_insert_key (res, ucl_object_fromint (key_stat->added),
1786 "added", 0, false);
1787 ucl_object_insert_key (res, ucl_object_fromint (key_stat->deleted),
1788 "deleted", 0, false);
1789 ucl_object_insert_key (res, ucl_object_fromint (key_stat->errors),
1790 "errors", 0, false);
1791
1792 return res;
1793 }
1794
1795 static ucl_object_t *
rspamd_fuzzy_stat_to_ucl(struct rspamd_fuzzy_storage_ctx * ctx,gboolean ip_stat)1796 rspamd_fuzzy_stat_to_ucl (struct rspamd_fuzzy_storage_ctx *ctx, gboolean ip_stat)
1797 {
1798 struct fuzzy_key_stat *key_stat;
1799 GHashTableIter it;
1800 struct fuzzy_key *key;
1801 ucl_object_t *obj, *keys_obj, *elt, *ip_elt, *ip_cur;
1802 gpointer k, v;
1803 gint i;
1804 gchar keyname[17];
1805
1806 obj = ucl_object_typed_new (UCL_OBJECT);
1807
1808 keys_obj = ucl_object_typed_new (UCL_OBJECT);
1809 g_hash_table_iter_init (&it, ctx->keys);
1810
1811 while (g_hash_table_iter_next (&it, &k, &v)) {
1812 key = v;
1813 key_stat = key->stat;
1814
1815 if (key_stat) {
1816 rspamd_snprintf (keyname, sizeof (keyname), "%8bs", k);
1817
1818 elt = rspamd_fuzzy_storage_stat_key (key_stat);
1819
1820 if (key_stat->last_ips && ip_stat) {
1821 i = 0;
1822
1823 ip_elt = ucl_object_typed_new (UCL_OBJECT);
1824
1825 while ((i = rspamd_lru_hash_foreach (key_stat->last_ips,
1826 i, &k, &v)) != -1) {
1827 ip_cur = rspamd_fuzzy_storage_stat_key (v);
1828 ucl_object_insert_key (ip_elt, ip_cur,
1829 rspamd_inet_address_to_string (k), 0, true);
1830 }
1831
1832 ucl_object_insert_key (elt, ip_elt, "ips", 0, false);
1833 }
1834
1835 ucl_object_insert_key (keys_obj, elt, keyname, 0, true);
1836 }
1837 }
1838
1839 ucl_object_insert_key (obj, keys_obj, "keys", 0, false);
1840
1841 /* Now generic stats */
1842 ucl_object_insert_key (obj,
1843 ucl_object_fromint (ctx->stat.fuzzy_hashes),
1844 "fuzzy_stored",
1845 0,
1846 false);
1847 ucl_object_insert_key (obj,
1848 ucl_object_fromint (ctx->stat.fuzzy_hashes_expired),
1849 "fuzzy_expired",
1850 0,
1851 false);
1852 ucl_object_insert_key (obj,
1853 ucl_object_fromint (ctx->stat.invalid_requests),
1854 "invalid_requests",
1855 0,
1856 false);
1857 ucl_object_insert_key (obj,
1858 ucl_object_fromint (ctx->stat.delayed_hashes),
1859 "delayed_hashes",
1860 0,
1861 false);
1862
1863 if (ctx->errors_ips && ip_stat) {
1864 i = 0;
1865
1866 ip_elt = ucl_object_typed_new (UCL_OBJECT);
1867
1868 while ((i = rspamd_lru_hash_foreach (ctx->errors_ips, i, &k, &v)) != -1) {
1869 ucl_object_insert_key (ip_elt,
1870 ucl_object_fromint (*(guint64 *)v),
1871 rspamd_inet_address_to_string (k), 0, true);
1872 }
1873
1874 ucl_object_insert_key (obj,
1875 ip_elt,
1876 "errors_ips",
1877 0,
1878 false);
1879 }
1880
1881 /* Checked by epoch */
1882 elt = ucl_object_typed_new (UCL_ARRAY);
1883
1884 for (i = RSPAMD_FUZZY_EPOCH6; i < RSPAMD_FUZZY_EPOCH_MAX; i++) {
1885 ucl_array_append (elt,
1886 ucl_object_fromint (ctx->stat.fuzzy_hashes_checked[i]));
1887 }
1888
1889 ucl_object_insert_key (obj, elt, "fuzzy_checked", 0, false);
1890
1891 /* Shingles by epoch */
1892 elt = ucl_object_typed_new (UCL_ARRAY);
1893
1894 for (i = RSPAMD_FUZZY_EPOCH6; i < RSPAMD_FUZZY_EPOCH_MAX; i++) {
1895 ucl_array_append (elt,
1896 ucl_object_fromint (ctx->stat.fuzzy_shingles_checked[i]));
1897 }
1898
1899 ucl_object_insert_key (obj, elt, "fuzzy_shingles", 0, false);
1900
1901 /* Matched by epoch */
1902 elt = ucl_object_typed_new (UCL_ARRAY);
1903
1904 for (i = RSPAMD_FUZZY_EPOCH6; i < RSPAMD_FUZZY_EPOCH_MAX; i++) {
1905 ucl_array_append (elt,
1906 ucl_object_fromint (ctx->stat.fuzzy_hashes_found[i]));
1907 }
1908
1909 ucl_object_insert_key (obj, elt, "fuzzy_found", 0, false);
1910
1911
1912 return obj;
1913 }
1914
1915 static int
lua_fuzzy_add_pre_handler(lua_State * L)1916 lua_fuzzy_add_pre_handler (lua_State *L)
1917 {
1918 struct rspamd_worker *wrk, **pwrk = (struct rspamd_worker **)
1919 rspamd_lua_check_udata (L, 1, "rspamd{worker}");
1920 struct rspamd_fuzzy_storage_ctx *ctx;
1921
1922 if (!pwrk) {
1923 return luaL_error (L, "invalid arguments, worker + function are expected");
1924 }
1925
1926 wrk = *pwrk;
1927
1928 if (wrk && lua_isfunction (L, 2)) {
1929 ctx = (struct rspamd_fuzzy_storage_ctx *)wrk->ctx;
1930
1931 if (ctx->lua_pre_handler_cbref != -1) {
1932 /* Should not happen */
1933 luaL_unref (L, LUA_REGISTRYINDEX, ctx->lua_pre_handler_cbref);
1934 }
1935
1936 lua_pushvalue (L, 2);
1937 ctx->lua_pre_handler_cbref = luaL_ref (L, LUA_REGISTRYINDEX);
1938 }
1939 else {
1940 return luaL_error (L, "invalid arguments, worker + function are expected");
1941 }
1942
1943 return 0;
1944 }
1945
1946 static int
lua_fuzzy_add_post_handler(lua_State * L)1947 lua_fuzzy_add_post_handler (lua_State *L)
1948 {
1949 struct rspamd_worker *wrk, **pwrk = (struct rspamd_worker **)
1950 rspamd_lua_check_udata (L, 1, "rspamd{worker}");
1951 struct rspamd_fuzzy_storage_ctx *ctx;
1952
1953 if (!pwrk) {
1954 return luaL_error (L, "invalid arguments, worker + function are expected");
1955 }
1956
1957 wrk = *pwrk;
1958
1959 if (wrk && lua_isfunction (L, 2)) {
1960 ctx = (struct rspamd_fuzzy_storage_ctx *)wrk->ctx;
1961
1962 if (ctx->lua_post_handler_cbref != -1) {
1963 /* Should not happen */
1964 luaL_unref (L, LUA_REGISTRYINDEX, ctx->lua_post_handler_cbref);
1965 }
1966
1967 lua_pushvalue (L, 2);
1968 ctx->lua_post_handler_cbref = luaL_ref (L, LUA_REGISTRYINDEX);
1969 }
1970 else {
1971 return luaL_error (L, "invalid arguments, worker + function are expected");
1972 }
1973
1974 return 0;
1975 }
1976
1977 static gboolean
rspamd_fuzzy_storage_stat(struct rspamd_main * rspamd_main,struct rspamd_worker * worker,gint fd,gint attached_fd,struct rspamd_control_command * cmd,gpointer ud)1978 rspamd_fuzzy_storage_stat (struct rspamd_main *rspamd_main,
1979 struct rspamd_worker *worker, gint fd,
1980 gint attached_fd,
1981 struct rspamd_control_command *cmd,
1982 gpointer ud)
1983 {
1984 struct rspamd_fuzzy_storage_ctx *ctx = ud;
1985 struct rspamd_control_reply rep;
1986 ucl_object_t *obj;
1987 struct ucl_emitter_functions *emit_subr;
1988 guchar fdspace[CMSG_SPACE(sizeof (int))];
1989 struct iovec iov;
1990 struct msghdr msg;
1991 struct cmsghdr *cmsg;
1992
1993 gint outfd = -1;
1994 gchar tmppath[PATH_MAX];
1995
1996 memset (&rep, 0, sizeof (rep));
1997 rep.type = RSPAMD_CONTROL_FUZZY_STAT;
1998
1999 rspamd_snprintf (tmppath, sizeof (tmppath), "%s%c%s-XXXXXXXXXX",
2000 rspamd_main->cfg->temp_dir, G_DIR_SEPARATOR, "fuzzy-stat");
2001
2002 if ((outfd = mkstemp (tmppath)) == -1) {
2003 rep.reply.fuzzy_stat.status = errno;
2004 msg_info_main ("cannot make temporary stat file for fuzzy stat: %s",
2005 strerror (errno));
2006 }
2007 else {
2008 rep.reply.fuzzy_stat.status = 0;
2009
2010 memcpy (rep.reply.fuzzy_stat.storage_id,
2011 rspamd_fuzzy_backend_id (ctx->backend),
2012 sizeof (rep.reply.fuzzy_stat.storage_id));
2013
2014 obj = rspamd_fuzzy_stat_to_ucl (ctx, TRUE);
2015 emit_subr = ucl_object_emit_fd_funcs (outfd);
2016 ucl_object_emit_full (obj, UCL_EMIT_JSON_COMPACT, emit_subr, NULL);
2017 ucl_object_emit_funcs_free (emit_subr);
2018 ucl_object_unref (obj);
2019 /* Rewind output file */
2020 close (outfd);
2021 outfd = open (tmppath, O_RDONLY);
2022 unlink (tmppath);
2023 }
2024
2025 /* Now we can send outfd and status message */
2026 memset (&msg, 0, sizeof (msg));
2027
2028 /* Attach fd to the message */
2029 if (outfd != -1) {
2030 memset (fdspace, 0, sizeof (fdspace));
2031 msg.msg_control = fdspace;
2032 msg.msg_controllen = sizeof (fdspace);
2033 cmsg = CMSG_FIRSTHDR (&msg);
2034
2035 if (cmsg) {
2036 cmsg->cmsg_level = SOL_SOCKET;
2037 cmsg->cmsg_type = SCM_RIGHTS;
2038 cmsg->cmsg_len = CMSG_LEN (sizeof (int));
2039 memcpy (CMSG_DATA (cmsg), &outfd, sizeof (int));
2040 }
2041 }
2042
2043 iov.iov_base = &rep;
2044 iov.iov_len = sizeof (rep);
2045 msg.msg_iov = &iov;
2046 msg.msg_iovlen = 1;
2047
2048 if (sendmsg (fd, &msg, 0) == -1) {
2049 msg_err_main ("cannot send fuzzy stat: %s", strerror (errno));
2050 }
2051
2052 if (outfd != -1) {
2053 close (outfd);
2054 }
2055
2056 return TRUE;
2057 }
2058
2059 static gboolean
fuzzy_parse_keypair(rspamd_mempool_t * pool,const ucl_object_t * obj,gpointer ud,struct rspamd_rcl_section * section,GError ** err)2060 fuzzy_parse_keypair (rspamd_mempool_t *pool,
2061 const ucl_object_t *obj,
2062 gpointer ud,
2063 struct rspamd_rcl_section *section,
2064 GError **err)
2065 {
2066 struct rspamd_rcl_struct_parser *pd = ud;
2067 struct rspamd_fuzzy_storage_ctx *ctx;
2068 struct rspamd_cryptobox_keypair *kp;
2069 struct fuzzy_key_stat *keystat;
2070 struct fuzzy_key *key;
2071 const ucl_object_t *cur;
2072 const guchar *pk;
2073 ucl_object_iter_t it = NULL;
2074 gboolean ret;
2075
2076 ctx = pd->user_struct;
2077 pd->offset = G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, default_keypair);
2078
2079 /*
2080 * Single key
2081 */
2082 if (ucl_object_type (obj) == UCL_STRING || ucl_object_type (obj)
2083 == UCL_OBJECT) {
2084 ret = rspamd_rcl_parse_struct_keypair (pool, obj, pd, section, err);
2085
2086 if (!ret) {
2087 return ret;
2088 }
2089
2090 /* Insert key to the hash table */
2091 kp = ctx->default_keypair;
2092
2093 if (kp == NULL) {
2094 return FALSE;
2095 }
2096
2097 if (rspamd_keypair_alg (kp) != RSPAMD_CRYPTOBOX_MODE_25519 ||
2098 rspamd_keypair_type (kp) != RSPAMD_KEYPAIR_KEX) {
2099 return FALSE;
2100 }
2101
2102 key = g_malloc0 (sizeof (*key));
2103 key->key = kp;
2104 keystat = g_malloc0 (sizeof (*keystat));
2105 REF_INIT_RETAIN (keystat, fuzzy_key_stat_dtor);
2106 /* Hash of ip -> fuzzy_key_stat */
2107 keystat->last_ips = rspamd_lru_hash_new_full (1024,
2108 (GDestroyNotify) rspamd_inet_address_free,
2109 fuzzy_key_stat_unref,
2110 rspamd_inet_address_hash, rspamd_inet_address_equal);
2111 key->stat = keystat;
2112 pk = rspamd_keypair_component (kp, RSPAMD_KEYPAIR_COMPONENT_PK,
2113 NULL);
2114 g_hash_table_insert (ctx->keys, (gpointer)pk, key);
2115 ctx->default_key = key;
2116 msg_info_pool ("loaded keypair %*xs", 8, pk);
2117 }
2118 else if (ucl_object_type (obj) == UCL_ARRAY) {
2119 while ((cur = ucl_object_iterate (obj, &it, true)) != NULL) {
2120 if (!fuzzy_parse_keypair (pool, cur, pd, section, err)) {
2121 msg_err_pool ("cannot parse keypair");
2122 }
2123 }
2124 }
2125
2126 return TRUE;
2127 }
2128
2129 static guint
fuzzy_kp_hash(gconstpointer p)2130 fuzzy_kp_hash (gconstpointer p)
2131 {
2132 return *(guint *)p;
2133 }
2134
2135 static gboolean
fuzzy_kp_equal(gconstpointer a,gconstpointer b)2136 fuzzy_kp_equal (gconstpointer a, gconstpointer b)
2137 {
2138 const guchar *pa = a, *pb = b;
2139
2140 return (memcmp (pa, pb, RSPAMD_FUZZY_KEYLEN) == 0);
2141 }
2142
2143 gpointer
init_fuzzy(struct rspamd_config * cfg)2144 init_fuzzy (struct rspamd_config *cfg)
2145 {
2146 struct rspamd_fuzzy_storage_ctx *ctx;
2147 GQuark type;
2148
2149 type = g_quark_try_string ("fuzzy");
2150
2151 ctx = rspamd_mempool_alloc0 (cfg->cfg_pool,
2152 sizeof (struct rspamd_fuzzy_storage_ctx));
2153
2154 ctx->magic = rspamd_fuzzy_storage_magic;
2155 ctx->sync_timeout = DEFAULT_SYNC_TIMEOUT;
2156 ctx->keypair_cache_size = DEFAULT_KEYPAIR_CACHE_SIZE;
2157 ctx->lua_pre_handler_cbref = -1;
2158 ctx->lua_post_handler_cbref = -1;
2159 ctx->keys = g_hash_table_new_full (fuzzy_kp_hash, fuzzy_kp_equal,
2160 NULL, fuzzy_key_dtor);
2161 rspamd_mempool_add_destructor (cfg->cfg_pool,
2162 (rspamd_mempool_destruct_t)g_hash_table_unref, ctx->keys);
2163 ctx->errors_ips = rspamd_lru_hash_new_full (1024,
2164 (GDestroyNotify) rspamd_inet_address_free, g_free,
2165 rspamd_inet_address_hash, rspamd_inet_address_equal);
2166 rspamd_mempool_add_destructor (cfg->cfg_pool,
2167 (rspamd_mempool_destruct_t)rspamd_lru_hash_destroy, ctx->errors_ips);
2168 ctx->cfg = cfg;
2169 ctx->updates_maxfail = DEFAULT_UPDATES_MAXFAIL;
2170 ctx->leaky_bucket_mask = DEFAULT_BUCKET_MASK;
2171 ctx->leaky_bucket_ttl = DEFAULT_BUCKET_TTL;
2172 ctx->max_buckets = DEFAULT_MAX_BUCKETS;
2173 ctx->leaky_bucket_burst = NAN;
2174 ctx->leaky_bucket_rate = NAN;
2175 ctx->delay = NAN;
2176
2177 rspamd_rcl_register_worker_option (cfg,
2178 type,
2179 "sync",
2180 rspamd_rcl_parse_struct_time,
2181 ctx,
2182 G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx,
2183 sync_timeout),
2184 RSPAMD_CL_FLAG_TIME_FLOAT,
2185 "Time to perform database sync, default: "
2186 G_STRINGIFY (DEFAULT_SYNC_TIMEOUT) " seconds");
2187
2188 rspamd_rcl_register_worker_option (cfg,
2189 type,
2190 "expire",
2191 rspamd_rcl_parse_struct_time,
2192 ctx,
2193 G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx,
2194 expire),
2195 RSPAMD_CL_FLAG_TIME_FLOAT,
2196 "Default expire time for hashes, default: "
2197 G_STRINGIFY (DEFAULT_EXPIRE) " seconds");
2198
2199 rspamd_rcl_register_worker_option (cfg,
2200 type,
2201 "delay",
2202 rspamd_rcl_parse_struct_time,
2203 ctx,
2204 G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx,
2205 delay),
2206 RSPAMD_CL_FLAG_TIME_FLOAT,
2207 "Default delay time for hashes, default: not enabled");
2208
2209 rspamd_rcl_register_worker_option (cfg,
2210 type,
2211 "allow_update",
2212 rspamd_rcl_parse_struct_ucl,
2213 ctx,
2214 G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, update_map),
2215 0,
2216 "Allow modifications from the following IP addresses");
2217
2218 rspamd_rcl_register_worker_option (cfg,
2219 type,
2220 "delay_whitelist",
2221 rspamd_rcl_parse_struct_ucl,
2222 ctx,
2223 G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, delay_whitelist_map),
2224 0,
2225 "Disable delay check for the following IP addresses");
2226
2227 rspamd_rcl_register_worker_option (cfg,
2228 type,
2229 "keypair",
2230 fuzzy_parse_keypair,
2231 ctx,
2232 0,
2233 RSPAMD_CL_FLAG_MULTIPLE,
2234 "Encryption keypair (can be repeated for different keys)");
2235
2236 rspamd_rcl_register_worker_option (cfg,
2237 type,
2238 "keypair_cache_size",
2239 rspamd_rcl_parse_struct_integer,
2240 ctx,
2241 G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx,
2242 keypair_cache_size),
2243 RSPAMD_CL_FLAG_UINT,
2244 "Size of keypairs cache, default: "
2245 G_STRINGIFY (DEFAULT_KEYPAIR_CACHE_SIZE));
2246
2247 rspamd_rcl_register_worker_option (cfg,
2248 type,
2249 "encrypted_only",
2250 rspamd_rcl_parse_struct_boolean,
2251 ctx,
2252 G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, encrypted_only),
2253 0,
2254 "Allow encrypted requests only (and forbid all unknown keys or plaintext requests)");
2255 rspamd_rcl_register_worker_option (cfg,
2256 type,
2257 "dedicated_update_worker",
2258 rspamd_rcl_parse_struct_boolean,
2259 ctx,
2260 G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, dedicated_update_worker),
2261 0,
2262 "Use worker 0 for updates only");
2263
2264 rspamd_rcl_register_worker_option (cfg,
2265 type,
2266 "read_only",
2267 rspamd_rcl_parse_struct_boolean,
2268 ctx,
2269 G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, read_only),
2270 0,
2271 "Work in read only mode");
2272
2273
2274 rspamd_rcl_register_worker_option (cfg,
2275 type,
2276 "blocked",
2277 rspamd_rcl_parse_struct_ucl,
2278 ctx,
2279 G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, blocked_map),
2280 0,
2281 "Block requests from specific networks");
2282
2283
2284 rspamd_rcl_register_worker_option (cfg,
2285 type,
2286 "updates_maxfail",
2287 rspamd_rcl_parse_struct_integer,
2288 ctx,
2289 G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, updates_maxfail),
2290 RSPAMD_CL_FLAG_UINT,
2291 "Maximum number of updates to be failed before discarding");
2292 rspamd_rcl_register_worker_option (cfg,
2293 type,
2294 "skip_hashes",
2295 rspamd_rcl_parse_struct_ucl,
2296 ctx,
2297 G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, skip_map),
2298 0,
2299 "Skip specific hashes from the map");
2300
2301 /* Ratelimits */
2302 rspamd_rcl_register_worker_option (cfg,
2303 type,
2304 "ratelimit_whitelist",
2305 rspamd_rcl_parse_struct_ucl,
2306 ctx,
2307 G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, ratelimit_whitelist_map),
2308 0,
2309 "Skip specific addresses from rate limiting");
2310 rspamd_rcl_register_worker_option (cfg,
2311 type,
2312 "ratelimit_max_buckets",
2313 rspamd_rcl_parse_struct_integer,
2314 ctx,
2315 G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, max_buckets),
2316 RSPAMD_CL_FLAG_UINT,
2317 "Maximum number of leaky buckets (default: " G_STRINGIFY(DEFAULT_MAX_BUCKETS) ")");
2318 rspamd_rcl_register_worker_option (cfg,
2319 type,
2320 "ratelimit_network_mask",
2321 rspamd_rcl_parse_struct_integer,
2322 ctx,
2323 G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_mask),
2324 RSPAMD_CL_FLAG_UINT,
2325 "Network mask to apply for IPv4 rate addresses (default: " G_STRINGIFY(DEFAULT_BUCKET_MASK) ")");
2326 rspamd_rcl_register_worker_option (cfg,
2327 type,
2328 "ratelimit_bucket_ttl",
2329 rspamd_rcl_parse_struct_time,
2330 ctx,
2331 G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_ttl),
2332 RSPAMD_CL_FLAG_TIME_INTEGER,
2333 "Time to live for ratelimit element (default: " G_STRINGIFY(DEFAULT_BUCKET_TTL) ")");
2334 rspamd_rcl_register_worker_option (cfg,
2335 type,
2336 "ratelimit_rate",
2337 rspamd_rcl_parse_struct_double,
2338 ctx,
2339 G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_rate),
2340 0,
2341 "Leak rate in requests per second");
2342 rspamd_rcl_register_worker_option (cfg,
2343 type,
2344 "ratelimit_burst",
2345 rspamd_rcl_parse_struct_double,
2346 ctx,
2347 G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_burst),
2348 0,
2349 "Peak value for ratelimit bucket");
2350 rspamd_rcl_register_worker_option (cfg,
2351 type,
2352 "ratelimit_log_only",
2353 rspamd_rcl_parse_struct_boolean,
2354 ctx,
2355 G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, ratelimit_log_only),
2356 0,
2357 "Don't really ban on ratelimit reaching, just log");
2358
2359
2360 return ctx;
2361 }
2362
2363 static void
rspamd_fuzzy_peer_io(EV_P_ ev_io * w,int revents)2364 rspamd_fuzzy_peer_io (EV_P_ ev_io *w, int revents)
2365 {
2366 struct fuzzy_peer_cmd cmd;
2367 struct rspamd_fuzzy_storage_ctx *ctx =
2368 (struct rspamd_fuzzy_storage_ctx *)w->data;
2369 gssize r;
2370
2371 for (;;) {
2372 r = read (w->fd, &cmd, sizeof (cmd));
2373
2374 if (r != sizeof (cmd)) {
2375 if (errno == EINTR) {
2376 continue;
2377 }
2378 if (errno != EAGAIN) {
2379 msg_err ("cannot read command from peers: %s", strerror (errno));
2380 }
2381
2382 break;
2383 }
2384 else {
2385 g_array_append_val (ctx->updates_pending, cmd);
2386 }
2387 }
2388 }
2389
2390 static void
fuzzy_peer_rep(struct rspamd_worker * worker,struct rspamd_srv_reply * rep,gint rep_fd,gpointer ud)2391 fuzzy_peer_rep (struct rspamd_worker *worker,
2392 struct rspamd_srv_reply *rep, gint rep_fd,
2393 gpointer ud)
2394 {
2395 struct rspamd_fuzzy_storage_ctx *ctx = ud;
2396 GList *cur;
2397 struct rspamd_worker_listen_socket *ls;
2398 struct rspamd_worker_accept_event *ac_ev;
2399
2400 ctx->peer_fd = rep_fd;
2401
2402 if (rep_fd == -1) {
2403 msg_err ("cannot receive peer fd from the main process");
2404 exit (EXIT_FAILURE);
2405 }
2406 else {
2407 rspamd_socket_nonblocking (rep_fd);
2408 }
2409
2410 msg_info ("got peer fd reply from the main process");
2411
2412 /* Start listening */
2413 cur = worker->cf->listen_socks;
2414 while (cur) {
2415 ls = cur->data;
2416
2417 if (ls->fd != -1) {
2418 msg_info ("start listening on %s",
2419 rspamd_inet_address_to_string_pretty (ls->addr));
2420
2421 if (ls->type == RSPAMD_WORKER_SOCKET_UDP) {
2422 ac_ev = g_malloc0 (sizeof (*ac_ev));
2423 ac_ev->accept_ev.data = worker;
2424 ac_ev->event_loop = ctx->event_loop;
2425 ev_io_init (&ac_ev->accept_ev, accept_fuzzy_socket, ls->fd,
2426 EV_READ);
2427 ev_io_start (ctx->event_loop, &ac_ev->accept_ev);
2428 DL_APPEND (worker->accept_events, ac_ev);
2429 }
2430 else {
2431 /* We allow TCP listeners only for a update worker */
2432 g_assert_not_reached ();
2433 }
2434 }
2435
2436 cur = g_list_next (cur);
2437 }
2438
2439 if (ctx->peer_fd != -1) {
2440 if (worker->index == 0) {
2441 /* Listen for peer requests */
2442 shutdown (ctx->peer_fd, SHUT_WR);
2443 ctx->peer_ev.data = ctx;
2444 ev_io_init (&ctx->peer_ev, rspamd_fuzzy_peer_io, ctx->peer_fd, EV_READ);
2445 ev_io_start (ctx->event_loop, &ctx->peer_ev);
2446 }
2447 else {
2448 shutdown (ctx->peer_fd, SHUT_RD);
2449 }
2450 }
2451 }
2452
2453 /*
2454 * Start worker process
2455 */
2456 __attribute__((noreturn))
2457 void
start_fuzzy(struct rspamd_worker * worker)2458 start_fuzzy (struct rspamd_worker *worker)
2459 {
2460 struct rspamd_fuzzy_storage_ctx *ctx = worker->ctx;
2461 GError *err = NULL;
2462 struct rspamd_srv_command srv_cmd;
2463 struct rspamd_config *cfg = worker->srv->cfg;
2464
2465 g_assert (rspamd_worker_check_context (worker->ctx, rspamd_fuzzy_storage_magic));
2466 ctx->event_loop = rspamd_prepare_worker (worker,
2467 "fuzzy",
2468 NULL);
2469 ctx->peer_fd = -1;
2470 ctx->worker = worker;
2471 ctx->cfg = worker->srv->cfg;
2472 ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
2473 ctx->event_loop,
2474 worker->srv->cfg);
2475 rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
2476 ctx->event_loop, ctx->resolver->r);
2477 /* Since this worker uses maps it needs a valid HTTP context */
2478 ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
2479 ctx->cfg->ups_ctx);
2480
2481 if (ctx->keypair_cache_size > 0) {
2482 /* Create keypairs cache */
2483 ctx->keypair_cache = rspamd_keypair_cache_new (ctx->keypair_cache_size);
2484 }
2485
2486
2487 if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->event_loop,
2488 worker->cf->options, cfg, &err)) == NULL) {
2489 msg_err ("cannot open backend: %e", err);
2490 if (err) {
2491 g_error_free (err);
2492 }
2493 exit (EXIT_SUCCESS);
2494 }
2495
2496 rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
2497
2498
2499 if (worker->index == 0) {
2500 ctx->updates_pending = g_array_sized_new (FALSE, FALSE,
2501 sizeof (struct fuzzy_peer_cmd), 1024);
2502 rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
2503 rspamd_fuzzy_storage_periodic_callback, ctx);
2504
2505 if (ctx->dedicated_update_worker && worker->cf->count > 1) {
2506 msg_info_config ("stop serving clients request in dedicated update mode");
2507 rspamd_worker_stop_accept (worker);
2508
2509 GList *cur = worker->cf->listen_socks;
2510
2511 while (cur) {
2512 struct rspamd_worker_listen_socket *ls =
2513 (struct rspamd_worker_listen_socket *)cur->data;
2514
2515 if (ls->fd != -1) {
2516 close (ls->fd);
2517 }
2518
2519 cur = g_list_next (cur);
2520 }
2521 }
2522 }
2523
2524 ctx->stat_ev.data = ctx;
2525 ev_timer_init (&ctx->stat_ev, rspamd_fuzzy_stat_callback, ctx->sync_timeout,
2526 ctx->sync_timeout);
2527 ev_timer_start (ctx->event_loop, &ctx->stat_ev);
2528 /* Register custom reload and stat commands for the control socket */
2529 rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_RELOAD,
2530 rspamd_fuzzy_storage_reload, ctx);
2531 rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_STAT,
2532 rspamd_fuzzy_storage_stat, ctx);
2533 rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC,
2534 rspamd_fuzzy_storage_sync, ctx);
2535
2536 /* Create radix trees */
2537 if (ctx->update_map != NULL) {
2538 rspamd_config_radix_from_ucl (worker->srv->cfg, ctx->update_map,
2539 "Allow fuzzy updates from specified addresses",
2540 &ctx->update_ips, NULL, worker, "fuzzy update");
2541 }
2542
2543 if (ctx->skip_map != NULL) {
2544 struct rspamd_map *m;
2545
2546 if ((m = rspamd_map_add_from_ucl (cfg, ctx->skip_map,
2547 "Skip hashes",
2548 rspamd_kv_list_read,
2549 rspamd_kv_list_fin,
2550 rspamd_kv_list_dtor,
2551 (void **)&ctx->skip_hashes,
2552 worker, RSPAMD_MAP_DEFAULT)) == NULL) {
2553 msg_warn_config ("cannot load hashes list from %s",
2554 ucl_object_tostring (ctx->skip_map));
2555 }
2556 else {
2557 m->active_http = TRUE;
2558 }
2559 }
2560
2561 if (ctx->blocked_map != NULL) {
2562 rspamd_config_radix_from_ucl (worker->srv->cfg, ctx->blocked_map,
2563 "Block fuzzy requests from the specific IPs",
2564 &ctx->blocked_ips,
2565 NULL,
2566 worker, "fuzzy blocked");
2567 }
2568
2569 /* Create radix trees */
2570 if (ctx->ratelimit_whitelist_map != NULL) {
2571 rspamd_config_radix_from_ucl (worker->srv->cfg, ctx->ratelimit_whitelist_map,
2572 "Skip ratelimits from specific ip addresses/networks",
2573 &ctx->ratelimit_whitelist,
2574 NULL,
2575 worker, "fuzzy ratelimit whitelist");
2576 }
2577
2578 if (!isnan (ctx->delay) && ctx->delay_whitelist_map != NULL) {
2579 rspamd_config_radix_from_ucl (worker->srv->cfg, ctx->delay_whitelist_map,
2580 "Skip delay from the following ips",
2581 &ctx->delay_whitelist, NULL, worker,
2582 "fuzzy delayed whitelist");
2583 }
2584
2585 /* Ratelimits */
2586 if (!isnan (ctx->leaky_bucket_rate) && !isnan (ctx->leaky_bucket_burst)) {
2587 ctx->ratelimit_buckets = rspamd_lru_hash_new_full (ctx->max_buckets,
2588 NULL, fuzzy_rl_bucket_free,
2589 rspamd_inet_address_hash, rspamd_inet_address_equal);
2590 }
2591
2592 /* Maps events */
2593 ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
2594 ctx->event_loop,
2595 worker->srv->cfg);
2596 rspamd_map_watch (worker->srv->cfg, ctx->event_loop,
2597 ctx->resolver, worker, RSPAMD_MAP_WATCH_WORKER);
2598
2599 /* Get peer pipe */
2600 memset (&srv_cmd, 0, sizeof (srv_cmd));
2601 srv_cmd.type = RSPAMD_SRV_SOCKETPAIR;
2602 srv_cmd.cmd.spair.af = SOCK_DGRAM;
2603 srv_cmd.cmd.spair.pair_num = worker->index;
2604 memset (srv_cmd.cmd.spair.pair_id, 0, sizeof (srv_cmd.cmd.spair.pair_id));
2605 /* 6 bytes of id (including \0) and bind_conf id */
2606 G_STATIC_ASSERT (sizeof (srv_cmd.cmd.spair.pair_id) >=
2607 sizeof ("fuzzy") + sizeof (guint64));
2608
2609 memcpy (srv_cmd.cmd.spair.pair_id, "fuzzy", sizeof ("fuzzy"));
2610
2611 /* Distinguish workers from each others... */
2612 if (worker->cf->bind_conf && worker->cf->bind_conf->bind_line) {
2613 guint64 bind_hash = rspamd_cryptobox_fast_hash (worker->cf->bind_conf->bind_line,
2614 strlen (worker->cf->bind_conf->bind_line), 0xdeadbabe);
2615
2616 /* 8 more bytes */
2617 memcpy (srv_cmd.cmd.spair.pair_id + sizeof ("fuzzy"), &bind_hash,
2618 sizeof (bind_hash));
2619 }
2620
2621 rspamd_srv_send_command (worker, ctx->event_loop, &srv_cmd, -1,
2622 fuzzy_peer_rep, ctx);
2623
2624 luaL_Reg fuzzy_lua_reg = {
2625 .name = "add_fuzzy_pre_handler",
2626 .func = lua_fuzzy_add_pre_handler,
2627 };
2628 rspamd_lua_add_metamethod (ctx->cfg->lua_state, "rspamd{worker}", &fuzzy_lua_reg);
2629 fuzzy_lua_reg = (luaL_Reg){
2630 .name = "add_fuzzy_post_handler",
2631 .func = lua_fuzzy_add_post_handler,
2632 };
2633 rspamd_lua_add_metamethod (ctx->cfg->lua_state, "rspamd{worker}", &fuzzy_lua_reg);
2634
2635 rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
2636 worker);
2637
2638 ev_loop (ctx->event_loop, 0);
2639 rspamd_worker_block_signals ();
2640
2641 if (ctx->peer_fd != -1) {
2642 if (worker->index == 0) {
2643 ev_io_stop (ctx->event_loop, &ctx->peer_ev);
2644 }
2645 close (ctx->peer_fd);
2646 }
2647
2648 if (worker->index == 0 && ctx->updates_pending->len > 0) {
2649
2650 msg_info_config ("start another event loop to sync fuzzy storage");
2651
2652 if (rspamd_fuzzy_process_updates_queue (ctx, local_db_name, TRUE)) {
2653 ev_loop (ctx->event_loop, 0);
2654 msg_info_config ("sync cycle is done");
2655 }
2656 else {
2657 msg_info_config ("no need to sync");
2658 }
2659 }
2660
2661 rspamd_fuzzy_backend_close (ctx->backend);
2662
2663 if (worker->index == 0) {
2664 g_array_free (ctx->updates_pending, TRUE);
2665 }
2666
2667 if (ctx->keypair_cache) {
2668 rspamd_keypair_cache_destroy (ctx->keypair_cache);
2669 }
2670
2671 if (ctx->ratelimit_buckets) {
2672 rspamd_lru_hash_destroy (ctx->ratelimit_buckets);
2673 }
2674
2675 if (ctx->lua_pre_handler_cbref != -1) {
2676 luaL_unref (ctx->cfg->lua_state, LUA_REGISTRYINDEX, ctx->lua_pre_handler_cbref);
2677 }
2678
2679 if (ctx->lua_post_handler_cbref != -1) {
2680 luaL_unref (ctx->cfg->lua_state, LUA_REGISTRYINDEX, ctx->lua_post_handler_cbref);
2681 }
2682
2683 REF_RELEASE (ctx->cfg);
2684 rspamd_log_close (worker->srv->logger);
2685
2686 exit (EXIT_SUCCESS);
2687 }
2688