1 /*-
2  * Copyright 2016 Vsevolod Stakhov
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "config.h"
17 #include "rspamd.h"
18 #include "stat_internal.h"
19 #include "upstream.h"
20 #include "lua/lua_common.h"
21 #include "libserver/mempool_vars_internal.h"
22 #include "hiredis.h"
23 #include "adapters/libev.h"
24 #include "ref.h"
25 
/*
 * Debug logging helper of this module. The expansion references
 * `task->task_pool->tag.uid`, so a variable named `task` must be in scope
 * wherever the macro is used; the calling function name is passed via
 * G_STRFUNC.
 */
#define msg_debug_stat_redis(...)  rspamd_conditional_debug_fast (NULL, NULL, \
        rspamd_stat_redis_log_id, "stat_redis", task->task_pool->tag.uid, \
        G_STRFUNC, \
        __VA_ARGS__)

/* NOTE(review): presumably provides rspamd_stat_redis_log_id used above - confirm */
INIT_LOG_MODULE(stat_redis)
32 
/*
 * Cast helpers for the opaque pointers handed to the backend callbacks.
 * The whole expansion is parenthesized so that the result can be used
 * directly in postfix expressions, e.g. REDIS_CTX (p)->timeout.
 */
#define REDIS_CTX(p) ((struct redis_stat_ctx *)(p))
#define REDIS_RUNTIME(p) ((struct redis_stat_runtime *)(p))
/* Backend name */
#define REDIS_BACKEND_TYPE "redis"
/* Default TCP port of a redis server */
#define REDIS_DEFAULT_PORT 6379
/* Default key patterns, expanded by rspamd_redis_expand_object () */
#define REDIS_DEFAULT_OBJECT "%s%l"
#define REDIS_DEFAULT_USERS_OBJECT "%s%l%r"
/* NOTE(review): presumably seconds; both timeouts are unused in this part of the file - confirm */
#define REDIS_DEFAULT_TIMEOUT 0.5
#define REDIS_STAT_TIMEOUT 30
41 
/*
 * Configuration of the redis backend for one statfile.
 */
struct redis_stat_ctx {
	/* Lua state used to read the configuration table (see conf_ref) */
	lua_State *L;
	/* Statfile configuration; its symbol, label and is_spam are read here */
	struct rspamd_statfile_config *stcf;
	/* Lua registry reference of the table holding e.g. "read_servers" */
	gint conf_ref;
	/* NOTE(review): presumably the periodic statistics element - confirm */
	struct rspamd_stat_async_elt *stat_elt;
	/* NOTE(review): presumably the key pattern passed to rspamd_redis_expand_object - confirm */
	const gchar *redis_object;
	/* If set, sent with AUTH on every new connection */
	const gchar *password;
	/* If set, sent with SELECT on every new connection */
	const gchar *dbname;
	/* NOTE(review): presumably the per-request timeout in seconds - confirm */
	gdouble timeout;
	/* Resolve a per-user name when expanding the key pattern */
	gboolean enable_users;
	/* Store token text (HSET/ZINCRBY) in addition to counters when learning */
	gboolean store_tokens;
	/* Use the key layout where every token has its own "<prefix>_<token>" key */
	gboolean new_schema;
	/* NOTE(review): presumably enables rspamd_redis_store_stat_signature - confirm */
	gboolean enable_signatures;
	/* If greater than zero, passed as the argument of EXPIRE for written keys */
	guint expiry;
	/* Lua registry reference of the user extraction function, -1 if unset */
	gint cbref_user;
};
58 
/*
 * Connection states.
 * NOTE(review): not referenced in the visible part of this file - confirm
 * whether it is still used.
 */
enum rspamd_redis_connection_state {
	RSPAMD_REDIS_DISCONNECTED = 0,
	RSPAMD_REDIS_CONNECTED,
	RSPAMD_REDIS_REQUEST_SENT,
	RSPAMD_REDIS_TIMEDOUT,
	RSPAMD_REDIS_TERMINATED
};
66 
/*
 * State of the redis backend for one statfile within one task.
 */
struct redis_stat_runtime {
	/* Backend configuration */
	struct redis_stat_ctx *ctx;
	/* Task this runtime belongs to */
	struct rspamd_task *task;
	/* Upstream selected for this runtime; marked as failed on timeout */
	struct upstream *selected;
	/* Timer whose handler is rspamd_redis_timeout (w->data is this runtime) */
	ev_timer timeout_event;
	/* NOTE(review): not used in the visible part of the file */
	GArray *results;
	/* Reference to the tokens array, dropped on termination or timeout */
	GPtrArray *tokens;
	/* Statfile configuration (is_spam selects the "S" or "H" field) */
	struct rspamd_statfile_config *stcf;
	/* NOTE(review): presumably the output of rspamd_redis_expand_object - confirm */
	gchar *redis_object_expanded;
	/* Asynchronous redis connection, NULL once it has been freed */
	redisAsyncContext *redis;
	/* NOTE(review): presumably the number of learns read from redis - confirm */
	guint64 learned;
	/* Index into tok->values[] where the results of this statfile are stored */
	gint id;
	/* TRUE while an event is registered in the task session for this runtime */
	gboolean has_event;
	/* Error of the last failed operation, if any */
	GError *err;
};
82 
/* Used to get statistics from redis */
struct rspamd_redis_stat_cbdata;

/*
 * Periodic statistics collector bound to one statfile.
 */
struct rspamd_redis_stat_elt {
	/* Backend configuration */
	struct redis_stat_ctx *ctx;
	/* Parent async element; its `enabled` flag is cleared while a collection runs */
	struct rspamd_stat_async_elt *async;
	/* Event loop that new redis connections are attached to */
	struct ev_loop *event_loop;
	/* Result of the last finished collection (replaced by the cleanup routine) */
	ucl_object_t *stat;
	/* Collection currently in progress, NULL if none */
	struct rspamd_redis_stat_cbdata *cbdata;
};
93 
/*
 * State of one statistics collection in progress.
 */
struct rspamd_redis_stat_cbdata {
	/* Owning collector */
	struct rspamd_redis_stat_elt *elt;
	/* Connection used for this collection */
	redisAsyncContext *redis;
	/* Statistics object being filled */
	ucl_object_t *cur;
	/* Keys returned by the most recent SSCAN page (g_malloc'ed strings or NULL) */
	GPtrArray *cur_keys;
	/* Upstream the connection was made to */
	struct upstream *selected;
	/* Number of redis replies that are still expected */
	guint inflight;
	/* Set once cleanup has started; callbacks return immediately when it is set */
	gboolean wanna_die;
};
103 
/*
 * NULL-safe access to a member of a task pointer: yields NULL when `task`
 * is NULL and (task)->elt otherwise. Note that `task` is evaluated twice.
 */
#define GET_TASK_ELT(task, elt) ((task) == NULL ? NULL : (task)->elt)
105 
/* Name of the error domain of this backend (see rspamd_redis_stat_quark) */
static const gchar *M = "redis statistics";
107 
108 static GQuark
rspamd_redis_stat_quark(void)109 rspamd_redis_stat_quark (void)
110 {
111 	return g_quark_from_static_string (M);
112 }
113 
114 static inline struct upstream_list *
rspamd_redis_get_servers(struct redis_stat_ctx * ctx,const gchar * what)115 rspamd_redis_get_servers (struct redis_stat_ctx *ctx,
116 						  const gchar *what)
117 {
118 	lua_State *L = ctx->L;
119 	struct upstream_list *res;
120 
121 	lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->conf_ref);
122 	lua_pushstring (L, what);
123 	lua_gettable (L, -2);
124 	res = *((struct upstream_list**)lua_touserdata (L, -1));
125 	lua_settop (L, 0);
126 
127 	return res;
128 }
129 
130 /*
131  * Non-static for lua unit testing
132  */
133 gsize
rspamd_redis_expand_object(const gchar * pattern,struct redis_stat_ctx * ctx,struct rspamd_task * task,gchar ** target)134 rspamd_redis_expand_object (const gchar *pattern,
135 		struct redis_stat_ctx *ctx,
136 		struct rspamd_task *task,
137 		gchar **target)
138 {
139 	gsize tlen = 0;
140 	const gchar *p = pattern, *elt;
141 	gchar *d, *end;
142 	enum  {
143 		just_char,
144 		percent_char,
145 		mod_char
146 	} state = just_char;
147 	struct rspamd_statfile_config *stcf;
148 	lua_State *L = NULL;
149 	struct rspamd_task **ptask;
150 	const gchar *rcpt = NULL;
151 	gint err_idx;
152 
153 	g_assert (ctx != NULL);
154 	g_assert (task != NULL);
155 	stcf = ctx->stcf;
156 
157 	L = task->cfg->lua_state;
158 	g_assert (L != NULL);
159 
160 	if (ctx->enable_users) {
161 		if (ctx->cbref_user == -1) {
162 			rcpt = rspamd_task_get_principal_recipient (task);
163 		}
164 		else {
165 			/* Execute lua function to get userdata */
166 			lua_pushcfunction (L, &rspamd_lua_traceback);
167 			err_idx = lua_gettop (L);
168 
169 			lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->cbref_user);
170 			ptask = lua_newuserdata (L, sizeof (struct rspamd_task *));
171 			*ptask = task;
172 			rspamd_lua_setclass (L, "rspamd{task}", -1);
173 
174 			if (lua_pcall (L, 1, 1, err_idx) != 0) {
175 				msg_err_task ("call to user extraction script failed: %s",
176 						lua_tostring (L, -1));
177 			}
178 			else {
179 				rcpt = rspamd_mempool_strdup (task->task_pool, lua_tostring (L, -1));
180 			}
181 
182 			/* Result + error function */
183 			lua_settop (L, err_idx - 1);
184 		}
185 
186 		if (rcpt) {
187 			rspamd_mempool_set_variable (task->task_pool, "stat_user",
188 					(gpointer)rcpt, NULL);
189 		}
190 	}
191 
192 	/* Length calculation */
193 	while (*p) {
194 		switch (state) {
195 		case just_char:
196 			if (*p == '%') {
197 				state = percent_char;
198 			}
199 			else {
200 				tlen ++;
201 			}
202 			p ++;
203 			break;
204 		case percent_char:
205 			switch (*p) {
206 			case '%':
207 				tlen ++;
208 				state = just_char;
209 				break;
210 			case 'u':
211 				elt = GET_TASK_ELT (task, user);
212 				if (elt) {
213 					tlen += strlen (elt);
214 				}
215 				break;
216 			case 'r':
217 
218 				if (rcpt == NULL) {
219 					elt = rspamd_task_get_principal_recipient (task);
220 				}
221 				else {
222 					elt = rcpt;
223 				}
224 
225 				if (elt) {
226 					tlen += strlen (elt);
227 				}
228 				break;
229 			case 'l':
230 				if (stcf->label) {
231 					tlen += strlen (stcf->label);
232 				}
233 				/* Label miss is OK */
234 				break;
235 			case 's':
236 				if (ctx->new_schema) {
237 					tlen += sizeof ("RS") - 1;
238 				}
239 				else {
240 					if (stcf->symbol) {
241 						tlen += strlen (stcf->symbol);
242 					}
243 				}
244 				break;
245 			default:
246 				state = just_char;
247 				tlen ++;
248 				break;
249 			}
250 
251 			if (state == percent_char) {
252 				state = mod_char;
253 			}
254 			p ++;
255 			break;
256 
257 		case mod_char:
258 			switch (*p) {
259 			case 'd':
260 				p ++;
261 				state = just_char;
262 				break;
263 			default:
264 				state = just_char;
265 				break;
266 			}
267 			break;
268 		}
269 	}
270 
271 
272 	if (target == NULL) {
273 		return -1;
274 	}
275 
276 	*target = rspamd_mempool_alloc (task->task_pool, tlen + 1);
277 	d = *target;
278 	end = d + tlen + 1;
279 	d[tlen] = '\0';
280 	p = pattern;
281 	state = just_char;
282 
283 	/* Expand string */
284 	while (*p && d < end) {
285 		switch (state) {
286 		case just_char:
287 			if (*p == '%') {
288 				state = percent_char;
289 			}
290 			else {
291 				*d++ = *p;
292 			}
293 			p ++;
294 			break;
295 		case percent_char:
296 			switch (*p) {
297 			case '%':
298 				*d++ = *p;
299 				state = just_char;
300 				break;
301 			case 'u':
302 				elt = GET_TASK_ELT (task, user);
303 				if (elt) {
304 					d += rspamd_strlcpy (d, elt, end - d);
305 				}
306 				break;
307 			case 'r':
308 				if (rcpt == NULL) {
309 					elt = rspamd_task_get_principal_recipient (task);
310 				}
311 				else {
312 					elt = rcpt;
313 				}
314 
315 				if (elt) {
316 					d += rspamd_strlcpy (d, elt, end - d);
317 				}
318 				break;
319 			case 'l':
320 				if (stcf->label) {
321 					d += rspamd_strlcpy (d, stcf->label, end - d);
322 				}
323 				break;
324 			case 's':
325 				if (ctx->new_schema) {
326 					d += rspamd_strlcpy (d, "RS", end - d);
327 				}
328 				else {
329 					if (stcf->symbol) {
330 						d += rspamd_strlcpy (d, stcf->symbol, end - d);
331 					}
332 				}
333 				break;
334 			default:
335 				state = just_char;
336 				*d++ = *p;
337 				break;
338 			}
339 
340 			if (state == percent_char) {
341 				state = mod_char;
342 			}
343 			p ++;
344 			break;
345 
346 		case mod_char:
347 			switch (*p) {
348 			case 'd':
349 				/* TODO: not supported yet */
350 				p ++;
351 				state = just_char;
352 				break;
353 			default:
354 				state = just_char;
355 				break;
356 			}
357 			break;
358 		}
359 	}
360 
361 	return tlen;
362 }
363 
364 static void
rspamd_redis_maybe_auth(struct redis_stat_ctx * ctx,redisAsyncContext * redis)365 rspamd_redis_maybe_auth (struct redis_stat_ctx *ctx, redisAsyncContext *redis)
366 {
367 	if (ctx->password) {
368 		redisAsyncCommand (redis, NULL, NULL, "AUTH %s", ctx->password);
369 	}
370 	if (ctx->dbname) {
371 		redisAsyncCommand (redis, NULL, NULL, "SELECT %s", ctx->dbname);
372 	}
373 }
374 
375 // the `b` conversion type character is unknown to gcc
376 #ifdef __GNUC__
377 #pragma GCC diagnostic push
378 #pragma GCC diagnostic ignored "-Wformat"
379 #pragma GCC diagnostic ignored "-Wformat-extra-args"
380 #endif
381 static rspamd_fstring_t *
rspamd_redis_tokens_to_query(struct rspamd_task * task,struct redis_stat_runtime * rt,GPtrArray * tokens,const gchar * command,const gchar * prefix,gboolean learn,gint idx,gboolean intvals)382 rspamd_redis_tokens_to_query (struct rspamd_task *task,
383 		struct redis_stat_runtime *rt,
384 		GPtrArray *tokens,
385 		const gchar *command,
386 		const gchar *prefix,
387 		gboolean learn,
388 		gint idx,
389 		gboolean intvals)
390 {
391 	rspamd_fstring_t *out;
392 	rspamd_token_t *tok;
393 	gchar n0[512], n1[64];
394 	guint i, l0, l1, cmd_len, prefix_len;
395 	gint ret;
396 
397 	g_assert (tokens != NULL);
398 
399 	cmd_len = strlen (command);
400 	prefix_len = strlen (prefix);
401 	out = rspamd_fstring_sized_new (1024);
402 
403 	if (learn) {
404 		rspamd_printf_fstring (&out, "*1\r\n$5\r\nMULTI\r\n");
405 
406 		ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL,
407 				out->str, out->len);
408 
409 		if (ret != REDIS_OK) {
410 			msg_err_task ("call to redis failed: %s", rt->redis->errstr);
411 			rspamd_fstring_free (out);
412 
413 			return NULL;
414 		}
415 
416 		out->len = 0;
417 	}
418 	else {
419 		if (rt->ctx->new_schema) {
420 			/* Multi + HGET */
421 			rspamd_printf_fstring (&out, "*1\r\n$5\r\nMULTI\r\n");
422 
423 			ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL,
424 					out->str, out->len);
425 
426 			if (ret != REDIS_OK) {
427 				msg_err_task ("call to redis failed: %s", rt->redis->errstr);
428 				rspamd_fstring_free (out);
429 
430 				return NULL;
431 			}
432 
433 			out->len = 0;
434 		}
435 		else {
436 			rspamd_printf_fstring (&out, ""
437 							"*%d\r\n"
438 							"$%d\r\n"
439 							"%s\r\n"
440 							"$%d\r\n"
441 							"%s\r\n",
442 					(tokens->len + 2),
443 					cmd_len, command,
444 					prefix_len, prefix);
445 		}
446 	}
447 
448 	for (i = 0; i < tokens->len; i ++) {
449 		tok = g_ptr_array_index (tokens, i);
450 
451 		if (learn) {
452 			if (intvals) {
453 				l1 = rspamd_snprintf (n1, sizeof (n1), "%L",
454 						(gint64) tok->values[idx]);
455 			} else {
456 				l1 = rspamd_snprintf (n1, sizeof (n1), "%f",
457 						tok->values[idx]);
458 			}
459 
460 			if (rt->ctx->new_schema) {
461 				/*
462 				 * HINCRBY <prefix_token> <0|1> <value>
463 				 */
464 				l0 = rspamd_snprintf (n0, sizeof (n0), "%*s_%uL",
465 						prefix_len, prefix,
466 						tok->data);
467 
468 				rspamd_printf_fstring (&out, ""
469 								"*4\r\n"
470 								"$%d\r\n"
471 								"%s\r\n"
472 								"$%d\r\n"
473 								"%s\r\n"
474 								"$%d\r\n"
475 								"%s\r\n"
476 								"$%d\r\n"
477 								"%s\r\n",
478 						cmd_len, command,
479 						l0, n0,
480 						1, rt->stcf->is_spam ? "S" : "H",
481 						l1, n1);
482 			}
483 			else {
484 				l0 = rspamd_snprintf (n0, sizeof (n0), "%uL", tok->data);
485 
486 				/*
487 				 * HINCRBY <prefix> <token> <value>
488 				 */
489 				rspamd_printf_fstring (&out, ""
490 								"*4\r\n"
491 								"$%d\r\n"
492 								"%s\r\n"
493 								"$%d\r\n"
494 								"%s\r\n"
495 								"$%d\r\n"
496 								"%s\r\n"
497 								"$%d\r\n"
498 								"%s\r\n",
499 						cmd_len, command,
500 						prefix_len, prefix,
501 						l0, n0,
502 						l1, n1);
503 			}
504 
505 			ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL,
506 					out->str, out->len);
507 
508 			if (ret != REDIS_OK) {
509 				msg_err_task ("call to redis failed: %s", rt->redis->errstr);
510 				rspamd_fstring_free (out);
511 
512 				return NULL;
513 			}
514 
515 			if (rt->ctx->store_tokens) {
516 
517 				if (!rt->ctx->new_schema) {
518 					/*
519 					 * We store tokens in form
520 					 * HSET prefix_tokens <token_id> "token_string"
521 					 * ZINCRBY prefix_z 1.0 <token_id>
522 					 */
523 					if (tok->t1 && tok->t2) {
524 						redisAsyncCommand (rt->redis, NULL, NULL,
525 								"HSET %b_tokens %b %b:%b",
526 								prefix, (size_t) prefix_len,
527 								n0, (size_t) l0,
528 								tok->t1->stemmed.begin, tok->t1->stemmed.len,
529 								tok->t2->stemmed.begin, tok->t2->stemmed.len);
530 					} else if (tok->t1) {
531 						redisAsyncCommand (rt->redis, NULL, NULL,
532 								"HSET %b_tokens %b %b",
533 								prefix, (size_t) prefix_len,
534 								n0, (size_t) l0,
535 								tok->t1->stemmed.begin,
536 								tok->t1->stemmed.len);
537 					}
538 				}
539 				else {
540 					/*
541 					 * We store tokens in form
542 					 * HSET <token_id> "tokens" "token_string"
543 					 * ZINCRBY prefix_z 1.0 <token_id>
544 					 */
545 					if (tok->t1 && tok->t2) {
546 						redisAsyncCommand (rt->redis, NULL, NULL,
547 								"HSET %b %s %b:%b",
548 								n0, (size_t) l0,
549 								"tokens",
550 								tok->t1->stemmed.begin, tok->t1->stemmed.len,
551 								tok->t2->stemmed.begin, tok->t2->stemmed.len);
552 					} else if (tok->t1) {
553 						redisAsyncCommand (rt->redis, NULL, NULL,
554 								"HSET %b %s %b",
555 								n0, (size_t) l0,
556 								"tokens",
557 								tok->t1->stemmed.begin, tok->t1->stemmed.len);
558 					}
559 				}
560 
561 				redisAsyncCommand (rt->redis, NULL, NULL,
562 						"ZINCRBY %b_z %b %b",
563 						prefix, (size_t)prefix_len,
564 						n1, (size_t)l1,
565 						n0, (size_t)l0);
566 			}
567 
568 			if (rt->ctx->new_schema && rt->ctx->expiry > 0) {
569 				out->len = 0;
570 				l1 = rspamd_snprintf (n1, sizeof (n1), "%d",
571 						rt->ctx->expiry);
572 
573 				rspamd_printf_fstring (&out, ""
574 								"*3\r\n"
575 								"$6\r\n"
576 								"EXPIRE\r\n"
577 								"$%d\r\n"
578 								"%s\r\n"
579 								"$%d\r\n"
580 								"%s\r\n",
581 						l0, n0,
582 						l1, n1);
583 				redisAsyncFormattedCommand (rt->redis, NULL, NULL,
584 						out->str, out->len);
585 			}
586 
587 			out->len = 0;
588 		}
589 		else {
590 			if (rt->ctx->new_schema) {
591 				l0 = rspamd_snprintf (n0, sizeof (n0), "%*s_%uL",
592 						prefix_len, prefix,
593 						tok->data);
594 
595 				rspamd_printf_fstring (&out, ""
596 								"*3\r\n"
597 								"$%d\r\n"
598 								"%s\r\n"
599 								"$%d\r\n"
600 								"%s\r\n"
601 								"$%d\r\n"
602 								"%s\r\n",
603 						cmd_len, command,
604 						l0, n0,
605 						1, rt->stcf->is_spam ? "S" : "H");
606 
607 				ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL,
608 						out->str, out->len);
609 
610 				if (ret != REDIS_OK) {
611 					msg_err_task ("call to redis failed: %s", rt->redis->errstr);
612 					rspamd_fstring_free (out);
613 
614 					return NULL;
615 				}
616 
617 				out->len = 0;
618 			}
619 			else {
620 				l0 = rspamd_snprintf (n0, sizeof (n0), "%uL", tok->data);
621 				rspamd_printf_fstring (&out, ""
622 						"$%d\r\n"
623 						"%s\r\n", l0, n0);
624 			}
625 		}
626 	}
627 
628 	if (!learn && rt->ctx->new_schema) {
629 		rspamd_printf_fstring (&out, "*1\r\n$4\r\nEXEC\r\n");
630 	}
631 
632 	return out;
633 }
634 #ifdef __GNUC__
635 #pragma GCC diagnostic pop
636 #endif
637 
638 static void
rspamd_redis_store_stat_signature(struct rspamd_task * task,struct redis_stat_runtime * rt,GPtrArray * tokens,const gchar * prefix)639 rspamd_redis_store_stat_signature (struct rspamd_task *task,
640 		struct redis_stat_runtime *rt,
641 		GPtrArray *tokens,
642 		const gchar *prefix)
643 {
644 	gchar *sig, keybuf[512], nbuf[64];
645 	rspamd_token_t *tok;
646 	guint i, blen, klen;
647 	rspamd_fstring_t *out;
648 
649 	sig = rspamd_mempool_get_variable (task->task_pool,
650 			RSPAMD_MEMPOOL_STAT_SIGNATURE);
651 
652 	if (sig == NULL) {
653 		msg_err_task ("cannot get bayes signature");
654 		return;
655 	}
656 
657 	out = rspamd_fstring_sized_new (1024);
658 	klen = rspamd_snprintf (keybuf, sizeof (keybuf), "%s_%s_%s",
659 			prefix, sig, rt->stcf->is_spam ? "S" : "H");
660 
661 	/* Cleanup key */
662 	rspamd_printf_fstring (&out, ""
663 					"*2\r\n"
664 					"$3\r\n"
665 					"DEL\r\n"
666 					"$%d\r\n"
667 					"%s\r\n",
668 			klen, keybuf);
669 	redisAsyncFormattedCommand (rt->redis, NULL, NULL,
670 			out->str, out->len);
671 	out->len = 0;
672 
673 	rspamd_printf_fstring (&out, ""
674 					"*%d\r\n"
675 					"$5\r\n"
676 					"LPUSH\r\n"
677 					"$%d\r\n"
678 					"%s\r\n",
679 			tokens->len + 2,
680 			klen, keybuf);
681 
682 	PTR_ARRAY_FOREACH (tokens, i, tok) {
683 		blen = rspamd_snprintf (nbuf, sizeof (nbuf), "%uL", tok->data);
684 		rspamd_printf_fstring (&out, ""
685 				"$%d\r\n"
686 				"%s\r\n", blen, nbuf);
687 	}
688 
689 	redisAsyncFormattedCommand (rt->redis, NULL, NULL,
690 			out->str, out->len);
691 	out->len = 0;
692 
693 	if (rt->ctx->expiry > 0) {
694 		out->len = 0;
695 		blen = rspamd_snprintf (nbuf, sizeof (nbuf), "%d",
696 				rt->ctx->expiry);
697 
698 		rspamd_printf_fstring (&out, ""
699 						"*3\r\n"
700 						"$6\r\n"
701 						"EXPIRE\r\n"
702 						"$%d\r\n"
703 						"%s\r\n"
704 						"$%d\r\n"
705 						"%s\r\n",
706 				klen, keybuf,
707 				blen, nbuf);
708 		redisAsyncFormattedCommand (rt->redis, NULL, NULL,
709 				out->str, out->len);
710 	}
711 
712 	rspamd_fstring_free (out);
713 }
714 
715 static void
rspamd_redis_async_cbdata_cleanup(struct rspamd_redis_stat_cbdata * cbdata)716 rspamd_redis_async_cbdata_cleanup (struct rspamd_redis_stat_cbdata *cbdata)
717 {
718 	guint i;
719 	gchar *k;
720 
721 	if (cbdata && !cbdata->wanna_die) {
722 		/* Avoid double frees */
723 		cbdata->wanna_die = TRUE;
724 		redisAsyncFree (cbdata->redis);
725 
726 		for (i = 0; i < cbdata->cur_keys->len; i ++) {
727 			k = g_ptr_array_index (cbdata->cur_keys, i);
728 			g_free (k);
729 		}
730 
731 		g_ptr_array_free (cbdata->cur_keys, TRUE);
732 
733 		if (cbdata->elt) {
734 			cbdata->elt->cbdata = NULL;
735 			/* Re-enable parent event */
736 			cbdata->elt->async->enabled = TRUE;
737 
738 			/* Replace ucl object */
739 			if (cbdata->cur) {
740 				if (cbdata->elt->stat) {
741 					ucl_object_unref (cbdata->elt->stat);
742 				}
743 
744 				cbdata->elt->stat = cbdata->cur;
745 				cbdata->cur = NULL;
746 			}
747 		}
748 
749 		if (cbdata->cur) {
750 			ucl_object_unref (cbdata->cur);
751 		}
752 
753 		g_free (cbdata);
754 	}
755 }
756 
757 /* Called when we get number of learns for a specific key */
758 static void
rspamd_redis_stat_learns(redisAsyncContext * c,gpointer r,gpointer priv)759 rspamd_redis_stat_learns (redisAsyncContext *c, gpointer r, gpointer priv)
760 {
761 	struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv;
762 	struct rspamd_redis_stat_cbdata *cbdata;
763 	redisReply *reply = r;
764 	ucl_object_t *obj;
765 	gulong num = 0;
766 
767 	cbdata = redis_elt->cbdata;
768 
769 	if (cbdata == NULL || cbdata->wanna_die) {
770 		return;
771 	}
772 
773 	cbdata->inflight --;
774 
775 	if (c->err == 0 && r != NULL) {
776 		if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) {
777 			num = reply->integer;
778 		}
779 		else if (reply->type == REDIS_REPLY_STRING) {
780 			rspamd_strtoul (reply->str, reply->len, &num);
781 		}
782 
783 		obj = (ucl_object_t *) ucl_object_lookup (cbdata->cur, "revision");
784 		if (obj) {
785 			obj->value.iv += num;
786 		}
787 	}
788 
789 	if (cbdata->inflight == 0) {
790 		rspamd_redis_async_cbdata_cleanup (cbdata);
791 		redis_elt->cbdata = NULL;
792 	}
793 }
794 
795 /* Called when we get number of elements for a specific key */
796 static void
rspamd_redis_stat_key(redisAsyncContext * c,gpointer r,gpointer priv)797 rspamd_redis_stat_key (redisAsyncContext *c, gpointer r, gpointer priv)
798 {
799 	struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv;
800 	struct rspamd_redis_stat_cbdata *cbdata;
801 	redisReply *reply = r;
802 	ucl_object_t *obj;
803 	glong num = 0;
804 
805 	cbdata = redis_elt->cbdata;
806 
807 	if (cbdata == NULL || cbdata->wanna_die) {
808 		return;
809 	}
810 
811 	cbdata->inflight --;
812 
813 	if (c->err == 0 && r != NULL) {
814 		if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) {
815 			num = reply->integer;
816 		}
817 		else if (reply->type == REDIS_REPLY_STRING) {
818 			rspamd_strtol (reply->str, reply->len, &num);
819 		}
820 
821 		if (num < 0) {
822 			msg_err ("bad learns count: %L", (gint64)num);
823 			num = 0;
824 		}
825 
826 		obj = (ucl_object_t *)ucl_object_lookup (cbdata->cur, "used");
827 		if (obj) {
828 			obj->value.iv += num;
829 		}
830 
831 		obj = (ucl_object_t *)ucl_object_lookup (cbdata->cur, "total");
832 		if (obj) {
833 			obj->value.iv += num;
834 		}
835 
836 		obj = (ucl_object_t *)ucl_object_lookup (cbdata->cur, "size");
837 		if (obj) {
838 			/* Size of key + size of int64_t */
839 			obj->value.iv += num * (sizeof (G_STRINGIFY (G_MAXINT64)) +
840 					sizeof (guint64) + sizeof (gpointer));
841 		}
842 	}
843 
844 	if (cbdata->inflight == 0) {
845 		rspamd_redis_async_cbdata_cleanup (cbdata);
846 		redis_elt->cbdata = NULL;
847 	}
848 }
849 
850 /* Called when we have connected to the redis server and got keys to check */
851 static void
rspamd_redis_stat_keys(redisAsyncContext * c,gpointer r,gpointer priv)852 rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
853 {
854 	struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv;
855 	struct rspamd_redis_stat_cbdata *cbdata;
856 	redisReply *reply = r, *more_elt, *elts, *elt;
857 	gchar **pk, *k;
858 	guint i, processed = 0;
859 	gboolean more = false;
860 
861 	cbdata = redis_elt->cbdata;
862 
863 	if (cbdata == NULL || cbdata->wanna_die) {
864 		return;
865 	}
866 
867 	cbdata->inflight --;
868 
869 	if (c->err == 0 && r != NULL) {
870 		if (reply->type == REDIS_REPLY_ARRAY) {
871 			more_elt = reply->element[0];
872 			elts = reply->element[1];
873 
874 			if (more_elt != NULL && more_elt->str != NULL && strcmp (more_elt->str, "0") != 0) {
875 				more = true;
876 			}
877 
878 			/* Clear the existing stuff */
879 			PTR_ARRAY_FOREACH (cbdata->cur_keys, i, k) {
880 				if (k) {
881 					g_free (k);
882 				}
883 			}
884 
885 			g_ptr_array_set_size (cbdata->cur_keys, elts->elements);
886 
887 			for (i = 0; i < elts->elements; i ++) {
888 				elt = elts->element[i];
889 
890 				if (elt->type == REDIS_REPLY_STRING) {
891 					pk = (gchar **)&g_ptr_array_index (cbdata->cur_keys, i);
892 					*pk = g_malloc (elt->len + 1);
893 					rspamd_strlcpy (*pk, elt->str, elt->len + 1);
894 					processed ++;
895 				}
896 				else {
897 					pk = (gchar **)&g_ptr_array_index (cbdata->cur_keys, i);
898 					*pk = NULL;
899 				}
900 			}
901 
902 			if (processed) {
903 				PTR_ARRAY_FOREACH (cbdata->cur_keys, i, k) {
904 					if (k) {
905 						const gchar *learned_key = "learns";
906 
907 						if (cbdata->elt->ctx->new_schema) {
908 							if (cbdata->elt->ctx->stcf->is_spam) {
909 								learned_key = "learns_spam";
910 							}
911 							else {
912 								learned_key = "learns_ham";
913 							}
914 							redisAsyncCommand (cbdata->redis,
915 									rspamd_redis_stat_learns,
916 									redis_elt,
917 									"HGET %s %s",
918 									k, learned_key);
919 							cbdata->inflight += 1;
920 						}
921 						else {
922 							redisAsyncCommand (cbdata->redis,
923 									rspamd_redis_stat_key,
924 									redis_elt,
925 									"HLEN %s",
926 									k);
927 							redisAsyncCommand (cbdata->redis,
928 									rspamd_redis_stat_learns,
929 									redis_elt,
930 									"HGET %s %s",
931 									k, learned_key);
932 							cbdata->inflight += 2;
933 						}
934 					}
935 				}
936 			}
937 		}
938 
939 		if (more) {
940 			/* Get more stat keys */
941 			redisAsyncCommand (cbdata->redis, rspamd_redis_stat_keys, redis_elt,
942 					"SSCAN %s_keys %s COUNT 1000",
943 					cbdata->elt->ctx->stcf->symbol, more_elt->str);
944 
945 			cbdata->inflight += 1;
946 		}
947 		else {
948 			/* Set up the required keys */
949 			ucl_object_insert_key (cbdata->cur,
950 					ucl_object_typed_new (UCL_INT), "revision", 0, false);
951 			ucl_object_insert_key (cbdata->cur,
952 					ucl_object_typed_new (UCL_INT), "used", 0, false);
953 			ucl_object_insert_key (cbdata->cur,
954 					ucl_object_typed_new (UCL_INT), "total", 0, false);
955 			ucl_object_insert_key (cbdata->cur,
956 					ucl_object_typed_new (UCL_INT), "size", 0, false);
957 			ucl_object_insert_key (cbdata->cur,
958 					ucl_object_fromstring (cbdata->elt->ctx->stcf->symbol),
959 					"symbol", 0, false);
960 			ucl_object_insert_key (cbdata->cur, ucl_object_fromstring ("redis"),
961 					"type", 0, false);
962 			ucl_object_insert_key (cbdata->cur, ucl_object_fromint (0),
963 					"languages", 0, false);
964 			ucl_object_insert_key (cbdata->cur, ucl_object_fromint (processed),
965 					"users", 0, false);
966 
967 			rspamd_upstream_ok (cbdata->selected);
968 
969 			if (cbdata->inflight == 0) {
970 				rspamd_redis_async_cbdata_cleanup (cbdata);
971 				redis_elt->cbdata = NULL;
972 			}
973 		}
974 	}
975 	else {
976 		if (c->errstr) {
977 			msg_err ("cannot get keys to gather stat: %s", c->errstr);
978 		}
979 		else {
980 			msg_err ("cannot get keys to gather stat: unknown error");
981 		}
982 
983 		rspamd_upstream_fail (cbdata->selected, FALSE, c->errstr);
984 		rspamd_redis_async_cbdata_cleanup (cbdata);
985 		redis_elt->cbdata = NULL;
986 	}
987 }
988 
989 static void
rspamd_redis_async_stat_cb(struct rspamd_stat_async_elt * elt,gpointer d)990 rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d)
991 {
992 	struct redis_stat_ctx *ctx;
993 	struct rspamd_redis_stat_elt *redis_elt = elt->ud;
994 	struct rspamd_redis_stat_cbdata *cbdata;
995 	rspamd_inet_addr_t *addr;
996 	struct upstream_list *ups;
997 	redisAsyncContext *redis_ctx;
998 	struct upstream *selected;
999 
1000 	g_assert (redis_elt != NULL);
1001 
1002 	ctx = redis_elt->ctx;
1003 
1004 	if (redis_elt->cbdata) {
1005 		/* We have some other process pending */
1006 		rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata);
1007 		redis_elt->cbdata = NULL;
1008 	}
1009 
1010 	/* Disable further events unless needed */
1011 	elt->enabled = FALSE;
1012 
1013 	ups = rspamd_redis_get_servers (ctx, "read_servers");
1014 
1015 	if (!ups) {
1016 		return;
1017 	}
1018 
1019 	selected = rspamd_upstream_get (ups,
1020 					RSPAMD_UPSTREAM_ROUND_ROBIN,
1021 					NULL,
1022 					0);
1023 
1024 	g_assert (selected != NULL);
1025 	addr = rspamd_upstream_addr_next (selected);
1026 	g_assert (addr != NULL);
1027 
1028 	if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
1029 		redis_ctx = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
1030 	}
1031 	else {
1032 		redis_ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr),
1033 				rspamd_inet_address_get_port (addr));
1034 	}
1035 
1036 	if (redis_ctx == NULL) {
1037 		msg_warn ("cannot connect to redis server %s: %s",
1038 				rspamd_inet_address_to_string_pretty (addr),
1039 				strerror (errno));
1040 
1041 		return;
1042 	}
1043 	else if (redis_ctx->err != REDIS_OK) {
1044 		msg_warn ("cannot connect to redis server %s: %s",
1045 				rspamd_inet_address_to_string_pretty (addr),
1046 				redis_ctx->errstr);
1047 		redisAsyncFree (redis_ctx);
1048 
1049 		return;
1050 	}
1051 
1052 	redisLibevAttach (redis_elt->event_loop, redis_ctx);
1053 	cbdata = g_malloc0 (sizeof (*cbdata));
1054 	cbdata->redis = redis_ctx;
1055 	cbdata->selected = selected;
1056 	cbdata->inflight = 1;
1057 	cbdata->cur = ucl_object_typed_new (UCL_OBJECT);
1058 	cbdata->elt = redis_elt;
1059 	cbdata->cur_keys = g_ptr_array_sized_new (1000);
1060 	redis_elt->cbdata = cbdata;
1061 
1062 	/* XXX: deal with timeouts maybe */
1063 	/* Get keys in redis that match our symbol */
1064 	rspamd_redis_maybe_auth (ctx, cbdata->redis);
1065 	redisAsyncCommand (cbdata->redis, rspamd_redis_stat_keys, redis_elt,
1066 			"SSCAN %s_keys 0 COUNT 1000",
1067 			ctx->stcf->symbol);
1068 }
1069 
1070 static void
rspamd_redis_async_stat_fin(struct rspamd_stat_async_elt * elt,gpointer d)1071 rspamd_redis_async_stat_fin (struct rspamd_stat_async_elt *elt, gpointer d)
1072 {
1073 	struct rspamd_redis_stat_elt *redis_elt = elt->ud;
1074 
1075 	if (redis_elt->cbdata != NULL) {
1076 		rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata);
1077 		redis_elt->cbdata = NULL;
1078 	}
1079 }
1080 
1081 /* Called on connection termination */
1082 static void
rspamd_redis_fin(gpointer data)1083 rspamd_redis_fin (gpointer data)
1084 {
1085 	struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
1086 	redisAsyncContext *redis;
1087 
1088 	if (rt->has_event) {
1089 		/* Should not happen ! */
1090 		msg_err ("FIXME: this code path should not be reached!");
1091 		rspamd_session_remove_event (rt->task->s, NULL, rt);
1092 		rt->has_event = FALSE;
1093 	}
1094 	/* Stop timeout */
1095 	if (ev_can_stop (&rt->timeout_event)) {
1096 		ev_timer_stop (rt->task->event_loop, &rt->timeout_event);
1097 	}
1098 
1099 	if (rt->tokens) {
1100 		g_ptr_array_unref (rt->tokens);
1101 		rt->tokens = NULL;
1102 	}
1103 
1104 	if (rt->redis) {
1105 		redis = rt->redis;
1106 		rt->redis = NULL;
1107 		/* This calls for all callbacks pending */
1108 		redisAsyncFree (redis);
1109 	}
1110 
1111 	if (rt->err) {
1112 		g_error_free (rt->err);
1113 	}
1114 }
1115 
1116 static void
rspamd_redis_timeout(EV_P_ ev_timer * w,int revents)1117 rspamd_redis_timeout (EV_P_ ev_timer *w, int revents)
1118 {
1119 	struct redis_stat_runtime *rt = REDIS_RUNTIME (w->data);
1120 	struct rspamd_task *task;
1121 	redisAsyncContext *redis;
1122 
1123 	task = rt->task;
1124 
1125 	msg_err_task_check ("connection to redis server %s timed out",
1126 			rspamd_upstream_name (rt->selected));
1127 
1128 	rspamd_upstream_fail (rt->selected, FALSE, "timeout");
1129 
1130 	if (rt->redis) {
1131 		redis = rt->redis;
1132 		rt->redis = NULL;
1133 		/* This calls for all callbacks pending */
1134 		redisAsyncFree (redis);
1135 	}
1136 
1137 	if (rt->tokens) {
1138 		g_ptr_array_unref (rt->tokens);
1139 		rt->tokens = NULL;
1140 	}
1141 
1142 	if (!rt->err) {
1143 		g_set_error (&rt->err, rspamd_redis_stat_quark (), ETIMEDOUT,
1144 				"error getting reply from redis server %s: timeout",
1145 				rspamd_upstream_name (rt->selected));
1146 	}
1147 	if (rt->has_event) {
1148 		rt->has_event = FALSE;
1149 		rspamd_session_remove_event (task->s, NULL, rt);
1150 	}
1151 }
1152 
/*
 * Called when we have received tokens values from redis.
 *
 * c    - async context the reply belongs to (c->err / c->errstr are checked)
 * r    - the reply, treated as redisReply *; may be NULL
 * priv - struct redis_stat_runtime passed when the command was issued
 *
 * For an array reply holding exactly one element per task token, every
 * element is stored into tok->values[rt->id] and the task is flagged as
 * having spam or ham tokens. Whatever the outcome, the session event is
 * released at the end if this runtime still holds one.
 */
static void
rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv)
{
	struct redis_stat_runtime *rt = REDIS_RUNTIME (priv);
	redisReply *reply = r, *elt;
	struct rspamd_task *task;
	rspamd_token_t *tok;
	guint i, processed = 0, found = 0;
	gulong val;
	gdouble float_val;

	task = rt->task;

	if (c->err == 0 && rt->has_event) {
		/* A NULL reply without an error is silently ignored here */
		if (r != NULL) {
			if (reply->type == REDIS_REPLY_ARRAY) {

				/* Reply elements are matched to task tokens by position */
				if (reply->elements == task->tokens->len) {
					for (i = 0; i < reply->elements; i ++) {
						tok = g_ptr_array_index (task->tokens, i);
						elt = reply->element[i];

						if (G_UNLIKELY (elt->type == REDIS_REPLY_INTEGER)) {
							tok->values[rt->id] = elt->integer;
							found ++;
						}
						else if (elt->type == REDIS_REPLY_STRING) {
							/* String values are parsed as integer or float
							 * depending on the classifier flags */
							if (rt->stcf->clcf->flags &
								RSPAMD_FLAG_CLASSIFIER_INTEGER) {
								rspamd_strtoul (elt->str, elt->len, &val);
								tok->values[rt->id] = val;
							}
							else {
								float_val = strtof (elt->str, NULL);
								tok->values[rt->id] = float_val;
							}

							found ++;
						}
						else {
							/* Any other element type counts as "not found" */
							tok->values[rt->id] = 0;
						}

						processed ++;
					}

					if (rt->stcf->is_spam) {
						task->flags |= RSPAMD_TASK_FLAG_HAS_SPAM_TOKENS;
					}
					else {
						task->flags |= RSPAMD_TASK_FLAG_HAS_HAM_TOKENS;
					}
				}
				else {
					msg_err_task_check ("got invalid length of reply vector from redis: "
										"%d, expected: %d",
							(gint)reply->elements,
							(gint)task->tokens->len);
				}
			}
			else {
				if (reply->type == REDIS_REPLY_ERROR) {
					/* NOTE(review): the "cannot learn" wording looks copied
					 * from the learn path; this callback handles token
					 * lookup - confirm before changing the message */
					msg_err_task_check ("cannot learn %s: redis error: \"%s\"",
							rt->stcf->symbol, reply->str);
				}
				else {
					msg_err_task_check ("got invalid reply from redis: %s, array expected",
							rspamd_redis_type_to_string(reply->type));
				}
			}

			/* The upstream is marked OK for any non-NULL reply, including
			 * the error and wrong-type replies logged above */
			msg_debug_stat_redis ("received tokens for %s: %d processed, %d found",
					rt->redis_object_expanded, processed, found);
			rspamd_upstream_ok (rt->selected);
		}
	}
	else {
		/* Reached when c->err is set or when the event is no longer held */
		msg_err_task ("error getting reply from redis server %s: %s",
				rspamd_upstream_name (rt->selected), c->errstr);

		/* rt->redis is reset to NULL by the teardown paths before they free
		 * the context, so the upstream is not failed again from here */
		if (rt->redis) {
			rspamd_upstream_fail (rt->selected, FALSE, c->errstr);
		}

		/* Only the first error is kept */
		if (!rt->err) {
			g_set_error (&rt->err, rspamd_redis_stat_quark (), c->err,
					"cannot get values: error getting reply from redis server %s: %s",
					rspamd_upstream_name (rt->selected), c->errstr);
		}
	}

	if (rt->has_event) {
		rt->has_event = FALSE;
		rspamd_session_remove_event (task->s, NULL, rt);
	}
}
1250 
/*
 * Called when we have connected to the redis server and got stats.
 *
 * c    - async context the reply belongs to (c->err / c->errstr are checked)
 * r    - the reply, treated as redisReply *; may be NULL
 * priv - struct redis_stat_runtime passed when the command was issued
 *
 * The reply is interpreted as the number of learns for this statfile. The
 * value is stored in rt->learned and accumulated into a task mempool
 * variable. If there are enough learns, the token lookup query is issued and
 * rspamd_redis_processed takes over; otherwise this callback is the final
 * step and releases the session event.
 */
static void
rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv)
{
	struct redis_stat_runtime *rt = REDIS_RUNTIME (priv);
	redisReply *reply = r;
	struct rspamd_task *task;
	glong val = 0;
	/* TRUE while this callback is responsible for releasing the event */
	gboolean final = TRUE;

	task = rt->task;

	if (c->err == 0 && rt->has_event) {
		if (r != NULL) {
			if (G_UNLIKELY (reply->type == REDIS_REPLY_INTEGER)) {
				val = reply->integer;
			}
			else if (reply->type == REDIS_REPLY_STRING) {
				rspamd_strtol (reply->str, reply->len, &val);
			}
			else {
				/* A nil reply is accepted silently and treated as zero */
				if (reply->type != REDIS_REPLY_NIL) {
					if (reply->type == REDIS_REPLY_ERROR) {
						/* NOTE(review): "cannot learn" wording although this
						 * callback reads the learn counter - confirm */
						msg_err_task ("cannot learn %s: redis error: \"%s\"",
								rt->stcf->symbol, reply->str);
					}
					else {
						msg_err_task ("bad learned type for %s: %s, nil expected",
								rt->stcf->symbol,
								rspamd_redis_type_to_string(reply->type));
					}
				}

				val = 0;
			}

			/* Negative counters are clamped to zero */
			if (val < 0) {
				msg_warn_task ("invalid number of learns for %s: %L",
						rt->stcf->symbol, val);
				val = 0;
			}

			rt->learned = val;
			msg_debug_stat_redis ("connected to redis server, tokens learned for %s: %uL",
					rt->redis_object_expanded, rt->learned);
			rspamd_upstream_ok (rt->selected);

			/* Save learn count in mempool variable */
			gint64 *learns_cnt;
			const gchar *var_name;

			if (rt->stcf->is_spam) {
				var_name = RSPAMD_MEMPOOL_SPAM_LEARNS;
			}
			else {
				var_name = RSPAMD_MEMPOOL_HAM_LEARNS;
			}

			learns_cnt = rspamd_mempool_get_variable (task->task_pool,
					var_name);

			/* Add to an existing counter, or create it on first use */
			if (learns_cnt) {
				(*learns_cnt) += rt->learned;
			}
			else {
				learns_cnt = rspamd_mempool_alloc (task->task_pool,
						sizeof (*learns_cnt));
				*learns_cnt = rt->learned;
				rspamd_mempool_set_variable (task->task_pool,
						var_name,
						learns_cnt, NULL);
			}

			/* Tokens are fetched only with a non-zero learn count that also
			 * reaches the classifier's min_learns */
			if (rt->learned >= rt->stcf->clcf->min_learns && rt->learned > 0) {
				rspamd_fstring_t *query = rspamd_redis_tokens_to_query (
						task,
						rt,
						rt->tokens,
						rt->ctx->new_schema ? "HGET" : "HMGET",
						rt->redis_object_expanded, FALSE, -1,
						rt->stcf->clcf->flags & RSPAMD_FLAG_CLASSIFIER_INTEGER);
				g_assert (query != NULL);
				/* The query buffer is released together with the task pool */
				rspamd_mempool_add_destructor (task->task_pool,
						(rspamd_mempool_destruct_t)rspamd_fstring_free, query);

				int ret = redisAsyncFormattedCommand (rt->redis,
						rspamd_redis_processed, rt,
						query->str, query->len);

				if (ret != REDIS_OK) {
					msg_err_task ("call to redis failed: %s", rt->redis->errstr);
				}
				else {
					/* Further is handled by rspamd_redis_processed */
					final = FALSE;
					/* Restart timeout */
					if (ev_can_stop (&rt->timeout_event)) {
						rt->timeout_event.repeat = rt->ctx->timeout;
						ev_timer_again (task->event_loop, &rt->timeout_event);
					}
					else {
						rt->timeout_event.data = rt;
						ev_timer_init (&rt->timeout_event, rspamd_redis_timeout,
								rt->ctx->timeout, 0.);
						ev_timer_start (task->event_loop, &rt->timeout_event);
					}
				}
			}
			else {
				msg_warn_task ("skip obtaining bayes tokens for %s of classifier "
							   "%s: not enough learns %d; %d required",
						rt->stcf->symbol, rt->stcf->clcf->name,
						(int)rt->learned, rt->stcf->clcf->min_learns);
			}
		}
	}
	else if (rt->has_event) {
		/* c->err is set while the event is still held */
		msg_err_task ("error getting reply from redis server %s: %s",
				rspamd_upstream_name (rt->selected), c->errstr);
		rspamd_upstream_fail (rt->selected, FALSE,  c->errstr);

		/* Only the first error is kept */
		if (!rt->err) {
			g_set_error (&rt->err, rspamd_redis_stat_quark (), c->err,
					"error getting reply from redis server %s: %s",
					rspamd_upstream_name (rt->selected), c->errstr);
		}
	}

	/* Release the event unless the token query has been handed over */
	if (final && rt->has_event) {
		rt->has_event = FALSE;
		rspamd_session_remove_event (task->s, NULL, rt);
	}
}
1384 
1385 /* Called when we have set tokens during learning */
1386 static void
rspamd_redis_learned(redisAsyncContext * c,gpointer r,gpointer priv)1387 rspamd_redis_learned (redisAsyncContext *c, gpointer r, gpointer priv)
1388 {
1389 	struct redis_stat_runtime *rt = REDIS_RUNTIME (priv);
1390 	struct rspamd_task *task;
1391 
1392 	task = rt->task;
1393 
1394 	if (c->err == 0) {
1395 		rspamd_upstream_ok (rt->selected);
1396 	}
1397 	else {
1398 		msg_err_task_check ("error getting reply from redis server %s: %s",
1399 				rspamd_upstream_name (rt->selected), c->errstr);
1400 
1401 		if (rt->redis) {
1402 			rspamd_upstream_fail (rt->selected, FALSE, c->errstr);
1403 		}
1404 
1405 		if (!rt->err) {
1406 			g_set_error (&rt->err, rspamd_redis_stat_quark (), c->err,
1407 					"cannot get learned: error getting reply from redis server %s: %s",
1408 					rspamd_upstream_name (rt->selected), c->errstr);
1409 		}
1410 	}
1411 
1412 	if (rt->has_event) {
1413 		rt->has_event = FALSE;
1414 		rspamd_session_remove_event (task->s, NULL, rt);
1415 	}
1416 }
1417 static void
rspamd_redis_parse_classifier_opts(struct redis_stat_ctx * backend,const ucl_object_t * obj,struct rspamd_config * cfg)1418 rspamd_redis_parse_classifier_opts (struct redis_stat_ctx *backend,
1419 		const ucl_object_t *obj,
1420 		struct rspamd_config *cfg)
1421 {
1422 	const gchar *lua_script;
1423 	const ucl_object_t *elt, *users_enabled;
1424 
1425 	users_enabled = ucl_object_lookup_any (obj, "per_user",
1426 			"users_enabled", NULL);
1427 
1428 	if (users_enabled != NULL) {
1429 		if (ucl_object_type (users_enabled) == UCL_BOOLEAN) {
1430 			backend->enable_users = ucl_object_toboolean (users_enabled);
1431 			backend->cbref_user = -1;
1432 		}
1433 		else if (ucl_object_type (users_enabled) == UCL_STRING) {
1434 			lua_script = ucl_object_tostring (users_enabled);
1435 
1436 			if (luaL_dostring (cfg->lua_state, lua_script) != 0) {
1437 				msg_err_config ("cannot execute lua script for users "
1438 						"extraction: %s", lua_tostring (cfg->lua_state, -1));
1439 			}
1440 			else {
1441 				if (lua_type (cfg->lua_state, -1) == LUA_TFUNCTION) {
1442 					backend->enable_users = TRUE;
1443 					backend->cbref_user = luaL_ref (cfg->lua_state,
1444 							LUA_REGISTRYINDEX);
1445 				}
1446 				else {
1447 					msg_err_config ("lua script must return "
1448 							"function(task) and not %s",
1449 							lua_typename (cfg->lua_state, lua_type (
1450 									cfg->lua_state, -1)));
1451 				}
1452 			}
1453 		}
1454 	}
1455 	else {
1456 		backend->enable_users = FALSE;
1457 		backend->cbref_user = -1;
1458 	}
1459 
1460 	elt = ucl_object_lookup (obj, "prefix");
1461 	if (elt == NULL || ucl_object_type (elt) != UCL_STRING) {
1462 		/* Default non-users statistics */
1463 		if (backend->enable_users || backend->cbref_user != -1) {
1464 			backend->redis_object = REDIS_DEFAULT_USERS_OBJECT;
1465 		}
1466 		else {
1467 			backend->redis_object = REDIS_DEFAULT_OBJECT;
1468 		}
1469 	}
1470 	else {
1471 		/* XXX: sanity check */
1472 		backend->redis_object = ucl_object_tostring (elt);
1473 	}
1474 
1475 	elt = ucl_object_lookup (obj, "store_tokens");
1476 	if (elt) {
1477 		backend->store_tokens = ucl_object_toboolean (elt);
1478 	}
1479 	else {
1480 		backend->store_tokens = FALSE;
1481 	}
1482 
1483 	elt = ucl_object_lookup (obj, "new_schema");
1484 	if (elt) {
1485 		backend->new_schema = ucl_object_toboolean (elt);
1486 	}
1487 	else {
1488 		backend->new_schema = FALSE;
1489 
1490 		msg_warn_config ("you are using old bayes schema for redis statistics, "
1491 				"please consider converting it to a new one "
1492 				"by using 'rspamadm configwizard statistics'");
1493 	}
1494 
1495 	elt = ucl_object_lookup (obj, "signatures");
1496 	if (elt) {
1497 		backend->enable_signatures = ucl_object_toboolean (elt);
1498 	}
1499 	else {
1500 		backend->enable_signatures = FALSE;
1501 	}
1502 
1503 	elt = ucl_object_lookup_any (obj, "expiry", "expire", NULL);
1504 	if (elt) {
1505 		backend->expiry = ucl_object_toint (elt);
1506 	}
1507 	else {
1508 		backend->expiry = 0;
1509 	}
1510 }
1511 
1512 gpointer
rspamd_redis_init(struct rspamd_stat_ctx * ctx,struct rspamd_config * cfg,struct rspamd_statfile * st)1513 rspamd_redis_init (struct rspamd_stat_ctx *ctx,
1514 		struct rspamd_config *cfg, struct rspamd_statfile *st)
1515 {
1516 	struct redis_stat_ctx *backend;
1517 	struct rspamd_statfile_config *stf = st->stcf;
1518 	struct rspamd_redis_stat_elt *st_elt;
1519 	const ucl_object_t *obj;
1520 	gboolean ret = FALSE;
1521 	gint conf_ref = -1;
1522 	lua_State *L = (lua_State *)cfg->lua_state;
1523 
1524 	backend = g_malloc0 (sizeof (*backend));
1525 	backend->L = L;
1526 	backend->timeout = REDIS_DEFAULT_TIMEOUT;
1527 
1528 	/* First search in backend configuration */
1529 	obj = ucl_object_lookup (st->classifier->cfg->opts, "backend");
1530 	if (obj != NULL && ucl_object_type (obj) == UCL_OBJECT) {
1531 		ret = rspamd_lua_try_load_redis (L, obj, cfg, &conf_ref);
1532 	}
1533 
1534 	/* Now try statfiles config */
1535 	if (!ret && stf->opts) {
1536 		ret = rspamd_lua_try_load_redis (L, stf->opts, cfg, &conf_ref);
1537 	}
1538 
1539 	/* Now try classifier config */
1540 	if (!ret && st->classifier->cfg->opts) {
1541 		ret = rspamd_lua_try_load_redis (L, st->classifier->cfg->opts, cfg, &conf_ref);
1542 	}
1543 
1544 	/* Now try global redis settings */
1545 	if (!ret) {
1546 		obj = ucl_object_lookup (cfg->rcl_obj, "redis");
1547 
1548 		if (obj) {
1549 			const ucl_object_t *specific_obj;
1550 
1551 			specific_obj = ucl_object_lookup (obj, "statistics");
1552 
1553 			if (specific_obj) {
1554 				ret = rspamd_lua_try_load_redis (L,
1555 						specific_obj, cfg, &conf_ref);
1556 			}
1557 			else {
1558 				ret = rspamd_lua_try_load_redis (L,
1559 						obj, cfg, &conf_ref);
1560 			}
1561 		}
1562 	}
1563 
1564 	if (!ret) {
1565 		msg_err_config ("cannot init redis backend for %s", stf->symbol);
1566 		g_free (backend);
1567 		return NULL;
1568 	}
1569 
1570 	backend->conf_ref = conf_ref;
1571 
1572 	/* Check some common table values */
1573 	lua_rawgeti (L, LUA_REGISTRYINDEX, conf_ref);
1574 
1575 	lua_pushstring (L, "timeout");
1576 	lua_gettable (L, -2);
1577 	if (lua_type (L, -1) == LUA_TNUMBER) {
1578 		backend->timeout = lua_tonumber (L, -1);
1579 	}
1580 	lua_pop (L, 1);
1581 
1582 	lua_pushstring (L, "db");
1583 	lua_gettable (L, -2);
1584 	if (lua_type (L, -1) == LUA_TSTRING) {
1585 		backend->dbname = rspamd_mempool_strdup (cfg->cfg_pool,
1586 				lua_tostring (L, -1));
1587 	}
1588 	lua_pop (L, 1);
1589 
1590 	lua_pushstring (L, "password");
1591 	lua_gettable (L, -2);
1592 	if (lua_type (L, -1) == LUA_TSTRING) {
1593 		backend->password = rspamd_mempool_strdup (cfg->cfg_pool,
1594 				lua_tostring (L, -1));
1595 	}
1596 	lua_pop (L, 1);
1597 
1598 	lua_settop (L, 0);
1599 
1600 	rspamd_redis_parse_classifier_opts (backend, st->classifier->cfg->opts, cfg);
1601 	stf->clcf->flags |= RSPAMD_FLAG_CLASSIFIER_INCREMENTING_BACKEND;
1602 	backend->stcf = stf;
1603 
1604 	st_elt = g_malloc0 (sizeof (*st_elt));
1605 	st_elt->event_loop = ctx->event_loop;
1606 	st_elt->ctx = backend;
1607 	backend->stat_elt = rspamd_stat_ctx_register_async (
1608 			rspamd_redis_async_stat_cb,
1609 			rspamd_redis_async_stat_fin,
1610 			st_elt,
1611 			REDIS_STAT_TIMEOUT);
1612 	st_elt->async = backend->stat_elt;
1613 
1614 	return (gpointer)backend;
1615 }
1616 
1617 /*
1618  * This callback is called when Redis is disconnected somehow, and the structure
1619  * itself is usually freed by hiredis itself
1620  */
1621 static void
rspamd_stat_redis_on_disconnect(const struct redisAsyncContext * ac,int status)1622 rspamd_stat_redis_on_disconnect(const struct redisAsyncContext *ac, int status)
1623 {
1624 	struct redis_stat_runtime *rt = (struct redis_stat_runtime *)ac->data;
1625 
1626 	if (ev_can_stop (&rt->timeout_event)) {
1627 		ev_timer_stop (rt->task->event_loop, &rt->timeout_event);
1628 	}
1629 	rt->redis = NULL;
1630 }
1631 
1632 static void
rspamd_stat_redis_on_connect(const struct redisAsyncContext * ac,int status)1633 rspamd_stat_redis_on_connect(const struct redisAsyncContext *ac, int status)
1634 {
1635 	struct redis_stat_runtime *rt = (struct redis_stat_runtime *)ac->data;
1636 
1637 
1638 	if (status == REDIS_ERR) {
1639 		/*
1640 		 * We also need to reset rt->redis as it will be subsequently freed without
1641 		 * calling for redis_on_disconnect callback...
1642 		 */
1643 		if (ev_can_stop (&rt->timeout_event)) {
1644 			ev_timer_stop (rt->task->event_loop, &rt->timeout_event);
1645 		}
1646 		rt->redis = NULL;
1647 	}
1648 }
1649 
1650 gpointer
rspamd_redis_runtime(struct rspamd_task * task,struct rspamd_statfile_config * stcf,gboolean learn,gpointer c)1651 rspamd_redis_runtime (struct rspamd_task *task,
1652 		struct rspamd_statfile_config *stcf,
1653 		gboolean learn, gpointer c)
1654 {
1655 	struct redis_stat_ctx *ctx = REDIS_CTX (c);
1656 	struct redis_stat_runtime *rt;
1657 	struct upstream *up;
1658 	struct upstream_list *ups;
1659 	char *object_expanded = NULL;
1660 	rspamd_inet_addr_t *addr;
1661 
1662 	g_assert (ctx != NULL);
1663 	g_assert (stcf != NULL);
1664 
1665 	if (learn) {
1666 		ups = rspamd_redis_get_servers (ctx, "write_servers");
1667 
1668 		if (!ups) {
1669 			msg_err_task ("no write servers defined for %s, cannot learn",
1670 					stcf->symbol);
1671 			return NULL;
1672 		}
1673 		up = rspamd_upstream_get (ups,
1674 				RSPAMD_UPSTREAM_MASTER_SLAVE,
1675 				NULL,
1676 				0);
1677 	}
1678 	else {
1679 		ups = rspamd_redis_get_servers (ctx, "read_servers");
1680 
1681 		if (!ups) {
1682 			msg_err_task ("no read servers defined for %s, cannot stat",
1683 					stcf->symbol);
1684 			return NULL;
1685 		}
1686 		up = rspamd_upstream_get (ups,
1687 				RSPAMD_UPSTREAM_ROUND_ROBIN,
1688 				NULL,
1689 				0);
1690 	}
1691 
1692 	if (up == NULL) {
1693 		msg_err_task ("no upstreams reachable");
1694 		return NULL;
1695 	}
1696 
1697 	if (rspamd_redis_expand_object (ctx->redis_object, ctx, task,
1698 			&object_expanded) == 0) {
1699 		msg_err_task ("expansion for %s failed for symbol %s "
1700 				 "(maybe learning per user classifier with no user or recipient)",
1701 				 learn ? "learning" : "classifying",
1702 				 stcf->symbol);
1703 		return NULL;
1704 	}
1705 
1706 	rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt));
1707 	rt->selected = up;
1708 	rt->task = task;
1709 	rt->ctx = ctx;
1710 	rt->stcf = stcf;
1711 	rt->redis_object_expanded = object_expanded;
1712 
1713 	addr = rspamd_upstream_addr_next (up);
1714 	g_assert (addr != NULL);
1715 
1716 	if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
1717 		rt->redis = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
1718 	}
1719 	else {
1720 		rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
1721 				rspamd_inet_address_get_port (addr));
1722 	}
1723 
1724 	if (rt->redis == NULL) {
1725 		msg_warn_task ("cannot connect to redis server %s: %s",
1726 				rspamd_inet_address_to_string_pretty (addr),
1727 				strerror (errno));
1728 		return NULL;
1729 	}
1730 	else if (rt->redis->err != REDIS_OK) {
1731 		msg_warn_task ("cannot connect to redis server %s: %s",
1732 				rspamd_inet_address_to_string_pretty (addr),
1733 				rt->redis->errstr);
1734 		redisAsyncFree (rt->redis);
1735 		rt->redis = NULL;
1736 
1737 		return NULL;
1738 	}
1739 
1740 	redisLibevAttach (task->event_loop, rt->redis);
1741 	rspamd_redis_maybe_auth (ctx, rt->redis);
1742 	rt->redis->data = rt;
1743 	redisAsyncSetDisconnectCallback (rt->redis, rspamd_stat_redis_on_disconnect);
1744 	redisAsyncSetConnectCallback (rt->redis, rspamd_stat_redis_on_connect);
1745 
1746 	rspamd_mempool_add_destructor (task->task_pool, rspamd_redis_fin, rt);
1747 
1748 	return rt;
1749 }
1750 
1751 void
rspamd_redis_close(gpointer p)1752 rspamd_redis_close (gpointer p)
1753 {
1754 	struct redis_stat_ctx *ctx = REDIS_CTX (p);
1755 	lua_State *L = ctx->L;
1756 
1757 	if (ctx->conf_ref) {
1758 		luaL_unref (L, LUA_REGISTRYINDEX, ctx->conf_ref);
1759 	}
1760 
1761 	g_free (ctx);
1762 }
1763 
1764 gboolean
rspamd_redis_process_tokens(struct rspamd_task * task,GPtrArray * tokens,gint id,gpointer p)1765 rspamd_redis_process_tokens (struct rspamd_task *task,
1766 		GPtrArray *tokens,
1767 		gint id, gpointer p)
1768 {
1769 	struct redis_stat_runtime *rt = REDIS_RUNTIME (p);
1770 	const gchar *learned_key = "learns";
1771 
1772 	if (rspamd_session_blocked (task->s)) {
1773 		return FALSE;
1774 	}
1775 
1776 	if (tokens == NULL || tokens->len == 0 || rt->redis == NULL) {
1777 		return FALSE;
1778 	}
1779 
1780 	rt->id = id;
1781 
1782 	if (rt->ctx->new_schema) {
1783 		if (rt->ctx->stcf->is_spam) {
1784 			learned_key = "learns_spam";
1785 		}
1786 		else {
1787 			learned_key = "learns_ham";
1788 		}
1789 	}
1790 
1791 	if (redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s",
1792 			rt->redis_object_expanded, learned_key) == REDIS_OK) {
1793 
1794 		rspamd_session_add_event (task->s, NULL, rt, M);
1795 		rt->has_event = TRUE;
1796 		rt->tokens = g_ptr_array_ref (tokens);
1797 
1798 		if (ev_can_stop (&rt->timeout_event)) {
1799 			rt->timeout_event.repeat = rt->ctx->timeout;
1800 			ev_timer_again (task->event_loop, &rt->timeout_event);
1801 		}
1802 		else {
1803 			rt->timeout_event.data = rt;
1804 			ev_timer_init (&rt->timeout_event, rspamd_redis_timeout,
1805 					rt->ctx->timeout, 0.);
1806 			ev_timer_start (task->event_loop, &rt->timeout_event);
1807 		}
1808 	}
1809 
1810 	return FALSE;
1811 }
1812 
1813 gboolean
rspamd_redis_finalize_process(struct rspamd_task * task,gpointer runtime,gpointer ctx)1814 rspamd_redis_finalize_process (struct rspamd_task *task, gpointer runtime,
1815 		gpointer ctx)
1816 {
1817 	struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
1818 
1819 	if (rt->err) {
1820 		msg_info_task ("cannot retrieve stat tokens from Redis: %e", rt->err);
1821 		g_error_free (rt->err);
1822 		rt->err = NULL;
1823 		rspamd_redis_fin (rt);
1824 
1825 		return FALSE;
1826 	}
1827 
1828 	rspamd_redis_fin (rt);
1829 
1830 	return TRUE;
1831 }
1832 
1833 gboolean
rspamd_redis_learn_tokens(struct rspamd_task * task,GPtrArray * tokens,gint id,gpointer p)1834 rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
1835 		gint id, gpointer p)
1836 {
1837 	struct redis_stat_runtime *rt = REDIS_RUNTIME (p);
1838 	rspamd_fstring_t *query;
1839 	const gchar *redis_cmd;
1840 	rspamd_token_t *tok;
1841 	gint ret;
1842 	goffset off;
1843 	const gchar *learned_key = "learns";
1844 
1845 	if (rspamd_session_blocked (task->s)) {
1846 		return FALSE;
1847 	}
1848 
1849 	if (rt->ctx->new_schema) {
1850 		if (rt->ctx->stcf->is_spam) {
1851 			learned_key = "learns_spam";
1852 		}
1853 		else {
1854 			learned_key = "learns_ham";
1855 		}
1856 	}
1857 
1858 	/*
1859 	 * Add the current key to the set of learned keys
1860 	 */
1861 	redisAsyncCommand (rt->redis, NULL, NULL, "SADD %s_keys %s",
1862 			rt->stcf->symbol, rt->redis_object_expanded);
1863 
1864 	if (rt->ctx->new_schema) {
1865 		redisAsyncCommand (rt->redis, NULL, NULL, "HSET %s version 2",
1866 				rt->redis_object_expanded);
1867 	}
1868 
1869 	if (rt->stcf->clcf->flags & RSPAMD_FLAG_CLASSIFIER_INTEGER) {
1870 		redis_cmd = "HINCRBY";
1871 	}
1872 	else {
1873 		redis_cmd = "HINCRBYFLOAT";
1874 	}
1875 
1876 	rt->id = id;
1877 	query = rspamd_redis_tokens_to_query (task, rt, tokens,
1878 			redis_cmd, rt->redis_object_expanded, TRUE, id,
1879 			rt->stcf->clcf->flags & RSPAMD_FLAG_CLASSIFIER_INTEGER);
1880 	g_assert (query != NULL);
1881 	query->len = 0;
1882 
1883 	/*
1884 	 * XXX:
1885 	 * Dirty hack: we get a token and check if it's value is -1 or 1, so
1886 	 * we could understand that we are learning or unlearning
1887 	 */
1888 
1889 	tok = g_ptr_array_index (task->tokens, 0);
1890 
1891 	if (tok->values[id] > 0) {
1892 		rspamd_printf_fstring (&query, ""
1893 				"*4\r\n"
1894 				"$7\r\n"
1895 				"HINCRBY\r\n"
1896 				"$%d\r\n"
1897 				"%s\r\n"
1898 				"$%d\r\n"
1899 				"%s\r\n" /* Learned key */
1900 				"$1\r\n"
1901 				"1\r\n",
1902 				(gint)strlen (rt->redis_object_expanded),
1903 				rt->redis_object_expanded,
1904 				(gint)strlen (learned_key),
1905 				learned_key);
1906 	}
1907 	else {
1908 		rspamd_printf_fstring (&query, ""
1909 				"*4\r\n"
1910 				"$7\r\n"
1911 				"HINCRBY\r\n"
1912 				"$%d\r\n"
1913 				"%s\r\n"
1914 				"$%d\r\n"
1915 				"%s\r\n" /* Learned key */
1916 				"$2\r\n"
1917 				"-1\r\n",
1918 				(gint)strlen (rt->redis_object_expanded),
1919 				rt->redis_object_expanded,
1920 				(gint)strlen (learned_key),
1921 				learned_key);
1922 	}
1923 
1924 	ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL,
1925 			query->str, query->len);
1926 
1927 	if (ret != REDIS_OK) {
1928 		msg_err_task ("call to redis failed: %s", rt->redis->errstr);
1929 		rspamd_fstring_free (query);
1930 
1931 		return FALSE;
1932 	}
1933 
1934 	off = query->len;
1935 	ret = rspamd_printf_fstring (&query, "*1\r\n$4\r\nEXEC\r\n");
1936 	ret = redisAsyncFormattedCommand (rt->redis, rspamd_redis_learned, rt,
1937 			query->str + off, ret);
1938 	rspamd_mempool_add_destructor (task->task_pool,
1939 			(rspamd_mempool_destruct_t)rspamd_fstring_free, query);
1940 
1941 	if (ret == REDIS_OK) {
1942 
1943 		/* Add signature if needed */
1944 		if (rt->ctx->enable_signatures) {
1945 			rspamd_redis_store_stat_signature (task, rt, tokens,
1946 					"RSIG");
1947 		}
1948 
1949 		rspamd_session_add_event (task->s, NULL, rt, M);
1950 		rt->has_event = TRUE;
1951 
1952 		/* Set timeout */
1953 		if (ev_can_stop (&rt->timeout_event)) {
1954 			rt->timeout_event.repeat = rt->ctx->timeout;
1955 			ev_timer_again (task->event_loop, &rt->timeout_event);
1956 		}
1957 		else {
1958 			rt->timeout_event.data = rt;
1959 			ev_timer_init (&rt->timeout_event, rspamd_redis_timeout,
1960 					rt->ctx->timeout, 0.);
1961 			ev_timer_start (task->event_loop, &rt->timeout_event);
1962 		}
1963 
1964 		return TRUE;
1965 	}
1966 	else {
1967 		msg_err_task ("call to redis failed: %s", rt->redis->errstr);
1968 	}
1969 
1970 	return FALSE;
1971 }
1972 
1973 
1974 gboolean
rspamd_redis_finalize_learn(struct rspamd_task * task,gpointer runtime,gpointer ctx,GError ** err)1975 rspamd_redis_finalize_learn (struct rspamd_task *task, gpointer runtime,
1976 		gpointer ctx, GError **err)
1977 {
1978 	struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
1979 
1980 	if (rt->err) {
1981 		g_propagate_error (err, rt->err);
1982 		rt->err = NULL;
1983 		rspamd_redis_fin (rt);
1984 
1985 		return FALSE;
1986 	}
1987 
1988 	rspamd_redis_fin (rt);
1989 
1990 	return TRUE;
1991 }
1992 
1993 gulong
rspamd_redis_total_learns(struct rspamd_task * task,gpointer runtime,gpointer ctx)1994 rspamd_redis_total_learns (struct rspamd_task *task, gpointer runtime,
1995 		gpointer ctx)
1996 {
1997 	struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
1998 
1999 	return rt->learned;
2000 }
2001 
2002 gulong
rspamd_redis_inc_learns(struct rspamd_task * task,gpointer runtime,gpointer ctx)2003 rspamd_redis_inc_learns (struct rspamd_task *task, gpointer runtime,
2004 		gpointer ctx)
2005 {
2006 	struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
2007 
2008 	/* XXX: may cause races */
2009 	return rt->learned + 1;
2010 }
2011 
2012 gulong
rspamd_redis_dec_learns(struct rspamd_task * task,gpointer runtime,gpointer ctx)2013 rspamd_redis_dec_learns (struct rspamd_task *task, gpointer runtime,
2014 		gpointer ctx)
2015 {
2016 	struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
2017 
2018 	/* XXX: may cause races */
2019 	return rt->learned + 1;
2020 }
2021 
2022 gulong
rspamd_redis_learns(struct rspamd_task * task,gpointer runtime,gpointer ctx)2023 rspamd_redis_learns (struct rspamd_task *task, gpointer runtime,
2024 		gpointer ctx)
2025 {
2026 	struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
2027 
2028 	return rt->learned;
2029 }
2030 
2031 ucl_object_t *
rspamd_redis_get_stat(gpointer runtime,gpointer ctx)2032 rspamd_redis_get_stat (gpointer runtime,
2033 		gpointer ctx)
2034 {
2035 	struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
2036 	struct rspamd_redis_stat_elt *st;
2037 	redisAsyncContext *redis;
2038 
2039 	if (rt->ctx->stat_elt) {
2040 		st = rt->ctx->stat_elt->ud;
2041 
2042 		if (rt->redis) {
2043 			redis = rt->redis;
2044 			rt->redis = NULL;
2045 			redisAsyncFree (redis);
2046 		}
2047 
2048 		if (st->stat) {
2049 			return ucl_object_ref (st->stat);
2050 		}
2051 	}
2052 
2053 	return NULL;
2054 }
2055 
2056 gpointer
rspamd_redis_load_tokenizer_config(gpointer runtime,gsize * len)2057 rspamd_redis_load_tokenizer_config (gpointer runtime,
2058 		gsize *len)
2059 {
2060 	return NULL;
2061 }
2062