1 /*-
2 * Copyright 2016 Vsevolod Stakhov
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "config.h"
18 #include "ref.h"
19 #include "fuzzy_backend.h"
20 #include "fuzzy_backend_redis.h"
21 #include "redis_pool.h"
22 #include "cryptobox.h"
23 #include "str_util.h"
24 #include "upstream.h"
25 #include "contrib/hiredis/hiredis.h"
26 #include "contrib/hiredis/async.h"
27 #include "lua/lua_common.h"
28
/* Default Redis port number; not referenced in the part of the file reviewed here */
#define REDIS_DEFAULT_PORT 6379
/* Key prefix used when the backend config carries no string "prefix" option */
#define REDIS_DEFAULT_OBJECT "fuzzy"
/* Initial per-request timeout; replaced by a numeric "timeout" from the Redis Lua config */
#define REDIS_DEFAULT_TIMEOUT 2.0
32
/*
 * Logging helpers for Redis fuzzy sessions. Every macro expands to code that
 * reads a variable named `session` from the calling scope, so they can only
 * be used where such a variable exists. Messages are tagged with the
 * "fuzzy_redis" module name, the id of the owning backend and the name of
 * the calling function (G_STRFUNC).
 */
#define msg_err_redis_session(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
		"fuzzy_redis", session->backend->id, \
		G_STRFUNC, \
		__VA_ARGS__)
#define msg_warn_redis_session(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
		"fuzzy_redis", session->backend->id, \
		G_STRFUNC, \
		__VA_ARGS__)
#define msg_info_redis_session(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
		"fuzzy_redis", session->backend->id, \
		G_STRFUNC, \
		__VA_ARGS__)
/* Debug variant: goes through the conditional debug logger keyed by rspamd_fuzzy_redis_log_id */
#define msg_debug_redis_session(...) rspamd_conditional_debug_fast (NULL, NULL, \
		rspamd_fuzzy_redis_log_id, "fuzzy_redis", session->backend->id, \
		G_STRFUNC, \
		__VA_ARGS__)

/* NOTE(review): presumably this is what declares rspamd_fuzzy_redis_log_id - confirm in the logger header */
INIT_LOG_MODULE(fuzzy_redis)
51
/*
 * State of one Redis-backed fuzzy storage. Reference counted: every session
 * retains the backend and releases it when the session is destroyed.
 */
struct rspamd_fuzzy_backend_redis {
	lua_State *L;               /* Lua state taken from cfg->lua_state at init */
	const gchar *redis_object;  /* key prefix: "prefix" config option or REDIS_DEFAULT_OBJECT */
	const gchar *password;      /* "password" from the Lua redis config (copied into cfg_pool), may be NULL */
	const gchar *dbname;        /* "db" from the Lua redis config (copied into cfg_pool), may be NULL */
	gchar *id;                  /* base32 of the hash over prefix, db and password; freed in the dtor */
	struct rspamd_redis_pool *pool; /* connection pool, taken from cfg->redis_pool */
	gdouble timeout;            /* delay passed to ev_timer_init for every request */
	gint conf_ref;              /* Lua registry reference to the redis config table */
	bool terminated;            /* when set, the dtor does not unref conf_ref; set outside the code reviewed here */
	ref_entry_t ref;            /* refcount bookkeeping used by the REF_* macros */
};
64
/* Kind of operation a session performs; stored in rspamd_fuzzy_redis_session.command */
enum rspamd_fuzzy_redis_command {
	RSPAMD_FUZZY_REDIS_COMMAND_COUNT,   /* set by rspamd_fuzzy_backend_count_redis */
	RSPAMD_FUZZY_REDIS_COMMAND_VERSION, /* set by rspamd_fuzzy_backend_version_redis */
	RSPAMD_FUZZY_REDIS_COMMAND_UPDATES, /* not assigned in the code reviewed here */
	RSPAMD_FUZZY_REDIS_COMMAND_CHECK    /* set by rspamd_fuzzy_backend_check_redis */
};
71
/*
 * One in-flight request to Redis. Allocated by the public entry points and
 * destroyed by rspamd_fuzzy_redis_session_dtor once the final reply (or a
 * failure) has been reported to the user callback.
 */
struct rspamd_fuzzy_redis_session {
	struct rspamd_fuzzy_backend_redis *backend; /* owning backend, retained for the session lifetime */
	redisAsyncContext *ctx;        /* pooled connection; NULL once it has been released */
	ev_timer timeout;              /* request timer, its data pointer refers back to the session */
	const struct rspamd_fuzzy_cmd *cmd; /* command being checked; stored as passed by the caller */
	struct ev_loop *event_loop;    /* loop obtained from rspamd_fuzzy_backend_event_base */
	float prob;                    /* 1.0 for a direct digest lookup, matched fraction for shingles */
	gboolean shingles_checked;     /* set once the shingles MGET has been issued */

	enum rspamd_fuzzy_redis_command command; /* what this session is doing */
	guint nargs;                   /* number of entries in argv / argv_lens */

	/* Counters; not touched in the code reviewed here */
	guint nadded;
	guint ndeleted;
	guint nextended;
	guint nignored;

	/* User callback; the active member is selected by the entry point that created the session */
	union {
		rspamd_fuzzy_check_cb cb_check;
		rspamd_fuzzy_update_cb cb_update;
		rspamd_fuzzy_version_cb cb_version;
		rspamd_fuzzy_count_cb cb_count;
	} callback;
	void *cbdata;                  /* opaque pointer handed back to the callback */

	gchar **argv;                  /* heap-allocated command arguments, each element freed with g_free */
	gsize *argv_lens;              /* byte length of every argv element */
	struct upstream *up;           /* upstream selected for this request, referenced */
	guchar found_digest[rspamd_cryptobox_HASHBYTES]; /* digest copied into the reply on a hit */
};
102
103 static inline struct upstream_list *
rspamd_redis_get_servers(struct rspamd_fuzzy_backend_redis * ctx,const gchar * what)104 rspamd_redis_get_servers (struct rspamd_fuzzy_backend_redis *ctx,
105 const gchar *what)
106 {
107 lua_State *L = ctx->L;
108 struct upstream_list *res = NULL;
109
110 lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->conf_ref);
111 lua_pushstring (L, what);
112 lua_gettable (L, -2);
113
114 if (lua_type (L, -1) == LUA_TUSERDATA) {
115 res = *((struct upstream_list **) lua_touserdata (L, -1));
116 }
117 else {
118 struct lua_logger_trace tr;
119 gchar outbuf[8192];
120
121 memset (&tr, 0, sizeof (tr));
122 lua_logger_out_type (L, -2, outbuf, sizeof (outbuf) - 1, &tr,
123 LUA_ESCAPE_UNPRINTABLE);
124
125 msg_err ("cannot get %s upstreams for Redis fuzzy storage %s; table content: %s",
126 what, ctx->id, outbuf);
127 }
128
129 lua_settop (L, 0);
130
131 return res;
132 }
133
134 static inline void
rspamd_fuzzy_redis_session_free_args(struct rspamd_fuzzy_redis_session * session)135 rspamd_fuzzy_redis_session_free_args (struct rspamd_fuzzy_redis_session *session)
136 {
137 guint i;
138
139 if (session->argv) {
140 for (i = 0; i < session->nargs; i ++) {
141 g_free (session->argv[i]);
142 }
143
144 g_free (session->argv);
145 g_free (session->argv_lens);
146 }
147 }
148 static void
rspamd_fuzzy_redis_session_dtor(struct rspamd_fuzzy_redis_session * session,gboolean is_fatal)149 rspamd_fuzzy_redis_session_dtor (struct rspamd_fuzzy_redis_session *session,
150 gboolean is_fatal)
151 {
152 redisAsyncContext *ac;
153
154
155 if (session->ctx) {
156 ac = session->ctx;
157 session->ctx = NULL;
158 rspamd_redis_pool_release_connection (session->backend->pool,
159 ac,
160 is_fatal ? RSPAMD_REDIS_RELEASE_FATAL : RSPAMD_REDIS_RELEASE_DEFAULT);
161 }
162
163 ev_timer_stop (session->event_loop, &session->timeout);
164 rspamd_fuzzy_redis_session_free_args (session);
165
166 REF_RELEASE (session->backend);
167 rspamd_upstream_unref (session->up);
168 g_free (session);
169 }
170
171 static void
rspamd_fuzzy_backend_redis_dtor(struct rspamd_fuzzy_backend_redis * backend)172 rspamd_fuzzy_backend_redis_dtor (struct rspamd_fuzzy_backend_redis *backend)
173 {
174 if (!backend->terminated && backend->conf_ref != -1) {
175 luaL_unref (backend->L, LUA_REGISTRYINDEX, backend->conf_ref);
176 }
177
178 if (backend->id) {
179 g_free (backend->id);
180 }
181
182 g_free (backend);
183 }
184
185 void*
rspamd_fuzzy_backend_init_redis(struct rspamd_fuzzy_backend * bk,const ucl_object_t * obj,struct rspamd_config * cfg,GError ** err)186 rspamd_fuzzy_backend_init_redis (struct rspamd_fuzzy_backend *bk,
187 const ucl_object_t *obj, struct rspamd_config *cfg, GError **err)
188 {
189 struct rspamd_fuzzy_backend_redis *backend;
190 const ucl_object_t *elt;
191 gboolean ret = FALSE;
192 guchar id_hash[rspamd_cryptobox_HASHBYTES];
193 rspamd_cryptobox_hash_state_t st;
194 lua_State *L = (lua_State *)cfg->lua_state;
195 gint conf_ref = -1;
196
197 backend = g_malloc0 (sizeof (*backend));
198
199 backend->timeout = REDIS_DEFAULT_TIMEOUT;
200 backend->redis_object = REDIS_DEFAULT_OBJECT;
201 backend->L = L;
202
203 ret = rspamd_lua_try_load_redis (L, obj, cfg, &conf_ref);
204
205 /* Now try global redis settings */
206 if (!ret) {
207 elt = ucl_object_lookup (cfg->rcl_obj, "redis");
208
209 if (elt) {
210 const ucl_object_t *specific_obj;
211
212 specific_obj = ucl_object_lookup_any (elt, "fuzzy", "fuzzy_storage",
213 NULL);
214
215 if (specific_obj) {
216 ret = rspamd_lua_try_load_redis (L, specific_obj, cfg, &conf_ref);
217 }
218 else {
219 ret = rspamd_lua_try_load_redis (L, elt, cfg, &conf_ref);
220 }
221 }
222 }
223
224 if (!ret) {
225 msg_err_config ("cannot init redis backend for fuzzy storage");
226 g_free (backend);
227
228 return NULL;
229 }
230
231 elt = ucl_object_lookup (obj, "prefix");
232 if (elt == NULL || ucl_object_type (elt) != UCL_STRING) {
233 backend->redis_object = REDIS_DEFAULT_OBJECT;
234 }
235 else {
236 backend->redis_object = ucl_object_tostring (elt);
237 }
238
239 backend->conf_ref = conf_ref;
240
241 /* Check some common table values */
242 lua_rawgeti (L, LUA_REGISTRYINDEX, conf_ref);
243
244 lua_pushstring (L, "timeout");
245 lua_gettable (L, -2);
246 if (lua_type (L, -1) == LUA_TNUMBER) {
247 backend->timeout = lua_tonumber (L, -1);
248 }
249 lua_pop (L, 1);
250
251 lua_pushstring (L, "db");
252 lua_gettable (L, -2);
253 if (lua_type (L, -1) == LUA_TSTRING) {
254 backend->dbname = rspamd_mempool_strdup (cfg->cfg_pool,
255 lua_tostring (L, -1));
256 }
257 lua_pop (L, 1);
258
259 lua_pushstring (L, "password");
260 lua_gettable (L, -2);
261 if (lua_type (L, -1) == LUA_TSTRING) {
262 backend->password = rspamd_mempool_strdup (cfg->cfg_pool,
263 lua_tostring (L, -1));
264 }
265 lua_pop (L, 1);
266
267 lua_settop (L, 0);
268
269 REF_INIT_RETAIN (backend, rspamd_fuzzy_backend_redis_dtor);
270 backend->pool = cfg->redis_pool;
271 rspamd_cryptobox_hash_init (&st, NULL, 0);
272 rspamd_cryptobox_hash_update (&st, backend->redis_object,
273 strlen (backend->redis_object));
274
275 if (backend->dbname) {
276 rspamd_cryptobox_hash_update (&st, backend->dbname,
277 strlen (backend->dbname));
278 }
279
280 if (backend->password) {
281 rspamd_cryptobox_hash_update (&st, backend->password,
282 strlen (backend->password));
283 }
284
285 rspamd_cryptobox_hash_final (&st, id_hash);
286 backend->id = rspamd_encode_base32 (id_hash, sizeof (id_hash), RSPAMD_BASE32_DEFAULT);
287
288 return backend;
289 }
290
291 static void
rspamd_fuzzy_redis_timeout(EV_P_ ev_timer * w,int revents)292 rspamd_fuzzy_redis_timeout (EV_P_ ev_timer *w, int revents)
293 {
294 struct rspamd_fuzzy_redis_session *session =
295 (struct rspamd_fuzzy_redis_session *)w->data;
296 redisAsyncContext *ac;
297 static char errstr[128];
298
299 if (session->ctx) {
300 ac = session->ctx;
301 session->ctx = NULL;
302 ac->err = REDIS_ERR_IO;
303 /* Should be safe as in hiredis it is char[128] */
304 rspamd_snprintf (errstr, sizeof (errstr), "%s", strerror (ETIMEDOUT));
305 ac->errstr = errstr;
306
307 /* This will cause session closing */
308 rspamd_redis_pool_release_connection (session->backend->pool,
309 ac, RSPAMD_REDIS_RELEASE_FATAL);
310 }
311 }
312
/*
 * Forward declaration: the shingles callback defined before the check
 * callback passes it to redisAsyncCommandArgv for its follow-up HMGET.
 */
static void rspamd_fuzzy_redis_check_callback (redisAsyncContext *c, gpointer r,
		gpointer priv);
315
/* One slot of the shingles MGET reply, used to find the most frequent digest */
struct _rspamd_fuzzy_shingles_helper {
	guchar digest[64]; /* digest stored under the shingle key, zeroed when the key is missing */
	guint found;       /* 1 when Redis returned a string for this shingle, 0 otherwise */
};
320
321 static gint
rspamd_fuzzy_backend_redis_shingles_cmp(const void * a,const void * b)322 rspamd_fuzzy_backend_redis_shingles_cmp (const void *a, const void *b)
323 {
324 const struct _rspamd_fuzzy_shingles_helper *sha = a,
325 *shb = b;
326
327 return memcmp (sha->digest, shb->digest, sizeof (sha->digest));
328 }
329
330 static void
rspamd_fuzzy_redis_shingles_callback(redisAsyncContext * c,gpointer r,gpointer priv)331 rspamd_fuzzy_redis_shingles_callback (redisAsyncContext *c, gpointer r,
332 gpointer priv)
333 {
334 struct rspamd_fuzzy_redis_session *session = priv;
335 redisReply *reply = r, *cur;
336 struct rspamd_fuzzy_reply rep;
337 GString *key;
338 struct _rspamd_fuzzy_shingles_helper *shingles, *prev = NULL, *sel = NULL;
339 guint i, found = 0, max_found = 0, cur_found = 0;
340
341 ev_timer_stop (session->event_loop, &session->timeout);
342 memset (&rep, 0, sizeof (rep));
343
344 if (c->err == 0 && reply != NULL) {
345 rspamd_upstream_ok (session->up);
346
347 if (reply->type == REDIS_REPLY_ARRAY &&
348 reply->elements == RSPAMD_SHINGLE_SIZE) {
349 shingles = g_alloca (sizeof (struct _rspamd_fuzzy_shingles_helper) *
350 RSPAMD_SHINGLE_SIZE);
351
352 for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
353 cur = reply->element[i];
354
355 if (cur->type == REDIS_REPLY_STRING) {
356 shingles[i].found = 1;
357 memcpy (shingles[i].digest, cur->str, MIN (64, cur->len));
358 found ++;
359 }
360 else {
361 memset (shingles[i].digest, 0, sizeof (shingles[i].digest));
362 shingles[i].found = 0;
363 }
364 }
365
366 if (found > RSPAMD_SHINGLE_SIZE / 2) {
367 /* Now sort to find the most frequent element */
368 qsort (shingles, RSPAMD_SHINGLE_SIZE,
369 sizeof (struct _rspamd_fuzzy_shingles_helper),
370 rspamd_fuzzy_backend_redis_shingles_cmp);
371
372 prev = &shingles[0];
373
374 for (i = 1; i < RSPAMD_SHINGLE_SIZE; i ++) {
375 if (!shingles[i].found) {
376 continue;
377 }
378
379 if (memcmp (shingles[i].digest, prev->digest, 64) == 0) {
380 cur_found ++;
381
382 if (cur_found > max_found) {
383 max_found = cur_found;
384 sel = &shingles[i];
385 }
386 }
387 else {
388 cur_found = 1;
389 prev = &shingles[i];
390 }
391 }
392
393 if (max_found > RSPAMD_SHINGLE_SIZE / 2) {
394 session->prob = ((float)max_found) / RSPAMD_SHINGLE_SIZE;
395 rep.v1.prob = session->prob;
396
397 g_assert (sel != NULL);
398
399 /* Prepare new check command */
400 rspamd_fuzzy_redis_session_free_args (session);
401 session->nargs = 5;
402 session->argv = g_malloc (sizeof (gchar *) * session->nargs);
403 session->argv_lens = g_malloc (sizeof (gsize) * session->nargs);
404
405 key = g_string_new (session->backend->redis_object);
406 g_string_append_len (key, sel->digest, sizeof (sel->digest));
407 session->argv[0] = g_strdup ("HMGET");
408 session->argv_lens[0] = 5;
409 session->argv[1] = key->str;
410 session->argv_lens[1] = key->len;
411 session->argv[2] = g_strdup ("V");
412 session->argv_lens[2] = 1;
413 session->argv[3] = g_strdup ("F");
414 session->argv_lens[3] = 1;
415 session->argv[4] = g_strdup ("C");
416 session->argv_lens[4] = 1;
417 g_string_free (key, FALSE); /* Do not free underlying array */
418 memcpy (session->found_digest, sel->digest,
419 sizeof (session->cmd->digest));
420
421 g_assert (session->ctx != NULL);
422 if (redisAsyncCommandArgv (session->ctx,
423 rspamd_fuzzy_redis_check_callback,
424 session, session->nargs,
425 (const gchar **)session->argv,
426 session->argv_lens) != REDIS_OK) {
427
428 if (session->callback.cb_check) {
429 memset (&rep, 0, sizeof (rep));
430 session->callback.cb_check (&rep, session->cbdata);
431 }
432
433 rspamd_fuzzy_redis_session_dtor (session, TRUE);
434 }
435 else {
436 /* Add timeout */
437 session->timeout.data = session;
438 ev_now_update_if_cheap ((struct ev_loop *)session->event_loop);
439 ev_timer_init (&session->timeout,
440 rspamd_fuzzy_redis_timeout,
441 session->backend->timeout, 0.0);
442 ev_timer_start (session->event_loop, &session->timeout);
443 }
444
445 return;
446 }
447 }
448 }
449 else if (reply->type == REDIS_REPLY_ERROR) {
450 msg_err_redis_session ("fuzzy backend redis error: \"%s\"",
451 reply->str);
452 }
453
454 if (session->callback.cb_check) {
455 session->callback.cb_check (&rep, session->cbdata);
456 }
457 }
458 else {
459 if (session->callback.cb_check) {
460 session->callback.cb_check (&rep, session->cbdata);
461 }
462
463 if (c->errstr) {
464 msg_err_redis_session ("error getting shingles: %s", c->errstr);
465 rspamd_upstream_fail (session->up, FALSE, c->errstr);
466 }
467 }
468
469 rspamd_fuzzy_redis_session_dtor (session, FALSE);
470 }
471
472 static void
rspamd_fuzzy_backend_check_shingles(struct rspamd_fuzzy_redis_session * session)473 rspamd_fuzzy_backend_check_shingles (struct rspamd_fuzzy_redis_session *session)
474 {
475 struct rspamd_fuzzy_reply rep;
476 const struct rspamd_fuzzy_shingle_cmd *shcmd;
477 GString *key;
478 guint i, init_len;
479
480 rspamd_fuzzy_redis_session_free_args (session);
481 /* First of all check digest */
482 session->nargs = RSPAMD_SHINGLE_SIZE + 1;
483 session->argv = g_malloc (sizeof (gchar *) * session->nargs);
484 session->argv_lens = g_malloc (sizeof (gsize) * session->nargs);
485 shcmd = (const struct rspamd_fuzzy_shingle_cmd *)session->cmd;
486
487 session->argv[0] = g_strdup ("MGET");
488 session->argv_lens[0] = 4;
489 init_len = strlen (session->backend->redis_object);
490
491 for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
492
493 key = g_string_sized_new (init_len + 2 + 2 + sizeof ("18446744073709551616"));
494 rspamd_printf_gstring (key, "%s_%d_%uL", session->backend->redis_object,
495 i, shcmd->sgl.hashes[i]);
496 session->argv[i + 1] = key->str;
497 session->argv_lens[i + 1] = key->len;
498 g_string_free (key, FALSE); /* Do not free underlying array */
499 }
500
501 session->shingles_checked = TRUE;
502
503 g_assert (session->ctx != NULL);
504
505 if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_shingles_callback,
506 session, session->nargs,
507 (const gchar **)session->argv, session->argv_lens) != REDIS_OK) {
508 msg_err ("cannot execute redis command on %s: %s",
509 rspamd_inet_address_to_string_pretty (rspamd_upstream_addr_cur (session->up)),
510 session->ctx->errstr);
511
512 if (session->callback.cb_check) {
513 memset (&rep, 0, sizeof (rep));
514 session->callback.cb_check (&rep, session->cbdata);
515 }
516
517 rspamd_fuzzy_redis_session_dtor (session, TRUE);
518 }
519 else {
520 /* Add timeout */
521 session->timeout.data = session;
522 ev_now_update_if_cheap ((struct ev_loop *)session->event_loop);
523 ev_timer_init (&session->timeout,
524 rspamd_fuzzy_redis_timeout,
525 session->backend->timeout, 0.0);
526 ev_timer_start (session->event_loop, &session->timeout);
527 }
528 }
529
530 static void
rspamd_fuzzy_redis_check_callback(redisAsyncContext * c,gpointer r,gpointer priv)531 rspamd_fuzzy_redis_check_callback (redisAsyncContext *c, gpointer r,
532 gpointer priv)
533 {
534 struct rspamd_fuzzy_redis_session *session = priv;
535 redisReply *reply = r, *cur;
536 struct rspamd_fuzzy_reply rep;
537 gulong value;
538 guint found_elts = 0;
539
540 ev_timer_stop (session->event_loop, &session->timeout);
541 memset (&rep, 0, sizeof (rep));
542
543 if (c->err == 0 && reply != NULL) {
544 rspamd_upstream_ok (session->up);
545
546 if (reply->type == REDIS_REPLY_ARRAY && reply->elements >= 2) {
547 cur = reply->element[0];
548
549 if (cur->type == REDIS_REPLY_STRING) {
550 value = strtoul (cur->str, NULL, 10);
551 rep.v1.value = value;
552 found_elts ++;
553 }
554
555 cur = reply->element[1];
556
557 if (cur->type == REDIS_REPLY_STRING) {
558 value = strtoul (cur->str, NULL, 10);
559 rep.v1.flag = value;
560 found_elts ++;
561 }
562
563 if (found_elts >= 2) {
564 rep.v1.prob = session->prob;
565 memcpy (rep.digest, session->found_digest, sizeof (rep.digest));
566 }
567
568 rep.ts = 0;
569
570 if (reply->elements > 2) {
571 cur = reply->element[2];
572
573 if (cur->type == REDIS_REPLY_STRING) {
574 rep.ts = strtoul (cur->str, NULL, 10);
575 }
576 }
577 }
578 else if (reply->type == REDIS_REPLY_ERROR) {
579 msg_err_redis_session ("fuzzy backend redis error: \"%s\"",
580 reply->str);
581 }
582
583 if (found_elts < 2) {
584 if (session->cmd->shingles_count > 0 && !session->shingles_checked) {
585 /* We also need to check all shingles here */
586 rspamd_fuzzy_backend_check_shingles (session);
587 /* Do not free session */
588 return;
589 }
590 else {
591 if (session->callback.cb_check) {
592 session->callback.cb_check (&rep, session->cbdata);
593 }
594 }
595 }
596 else {
597 if (session->callback.cb_check) {
598 session->callback.cb_check (&rep, session->cbdata);
599 }
600 }
601 }
602 else {
603 if (session->callback.cb_check) {
604 session->callback.cb_check (&rep, session->cbdata);
605 }
606
607 if (c->errstr) {
608 msg_err_redis_session ("error getting hashes on %s: %s",
609 rspamd_inet_address_to_string_pretty (rspamd_upstream_addr_cur (session->up)),
610 c->errstr);
611 rspamd_upstream_fail (session->up, FALSE, c->errstr);
612 }
613 }
614
615 rspamd_fuzzy_redis_session_dtor (session, FALSE);
616 }
617
618 void
rspamd_fuzzy_backend_check_redis(struct rspamd_fuzzy_backend * bk,const struct rspamd_fuzzy_cmd * cmd,rspamd_fuzzy_check_cb cb,void * ud,void * subr_ud)619 rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk,
620 const struct rspamd_fuzzy_cmd *cmd,
621 rspamd_fuzzy_check_cb cb, void *ud,
622 void *subr_ud)
623 {
624 struct rspamd_fuzzy_backend_redis *backend = subr_ud;
625 struct rspamd_fuzzy_redis_session *session;
626 struct upstream *up;
627 struct upstream_list *ups;
628 rspamd_inet_addr_t *addr;
629 struct rspamd_fuzzy_reply rep;
630 GString *key;
631
632 g_assert (backend != NULL);
633
634 ups = rspamd_redis_get_servers (backend, "read_servers");
635 if (!ups) {
636 if (cb) {
637 memset (&rep, 0, sizeof (rep));
638 cb (&rep, ud);
639 }
640
641 return;
642 }
643
644 session = g_malloc0 (sizeof (*session));
645 session->backend = backend;
646 REF_RETAIN (session->backend);
647
648 session->callback.cb_check = cb;
649 session->cbdata = ud;
650 session->command = RSPAMD_FUZZY_REDIS_COMMAND_CHECK;
651 session->cmd = cmd;
652 session->prob = 1.0;
653 memcpy (rep.digest, session->cmd->digest, sizeof (rep.digest));
654 memcpy (session->found_digest, session->cmd->digest, sizeof (rep.digest));
655 session->event_loop = rspamd_fuzzy_backend_event_base (bk);
656
657 /* First of all check digest */
658 session->nargs = 5;
659 session->argv = g_malloc (sizeof (gchar *) * session->nargs);
660 session->argv_lens = g_malloc (sizeof (gsize) * session->nargs);
661
662 key = g_string_new (backend->redis_object);
663 g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
664 session->argv[0] = g_strdup ("HMGET");
665 session->argv_lens[0] = 5;
666 session->argv[1] = key->str;
667 session->argv_lens[1] = key->len;
668 session->argv[2] = g_strdup ("V");
669 session->argv_lens[2] = 1;
670 session->argv[3] = g_strdup ("F");
671 session->argv_lens[3] = 1;
672 session->argv[4] = g_strdup ("C");
673 session->argv_lens[4] = 1;
674 g_string_free (key, FALSE); /* Do not free underlying array */
675
676 up = rspamd_upstream_get (ups,
677 RSPAMD_UPSTREAM_ROUND_ROBIN,
678 NULL,
679 0);
680
681 session->up = rspamd_upstream_ref (up);
682 addr = rspamd_upstream_addr_next (up);
683 g_assert (addr != NULL);
684 session->ctx = rspamd_redis_pool_connect (backend->pool,
685 backend->dbname, backend->password,
686 rspamd_inet_address_to_string (addr),
687 rspamd_inet_address_get_port (addr));
688
689 if (session->ctx == NULL) {
690 rspamd_upstream_fail (up, TRUE, strerror (errno));
691 rspamd_fuzzy_redis_session_dtor (session, TRUE);
692
693 if (cb) {
694 memset (&rep, 0, sizeof (rep));
695 cb (&rep, ud);
696 }
697 }
698 else {
699 if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_check_callback,
700 session, session->nargs,
701 (const gchar **)session->argv, session->argv_lens) != REDIS_OK) {
702 rspamd_fuzzy_redis_session_dtor (session, TRUE);
703
704 if (cb) {
705 memset (&rep, 0, sizeof (rep));
706 cb (&rep, ud);
707 }
708 }
709 else {
710 /* Add timeout */
711 session->timeout.data = session;
712 ev_now_update_if_cheap ((struct ev_loop *)session->event_loop);
713 ev_timer_init (&session->timeout,
714 rspamd_fuzzy_redis_timeout,
715 session->backend->timeout, 0.0);
716 ev_timer_start (session->event_loop, &session->timeout);
717 }
718 }
719 }
720
721 static void
rspamd_fuzzy_redis_count_callback(redisAsyncContext * c,gpointer r,gpointer priv)722 rspamd_fuzzy_redis_count_callback (redisAsyncContext *c, gpointer r,
723 gpointer priv)
724 {
725 struct rspamd_fuzzy_redis_session *session = priv;
726 redisReply *reply = r;
727 gulong nelts;
728
729 ev_timer_stop (session->event_loop, &session->timeout);
730
731 if (c->err == 0 && reply != NULL) {
732 rspamd_upstream_ok (session->up);
733
734 if (reply->type == REDIS_REPLY_INTEGER) {
735 if (session->callback.cb_count) {
736 session->callback.cb_count (reply->integer, session->cbdata);
737 }
738 }
739 else if (reply->type == REDIS_REPLY_STRING) {
740 nelts = strtoul (reply->str, NULL, 10);
741
742 if (session->callback.cb_count) {
743 session->callback.cb_count (nelts, session->cbdata);
744 }
745 }
746 else {
747 if (reply->type == REDIS_REPLY_ERROR) {
748 msg_err_redis_session ("fuzzy backend redis error: \"%s\"",
749 reply->str);
750 }
751 if (session->callback.cb_count) {
752 session->callback.cb_count (0, session->cbdata);
753 }
754 }
755 }
756 else {
757 if (session->callback.cb_count) {
758 session->callback.cb_count (0, session->cbdata);
759 }
760
761 if (c->errstr) {
762 msg_err_redis_session ("error getting count on %s: %s",
763 rspamd_inet_address_to_string_pretty (rspamd_upstream_addr_cur (session->up)),
764 c->errstr);
765 rspamd_upstream_fail (session->up, FALSE, c->errstr);
766 }
767
768 }
769
770 rspamd_fuzzy_redis_session_dtor (session, FALSE);
771 }
772
773 void
rspamd_fuzzy_backend_count_redis(struct rspamd_fuzzy_backend * bk,rspamd_fuzzy_count_cb cb,void * ud,void * subr_ud)774 rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk,
775 rspamd_fuzzy_count_cb cb, void *ud,
776 void *subr_ud)
777 {
778 struct rspamd_fuzzy_backend_redis *backend = subr_ud;
779 struct rspamd_fuzzy_redis_session *session;
780 struct upstream *up;
781 struct upstream_list *ups;
782 rspamd_inet_addr_t *addr;
783 GString *key;
784
785 g_assert (backend != NULL);
786
787 ups = rspamd_redis_get_servers (backend, "read_servers");
788 if (!ups) {
789 if (cb) {
790 cb (0, ud);
791 }
792
793 return;
794 }
795
796 session = g_malloc0 (sizeof (*session));
797 session->backend = backend;
798 REF_RETAIN (session->backend);
799
800 session->callback.cb_count = cb;
801 session->cbdata = ud;
802 session->command = RSPAMD_FUZZY_REDIS_COMMAND_COUNT;
803 session->event_loop = rspamd_fuzzy_backend_event_base (bk);
804
805 session->nargs = 2;
806 session->argv = g_malloc (sizeof (gchar *) * 2);
807 session->argv_lens = g_malloc (sizeof (gsize) * 2);
808 key = g_string_new (backend->redis_object);
809 g_string_append (key, "_count");
810 session->argv[0] = g_strdup ("GET");
811 session->argv_lens[0] = 3;
812 session->argv[1] = key->str;
813 session->argv_lens[1] = key->len;
814 g_string_free (key, FALSE); /* Do not free underlying array */
815
816 up = rspamd_upstream_get (ups,
817 RSPAMD_UPSTREAM_ROUND_ROBIN,
818 NULL,
819 0);
820
821 session->up = rspamd_upstream_ref (up);
822 addr = rspamd_upstream_addr_next (up);
823 g_assert (addr != NULL);
824 session->ctx = rspamd_redis_pool_connect (backend->pool,
825 backend->dbname, backend->password,
826 rspamd_inet_address_to_string (addr),
827 rspamd_inet_address_get_port (addr));
828
829 if (session->ctx == NULL) {
830 rspamd_upstream_fail (up, TRUE, strerror (errno));
831 rspamd_fuzzy_redis_session_dtor (session, TRUE);
832
833 if (cb) {
834 cb (0, ud);
835 }
836 }
837 else {
838 if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_count_callback,
839 session, session->nargs,
840 (const gchar **)session->argv, session->argv_lens) != REDIS_OK) {
841 rspamd_fuzzy_redis_session_dtor (session, TRUE);
842
843 if (cb) {
844 cb (0, ud);
845 }
846 }
847 else {
848 /* Add timeout */
849 session->timeout.data = session;
850 ev_now_update_if_cheap ((struct ev_loop *)session->event_loop);
851 ev_timer_init (&session->timeout,
852 rspamd_fuzzy_redis_timeout,
853 session->backend->timeout, 0.0);
854 ev_timer_start (session->event_loop, &session->timeout);
855 }
856 }
857 }
858
859 static void
rspamd_fuzzy_redis_version_callback(redisAsyncContext * c,gpointer r,gpointer priv)860 rspamd_fuzzy_redis_version_callback (redisAsyncContext *c, gpointer r,
861 gpointer priv)
862 {
863 struct rspamd_fuzzy_redis_session *session = priv;
864 redisReply *reply = r;
865 gulong nelts;
866
867 ev_timer_stop (session->event_loop, &session->timeout);
868
869 if (c->err == 0 && reply != NULL) {
870 rspamd_upstream_ok (session->up);
871
872 if (reply->type == REDIS_REPLY_INTEGER) {
873 if (session->callback.cb_version) {
874 session->callback.cb_version (reply->integer, session->cbdata);
875 }
876 }
877 else if (reply->type == REDIS_REPLY_STRING) {
878 nelts = strtoul (reply->str, NULL, 10);
879
880 if (session->callback.cb_version) {
881 session->callback.cb_version (nelts, session->cbdata);
882 }
883 }
884 else {
885 if (reply->type == REDIS_REPLY_ERROR) {
886 msg_err_redis_session ("fuzzy backend redis error: \"%s\"",
887 reply->str);
888 }
889 if (session->callback.cb_version) {
890 session->callback.cb_version (0, session->cbdata);
891 }
892 }
893 }
894 else {
895 if (session->callback.cb_version) {
896 session->callback.cb_version (0, session->cbdata);
897 }
898
899 if (c->errstr) {
900 msg_err_redis_session ("error getting version on %s: %s",
901 rspamd_inet_address_to_string_pretty (rspamd_upstream_addr_cur (session->up)),
902 c->errstr);
903 rspamd_upstream_fail (session->up, FALSE, c->errstr);
904 }
905 }
906
907 rspamd_fuzzy_redis_session_dtor (session, FALSE);
908 }
909
910 void
rspamd_fuzzy_backend_version_redis(struct rspamd_fuzzy_backend * bk,const gchar * src,rspamd_fuzzy_version_cb cb,void * ud,void * subr_ud)911 rspamd_fuzzy_backend_version_redis (struct rspamd_fuzzy_backend *bk,
912 const gchar *src,
913 rspamd_fuzzy_version_cb cb, void *ud,
914 void *subr_ud)
915 {
916 struct rspamd_fuzzy_backend_redis *backend = subr_ud;
917 struct rspamd_fuzzy_redis_session *session;
918 struct upstream *up;
919 struct upstream_list *ups;
920 rspamd_inet_addr_t *addr;
921 GString *key;
922
923 g_assert (backend != NULL);
924
925 ups = rspamd_redis_get_servers (backend, "read_servers");
926 if (!ups) {
927 if (cb) {
928 cb (0, ud);
929 }
930
931 return;
932 }
933
934 session = g_malloc0 (sizeof (*session));
935 session->backend = backend;
936 REF_RETAIN (session->backend);
937
938 session->callback.cb_version = cb;
939 session->cbdata = ud;
940 session->command = RSPAMD_FUZZY_REDIS_COMMAND_VERSION;
941 session->event_loop = rspamd_fuzzy_backend_event_base (bk);
942
943 session->nargs = 2;
944 session->argv = g_malloc (sizeof (gchar *) * 2);
945 session->argv_lens = g_malloc (sizeof (gsize) * 2);
946 key = g_string_new (backend->redis_object);
947 g_string_append (key, src);
948 session->argv[0] = g_strdup ("GET");
949 session->argv_lens[0] = 3;
950 session->argv[1] = key->str;
951 session->argv_lens[1] = key->len;
952 g_string_free (key, FALSE); /* Do not free underlying array */
953
954 up = rspamd_upstream_get (ups,
955 RSPAMD_UPSTREAM_ROUND_ROBIN,
956 NULL,
957 0);
958
959 session->up = rspamd_upstream_ref (up);
960 addr = rspamd_upstream_addr_next (up);
961 g_assert (addr != NULL);
962 session->ctx = rspamd_redis_pool_connect (backend->pool,
963 backend->dbname, backend->password,
964 rspamd_inet_address_to_string (addr),
965 rspamd_inet_address_get_port (addr));
966
967 if (session->ctx == NULL) {
968 rspamd_upstream_fail (up, FALSE, strerror (errno));
969 rspamd_fuzzy_redis_session_dtor (session, TRUE);
970
971 if (cb) {
972 cb (0, ud);
973 }
974 }
975 else {
976 if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_version_callback,
977 session, session->nargs,
978 (const gchar **)session->argv, session->argv_lens) != REDIS_OK) {
979 rspamd_fuzzy_redis_session_dtor (session, TRUE);
980
981 if (cb) {
982 cb (0, ud);
983 }
984 }
985 else {
986 /* Add timeout */
987 session->timeout.data = session;
988 ev_now_update_if_cheap ((struct ev_loop *)session->event_loop);
989 ev_timer_init (&session->timeout,
990 rspamd_fuzzy_redis_timeout,
991 session->backend->timeout, 0.0);
992 ev_timer_start (session->event_loop, &session->timeout);
993 }
994 }
995 }
996
997 const gchar*
rspamd_fuzzy_backend_id_redis(struct rspamd_fuzzy_backend * bk,void * subr_ud)998 rspamd_fuzzy_backend_id_redis (struct rspamd_fuzzy_backend *bk,
999 void *subr_ud)
1000 {
1001 struct rspamd_fuzzy_backend_redis *backend = subr_ud;
1002 g_assert (backend != NULL);
1003
1004 return backend->id;
1005 }
1006
1007 void
rspamd_fuzzy_backend_expire_redis(struct rspamd_fuzzy_backend * bk,void * subr_ud)1008 rspamd_fuzzy_backend_expire_redis (struct rspamd_fuzzy_backend *bk,
1009 void *subr_ud)
1010 {
1011 struct rspamd_fuzzy_backend_redis *backend = subr_ud;
1012
1013 g_assert (backend != NULL);
1014 }
1015
1016 static gboolean
rspamd_fuzzy_update_append_command(struct rspamd_fuzzy_backend * bk,struct rspamd_fuzzy_redis_session * session,struct fuzzy_peer_cmd * io_cmd,guint * shift)1017 rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk,
1018 struct rspamd_fuzzy_redis_session *session,
1019 struct fuzzy_peer_cmd *io_cmd, guint *shift)
1020 {
1021 GString *key, *value;
1022 guint cur_shift = *shift;
1023 guint i, klen;
1024 struct rspamd_fuzzy_cmd *cmd;
1025
1026 if (io_cmd->is_shingle) {
1027 cmd = &io_cmd->cmd.shingle.basic;
1028 }
1029 else {
1030 cmd = &io_cmd->cmd.normal;
1031
1032 }
1033
1034 if (cmd->cmd == FUZZY_WRITE) {
1035 /*
1036 * For each normal hash addition we do 5 redis commands:
1037 * HSET <key> F <flag>
1038 * HSETNX <key> C <time>
1039 * HINCRBY <key> V <weight>
1040 * EXPIRE <key> <expire>
1041 * Where <key> is <prefix> || <digest>
1042 */
1043
1044 /* HSET */
1045 klen = strlen (session->backend->redis_object) +
1046 sizeof (cmd->digest) + 1;
1047 key = g_string_sized_new (klen);
1048 g_string_append (key, session->backend->redis_object);
1049 g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
1050 value = g_string_sized_new (sizeof ("4294967296"));
1051 rspamd_printf_gstring (value, "%d", cmd->flag);
1052 session->argv[cur_shift] = g_strdup ("HSET");
1053 session->argv_lens[cur_shift++] = sizeof ("HSET") - 1;
1054 session->argv[cur_shift] = key->str;
1055 session->argv_lens[cur_shift++] = key->len;
1056 session->argv[cur_shift] = g_strdup ("F");
1057 session->argv_lens[cur_shift++] = sizeof ("F") - 1;
1058 session->argv[cur_shift] = value->str;
1059 session->argv_lens[cur_shift++] = value->len;
1060 g_string_free (key, FALSE);
1061 g_string_free (value, FALSE);
1062
1063 if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1064 4,
1065 (const gchar **)&session->argv[cur_shift - 4],
1066 &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
1067
1068 return FALSE;
1069 }
1070
1071 /* HSETNX */
1072 klen = strlen (session->backend->redis_object) +
1073 sizeof (cmd->digest) + 1;
1074 key = g_string_sized_new (klen);
1075 g_string_append (key, session->backend->redis_object);
1076 g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
1077 value = g_string_sized_new (sizeof ("18446744073709551616"));
1078 rspamd_printf_gstring (value, "%L", (gint64)rspamd_get_calendar_ticks ());
1079 session->argv[cur_shift] = g_strdup ("HSETNX");
1080 session->argv_lens[cur_shift++] = sizeof ("HSETNX") - 1;
1081 session->argv[cur_shift] = key->str;
1082 session->argv_lens[cur_shift++] = key->len;
1083 session->argv[cur_shift] = g_strdup ("C");
1084 session->argv_lens[cur_shift++] = sizeof ("C") - 1;
1085 session->argv[cur_shift] = value->str;
1086 session->argv_lens[cur_shift++] = value->len;
1087 g_string_free (key, FALSE);
1088 g_string_free (value, FALSE);
1089
1090 if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1091 4,
1092 (const gchar **)&session->argv[cur_shift - 4],
1093 &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
1094
1095 return FALSE;
1096 }
1097
1098 /* HINCRBY */
1099 key = g_string_sized_new (klen);
1100 g_string_append (key, session->backend->redis_object);
1101 g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
1102 value = g_string_sized_new (sizeof ("4294967296"));
1103 rspamd_printf_gstring (value, "%d", cmd->value);
1104 session->argv[cur_shift] = g_strdup ("HINCRBY");
1105 session->argv_lens[cur_shift++] = sizeof ("HINCRBY") - 1;
1106 session->argv[cur_shift] = key->str;
1107 session->argv_lens[cur_shift++] = key->len;
1108 session->argv[cur_shift] = g_strdup ("V");
1109 session->argv_lens[cur_shift++] = sizeof ("V") - 1;
1110 session->argv[cur_shift] = value->str;
1111 session->argv_lens[cur_shift++] = value->len;
1112 g_string_free (key, FALSE);
1113 g_string_free (value, FALSE);
1114
1115 if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1116 4,
1117 (const gchar **)&session->argv[cur_shift - 4],
1118 &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
1119
1120 return FALSE;
1121 }
1122
1123 /* EXPIRE */
1124 key = g_string_sized_new (klen);
1125 g_string_append (key, session->backend->redis_object);
1126 g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
1127 value = g_string_sized_new (sizeof ("4294967296"));
1128 rspamd_printf_gstring (value, "%d",
1129 (gint)rspamd_fuzzy_backend_get_expire (bk));
1130 session->argv[cur_shift] = g_strdup ("EXPIRE");
1131 session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1;
1132 session->argv[cur_shift] = key->str;
1133 session->argv_lens[cur_shift++] = key->len;
1134 session->argv[cur_shift] = value->str;
1135 session->argv_lens[cur_shift++] = value->len;
1136 g_string_free (key, FALSE);
1137 g_string_free (value, FALSE);
1138
1139 if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1140 3,
1141 (const gchar **)&session->argv[cur_shift - 3],
1142 &session->argv_lens[cur_shift - 3]) != REDIS_OK) {
1143
1144 return FALSE;
1145 }
1146
1147 /* INCR */
1148 key = g_string_sized_new (klen);
1149 g_string_append (key, session->backend->redis_object);
1150 g_string_append (key, "_count");
1151 session->argv[cur_shift] = g_strdup ("INCR");
1152 session->argv_lens[cur_shift++] = sizeof ("INCR") - 1;
1153 session->argv[cur_shift] = key->str;
1154 session->argv_lens[cur_shift++] = key->len;
1155 g_string_free (key, FALSE);
1156
1157 if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1158 2,
1159 (const gchar **)&session->argv[cur_shift - 2],
1160 &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
1161
1162 return FALSE;
1163 }
1164 }
1165 else if (cmd->cmd == FUZZY_DEL) {
1166 /* DEL */
1167 klen = strlen (session->backend->redis_object) +
1168 sizeof (cmd->digest) + 1;
1169
1170 key = g_string_sized_new (klen);
1171 g_string_append (key, session->backend->redis_object);
1172 g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
1173 session->argv[cur_shift] = g_strdup ("DEL");
1174 session->argv_lens[cur_shift++] = sizeof ("DEL") - 1;
1175 session->argv[cur_shift] = key->str;
1176 session->argv_lens[cur_shift++] = key->len;
1177 g_string_free (key, FALSE);
1178
1179 if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1180 2,
1181 (const gchar **)&session->argv[cur_shift - 2],
1182 &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
1183
1184 return FALSE;
1185 }
1186
1187 /* DECR */
1188 key = g_string_sized_new (klen);
1189 g_string_append (key, session->backend->redis_object);
1190 g_string_append (key, "_count");
1191 session->argv[cur_shift] = g_strdup ("DECR");
1192 session->argv_lens[cur_shift++] = sizeof ("DECR") - 1;
1193 session->argv[cur_shift] = key->str;
1194 session->argv_lens[cur_shift++] = key->len;
1195 g_string_free (key, FALSE);
1196
1197 if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1198 2,
1199 (const gchar **)&session->argv[cur_shift - 2],
1200 &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
1201
1202 return FALSE;
1203 }
1204 }
1205 else if (cmd->cmd == FUZZY_REFRESH) {
1206 /*
1207 * Issue refresh command by just EXPIRE command
1208 * EXPIRE <key> <expire>
1209 * Where <key> is <prefix> || <digest>
1210 */
1211
1212 klen = strlen (session->backend->redis_object) +
1213 sizeof (cmd->digest) + 1;
1214
1215 /* EXPIRE */
1216 key = g_string_sized_new (klen);
1217 g_string_append (key, session->backend->redis_object);
1218 g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
1219 value = g_string_sized_new (sizeof ("4294967296"));
1220 rspamd_printf_gstring (value, "%d",
1221 (gint)rspamd_fuzzy_backend_get_expire (bk));
1222 session->argv[cur_shift] = g_strdup ("EXPIRE");
1223 session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1;
1224 session->argv[cur_shift] = key->str;
1225 session->argv_lens[cur_shift++] = key->len;
1226 session->argv[cur_shift] = value->str;
1227 session->argv_lens[cur_shift++] = value->len;
1228 g_string_free (key, FALSE);
1229 g_string_free (value, FALSE);
1230
1231 if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1232 3,
1233 (const gchar **)&session->argv[cur_shift - 3],
1234 &session->argv_lens[cur_shift - 3]) != REDIS_OK) {
1235
1236 return FALSE;
1237 }
1238 }
1239 else if (cmd->cmd == FUZZY_DUP) {
1240 /* Ignore */
1241 }
1242 else {
1243 g_assert_not_reached ();
1244 }
1245
1246 if (io_cmd->is_shingle) {
1247 if (cmd->cmd == FUZZY_WRITE) {
1248 klen = strlen (session->backend->redis_object) +
1249 64 + 1;
1250
1251 for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
1252 guchar *hval;
1253 /*
1254 * For each command with shingles we additionally emit 32 commands:
1255 * SETEX <prefix>_<number>_<value> <expire> <digest>
1256 */
1257
1258 /* SETEX */
1259 key = g_string_sized_new (klen);
1260 rspamd_printf_gstring (key, "%s_%d_%uL",
1261 session->backend->redis_object,
1262 i,
1263 io_cmd->cmd.shingle.sgl.hashes[i]);
1264 value = g_string_sized_new (sizeof ("4294967296"));
1265 rspamd_printf_gstring (value, "%d",
1266 (gint)rspamd_fuzzy_backend_get_expire (bk));
1267 hval = g_malloc (sizeof (io_cmd->cmd.shingle.basic.digest));
1268 memcpy (hval, io_cmd->cmd.shingle.basic.digest,
1269 sizeof (io_cmd->cmd.shingle.basic.digest));
1270 session->argv[cur_shift] = g_strdup ("SETEX");
1271 session->argv_lens[cur_shift++] = sizeof ("SETEX") - 1;
1272 session->argv[cur_shift] = key->str;
1273 session->argv_lens[cur_shift++] = key->len;
1274 session->argv[cur_shift] = value->str;
1275 session->argv_lens[cur_shift++] = value->len;
1276 session->argv[cur_shift] = hval;
1277 session->argv_lens[cur_shift++] = sizeof (io_cmd->cmd.shingle.basic.digest);
1278 g_string_free (key, FALSE);
1279 g_string_free (value, FALSE);
1280
1281 if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1282 4,
1283 (const gchar **)&session->argv[cur_shift - 4],
1284 &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
1285
1286 return FALSE;
1287 }
1288 }
1289 }
1290 else if (cmd->cmd == FUZZY_DEL) {
1291 klen = strlen (session->backend->redis_object) +
1292 64 + 1;
1293
1294 for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
1295 key = g_string_sized_new (klen);
1296 rspamd_printf_gstring (key, "%s_%d_%uL",
1297 session->backend->redis_object,
1298 i,
1299 io_cmd->cmd.shingle.sgl.hashes[i]);
1300 session->argv[cur_shift] = g_strdup ("DEL");
1301 session->argv_lens[cur_shift++] = sizeof ("DEL") - 1;
1302 session->argv[cur_shift] = key->str;
1303 session->argv_lens[cur_shift++] = key->len;
1304 g_string_free (key, FALSE);
1305
1306 if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1307 2,
1308 (const gchar **)&session->argv[cur_shift - 2],
1309 &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
1310
1311 return FALSE;
1312 }
1313 }
1314 }
1315 else if (cmd->cmd == FUZZY_REFRESH) {
1316 klen = strlen (session->backend->redis_object) +
1317 64 + 1;
1318
1319 for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
1320 /*
1321 * For each command with shingles we additionally emit 32 commands:
1322 * EXPIRE <prefix>_<number>_<value> <expire>
1323 */
1324
1325 /* Expire */
1326 key = g_string_sized_new (klen);
1327 rspamd_printf_gstring (key, "%s_%d_%uL",
1328 session->backend->redis_object,
1329 i,
1330 io_cmd->cmd.shingle.sgl.hashes[i]);
1331 value = g_string_sized_new (sizeof ("18446744073709551616"));
1332 rspamd_printf_gstring (value, "%d",
1333 (gint)rspamd_fuzzy_backend_get_expire (bk));
1334 session->argv[cur_shift] = g_strdup ("EXPIRE");
1335 session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1;
1336 session->argv[cur_shift] = key->str;
1337 session->argv_lens[cur_shift++] = key->len;
1338 session->argv[cur_shift] = value->str;
1339 session->argv_lens[cur_shift++] = value->len;
1340 g_string_free (key, FALSE);
1341 g_string_free (value, FALSE);
1342
1343 if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1344 3,
1345 (const gchar **)&session->argv[cur_shift - 3],
1346 &session->argv_lens[cur_shift - 3]) != REDIS_OK) {
1347
1348 return FALSE;
1349 }
1350 }
1351 }
1352 else if (cmd->cmd == FUZZY_DUP) {
1353 /* Ignore */
1354 }
1355 else {
1356 g_assert_not_reached ();
1357 }
1358 }
1359
1360 *shift = cur_shift;
1361
1362 return TRUE;
1363 }
1364
1365 static void
rspamd_fuzzy_redis_update_callback(redisAsyncContext * c,gpointer r,gpointer priv)1366 rspamd_fuzzy_redis_update_callback (redisAsyncContext *c, gpointer r,
1367 gpointer priv)
1368 {
1369 struct rspamd_fuzzy_redis_session *session = priv;
1370 redisReply *reply = r;
1371
1372 ev_timer_stop (session->event_loop, &session->timeout);
1373
1374 if (c->err == 0 && reply != NULL) {
1375 rspamd_upstream_ok (session->up);
1376
1377 if (reply->type == REDIS_REPLY_ARRAY) {
1378 /* TODO: check all replies somehow */
1379 if (session->callback.cb_update) {
1380 session->callback.cb_update (TRUE,
1381 session->nadded,
1382 session->ndeleted,
1383 session->nextended,
1384 session->nignored,
1385 session->cbdata);
1386 }
1387 }
1388 else {
1389 if (reply->type == REDIS_REPLY_ERROR) {
1390 msg_err_redis_session ("fuzzy backend redis error: \"%s\"",
1391 reply->str);
1392 }
1393 if (session->callback.cb_update) {
1394 session->callback.cb_update (FALSE, 0, 0, 0, 0, session->cbdata);
1395 }
1396 }
1397 }
1398 else {
1399 if (session->callback.cb_update) {
1400 session->callback.cb_update (FALSE, 0, 0, 0, 0, session->cbdata);
1401 }
1402
1403 if (c->errstr) {
1404 msg_err_redis_session ("error sending update to redis %s: %s",
1405 rspamd_inet_address_to_string_pretty (rspamd_upstream_addr_cur (session->up)),
1406 c->errstr);
1407 rspamd_upstream_fail (session->up, FALSE, c->errstr);
1408 }
1409 }
1410
1411 rspamd_fuzzy_redis_session_dtor (session, FALSE);
1412 }
1413
1414 void
rspamd_fuzzy_backend_update_redis(struct rspamd_fuzzy_backend * bk,GArray * updates,const gchar * src,rspamd_fuzzy_update_cb cb,void * ud,void * subr_ud)1415 rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk,
1416 GArray *updates, const gchar *src,
1417 rspamd_fuzzy_update_cb cb, void *ud,
1418 void *subr_ud)
1419 {
1420 struct rspamd_fuzzy_backend_redis *backend = subr_ud;
1421 struct rspamd_fuzzy_redis_session *session;
1422 struct upstream *up;
1423 struct upstream_list *ups;
1424 rspamd_inet_addr_t *addr;
1425 guint i;
1426 GString *key;
1427 struct fuzzy_peer_cmd *io_cmd;
1428 struct rspamd_fuzzy_cmd *cmd = NULL;
1429 guint nargs, ncommands, cur_shift;
1430
1431 g_assert (backend != NULL);
1432
1433 ups = rspamd_redis_get_servers (backend, "write_servers");
1434 if (!ups) {
1435 if (cb) {
1436 cb (FALSE, 0, 0, 0, 0, ud);
1437 }
1438
1439 return;
1440 }
1441
1442 session = g_malloc0 (sizeof (*session));
1443 session->backend = backend;
1444 REF_RETAIN (session->backend);
1445
1446 /*
1447 * For each normal hash addition we do 3 redis commands:
1448 * HSET <key> F <flag>
1449 * HINCRBY <key> V <weight>
1450 * EXPIRE <key> <expire>
1451 * INCR <prefix||fuzzy_count>
1452 *
1453 * Where <key> is <prefix> || <digest>
1454 *
1455 * For each command with shingles we additionally emit 32 commands:
1456 * SETEX <prefix>_<number>_<value> <expire> <digest>
1457 *
1458 * For each delete command we emit:
1459 * DEL <key>
1460 *
1461 * For each delete command with shingles we emit also 32 commands:
1462 * DEL <prefix>_<number>_<value>
1463 * DECR <prefix||fuzzy_count>
1464 */
1465
1466 ncommands = 3; /* For MULTI + EXEC + INCR <src> */
1467 nargs = 4;
1468
1469 for (i = 0; i < updates->len; i ++) {
1470 io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i);
1471
1472 if (io_cmd->is_shingle) {
1473 cmd = &io_cmd->cmd.shingle.basic;
1474 }
1475 else {
1476 cmd = &io_cmd->cmd.normal;
1477 }
1478
1479 if (cmd->cmd == FUZZY_WRITE) {
1480 ncommands += 5;
1481 nargs += 17;
1482 session->nadded ++;
1483
1484 if (io_cmd->is_shingle) {
1485 ncommands += RSPAMD_SHINGLE_SIZE;
1486 nargs += RSPAMD_SHINGLE_SIZE * 4;
1487 }
1488
1489 }
1490 else if (cmd->cmd == FUZZY_DEL) {
1491 ncommands += 2;
1492 nargs += 4;
1493 session->ndeleted ++;
1494
1495 if (io_cmd->is_shingle) {
1496 ncommands += RSPAMD_SHINGLE_SIZE;
1497 nargs += RSPAMD_SHINGLE_SIZE * 2;
1498 }
1499 }
1500 else if (cmd->cmd == FUZZY_REFRESH) {
1501 ncommands += 1;
1502 nargs += 3;
1503 session->nextended ++;
1504
1505 if (io_cmd->is_shingle) {
1506 ncommands += RSPAMD_SHINGLE_SIZE;
1507 nargs += RSPAMD_SHINGLE_SIZE * 3;
1508 }
1509 }
1510 else {
1511 session->nignored ++;
1512 }
1513 }
1514
1515 /* Now we need to create a new request */
1516 session->callback.cb_update = cb;
1517 session->cbdata = ud;
1518 session->command = RSPAMD_FUZZY_REDIS_COMMAND_UPDATES;
1519 session->cmd = cmd;
1520 session->prob = 1.0f;
1521 session->event_loop = rspamd_fuzzy_backend_event_base (bk);
1522
1523 /* First of all check digest */
1524 session->nargs = nargs;
1525 session->argv = g_malloc0 (sizeof (gchar *) * session->nargs);
1526 session->argv_lens = g_malloc0 (sizeof (gsize) * session->nargs);
1527
1528 up = rspamd_upstream_get (ups,
1529 RSPAMD_UPSTREAM_MASTER_SLAVE,
1530 NULL,
1531 0);
1532
1533 session->up = rspamd_upstream_ref (up);
1534 addr = rspamd_upstream_addr_next (up);
1535 g_assert (addr != NULL);
1536 session->ctx = rspamd_redis_pool_connect (backend->pool,
1537 backend->dbname, backend->password,
1538 rspamd_inet_address_to_string (addr),
1539 rspamd_inet_address_get_port (addr));
1540
1541 if (session->ctx == NULL) {
1542 rspamd_upstream_fail (up, TRUE, strerror (errno));
1543 rspamd_fuzzy_redis_session_dtor (session, TRUE);
1544
1545 if (cb) {
1546 cb (FALSE, 0, 0, 0, 0, ud);
1547 }
1548 }
1549 else {
1550 /* Start with MULTI command */
1551 session->argv[0] = g_strdup ("MULTI");
1552 session->argv_lens[0] = 5;
1553
1554 if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1555 1,
1556 (const gchar **)session->argv,
1557 session->argv_lens) != REDIS_OK) {
1558
1559 if (cb) {
1560 cb (FALSE, 0, 0, 0, 0, ud);
1561 }
1562 rspamd_fuzzy_redis_session_dtor (session, TRUE);
1563
1564 return;
1565 }
1566
1567 /* Now split the rest of commands in packs and emit them command by command */
1568 cur_shift = 1;
1569
1570 for (i = 0; i < updates->len; i ++) {
1571 io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i);
1572
1573 if (!rspamd_fuzzy_update_append_command (bk, session, io_cmd,
1574 &cur_shift)) {
1575 if (cb) {
1576 cb (FALSE, 0, 0, 0, 0, ud);
1577 }
1578 rspamd_fuzzy_redis_session_dtor (session, TRUE);
1579
1580 return;
1581 }
1582 }
1583
1584 /* Now INCR command for the source */
1585 key = g_string_new (backend->redis_object);
1586 g_string_append (key, src);
1587 session->argv[cur_shift] = g_strdup ("INCR");
1588 session->argv_lens[cur_shift ++] = 4;
1589 session->argv[cur_shift] = key->str;
1590 session->argv_lens[cur_shift ++] = key->len;
1591 g_string_free (key, FALSE);
1592
1593 if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1594 2,
1595 (const gchar **)&session->argv[cur_shift - 2],
1596 &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
1597
1598 if (cb) {
1599 cb (FALSE, 0, 0, 0, 0, ud);
1600 }
1601 rspamd_fuzzy_redis_session_dtor (session, TRUE);
1602
1603 return;
1604 }
1605
1606 /* Finally we call EXEC with a specific callback */
1607 session->argv[cur_shift] = g_strdup ("EXEC");
1608 session->argv_lens[cur_shift] = 4;
1609
1610 if (redisAsyncCommandArgv (session->ctx,
1611 rspamd_fuzzy_redis_update_callback, session,
1612 1,
1613 (const gchar **)&session->argv[cur_shift],
1614 &session->argv_lens[cur_shift]) != REDIS_OK) {
1615
1616 if (cb) {
1617 cb (FALSE, 0, 0, 0, 0, ud);
1618 }
1619 rspamd_fuzzy_redis_session_dtor (session, TRUE);
1620
1621 return;
1622 }
1623 else {
1624 /* Add timeout */
1625 session->timeout.data = session;
1626 ev_now_update_if_cheap ((struct ev_loop *)session->event_loop);
1627 ev_timer_init (&session->timeout,
1628 rspamd_fuzzy_redis_timeout,
1629 session->backend->timeout, 0.0);
1630 ev_timer_start (session->event_loop, &session->timeout);
1631 }
1632 }
1633 }
1634
1635 void
rspamd_fuzzy_backend_close_redis(struct rspamd_fuzzy_backend * bk,void * subr_ud)1636 rspamd_fuzzy_backend_close_redis (struct rspamd_fuzzy_backend *bk,
1637 void *subr_ud)
1638 {
1639 struct rspamd_fuzzy_backend_redis *backend = subr_ud;
1640
1641 g_assert (backend != NULL);
1642
1643 /*
1644 * XXX: we leak lua registry element there to avoid crashing
1645 * due to chicken-egg problem between lua state termination and
1646 * redis pool termination.
1647 * Here, we assume that redis pool is destroyed AFTER lua_state,
1648 * so all connections pending will release references but due to
1649 * `terminated` hack they will not try to access Lua stuff
1650 * This is enabled merely if we have connections pending (e.g. refcount > 1)
1651 */
1652 if (backend->ref.refcount > 1) {
1653 backend->terminated = true;
1654 }
1655 REF_RELEASE (backend);
1656 }
1657