1 /*-
2 * Copyright 2016 Vsevolod Stakhov
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "config.h"
18 #include "fuzzy_backend.h"
19 #include "fuzzy_backend_sqlite.h"
20 #include "fuzzy_backend_redis.h"
21 #include "cfg_file.h"
22 #include "fuzzy_wire.h"
23
/* Expire value used when the configuration has no "expire" key: two days in seconds */
#define DEFAULT_EXPIRE (2L * 24L * 60L * 60L)
25
/*
 * Storage engines known to this module. The numeric values are used as
 * indexes into the fuzzy_subrs dispatch table, so they must stay stable.
 */
enum rspamd_fuzzy_backend_type {
	RSPAMD_FUZZY_BACKEND_SQLITE = 0, /* selected by "sqlite" and by default */
	RSPAMD_FUZZY_BACKEND_REDIS = 1,  /* selected by "redis" */
};
30
31 static void* rspamd_fuzzy_backend_init_sqlite (struct rspamd_fuzzy_backend *bk,
32 const ucl_object_t *obj, struct rspamd_config *cfg, GError **err);
33 static void rspamd_fuzzy_backend_check_sqlite (struct rspamd_fuzzy_backend *bk,
34 const struct rspamd_fuzzy_cmd *cmd,
35 rspamd_fuzzy_check_cb cb, void *ud,
36 void *subr_ud);
37 static void rspamd_fuzzy_backend_update_sqlite (struct rspamd_fuzzy_backend *bk,
38 GArray *updates, const gchar *src,
39 rspamd_fuzzy_update_cb cb, void *ud,
40 void *subr_ud);
41 static void rspamd_fuzzy_backend_count_sqlite (struct rspamd_fuzzy_backend *bk,
42 rspamd_fuzzy_count_cb cb, void *ud,
43 void *subr_ud);
44 static void rspamd_fuzzy_backend_version_sqlite (struct rspamd_fuzzy_backend *bk,
45 const gchar *src,
46 rspamd_fuzzy_version_cb cb, void *ud,
47 void *subr_ud);
48 static const gchar* rspamd_fuzzy_backend_id_sqlite (struct rspamd_fuzzy_backend *bk,
49 void *subr_ud);
50 static void rspamd_fuzzy_backend_expire_sqlite (struct rspamd_fuzzy_backend *bk,
51 void *subr_ud);
52 static void rspamd_fuzzy_backend_close_sqlite (struct rspamd_fuzzy_backend *bk,
53 void *subr_ud);
54
55 struct rspamd_fuzzy_backend_subr {
56 void* (*init) (struct rspamd_fuzzy_backend *bk, const ucl_object_t *obj,
57 struct rspamd_config *cfg,
58 GError **err);
59 void (*check) (struct rspamd_fuzzy_backend *bk,
60 const struct rspamd_fuzzy_cmd *cmd,
61 rspamd_fuzzy_check_cb cb, void *ud,
62 void *subr_ud);
63 void (*update) (struct rspamd_fuzzy_backend *bk,
64 GArray *updates, const gchar *src,
65 rspamd_fuzzy_update_cb cb, void *ud,
66 void *subr_ud);
67 void (*count) (struct rspamd_fuzzy_backend *bk,
68 rspamd_fuzzy_count_cb cb, void *ud,
69 void *subr_ud);
70 void (*version) (struct rspamd_fuzzy_backend *bk,
71 const gchar *src,
72 rspamd_fuzzy_version_cb cb, void *ud,
73 void *subr_ud);
74 const gchar* (*id) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
75 void (*periodic) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
76 void (*close) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
77 };
78
79 static const struct rspamd_fuzzy_backend_subr fuzzy_subrs[] = {
80 [RSPAMD_FUZZY_BACKEND_SQLITE] = {
81 .init = rspamd_fuzzy_backend_init_sqlite,
82 .check = rspamd_fuzzy_backend_check_sqlite,
83 .update = rspamd_fuzzy_backend_update_sqlite,
84 .count = rspamd_fuzzy_backend_count_sqlite,
85 .version = rspamd_fuzzy_backend_version_sqlite,
86 .id = rspamd_fuzzy_backend_id_sqlite,
87 .periodic = rspamd_fuzzy_backend_expire_sqlite,
88 .close = rspamd_fuzzy_backend_close_sqlite,
89 },
90 #ifdef WITH_HIREDIS
91 [RSPAMD_FUZZY_BACKEND_REDIS] = {
92 .init = rspamd_fuzzy_backend_init_redis,
93 .check = rspamd_fuzzy_backend_check_redis,
94 .update = rspamd_fuzzy_backend_update_redis,
95 .count = rspamd_fuzzy_backend_count_redis,
96 .version = rspamd_fuzzy_backend_version_redis,
97 .id = rspamd_fuzzy_backend_id_redis,
98 .periodic = rspamd_fuzzy_backend_expire_redis,
99 .close = rspamd_fuzzy_backend_close_redis,
100 }
101 #endif
102 };
103
104 struct rspamd_fuzzy_backend {
105 enum rspamd_fuzzy_backend_type type;
106 gdouble expire;
107 gdouble sync;
108 struct ev_loop *event_loop;
109 rspamd_fuzzy_periodic_cb periodic_cb;
110 void *periodic_ud;
111 const struct rspamd_fuzzy_backend_subr *subr;
112 void *subr_ud;
113 ev_timer periodic_event;
114 };
115
116 static GQuark
rspamd_fuzzy_backend_quark(void)117 rspamd_fuzzy_backend_quark (void)
118 {
119 return g_quark_from_static_string ("fuzzy-backend");
120 }
121
122 static void*
rspamd_fuzzy_backend_init_sqlite(struct rspamd_fuzzy_backend * bk,const ucl_object_t * obj,struct rspamd_config * cfg,GError ** err)123 rspamd_fuzzy_backend_init_sqlite (struct rspamd_fuzzy_backend *bk,
124 const ucl_object_t *obj, struct rspamd_config *cfg, GError **err)
125 {
126 const ucl_object_t *elt;
127
128 elt = ucl_object_lookup_any (obj, "hashfile", "hash_file", "file",
129 "database", NULL);
130
131 if (elt == NULL || ucl_object_type (elt) != UCL_STRING) {
132 g_set_error (err, rspamd_fuzzy_backend_quark (),
133 EINVAL, "missing sqlite3 path");
134 return NULL;
135 }
136
137 return rspamd_fuzzy_backend_sqlite_open (ucl_object_tostring (elt),
138 FALSE, err);
139 }
140
141 static void
rspamd_fuzzy_backend_check_sqlite(struct rspamd_fuzzy_backend * bk,const struct rspamd_fuzzy_cmd * cmd,rspamd_fuzzy_check_cb cb,void * ud,void * subr_ud)142 rspamd_fuzzy_backend_check_sqlite (struct rspamd_fuzzy_backend *bk,
143 const struct rspamd_fuzzy_cmd *cmd,
144 rspamd_fuzzy_check_cb cb, void *ud,
145 void *subr_ud)
146 {
147 struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
148 struct rspamd_fuzzy_reply rep;
149
150 rep = rspamd_fuzzy_backend_sqlite_check (sq, cmd, bk->expire);
151
152 if (cb) {
153 cb (&rep, ud);
154 }
155 }
156
157 static void
rspamd_fuzzy_backend_update_sqlite(struct rspamd_fuzzy_backend * bk,GArray * updates,const gchar * src,rspamd_fuzzy_update_cb cb,void * ud,void * subr_ud)158 rspamd_fuzzy_backend_update_sqlite (struct rspamd_fuzzy_backend *bk,
159 GArray *updates, const gchar *src,
160 rspamd_fuzzy_update_cb cb, void *ud,
161 void *subr_ud)
162 {
163 struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
164 gboolean success = FALSE;
165 guint i;
166 struct fuzzy_peer_cmd *io_cmd;
167 struct rspamd_fuzzy_cmd *cmd;
168 gpointer ptr;
169 guint nupdates = 0, nadded = 0, ndeleted = 0, nextended = 0, nignored = 0;
170
171 if (rspamd_fuzzy_backend_sqlite_prepare_update (sq, src)) {
172 for (i = 0; i < updates->len; i ++) {
173 io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i);
174
175 if (io_cmd->is_shingle) {
176 cmd = &io_cmd->cmd.shingle.basic;
177 ptr = &io_cmd->cmd.shingle;
178 }
179 else {
180 cmd = &io_cmd->cmd.normal;
181 ptr = &io_cmd->cmd.normal;
182 }
183
184 if (cmd->cmd == FUZZY_WRITE) {
185 rspamd_fuzzy_backend_sqlite_add (sq, ptr);
186 nadded ++;
187 nupdates ++;
188 }
189 else if (cmd->cmd == FUZZY_DEL) {
190 rspamd_fuzzy_backend_sqlite_del (sq, ptr);
191 ndeleted ++;
192 nupdates ++;
193 }
194 else {
195 if (cmd->cmd == FUZZY_REFRESH) {
196 nextended ++;
197 }
198 else {
199 nignored ++;
200 }
201 }
202 }
203
204 if (rspamd_fuzzy_backend_sqlite_finish_update (sq, src,
205 nupdates > 0)) {
206 success = TRUE;
207 }
208 }
209
210 if (cb) {
211 cb (success, nadded, ndeleted, nextended, nignored, ud);
212 }
213 }
214
215 static void
rspamd_fuzzy_backend_count_sqlite(struct rspamd_fuzzy_backend * bk,rspamd_fuzzy_count_cb cb,void * ud,void * subr_ud)216 rspamd_fuzzy_backend_count_sqlite (struct rspamd_fuzzy_backend *bk,
217 rspamd_fuzzy_count_cb cb, void *ud,
218 void *subr_ud)
219 {
220 struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
221 guint64 nhashes;
222
223 nhashes = rspamd_fuzzy_backend_sqlite_count (sq);
224
225 if (cb) {
226 cb (nhashes, ud);
227 }
228 }
229
230 static void
rspamd_fuzzy_backend_version_sqlite(struct rspamd_fuzzy_backend * bk,const gchar * src,rspamd_fuzzy_version_cb cb,void * ud,void * subr_ud)231 rspamd_fuzzy_backend_version_sqlite (struct rspamd_fuzzy_backend *bk,
232 const gchar *src,
233 rspamd_fuzzy_version_cb cb, void *ud,
234 void *subr_ud)
235 {
236 struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
237 guint64 rev;
238
239 rev = rspamd_fuzzy_backend_sqlite_version (sq, src);
240
241 if (cb) {
242 cb (rev, ud);
243 }
244 }
245
246 static const gchar*
rspamd_fuzzy_backend_id_sqlite(struct rspamd_fuzzy_backend * bk,void * subr_ud)247 rspamd_fuzzy_backend_id_sqlite (struct rspamd_fuzzy_backend *bk,
248 void *subr_ud)
249 {
250 struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
251
252 return rspamd_fuzzy_sqlite_backend_id (sq);
253 }
254 static void
rspamd_fuzzy_backend_expire_sqlite(struct rspamd_fuzzy_backend * bk,void * subr_ud)255 rspamd_fuzzy_backend_expire_sqlite (struct rspamd_fuzzy_backend *bk,
256 void *subr_ud)
257 {
258 struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
259
260 rspamd_fuzzy_backend_sqlite_sync (sq, bk->expire, TRUE);
261 }
262
/**
 * sqlite implementation of the close operation.
 *
 * @param bk generic backend object (not used here)
 * @param subr_ud sqlite engine state to be closed
 */
static void
rspamd_fuzzy_backend_close_sqlite (struct rspamd_fuzzy_backend *bk,
		void *subr_ud)
{
	rspamd_fuzzy_backend_sqlite_close (
			(struct rspamd_fuzzy_backend_sqlite *)subr_ud);
}
271
272
273 struct rspamd_fuzzy_backend *
rspamd_fuzzy_backend_create(struct ev_loop * ev_base,const ucl_object_t * config,struct rspamd_config * cfg,GError ** err)274 rspamd_fuzzy_backend_create (struct ev_loop *ev_base,
275 const ucl_object_t *config,
276 struct rspamd_config *cfg,
277 GError **err)
278 {
279 struct rspamd_fuzzy_backend *bk;
280 enum rspamd_fuzzy_backend_type type = RSPAMD_FUZZY_BACKEND_SQLITE;
281 const ucl_object_t *elt;
282 gdouble expire = DEFAULT_EXPIRE;
283
284 if (config != NULL) {
285 elt = ucl_object_lookup (config, "backend");
286
287 if (elt != NULL && ucl_object_type (elt) == UCL_STRING) {
288 if (strcmp (ucl_object_tostring (elt), "sqlite") == 0) {
289 type = RSPAMD_FUZZY_BACKEND_SQLITE;
290 }
291 else if (strcmp (ucl_object_tostring (elt), "redis") == 0) {
292 type = RSPAMD_FUZZY_BACKEND_REDIS;
293 }
294 else {
295 g_set_error (err, rspamd_fuzzy_backend_quark (),
296 EINVAL, "invalid backend type: %s",
297 ucl_object_tostring (elt));
298 return NULL;
299 }
300 }
301
302 elt = ucl_object_lookup (config, "expire");
303
304 if (elt != NULL) {
305 expire = ucl_object_todouble (elt);
306 }
307 }
308
309 bk = g_malloc0 (sizeof (*bk));
310 bk->event_loop = ev_base;
311 bk->expire = expire;
312 bk->type = type;
313 bk->subr = &fuzzy_subrs[type];
314
315 if ((bk->subr_ud = bk->subr->init (bk, config, cfg, err)) == NULL) {
316 g_free (bk);
317
318 return NULL;
319 }
320
321 return bk;
322 }
323
324
325 void
rspamd_fuzzy_backend_check(struct rspamd_fuzzy_backend * bk,const struct rspamd_fuzzy_cmd * cmd,rspamd_fuzzy_check_cb cb,void * ud)326 rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *bk,
327 const struct rspamd_fuzzy_cmd *cmd,
328 rspamd_fuzzy_check_cb cb, void *ud)
329 {
330 g_assert (bk != NULL);
331
332 bk->subr->check (bk, cmd, cb, ud, bk->subr_ud);
333 }
334
335 static guint
rspamd_fuzzy_digest_hash(gconstpointer key)336 rspamd_fuzzy_digest_hash (gconstpointer key)
337 {
338 guint ret;
339
340 /* Distirbuted uniformly already */
341 memcpy (&ret, key, sizeof (ret));
342
343 return ret;
344 }
345
346 static gboolean
rspamd_fuzzy_digest_equal(gconstpointer v,gconstpointer v2)347 rspamd_fuzzy_digest_equal (gconstpointer v, gconstpointer v2)
348 {
349 return memcmp (v, v2, rspamd_cryptobox_HASHBYTES) == 0;
350 }
351
352 static void
rspamd_fuzzy_backend_deduplicate_queue(GArray * updates)353 rspamd_fuzzy_backend_deduplicate_queue (GArray *updates)
354 {
355 GHashTable *seen = g_hash_table_new (rspamd_fuzzy_digest_hash,
356 rspamd_fuzzy_digest_equal);
357 struct fuzzy_peer_cmd *io_cmd, *found;
358 struct rspamd_fuzzy_cmd *cmd;
359 guchar *digest;
360 guint i;
361
362 for (i = 0; i < updates->len; i ++) {
363 io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i);
364
365 if (io_cmd->is_shingle) {
366 cmd = &io_cmd->cmd.shingle.basic;
367 }
368 else {
369 cmd = &io_cmd->cmd.normal;
370 }
371
372 digest = cmd->digest;
373
374 found = g_hash_table_lookup (seen, digest);
375
376 if (found == NULL) {
377 /* Add to the seen list, if not a duplicate (huh?) */
378 if (cmd->cmd != FUZZY_DUP) {
379 g_hash_table_insert (seen, digest, io_cmd);
380 }
381 }
382 else {
383 if (found->cmd.normal.flag != cmd->flag) {
384 /* TODO: deal with flags better at some point */
385 continue;
386 }
387
388 /* Apply heuristic */
389 switch (cmd->cmd) {
390 case FUZZY_WRITE:
391 if (found->cmd.normal.cmd == FUZZY_WRITE) {
392 /* Already seen */
393 found->cmd.normal.value += cmd->value;
394 cmd->cmd = FUZZY_DUP; /* Ignore this one */
395 }
396 else if (found->cmd.normal.cmd == FUZZY_REFRESH) {
397 /* Seen refresh command, remove it as write has higher priority */
398 g_hash_table_replace (seen, digest, io_cmd);
399 found->cmd.normal.cmd = FUZZY_DUP;
400 }
401 else if (found->cmd.normal.cmd == FUZZY_DEL) {
402 /* Request delete + add, weird, but ignore add */
403 cmd->cmd = FUZZY_DUP; /* Ignore this one */
404 }
405 break;
406 case FUZZY_REFRESH:
407 if (found->cmd.normal.cmd == FUZZY_WRITE) {
408 /* No need to expire, handled by addition */
409 cmd->cmd = FUZZY_DUP; /* Ignore this one */
410 }
411 else if (found->cmd.normal.cmd == FUZZY_DEL) {
412 /* Request delete + expire, ignore expire */
413 cmd->cmd = FUZZY_DUP; /* Ignore this one */
414 }
415 else if (found->cmd.normal.cmd == FUZZY_REFRESH) {
416 /* Already handled */
417 cmd->cmd = FUZZY_DUP; /* Ignore this one */
418 }
419 break;
420 case FUZZY_DEL:
421 /* Delete has priority over all other commands */
422 g_hash_table_replace (seen, digest, io_cmd);
423 found->cmd.normal.cmd = FUZZY_DUP;
424 break;
425 default:
426 break;
427 }
428 }
429 }
430
431 g_hash_table_unref (seen);
432 }
433
434 void
rspamd_fuzzy_backend_process_updates(struct rspamd_fuzzy_backend * bk,GArray * updates,const gchar * src,rspamd_fuzzy_update_cb cb,void * ud)435 rspamd_fuzzy_backend_process_updates (struct rspamd_fuzzy_backend *bk,
436 GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb,
437 void *ud)
438 {
439 g_assert (bk != NULL);
440 g_assert (updates != NULL);
441
442 if (updates) {
443 rspamd_fuzzy_backend_deduplicate_queue (updates);
444 bk->subr->update (bk, updates, src, cb, ud, bk->subr_ud);
445 }
446 else if (cb) {
447 cb (TRUE, 0, 0, 0, 0, ud);
448 }
449 }
450
451
452 void
rspamd_fuzzy_backend_count(struct rspamd_fuzzy_backend * bk,rspamd_fuzzy_count_cb cb,void * ud)453 rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *bk,
454 rspamd_fuzzy_count_cb cb, void *ud)
455 {
456 g_assert (bk != NULL);
457
458 bk->subr->count (bk, cb, ud, bk->subr_ud);
459 }
460
461
462 void
rspamd_fuzzy_backend_version(struct rspamd_fuzzy_backend * bk,const gchar * src,rspamd_fuzzy_version_cb cb,void * ud)463 rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *bk,
464 const gchar *src,
465 rspamd_fuzzy_version_cb cb, void *ud)
466 {
467 g_assert (bk != NULL);
468
469 bk->subr->version (bk, src, cb, ud, bk->subr_ud);
470 }
471
472 const gchar *
rspamd_fuzzy_backend_id(struct rspamd_fuzzy_backend * bk)473 rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *bk)
474 {
475 g_assert (bk != NULL);
476
477 if (bk->subr->id) {
478 return bk->subr->id (bk, bk->subr_ud);
479 }
480
481 return NULL;
482 }
483
484 static inline void
rspamd_fuzzy_backend_periodic_sync(struct rspamd_fuzzy_backend * bk)485 rspamd_fuzzy_backend_periodic_sync (struct rspamd_fuzzy_backend *bk)
486 {
487 if (bk->periodic_cb) {
488 if (bk->periodic_cb (bk->periodic_ud)) {
489 if (bk->subr->periodic) {
490 bk->subr->periodic (bk, bk->subr_ud);
491 }
492 }
493 }
494 else {
495 if (bk->subr->periodic) {
496 bk->subr->periodic (bk, bk->subr_ud);
497 }
498 }
499 }
500
501 static void
rspamd_fuzzy_backend_periodic_cb(EV_P_ ev_timer * w,int revents)502 rspamd_fuzzy_backend_periodic_cb (EV_P_ ev_timer *w, int revents)
503 {
504 struct rspamd_fuzzy_backend *bk = (struct rspamd_fuzzy_backend *)w->data;
505 gdouble jittered;
506
507 jittered = rspamd_time_jitter (bk->sync, bk->sync / 2.0);
508 w->repeat = jittered;
509 rspamd_fuzzy_backend_periodic_sync (bk);
510 ev_timer_again (EV_A_ w);
511 }
512
513 void
rspamd_fuzzy_backend_start_update(struct rspamd_fuzzy_backend * bk,gdouble timeout,rspamd_fuzzy_periodic_cb cb,void * ud)514 rspamd_fuzzy_backend_start_update (struct rspamd_fuzzy_backend *bk,
515 gdouble timeout,
516 rspamd_fuzzy_periodic_cb cb,
517 void *ud)
518 {
519 gdouble jittered;
520
521 g_assert (bk != NULL);
522
523 if (bk->subr->periodic) {
524 if (bk->sync > 0.0) {
525 ev_timer_stop (bk->event_loop, &bk->periodic_event);
526 }
527
528 if (cb) {
529 bk->periodic_cb = cb;
530 bk->periodic_ud = ud;
531 }
532
533 rspamd_fuzzy_backend_periodic_sync (bk);
534 bk->sync = timeout;
535 jittered = rspamd_time_jitter (timeout, timeout / 2.0);
536
537 bk->periodic_event.data = bk;
538 ev_timer_init (&bk->periodic_event, rspamd_fuzzy_backend_periodic_cb,
539 jittered, 0.0);
540 ev_timer_start (bk->event_loop, &bk->periodic_event);
541 }
542 }
543
544 void
rspamd_fuzzy_backend_close(struct rspamd_fuzzy_backend * bk)545 rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *bk)
546 {
547 g_assert (bk != NULL);
548
549 if (bk->sync > 0.0) {
550 rspamd_fuzzy_backend_periodic_sync (bk);
551 ev_timer_stop (bk->event_loop, &bk->periodic_event);
552 }
553
554 bk->subr->close (bk, bk->subr_ud);
555
556 g_free (bk);
557 }
558
559 struct ev_loop*
rspamd_fuzzy_backend_event_base(struct rspamd_fuzzy_backend * backend)560 rspamd_fuzzy_backend_event_base (struct rspamd_fuzzy_backend *backend)
561 {
562 return backend->event_loop;
563 }
564
565 gdouble
rspamd_fuzzy_backend_get_expire(struct rspamd_fuzzy_backend * backend)566 rspamd_fuzzy_backend_get_expire (struct rspamd_fuzzy_backend *backend)
567 {
568 return backend->expire;
569 }
570