1 /*-
2  * Copyright 2016 Vsevolod Stakhov
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "config.h"
18 #include "ref.h"
19 #include "fuzzy_backend.h"
20 #include "fuzzy_backend_redis.h"
21 #include "redis_pool.h"
22 #include "cryptobox.h"
23 #include "str_util.h"
24 #include "upstream.h"
25 #include "contrib/hiredis/hiredis.h"
26 #include "contrib/hiredis/async.h"
27 #include "lua/lua_common.h"
28 
/*
 * Defaults for the Redis fuzzy backend.
 * REDIS_DEFAULT_OBJECT is the key prefix used when no `prefix` option is set;
 * REDIS_DEFAULT_TIMEOUT is the per-command ev_timer timeout (seconds) used
 * unless the Lua redis config provides a numeric `timeout`.
 * NOTE(review): REDIS_DEFAULT_PORT is not referenced in the visible part of
 * this file.
 */
#define REDIS_DEFAULT_PORT 6379
#define REDIS_DEFAULT_OBJECT "fuzzy"
#define REDIS_DEFAULT_TIMEOUT 2.0

/*
 * Session-scoped logging helpers: each expects a variable named `session`
 * (struct rspamd_fuzzy_redis_session *) in scope and tags messages with the
 * backend id.
 */
#define msg_err_redis_session(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
        "fuzzy_redis", session->backend->id, \
        G_STRFUNC, \
        __VA_ARGS__)
#define msg_warn_redis_session(...)   rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
        "fuzzy_redis", session->backend->id, \
        G_STRFUNC, \
        __VA_ARGS__)
#define msg_info_redis_session(...)   rspamd_default_log_function (G_LOG_LEVEL_INFO, \
        "fuzzy_redis", session->backend->id, \
        G_STRFUNC, \
        __VA_ARGS__)
#define msg_debug_redis_session(...)  rspamd_conditional_debug_fast (NULL, NULL, \
        rspamd_fuzzy_redis_log_id, "fuzzy_redis", session->backend->id, \
        G_STRFUNC, \
        __VA_ARGS__)

/* Presumably defines rspamd_fuzzy_redis_log_id used by the debug macro above */
INIT_LOG_MODULE(fuzzy_redis)
51 
/* Per-backend state shared (refcounted) by all Redis fuzzy sessions */
struct rspamd_fuzzy_backend_redis {
	lua_State *L;               /* Lua state taken from cfg->lua_state */
	const gchar *redis_object;  /* key prefix ("prefix" option or "fuzzy") */
	const gchar *password;      /* "password" from the Lua redis config, or NULL */
	const gchar *dbname;        /* "db" from the Lua redis config, or NULL */
	gchar *id;                  /* base32 hash of prefix/db/password; owned */
	struct rspamd_redis_pool *pool; /* connection pool (cfg->redis_pool) */
	gdouble timeout;            /* per-command timeout passed to ev_timer_init */
	gint conf_ref;              /* Lua registry ref to the redis config table */
	bool terminated;            /* when set, the dtor skips luaL_unref; set elsewhere */
	ref_entry_t ref;            /* refcount; sessions retain the backend */
};
64 
/* Kind of request a session performs; selects the active callback union member */
enum rspamd_fuzzy_redis_command {
	RSPAMD_FUZZY_REDIS_COMMAND_COUNT,
	RSPAMD_FUZZY_REDIS_COMMAND_VERSION,
	RSPAMD_FUZZY_REDIS_COMMAND_UPDATES,
	RSPAMD_FUZZY_REDIS_COMMAND_CHECK
};
71 
/* State for one asynchronous request against Redis; freed by the session dtor */
struct rspamd_fuzzy_redis_session {
	struct rspamd_fuzzy_backend_redis *backend; /* retained reference */
	redisAsyncContext *ctx;      /* pooled connection; NULL once released */
	ev_timer timeout;            /* per-command timeout watcher */
	const struct rspamd_fuzzy_cmd *cmd; /* request being checked (CHECK only) */
	struct ev_loop *event_loop;
	float prob;                  /* reported probability: 1.0 for a direct hit */
	gboolean shingles_checked;   /* TRUE once the shingle MGET has been issued */

	enum rspamd_fuzzy_redis_command command;
	guint nargs;                 /* number of entries in argv/argv_lens */

	/* presumably update statistics; used outside the visible code — confirm */
	guint nadded;
	guint ndeleted;
	guint nextended;
	guint nignored;

	/* Only the member matching `command` is valid */
	union {
		rspamd_fuzzy_check_cb cb_check;
		rspamd_fuzzy_update_cb cb_update;
		rspamd_fuzzy_version_cb cb_version;
		rspamd_fuzzy_count_cb cb_count;
	} callback;
	void *cbdata;

	gchar **argv;                /* owned command arguments */
	gsize *argv_lens;            /* owned argument lengths */
	struct upstream *up;         /* referenced upstream used for this request */
	guchar found_digest[rspamd_cryptobox_HASHBYTES]; /* digest reported on a hit */
};
102 
103 static inline struct upstream_list *
rspamd_redis_get_servers(struct rspamd_fuzzy_backend_redis * ctx,const gchar * what)104 rspamd_redis_get_servers (struct rspamd_fuzzy_backend_redis *ctx,
105 						  const gchar *what)
106 {
107 	lua_State *L = ctx->L;
108 	struct upstream_list *res = NULL;
109 
110 	lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->conf_ref);
111 	lua_pushstring (L, what);
112 	lua_gettable (L, -2);
113 
114 	if (lua_type (L, -1) == LUA_TUSERDATA) {
115 		res = *((struct upstream_list **) lua_touserdata (L, -1));
116 	}
117 	else {
118 		struct lua_logger_trace tr;
119 		gchar outbuf[8192];
120 
121 		memset (&tr, 0, sizeof (tr));
122 		lua_logger_out_type (L, -2, outbuf, sizeof (outbuf) - 1, &tr,
123 				LUA_ESCAPE_UNPRINTABLE);
124 
125 		msg_err ("cannot get %s upstreams for Redis fuzzy storage %s; table content: %s",
126 				what, ctx->id, outbuf);
127 	}
128 
129 	lua_settop (L, 0);
130 
131 	return res;
132 }
133 
134 static inline void
rspamd_fuzzy_redis_session_free_args(struct rspamd_fuzzy_redis_session * session)135 rspamd_fuzzy_redis_session_free_args (struct rspamd_fuzzy_redis_session *session)
136 {
137 	guint i;
138 
139 	if (session->argv) {
140 		for (i = 0; i < session->nargs; i ++) {
141 			g_free (session->argv[i]);
142 		}
143 
144 		g_free (session->argv);
145 		g_free (session->argv_lens);
146 	}
147 }
148 static void
rspamd_fuzzy_redis_session_dtor(struct rspamd_fuzzy_redis_session * session,gboolean is_fatal)149 rspamd_fuzzy_redis_session_dtor (struct rspamd_fuzzy_redis_session *session,
150 		gboolean is_fatal)
151 {
152 	redisAsyncContext *ac;
153 
154 
155 	if (session->ctx) {
156 		ac = session->ctx;
157 		session->ctx = NULL;
158 		rspamd_redis_pool_release_connection (session->backend->pool,
159 				ac,
160 				is_fatal ? RSPAMD_REDIS_RELEASE_FATAL : RSPAMD_REDIS_RELEASE_DEFAULT);
161 	}
162 
163 	ev_timer_stop (session->event_loop, &session->timeout);
164 	rspamd_fuzzy_redis_session_free_args (session);
165 
166 	REF_RELEASE (session->backend);
167 	rspamd_upstream_unref (session->up);
168 	g_free (session);
169 }
170 
171 static void
rspamd_fuzzy_backend_redis_dtor(struct rspamd_fuzzy_backend_redis * backend)172 rspamd_fuzzy_backend_redis_dtor (struct rspamd_fuzzy_backend_redis *backend)
173 {
174 	if (!backend->terminated && backend->conf_ref != -1) {
175 		luaL_unref (backend->L, LUA_REGISTRYINDEX, backend->conf_ref);
176 	}
177 
178 	if (backend->id) {
179 		g_free (backend->id);
180 	}
181 
182 	g_free (backend);
183 }
184 
185 void*
rspamd_fuzzy_backend_init_redis(struct rspamd_fuzzy_backend * bk,const ucl_object_t * obj,struct rspamd_config * cfg,GError ** err)186 rspamd_fuzzy_backend_init_redis (struct rspamd_fuzzy_backend *bk,
187 		const ucl_object_t *obj, struct rspamd_config *cfg, GError **err)
188 {
189 	struct rspamd_fuzzy_backend_redis *backend;
190 	const ucl_object_t *elt;
191 	gboolean ret = FALSE;
192 	guchar id_hash[rspamd_cryptobox_HASHBYTES];
193 	rspamd_cryptobox_hash_state_t st;
194 	lua_State *L = (lua_State *)cfg->lua_state;
195 	gint conf_ref = -1;
196 
197 	backend = g_malloc0 (sizeof (*backend));
198 
199 	backend->timeout = REDIS_DEFAULT_TIMEOUT;
200 	backend->redis_object = REDIS_DEFAULT_OBJECT;
201 	backend->L = L;
202 
203 	ret = rspamd_lua_try_load_redis (L, obj, cfg, &conf_ref);
204 
205 	/* Now try global redis settings */
206 	if (!ret) {
207 		elt = ucl_object_lookup (cfg->rcl_obj, "redis");
208 
209 		if (elt) {
210 			const ucl_object_t *specific_obj;
211 
212 			specific_obj = ucl_object_lookup_any (elt, "fuzzy", "fuzzy_storage",
213 					NULL);
214 
215 			if (specific_obj) {
216 				ret = rspamd_lua_try_load_redis (L, specific_obj, cfg, &conf_ref);
217 			}
218 			else {
219 				ret = rspamd_lua_try_load_redis (L, elt, cfg, &conf_ref);
220 			}
221 		}
222 	}
223 
224 	if (!ret) {
225 		msg_err_config ("cannot init redis backend for fuzzy storage");
226 		g_free (backend);
227 
228 		return NULL;
229 	}
230 
231 	elt = ucl_object_lookup (obj, "prefix");
232 	if (elt == NULL || ucl_object_type (elt) != UCL_STRING) {
233 		backend->redis_object = REDIS_DEFAULT_OBJECT;
234 	}
235 	else {
236 		backend->redis_object = ucl_object_tostring (elt);
237 	}
238 
239 	backend->conf_ref = conf_ref;
240 
241 	/* Check some common table values */
242 	lua_rawgeti (L, LUA_REGISTRYINDEX, conf_ref);
243 
244 	lua_pushstring (L, "timeout");
245 	lua_gettable (L, -2);
246 	if (lua_type (L, -1) == LUA_TNUMBER) {
247 		backend->timeout = lua_tonumber (L, -1);
248 	}
249 	lua_pop (L, 1);
250 
251 	lua_pushstring (L, "db");
252 	lua_gettable (L, -2);
253 	if (lua_type (L, -1) == LUA_TSTRING) {
254 		backend->dbname = rspamd_mempool_strdup (cfg->cfg_pool,
255 				lua_tostring (L, -1));
256 	}
257 	lua_pop (L, 1);
258 
259 	lua_pushstring (L, "password");
260 	lua_gettable (L, -2);
261 	if (lua_type (L, -1) == LUA_TSTRING) {
262 		backend->password = rspamd_mempool_strdup (cfg->cfg_pool,
263 				lua_tostring (L, -1));
264 	}
265 	lua_pop (L, 1);
266 
267 	lua_settop (L, 0);
268 
269 	REF_INIT_RETAIN (backend, rspamd_fuzzy_backend_redis_dtor);
270 	backend->pool = cfg->redis_pool;
271 	rspamd_cryptobox_hash_init (&st, NULL, 0);
272 	rspamd_cryptobox_hash_update (&st, backend->redis_object,
273 			strlen (backend->redis_object));
274 
275 	if (backend->dbname) {
276 		rspamd_cryptobox_hash_update (&st, backend->dbname,
277 				strlen (backend->dbname));
278 	}
279 
280 	if (backend->password) {
281 		rspamd_cryptobox_hash_update (&st, backend->password,
282 				strlen (backend->password));
283 	}
284 
285 	rspamd_cryptobox_hash_final (&st, id_hash);
286 	backend->id = rspamd_encode_base32 (id_hash, sizeof (id_hash), RSPAMD_BASE32_DEFAULT);
287 
288 	return backend;
289 }
290 
291 static void
rspamd_fuzzy_redis_timeout(EV_P_ ev_timer * w,int revents)292 rspamd_fuzzy_redis_timeout (EV_P_ ev_timer *w, int revents)
293 {
294 	struct rspamd_fuzzy_redis_session *session =
295 			(struct rspamd_fuzzy_redis_session *)w->data;
296 	redisAsyncContext *ac;
297 	static char errstr[128];
298 
299 	if (session->ctx) {
300 		ac = session->ctx;
301 		session->ctx = NULL;
302 		ac->err = REDIS_ERR_IO;
303 		/* Should be safe as in hiredis it is char[128] */
304 		rspamd_snprintf (errstr, sizeof (errstr), "%s", strerror (ETIMEDOUT));
305 		ac->errstr = errstr;
306 
307 		/* This will cause session closing */
308 		rspamd_redis_pool_release_connection (session->backend->pool,
309 				ac, RSPAMD_REDIS_RELEASE_FATAL);
310 	}
311 }
312 
/* Forward declaration: the shingles reply handler issues a follow-up HMGET
 * whose reply is handled by the check callback defined further below */
static void rspamd_fuzzy_redis_check_callback (redisAsyncContext *c, gpointer r,
		gpointer priv);

/* One shingle lookup result: the digest stored under the shingle key
 * (zero-filled when absent) and whether Redis returned a value for it */
struct _rspamd_fuzzy_shingles_helper {
	guchar digest[64];
	guint found;
};
320 
321 static gint
rspamd_fuzzy_backend_redis_shingles_cmp(const void * a,const void * b)322 rspamd_fuzzy_backend_redis_shingles_cmp (const void *a, const void *b)
323 {
324 	const struct _rspamd_fuzzy_shingles_helper *sha = a,
325 			*shb = b;
326 
327 	return memcmp (sha->digest, shb->digest, sizeof (sha->digest));
328 }
329 
330 static void
rspamd_fuzzy_redis_shingles_callback(redisAsyncContext * c,gpointer r,gpointer priv)331 rspamd_fuzzy_redis_shingles_callback (redisAsyncContext *c, gpointer r,
332 		gpointer priv)
333 {
334 	struct rspamd_fuzzy_redis_session *session = priv;
335 	redisReply *reply = r, *cur;
336 	struct rspamd_fuzzy_reply rep;
337 	GString *key;
338 	struct _rspamd_fuzzy_shingles_helper *shingles, *prev = NULL, *sel = NULL;
339 	guint i, found = 0, max_found = 0, cur_found = 0;
340 
341 	ev_timer_stop (session->event_loop, &session->timeout);
342 	memset (&rep, 0, sizeof (rep));
343 
344 	if (c->err == 0 && reply != NULL) {
345 		rspamd_upstream_ok (session->up);
346 
347 		if (reply->type == REDIS_REPLY_ARRAY &&
348 				reply->elements == RSPAMD_SHINGLE_SIZE) {
349 			shingles = g_alloca (sizeof (struct _rspamd_fuzzy_shingles_helper) *
350 					RSPAMD_SHINGLE_SIZE);
351 
352 			for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
353 				cur = reply->element[i];
354 
355 				if (cur->type == REDIS_REPLY_STRING) {
356 					shingles[i].found = 1;
357 					memcpy (shingles[i].digest, cur->str, MIN (64, cur->len));
358 					found ++;
359 				}
360 				else {
361 					memset (shingles[i].digest, 0, sizeof (shingles[i].digest));
362 					shingles[i].found = 0;
363 				}
364 			}
365 
366 			if (found > RSPAMD_SHINGLE_SIZE / 2) {
367 				/* Now sort to find the most frequent element */
368 				qsort (shingles, RSPAMD_SHINGLE_SIZE,
369 						sizeof (struct _rspamd_fuzzy_shingles_helper),
370 						rspamd_fuzzy_backend_redis_shingles_cmp);
371 
372 				prev = &shingles[0];
373 
374 				for (i = 1; i < RSPAMD_SHINGLE_SIZE; i ++) {
375 					if (!shingles[i].found) {
376 						continue;
377 					}
378 
379 					if (memcmp (shingles[i].digest, prev->digest, 64) == 0) {
380 						cur_found ++;
381 
382 						if (cur_found > max_found) {
383 							max_found = cur_found;
384 							sel = &shingles[i];
385 						}
386 					}
387 					else {
388 						cur_found = 1;
389 						prev = &shingles[i];
390 					}
391 				}
392 
393 				if (max_found > RSPAMD_SHINGLE_SIZE / 2) {
394 					session->prob = ((float)max_found) / RSPAMD_SHINGLE_SIZE;
395 					rep.v1.prob = session->prob;
396 
397 					g_assert (sel != NULL);
398 
399 					/* Prepare new check command */
400 					rspamd_fuzzy_redis_session_free_args (session);
401 					session->nargs = 5;
402 					session->argv = g_malloc (sizeof (gchar *) * session->nargs);
403 					session->argv_lens = g_malloc (sizeof (gsize) * session->nargs);
404 
405 					key = g_string_new (session->backend->redis_object);
406 					g_string_append_len (key, sel->digest, sizeof (sel->digest));
407 					session->argv[0] = g_strdup ("HMGET");
408 					session->argv_lens[0] = 5;
409 					session->argv[1] = key->str;
410 					session->argv_lens[1] = key->len;
411 					session->argv[2] = g_strdup ("V");
412 					session->argv_lens[2] = 1;
413 					session->argv[3] = g_strdup ("F");
414 					session->argv_lens[3] = 1;
415 					session->argv[4] = g_strdup ("C");
416 					session->argv_lens[4] = 1;
417 					g_string_free (key, FALSE); /* Do not free underlying array */
418 					memcpy (session->found_digest, sel->digest,
419 							sizeof (session->cmd->digest));
420 
421 					g_assert (session->ctx != NULL);
422 					if (redisAsyncCommandArgv (session->ctx,
423 							rspamd_fuzzy_redis_check_callback,
424 							session, session->nargs,
425 							(const gchar **)session->argv,
426 							session->argv_lens) != REDIS_OK) {
427 
428 						if (session->callback.cb_check) {
429 							memset (&rep, 0, sizeof (rep));
430 							session->callback.cb_check (&rep, session->cbdata);
431 						}
432 
433 						rspamd_fuzzy_redis_session_dtor (session, TRUE);
434 					}
435 					else {
436 						/* Add timeout */
437 						session->timeout.data = session;
438 						ev_now_update_if_cheap ((struct ev_loop *)session->event_loop);
439 						ev_timer_init (&session->timeout,
440 								rspamd_fuzzy_redis_timeout,
441 								session->backend->timeout, 0.0);
442 						ev_timer_start (session->event_loop, &session->timeout);
443 					}
444 
445 					return;
446 				}
447 			}
448 		}
449 		else if (reply->type == REDIS_REPLY_ERROR) {
450 			msg_err_redis_session ("fuzzy backend redis error: \"%s\"",
451 					reply->str);
452 		}
453 
454 		if (session->callback.cb_check) {
455 			session->callback.cb_check (&rep, session->cbdata);
456 		}
457 	}
458 	else {
459 		if (session->callback.cb_check) {
460 			session->callback.cb_check (&rep, session->cbdata);
461 		}
462 
463 		if (c->errstr) {
464 			msg_err_redis_session ("error getting shingles: %s", c->errstr);
465 			rspamd_upstream_fail (session->up, FALSE,  c->errstr);
466 		}
467 	}
468 
469 	rspamd_fuzzy_redis_session_dtor (session, FALSE);
470 }
471 
472 static void
rspamd_fuzzy_backend_check_shingles(struct rspamd_fuzzy_redis_session * session)473 rspamd_fuzzy_backend_check_shingles (struct rspamd_fuzzy_redis_session *session)
474 {
475 	struct rspamd_fuzzy_reply rep;
476 	const struct rspamd_fuzzy_shingle_cmd *shcmd;
477 	GString *key;
478 	guint i, init_len;
479 
480 	rspamd_fuzzy_redis_session_free_args (session);
481 	/* First of all check digest */
482 	session->nargs = RSPAMD_SHINGLE_SIZE + 1;
483 	session->argv = g_malloc (sizeof (gchar *) * session->nargs);
484 	session->argv_lens = g_malloc (sizeof (gsize) * session->nargs);
485 	shcmd = (const struct rspamd_fuzzy_shingle_cmd *)session->cmd;
486 
487 	session->argv[0] = g_strdup ("MGET");
488 	session->argv_lens[0] = 4;
489 	init_len = strlen (session->backend->redis_object);
490 
491 	for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
492 
493 		key = g_string_sized_new (init_len + 2 + 2 + sizeof ("18446744073709551616"));
494 		rspamd_printf_gstring (key, "%s_%d_%uL", session->backend->redis_object,
495 				i, shcmd->sgl.hashes[i]);
496 		session->argv[i + 1] = key->str;
497 		session->argv_lens[i + 1] = key->len;
498 		g_string_free (key, FALSE); /* Do not free underlying array */
499 	}
500 
501 	session->shingles_checked = TRUE;
502 
503 	g_assert (session->ctx != NULL);
504 
505 	if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_shingles_callback,
506 			session, session->nargs,
507 			(const gchar **)session->argv, session->argv_lens) != REDIS_OK) {
508 		msg_err ("cannot execute redis command on %s: %s",
509 				rspamd_inet_address_to_string_pretty (rspamd_upstream_addr_cur (session->up)),
510 				session->ctx->errstr);
511 
512 		if (session->callback.cb_check) {
513 			memset (&rep, 0, sizeof (rep));
514 			session->callback.cb_check (&rep, session->cbdata);
515 		}
516 
517 		rspamd_fuzzy_redis_session_dtor (session, TRUE);
518 	}
519 	else {
520 		/* Add timeout */
521 		session->timeout.data = session;
522 		ev_now_update_if_cheap ((struct ev_loop *)session->event_loop);
523 		ev_timer_init (&session->timeout,
524 				rspamd_fuzzy_redis_timeout,
525 				session->backend->timeout, 0.0);
526 		ev_timer_start (session->event_loop, &session->timeout);
527 	}
528 }
529 
530 static void
rspamd_fuzzy_redis_check_callback(redisAsyncContext * c,gpointer r,gpointer priv)531 rspamd_fuzzy_redis_check_callback (redisAsyncContext *c, gpointer r,
532 		gpointer priv)
533 {
534 	struct rspamd_fuzzy_redis_session *session = priv;
535 	redisReply *reply = r, *cur;
536 	struct rspamd_fuzzy_reply rep;
537 	gulong value;
538 	guint found_elts = 0;
539 
540 	ev_timer_stop (session->event_loop, &session->timeout);
541 	memset (&rep, 0, sizeof (rep));
542 
543 	if (c->err == 0 && reply != NULL) {
544 		rspamd_upstream_ok (session->up);
545 
546 		if (reply->type == REDIS_REPLY_ARRAY && reply->elements >= 2) {
547 			cur = reply->element[0];
548 
549 			if (cur->type == REDIS_REPLY_STRING) {
550 				value = strtoul (cur->str, NULL, 10);
551 				rep.v1.value = value;
552 				found_elts ++;
553 			}
554 
555 			cur = reply->element[1];
556 
557 			if (cur->type == REDIS_REPLY_STRING) {
558 				value = strtoul (cur->str, NULL, 10);
559 				rep.v1.flag = value;
560 				found_elts ++;
561 			}
562 
563 			if (found_elts >= 2) {
564 				rep.v1.prob = session->prob;
565 				memcpy (rep.digest, session->found_digest, sizeof (rep.digest));
566 			}
567 
568 			rep.ts = 0;
569 
570 			if (reply->elements > 2) {
571 				cur = reply->element[2];
572 
573 				if (cur->type == REDIS_REPLY_STRING) {
574 					rep.ts = strtoul (cur->str, NULL, 10);
575 				}
576 			}
577 		}
578 		else if (reply->type == REDIS_REPLY_ERROR) {
579 			msg_err_redis_session ("fuzzy backend redis error: \"%s\"",
580 					reply->str);
581 		}
582 
583 		if (found_elts < 2) {
584 			if (session->cmd->shingles_count > 0 && !session->shingles_checked) {
585 				/* We also need to check all shingles here */
586 				rspamd_fuzzy_backend_check_shingles (session);
587 				/* Do not free session */
588 				return;
589 			}
590 			else {
591 				if (session->callback.cb_check) {
592 					session->callback.cb_check (&rep, session->cbdata);
593 				}
594 			}
595 		}
596 		else {
597 			if (session->callback.cb_check) {
598 				session->callback.cb_check (&rep, session->cbdata);
599 			}
600 		}
601 	}
602 	else {
603 		if (session->callback.cb_check) {
604 			session->callback.cb_check (&rep, session->cbdata);
605 		}
606 
607 		if (c->errstr) {
608 			msg_err_redis_session ("error getting hashes on %s: %s",
609 					rspamd_inet_address_to_string_pretty (rspamd_upstream_addr_cur (session->up)),
610 					c->errstr);
611 			rspamd_upstream_fail (session->up, FALSE, c->errstr);
612 		}
613 	}
614 
615 	rspamd_fuzzy_redis_session_dtor (session, FALSE);
616 }
617 
618 void
rspamd_fuzzy_backend_check_redis(struct rspamd_fuzzy_backend * bk,const struct rspamd_fuzzy_cmd * cmd,rspamd_fuzzy_check_cb cb,void * ud,void * subr_ud)619 rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk,
620 		const struct rspamd_fuzzy_cmd *cmd,
621 		rspamd_fuzzy_check_cb cb, void *ud,
622 		void *subr_ud)
623 {
624 	struct rspamd_fuzzy_backend_redis *backend = subr_ud;
625 	struct rspamd_fuzzy_redis_session *session;
626 	struct upstream *up;
627 	struct upstream_list *ups;
628 	rspamd_inet_addr_t *addr;
629 	struct rspamd_fuzzy_reply rep;
630 	GString *key;
631 
632 	g_assert (backend != NULL);
633 
634 	ups = rspamd_redis_get_servers (backend, "read_servers");
635 	if (!ups) {
636 		if (cb) {
637 			memset (&rep, 0, sizeof (rep));
638 			cb (&rep, ud);
639 		}
640 
641 		return;
642 	}
643 
644 	session = g_malloc0 (sizeof (*session));
645 	session->backend = backend;
646 	REF_RETAIN (session->backend);
647 
648 	session->callback.cb_check = cb;
649 	session->cbdata = ud;
650 	session->command = RSPAMD_FUZZY_REDIS_COMMAND_CHECK;
651 	session->cmd = cmd;
652 	session->prob = 1.0;
653 	memcpy (rep.digest, session->cmd->digest, sizeof (rep.digest));
654 	memcpy (session->found_digest, session->cmd->digest, sizeof (rep.digest));
655 	session->event_loop = rspamd_fuzzy_backend_event_base (bk);
656 
657 	/* First of all check digest */
658 	session->nargs = 5;
659 	session->argv = g_malloc (sizeof (gchar *) * session->nargs);
660 	session->argv_lens = g_malloc (sizeof (gsize) * session->nargs);
661 
662 	key = g_string_new (backend->redis_object);
663 	g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
664 	session->argv[0] = g_strdup ("HMGET");
665 	session->argv_lens[0] = 5;
666 	session->argv[1] = key->str;
667 	session->argv_lens[1] = key->len;
668 	session->argv[2] = g_strdup ("V");
669 	session->argv_lens[2] = 1;
670 	session->argv[3] = g_strdup ("F");
671 	session->argv_lens[3] = 1;
672 	session->argv[4] = g_strdup ("C");
673 	session->argv_lens[4] = 1;
674 	g_string_free (key, FALSE); /* Do not free underlying array */
675 
676 	up = rspamd_upstream_get (ups,
677 			RSPAMD_UPSTREAM_ROUND_ROBIN,
678 			NULL,
679 			0);
680 
681 	session->up = rspamd_upstream_ref (up);
682 	addr = rspamd_upstream_addr_next (up);
683 	g_assert (addr != NULL);
684 	session->ctx = rspamd_redis_pool_connect (backend->pool,
685 			backend->dbname, backend->password,
686 			rspamd_inet_address_to_string (addr),
687 			rspamd_inet_address_get_port (addr));
688 
689 	if (session->ctx == NULL) {
690 		rspamd_upstream_fail (up, TRUE, strerror (errno));
691 		rspamd_fuzzy_redis_session_dtor (session, TRUE);
692 
693 		if (cb) {
694 			memset (&rep, 0, sizeof (rep));
695 			cb (&rep, ud);
696 		}
697 	}
698 	else {
699 		if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_check_callback,
700 				session, session->nargs,
701 				(const gchar **)session->argv, session->argv_lens) != REDIS_OK) {
702 			rspamd_fuzzy_redis_session_dtor (session, TRUE);
703 
704 			if (cb) {
705 				memset (&rep, 0, sizeof (rep));
706 				cb (&rep, ud);
707 			}
708 		}
709 		else {
710 			/* Add timeout */
711 			session->timeout.data = session;
712 			ev_now_update_if_cheap ((struct ev_loop *)session->event_loop);
713 			ev_timer_init (&session->timeout,
714 					rspamd_fuzzy_redis_timeout,
715 					session->backend->timeout, 0.0);
716 			ev_timer_start (session->event_loop, &session->timeout);
717 		}
718 	}
719 }
720 
721 static void
rspamd_fuzzy_redis_count_callback(redisAsyncContext * c,gpointer r,gpointer priv)722 rspamd_fuzzy_redis_count_callback (redisAsyncContext *c, gpointer r,
723 		gpointer priv)
724 {
725 	struct rspamd_fuzzy_redis_session *session = priv;
726 	redisReply *reply = r;
727 	gulong nelts;
728 
729 	ev_timer_stop (session->event_loop, &session->timeout);
730 
731 	if (c->err == 0 && reply != NULL) {
732 		rspamd_upstream_ok (session->up);
733 
734 		if (reply->type == REDIS_REPLY_INTEGER) {
735 			if (session->callback.cb_count) {
736 				session->callback.cb_count (reply->integer, session->cbdata);
737 			}
738 		}
739 		else if (reply->type == REDIS_REPLY_STRING) {
740 			nelts = strtoul (reply->str, NULL, 10);
741 
742 			if (session->callback.cb_count) {
743 				session->callback.cb_count (nelts, session->cbdata);
744 			}
745 		}
746 		else {
747 			if (reply->type == REDIS_REPLY_ERROR) {
748 				msg_err_redis_session ("fuzzy backend redis error: \"%s\"",
749 						reply->str);
750 			}
751 			if (session->callback.cb_count) {
752 				session->callback.cb_count (0, session->cbdata);
753 			}
754 		}
755 	}
756 	else {
757 		if (session->callback.cb_count) {
758 			session->callback.cb_count (0, session->cbdata);
759 		}
760 
761 		if (c->errstr) {
762 			msg_err_redis_session ("error getting count on %s: %s",
763 					rspamd_inet_address_to_string_pretty (rspamd_upstream_addr_cur (session->up)),
764 					c->errstr);
765 			rspamd_upstream_fail (session->up, FALSE, c->errstr);
766 		}
767 
768 	}
769 
770 	rspamd_fuzzy_redis_session_dtor (session, FALSE);
771 }
772 
773 void
rspamd_fuzzy_backend_count_redis(struct rspamd_fuzzy_backend * bk,rspamd_fuzzy_count_cb cb,void * ud,void * subr_ud)774 rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk,
775 		rspamd_fuzzy_count_cb cb, void *ud,
776 		void *subr_ud)
777 {
778 	struct rspamd_fuzzy_backend_redis *backend = subr_ud;
779 	struct rspamd_fuzzy_redis_session *session;
780 	struct upstream *up;
781 	struct upstream_list *ups;
782 	rspamd_inet_addr_t *addr;
783 	GString *key;
784 
785 	g_assert (backend != NULL);
786 
787 	ups = rspamd_redis_get_servers (backend, "read_servers");
788 	if (!ups) {
789 		if (cb) {
790 			cb (0, ud);
791 		}
792 
793 		return;
794 	}
795 
796 	session = g_malloc0 (sizeof (*session));
797 	session->backend = backend;
798 	REF_RETAIN (session->backend);
799 
800 	session->callback.cb_count = cb;
801 	session->cbdata = ud;
802 	session->command = RSPAMD_FUZZY_REDIS_COMMAND_COUNT;
803 	session->event_loop = rspamd_fuzzy_backend_event_base (bk);
804 
805 	session->nargs = 2;
806 	session->argv = g_malloc (sizeof (gchar *) * 2);
807 	session->argv_lens = g_malloc (sizeof (gsize) * 2);
808 	key = g_string_new (backend->redis_object);
809 	g_string_append (key, "_count");
810 	session->argv[0] = g_strdup ("GET");
811 	session->argv_lens[0] = 3;
812 	session->argv[1] = key->str;
813 	session->argv_lens[1] = key->len;
814 	g_string_free (key, FALSE); /* Do not free underlying array */
815 
816 	up = rspamd_upstream_get (ups,
817 			RSPAMD_UPSTREAM_ROUND_ROBIN,
818 			NULL,
819 			0);
820 
821 	session->up = rspamd_upstream_ref (up);
822 	addr = rspamd_upstream_addr_next (up);
823 	g_assert (addr != NULL);
824 	session->ctx = rspamd_redis_pool_connect (backend->pool,
825 			backend->dbname, backend->password,
826 			rspamd_inet_address_to_string (addr),
827 			rspamd_inet_address_get_port (addr));
828 
829 	if (session->ctx == NULL) {
830 		rspamd_upstream_fail (up, TRUE,  strerror (errno));
831 		rspamd_fuzzy_redis_session_dtor (session, TRUE);
832 
833 		if (cb) {
834 			cb (0, ud);
835 		}
836 	}
837 	else {
838 		if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_count_callback,
839 				session, session->nargs,
840 				(const gchar **)session->argv, session->argv_lens) != REDIS_OK) {
841 			rspamd_fuzzy_redis_session_dtor (session, TRUE);
842 
843 			if (cb) {
844 				cb (0, ud);
845 			}
846 		}
847 		else {
848 			/* Add timeout */
849 			session->timeout.data = session;
850 			ev_now_update_if_cheap ((struct ev_loop *)session->event_loop);
851 			ev_timer_init (&session->timeout,
852 					rspamd_fuzzy_redis_timeout,
853 					session->backend->timeout, 0.0);
854 			ev_timer_start (session->event_loop, &session->timeout);
855 		}
856 	}
857 }
858 
859 static void
rspamd_fuzzy_redis_version_callback(redisAsyncContext * c,gpointer r,gpointer priv)860 rspamd_fuzzy_redis_version_callback (redisAsyncContext *c, gpointer r,
861 		gpointer priv)
862 {
863 	struct rspamd_fuzzy_redis_session *session = priv;
864 	redisReply *reply = r;
865 	gulong nelts;
866 
867 	ev_timer_stop (session->event_loop, &session->timeout);
868 
869 	if (c->err == 0 && reply != NULL) {
870 		rspamd_upstream_ok (session->up);
871 
872 		if (reply->type == REDIS_REPLY_INTEGER) {
873 			if (session->callback.cb_version) {
874 				session->callback.cb_version (reply->integer, session->cbdata);
875 			}
876 		}
877 		else if (reply->type == REDIS_REPLY_STRING) {
878 			nelts = strtoul (reply->str, NULL, 10);
879 
880 			if (session->callback.cb_version) {
881 				session->callback.cb_version (nelts, session->cbdata);
882 			}
883 		}
884 		else {
885 			if (reply->type == REDIS_REPLY_ERROR) {
886 				msg_err_redis_session ("fuzzy backend redis error: \"%s\"",
887 						reply->str);
888 			}
889 			if (session->callback.cb_version) {
890 				session->callback.cb_version (0, session->cbdata);
891 			}
892 		}
893 	}
894 	else {
895 		if (session->callback.cb_version) {
896 			session->callback.cb_version (0, session->cbdata);
897 		}
898 
899 		if (c->errstr) {
900 			msg_err_redis_session ("error getting version on %s: %s",
901 					rspamd_inet_address_to_string_pretty (rspamd_upstream_addr_cur (session->up)),
902 					c->errstr);
903 			rspamd_upstream_fail (session->up, FALSE,  c->errstr);
904 		}
905 	}
906 
907 	rspamd_fuzzy_redis_session_dtor (session, FALSE);
908 }
909 
910 void
rspamd_fuzzy_backend_version_redis(struct rspamd_fuzzy_backend * bk,const gchar * src,rspamd_fuzzy_version_cb cb,void * ud,void * subr_ud)911 rspamd_fuzzy_backend_version_redis (struct rspamd_fuzzy_backend *bk,
912 		const gchar *src,
913 		rspamd_fuzzy_version_cb cb, void *ud,
914 		void *subr_ud)
915 {
916 	struct rspamd_fuzzy_backend_redis *backend = subr_ud;
917 	struct rspamd_fuzzy_redis_session *session;
918 	struct upstream *up;
919 	struct upstream_list *ups;
920 	rspamd_inet_addr_t *addr;
921 	GString *key;
922 
923 	g_assert (backend != NULL);
924 
925 	ups = rspamd_redis_get_servers (backend, "read_servers");
926 	if (!ups) {
927 		if (cb) {
928 			cb (0, ud);
929 		}
930 
931 		return;
932 	}
933 
934 	session = g_malloc0 (sizeof (*session));
935 	session->backend = backend;
936 	REF_RETAIN (session->backend);
937 
938 	session->callback.cb_version = cb;
939 	session->cbdata = ud;
940 	session->command = RSPAMD_FUZZY_REDIS_COMMAND_VERSION;
941 	session->event_loop = rspamd_fuzzy_backend_event_base (bk);
942 
943 	session->nargs = 2;
944 	session->argv = g_malloc (sizeof (gchar *) * 2);
945 	session->argv_lens = g_malloc (sizeof (gsize) * 2);
946 	key = g_string_new (backend->redis_object);
947 	g_string_append (key, src);
948 	session->argv[0] = g_strdup ("GET");
949 	session->argv_lens[0] = 3;
950 	session->argv[1] = key->str;
951 	session->argv_lens[1] = key->len;
952 	g_string_free (key, FALSE); /* Do not free underlying array */
953 
954 	up = rspamd_upstream_get (ups,
955 			RSPAMD_UPSTREAM_ROUND_ROBIN,
956 			NULL,
957 			0);
958 
959 	session->up = rspamd_upstream_ref (up);
960 	addr = rspamd_upstream_addr_next (up);
961 	g_assert (addr != NULL);
962 	session->ctx = rspamd_redis_pool_connect (backend->pool,
963 			backend->dbname, backend->password,
964 			rspamd_inet_address_to_string (addr),
965 			rspamd_inet_address_get_port (addr));
966 
967 	if (session->ctx == NULL) {
968 		rspamd_upstream_fail (up, FALSE,  strerror (errno));
969 		rspamd_fuzzy_redis_session_dtor (session, TRUE);
970 
971 		if (cb) {
972 			cb (0, ud);
973 		}
974 	}
975 	else {
976 		if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_version_callback,
977 				session, session->nargs,
978 				(const gchar **)session->argv, session->argv_lens) != REDIS_OK) {
979 			rspamd_fuzzy_redis_session_dtor (session, TRUE);
980 
981 			if (cb) {
982 				cb (0, ud);
983 			}
984 		}
985 		else {
986 			/* Add timeout */
987 			session->timeout.data = session;
988 			ev_now_update_if_cheap ((struct ev_loop *)session->event_loop);
989 			ev_timer_init (&session->timeout,
990 					rspamd_fuzzy_redis_timeout,
991 					session->backend->timeout, 0.0);
992 			ev_timer_start (session->event_loop, &session->timeout);
993 		}
994 	}
995 }
996 
997 const gchar*
rspamd_fuzzy_backend_id_redis(struct rspamd_fuzzy_backend * bk,void * subr_ud)998 rspamd_fuzzy_backend_id_redis (struct rspamd_fuzzy_backend *bk,
999 		void *subr_ud)
1000 {
1001 	struct rspamd_fuzzy_backend_redis *backend = subr_ud;
1002 	g_assert (backend != NULL);
1003 
1004 	return backend->id;
1005 }
1006 
1007 void
rspamd_fuzzy_backend_expire_redis(struct rspamd_fuzzy_backend * bk,void * subr_ud)1008 rspamd_fuzzy_backend_expire_redis (struct rspamd_fuzzy_backend *bk,
1009 		void *subr_ud)
1010 {
1011 	struct rspamd_fuzzy_backend_redis *backend = subr_ud;
1012 
1013 	g_assert (backend != NULL);
1014 }
1015 
1016 static gboolean
rspamd_fuzzy_update_append_command(struct rspamd_fuzzy_backend * bk,struct rspamd_fuzzy_redis_session * session,struct fuzzy_peer_cmd * io_cmd,guint * shift)1017 rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk,
1018 		struct rspamd_fuzzy_redis_session *session,
1019 		struct fuzzy_peer_cmd *io_cmd, guint *shift)
1020 {
1021 	GString *key, *value;
1022 	guint cur_shift = *shift;
1023 	guint i, klen;
1024 	struct rspamd_fuzzy_cmd *cmd;
1025 
1026 	if (io_cmd->is_shingle) {
1027 		cmd = &io_cmd->cmd.shingle.basic;
1028 	}
1029 	else {
1030 		cmd = &io_cmd->cmd.normal;
1031 
1032 	}
1033 
1034 	if (cmd->cmd == FUZZY_WRITE) {
1035 		/*
1036 		 * For each normal hash addition we do 5 redis commands:
1037 		 * HSET <key> F <flag>
1038 		 * HSETNX <key> C <time>
1039 		 * HINCRBY <key> V <weight>
1040 		 * EXPIRE <key> <expire>
1041 		 * Where <key> is <prefix> || <digest>
1042 		 */
1043 
1044 		/* HSET */
1045 		klen = strlen (session->backend->redis_object) +
1046 				sizeof (cmd->digest) + 1;
1047 		key = g_string_sized_new (klen);
1048 		g_string_append (key, session->backend->redis_object);
1049 		g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
1050 		value = g_string_sized_new (sizeof ("4294967296"));
1051 		rspamd_printf_gstring (value, "%d", cmd->flag);
1052 		session->argv[cur_shift] = g_strdup ("HSET");
1053 		session->argv_lens[cur_shift++] = sizeof ("HSET") - 1;
1054 		session->argv[cur_shift] = key->str;
1055 		session->argv_lens[cur_shift++] = key->len;
1056 		session->argv[cur_shift] = g_strdup ("F");
1057 		session->argv_lens[cur_shift++] = sizeof ("F") - 1;
1058 		session->argv[cur_shift] = value->str;
1059 		session->argv_lens[cur_shift++] = value->len;
1060 		g_string_free (key, FALSE);
1061 		g_string_free (value, FALSE);
1062 
1063 		if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1064 				4,
1065 				(const gchar **)&session->argv[cur_shift - 4],
1066 				&session->argv_lens[cur_shift - 4]) != REDIS_OK) {
1067 
1068 			return FALSE;
1069 		}
1070 
1071 		/* HSETNX */
1072 		klen = strlen (session->backend->redis_object) +
1073 				sizeof (cmd->digest) + 1;
1074 		key = g_string_sized_new (klen);
1075 		g_string_append (key, session->backend->redis_object);
1076 		g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
1077 		value = g_string_sized_new (sizeof ("18446744073709551616"));
1078 		rspamd_printf_gstring (value, "%L", (gint64)rspamd_get_calendar_ticks ());
1079 		session->argv[cur_shift] = g_strdup ("HSETNX");
1080 		session->argv_lens[cur_shift++] = sizeof ("HSETNX") - 1;
1081 		session->argv[cur_shift] = key->str;
1082 		session->argv_lens[cur_shift++] = key->len;
1083 		session->argv[cur_shift] = g_strdup ("C");
1084 		session->argv_lens[cur_shift++] = sizeof ("C") - 1;
1085 		session->argv[cur_shift] = value->str;
1086 		session->argv_lens[cur_shift++] = value->len;
1087 		g_string_free (key, FALSE);
1088 		g_string_free (value, FALSE);
1089 
1090 		if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1091 				4,
1092 				(const gchar **)&session->argv[cur_shift - 4],
1093 				&session->argv_lens[cur_shift - 4]) != REDIS_OK) {
1094 
1095 			return FALSE;
1096 		}
1097 
1098 		/* HINCRBY */
1099 		key = g_string_sized_new (klen);
1100 		g_string_append (key, session->backend->redis_object);
1101 		g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
1102 		value = g_string_sized_new (sizeof ("4294967296"));
1103 		rspamd_printf_gstring (value, "%d", cmd->value);
1104 		session->argv[cur_shift] = g_strdup ("HINCRBY");
1105 		session->argv_lens[cur_shift++] = sizeof ("HINCRBY") - 1;
1106 		session->argv[cur_shift] = key->str;
1107 		session->argv_lens[cur_shift++] = key->len;
1108 		session->argv[cur_shift] = g_strdup ("V");
1109 		session->argv_lens[cur_shift++] = sizeof ("V") - 1;
1110 		session->argv[cur_shift] = value->str;
1111 		session->argv_lens[cur_shift++] = value->len;
1112 		g_string_free (key, FALSE);
1113 		g_string_free (value, FALSE);
1114 
1115 		if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1116 				4,
1117 				(const gchar **)&session->argv[cur_shift - 4],
1118 				&session->argv_lens[cur_shift - 4]) != REDIS_OK) {
1119 
1120 			return FALSE;
1121 		}
1122 
1123 		/* EXPIRE */
1124 		key = g_string_sized_new (klen);
1125 		g_string_append (key, session->backend->redis_object);
1126 		g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
1127 		value = g_string_sized_new (sizeof ("4294967296"));
1128 		rspamd_printf_gstring (value, "%d",
1129 				(gint)rspamd_fuzzy_backend_get_expire (bk));
1130 		session->argv[cur_shift] = g_strdup ("EXPIRE");
1131 		session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1;
1132 		session->argv[cur_shift] = key->str;
1133 		session->argv_lens[cur_shift++] = key->len;
1134 		session->argv[cur_shift] = value->str;
1135 		session->argv_lens[cur_shift++] = value->len;
1136 		g_string_free (key, FALSE);
1137 		g_string_free (value, FALSE);
1138 
1139 		if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1140 				3,
1141 				(const gchar **)&session->argv[cur_shift - 3],
1142 				&session->argv_lens[cur_shift - 3]) != REDIS_OK) {
1143 
1144 			return FALSE;
1145 		}
1146 
1147 		/* INCR */
1148 		key = g_string_sized_new (klen);
1149 		g_string_append (key, session->backend->redis_object);
1150 		g_string_append (key, "_count");
1151 		session->argv[cur_shift] = g_strdup ("INCR");
1152 		session->argv_lens[cur_shift++] = sizeof ("INCR") - 1;
1153 		session->argv[cur_shift] = key->str;
1154 		session->argv_lens[cur_shift++] = key->len;
1155 		g_string_free (key, FALSE);
1156 
1157 		if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1158 				2,
1159 				(const gchar **)&session->argv[cur_shift - 2],
1160 				&session->argv_lens[cur_shift - 2]) != REDIS_OK) {
1161 
1162 			return FALSE;
1163 		}
1164 	}
1165 	else if (cmd->cmd == FUZZY_DEL) {
1166 		/* DEL */
1167 		klen = strlen (session->backend->redis_object) +
1168 				sizeof (cmd->digest) + 1;
1169 
1170 		key = g_string_sized_new (klen);
1171 		g_string_append (key, session->backend->redis_object);
1172 		g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
1173 		session->argv[cur_shift] = g_strdup ("DEL");
1174 		session->argv_lens[cur_shift++] = sizeof ("DEL") - 1;
1175 		session->argv[cur_shift] = key->str;
1176 		session->argv_lens[cur_shift++] = key->len;
1177 		g_string_free (key, FALSE);
1178 
1179 		if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1180 				2,
1181 				(const gchar **)&session->argv[cur_shift - 2],
1182 				&session->argv_lens[cur_shift - 2]) != REDIS_OK) {
1183 
1184 			return FALSE;
1185 		}
1186 
1187 		/* DECR */
1188 		key = g_string_sized_new (klen);
1189 		g_string_append (key, session->backend->redis_object);
1190 		g_string_append (key, "_count");
1191 		session->argv[cur_shift] = g_strdup ("DECR");
1192 		session->argv_lens[cur_shift++] = sizeof ("DECR") - 1;
1193 		session->argv[cur_shift] = key->str;
1194 		session->argv_lens[cur_shift++] = key->len;
1195 		g_string_free (key, FALSE);
1196 
1197 		if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1198 				2,
1199 				(const gchar **)&session->argv[cur_shift - 2],
1200 				&session->argv_lens[cur_shift - 2]) != REDIS_OK) {
1201 
1202 			return FALSE;
1203 		}
1204 	}
1205 	else if (cmd->cmd == FUZZY_REFRESH) {
1206 		/*
1207 		 * Issue refresh command by just EXPIRE command
1208 		 * EXPIRE <key> <expire>
1209 		 * Where <key> is <prefix> || <digest>
1210 		 */
1211 
1212 		klen = strlen (session->backend->redis_object) +
1213 			   sizeof (cmd->digest) + 1;
1214 
1215 		/* EXPIRE */
1216 		key = g_string_sized_new (klen);
1217 		g_string_append (key, session->backend->redis_object);
1218 		g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
1219 		value = g_string_sized_new (sizeof ("4294967296"));
1220 		rspamd_printf_gstring (value, "%d",
1221 				(gint)rspamd_fuzzy_backend_get_expire (bk));
1222 		session->argv[cur_shift] = g_strdup ("EXPIRE");
1223 		session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1;
1224 		session->argv[cur_shift] = key->str;
1225 		session->argv_lens[cur_shift++] = key->len;
1226 		session->argv[cur_shift] = value->str;
1227 		session->argv_lens[cur_shift++] = value->len;
1228 		g_string_free (key, FALSE);
1229 		g_string_free (value, FALSE);
1230 
1231 		if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1232 				3,
1233 				(const gchar **)&session->argv[cur_shift - 3],
1234 				&session->argv_lens[cur_shift - 3]) != REDIS_OK) {
1235 
1236 			return FALSE;
1237 		}
1238 	}
1239 	else if (cmd->cmd == FUZZY_DUP) {
1240 		/* Ignore */
1241 	}
1242 	else {
1243 		g_assert_not_reached ();
1244 	}
1245 
1246 	if (io_cmd->is_shingle) {
1247 		if (cmd->cmd == FUZZY_WRITE) {
1248 			klen = strlen (session->backend->redis_object) +
1249 							64 + 1;
1250 
1251 			for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
1252 				guchar *hval;
1253 				/*
1254 				 * For each command with shingles we additionally emit 32 commands:
1255 				 * SETEX <prefix>_<number>_<value> <expire> <digest>
1256 				 */
1257 
1258 				/* SETEX */
1259 				key = g_string_sized_new (klen);
1260 				rspamd_printf_gstring (key, "%s_%d_%uL",
1261 						session->backend->redis_object,
1262 						i,
1263 						io_cmd->cmd.shingle.sgl.hashes[i]);
1264 				value = g_string_sized_new (sizeof ("4294967296"));
1265 				rspamd_printf_gstring (value, "%d",
1266 						(gint)rspamd_fuzzy_backend_get_expire (bk));
1267 				hval = g_malloc (sizeof (io_cmd->cmd.shingle.basic.digest));
1268 				memcpy (hval, io_cmd->cmd.shingle.basic.digest,
1269 						sizeof (io_cmd->cmd.shingle.basic.digest));
1270 				session->argv[cur_shift] = g_strdup ("SETEX");
1271 				session->argv_lens[cur_shift++] = sizeof ("SETEX") - 1;
1272 				session->argv[cur_shift] = key->str;
1273 				session->argv_lens[cur_shift++] = key->len;
1274 				session->argv[cur_shift] = value->str;
1275 				session->argv_lens[cur_shift++] = value->len;
1276 				session->argv[cur_shift] = hval;
1277 				session->argv_lens[cur_shift++] = sizeof (io_cmd->cmd.shingle.basic.digest);
1278 				g_string_free (key, FALSE);
1279 				g_string_free (value, FALSE);
1280 
1281 				if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1282 						4,
1283 						(const gchar **)&session->argv[cur_shift - 4],
1284 						&session->argv_lens[cur_shift - 4]) != REDIS_OK) {
1285 
1286 					return FALSE;
1287 				}
1288 			}
1289 		}
1290 		else if (cmd->cmd == FUZZY_DEL) {
1291 			klen = strlen (session->backend->redis_object) +
1292 					64 + 1;
1293 
1294 			for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
1295 				key = g_string_sized_new (klen);
1296 				rspamd_printf_gstring (key, "%s_%d_%uL",
1297 						session->backend->redis_object,
1298 						i,
1299 						io_cmd->cmd.shingle.sgl.hashes[i]);
1300 				session->argv[cur_shift] = g_strdup ("DEL");
1301 				session->argv_lens[cur_shift++] = sizeof ("DEL") - 1;
1302 				session->argv[cur_shift] = key->str;
1303 				session->argv_lens[cur_shift++] = key->len;
1304 				g_string_free (key, FALSE);
1305 
1306 				if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1307 						2,
1308 						(const gchar **)&session->argv[cur_shift - 2],
1309 						&session->argv_lens[cur_shift - 2]) != REDIS_OK) {
1310 
1311 					return FALSE;
1312 				}
1313 			}
1314 		}
1315 		else if (cmd->cmd == FUZZY_REFRESH) {
1316 			klen = strlen (session->backend->redis_object) +
1317 				   64 + 1;
1318 
1319 			for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
1320 				/*
1321 				 * For each command with shingles we additionally emit 32 commands:
1322 				 * EXPIRE <prefix>_<number>_<value> <expire>
1323 				 */
1324 
1325 				/* Expire */
1326 				key = g_string_sized_new (klen);
1327 				rspamd_printf_gstring (key, "%s_%d_%uL",
1328 						session->backend->redis_object,
1329 						i,
1330 						io_cmd->cmd.shingle.sgl.hashes[i]);
1331 				value = g_string_sized_new (sizeof ("18446744073709551616"));
1332 				rspamd_printf_gstring (value, "%d",
1333 						(gint)rspamd_fuzzy_backend_get_expire (bk));
1334 				session->argv[cur_shift] = g_strdup ("EXPIRE");
1335 				session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1;
1336 				session->argv[cur_shift] = key->str;
1337 				session->argv_lens[cur_shift++] = key->len;
1338 				session->argv[cur_shift] = value->str;
1339 				session->argv_lens[cur_shift++] = value->len;
1340 				g_string_free (key, FALSE);
1341 				g_string_free (value, FALSE);
1342 
1343 				if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1344 						3,
1345 						(const gchar **)&session->argv[cur_shift - 3],
1346 						&session->argv_lens[cur_shift - 3]) != REDIS_OK) {
1347 
1348 					return FALSE;
1349 				}
1350 			}
1351 		}
1352 		else if (cmd->cmd == FUZZY_DUP) {
1353 			/* Ignore */
1354 		}
1355 		else {
1356 			g_assert_not_reached ();
1357 		}
1358 	}
1359 
1360 	*shift = cur_shift;
1361 
1362 	return TRUE;
1363 }
1364 
1365 static void
rspamd_fuzzy_redis_update_callback(redisAsyncContext * c,gpointer r,gpointer priv)1366 rspamd_fuzzy_redis_update_callback (redisAsyncContext *c, gpointer r,
1367 		gpointer priv)
1368 {
1369 	struct rspamd_fuzzy_redis_session *session = priv;
1370 	redisReply *reply = r;
1371 
1372 	ev_timer_stop (session->event_loop, &session->timeout);
1373 
1374 	if (c->err == 0 && reply != NULL) {
1375 		rspamd_upstream_ok (session->up);
1376 
1377 		if (reply->type == REDIS_REPLY_ARRAY) {
1378 			/* TODO: check all replies somehow */
1379 			if (session->callback.cb_update) {
1380 				session->callback.cb_update (TRUE,
1381 						session->nadded,
1382 						session->ndeleted,
1383 						session->nextended,
1384 						session->nignored,
1385 						session->cbdata);
1386 			}
1387 		}
1388 		else {
1389 			if (reply->type == REDIS_REPLY_ERROR) {
1390 				msg_err_redis_session ("fuzzy backend redis error: \"%s\"",
1391 						reply->str);
1392 			}
1393 			if (session->callback.cb_update) {
1394 				session->callback.cb_update (FALSE, 0, 0, 0, 0, session->cbdata);
1395 			}
1396 		}
1397 	}
1398 	else {
1399 		if (session->callback.cb_update) {
1400 			session->callback.cb_update (FALSE, 0, 0, 0, 0, session->cbdata);
1401 		}
1402 
1403 		if (c->errstr) {
1404 			msg_err_redis_session ("error sending update to redis %s: %s",
1405 					rspamd_inet_address_to_string_pretty (rspamd_upstream_addr_cur (session->up)),
1406 					c->errstr);
1407 			rspamd_upstream_fail (session->up, FALSE,  c->errstr);
1408 		}
1409 	}
1410 
1411 	rspamd_fuzzy_redis_session_dtor (session, FALSE);
1412 }
1413 
1414 void
rspamd_fuzzy_backend_update_redis(struct rspamd_fuzzy_backend * bk,GArray * updates,const gchar * src,rspamd_fuzzy_update_cb cb,void * ud,void * subr_ud)1415 rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk,
1416 		GArray *updates, const gchar *src,
1417 		rspamd_fuzzy_update_cb cb, void *ud,
1418 		void *subr_ud)
1419 {
1420 	struct rspamd_fuzzy_backend_redis *backend = subr_ud;
1421 	struct rspamd_fuzzy_redis_session *session;
1422 	struct upstream *up;
1423 	struct upstream_list *ups;
1424 	rspamd_inet_addr_t *addr;
1425 	guint i;
1426 	GString *key;
1427 	struct fuzzy_peer_cmd *io_cmd;
1428 	struct rspamd_fuzzy_cmd *cmd = NULL;
1429 	guint nargs, ncommands, cur_shift;
1430 
1431 	g_assert (backend != NULL);
1432 
1433 	ups = rspamd_redis_get_servers (backend, "write_servers");
1434 	if (!ups) {
1435 		if (cb) {
1436 			cb (FALSE, 0, 0, 0, 0, ud);
1437 		}
1438 
1439 		return;
1440 	}
1441 
1442 	session = g_malloc0 (sizeof (*session));
1443 	session->backend = backend;
1444 	REF_RETAIN (session->backend);
1445 
1446 	/*
1447 	 * For each normal hash addition we do 3 redis commands:
1448 	 * HSET <key> F <flag>
1449 	 * HINCRBY <key> V <weight>
1450 	 * EXPIRE <key> <expire>
1451 	 * INCR <prefix||fuzzy_count>
1452 	 *
1453 	 * Where <key> is <prefix> || <digest>
1454 	 *
1455 	 * For each command with shingles we additionally emit 32 commands:
1456 	 * SETEX <prefix>_<number>_<value> <expire> <digest>
1457 	 *
1458 	 * For each delete command we emit:
1459 	 * DEL <key>
1460 	 *
1461 	 * For each delete command with shingles we emit also 32 commands:
1462 	 * DEL <prefix>_<number>_<value>
1463 	 * DECR <prefix||fuzzy_count>
1464 	 */
1465 
1466 	ncommands = 3; /* For MULTI + EXEC + INCR <src> */
1467 	nargs = 4;
1468 
1469 	for (i = 0; i < updates->len; i ++) {
1470 		io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i);
1471 
1472 		if (io_cmd->is_shingle) {
1473 			cmd = &io_cmd->cmd.shingle.basic;
1474 		}
1475 		else {
1476 			cmd = &io_cmd->cmd.normal;
1477 		}
1478 
1479 		if (cmd->cmd == FUZZY_WRITE) {
1480 			ncommands += 5;
1481 			nargs += 17;
1482 			session->nadded ++;
1483 
1484 			if (io_cmd->is_shingle) {
1485 				ncommands += RSPAMD_SHINGLE_SIZE;
1486 				nargs += RSPAMD_SHINGLE_SIZE * 4;
1487 			}
1488 
1489 		}
1490 		else if (cmd->cmd == FUZZY_DEL) {
1491 			ncommands += 2;
1492 			nargs += 4;
1493 			session->ndeleted ++;
1494 
1495 			if (io_cmd->is_shingle) {
1496 				ncommands += RSPAMD_SHINGLE_SIZE;
1497 				nargs += RSPAMD_SHINGLE_SIZE * 2;
1498 			}
1499 		}
1500 		else if (cmd->cmd == FUZZY_REFRESH) {
1501 			ncommands += 1;
1502 			nargs += 3;
1503 			session->nextended ++;
1504 
1505 			if (io_cmd->is_shingle) {
1506 				ncommands += RSPAMD_SHINGLE_SIZE;
1507 				nargs += RSPAMD_SHINGLE_SIZE * 3;
1508 			}
1509 		}
1510 		else {
1511 			session->nignored ++;
1512 		}
1513 	}
1514 
1515 	/* Now we need to create a new request */
1516 	session->callback.cb_update = cb;
1517 	session->cbdata = ud;
1518 	session->command = RSPAMD_FUZZY_REDIS_COMMAND_UPDATES;
1519 	session->cmd = cmd;
1520 	session->prob = 1.0f;
1521 	session->event_loop = rspamd_fuzzy_backend_event_base (bk);
1522 
1523 	/* First of all check digest */
1524 	session->nargs = nargs;
1525 	session->argv = g_malloc0 (sizeof (gchar *) * session->nargs);
1526 	session->argv_lens = g_malloc0 (sizeof (gsize) * session->nargs);
1527 
1528 	up = rspamd_upstream_get (ups,
1529 			RSPAMD_UPSTREAM_MASTER_SLAVE,
1530 			NULL,
1531 			0);
1532 
1533 	session->up = rspamd_upstream_ref (up);
1534 	addr = rspamd_upstream_addr_next (up);
1535 	g_assert (addr != NULL);
1536 	session->ctx = rspamd_redis_pool_connect (backend->pool,
1537 			backend->dbname, backend->password,
1538 			rspamd_inet_address_to_string (addr),
1539 			rspamd_inet_address_get_port (addr));
1540 
1541 	if (session->ctx == NULL) {
1542 		rspamd_upstream_fail (up, TRUE,  strerror (errno));
1543 		rspamd_fuzzy_redis_session_dtor (session, TRUE);
1544 
1545 		if (cb) {
1546 			cb (FALSE, 0, 0, 0, 0, ud);
1547 		}
1548 	}
1549 	else {
1550 		/* Start with MULTI command */
1551 		session->argv[0] = g_strdup ("MULTI");
1552 		session->argv_lens[0] = 5;
1553 
1554 		if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1555 				1,
1556 				(const gchar **)session->argv,
1557 				session->argv_lens) != REDIS_OK) {
1558 
1559 			if (cb) {
1560 				cb (FALSE, 0, 0, 0, 0, ud);
1561 			}
1562 			rspamd_fuzzy_redis_session_dtor (session, TRUE);
1563 
1564 			return;
1565 		}
1566 
1567 		/* Now split the rest of commands in packs and emit them command by command */
1568 		cur_shift = 1;
1569 
1570 		for (i = 0; i < updates->len; i ++) {
1571 			io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i);
1572 
1573 			if (!rspamd_fuzzy_update_append_command (bk, session, io_cmd,
1574 					&cur_shift)) {
1575 				if (cb) {
1576 					cb (FALSE, 0, 0, 0, 0, ud);
1577 				}
1578 				rspamd_fuzzy_redis_session_dtor (session, TRUE);
1579 
1580 				return;
1581 			}
1582 		}
1583 
1584 		/* Now INCR command for the source */
1585 		key = g_string_new (backend->redis_object);
1586 		g_string_append (key, src);
1587 		session->argv[cur_shift] = g_strdup ("INCR");
1588 		session->argv_lens[cur_shift ++] = 4;
1589 		session->argv[cur_shift] = key->str;
1590 		session->argv_lens[cur_shift ++] = key->len;
1591 		g_string_free (key, FALSE);
1592 
1593 		if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
1594 				2,
1595 				(const gchar **)&session->argv[cur_shift - 2],
1596 				&session->argv_lens[cur_shift - 2]) != REDIS_OK) {
1597 
1598 			if (cb) {
1599 				cb (FALSE, 0, 0, 0, 0, ud);
1600 			}
1601 			rspamd_fuzzy_redis_session_dtor (session, TRUE);
1602 
1603 			return;
1604 		}
1605 
1606 		/* Finally we call EXEC with a specific callback */
1607 		session->argv[cur_shift] = g_strdup ("EXEC");
1608 		session->argv_lens[cur_shift] = 4;
1609 
1610 		if (redisAsyncCommandArgv (session->ctx,
1611 				rspamd_fuzzy_redis_update_callback, session,
1612 				1,
1613 				(const gchar **)&session->argv[cur_shift],
1614 				&session->argv_lens[cur_shift]) != REDIS_OK) {
1615 
1616 			if (cb) {
1617 				cb (FALSE, 0, 0, 0, 0, ud);
1618 			}
1619 			rspamd_fuzzy_redis_session_dtor (session, TRUE);
1620 
1621 			return;
1622 		}
1623 		else {
1624 			/* Add timeout */
1625 			session->timeout.data = session;
1626 			ev_now_update_if_cheap ((struct ev_loop *)session->event_loop);
1627 			ev_timer_init (&session->timeout,
1628 					rspamd_fuzzy_redis_timeout,
1629 					session->backend->timeout, 0.0);
1630 			ev_timer_start (session->event_loop, &session->timeout);
1631 		}
1632 	}
1633 }
1634 
1635 void
rspamd_fuzzy_backend_close_redis(struct rspamd_fuzzy_backend * bk,void * subr_ud)1636 rspamd_fuzzy_backend_close_redis (struct rspamd_fuzzy_backend *bk,
1637 		void *subr_ud)
1638 {
1639 	struct rspamd_fuzzy_backend_redis *backend = subr_ud;
1640 
1641 	g_assert (backend != NULL);
1642 
1643 	/*
1644 	 * XXX: we leak lua registry element there to avoid crashing
1645 	 * due to chicken-egg problem between lua state termination and
1646 	 * redis pool termination.
1647 	 * Here, we assume that redis pool is destroyed AFTER lua_state,
1648 	 * so all connections pending will release references but due to
1649 	 * `terminated` hack they will not try to access Lua stuff
1650 	 * This is enabled merely if we have connections pending (e.g. refcount > 1)
1651 	 */
1652 	if (backend->ref.refcount > 1) {
1653 		backend->terminated = true;
1654 	}
1655 	REF_RELEASE (backend);
1656 }
1657