1 /*-
2  * Copyright 2016 Vsevolod Stakhov
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 /*
17  * Rspamd fuzzy storage server
18  */
19 
20 #include "config.h"
21 #include "libserver/fuzzy_wire.h"
22 #include "util.h"
23 #include "rspamd.h"
24 #include "libserver/maps/map.h"
25 #include "libserver/maps/map_helpers.h"
26 #include "libserver/fuzzy_backend/fuzzy_backend.h"
27 #include "ottery.h"
28 #include "ref.h"
29 #include "xxhash.h"
30 #include "libserver/worker_util.h"
31 #include "libserver/rspamd_control.h"
32 #include "libcryptobox/cryptobox.h"
33 #include "libcryptobox/keypairs_cache.h"
34 #include "libcryptobox/keypair.h"
35 #include "libutil/hash.h"
36 #include "libserver/maps/map_private.h"
37 #include "contrib/uthash/utlist.h"
38 #include "lua/lua_common.h"
39 #include "unix-std.h"
40 
41 #include <math.h>
42 
/* Resync value in seconds */
#define DEFAULT_SYNC_TIMEOUT 60.0
/* Default size of the keypair cache (presumably a number of entries — TODO confirm) */
#define DEFAULT_KEYPAIR_CACHE_SIZE 512
/* NOTE(review): presumably a master/peer timeout in seconds; not used in the visible code */
#define DEFAULT_MASTER_TIMEOUT 10.0
/* Presumably the default for ctx->updates_maxfail (failed commits tolerated before a batch is discarded) */
#define DEFAULT_UPDATES_MAXFAIL 3
/* Size in bytes of rspamd_fuzzy_storage_ctx::cookie */
#define COOKIE_SIZE 128
/*
 * Ratelimit defaults; presumably used for ctx->max_buckets,
 * ctx->leaky_bucket_ttl and ctx->leaky_bucket_mask — verify where the
 * context is initialised.
 */
#define DEFAULT_MAX_BUCKETS 2000
#define DEFAULT_BUCKET_TTL 3600
#define DEFAULT_BUCKET_MASK 24

/* NOTE(review): presumably the name of the local update source; not used in the visible code */
static const gchar *local_db_name = "local";
54 
/*
 * Logging helpers that expand to rspamd_default_log_function() calls and
 * reference a variable named `session` in the caller's scope.
 * The *_fuzzy_update variants log under session->name; the
 * *_fuzzy_collection variants log under the fixed "fuzzy_collection" module.
 * Both use session->uid as the log tag.
 * NOTE(review): struct fuzzy_session in this file has no name/uid fields,
 * so these presumably target another session type — confirm at call sites.
 */
#define msg_err_fuzzy_update(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
        session->name, session->uid, \
        G_STRFUNC, \
        __VA_ARGS__)
#define msg_warn_fuzzy_update(...)   rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
        session->name, session->uid, \
        G_STRFUNC, \
        __VA_ARGS__)
#define msg_info_fuzzy_update(...)   rspamd_default_log_function (G_LOG_LEVEL_INFO, \
        session->name, session->uid, \
        G_STRFUNC, \
        __VA_ARGS__)
#define msg_err_fuzzy_collection(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
        "fuzzy_collection", session->uid, \
        G_STRFUNC, \
        __VA_ARGS__)
#define msg_warn_fuzzy_collection(...)   rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
       "fuzzy_collection", session->uid, \
        G_STRFUNC, \
        __VA_ARGS__)
#define msg_info_fuzzy_collection(...)   rspamd_default_log_function (G_LOG_LEVEL_INFO, \
       "fuzzy_collection", session->uid, \
        G_STRFUNC, \
        __VA_ARGS__)
79 
/* Init functions */
gpointer init_fuzzy (struct rspamd_config *cfg);
void start_fuzzy (struct rspamd_worker *worker);

/* Worker descriptor for the "fuzzy" worker: has a socket, listens on UDP */
worker_t fuzzy_worker = {
		"fuzzy",                    /* Name */
		init_fuzzy,                 /* Init function */
		start_fuzzy,                /* Start function */
		RSPAMD_WORKER_HAS_SOCKET,
		RSPAMD_WORKER_SOCKET_UDP,   /* UDP socket */
		RSPAMD_WORKER_VER           /* Version info */
};
92 
/* Global counters of the fuzzy storage worker */
struct fuzzy_global_stat {
	guint64 fuzzy_hashes;
	/**< number of fuzzy hashes stored (as last reported by the backend count) */
	guint64 fuzzy_hashes_expired;
	/**< number of fuzzy hashes expired */
	guint64 fuzzy_hashes_checked[RSPAMD_FUZZY_EPOCH_MAX];
	/**< amount of check requests for each epoch */
	guint64 fuzzy_shingles_checked[RSPAMD_FUZZY_EPOCH_MAX];
	/**< amount of shingle check requests for each epoch */
	guint64 fuzzy_hashes_found[RSPAMD_FUZZY_EPOCH_MAX];
	/**< amount of matched (found) hashes for each epoch */
	guint64 invalid_requests;
	/**< amount of invalid requests */
	guint64 delayed_hashes;
	/**< amount of delayed hashes found */
};
108 
/*
 * Request counters, kept both per key and per client IP (see
 * rspamd_fuzzy_update_stats).  Refcounted via `ref`; per-IP entries are
 * created with fuzzy_key_stat_dtor as destructor.
 */
struct fuzzy_key_stat {
	guint64 checked;
	guint64 matched;
	guint64 added;
	guint64 deleted;
	guint64 errors;
	/* per-IP fuzzy_key_stat entries keyed by client address; may be NULL */
	rspamd_lru_hash_t *last_ips;
	ref_entry_t ref;
};

/*
 * NOTE(review): not used in the visible code; presumably describes a
 * replication mirror (name, upstream list, public key).
 */
struct rspamd_fuzzy_mirror {
	gchar *name;
	struct upstream_list *u;
	struct rspamd_cryptobox_pubkey *key;
};

/* One leaky bucket of the ratelimiter, stored in ctx->ratelimit_buckets */
struct rspamd_leaky_bucket_elt {
	/* masked client address; owned, freed by fuzzy_rl_bucket_free */
	rspamd_inet_addr_t *addr;
	/* ev_now() timestamp of the last bucket update */
	gdouble last;
	/* current bucket level; NaN means blocked until the entry expires */
	gdouble cur;
};

/* NOTE(review): presumably stored in ctx->magic to sanity-check the context type */
static const guint64 rspamd_fuzzy_storage_magic = 0x291a3253eb1b3ea5ULL;
132 
/*
 * Per-worker state of the fuzzy storage.  Fields up to "END OF COMMON PART"
 * presumably mirror a layout shared with other worker contexts — confirm.
 */
struct rspamd_fuzzy_storage_ctx {
	guint64 magic;
	/* Events base */
	struct ev_loop *event_loop;
	/* DNS resolver */
	struct rspamd_dns_resolver *resolver;
	/* Config */
	struct rspamd_config *cfg;
	/* END OF COMMON PART */
	struct fuzzy_global_stat stat;
	gdouble expire;          /* NOTE(review): presumably hash expiry; not used in the visible code */
	gdouble sync_timeout;    /* presumably the resync period (cf. DEFAULT_SYNC_TIMEOUT) */
	gdouble delay;           /* minimal (jittered) hash age before a match is reported; NaN disables */
	struct rspamd_radix_map_helper *update_ips;          /* clients allowed to write; NULL: no writes */
	struct rspamd_radix_map_helper *blocked_ips;         /* clients rejected by rspamd_fuzzy_check_client */
	struct rspamd_radix_map_helper *ratelimit_whitelist; /* clients exempt from ratelimiting */
	struct rspamd_radix_map_helper *delay_whitelist;     /* clients exempt from delayed replies */

	/* presumably the configuration objects the radix maps above are loaded from */
	const ucl_object_t *update_map;
	const ucl_object_t *delay_whitelist_map;
	const ucl_object_t *blocked_map;
	const ucl_object_t *ratelimit_whitelist_map;

	guint keypair_cache_size;
	ev_timer stat_ev;        /* periodic stat timer, re-armed in fuzzy_stat_count_callback */
	ev_io peer_ev;           /* NOTE(review): presumably watches peer_fd for forwarded updates */

	/* Local keypair */
	struct rspamd_cryptobox_keypair *default_keypair; /* Bad clash, need for parse keypair */
	struct fuzzy_key *default_key;
	GHashTable *keys;        /* NOTE(review): presumably fuzzy_key entries indexed by public key */
	gboolean encrypted_only; /* reject unencrypted commands (403) */
	gboolean read_only;      /* refuse writes and skip hash refreshes */
	gboolean dedicated_update_worker;
	struct rspamd_keypair_cache *keypair_cache;
	struct rspamd_http_context *http_ctx;
	rspamd_lru_hash_t *errors_ips;
	rspamd_lru_hash_t *ratelimit_buckets; /* rspamd_leaky_bucket_elt keyed by masked client address */
	struct rspamd_fuzzy_backend *backend;
	GArray *updates_pending; /* queued struct fuzzy_peer_cmd entries awaiting commit */
	guint updates_failed;    /* consecutive failed commits */
	guint updates_maxfail;   /* failed commits tolerated before a batch is discarded */
	/* Used to send data between workers */
	gint peer_fd;

	/* Ratelimits */
	guint leaky_bucket_ttl;  /* lifetime of a bucket in ratelimit_buckets */
	guint leaky_bucket_mask; /* IPv4 prefix used to group clients (IPv6: 4x, clamped to /64../128) */
	guint max_buckets;       /* presumably capacity of ratelimit_buckets */
	gboolean ratelimit_log_only;
	gdouble leaky_bucket_burst; /* bucket level at which a network becomes blocked */
	gdouble leaky_bucket_rate;  /* bucket drain per second of ev_now() time */

	struct rspamd_worker *worker;
	const ucl_object_t *skip_map;
	struct rspamd_hash_map_helper *skip_hashes;
	gint lua_pre_handler_cbref;  /* Lua registry ref of the pre handler; -1 when not set */
	gint lua_post_handler_cbref; /* Lua registry ref of the post handler; -1 when not set */
	guchar cookie[COOKIE_SIZE];
};
193 
/* Kind of an incoming command; selects reply framing and reply flags */
enum fuzzy_cmd_type {
	CMD_NORMAL,
	CMD_SHINGLE,
	CMD_ENCRYPTED_NORMAL,
	CMD_ENCRYPTED_SHINGLE
};

/* State of a single client request, refcounted via `ref` */
struct fuzzy_session {
	struct rspamd_worker *worker;
	rspamd_inet_addr_t *addr;      /* client address; the reply is sent here */
	struct rspamd_fuzzy_storage_ctx *ctx;

	struct rspamd_fuzzy_shingle_cmd cmd; /* Can handle both shingles and non-shingles */
	struct rspamd_fuzzy_encrypted_reply reply; /* Again: contains everything */
	struct fuzzy_key_stat *ip_stat; /* per-IP stats of the client's key, if any (retained) */

	enum rspamd_fuzzy_epoch epoch; /* clients <= RSPAMD_FUZZY_EPOCH10 get v1-only replies */
	enum fuzzy_cmd_type cmd_type;
	gint fd;                       /* socket used to send the reply */
	guint64 time;                  /* NOTE(review): not used in the visible code */
	struct ev_io io;               /* write watcher armed when sending the reply would block */
	ref_entry_t ref;
	struct fuzzy_key_stat *key_stat; /* stats of the key used by the client, if any */
	struct rspamd_fuzzy_cmd_extension *extensions; /* linked list exposed to Lua handlers */
	guchar nm[rspamd_cryptobox_MAX_NMBYTES]; /* key passed to rspamd_cryptobox_encrypt_nm_inplace */
};

/* An update forwarded to a peer worker over ctx->peer_fd */
struct fuzzy_peer_request {
	ev_io io_ev;                   /* write watcher used when the immediate send fails */
	struct fuzzy_peer_cmd cmd;
};

struct fuzzy_key {
	struct rspamd_cryptobox_keypair *key;
	struct rspamd_cryptobox_pubkey *pk;
	struct fuzzy_key_stat *stat;   /* released by fuzzy_key_dtor */
};

/* State of one submitted update batch; freed by rspamd_fuzzy_updates_cb */
struct rspamd_updates_cbdata {
	GArray *updates_pending;       /* the batch being committed (owned) */
	struct rspamd_fuzzy_storage_ctx *ctx;
	gchar *source;                 /* g_strdup'ed update source (owned) */
	gboolean final;                /* last flush: stop the event loop afterwards */
};


static void rspamd_fuzzy_write_reply (struct fuzzy_session *session);
static gboolean rspamd_fuzzy_process_updates_queue (
		struct rspamd_fuzzy_storage_ctx *ctx,
		const gchar *source, gboolean final);
244 
245 static gboolean
rspamd_fuzzy_check_ratelimit(struct fuzzy_session * session)246 rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session)
247 {
248 	rspamd_inet_addr_t *masked;
249 	struct rspamd_leaky_bucket_elt *elt;
250 	ev_tstamp now;
251 
252 	if (session->ctx->ratelimit_whitelist != NULL) {
253 		if (rspamd_match_radix_map_addr (session->ctx->ratelimit_whitelist,
254 				session->addr) != NULL) {
255 			return TRUE;
256 		}
257 	}
258 
259 	/*
260 	if (rspamd_inet_address_is_local (session->addr, TRUE)) {
261 		return TRUE;
262 	}
263 	*/
264 
265 	masked = rspamd_inet_address_copy (session->addr);
266 
267 	if (rspamd_inet_address_get_af (masked) == AF_INET) {
268 		rspamd_inet_address_apply_mask (masked,
269 				MIN (session->ctx->leaky_bucket_mask, 32));
270 	}
271 	else {
272 		/* Must be at least /64 */
273 		rspamd_inet_address_apply_mask (masked,
274 				MIN (MAX (session->ctx->leaky_bucket_mask * 4, 64), 128));
275 	}
276 
277 	now = ev_now (session->ctx->event_loop);
278 	elt = rspamd_lru_hash_lookup (session->ctx->ratelimit_buckets, masked,
279 			now);
280 
281 	if (elt) {
282 		gboolean ratelimited = FALSE;
283 
284 		if (isnan (elt->cur)) {
285 			/* Ratelimit exceeded, preserve it for the whole ttl */
286 			ratelimited = TRUE;
287 		}
288 		else {
289 			/* Update bucket */
290 			if (elt->last < now) {
291 				elt->cur -= session->ctx->leaky_bucket_rate * (now - elt->last);
292 				elt->last = now;
293 
294 				if (elt->cur < 0) {
295 					elt->cur = 0;
296 				}
297 			}
298 			else {
299 				elt->last = now;
300 			}
301 
302 			/* Check bucket */
303 			if (elt->cur >= session->ctx->leaky_bucket_burst) {
304 
305 				msg_info ("ratelimiting %s (%s), %.1f max elts",
306 						rspamd_inet_address_to_string (session->addr),
307 						rspamd_inet_address_to_string (masked),
308 						session->ctx->leaky_bucket_burst);
309 				elt->cur = NAN;
310 			}
311 			else {
312 				elt->cur ++; /* Allow one more request */
313 			}
314 		}
315 
316 		rspamd_inet_address_free (masked);
317 
318 		return !ratelimited;
319 	}
320 	else {
321 		/* New bucket */
322 		elt = g_malloc (sizeof (*elt));
323 		elt->addr = masked; /* transfer ownership */
324 		elt->cur = 1;
325 		elt->last = now;
326 
327 		rspamd_lru_hash_insert (session->ctx->ratelimit_buckets,
328 				masked,
329 				elt,
330 				now,
331 				session->ctx->leaky_bucket_ttl);
332 	}
333 
334 	return TRUE;
335 }
336 
337 static gboolean
rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx * ctx,rspamd_inet_addr_t * addr)338 rspamd_fuzzy_check_client (struct rspamd_fuzzy_storage_ctx *ctx,
339 		rspamd_inet_addr_t *addr)
340 {
341 	if (ctx->blocked_ips != NULL) {
342 		if (rspamd_match_radix_map_addr (ctx->blocked_ips,
343 				addr) != NULL) {
344 			return FALSE;
345 		}
346 	}
347 
348 	return TRUE;
349 }
350 
351 static gboolean
rspamd_fuzzy_check_write(struct fuzzy_session * session)352 rspamd_fuzzy_check_write (struct fuzzy_session *session)
353 {
354 	if (session->ctx->read_only) {
355 		return FALSE;
356 	}
357 
358 	if (session->ctx->update_ips != NULL) {
359 		if (rspamd_match_radix_map_addr (session->ctx->update_ips,
360 				session->addr) == NULL) {
361 			return FALSE;
362 		}
363 		else {
364 			return TRUE;
365 		}
366 	}
367 
368 	return FALSE;
369 }
370 
371 static void
fuzzy_key_stat_dtor(gpointer p)372 fuzzy_key_stat_dtor (gpointer p)
373 {
374 	struct fuzzy_key_stat *st = p;
375 
376 	if (st->last_ips) {
377 		rspamd_lru_hash_destroy (st->last_ips);
378 	}
379 
380 	g_free (st);
381 }
382 
383 static void
fuzzy_key_stat_unref(gpointer p)384 fuzzy_key_stat_unref (gpointer p)
385 {
386 	struct fuzzy_key_stat *st = p;
387 
388 	REF_RELEASE (st);
389 }
390 
391 static void
fuzzy_key_dtor(gpointer p)392 fuzzy_key_dtor (gpointer p)
393 {
394 	struct fuzzy_key *key = p;
395 
396 	if (key) {
397 		if (key->stat) {
398 			REF_RELEASE (key->stat);
399 		}
400 
401 		g_free (key);
402 	}
403 }
404 
405 static void
fuzzy_count_callback(guint64 count,void * ud)406 fuzzy_count_callback (guint64 count, void *ud)
407 {
408 	struct rspamd_fuzzy_storage_ctx *ctx = ud;
409 
410 	ctx->stat.fuzzy_hashes = count;
411 }
412 
413 static void
fuzzy_rl_bucket_free(gpointer p)414 fuzzy_rl_bucket_free (gpointer p)
415 {
416 	struct rspamd_leaky_bucket_elt *elt = (struct rspamd_leaky_bucket_elt *)p;
417 
418 	rspamd_inet_address_free (elt->addr);
419 	g_free (elt);
420 }
421 
422 static void
fuzzy_stat_count_callback(guint64 count,void * ud)423 fuzzy_stat_count_callback (guint64 count, void *ud)
424 {
425 	struct rspamd_fuzzy_storage_ctx *ctx = ud;
426 
427 	ev_timer_again (ctx->event_loop, &ctx->stat_ev);
428 	ctx->stat.fuzzy_hashes = count;
429 }
430 
431 static void
rspamd_fuzzy_stat_callback(EV_P_ ev_timer * w,int revents)432 rspamd_fuzzy_stat_callback (EV_P_ ev_timer *w, int revents)
433 {
434 	struct rspamd_fuzzy_storage_ctx *ctx =
435 			(struct rspamd_fuzzy_storage_ctx *)w->data;
436 	rspamd_fuzzy_backend_count (ctx->backend, fuzzy_stat_count_callback, ctx);
437 }
438 
439 
440 static void
fuzzy_update_version_callback(guint64 ver,void * ud)441 fuzzy_update_version_callback (guint64 ver, void *ud)
442 {
443 }
444 
445 static void
rspamd_fuzzy_updates_cb(gboolean success,guint nadded,guint ndeleted,guint nextended,guint nignored,void * ud)446 rspamd_fuzzy_updates_cb (gboolean success,
447 						 guint nadded,
448 						 guint ndeleted,
449 						 guint nextended,
450 						 guint nignored,
451 						 void *ud)
452 {
453 	struct rspamd_updates_cbdata *cbdata = ud;
454 	struct rspamd_fuzzy_storage_ctx *ctx;
455 	const gchar *source;
456 
457 	ctx = cbdata->ctx;
458 	source = cbdata->source;
459 
460 	if (success) {
461 		rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
462 
463 		msg_info ("successfully updated fuzzy storage %s: %d updates in queue; "
464 				  "%d pending currently; "
465 				  "%d added; %d deleted; %d extended; %d duplicates",
466 				ctx->worker->cf->bind_conf ?
467 					 ctx->worker->cf->bind_conf->bind_line :
468 					 "unknown",
469 				cbdata->updates_pending->len,
470 				ctx->updates_pending->len,
471 				nadded, ndeleted, nextended, nignored);
472 		rspamd_fuzzy_backend_version (ctx->backend, source,
473 				fuzzy_update_version_callback, NULL);
474 		ctx->updates_failed = 0;
475 
476 		if (cbdata->final || ctx->worker->state != rspamd_worker_state_running) {
477 			/* Plan exit */
478 			ev_break (ctx->event_loop, EVBREAK_ALL);
479 		}
480 	}
481 	else {
482 		if (++ctx->updates_failed > ctx->updates_maxfail) {
483 			msg_err ("cannot commit update transaction to fuzzy backend %s, discard "
484 					 "%ud updates after %d retries",
485 					ctx->worker->cf->bind_conf ?
486 						ctx->worker->cf->bind_conf->bind_line :
487 						"unknown",
488 					cbdata->updates_pending->len,
489 					ctx->updates_maxfail);
490 			ctx->updates_failed = 0;
491 
492 			if (cbdata->final || ctx->worker->state != rspamd_worker_state_running) {
493 				/* Plan exit */
494 				ev_break (ctx->event_loop, EVBREAK_ALL);
495 			}
496 		}
497 		else {
498 			msg_err ("cannot commit update transaction to fuzzy backend %s; "
499 					 "%ud updates are still left; %ud currently pending;"
500 					 " %d updates left",
501 					ctx->worker->cf->bind_conf ?
502 						ctx->worker->cf->bind_conf->bind_line :
503 						"unknown",
504 					cbdata->updates_pending->len,
505 					ctx->updates_pending->len,
506 					ctx->updates_maxfail - ctx->updates_failed);
507 			/* Move the remaining updates to ctx queue */
508 			g_array_append_vals (ctx->updates_pending,
509 					cbdata->updates_pending->data,
510 					cbdata->updates_pending->len);
511 
512 			if (cbdata->final) {
513 				/* Try one more time */
514 				rspamd_fuzzy_process_updates_queue (cbdata->ctx, cbdata->source,
515 						cbdata->final);
516 			}
517 		}
518 	}
519 
520 	g_array_free (cbdata->updates_pending, TRUE);
521 	g_free (cbdata->source);
522 	g_free (cbdata);
523 }
524 
525 static gboolean
rspamd_fuzzy_process_updates_queue(struct rspamd_fuzzy_storage_ctx * ctx,const gchar * source,gboolean final)526 rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
527 		const gchar *source, gboolean final)
528 {
529 
530 	struct rspamd_updates_cbdata *cbdata;
531 
532 	if (ctx->updates_pending->len > 0) {
533 		cbdata = g_malloc (sizeof (*cbdata));
534 		cbdata->ctx = ctx;
535 		cbdata->final = final;
536 		cbdata->updates_pending = ctx->updates_pending;
537 		ctx->updates_pending = g_array_sized_new (FALSE, FALSE,
538 				sizeof (struct fuzzy_peer_cmd),
539 				MAX (cbdata->updates_pending->len, 1024));
540 		cbdata->source = g_strdup (source);
541 		rspamd_fuzzy_backend_process_updates (ctx->backend,
542 				cbdata->updates_pending,
543 				source, rspamd_fuzzy_updates_cb, cbdata);
544 		return TRUE;
545 	}
546 	else if (final) {
547 		/* No need to sync */
548 		ev_break (ctx->event_loop, EVBREAK_ALL);
549 	}
550 
551 	return FALSE;
552 }
553 
554 static void
rspamd_fuzzy_reply_io(EV_P_ ev_io * w,int revents)555 rspamd_fuzzy_reply_io (EV_P_ ev_io *w, int revents)
556 {
557 	struct fuzzy_session *session = (struct fuzzy_session *)w->data;
558 
559 	ev_io_stop (EV_A_ w);
560 	rspamd_fuzzy_write_reply (session);
561 	REF_RELEASE (session);
562 }
563 
564 static void
rspamd_fuzzy_write_reply(struct fuzzy_session * session)565 rspamd_fuzzy_write_reply (struct fuzzy_session *session)
566 {
567 	gssize r;
568 	gsize len;
569 	gconstpointer data;
570 
571 	if (session->cmd_type == CMD_ENCRYPTED_NORMAL ||
572 				session->cmd_type == CMD_ENCRYPTED_SHINGLE) {
573 		/* Encrypted reply */
574 		data = &session->reply;
575 
576 		if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
577 			len = sizeof (session->reply);
578 		}
579 		else {
580 			len = sizeof (session->reply.hdr) + sizeof (session->reply.rep.v1);
581 		}
582 	}
583 	else {
584 		data = &session->reply.rep;
585 
586 		if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
587 			len = sizeof (session->reply.rep);
588 		}
589 		else {
590 			len = sizeof (session->reply.rep.v1);
591 		}
592 	}
593 
594 	r = rspamd_inet_address_sendto (session->fd, data, len, 0,
595 			session->addr);
596 
597 	if (r == -1) {
598 		if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
599 			/* Grab reference to avoid early destruction */
600 			REF_RETAIN (session);
601 			session->io.data = session;
602 			ev_io_init (&session->io,
603 					rspamd_fuzzy_reply_io, session->fd, EV_WRITE);
604 			ev_io_start (session->ctx->event_loop, &session->io);
605 		}
606 		else {
607 			msg_err ("error while writing reply: %s", strerror (errno));
608 		}
609 	}
610 }
611 
612 static void
rspamd_fuzzy_update_stats(struct rspamd_fuzzy_storage_ctx * ctx,enum rspamd_fuzzy_epoch epoch,gboolean matched,gboolean is_shingle,gboolean is_delayed,struct fuzzy_key_stat * key_stat,struct fuzzy_key_stat * ip_stat,guint cmd,guint reply)613 rspamd_fuzzy_update_stats (struct rspamd_fuzzy_storage_ctx *ctx,
614 		enum rspamd_fuzzy_epoch epoch,
615 		gboolean matched,
616 		gboolean is_shingle,
617 		gboolean is_delayed,
618 		struct fuzzy_key_stat *key_stat,
619 		struct fuzzy_key_stat *ip_stat,
620 		guint cmd, guint reply)
621 {
622 	ctx->stat.fuzzy_hashes_checked[epoch] ++;
623 
624 	if (matched) {
625 		ctx->stat.fuzzy_hashes_found[epoch]++;
626 	}
627 	if (is_shingle) {
628 		ctx->stat.fuzzy_shingles_checked[epoch]++;
629 	}
630 	if (is_delayed) {
631 		ctx->stat.delayed_hashes ++;
632 	}
633 
634 	if (key_stat) {
635 		if (!matched && reply != 0) {
636 			key_stat->errors ++;
637 		}
638 		else {
639 			if (cmd == FUZZY_CHECK) {
640 				key_stat->checked++;
641 
642 				if (matched) {
643 					key_stat->matched ++;
644 				}
645 			}
646 			else if (cmd == FUZZY_WRITE) {
647 				key_stat->added++;
648 			}
649 			else if (cmd == FUZZY_DEL) {
650 				key_stat->deleted++;
651 			}
652 		}
653 	}
654 
655 	if (ip_stat) {
656 		if (!matched && reply != 0) {
657 			ip_stat->errors++;
658 		}
659 		else {
660 			if (cmd == FUZZY_CHECK) {
661 				ip_stat->checked++;
662 
663 				if (matched) {
664 					ip_stat->matched++;
665 				}
666 			}
667 			else if (cmd == FUZZY_WRITE) {
668 				ip_stat->added++;
669 			}
670 			else if (cmd == FUZZY_DEL) {
671 				ip_stat->deleted++;
672 			}
673 		}
674 	}
675 }
676 
/* Flags accepted by rspamd_fuzzy_make_reply() */
enum rspamd_fuzzy_reply_flags {
	/* encrypt the reply payload in place before sending */
	RSPAMD_FUZZY_REPLY_ENCRYPTED = 0x1u << 0u,
	/* the command carried shingles (affects statistics only) */
	RSPAMD_FUZZY_REPLY_SHINGLE = 0x1u << 1u,
	/* hash is too fresh: zero timestamp, probability and value in the reply */
	RSPAMD_FUZZY_REPLY_DELAY = 0x1u << 2u,
};
682 
683 static void
rspamd_fuzzy_make_reply(struct rspamd_fuzzy_cmd * cmd,struct rspamd_fuzzy_reply * result,struct fuzzy_session * session,gint flags)684 rspamd_fuzzy_make_reply (struct rspamd_fuzzy_cmd *cmd,
685 		struct rspamd_fuzzy_reply *result,
686 		struct fuzzy_session *session,
687 		gint flags)
688 {
689 	gsize len;
690 
691 	if (cmd) {
692 		result->v1.tag = cmd->tag;
693 		memcpy (&session->reply.rep, result, sizeof (*result));
694 
695 		rspamd_fuzzy_update_stats (session->ctx,
696 				session->epoch,
697 				result->v1.prob > 0.5,
698 				flags & RSPAMD_FUZZY_REPLY_SHINGLE,
699 				flags & RSPAMD_FUZZY_REPLY_DELAY,
700 				session->key_stat,
701 				session->ip_stat,
702 				cmd->cmd,
703 				result->v1.value);
704 
705 		if (flags & RSPAMD_FUZZY_REPLY_DELAY) {
706 			/* Hash is too fresh, need to delay it */
707 			session->reply.rep.ts = 0;
708 			session->reply.rep.v1.prob = 0.0;
709 			session->reply.rep.v1.value = 0;
710 		}
711 
712 		if (flags & RSPAMD_FUZZY_REPLY_ENCRYPTED) {
713 			/* We need also to encrypt reply */
714 			ottery_rand_bytes (session->reply.hdr.nonce,
715 					sizeof (session->reply.hdr.nonce));
716 
717 			/*
718 			 * For old replies we need to encrypt just old part, otherwise
719 			 * decryption would fail due to mac verification mistake
720 			 */
721 
722 			if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
723 				len = sizeof (session->reply.rep);
724 			}
725 			else {
726 				len = sizeof (session->reply.rep.v1);
727 			}
728 
729 			rspamd_cryptobox_encrypt_nm_inplace ((guchar *)&session->reply.rep,
730 					len,
731 					session->reply.hdr.nonce,
732 					session->nm,
733 					session->reply.hdr.mac,
734 					RSPAMD_CRYPTOBOX_MODE_25519);
735 		}
736 	}
737 
738 	rspamd_fuzzy_write_reply (session);
739 }
740 
741 static gboolean
fuzzy_peer_try_send(gint fd,struct fuzzy_peer_request * up_req)742 fuzzy_peer_try_send (gint fd, struct fuzzy_peer_request *up_req)
743 {
744 	gssize r;
745 
746 	r = write (fd, &up_req->cmd, sizeof (up_req->cmd));
747 
748 	if (r != sizeof (up_req->cmd)) {
749 		return FALSE;
750 	}
751 
752 	return TRUE;
753 }
754 
755 static void
fuzzy_peer_send_io(EV_P_ ev_io * w,int revents)756 fuzzy_peer_send_io (EV_P_ ev_io *w, int revents)
757 {
758 	struct fuzzy_peer_request *up_req = (struct fuzzy_peer_request *)w->data;
759 
760 	if (!fuzzy_peer_try_send (w->fd, up_req)) {
761 		msg_err ("cannot send update request to the peer: %s", strerror (errno));
762 	}
763 
764 	ev_io_stop (EV_A_ w);
765 	g_free (up_req);
766 }
767 
768 static void
rspamd_fuzzy_extensions_tolua(lua_State * L,struct fuzzy_session * session)769 rspamd_fuzzy_extensions_tolua (lua_State *L,
770 							   struct fuzzy_session *session)
771 {
772 	struct rspamd_fuzzy_cmd_extension *ext;
773 	rspamd_inet_addr_t *addr;
774 
775 	lua_createtable (L, 0, 0);
776 
777 	LL_FOREACH (session->extensions, ext) {
778 		switch (ext->ext) {
779 		case RSPAMD_FUZZY_EXT_SOURCE_DOMAIN:
780 			lua_pushlstring (L, ext->payload, ext->length);
781 			lua_setfield (L, -2, "domain");
782 			break;
783 		case RSPAMD_FUZZY_EXT_SOURCE_IP4:
784 			addr = rspamd_inet_address_new (AF_INET, ext->payload);
785 			rspamd_lua_ip_push (L, addr);
786 			rspamd_inet_address_free (addr);
787 			lua_setfield (L, -2, "ip");
788 			break;
789 		case RSPAMD_FUZZY_EXT_SOURCE_IP6:
790 			addr = rspamd_inet_address_new (AF_INET6, ext->payload);
791 			rspamd_lua_ip_push (L, addr);
792 			rspamd_inet_address_free (addr);
793 			lua_setfield (L, -2, "ip");
794 			break;
795 		}
796 	}
797 }
798 
799 static void
rspamd_fuzzy_check_callback(struct rspamd_fuzzy_reply * result,void * ud)800 rspamd_fuzzy_check_callback (struct rspamd_fuzzy_reply *result, void *ud)
801 {
802 	struct fuzzy_session *session = ud;
803 	gboolean is_shingle = FALSE, __attribute__ ((unused)) encrypted = FALSE;
804 	struct rspamd_fuzzy_cmd *cmd = NULL;
805 	const struct rspamd_shingle *shingle = NULL;
806 	struct rspamd_shingle sgl_cpy;
807 	gint send_flags = 0;
808 
809 	switch (session->cmd_type) {
810 	case CMD_ENCRYPTED_NORMAL:
811 		encrypted = TRUE;
812 		send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
813 		/* Fallthrough */
814 	case CMD_NORMAL:
815 		cmd = &session->cmd.basic;
816 		break;
817 
818 	case CMD_ENCRYPTED_SHINGLE:
819 		encrypted = TRUE;
820 		send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
821 		/* Fallthrough */
822 	case CMD_SHINGLE:
823 		cmd = &session->cmd.basic;
824 		memcpy (&sgl_cpy, &session->cmd.sgl, sizeof (sgl_cpy));
825 		shingle = &sgl_cpy;
826 		is_shingle = TRUE;
827 		send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE;
828 		break;
829 	}
830 
831 	if (session->ctx->lua_post_handler_cbref != -1) {
832 		/* Start lua post handler */
833 		lua_State *L = session->ctx->cfg->lua_state;
834 		gint err_idx, ret;
835 
836 		lua_pushcfunction (L, &rspamd_lua_traceback);
837 		err_idx = lua_gettop (L);
838 		/* Preallocate stack (small opt) */
839 		lua_checkstack (L, err_idx + 9);
840 		/* function */
841 		lua_rawgeti (L, LUA_REGISTRYINDEX, session->ctx->lua_post_handler_cbref);
842 		/* client IP */
843 		rspamd_lua_ip_push (L, session->addr);
844 		/* client command */
845 		lua_pushinteger (L, cmd->cmd);
846 		/* command value (push as rspamd_text) */
847 		(void)lua_new_text (L, result->digest, sizeof (result->digest), FALSE);
848 		/* is shingle */
849 		lua_pushboolean (L, is_shingle);
850 		/* result value */
851 		lua_pushinteger (L, result->v1.value);
852 		/* result probability */
853 		lua_pushnumber (L, result->v1.prob);
854 		/* result flag */
855 		lua_pushinteger (L, result->v1.flag);
856 		/* result timestamp */
857 		lua_pushinteger (L, result->ts);
858 		/* TODO: add additional data maybe (encryption, pubkey, etc) */
859 		rspamd_fuzzy_extensions_tolua (L, session);
860 
861 		if ((ret = lua_pcall (L, 9, LUA_MULTRET, err_idx)) != 0) {
862 			msg_err ("call to lua_post_handler lua "
863 					 "script failed (%d): %s", ret, lua_tostring (L, -1));
864 		}
865 		else {
866 			/* Return values order:
867 			 * the first reply will be on err_idx + 1
868 			 * if it is true, then we need to read the former ones:
869 			 * 2-nd will be reply code
870 			 * 3-rd will be probability (or 0.0 if missing)
871 			 * 4-th value is flag (or default flag if missing)
872 			 */
873 			ret = lua_toboolean (L, err_idx + 1);
874 
875 			if (ret) {
876 				/* Artificial reply */
877 				result->v1.value = lua_tointeger (L, err_idx + 2);
878 
879 				if (lua_isnumber (L, err_idx + 3)) {
880 					result->v1.prob = lua_tonumber (L, err_idx + 3);
881 				}
882 				else {
883 					result->v1.prob = 0.0f;
884 				}
885 
886 				if (lua_isnumber (L, err_idx + 4)) {
887 					result->v1.flag = lua_tointeger (L, err_idx + 4);
888 				}
889 
890 				lua_settop (L, 0);
891 				rspamd_fuzzy_make_reply (cmd, result, session, send_flags);
892 				REF_RELEASE (session);
893 
894 				return;
895 			}
896 		}
897 
898 		lua_settop (L, 0);
899 	}
900 
901 	if (!isnan (session->ctx->delay) &&
902 			rspamd_match_radix_map_addr (session->ctx->delay_whitelist,
903 					session->addr) == NULL)  {
904 		gdouble hash_age = rspamd_get_calendar_ticks () - result->ts;
905 		gdouble jittered_age = rspamd_time_jitter (session->ctx->delay,
906 				session->ctx->delay / 2.0);
907 
908 		if (hash_age < jittered_age) {
909 			send_flags |= RSPAMD_FUZZY_REPLY_DELAY;
910 		}
911 	}
912 
913 	/* Refresh hash if found with strong confidence */
914 	if (result->v1.prob > 0.9 && !session->ctx->read_only) {
915 		struct fuzzy_peer_cmd up_cmd;
916 		struct fuzzy_peer_request *up_req;
917 
918 		if (session->worker->index == 0) {
919 			/* Just add to the queue */
920 			memset (&up_cmd, 0, sizeof (up_cmd));
921 			up_cmd.is_shingle = is_shingle;
922 			memcpy (up_cmd.cmd.normal.digest, result->digest,
923 					sizeof (up_cmd.cmd.normal.digest));
924 			up_cmd.cmd.normal.flag = result->v1.flag;
925 			up_cmd.cmd.normal.cmd = FUZZY_REFRESH;
926 			up_cmd.cmd.normal.shingles_count = cmd->shingles_count;
927 
928 			if (is_shingle && shingle) {
929 				memcpy (&up_cmd.cmd.shingle.sgl, shingle,
930 						sizeof (up_cmd.cmd.shingle.sgl));
931 			}
932 
933 			g_array_append_val (session->ctx->updates_pending, up_cmd);
934 		}
935 		else {
936 			/* We need to send request to the peer */
937 			up_req = g_malloc0 (sizeof (*up_req));
938 			up_req->cmd.is_shingle = is_shingle;
939 
940 			memcpy (up_req->cmd.cmd.normal.digest, result->digest,
941 					sizeof (up_req->cmd.cmd.normal.digest));
942 			up_req->cmd.cmd.normal.flag = result->v1.flag;
943 			up_req->cmd.cmd.normal.cmd = FUZZY_REFRESH;
944 			up_req->cmd.cmd.normal.shingles_count = cmd->shingles_count;
945 
946 			if (is_shingle && shingle) {
947 				memcpy (&up_req->cmd.cmd.shingle.sgl, shingle,
948 						sizeof (up_req->cmd.cmd.shingle.sgl));
949 			}
950 
951 			if (!fuzzy_peer_try_send (session->ctx->peer_fd, up_req)) {
952 				up_req->io_ev.data = up_req;
953 				ev_io_init (&up_req->io_ev, fuzzy_peer_send_io,
954 						session->ctx->peer_fd, EV_WRITE);
955 				ev_io_start (session->ctx->event_loop, &up_req->io_ev);
956 			}
957 			else {
958 				g_free (up_req);
959 			}
960 		}
961 	}
962 
963 	rspamd_fuzzy_make_reply (cmd, result, session, send_flags);
964 
965 	REF_RELEASE (session);
966 }
967 
968 static void
rspamd_fuzzy_process_command(struct fuzzy_session * session)969 rspamd_fuzzy_process_command (struct fuzzy_session *session)
970 {
971 	gboolean is_shingle = FALSE, __attribute__ ((unused)) encrypted = FALSE;
972 	struct rspamd_fuzzy_cmd *cmd = NULL;
973 	struct rspamd_fuzzy_reply result;
974 	struct fuzzy_peer_cmd up_cmd;
975 	struct fuzzy_peer_request *up_req;
976 	struct fuzzy_key_stat *ip_stat = NULL;
977 	gchar hexbuf[rspamd_cryptobox_HASHBYTES * 2 + 1];
978 	rspamd_inet_addr_t *naddr;
979 	gpointer ptr;
980 	gsize up_len = 0;
981 	gint send_flags = 0;
982 
983 	cmd = &session->cmd.basic;
984 
985 	switch (session->cmd_type) {
986 	case CMD_NORMAL:
987 		up_len = sizeof (session->cmd.basic);
988 		break;
989 	case CMD_SHINGLE:
990 		up_len = sizeof (session->cmd);
991 		is_shingle = TRUE;
992 		send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE;
993 		break;
994 	case CMD_ENCRYPTED_NORMAL:
995 		up_len = sizeof (session->cmd.basic);
996 		encrypted = TRUE;
997 		send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
998 		break;
999 	case CMD_ENCRYPTED_SHINGLE:
1000 		up_len = sizeof (session->cmd);
1001 		encrypted = TRUE;
1002 		is_shingle = TRUE;
1003 		send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE|RSPAMD_FUZZY_REPLY_ENCRYPTED;
1004 		break;
1005 	default:
1006 		msg_err ("invalid command type: %d", session->cmd_type);
1007 		return;
1008 	}
1009 
1010 	memset (&result, 0, sizeof (result));
1011 	memcpy (result.digest, cmd->digest, sizeof (result.digest));
1012 	result.v1.flag = cmd->flag;
1013 	result.v1.tag = cmd->tag;
1014 
1015 	if (session->ctx->lua_pre_handler_cbref != -1) {
1016 		/* Start lua pre handler */
1017 		lua_State *L = session->ctx->cfg->lua_state;
1018 		gint err_idx, ret;
1019 
1020 		lua_pushcfunction (L, &rspamd_lua_traceback);
1021 		err_idx = lua_gettop (L);
1022 		/* Preallocate stack (small opt) */
1023 		lua_checkstack (L, err_idx + 5);
1024 		/* function */
1025 		lua_rawgeti (L, LUA_REGISTRYINDEX, session->ctx->lua_pre_handler_cbref);
1026 		/* client IP */
1027 		rspamd_lua_ip_push (L, session->addr);
1028 		/* client command */
1029 		lua_pushinteger (L, cmd->cmd);
1030 		/* command value (push as rspamd_text) */
1031 		(void)lua_new_text (L, cmd->digest, sizeof (cmd->digest), FALSE);
1032 		/* is shingle */
1033 		lua_pushboolean (L, is_shingle);
1034 		/* TODO: add additional data maybe (encryption, pubkey, etc) */
1035 		rspamd_fuzzy_extensions_tolua (L, session);
1036 
1037 		if ((ret = lua_pcall (L, 5, LUA_MULTRET, err_idx)) != 0) {
1038 			msg_err ("call to lua_pre_handler lua "
1039 						  "script failed (%d): %s", ret, lua_tostring (L, -1));
1040 		}
1041 		else {
1042 			/* Return values order:
1043 			 * the first reply will be on err_idx + 1
1044 			 * if it is true, then we need to read the former ones:
1045 			 * 2-nd will be reply code
1046 			 * 3-rd will be probability (or 0.0 if missing)
1047 			 */
1048 			ret = lua_toboolean (L, err_idx + 1);
1049 
1050 			if (ret) {
1051 				/* Artificial reply */
1052 				result.v1.value = lua_tointeger (L, err_idx + 2);
1053 
1054 				if (lua_isnumber (L, err_idx + 3)) {
1055 					result.v1.prob = lua_tonumber (L, err_idx + 3);
1056 				}
1057 				else {
1058 					result.v1.prob = 0.0f;
1059 				}
1060 
1061 				lua_settop (L, 0);
1062 				rspamd_fuzzy_make_reply (cmd, &result, session, send_flags);
1063 
1064 				return;
1065 			}
1066 		}
1067 
1068 		lua_settop (L, 0);
1069 	}
1070 
1071 
1072 	if (G_UNLIKELY (cmd == NULL || up_len == 0)) {
1073 		result.v1.value = 500;
1074 		result.v1.prob = 0.0f;
1075 		rspamd_fuzzy_make_reply (cmd, &result, session, send_flags);
1076 		return;
1077 	}
1078 
1079 	if (session->ctx->encrypted_only && !encrypted) {
1080 		/* Do not accept unencrypted commands */
1081 		result.v1.value = 403;
1082 		result.v1.prob = 0.0f;
1083 		rspamd_fuzzy_make_reply (cmd, &result, session, send_flags);
1084 		return;
1085 	}
1086 
1087 	if (session->key_stat) {
1088 		ip_stat = rspamd_lru_hash_lookup (session->key_stat->last_ips,
1089 				session->addr, -1);
1090 
1091 		if (ip_stat == NULL) {
1092 			naddr = rspamd_inet_address_copy (session->addr);
1093 			ip_stat = g_malloc0 (sizeof (*ip_stat));
1094 			REF_INIT_RETAIN (ip_stat, fuzzy_key_stat_dtor);
1095 			rspamd_lru_hash_insert (session->key_stat->last_ips,
1096 					naddr, ip_stat, -1, 0);
1097 		}
1098 
1099 		REF_RETAIN (ip_stat);
1100 		session->ip_stat = ip_stat;
1101 	}
1102 
1103 	if (cmd->cmd == FUZZY_CHECK) {
1104 		bool can_continue = true;
1105 
1106 		if (session->ctx->ratelimit_buckets) {
1107 			if (session->ctx->ratelimit_log_only) {
1108 				(void)rspamd_fuzzy_check_ratelimit (session); /* Check but ignore */
1109 			}
1110 			else {
1111 				can_continue = rspamd_fuzzy_check_ratelimit (session);
1112 			}
1113 		}
1114 
1115 		if (can_continue) {
1116 			REF_RETAIN (session);
1117 			rspamd_fuzzy_backend_check (session->ctx->backend, cmd,
1118 					rspamd_fuzzy_check_callback, session);
1119 		}
1120 		else {
1121 			result.v1.value = 403;
1122 			result.v1.prob = 0.0f;
1123 			result.v1.flag = 0;
1124 			rspamd_fuzzy_make_reply (cmd, &result, session, send_flags);
1125 		}
1126 	}
1127 	else if (cmd->cmd == FUZZY_STAT) {
1128 		result.v1.prob = 1.0f;
1129 		result.v1.value = 0;
1130 		result.v1.flag = session->ctx->stat.fuzzy_hashes;
1131 		rspamd_fuzzy_make_reply (cmd, &result, session, send_flags);
1132 	}
1133 	else {
1134 		if (rspamd_fuzzy_check_write (session)) {
1135 			/* Check whitelist */
1136 			if (session->ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) {
1137 				rspamd_encode_hex_buf (cmd->digest, sizeof (cmd->digest),
1138 					hexbuf, sizeof (hexbuf) - 1);
1139 				hexbuf[sizeof (hexbuf) - 1] = '\0';
1140 
1141 				if (rspamd_match_hash_map (session->ctx->skip_hashes,
1142 						hexbuf, sizeof (hexbuf) - 1)) {
1143 					result.v1.value = 401;
1144 					result.v1.prob = 0.0f;
1145 
1146 					goto reply;
1147 				}
1148 			}
1149 
1150 			if (session->worker->index == 0 || session->ctx->peer_fd == -1) {
1151 				/* Just add to the queue */
1152 				up_cmd.is_shingle = is_shingle;
1153 				ptr = is_shingle ?
1154 						(gpointer)&up_cmd.cmd.shingle :
1155 						(gpointer)&up_cmd.cmd.normal;
1156 				memcpy (ptr, cmd, up_len);
1157 				g_array_append_val (session->ctx->updates_pending, up_cmd);
1158 			}
1159 			else {
1160 				/* We need to send request to the peer */
1161 				up_req = g_malloc0 (sizeof (*up_req));
1162 				up_req->cmd.is_shingle = is_shingle;
1163 				ptr = is_shingle ?
1164 						(gpointer)&up_req->cmd.cmd.shingle :
1165 						(gpointer)&up_req->cmd.cmd.normal;
1166 				memcpy (ptr, cmd, up_len);
1167 
1168 				if (!fuzzy_peer_try_send (session->ctx->peer_fd, up_req)) {
1169 					up_req->io_ev.data = up_req;
1170 					ev_io_init (&up_req->io_ev, fuzzy_peer_send_io,
1171 							session->ctx->peer_fd, EV_WRITE);
1172 					ev_io_start (session->ctx->event_loop, &up_req->io_ev);
1173 				}
1174 				else {
1175 					g_free (up_req);
1176 				}
1177 			}
1178 
1179 			result.v1.value = 0;
1180 			result.v1.prob = 1.0f;
1181 		}
1182 		else {
1183 			result.v1.value = 403;
1184 			result.v1.prob = 0.0f;
1185 		}
1186 reply:
1187 		rspamd_fuzzy_make_reply (cmd, &result, session, send_flags);
1188 	}
1189 }
1190 
1191 
1192 static enum rspamd_fuzzy_epoch
rspamd_fuzzy_command_valid(struct rspamd_fuzzy_cmd * cmd,gint r)1193 rspamd_fuzzy_command_valid (struct rspamd_fuzzy_cmd *cmd, gint r)
1194 {
1195 	enum rspamd_fuzzy_epoch ret = RSPAMD_FUZZY_EPOCH_MAX;
1196 
1197 	switch (cmd->version) {
1198 	case 4:
1199 		if (cmd->shingles_count > 0) {
1200 			if (r >= sizeof (struct rspamd_fuzzy_shingle_cmd)) {
1201 				ret = RSPAMD_FUZZY_EPOCH11;
1202 			}
1203 		}
1204 		else {
1205 			if (r >= sizeof (*cmd)) {
1206 				ret = RSPAMD_FUZZY_EPOCH11;
1207 			}
1208 		}
1209 		break;
1210 	case 3:
1211 		if (cmd->shingles_count > 0) {
1212 			if (r == sizeof (struct rspamd_fuzzy_shingle_cmd)) {
1213 				ret = RSPAMD_FUZZY_EPOCH10;
1214 			}
1215 		}
1216 		else {
1217 			if (r == sizeof (*cmd)) {
1218 				ret = RSPAMD_FUZZY_EPOCH10;
1219 			}
1220 		}
1221 		break;
1222 	case 2:
1223 		/*
1224 		 * rspamd 0.8 has slightly different tokenizer then it might be not
1225 		 * 100% compatible
1226 		 */
1227 		if (cmd->shingles_count > 0) {
1228 			if (r == sizeof (struct rspamd_fuzzy_shingle_cmd)) {
1229 				ret = RSPAMD_FUZZY_EPOCH8;
1230 			}
1231 		}
1232 		else {
1233 			ret = RSPAMD_FUZZY_EPOCH8;
1234 		}
1235 		break;
1236 	default:
1237 		break;
1238 	}
1239 
1240 	return ret;
1241 }
1242 
1243 static gboolean
rspamd_fuzzy_decrypt_command(struct fuzzy_session * s,guchar * buf,gsize buflen)1244 rspamd_fuzzy_decrypt_command (struct fuzzy_session *s, guchar *buf, gsize buflen)
1245 {
1246 	struct rspamd_fuzzy_encrypted_req_hdr hdr;
1247 	struct rspamd_cryptobox_pubkey *rk;
1248 	struct fuzzy_key *key;
1249 
1250 	if (s->ctx->default_key == NULL) {
1251 		msg_warn ("received encrypted request when encryption is not enabled");
1252 		return FALSE;
1253 	}
1254 
1255 	if (buflen < sizeof (hdr)) {
1256 		msg_warn ("XXX: should not be reached");
1257 		return FALSE;
1258 	}
1259 
1260 	memcpy (&hdr, buf, sizeof (hdr));
1261 	buf += sizeof (hdr);
1262 	buflen -= sizeof (hdr);
1263 
1264 	/* Try to find the desired key */
1265 	key = g_hash_table_lookup (s->ctx->keys, hdr.key_id);
1266 
1267 	if (key == NULL) {
1268 		/* Unknown key, assume default one */
1269 		key = s->ctx->default_key;
1270 	}
1271 
1272 	s->key_stat = key->stat;
1273 
1274 	/* Now process keypair */
1275 	rk = rspamd_pubkey_from_bin (hdr.pubkey, sizeof (hdr.pubkey),
1276 			RSPAMD_KEYPAIR_KEX, RSPAMD_CRYPTOBOX_MODE_25519);
1277 
1278 	if (rk == NULL) {
1279 		msg_err ("bad key; ip=%s",
1280 				rspamd_inet_address_to_string (s->addr));
1281 		return FALSE;
1282 	}
1283 
1284 	rspamd_keypair_cache_process (s->ctx->keypair_cache, key->key, rk);
1285 
1286 	/* Now decrypt request */
1287 	if (!rspamd_cryptobox_decrypt_nm_inplace (buf, buflen, hdr.nonce,
1288 			rspamd_pubkey_get_nm (rk, key->key),
1289 			hdr.mac, RSPAMD_CRYPTOBOX_MODE_25519)) {
1290 		msg_err ("decryption failed; ip=%s",
1291 				rspamd_inet_address_to_string (s->addr));
1292 		rspamd_pubkey_unref (rk);
1293 
1294 		return FALSE;
1295 	}
1296 
1297 	memcpy (s->nm, rspamd_pubkey_get_nm (rk, key->key), sizeof (s->nm));
1298 	rspamd_pubkey_unref (rk);
1299 
1300 	return TRUE;
1301 }
1302 
1303 
1304 static gboolean
rspamd_fuzzy_extensions_from_wire(struct fuzzy_session * s,guchar * buf,gsize buflen)1305 rspamd_fuzzy_extensions_from_wire (struct fuzzy_session *s, guchar *buf, gsize buflen)
1306 {
1307 	struct rspamd_fuzzy_cmd_extension *ext, *prev_ext;
1308 	guchar *storage, *p = buf, *end = buf + buflen;
1309 	gsize st_len = 0, n_ext = 0;
1310 
1311 	/* Read number of extensions to allocate array */
1312 	while (p < end) {
1313 		guchar cmd = *p++;
1314 
1315 		if (p < end) {
1316 			if (cmd == RSPAMD_FUZZY_EXT_SOURCE_DOMAIN) {
1317 				/* Next byte is buf length */
1318 				guchar dom_len = *p++;
1319 
1320 				if (dom_len <= (end - p)) {
1321 					st_len += dom_len;
1322 					n_ext ++;
1323 				}
1324 				else {
1325 					/* Truncation */
1326 					return FALSE;
1327 				}
1328 
1329 				p += dom_len;
1330 			}
1331 			else if (cmd == RSPAMD_FUZZY_EXT_SOURCE_IP4) {
1332 				if (end - p >= sizeof (in_addr_t)) {
1333 					n_ext ++;
1334 					st_len += sizeof (in_addr_t);
1335 				}
1336 				else {
1337 					/* Truncation */
1338 					return FALSE;
1339 				}
1340 
1341 				p += sizeof (in_addr_t);
1342 			}
1343 			else if (cmd == RSPAMD_FUZZY_EXT_SOURCE_IP6) {
1344 				if (end - p >= sizeof (struct in6_addr)) {
1345 					n_ext ++;
1346 					st_len += sizeof (struct in6_addr);
1347 				}
1348 				else {
1349 					/* Truncation */
1350 					return FALSE;
1351 				}
1352 
1353 				p += sizeof (struct in6_addr);
1354 			}
1355 			else {
1356 				/* Invalid command */
1357 				return FALSE;
1358 			}
1359 		}
1360 		else {
1361 			/* Truncated extension */
1362 			return FALSE;
1363 		}
1364 	}
1365 
1366 	if (n_ext > 0) {
1367 		p = buf;
1368 		/*
1369 		 * Memory layout: n_ext of struct rspamd_fuzzy_cmd_extension
1370 		 *                payload for each extension in a continious data segment
1371 		 */
1372 		storage = g_malloc (n_ext * sizeof (struct rspamd_fuzzy_cmd_extension) +
1373 				st_len);
1374 
1375 		guchar *data_buf = storage +
1376 				n_ext * sizeof (struct rspamd_fuzzy_cmd_extension);
1377 		ext = (struct rspamd_fuzzy_cmd_extension *)storage;
1378 
1379 		/* All validation has been done, so we can just go further */
1380 		while (p < end) {
1381 			prev_ext = ext;
1382 			guchar cmd = *p++;
1383 
1384 			if (cmd == RSPAMD_FUZZY_EXT_SOURCE_DOMAIN) {
1385 				/* Next byte is buf length */
1386 				guchar dom_len = *p++;
1387 				guchar *dest = data_buf;
1388 
1389 				ext->ext = RSPAMD_FUZZY_EXT_SOURCE_DOMAIN;
1390 				ext->next = ext + 1;
1391 				ext->length = dom_len;
1392 				ext->payload = dest;
1393 				memcpy (dest, p, dom_len);
1394 				p += dom_len;
1395 				data_buf += dom_len;
1396 				ext = ext->next;
1397 			}
1398 			else if (cmd == RSPAMD_FUZZY_EXT_SOURCE_IP4) {
1399 				guchar *dest = data_buf;
1400 
1401 				ext->ext = RSPAMD_FUZZY_EXT_SOURCE_IP4;
1402 				ext->next = ext + 1;
1403 				ext->length = sizeof (in_addr_t);
1404 				ext->payload = dest;
1405 				memcpy (dest, p, sizeof (in_addr_t));
1406 				p += sizeof (in_addr_t);
1407 				data_buf += sizeof (in_addr_t);
1408 				ext = ext->next;
1409 			}
1410 			else if (cmd == RSPAMD_FUZZY_EXT_SOURCE_IP6) {
1411 				guchar *dest = data_buf;
1412 
1413 				ext->ext = RSPAMD_FUZZY_EXT_SOURCE_IP6;
1414 				ext->next = ext + 1;
1415 				ext->length = sizeof (struct in6_addr);
1416 				ext->payload = dest;
1417 				memcpy (dest, p, sizeof (struct in6_addr));
1418 				p += sizeof (struct in6_addr);
1419 				data_buf += sizeof (struct in6_addr);
1420 				ext = ext->next;
1421 			}
1422 			else {
1423 				g_assert_not_reached ();
1424 			}
1425 		}
1426 
1427 		/* Last next should be NULL */
1428 		prev_ext->next = NULL;
1429 
1430 		/* Rewind to the begin */
1431 		ext = (struct rspamd_fuzzy_cmd_extension *)storage;
1432 		s->extensions = ext;
1433 	}
1434 
1435 	return TRUE;
1436 }
1437 
1438 static gboolean
rspamd_fuzzy_cmd_from_wire(guchar * buf,guint buflen,struct fuzzy_session * s)1439 rspamd_fuzzy_cmd_from_wire (guchar *buf, guint buflen, struct fuzzy_session *s)
1440 {
1441 	enum rspamd_fuzzy_epoch epoch;
1442 	gboolean encrypted = FALSE;
1443 
1444 	if (buflen < sizeof (struct rspamd_fuzzy_cmd)) {
1445 		msg_debug ("truncated fuzzy command of size %d received", buflen);
1446 		return FALSE;
1447 	}
1448 
1449 	/* Now check encryption */
1450 
1451 	if (buflen >= sizeof (struct rspamd_fuzzy_encrypted_cmd)) {
1452 		if (memcmp (buf, fuzzy_encrypted_magic, sizeof (fuzzy_encrypted_magic)) == 0) {
1453 			/* Encrypted command */
1454 			encrypted = TRUE;
1455 		}
1456 	}
1457 
1458 	if (encrypted) {
1459 		/* Decrypt first */
1460 		if (!rspamd_fuzzy_decrypt_command (s, buf, buflen)) {
1461 			return FALSE;
1462 		}
1463 		else {
1464 			/*
1465 			 * Advance buffer to skip encrypted header.
1466 			 * Note that after rspamd_fuzzy_decrypt_command buf is unencrypted
1467 			 */
1468 			buf += sizeof (struct rspamd_fuzzy_encrypted_req_hdr);
1469 			buflen -= sizeof (struct rspamd_fuzzy_encrypted_req_hdr);
1470 		}
1471 	}
1472 
1473 	/* Fill the normal command */
1474 	if (buflen < sizeof (s->cmd.basic)) {
1475 		msg_debug ("truncated normal fuzzy command of size %d received", buflen);
1476 		return FALSE;
1477 	}
1478 
1479 	memcpy (&s->cmd.basic, buf, sizeof (s->cmd.basic));
1480 	epoch = rspamd_fuzzy_command_valid (&s->cmd.basic, buflen);
1481 
1482 	if (epoch == RSPAMD_FUZZY_EPOCH_MAX) {
1483 		msg_debug ("invalid fuzzy command of size %d received", buflen);
1484 		return FALSE;
1485 	}
1486 
1487 	s->epoch = epoch;
1488 
1489 	/* Advance buf */
1490 	buf += sizeof (s->cmd.basic);
1491 	buflen -= sizeof (s->cmd.basic);
1492 
1493 	if (s->cmd.basic.shingles_count > 0) {
1494 		if (buflen >= sizeof (s->cmd.sgl)) {
1495 			/* Copy the shingles part */
1496 			memcpy (&s->cmd.sgl, buf, sizeof (s->cmd.sgl));
1497 		}
1498 		else {
1499 			/* Truncated stuff */
1500 			msg_debug ("truncated fuzzy shingles command of size %d received", buflen);
1501 			return FALSE;
1502 		}
1503 
1504 		buf += sizeof (s->cmd.sgl);
1505 		buflen -= sizeof (s->cmd.sgl);
1506 
1507 		if (encrypted) {
1508 			s->cmd_type = CMD_ENCRYPTED_SHINGLE;
1509 		}
1510 		else {
1511 			s->cmd_type = CMD_SHINGLE;
1512 		}
1513 	}
1514 	else {
1515 		if (encrypted) {
1516 			s->cmd_type = CMD_ENCRYPTED_NORMAL;
1517 		}
1518 		else {
1519 			s->cmd_type = CMD_NORMAL;
1520 		}
1521 	}
1522 
1523 	if (buflen > 0) {
1524 		/* Process possible extensions */
1525 		if (!rspamd_fuzzy_extensions_from_wire (s, buf, buflen)) {
1526 			msg_debug ("truncated fuzzy shingles command of size %d received", buflen);
1527 			return FALSE;
1528 		}
1529 	}
1530 
1531 	return TRUE;
1532 }
1533 
1534 
1535 static void
fuzzy_session_destroy(gpointer d)1536 fuzzy_session_destroy (gpointer d)
1537 {
1538 	struct fuzzy_session *session = d;
1539 
1540 	rspamd_inet_address_free (session->addr);
1541 	rspamd_explicit_memzero (session->nm, sizeof (session->nm));
1542 	session->worker->nconns--;
1543 
1544 	if (session->ip_stat) {
1545 		REF_RELEASE (session->ip_stat);
1546 	}
1547 
1548 	if (session->extensions) {
1549 		g_free (session->extensions);
1550 	}
1551 
1552 	g_free (session);
1553 }
1554 
/* Size in bytes of each per-datagram receive buffer used by accept_fuzzy_socket */
#define FUZZY_INPUT_BUFLEN 1024
#ifdef HAVE_RECVMMSG
/* Maximum number of datagrams read by a single recvmmsg() call */
#define MSGVEC_LEN 16
#else
/* Without recvmmsg(), datagrams are read one at a time with recvmsg() */
#define MSGVEC_LEN 1
#endif
1561 
1562 /*
1563  * Accept new connection and construct task
1564  */
1565 static void
accept_fuzzy_socket(EV_P_ ev_io * w,int revents)1566 accept_fuzzy_socket (EV_P_ ev_io *w, int revents)
1567 {
1568 	struct rspamd_worker *worker = (struct rspamd_worker *)w->data;
1569 	struct fuzzy_session *session;
1570 	gssize r, msg_len;
1571 	guint64 *nerrors;
1572 	struct iovec iovs[MSGVEC_LEN];
1573 	guint8 bufs[MSGVEC_LEN][FUZZY_INPUT_BUFLEN];
1574 	struct sockaddr_storage peer_sa[MSGVEC_LEN];
1575 	socklen_t salen = sizeof (peer_sa[0]);
1576 #ifdef HAVE_RECVMMSG
1577 #define MSG_FIELD(msg, field) msg.msg_hdr.field
1578 	struct mmsghdr msg[MSGVEC_LEN];
1579 #else
1580 #define MSG_FIELD(msg, field) msg.field
1581 	struct msghdr msg[MSGVEC_LEN];
1582 #endif
1583 
1584 	memset (msg, 0, sizeof (*msg) * MSGVEC_LEN);
1585 
1586 	/* Prepare messages to receive */
1587 	for (int i = 0; i < MSGVEC_LEN; i ++) {
1588 		/* Prepare msghdr structs */
1589 		iovs[i].iov_base = bufs[i];
1590 		iovs[i].iov_len = sizeof (bufs[i]);
1591 		MSG_FIELD(msg[i], msg_name) = (void *)&peer_sa[i];
1592 		MSG_FIELD(msg[i], msg_namelen) = salen;
1593 		MSG_FIELD(msg[i], msg_iov) = &iovs[i];
1594 		MSG_FIELD(msg[i], msg_iovlen) = 1;
1595 	}
1596 
1597 	/* Got some data */
1598 	if (revents == EV_READ) {
1599 
1600 		for (;;) {
1601 #ifdef HAVE_RECVMMSG
1602 			r = recvmmsg (w->fd, msg, MSGVEC_LEN, 0, NULL);
1603 #else
1604 			r = recvmsg (w->fd, msg, 0);
1605 #endif
1606 
1607 			if (r == -1) {
1608 				if (errno == EINTR) {
1609 					continue;
1610 				}
1611 				else if (errno == EAGAIN || errno == EWOULDBLOCK) {
1612 
1613 					return;
1614 				}
1615 
1616 				msg_err ("got error while reading from socket: %d, %s",
1617 						errno,
1618 						strerror (errno));
1619 				return;
1620 			}
1621 
1622 #ifndef HAVE_RECVMMSG
1623 			msg_len = r; /* Save real length in bytes here */
1624 			r = 1; /* Assume that we have received a single message */
1625 #endif
1626 
1627 			for (int i = 0; i < r; i ++) {
1628 				rspamd_inet_addr_t *client_addr;
1629 
1630 				client_addr = rspamd_inet_address_from_sa (MSG_FIELD(msg[i], msg_name),
1631 						MSG_FIELD(msg[i], msg_namelen));
1632 
1633 				if (!rspamd_fuzzy_check_client (worker->ctx, client_addr)) {
1634 					/* Disallow forbidden clients silently */
1635 					rspamd_inet_address_free (client_addr);
1636 					continue;
1637 				}
1638 
1639 				session = g_malloc0 (sizeof (*session));
1640 				REF_INIT_RETAIN (session, fuzzy_session_destroy);
1641 				session->worker = worker;
1642 				session->fd = w->fd;
1643 				session->ctx = worker->ctx;
1644 				session->time = (guint64) time (NULL);
1645 				session->addr = client_addr;
1646 				worker->nconns++;
1647 
1648 				/* Each message can have its length in case of recvmmsg */
1649 #ifdef HAVE_RECVMMSG
1650 				msg_len = msg[i].msg_len;
1651 #endif
1652 
1653 				if (rspamd_fuzzy_cmd_from_wire (iovs[i].iov_base,
1654 						msg_len, session)) {
1655 					/* Check shingles count sanity */
1656 					rspamd_fuzzy_process_command (session);
1657 				}
1658 				else {
1659 					/* Discard input */
1660 					session->ctx->stat.invalid_requests ++;
1661 					msg_debug ("invalid fuzzy command of size %z received", r);
1662 
1663 					nerrors = rspamd_lru_hash_lookup (session->ctx->errors_ips,
1664 							session->addr, -1);
1665 
1666 					if (nerrors == NULL) {
1667 						nerrors = g_malloc (sizeof (*nerrors));
1668 						*nerrors = 1;
1669 						rspamd_lru_hash_insert (session->ctx->errors_ips,
1670 								rspamd_inet_address_copy (session->addr),
1671 								nerrors, -1, -1);
1672 					}
1673 					else {
1674 						*nerrors = *nerrors + 1;
1675 					}
1676 				}
1677 
1678 				REF_RELEASE (session);
1679 			}
1680 #ifdef HAVE_RECVMMSG
1681 			/* Stop reading as we are using recvmmsg instead of recvmsg */
1682 			break;
1683 #endif
1684 		}
1685 	}
1686 }
1687 
1688 static gboolean
rspamd_fuzzy_storage_periodic_callback(void * ud)1689 rspamd_fuzzy_storage_periodic_callback (void *ud)
1690 {
1691 	struct rspamd_fuzzy_storage_ctx *ctx = ud;
1692 
1693 	if (ctx->updates_pending->len > 0) {
1694 		rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
1695 
1696 		return TRUE;
1697 	}
1698 
1699 	return FALSE;
1700 }
1701 
1702 static gboolean
rspamd_fuzzy_storage_sync(struct rspamd_main * rspamd_main,struct rspamd_worker * worker,gint fd,gint attached_fd,struct rspamd_control_command * cmd,gpointer ud)1703 rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
1704 		struct rspamd_worker *worker, gint fd,
1705 		gint attached_fd,
1706 		struct rspamd_control_command *cmd,
1707 		gpointer ud)
1708 {
1709 	struct rspamd_fuzzy_storage_ctx *ctx = ud;
1710 	struct rspamd_control_reply rep;
1711 
1712 	rep.reply.fuzzy_sync.status = 0;
1713 	rep.type = RSPAMD_CONTROL_FUZZY_SYNC;
1714 
1715 	if (ctx->backend && worker->index == 0) {
1716 		rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
1717 		rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
1718 				rspamd_fuzzy_storage_periodic_callback, ctx);
1719 	}
1720 
1721 	if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
1722 		msg_err ("cannot write reply to the control socket: %s",
1723 				strerror (errno));
1724 	}
1725 
1726 	return TRUE;
1727 }
1728 
1729 static gboolean
rspamd_fuzzy_storage_reload(struct rspamd_main * rspamd_main,struct rspamd_worker * worker,gint fd,gint attached_fd,struct rspamd_control_command * cmd,gpointer ud)1730 rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main,
1731 		struct rspamd_worker *worker, gint fd,
1732 		gint attached_fd,
1733 		struct rspamd_control_command *cmd,
1734 		gpointer ud)
1735 {
1736 	struct rspamd_fuzzy_storage_ctx *ctx = ud;
1737 	GError *err = NULL;
1738 	struct rspamd_control_reply rep;
1739 
1740 	msg_info ("reloading fuzzy storage after receiving reload command");
1741 
1742 	if (ctx->backend) {
1743 		/* Close backend and reopen it one more time */
1744 		rspamd_fuzzy_backend_close (ctx->backend);
1745 	}
1746 
1747 	memset (&rep, 0, sizeof (rep));
1748 	rep.type = RSPAMD_CONTROL_RELOAD;
1749 
1750 	if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->event_loop,
1751 			worker->cf->options, rspamd_main->cfg,
1752 			&err)) == NULL) {
1753 		msg_err ("cannot open backend after reload: %e", err);
1754 		rep.reply.reload.status = err->code;
1755 		g_error_free (err);
1756 	}
1757 	else {
1758 		rep.reply.reload.status = 0;
1759 	}
1760 
1761 	if (ctx->backend && worker->index == 0) {
1762 		rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
1763 				rspamd_fuzzy_storage_periodic_callback, ctx);
1764 	}
1765 
1766 	if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
1767 		msg_err ("cannot write reply to the control socket: %s",
1768 				strerror (errno));
1769 	}
1770 
1771 	return TRUE;
1772 }
1773 
1774 static ucl_object_t *
rspamd_fuzzy_storage_stat_key(struct fuzzy_key_stat * key_stat)1775 rspamd_fuzzy_storage_stat_key (struct fuzzy_key_stat *key_stat)
1776 {
1777 	ucl_object_t *res;
1778 
1779 	res = ucl_object_typed_new (UCL_OBJECT);
1780 
1781 	ucl_object_insert_key (res, ucl_object_fromint (key_stat->checked),
1782 			"checked", 0, false);
1783 	ucl_object_insert_key (res, ucl_object_fromint (key_stat->matched),
1784 			"matched", 0, false);
1785 	ucl_object_insert_key (res, ucl_object_fromint (key_stat->added),
1786 			"added", 0, false);
1787 	ucl_object_insert_key (res, ucl_object_fromint (key_stat->deleted),
1788 			"deleted", 0, false);
1789 	ucl_object_insert_key (res, ucl_object_fromint (key_stat->errors),
1790 			"errors", 0, false);
1791 
1792 	return res;
1793 }
1794 
1795 static ucl_object_t *
rspamd_fuzzy_stat_to_ucl(struct rspamd_fuzzy_storage_ctx * ctx,gboolean ip_stat)1796 rspamd_fuzzy_stat_to_ucl (struct rspamd_fuzzy_storage_ctx *ctx, gboolean ip_stat)
1797 {
1798 	struct fuzzy_key_stat *key_stat;
1799 	GHashTableIter it;
1800 	struct fuzzy_key *key;
1801 	ucl_object_t *obj, *keys_obj, *elt, *ip_elt, *ip_cur;
1802 	gpointer k, v;
1803 	gint i;
1804 	gchar keyname[17];
1805 
1806 	obj = ucl_object_typed_new (UCL_OBJECT);
1807 
1808 	keys_obj = ucl_object_typed_new (UCL_OBJECT);
1809 	g_hash_table_iter_init (&it, ctx->keys);
1810 
1811 	while (g_hash_table_iter_next (&it, &k, &v)) {
1812 		key = v;
1813 		key_stat = key->stat;
1814 
1815 		if (key_stat) {
1816 			rspamd_snprintf (keyname, sizeof (keyname), "%8bs", k);
1817 
1818 			elt = rspamd_fuzzy_storage_stat_key (key_stat);
1819 
1820 			if (key_stat->last_ips && ip_stat) {
1821 				i = 0;
1822 
1823 				ip_elt = ucl_object_typed_new (UCL_OBJECT);
1824 
1825 				while ((i = rspamd_lru_hash_foreach (key_stat->last_ips,
1826 						i, &k, &v)) != -1) {
1827 					ip_cur = rspamd_fuzzy_storage_stat_key (v);
1828 					ucl_object_insert_key (ip_elt, ip_cur,
1829 							rspamd_inet_address_to_string (k), 0, true);
1830 				}
1831 
1832 				ucl_object_insert_key (elt, ip_elt, "ips", 0, false);
1833 			}
1834 
1835 			ucl_object_insert_key (keys_obj, elt, keyname, 0, true);
1836 		}
1837 	}
1838 
1839 	ucl_object_insert_key (obj, keys_obj, "keys", 0, false);
1840 
1841 	/* Now generic stats */
1842 	ucl_object_insert_key (obj,
1843 			ucl_object_fromint (ctx->stat.fuzzy_hashes),
1844 			"fuzzy_stored",
1845 			0,
1846 			false);
1847 	ucl_object_insert_key (obj,
1848 			ucl_object_fromint (ctx->stat.fuzzy_hashes_expired),
1849 			"fuzzy_expired",
1850 			0,
1851 			false);
1852 	ucl_object_insert_key (obj,
1853 			ucl_object_fromint (ctx->stat.invalid_requests),
1854 			"invalid_requests",
1855 			0,
1856 			false);
1857 	ucl_object_insert_key (obj,
1858 			ucl_object_fromint (ctx->stat.delayed_hashes),
1859 			"delayed_hashes",
1860 			0,
1861 			false);
1862 
1863 	if (ctx->errors_ips && ip_stat) {
1864 		i = 0;
1865 
1866 		ip_elt = ucl_object_typed_new (UCL_OBJECT);
1867 
1868 		while ((i = rspamd_lru_hash_foreach (ctx->errors_ips, i, &k, &v)) != -1) {
1869 			ucl_object_insert_key (ip_elt,
1870 					ucl_object_fromint (*(guint64 *)v),
1871 					rspamd_inet_address_to_string (k), 0, true);
1872 		}
1873 
1874 		ucl_object_insert_key (obj,
1875 				ip_elt,
1876 				"errors_ips",
1877 				0,
1878 				false);
1879 	}
1880 
1881 	/* Checked by epoch */
1882 	elt = ucl_object_typed_new (UCL_ARRAY);
1883 
1884 	for (i = RSPAMD_FUZZY_EPOCH6; i < RSPAMD_FUZZY_EPOCH_MAX; i++) {
1885 		ucl_array_append (elt,
1886 				ucl_object_fromint (ctx->stat.fuzzy_hashes_checked[i]));
1887 	}
1888 
1889 	ucl_object_insert_key (obj, elt, "fuzzy_checked", 0, false);
1890 
1891 	/* Shingles by epoch */
1892 	elt = ucl_object_typed_new (UCL_ARRAY);
1893 
1894 	for (i = RSPAMD_FUZZY_EPOCH6; i < RSPAMD_FUZZY_EPOCH_MAX; i++) {
1895 		ucl_array_append (elt,
1896 				ucl_object_fromint (ctx->stat.fuzzy_shingles_checked[i]));
1897 	}
1898 
1899 	ucl_object_insert_key (obj, elt, "fuzzy_shingles", 0, false);
1900 
1901 	/* Matched by epoch */
1902 	elt = ucl_object_typed_new (UCL_ARRAY);
1903 
1904 	for (i = RSPAMD_FUZZY_EPOCH6; i < RSPAMD_FUZZY_EPOCH_MAX; i++) {
1905 		ucl_array_append (elt,
1906 				ucl_object_fromint (ctx->stat.fuzzy_hashes_found[i]));
1907 	}
1908 
1909 	ucl_object_insert_key (obj, elt, "fuzzy_found", 0, false);
1910 
1911 
1912 	return obj;
1913 }
1914 
1915 static int
lua_fuzzy_add_pre_handler(lua_State * L)1916 lua_fuzzy_add_pre_handler (lua_State *L)
1917 {
1918 	struct rspamd_worker *wrk, **pwrk = (struct rspamd_worker **)
1919 			rspamd_lua_check_udata (L, 1, "rspamd{worker}");
1920 	struct rspamd_fuzzy_storage_ctx *ctx;
1921 
1922 	if (!pwrk) {
1923 		return luaL_error (L, "invalid arguments, worker + function are expected");
1924 	}
1925 
1926 	wrk = *pwrk;
1927 
1928 	if (wrk && lua_isfunction (L, 2)) {
1929 		ctx = (struct rspamd_fuzzy_storage_ctx *)wrk->ctx;
1930 
1931 		if (ctx->lua_pre_handler_cbref != -1) {
1932 			/* Should not happen */
1933 			luaL_unref (L, LUA_REGISTRYINDEX, ctx->lua_pre_handler_cbref);
1934 		}
1935 
1936 		lua_pushvalue (L, 2);
1937 		ctx->lua_pre_handler_cbref = luaL_ref (L, LUA_REGISTRYINDEX);
1938 	}
1939 	else {
1940 		return luaL_error (L, "invalid arguments, worker + function are expected");
1941 	}
1942 
1943 	return 0;
1944 }
1945 
1946 static int
lua_fuzzy_add_post_handler(lua_State * L)1947 lua_fuzzy_add_post_handler (lua_State *L)
1948 {
1949 	struct rspamd_worker *wrk, **pwrk = (struct rspamd_worker **)
1950 			rspamd_lua_check_udata (L, 1, "rspamd{worker}");
1951 	struct rspamd_fuzzy_storage_ctx *ctx;
1952 
1953 	if (!pwrk) {
1954 		return luaL_error (L, "invalid arguments, worker + function are expected");
1955 	}
1956 
1957 	wrk = *pwrk;
1958 
1959 	if (wrk && lua_isfunction (L, 2)) {
1960 		ctx = (struct rspamd_fuzzy_storage_ctx *)wrk->ctx;
1961 
1962 		if (ctx->lua_post_handler_cbref != -1) {
1963 			/* Should not happen */
1964 			luaL_unref (L, LUA_REGISTRYINDEX, ctx->lua_post_handler_cbref);
1965 		}
1966 
1967 		lua_pushvalue (L, 2);
1968 		ctx->lua_post_handler_cbref = luaL_ref (L, LUA_REGISTRYINDEX);
1969 	}
1970 	else {
1971 		return luaL_error (L, "invalid arguments, worker + function are expected");
1972 	}
1973 
1974 	return 0;
1975 }
1976 
1977 static gboolean
rspamd_fuzzy_storage_stat(struct rspamd_main * rspamd_main,struct rspamd_worker * worker,gint fd,gint attached_fd,struct rspamd_control_command * cmd,gpointer ud)1978 rspamd_fuzzy_storage_stat (struct rspamd_main *rspamd_main,
1979 		struct rspamd_worker *worker, gint fd,
1980 		gint attached_fd,
1981 		struct rspamd_control_command *cmd,
1982 		gpointer ud)
1983 {
1984 	struct rspamd_fuzzy_storage_ctx *ctx = ud;
1985 	struct rspamd_control_reply rep;
1986 	ucl_object_t *obj;
1987 	struct ucl_emitter_functions *emit_subr;
1988 	guchar fdspace[CMSG_SPACE(sizeof (int))];
1989 	struct iovec iov;
1990 	struct msghdr msg;
1991 	struct cmsghdr *cmsg;
1992 
1993 	gint outfd = -1;
1994 	gchar tmppath[PATH_MAX];
1995 
1996 	memset (&rep, 0, sizeof (rep));
1997 	rep.type = RSPAMD_CONTROL_FUZZY_STAT;
1998 
1999 	rspamd_snprintf (tmppath, sizeof (tmppath), "%s%c%s-XXXXXXXXXX",
2000 			rspamd_main->cfg->temp_dir, G_DIR_SEPARATOR, "fuzzy-stat");
2001 
2002 	if ((outfd = mkstemp (tmppath)) == -1) {
2003 		rep.reply.fuzzy_stat.status = errno;
2004 		msg_info_main ("cannot make temporary stat file for fuzzy stat: %s",
2005 			strerror (errno));
2006 	}
2007 	else {
2008 		rep.reply.fuzzy_stat.status = 0;
2009 
2010 		memcpy (rep.reply.fuzzy_stat.storage_id,
2011 				rspamd_fuzzy_backend_id (ctx->backend),
2012 				sizeof (rep.reply.fuzzy_stat.storage_id));
2013 
2014 		obj = rspamd_fuzzy_stat_to_ucl (ctx, TRUE);
2015 		emit_subr = ucl_object_emit_fd_funcs (outfd);
2016 		ucl_object_emit_full (obj, UCL_EMIT_JSON_COMPACT, emit_subr, NULL);
2017 		ucl_object_emit_funcs_free (emit_subr);
2018 		ucl_object_unref (obj);
2019 		/* Rewind output file */
2020 		close (outfd);
2021 		outfd = open (tmppath, O_RDONLY);
2022 		unlink (tmppath);
2023 	}
2024 
2025 	/* Now we can send outfd and status message */
2026 	memset (&msg, 0, sizeof (msg));
2027 
2028 	/* Attach fd to the message */
2029 	if (outfd != -1) {
2030 		memset (fdspace, 0, sizeof (fdspace));
2031 		msg.msg_control = fdspace;
2032 		msg.msg_controllen = sizeof (fdspace);
2033 		cmsg = CMSG_FIRSTHDR (&msg);
2034 
2035 		if (cmsg) {
2036 			cmsg->cmsg_level = SOL_SOCKET;
2037 			cmsg->cmsg_type = SCM_RIGHTS;
2038 			cmsg->cmsg_len = CMSG_LEN (sizeof (int));
2039 			memcpy (CMSG_DATA (cmsg), &outfd, sizeof (int));
2040 		}
2041 	}
2042 
2043 	iov.iov_base = &rep;
2044 	iov.iov_len = sizeof (rep);
2045 	msg.msg_iov = &iov;
2046 	msg.msg_iovlen = 1;
2047 
2048 	if (sendmsg (fd, &msg, 0) == -1) {
2049 		msg_err_main ("cannot send fuzzy stat: %s", strerror (errno));
2050 	}
2051 
2052 	if (outfd != -1) {
2053 		close (outfd);
2054 	}
2055 
2056 	return TRUE;
2057 }
2058 
2059 static gboolean
fuzzy_parse_keypair(rspamd_mempool_t * pool,const ucl_object_t * obj,gpointer ud,struct rspamd_rcl_section * section,GError ** err)2060 fuzzy_parse_keypair (rspamd_mempool_t *pool,
2061 		const ucl_object_t *obj,
2062 		gpointer ud,
2063 		struct rspamd_rcl_section *section,
2064 		GError **err)
2065 {
2066 	struct rspamd_rcl_struct_parser *pd = ud;
2067 	struct rspamd_fuzzy_storage_ctx *ctx;
2068 	struct rspamd_cryptobox_keypair *kp;
2069 	struct fuzzy_key_stat *keystat;
2070 	struct fuzzy_key *key;
2071 	const ucl_object_t *cur;
2072 	const guchar *pk;
2073 	ucl_object_iter_t it = NULL;
2074 	gboolean ret;
2075 
2076 	ctx = pd->user_struct;
2077 	pd->offset = G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, default_keypair);
2078 
2079 	/*
2080 	 * Single key
2081 	 */
2082 	if (ucl_object_type (obj) == UCL_STRING || ucl_object_type (obj)
2083 			== UCL_OBJECT) {
2084 		ret = rspamd_rcl_parse_struct_keypair (pool, obj, pd, section, err);
2085 
2086 		if (!ret) {
2087 			return ret;
2088 		}
2089 
2090 		/* Insert key to the hash table */
2091 		kp = ctx->default_keypair;
2092 
2093 		if (kp == NULL) {
2094 			return FALSE;
2095 		}
2096 
2097 		if (rspamd_keypair_alg (kp) != RSPAMD_CRYPTOBOX_MODE_25519 ||
2098 				rspamd_keypair_type (kp) != RSPAMD_KEYPAIR_KEX) {
2099 			return FALSE;
2100 		}
2101 
2102 		key = g_malloc0 (sizeof (*key));
2103 		key->key = kp;
2104 		keystat = g_malloc0 (sizeof (*keystat));
2105 		REF_INIT_RETAIN (keystat, fuzzy_key_stat_dtor);
2106 		/* Hash of ip -> fuzzy_key_stat */
2107 		keystat->last_ips = rspamd_lru_hash_new_full (1024,
2108 				(GDestroyNotify) rspamd_inet_address_free,
2109 				fuzzy_key_stat_unref,
2110 				rspamd_inet_address_hash, rspamd_inet_address_equal);
2111 		key->stat = keystat;
2112 		pk = rspamd_keypair_component (kp, RSPAMD_KEYPAIR_COMPONENT_PK,
2113 				NULL);
2114 		g_hash_table_insert (ctx->keys, (gpointer)pk, key);
2115 		ctx->default_key = key;
2116 		msg_info_pool ("loaded keypair %*xs", 8, pk);
2117 	}
2118 	else if (ucl_object_type (obj) == UCL_ARRAY) {
2119 		while ((cur = ucl_object_iterate (obj, &it, true)) != NULL) {
2120 			if (!fuzzy_parse_keypair (pool, cur, pd, section, err)) {
2121 				msg_err_pool ("cannot parse keypair");
2122 			}
2123 		}
2124 	}
2125 
2126 	return TRUE;
2127 }
2128 
2129 static guint
fuzzy_kp_hash(gconstpointer p)2130 fuzzy_kp_hash (gconstpointer p)
2131 {
2132 	return *(guint *)p;
2133 }
2134 
2135 static gboolean
fuzzy_kp_equal(gconstpointer a,gconstpointer b)2136 fuzzy_kp_equal (gconstpointer a, gconstpointer b)
2137 {
2138 	const guchar *pa = a, *pb = b;
2139 
2140 	return (memcmp (pa, pb, RSPAMD_FUZZY_KEYLEN) == 0);
2141 }
2142 
2143 gpointer
init_fuzzy(struct rspamd_config * cfg)2144 init_fuzzy (struct rspamd_config *cfg)
2145 {
2146 	struct rspamd_fuzzy_storage_ctx *ctx;
2147 	GQuark type;
2148 
2149 	type = g_quark_try_string ("fuzzy");
2150 
2151 	ctx = rspamd_mempool_alloc0 (cfg->cfg_pool,
2152 			sizeof (struct rspamd_fuzzy_storage_ctx));
2153 
2154 	ctx->magic = rspamd_fuzzy_storage_magic;
2155 	ctx->sync_timeout = DEFAULT_SYNC_TIMEOUT;
2156 	ctx->keypair_cache_size = DEFAULT_KEYPAIR_CACHE_SIZE;
2157 	ctx->lua_pre_handler_cbref = -1;
2158 	ctx->lua_post_handler_cbref = -1;
2159 	ctx->keys = g_hash_table_new_full (fuzzy_kp_hash, fuzzy_kp_equal,
2160 			NULL, fuzzy_key_dtor);
2161 	rspamd_mempool_add_destructor (cfg->cfg_pool,
2162 			(rspamd_mempool_destruct_t)g_hash_table_unref, ctx->keys);
2163 	ctx->errors_ips = rspamd_lru_hash_new_full (1024,
2164 			(GDestroyNotify) rspamd_inet_address_free, g_free,
2165 			rspamd_inet_address_hash, rspamd_inet_address_equal);
2166 	rspamd_mempool_add_destructor (cfg->cfg_pool,
2167 			(rspamd_mempool_destruct_t)rspamd_lru_hash_destroy, ctx->errors_ips);
2168 	ctx->cfg = cfg;
2169 	ctx->updates_maxfail = DEFAULT_UPDATES_MAXFAIL;
2170 	ctx->leaky_bucket_mask = DEFAULT_BUCKET_MASK;
2171 	ctx->leaky_bucket_ttl = DEFAULT_BUCKET_TTL;
2172 	ctx->max_buckets = DEFAULT_MAX_BUCKETS;
2173 	ctx->leaky_bucket_burst = NAN;
2174 	ctx->leaky_bucket_rate = NAN;
2175 	ctx->delay = NAN;
2176 
2177 	rspamd_rcl_register_worker_option (cfg,
2178 			type,
2179 			"sync",
2180 			rspamd_rcl_parse_struct_time,
2181 			ctx,
2182 			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx,
2183 						sync_timeout),
2184 			RSPAMD_CL_FLAG_TIME_FLOAT,
2185 			"Time to perform database sync, default: "
2186 			G_STRINGIFY (DEFAULT_SYNC_TIMEOUT) " seconds");
2187 
2188 	rspamd_rcl_register_worker_option (cfg,
2189 			type,
2190 			"expire",
2191 			rspamd_rcl_parse_struct_time,
2192 			ctx,
2193 			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx,
2194 						expire),
2195 			RSPAMD_CL_FLAG_TIME_FLOAT,
2196 			"Default expire time for hashes, default: "
2197 			G_STRINGIFY (DEFAULT_EXPIRE) " seconds");
2198 
2199 	rspamd_rcl_register_worker_option (cfg,
2200 			type,
2201 			"delay",
2202 			rspamd_rcl_parse_struct_time,
2203 			ctx,
2204 			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx,
2205 					delay),
2206 			RSPAMD_CL_FLAG_TIME_FLOAT,
2207 			"Default delay time for hashes, default: not enabled");
2208 
2209 	rspamd_rcl_register_worker_option (cfg,
2210 			type,
2211 			"allow_update",
2212 			rspamd_rcl_parse_struct_ucl,
2213 			ctx,
2214 			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, update_map),
2215 			0,
2216 			"Allow modifications from the following IP addresses");
2217 
2218 	rspamd_rcl_register_worker_option (cfg,
2219 			type,
2220 			"delay_whitelist",
2221 			rspamd_rcl_parse_struct_ucl,
2222 			ctx,
2223 			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, delay_whitelist_map),
2224 			0,
2225 			"Disable delay check for the following IP addresses");
2226 
2227 	rspamd_rcl_register_worker_option (cfg,
2228 			type,
2229 			"keypair",
2230 			fuzzy_parse_keypair,
2231 			ctx,
2232 			0,
2233 			RSPAMD_CL_FLAG_MULTIPLE,
2234 			"Encryption keypair (can be repeated for different keys)");
2235 
2236 	rspamd_rcl_register_worker_option (cfg,
2237 			type,
2238 			"keypair_cache_size",
2239 			rspamd_rcl_parse_struct_integer,
2240 			ctx,
2241 			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx,
2242 						keypair_cache_size),
2243 			RSPAMD_CL_FLAG_UINT,
2244 			"Size of keypairs cache, default: "
2245 					G_STRINGIFY (DEFAULT_KEYPAIR_CACHE_SIZE));
2246 
2247 	rspamd_rcl_register_worker_option (cfg,
2248 			type,
2249 			"encrypted_only",
2250 			rspamd_rcl_parse_struct_boolean,
2251 			ctx,
2252 			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, encrypted_only),
2253 			0,
2254 			"Allow encrypted requests only (and forbid all unknown keys or plaintext requests)");
2255 	rspamd_rcl_register_worker_option (cfg,
2256 			type,
2257 			"dedicated_update_worker",
2258 			rspamd_rcl_parse_struct_boolean,
2259 			ctx,
2260 			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, dedicated_update_worker),
2261 			0,
2262 			"Use worker 0 for updates only");
2263 
2264 	rspamd_rcl_register_worker_option (cfg,
2265 			type,
2266 			"read_only",
2267 			rspamd_rcl_parse_struct_boolean,
2268 			ctx,
2269 			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, read_only),
2270 			0,
2271 			"Work in read only mode");
2272 
2273 
2274 	rspamd_rcl_register_worker_option (cfg,
2275 			type,
2276 			"blocked",
2277 			rspamd_rcl_parse_struct_ucl,
2278 			ctx,
2279 			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, blocked_map),
2280 			0,
2281 			"Block requests from specific networks");
2282 
2283 
2284 	rspamd_rcl_register_worker_option (cfg,
2285 			type,
2286 			"updates_maxfail",
2287 			rspamd_rcl_parse_struct_integer,
2288 			ctx,
2289 			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, updates_maxfail),
2290 			RSPAMD_CL_FLAG_UINT,
2291 			"Maximum number of updates to be failed before discarding");
2292 	rspamd_rcl_register_worker_option (cfg,
2293 			type,
2294 			"skip_hashes",
2295 			rspamd_rcl_parse_struct_ucl,
2296 			ctx,
2297 			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, skip_map),
2298 			0,
2299 			"Skip specific hashes from the map");
2300 
2301 	/* Ratelimits */
2302 	rspamd_rcl_register_worker_option (cfg,
2303 			type,
2304 			"ratelimit_whitelist",
2305 			rspamd_rcl_parse_struct_ucl,
2306 			ctx,
2307 			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, ratelimit_whitelist_map),
2308 			0,
2309 			"Skip specific addresses from rate limiting");
2310 	rspamd_rcl_register_worker_option (cfg,
2311 			type,
2312 			"ratelimit_max_buckets",
2313 			rspamd_rcl_parse_struct_integer,
2314 			ctx,
2315 			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, max_buckets),
2316 			RSPAMD_CL_FLAG_UINT,
2317 			"Maximum number of leaky buckets (default: " G_STRINGIFY(DEFAULT_MAX_BUCKETS) ")");
2318 	rspamd_rcl_register_worker_option (cfg,
2319 			type,
2320 			"ratelimit_network_mask",
2321 			rspamd_rcl_parse_struct_integer,
2322 			ctx,
2323 			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_mask),
2324 			RSPAMD_CL_FLAG_UINT,
2325 			"Network mask to apply for IPv4 rate addresses (default: " G_STRINGIFY(DEFAULT_BUCKET_MASK) ")");
2326 	rspamd_rcl_register_worker_option (cfg,
2327 			type,
2328 			"ratelimit_bucket_ttl",
2329 			rspamd_rcl_parse_struct_time,
2330 			ctx,
2331 			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_ttl),
2332 			RSPAMD_CL_FLAG_TIME_INTEGER,
2333 			"Time to live for ratelimit element (default: " G_STRINGIFY(DEFAULT_BUCKET_TTL) ")");
2334 	rspamd_rcl_register_worker_option (cfg,
2335 			type,
2336 			"ratelimit_rate",
2337 			rspamd_rcl_parse_struct_double,
2338 			ctx,
2339 			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_rate),
2340 			0,
2341 			"Leak rate in requests per second");
2342 	rspamd_rcl_register_worker_option (cfg,
2343 			type,
2344 			"ratelimit_burst",
2345 			rspamd_rcl_parse_struct_double,
2346 			ctx,
2347 			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_burst),
2348 			0,
2349 			"Peak value for ratelimit bucket");
2350 	rspamd_rcl_register_worker_option (cfg,
2351 			type,
2352 			"ratelimit_log_only",
2353 			rspamd_rcl_parse_struct_boolean,
2354 			ctx,
2355 			G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, ratelimit_log_only),
2356 			0,
2357 			"Don't really ban on ratelimit reaching, just log");
2358 
2359 
2360 	return ctx;
2361 }
2362 
2363 static void
rspamd_fuzzy_peer_io(EV_P_ ev_io * w,int revents)2364 rspamd_fuzzy_peer_io (EV_P_ ev_io *w, int revents)
2365 {
2366 	struct fuzzy_peer_cmd cmd;
2367 	struct rspamd_fuzzy_storage_ctx *ctx =
2368 			(struct rspamd_fuzzy_storage_ctx *)w->data;
2369 	gssize r;
2370 
2371 	for (;;) {
2372 		r = read (w->fd, &cmd, sizeof (cmd));
2373 
2374 		if (r != sizeof (cmd)) {
2375 			if (errno == EINTR) {
2376 				continue;
2377 			}
2378 			if (errno != EAGAIN) {
2379 				msg_err ("cannot read command from peers: %s", strerror (errno));
2380 			}
2381 
2382 			break;
2383 		}
2384 		else {
2385 			g_array_append_val (ctx->updates_pending, cmd);
2386 		}
2387 	}
2388 }
2389 
2390 static void
fuzzy_peer_rep(struct rspamd_worker * worker,struct rspamd_srv_reply * rep,gint rep_fd,gpointer ud)2391 fuzzy_peer_rep (struct rspamd_worker *worker,
2392 		struct rspamd_srv_reply *rep, gint rep_fd,
2393 		gpointer ud)
2394 {
2395 	struct rspamd_fuzzy_storage_ctx *ctx = ud;
2396 	GList *cur;
2397 	struct rspamd_worker_listen_socket *ls;
2398 	struct rspamd_worker_accept_event *ac_ev;
2399 
2400 	ctx->peer_fd = rep_fd;
2401 
2402 	if (rep_fd == -1) {
2403 		msg_err ("cannot receive peer fd from the main process");
2404 		exit (EXIT_FAILURE);
2405 	}
2406 	else {
2407 		rspamd_socket_nonblocking (rep_fd);
2408 	}
2409 
2410 	msg_info ("got peer fd reply from the main process");
2411 
2412 	/* Start listening */
2413 	cur = worker->cf->listen_socks;
2414 	while (cur) {
2415 		ls = cur->data;
2416 
2417 		if (ls->fd != -1) {
2418 			msg_info ("start listening on %s",
2419 					rspamd_inet_address_to_string_pretty (ls->addr));
2420 
2421 			if (ls->type == RSPAMD_WORKER_SOCKET_UDP) {
2422 				ac_ev = g_malloc0 (sizeof (*ac_ev));
2423 				ac_ev->accept_ev.data = worker;
2424 				ac_ev->event_loop = ctx->event_loop;
2425 				ev_io_init (&ac_ev->accept_ev, accept_fuzzy_socket, ls->fd,
2426 						EV_READ);
2427 				ev_io_start (ctx->event_loop, &ac_ev->accept_ev);
2428 				DL_APPEND (worker->accept_events, ac_ev);
2429 			}
2430 			else  {
2431 				/* We allow TCP listeners only for a update worker */
2432 				g_assert_not_reached ();
2433 			}
2434 		}
2435 
2436 		cur = g_list_next (cur);
2437 	}
2438 
2439 	if (ctx->peer_fd != -1) {
2440 		if (worker->index == 0) {
2441 			/* Listen for peer requests */
2442 			shutdown (ctx->peer_fd, SHUT_WR);
2443 			ctx->peer_ev.data = ctx;
2444 			ev_io_init (&ctx->peer_ev, rspamd_fuzzy_peer_io, ctx->peer_fd, EV_READ);
2445 			ev_io_start (ctx->event_loop, &ctx->peer_ev);
2446 		}
2447 		else {
2448 			shutdown (ctx->peer_fd, SHUT_RD);
2449 		}
2450 	}
2451 }
2452 
2453 /*
2454  * Start worker process
2455  */
2456 __attribute__((noreturn))
2457 void
start_fuzzy(struct rspamd_worker * worker)2458 start_fuzzy (struct rspamd_worker *worker)
2459 {
2460 	struct rspamd_fuzzy_storage_ctx *ctx = worker->ctx;
2461 	GError *err = NULL;
2462 	struct rspamd_srv_command srv_cmd;
2463 	struct rspamd_config *cfg = worker->srv->cfg;
2464 
2465 	g_assert (rspamd_worker_check_context (worker->ctx, rspamd_fuzzy_storage_magic));
2466 	ctx->event_loop = rspamd_prepare_worker (worker,
2467 			"fuzzy",
2468 			NULL);
2469 	ctx->peer_fd = -1;
2470 	ctx->worker = worker;
2471 	ctx->cfg = worker->srv->cfg;
2472 	ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
2473 			ctx->event_loop,
2474 			worker->srv->cfg);
2475 	rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
2476 			ctx->event_loop, ctx->resolver->r);
2477 	/* Since this worker uses maps it needs a valid HTTP context */
2478 	ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
2479 			ctx->cfg->ups_ctx);
2480 
2481 	if (ctx->keypair_cache_size > 0) {
2482 		/* Create keypairs cache */
2483 		ctx->keypair_cache = rspamd_keypair_cache_new (ctx->keypair_cache_size);
2484 	}
2485 
2486 
2487 	if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->event_loop,
2488 			worker->cf->options, cfg, &err)) == NULL) {
2489 		msg_err ("cannot open backend: %e", err);
2490 		if (err) {
2491 			g_error_free (err);
2492 		}
2493 		exit (EXIT_SUCCESS);
2494 	}
2495 
2496 	rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
2497 
2498 
2499 	if (worker->index == 0) {
2500 		ctx->updates_pending = g_array_sized_new (FALSE, FALSE,
2501 				sizeof (struct fuzzy_peer_cmd), 1024);
2502 		rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
2503 				rspamd_fuzzy_storage_periodic_callback, ctx);
2504 
2505 		if (ctx->dedicated_update_worker && worker->cf->count > 1) {
2506 			msg_info_config ("stop serving clients request in dedicated update mode");
2507 			rspamd_worker_stop_accept (worker);
2508 
2509 			GList *cur = worker->cf->listen_socks;
2510 
2511 			while (cur) {
2512 				struct rspamd_worker_listen_socket *ls =
2513 						(struct rspamd_worker_listen_socket *)cur->data;
2514 
2515 				if (ls->fd != -1) {
2516 					close (ls->fd);
2517 				}
2518 
2519 				cur = g_list_next (cur);
2520 			}
2521 		}
2522 	}
2523 
2524 	ctx->stat_ev.data = ctx;
2525 	ev_timer_init (&ctx->stat_ev, rspamd_fuzzy_stat_callback, ctx->sync_timeout,
2526 			ctx->sync_timeout);
2527 	ev_timer_start (ctx->event_loop, &ctx->stat_ev);
2528 	/* Register custom reload and stat commands for the control socket */
2529 	rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_RELOAD,
2530 			rspamd_fuzzy_storage_reload, ctx);
2531 	rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_STAT,
2532 			rspamd_fuzzy_storage_stat, ctx);
2533 	rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC,
2534 			rspamd_fuzzy_storage_sync, ctx);
2535 
2536 	/* Create radix trees */
2537 	if (ctx->update_map != NULL) {
2538 		rspamd_config_radix_from_ucl (worker->srv->cfg, ctx->update_map,
2539 				"Allow fuzzy updates from specified addresses",
2540 				&ctx->update_ips, NULL, worker, "fuzzy update");
2541 	}
2542 
2543 	if (ctx->skip_map != NULL) {
2544 		struct rspamd_map *m;
2545 
2546 		if ((m = rspamd_map_add_from_ucl (cfg, ctx->skip_map,
2547 				"Skip hashes",
2548 				rspamd_kv_list_read,
2549 				rspamd_kv_list_fin,
2550 				rspamd_kv_list_dtor,
2551 				(void **)&ctx->skip_hashes,
2552 				worker, RSPAMD_MAP_DEFAULT)) == NULL) {
2553 			msg_warn_config ("cannot load hashes list from %s",
2554 					ucl_object_tostring (ctx->skip_map));
2555 		}
2556 		else {
2557 			m->active_http = TRUE;
2558 		}
2559 	}
2560 
2561 	if (ctx->blocked_map != NULL) {
2562 		rspamd_config_radix_from_ucl (worker->srv->cfg, ctx->blocked_map,
2563 				"Block fuzzy requests from the specific IPs",
2564 				&ctx->blocked_ips,
2565 				NULL,
2566 				worker, "fuzzy blocked");
2567 	}
2568 
2569 	/* Create radix trees */
2570 	if (ctx->ratelimit_whitelist_map != NULL) {
2571 		rspamd_config_radix_from_ucl (worker->srv->cfg, ctx->ratelimit_whitelist_map,
2572 				"Skip ratelimits from specific ip addresses/networks",
2573 				&ctx->ratelimit_whitelist,
2574 				NULL,
2575 				worker, "fuzzy ratelimit whitelist");
2576 	}
2577 
2578 	if (!isnan (ctx->delay) && ctx->delay_whitelist_map != NULL) {
2579 		rspamd_config_radix_from_ucl (worker->srv->cfg, ctx->delay_whitelist_map,
2580 				"Skip delay from the following ips",
2581 				&ctx->delay_whitelist, NULL, worker,
2582 				"fuzzy delayed whitelist");
2583 	}
2584 
2585 	/* Ratelimits */
2586 	if (!isnan (ctx->leaky_bucket_rate) && !isnan (ctx->leaky_bucket_burst)) {
2587 		ctx->ratelimit_buckets = rspamd_lru_hash_new_full (ctx->max_buckets,
2588 				NULL, fuzzy_rl_bucket_free,
2589 				rspamd_inet_address_hash, rspamd_inet_address_equal);
2590 	}
2591 
2592 	/* Maps events */
2593 	ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
2594 			ctx->event_loop,
2595 			worker->srv->cfg);
2596 	rspamd_map_watch (worker->srv->cfg, ctx->event_loop,
2597 			ctx->resolver, worker, RSPAMD_MAP_WATCH_WORKER);
2598 
2599 	/* Get peer pipe */
2600 	memset (&srv_cmd, 0, sizeof (srv_cmd));
2601 	srv_cmd.type = RSPAMD_SRV_SOCKETPAIR;
2602 	srv_cmd.cmd.spair.af = SOCK_DGRAM;
2603 	srv_cmd.cmd.spair.pair_num = worker->index;
2604 	memset (srv_cmd.cmd.spair.pair_id, 0, sizeof (srv_cmd.cmd.spair.pair_id));
2605 	/* 6 bytes of id (including \0) and bind_conf id */
2606 	G_STATIC_ASSERT (sizeof (srv_cmd.cmd.spair.pair_id) >=
2607 								sizeof ("fuzzy") + sizeof (guint64));
2608 
2609 	memcpy (srv_cmd.cmd.spair.pair_id, "fuzzy", sizeof ("fuzzy"));
2610 
2611 	/* Distinguish workers from each others... */
2612 	if (worker->cf->bind_conf && worker->cf->bind_conf->bind_line) {
2613 		guint64 bind_hash = rspamd_cryptobox_fast_hash (worker->cf->bind_conf->bind_line,
2614 				strlen (worker->cf->bind_conf->bind_line), 0xdeadbabe);
2615 
2616 		/* 8 more bytes */
2617 		memcpy (srv_cmd.cmd.spair.pair_id + sizeof ("fuzzy"), &bind_hash,
2618 				sizeof (bind_hash));
2619 	}
2620 
2621 	rspamd_srv_send_command (worker, ctx->event_loop, &srv_cmd, -1,
2622 			fuzzy_peer_rep, ctx);
2623 
2624 	luaL_Reg fuzzy_lua_reg = {
2625 			.name = "add_fuzzy_pre_handler",
2626 			.func = lua_fuzzy_add_pre_handler,
2627 	};
2628 	rspamd_lua_add_metamethod (ctx->cfg->lua_state, "rspamd{worker}", &fuzzy_lua_reg);
2629 	fuzzy_lua_reg = (luaL_Reg){
2630 			.name = "add_fuzzy_post_handler",
2631 			.func = lua_fuzzy_add_post_handler,
2632 	};
2633 	rspamd_lua_add_metamethod (ctx->cfg->lua_state, "rspamd{worker}", &fuzzy_lua_reg);
2634 
2635 	rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
2636 			worker);
2637 
2638 	ev_loop (ctx->event_loop, 0);
2639 	rspamd_worker_block_signals ();
2640 
2641 	if (ctx->peer_fd != -1) {
2642 		if (worker->index == 0) {
2643 			ev_io_stop (ctx->event_loop, &ctx->peer_ev);
2644 		}
2645 		close (ctx->peer_fd);
2646 	}
2647 
2648 	if (worker->index == 0 && ctx->updates_pending->len > 0) {
2649 
2650 		msg_info_config ("start another event loop to sync fuzzy storage");
2651 
2652 		if (rspamd_fuzzy_process_updates_queue (ctx, local_db_name, TRUE)) {
2653 			ev_loop (ctx->event_loop, 0);
2654 			msg_info_config ("sync cycle is done");
2655 		}
2656 		else {
2657 			msg_info_config ("no need to sync");
2658 		}
2659 	}
2660 
2661 	rspamd_fuzzy_backend_close (ctx->backend);
2662 
2663 	if (worker->index == 0) {
2664 		g_array_free (ctx->updates_pending, TRUE);
2665 	}
2666 
2667 	if (ctx->keypair_cache) {
2668 		rspamd_keypair_cache_destroy (ctx->keypair_cache);
2669 	}
2670 
2671 	if (ctx->ratelimit_buckets) {
2672 		rspamd_lru_hash_destroy (ctx->ratelimit_buckets);
2673 	}
2674 
2675 	if (ctx->lua_pre_handler_cbref != -1) {
2676 		luaL_unref (ctx->cfg->lua_state, LUA_REGISTRYINDEX, ctx->lua_pre_handler_cbref);
2677 	}
2678 
2679 	if (ctx->lua_post_handler_cbref != -1) {
2680 		luaL_unref (ctx->cfg->lua_state, LUA_REGISTRYINDEX, ctx->lua_post_handler_cbref);
2681 	}
2682 
2683 	REF_RELEASE (ctx->cfg);
2684 	rspamd_log_close (worker->srv->logger);
2685 
2686 	exit (EXIT_SUCCESS);
2687 }
2688