1 /*-
2 * Copyright 2016 Vsevolod Stakhov
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "config.h"
17 #include "rspamd.h"
18 #include "stat_internal.h"
19 #include "upstream.h"
20 #include "lua/lua_common.h"
21 #include "libserver/mempool_vars_internal.h"
22 #include "hiredis.h"
23 #include "adapters/libev.h"
24 #include "ref.h"
25
26 #define msg_debug_stat_redis(...) rspamd_conditional_debug_fast (NULL, NULL, \
27 rspamd_stat_redis_log_id, "stat_redis", task->task_pool->tag.uid, \
28 G_STRFUNC, \
29 __VA_ARGS__)
30
31 INIT_LOG_MODULE(stat_redis)
32
/*
 * Cast helpers for opaque pointers. The whole expansion is parenthesised so
 * that postfix operators bind to the cast result, e.g. REDIS_CTX (p)->expiry.
 */
#define REDIS_CTX(p) ((struct redis_stat_ctx *)(p))
#define REDIS_RUNTIME(p) ((struct redis_stat_runtime *)(p))
/* Backend type name */
#define REDIS_BACKEND_TYPE "redis"
/* Default port of a redis server */
#define REDIS_DEFAULT_PORT 6379
/* Default object name patterns (see rspamd_redis_expand_object for the syntax) */
#define REDIS_DEFAULT_OBJECT "%s%l"
#define REDIS_DEFAULT_USERS_OBJECT "%s%l%r"
/* NOTE(review): timeouts are presumably expressed in seconds - confirm at the use sites */
#define REDIS_DEFAULT_TIMEOUT 0.5
#define REDIS_STAT_TIMEOUT 30
41
42 struct redis_stat_ctx {
43 lua_State *L;
44 struct rspamd_statfile_config *stcf;
45 gint conf_ref;
46 struct rspamd_stat_async_elt *stat_elt;
47 const gchar *redis_object;
48 const gchar *password;
49 const gchar *dbname;
50 gdouble timeout;
51 gboolean enable_users;
52 gboolean store_tokens;
53 gboolean new_schema;
54 gboolean enable_signatures;
55 guint expiry;
56 gint cbref_user;
57 };
58
/* States of a connection to a redis server */
enum rspamd_redis_connection_state {
	RSPAMD_REDIS_DISCONNECTED = 0,
	RSPAMD_REDIS_CONNECTED,
	RSPAMD_REDIS_REQUEST_SENT,
	RSPAMD_REDIS_TIMEDOUT,
	RSPAMD_REDIS_TERMINATED
};
66
67 struct redis_stat_runtime {
68 struct redis_stat_ctx *ctx;
69 struct rspamd_task *task;
70 struct upstream *selected;
71 ev_timer timeout_event;
72 GArray *results;
73 GPtrArray *tokens;
74 struct rspamd_statfile_config *stcf;
75 gchar *redis_object_expanded;
76 redisAsyncContext *redis;
77 guint64 learned;
78 gint id;
79 gboolean has_event;
80 GError *err;
81 };
82
83 /* Used to get statistics from redis */
84 struct rspamd_redis_stat_cbdata;
85
86 struct rspamd_redis_stat_elt {
87 struct redis_stat_ctx *ctx;
88 struct rspamd_stat_async_elt *async;
89 struct ev_loop *event_loop;
90 ucl_object_t *stat;
91 struct rspamd_redis_stat_cbdata *cbdata;
92 };
93
94 struct rspamd_redis_stat_cbdata {
95 struct rspamd_redis_stat_elt *elt;
96 redisAsyncContext *redis;
97 ucl_object_t *cur;
98 GPtrArray *cur_keys;
99 struct upstream *selected;
100 guint inflight;
101 gboolean wanna_die;
102 };
103
/* Reads field `elt` of `task`, or yields NULL when `task` itself is NULL */
#define GET_TASK_ELT(task, elt) ((task) == NULL ? NULL : (task)->elt)
105
106 static const gchar *M = "redis statistics";
107
108 static GQuark
rspamd_redis_stat_quark(void)109 rspamd_redis_stat_quark (void)
110 {
111 return g_quark_from_static_string (M);
112 }
113
114 static inline struct upstream_list *
rspamd_redis_get_servers(struct redis_stat_ctx * ctx,const gchar * what)115 rspamd_redis_get_servers (struct redis_stat_ctx *ctx,
116 const gchar *what)
117 {
118 lua_State *L = ctx->L;
119 struct upstream_list *res;
120
121 lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->conf_ref);
122 lua_pushstring (L, what);
123 lua_gettable (L, -2);
124 res = *((struct upstream_list**)lua_touserdata (L, -1));
125 lua_settop (L, 0);
126
127 return res;
128 }
129
130 /*
131 * Non-static for lua unit testing
132 */
133 gsize
rspamd_redis_expand_object(const gchar * pattern,struct redis_stat_ctx * ctx,struct rspamd_task * task,gchar ** target)134 rspamd_redis_expand_object (const gchar *pattern,
135 struct redis_stat_ctx *ctx,
136 struct rspamd_task *task,
137 gchar **target)
138 {
139 gsize tlen = 0;
140 const gchar *p = pattern, *elt;
141 gchar *d, *end;
142 enum {
143 just_char,
144 percent_char,
145 mod_char
146 } state = just_char;
147 struct rspamd_statfile_config *stcf;
148 lua_State *L = NULL;
149 struct rspamd_task **ptask;
150 const gchar *rcpt = NULL;
151 gint err_idx;
152
153 g_assert (ctx != NULL);
154 g_assert (task != NULL);
155 stcf = ctx->stcf;
156
157 L = task->cfg->lua_state;
158 g_assert (L != NULL);
159
160 if (ctx->enable_users) {
161 if (ctx->cbref_user == -1) {
162 rcpt = rspamd_task_get_principal_recipient (task);
163 }
164 else {
165 /* Execute lua function to get userdata */
166 lua_pushcfunction (L, &rspamd_lua_traceback);
167 err_idx = lua_gettop (L);
168
169 lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->cbref_user);
170 ptask = lua_newuserdata (L, sizeof (struct rspamd_task *));
171 *ptask = task;
172 rspamd_lua_setclass (L, "rspamd{task}", -1);
173
174 if (lua_pcall (L, 1, 1, err_idx) != 0) {
175 msg_err_task ("call to user extraction script failed: %s",
176 lua_tostring (L, -1));
177 }
178 else {
179 rcpt = rspamd_mempool_strdup (task->task_pool, lua_tostring (L, -1));
180 }
181
182 /* Result + error function */
183 lua_settop (L, err_idx - 1);
184 }
185
186 if (rcpt) {
187 rspamd_mempool_set_variable (task->task_pool, "stat_user",
188 (gpointer)rcpt, NULL);
189 }
190 }
191
192 /* Length calculation */
193 while (*p) {
194 switch (state) {
195 case just_char:
196 if (*p == '%') {
197 state = percent_char;
198 }
199 else {
200 tlen ++;
201 }
202 p ++;
203 break;
204 case percent_char:
205 switch (*p) {
206 case '%':
207 tlen ++;
208 state = just_char;
209 break;
210 case 'u':
211 elt = GET_TASK_ELT (task, user);
212 if (elt) {
213 tlen += strlen (elt);
214 }
215 break;
216 case 'r':
217
218 if (rcpt == NULL) {
219 elt = rspamd_task_get_principal_recipient (task);
220 }
221 else {
222 elt = rcpt;
223 }
224
225 if (elt) {
226 tlen += strlen (elt);
227 }
228 break;
229 case 'l':
230 if (stcf->label) {
231 tlen += strlen (stcf->label);
232 }
233 /* Label miss is OK */
234 break;
235 case 's':
236 if (ctx->new_schema) {
237 tlen += sizeof ("RS") - 1;
238 }
239 else {
240 if (stcf->symbol) {
241 tlen += strlen (stcf->symbol);
242 }
243 }
244 break;
245 default:
246 state = just_char;
247 tlen ++;
248 break;
249 }
250
251 if (state == percent_char) {
252 state = mod_char;
253 }
254 p ++;
255 break;
256
257 case mod_char:
258 switch (*p) {
259 case 'd':
260 p ++;
261 state = just_char;
262 break;
263 default:
264 state = just_char;
265 break;
266 }
267 break;
268 }
269 }
270
271
272 if (target == NULL) {
273 return -1;
274 }
275
276 *target = rspamd_mempool_alloc (task->task_pool, tlen + 1);
277 d = *target;
278 end = d + tlen + 1;
279 d[tlen] = '\0';
280 p = pattern;
281 state = just_char;
282
283 /* Expand string */
284 while (*p && d < end) {
285 switch (state) {
286 case just_char:
287 if (*p == '%') {
288 state = percent_char;
289 }
290 else {
291 *d++ = *p;
292 }
293 p ++;
294 break;
295 case percent_char:
296 switch (*p) {
297 case '%':
298 *d++ = *p;
299 state = just_char;
300 break;
301 case 'u':
302 elt = GET_TASK_ELT (task, user);
303 if (elt) {
304 d += rspamd_strlcpy (d, elt, end - d);
305 }
306 break;
307 case 'r':
308 if (rcpt == NULL) {
309 elt = rspamd_task_get_principal_recipient (task);
310 }
311 else {
312 elt = rcpt;
313 }
314
315 if (elt) {
316 d += rspamd_strlcpy (d, elt, end - d);
317 }
318 break;
319 case 'l':
320 if (stcf->label) {
321 d += rspamd_strlcpy (d, stcf->label, end - d);
322 }
323 break;
324 case 's':
325 if (ctx->new_schema) {
326 d += rspamd_strlcpy (d, "RS", end - d);
327 }
328 else {
329 if (stcf->symbol) {
330 d += rspamd_strlcpy (d, stcf->symbol, end - d);
331 }
332 }
333 break;
334 default:
335 state = just_char;
336 *d++ = *p;
337 break;
338 }
339
340 if (state == percent_char) {
341 state = mod_char;
342 }
343 p ++;
344 break;
345
346 case mod_char:
347 switch (*p) {
348 case 'd':
349 /* TODO: not supported yet */
350 p ++;
351 state = just_char;
352 break;
353 default:
354 state = just_char;
355 break;
356 }
357 break;
358 }
359 }
360
361 return tlen;
362 }
363
364 static void
rspamd_redis_maybe_auth(struct redis_stat_ctx * ctx,redisAsyncContext * redis)365 rspamd_redis_maybe_auth (struct redis_stat_ctx *ctx, redisAsyncContext *redis)
366 {
367 if (ctx->password) {
368 redisAsyncCommand (redis, NULL, NULL, "AUTH %s", ctx->password);
369 }
370 if (ctx->dbname) {
371 redisAsyncCommand (redis, NULL, NULL, "SELECT %s", ctx->dbname);
372 }
373 }
374
375 // the `b` conversion type character is unknown to gcc
376 #ifdef __GNUC__
377 #pragma GCC diagnostic push
378 #pragma GCC diagnostic ignored "-Wformat"
379 #pragma GCC diagnostic ignored "-Wformat-extra-args"
380 #endif
381 static rspamd_fstring_t *
rspamd_redis_tokens_to_query(struct rspamd_task * task,struct redis_stat_runtime * rt,GPtrArray * tokens,const gchar * command,const gchar * prefix,gboolean learn,gint idx,gboolean intvals)382 rspamd_redis_tokens_to_query (struct rspamd_task *task,
383 struct redis_stat_runtime *rt,
384 GPtrArray *tokens,
385 const gchar *command,
386 const gchar *prefix,
387 gboolean learn,
388 gint idx,
389 gboolean intvals)
390 {
391 rspamd_fstring_t *out;
392 rspamd_token_t *tok;
393 gchar n0[512], n1[64];
394 guint i, l0, l1, cmd_len, prefix_len;
395 gint ret;
396
397 g_assert (tokens != NULL);
398
399 cmd_len = strlen (command);
400 prefix_len = strlen (prefix);
401 out = rspamd_fstring_sized_new (1024);
402
403 if (learn) {
404 rspamd_printf_fstring (&out, "*1\r\n$5\r\nMULTI\r\n");
405
406 ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL,
407 out->str, out->len);
408
409 if (ret != REDIS_OK) {
410 msg_err_task ("call to redis failed: %s", rt->redis->errstr);
411 rspamd_fstring_free (out);
412
413 return NULL;
414 }
415
416 out->len = 0;
417 }
418 else {
419 if (rt->ctx->new_schema) {
420 /* Multi + HGET */
421 rspamd_printf_fstring (&out, "*1\r\n$5\r\nMULTI\r\n");
422
423 ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL,
424 out->str, out->len);
425
426 if (ret != REDIS_OK) {
427 msg_err_task ("call to redis failed: %s", rt->redis->errstr);
428 rspamd_fstring_free (out);
429
430 return NULL;
431 }
432
433 out->len = 0;
434 }
435 else {
436 rspamd_printf_fstring (&out, ""
437 "*%d\r\n"
438 "$%d\r\n"
439 "%s\r\n"
440 "$%d\r\n"
441 "%s\r\n",
442 (tokens->len + 2),
443 cmd_len, command,
444 prefix_len, prefix);
445 }
446 }
447
448 for (i = 0; i < tokens->len; i ++) {
449 tok = g_ptr_array_index (tokens, i);
450
451 if (learn) {
452 if (intvals) {
453 l1 = rspamd_snprintf (n1, sizeof (n1), "%L",
454 (gint64) tok->values[idx]);
455 } else {
456 l1 = rspamd_snprintf (n1, sizeof (n1), "%f",
457 tok->values[idx]);
458 }
459
460 if (rt->ctx->new_schema) {
461 /*
462 * HINCRBY <prefix_token> <0|1> <value>
463 */
464 l0 = rspamd_snprintf (n0, sizeof (n0), "%*s_%uL",
465 prefix_len, prefix,
466 tok->data);
467
468 rspamd_printf_fstring (&out, ""
469 "*4\r\n"
470 "$%d\r\n"
471 "%s\r\n"
472 "$%d\r\n"
473 "%s\r\n"
474 "$%d\r\n"
475 "%s\r\n"
476 "$%d\r\n"
477 "%s\r\n",
478 cmd_len, command,
479 l0, n0,
480 1, rt->stcf->is_spam ? "S" : "H",
481 l1, n1);
482 }
483 else {
484 l0 = rspamd_snprintf (n0, sizeof (n0), "%uL", tok->data);
485
486 /*
487 * HINCRBY <prefix> <token> <value>
488 */
489 rspamd_printf_fstring (&out, ""
490 "*4\r\n"
491 "$%d\r\n"
492 "%s\r\n"
493 "$%d\r\n"
494 "%s\r\n"
495 "$%d\r\n"
496 "%s\r\n"
497 "$%d\r\n"
498 "%s\r\n",
499 cmd_len, command,
500 prefix_len, prefix,
501 l0, n0,
502 l1, n1);
503 }
504
505 ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL,
506 out->str, out->len);
507
508 if (ret != REDIS_OK) {
509 msg_err_task ("call to redis failed: %s", rt->redis->errstr);
510 rspamd_fstring_free (out);
511
512 return NULL;
513 }
514
515 if (rt->ctx->store_tokens) {
516
517 if (!rt->ctx->new_schema) {
518 /*
519 * We store tokens in form
520 * HSET prefix_tokens <token_id> "token_string"
521 * ZINCRBY prefix_z 1.0 <token_id>
522 */
523 if (tok->t1 && tok->t2) {
524 redisAsyncCommand (rt->redis, NULL, NULL,
525 "HSET %b_tokens %b %b:%b",
526 prefix, (size_t) prefix_len,
527 n0, (size_t) l0,
528 tok->t1->stemmed.begin, tok->t1->stemmed.len,
529 tok->t2->stemmed.begin, tok->t2->stemmed.len);
530 } else if (tok->t1) {
531 redisAsyncCommand (rt->redis, NULL, NULL,
532 "HSET %b_tokens %b %b",
533 prefix, (size_t) prefix_len,
534 n0, (size_t) l0,
535 tok->t1->stemmed.begin,
536 tok->t1->stemmed.len);
537 }
538 }
539 else {
540 /*
541 * We store tokens in form
542 * HSET <token_id> "tokens" "token_string"
543 * ZINCRBY prefix_z 1.0 <token_id>
544 */
545 if (tok->t1 && tok->t2) {
546 redisAsyncCommand (rt->redis, NULL, NULL,
547 "HSET %b %s %b:%b",
548 n0, (size_t) l0,
549 "tokens",
550 tok->t1->stemmed.begin, tok->t1->stemmed.len,
551 tok->t2->stemmed.begin, tok->t2->stemmed.len);
552 } else if (tok->t1) {
553 redisAsyncCommand (rt->redis, NULL, NULL,
554 "HSET %b %s %b",
555 n0, (size_t) l0,
556 "tokens",
557 tok->t1->stemmed.begin, tok->t1->stemmed.len);
558 }
559 }
560
561 redisAsyncCommand (rt->redis, NULL, NULL,
562 "ZINCRBY %b_z %b %b",
563 prefix, (size_t)prefix_len,
564 n1, (size_t)l1,
565 n0, (size_t)l0);
566 }
567
568 if (rt->ctx->new_schema && rt->ctx->expiry > 0) {
569 out->len = 0;
570 l1 = rspamd_snprintf (n1, sizeof (n1), "%d",
571 rt->ctx->expiry);
572
573 rspamd_printf_fstring (&out, ""
574 "*3\r\n"
575 "$6\r\n"
576 "EXPIRE\r\n"
577 "$%d\r\n"
578 "%s\r\n"
579 "$%d\r\n"
580 "%s\r\n",
581 l0, n0,
582 l1, n1);
583 redisAsyncFormattedCommand (rt->redis, NULL, NULL,
584 out->str, out->len);
585 }
586
587 out->len = 0;
588 }
589 else {
590 if (rt->ctx->new_schema) {
591 l0 = rspamd_snprintf (n0, sizeof (n0), "%*s_%uL",
592 prefix_len, prefix,
593 tok->data);
594
595 rspamd_printf_fstring (&out, ""
596 "*3\r\n"
597 "$%d\r\n"
598 "%s\r\n"
599 "$%d\r\n"
600 "%s\r\n"
601 "$%d\r\n"
602 "%s\r\n",
603 cmd_len, command,
604 l0, n0,
605 1, rt->stcf->is_spam ? "S" : "H");
606
607 ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL,
608 out->str, out->len);
609
610 if (ret != REDIS_OK) {
611 msg_err_task ("call to redis failed: %s", rt->redis->errstr);
612 rspamd_fstring_free (out);
613
614 return NULL;
615 }
616
617 out->len = 0;
618 }
619 else {
620 l0 = rspamd_snprintf (n0, sizeof (n0), "%uL", tok->data);
621 rspamd_printf_fstring (&out, ""
622 "$%d\r\n"
623 "%s\r\n", l0, n0);
624 }
625 }
626 }
627
628 if (!learn && rt->ctx->new_schema) {
629 rspamd_printf_fstring (&out, "*1\r\n$4\r\nEXEC\r\n");
630 }
631
632 return out;
633 }
634 #ifdef __GNUC__
635 #pragma GCC diagnostic pop
636 #endif
637
638 static void
rspamd_redis_store_stat_signature(struct rspamd_task * task,struct redis_stat_runtime * rt,GPtrArray * tokens,const gchar * prefix)639 rspamd_redis_store_stat_signature (struct rspamd_task *task,
640 struct redis_stat_runtime *rt,
641 GPtrArray *tokens,
642 const gchar *prefix)
643 {
644 gchar *sig, keybuf[512], nbuf[64];
645 rspamd_token_t *tok;
646 guint i, blen, klen;
647 rspamd_fstring_t *out;
648
649 sig = rspamd_mempool_get_variable (task->task_pool,
650 RSPAMD_MEMPOOL_STAT_SIGNATURE);
651
652 if (sig == NULL) {
653 msg_err_task ("cannot get bayes signature");
654 return;
655 }
656
657 out = rspamd_fstring_sized_new (1024);
658 klen = rspamd_snprintf (keybuf, sizeof (keybuf), "%s_%s_%s",
659 prefix, sig, rt->stcf->is_spam ? "S" : "H");
660
661 /* Cleanup key */
662 rspamd_printf_fstring (&out, ""
663 "*2\r\n"
664 "$3\r\n"
665 "DEL\r\n"
666 "$%d\r\n"
667 "%s\r\n",
668 klen, keybuf);
669 redisAsyncFormattedCommand (rt->redis, NULL, NULL,
670 out->str, out->len);
671 out->len = 0;
672
673 rspamd_printf_fstring (&out, ""
674 "*%d\r\n"
675 "$5\r\n"
676 "LPUSH\r\n"
677 "$%d\r\n"
678 "%s\r\n",
679 tokens->len + 2,
680 klen, keybuf);
681
682 PTR_ARRAY_FOREACH (tokens, i, tok) {
683 blen = rspamd_snprintf (nbuf, sizeof (nbuf), "%uL", tok->data);
684 rspamd_printf_fstring (&out, ""
685 "$%d\r\n"
686 "%s\r\n", blen, nbuf);
687 }
688
689 redisAsyncFormattedCommand (rt->redis, NULL, NULL,
690 out->str, out->len);
691 out->len = 0;
692
693 if (rt->ctx->expiry > 0) {
694 out->len = 0;
695 blen = rspamd_snprintf (nbuf, sizeof (nbuf), "%d",
696 rt->ctx->expiry);
697
698 rspamd_printf_fstring (&out, ""
699 "*3\r\n"
700 "$6\r\n"
701 "EXPIRE\r\n"
702 "$%d\r\n"
703 "%s\r\n"
704 "$%d\r\n"
705 "%s\r\n",
706 klen, keybuf,
707 blen, nbuf);
708 redisAsyncFormattedCommand (rt->redis, NULL, NULL,
709 out->str, out->len);
710 }
711
712 rspamd_fstring_free (out);
713 }
714
715 static void
rspamd_redis_async_cbdata_cleanup(struct rspamd_redis_stat_cbdata * cbdata)716 rspamd_redis_async_cbdata_cleanup (struct rspamd_redis_stat_cbdata *cbdata)
717 {
718 guint i;
719 gchar *k;
720
721 if (cbdata && !cbdata->wanna_die) {
722 /* Avoid double frees */
723 cbdata->wanna_die = TRUE;
724 redisAsyncFree (cbdata->redis);
725
726 for (i = 0; i < cbdata->cur_keys->len; i ++) {
727 k = g_ptr_array_index (cbdata->cur_keys, i);
728 g_free (k);
729 }
730
731 g_ptr_array_free (cbdata->cur_keys, TRUE);
732
733 if (cbdata->elt) {
734 cbdata->elt->cbdata = NULL;
735 /* Re-enable parent event */
736 cbdata->elt->async->enabled = TRUE;
737
738 /* Replace ucl object */
739 if (cbdata->cur) {
740 if (cbdata->elt->stat) {
741 ucl_object_unref (cbdata->elt->stat);
742 }
743
744 cbdata->elt->stat = cbdata->cur;
745 cbdata->cur = NULL;
746 }
747 }
748
749 if (cbdata->cur) {
750 ucl_object_unref (cbdata->cur);
751 }
752
753 g_free (cbdata);
754 }
755 }
756
757 /* Called when we get number of learns for a specific key */
758 static void
rspamd_redis_stat_learns(redisAsyncContext * c,gpointer r,gpointer priv)759 rspamd_redis_stat_learns (redisAsyncContext *c, gpointer r, gpointer priv)
760 {
761 struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv;
762 struct rspamd_redis_stat_cbdata *cbdata;
763 redisReply *reply = r;
764 ucl_object_t *obj;
765 gulong num = 0;
766
767 cbdata = redis_elt->cbdata;
768
769 if (cbdata == NULL || cbdata->wanna_die) {
770 return;
771 }
772
773 cbdata->inflight --;
774
775 if (c->err == 0 && r != NULL) {
776 if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) {
777 num = reply->integer;
778 }
779 else if (reply->type == REDIS_REPLY_STRING) {
780 rspamd_strtoul (reply->str, reply->len, &num);
781 }
782
783 obj = (ucl_object_t *) ucl_object_lookup (cbdata->cur, "revision");
784 if (obj) {
785 obj->value.iv += num;
786 }
787 }
788
789 if (cbdata->inflight == 0) {
790 rspamd_redis_async_cbdata_cleanup (cbdata);
791 redis_elt->cbdata = NULL;
792 }
793 }
794
795 /* Called when we get number of elements for a specific key */
796 static void
rspamd_redis_stat_key(redisAsyncContext * c,gpointer r,gpointer priv)797 rspamd_redis_stat_key (redisAsyncContext *c, gpointer r, gpointer priv)
798 {
799 struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv;
800 struct rspamd_redis_stat_cbdata *cbdata;
801 redisReply *reply = r;
802 ucl_object_t *obj;
803 glong num = 0;
804
805 cbdata = redis_elt->cbdata;
806
807 if (cbdata == NULL || cbdata->wanna_die) {
808 return;
809 }
810
811 cbdata->inflight --;
812
813 if (c->err == 0 && r != NULL) {
814 if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) {
815 num = reply->integer;
816 }
817 else if (reply->type == REDIS_REPLY_STRING) {
818 rspamd_strtol (reply->str, reply->len, &num);
819 }
820
821 if (num < 0) {
822 msg_err ("bad learns count: %L", (gint64)num);
823 num = 0;
824 }
825
826 obj = (ucl_object_t *)ucl_object_lookup (cbdata->cur, "used");
827 if (obj) {
828 obj->value.iv += num;
829 }
830
831 obj = (ucl_object_t *)ucl_object_lookup (cbdata->cur, "total");
832 if (obj) {
833 obj->value.iv += num;
834 }
835
836 obj = (ucl_object_t *)ucl_object_lookup (cbdata->cur, "size");
837 if (obj) {
838 /* Size of key + size of int64_t */
839 obj->value.iv += num * (sizeof (G_STRINGIFY (G_MAXINT64)) +
840 sizeof (guint64) + sizeof (gpointer));
841 }
842 }
843
844 if (cbdata->inflight == 0) {
845 rspamd_redis_async_cbdata_cleanup (cbdata);
846 redis_elt->cbdata = NULL;
847 }
848 }
849
850 /* Called when we have connected to the redis server and got keys to check */
851 static void
rspamd_redis_stat_keys(redisAsyncContext * c,gpointer r,gpointer priv)852 rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
853 {
854 struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv;
855 struct rspamd_redis_stat_cbdata *cbdata;
856 redisReply *reply = r, *more_elt, *elts, *elt;
857 gchar **pk, *k;
858 guint i, processed = 0;
859 gboolean more = false;
860
861 cbdata = redis_elt->cbdata;
862
863 if (cbdata == NULL || cbdata->wanna_die) {
864 return;
865 }
866
867 cbdata->inflight --;
868
869 if (c->err == 0 && r != NULL) {
870 if (reply->type == REDIS_REPLY_ARRAY) {
871 more_elt = reply->element[0];
872 elts = reply->element[1];
873
874 if (more_elt != NULL && more_elt->str != NULL && strcmp (more_elt->str, "0") != 0) {
875 more = true;
876 }
877
878 /* Clear the existing stuff */
879 PTR_ARRAY_FOREACH (cbdata->cur_keys, i, k) {
880 if (k) {
881 g_free (k);
882 }
883 }
884
885 g_ptr_array_set_size (cbdata->cur_keys, elts->elements);
886
887 for (i = 0; i < elts->elements; i ++) {
888 elt = elts->element[i];
889
890 if (elt->type == REDIS_REPLY_STRING) {
891 pk = (gchar **)&g_ptr_array_index (cbdata->cur_keys, i);
892 *pk = g_malloc (elt->len + 1);
893 rspamd_strlcpy (*pk, elt->str, elt->len + 1);
894 processed ++;
895 }
896 else {
897 pk = (gchar **)&g_ptr_array_index (cbdata->cur_keys, i);
898 *pk = NULL;
899 }
900 }
901
902 if (processed) {
903 PTR_ARRAY_FOREACH (cbdata->cur_keys, i, k) {
904 if (k) {
905 const gchar *learned_key = "learns";
906
907 if (cbdata->elt->ctx->new_schema) {
908 if (cbdata->elt->ctx->stcf->is_spam) {
909 learned_key = "learns_spam";
910 }
911 else {
912 learned_key = "learns_ham";
913 }
914 redisAsyncCommand (cbdata->redis,
915 rspamd_redis_stat_learns,
916 redis_elt,
917 "HGET %s %s",
918 k, learned_key);
919 cbdata->inflight += 1;
920 }
921 else {
922 redisAsyncCommand (cbdata->redis,
923 rspamd_redis_stat_key,
924 redis_elt,
925 "HLEN %s",
926 k);
927 redisAsyncCommand (cbdata->redis,
928 rspamd_redis_stat_learns,
929 redis_elt,
930 "HGET %s %s",
931 k, learned_key);
932 cbdata->inflight += 2;
933 }
934 }
935 }
936 }
937 }
938
939 if (more) {
940 /* Get more stat keys */
941 redisAsyncCommand (cbdata->redis, rspamd_redis_stat_keys, redis_elt,
942 "SSCAN %s_keys %s COUNT 1000",
943 cbdata->elt->ctx->stcf->symbol, more_elt->str);
944
945 cbdata->inflight += 1;
946 }
947 else {
948 /* Set up the required keys */
949 ucl_object_insert_key (cbdata->cur,
950 ucl_object_typed_new (UCL_INT), "revision", 0, false);
951 ucl_object_insert_key (cbdata->cur,
952 ucl_object_typed_new (UCL_INT), "used", 0, false);
953 ucl_object_insert_key (cbdata->cur,
954 ucl_object_typed_new (UCL_INT), "total", 0, false);
955 ucl_object_insert_key (cbdata->cur,
956 ucl_object_typed_new (UCL_INT), "size", 0, false);
957 ucl_object_insert_key (cbdata->cur,
958 ucl_object_fromstring (cbdata->elt->ctx->stcf->symbol),
959 "symbol", 0, false);
960 ucl_object_insert_key (cbdata->cur, ucl_object_fromstring ("redis"),
961 "type", 0, false);
962 ucl_object_insert_key (cbdata->cur, ucl_object_fromint (0),
963 "languages", 0, false);
964 ucl_object_insert_key (cbdata->cur, ucl_object_fromint (processed),
965 "users", 0, false);
966
967 rspamd_upstream_ok (cbdata->selected);
968
969 if (cbdata->inflight == 0) {
970 rspamd_redis_async_cbdata_cleanup (cbdata);
971 redis_elt->cbdata = NULL;
972 }
973 }
974 }
975 else {
976 if (c->errstr) {
977 msg_err ("cannot get keys to gather stat: %s", c->errstr);
978 }
979 else {
980 msg_err ("cannot get keys to gather stat: unknown error");
981 }
982
983 rspamd_upstream_fail (cbdata->selected, FALSE, c->errstr);
984 rspamd_redis_async_cbdata_cleanup (cbdata);
985 redis_elt->cbdata = NULL;
986 }
987 }
988
989 static void
rspamd_redis_async_stat_cb(struct rspamd_stat_async_elt * elt,gpointer d)990 rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d)
991 {
992 struct redis_stat_ctx *ctx;
993 struct rspamd_redis_stat_elt *redis_elt = elt->ud;
994 struct rspamd_redis_stat_cbdata *cbdata;
995 rspamd_inet_addr_t *addr;
996 struct upstream_list *ups;
997 redisAsyncContext *redis_ctx;
998 struct upstream *selected;
999
1000 g_assert (redis_elt != NULL);
1001
1002 ctx = redis_elt->ctx;
1003
1004 if (redis_elt->cbdata) {
1005 /* We have some other process pending */
1006 rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata);
1007 redis_elt->cbdata = NULL;
1008 }
1009
1010 /* Disable further events unless needed */
1011 elt->enabled = FALSE;
1012
1013 ups = rspamd_redis_get_servers (ctx, "read_servers");
1014
1015 if (!ups) {
1016 return;
1017 }
1018
1019 selected = rspamd_upstream_get (ups,
1020 RSPAMD_UPSTREAM_ROUND_ROBIN,
1021 NULL,
1022 0);
1023
1024 g_assert (selected != NULL);
1025 addr = rspamd_upstream_addr_next (selected);
1026 g_assert (addr != NULL);
1027
1028 if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
1029 redis_ctx = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
1030 }
1031 else {
1032 redis_ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr),
1033 rspamd_inet_address_get_port (addr));
1034 }
1035
1036 if (redis_ctx == NULL) {
1037 msg_warn ("cannot connect to redis server %s: %s",
1038 rspamd_inet_address_to_string_pretty (addr),
1039 strerror (errno));
1040
1041 return;
1042 }
1043 else if (redis_ctx->err != REDIS_OK) {
1044 msg_warn ("cannot connect to redis server %s: %s",
1045 rspamd_inet_address_to_string_pretty (addr),
1046 redis_ctx->errstr);
1047 redisAsyncFree (redis_ctx);
1048
1049 return;
1050 }
1051
1052 redisLibevAttach (redis_elt->event_loop, redis_ctx);
1053 cbdata = g_malloc0 (sizeof (*cbdata));
1054 cbdata->redis = redis_ctx;
1055 cbdata->selected = selected;
1056 cbdata->inflight = 1;
1057 cbdata->cur = ucl_object_typed_new (UCL_OBJECT);
1058 cbdata->elt = redis_elt;
1059 cbdata->cur_keys = g_ptr_array_sized_new (1000);
1060 redis_elt->cbdata = cbdata;
1061
1062 /* XXX: deal with timeouts maybe */
1063 /* Get keys in redis that match our symbol */
1064 rspamd_redis_maybe_auth (ctx, cbdata->redis);
1065 redisAsyncCommand (cbdata->redis, rspamd_redis_stat_keys, redis_elt,
1066 "SSCAN %s_keys 0 COUNT 1000",
1067 ctx->stcf->symbol);
1068 }
1069
1070 static void
rspamd_redis_async_stat_fin(struct rspamd_stat_async_elt * elt,gpointer d)1071 rspamd_redis_async_stat_fin (struct rspamd_stat_async_elt *elt, gpointer d)
1072 {
1073 struct rspamd_redis_stat_elt *redis_elt = elt->ud;
1074
1075 if (redis_elt->cbdata != NULL) {
1076 rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata);
1077 redis_elt->cbdata = NULL;
1078 }
1079 }
1080
1081 /* Called on connection termination */
1082 static void
rspamd_redis_fin(gpointer data)1083 rspamd_redis_fin (gpointer data)
1084 {
1085 struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
1086 redisAsyncContext *redis;
1087
1088 if (rt->has_event) {
1089 /* Should not happen ! */
1090 msg_err ("FIXME: this code path should not be reached!");
1091 rspamd_session_remove_event (rt->task->s, NULL, rt);
1092 rt->has_event = FALSE;
1093 }
1094 /* Stop timeout */
1095 if (ev_can_stop (&rt->timeout_event)) {
1096 ev_timer_stop (rt->task->event_loop, &rt->timeout_event);
1097 }
1098
1099 if (rt->tokens) {
1100 g_ptr_array_unref (rt->tokens);
1101 rt->tokens = NULL;
1102 }
1103
1104 if (rt->redis) {
1105 redis = rt->redis;
1106 rt->redis = NULL;
1107 /* This calls for all callbacks pending */
1108 redisAsyncFree (redis);
1109 }
1110
1111 if (rt->err) {
1112 g_error_free (rt->err);
1113 }
1114 }
1115
1116 static void
rspamd_redis_timeout(EV_P_ ev_timer * w,int revents)1117 rspamd_redis_timeout (EV_P_ ev_timer *w, int revents)
1118 {
1119 struct redis_stat_runtime *rt = REDIS_RUNTIME (w->data);
1120 struct rspamd_task *task;
1121 redisAsyncContext *redis;
1122
1123 task = rt->task;
1124
1125 msg_err_task_check ("connection to redis server %s timed out",
1126 rspamd_upstream_name (rt->selected));
1127
1128 rspamd_upstream_fail (rt->selected, FALSE, "timeout");
1129
1130 if (rt->redis) {
1131 redis = rt->redis;
1132 rt->redis = NULL;
1133 /* This calls for all callbacks pending */
1134 redisAsyncFree (redis);
1135 }
1136
1137 if (rt->tokens) {
1138 g_ptr_array_unref (rt->tokens);
1139 rt->tokens = NULL;
1140 }
1141
1142 if (!rt->err) {
1143 g_set_error (&rt->err, rspamd_redis_stat_quark (), ETIMEDOUT,
1144 "error getting reply from redis server %s: timeout",
1145 rspamd_upstream_name (rt->selected));
1146 }
1147 if (rt->has_event) {
1148 rt->has_event = FALSE;
1149 rspamd_session_remove_event (task->s, NULL, rt);
1150 }
1151 }
1152
1153 /* Called when we have received tokens values from redis */
1154 static void
rspamd_redis_processed(redisAsyncContext * c,gpointer r,gpointer priv)1155 rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv)
1156 {
1157 struct redis_stat_runtime *rt = REDIS_RUNTIME (priv);
1158 redisReply *reply = r, *elt;
1159 struct rspamd_task *task;
1160 rspamd_token_t *tok;
1161 guint i, processed = 0, found = 0;
1162 gulong val;
1163 gdouble float_val;
1164
1165 task = rt->task;
1166
1167 if (c->err == 0 && rt->has_event) {
1168 if (r != NULL) {
1169 if (reply->type == REDIS_REPLY_ARRAY) {
1170
1171 if (reply->elements == task->tokens->len) {
1172 for (i = 0; i < reply->elements; i ++) {
1173 tok = g_ptr_array_index (task->tokens, i);
1174 elt = reply->element[i];
1175
1176 if (G_UNLIKELY (elt->type == REDIS_REPLY_INTEGER)) {
1177 tok->values[rt->id] = elt->integer;
1178 found ++;
1179 }
1180 else if (elt->type == REDIS_REPLY_STRING) {
1181 if (rt->stcf->clcf->flags &
1182 RSPAMD_FLAG_CLASSIFIER_INTEGER) {
1183 rspamd_strtoul (elt->str, elt->len, &val);
1184 tok->values[rt->id] = val;
1185 }
1186 else {
1187 float_val = strtof (elt->str, NULL);
1188 tok->values[rt->id] = float_val;
1189 }
1190
1191 found ++;
1192 }
1193 else {
1194 tok->values[rt->id] = 0;
1195 }
1196
1197 processed ++;
1198 }
1199
1200 if (rt->stcf->is_spam) {
1201 task->flags |= RSPAMD_TASK_FLAG_HAS_SPAM_TOKENS;
1202 }
1203 else {
1204 task->flags |= RSPAMD_TASK_FLAG_HAS_HAM_TOKENS;
1205 }
1206 }
1207 else {
1208 msg_err_task_check ("got invalid length of reply vector from redis: "
1209 "%d, expected: %d",
1210 (gint)reply->elements,
1211 (gint)task->tokens->len);
1212 }
1213 }
1214 else {
1215 if (reply->type == REDIS_REPLY_ERROR) {
1216 msg_err_task_check ("cannot learn %s: redis error: \"%s\"",
1217 rt->stcf->symbol, reply->str);
1218 }
1219 else {
1220 msg_err_task_check ("got invalid reply from redis: %s, array expected",
1221 rspamd_redis_type_to_string(reply->type));
1222 }
1223 }
1224
1225 msg_debug_stat_redis ("received tokens for %s: %d processed, %d found",
1226 rt->redis_object_expanded, processed, found);
1227 rspamd_upstream_ok (rt->selected);
1228 }
1229 }
1230 else {
1231 msg_err_task ("error getting reply from redis server %s: %s",
1232 rspamd_upstream_name (rt->selected), c->errstr);
1233
1234 if (rt->redis) {
1235 rspamd_upstream_fail (rt->selected, FALSE, c->errstr);
1236 }
1237
1238 if (!rt->err) {
1239 g_set_error (&rt->err, rspamd_redis_stat_quark (), c->err,
1240 "cannot get values: error getting reply from redis server %s: %s",
1241 rspamd_upstream_name (rt->selected), c->errstr);
1242 }
1243 }
1244
1245 if (rt->has_event) {
1246 rt->has_event = FALSE;
1247 rspamd_session_remove_event (task->s, NULL, rt);
1248 }
1249 }
1250
1251 /* Called when we have connected to the redis server and got stats */
1252 static void
rspamd_redis_connected(redisAsyncContext * c,gpointer r,gpointer priv)1253 rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv)
1254 {
1255 struct redis_stat_runtime *rt = REDIS_RUNTIME (priv);
1256 redisReply *reply = r;
1257 struct rspamd_task *task;
1258 glong val = 0;
1259 gboolean final = TRUE;
1260
1261 task = rt->task;
1262
1263 if (c->err == 0 && rt->has_event) {
1264 if (r != NULL) {
1265 if (G_UNLIKELY (reply->type == REDIS_REPLY_INTEGER)) {
1266 val = reply->integer;
1267 }
1268 else if (reply->type == REDIS_REPLY_STRING) {
1269 rspamd_strtol (reply->str, reply->len, &val);
1270 }
1271 else {
1272 if (reply->type != REDIS_REPLY_NIL) {
1273 if (reply->type == REDIS_REPLY_ERROR) {
1274 msg_err_task ("cannot learn %s: redis error: \"%s\"",
1275 rt->stcf->symbol, reply->str);
1276 }
1277 else {
1278 msg_err_task ("bad learned type for %s: %s, nil expected",
1279 rt->stcf->symbol,
1280 rspamd_redis_type_to_string(reply->type));
1281 }
1282 }
1283
1284 val = 0;
1285 }
1286
1287 if (val < 0) {
1288 msg_warn_task ("invalid number of learns for %s: %L",
1289 rt->stcf->symbol, val);
1290 val = 0;
1291 }
1292
1293 rt->learned = val;
1294 msg_debug_stat_redis ("connected to redis server, tokens learned for %s: %uL",
1295 rt->redis_object_expanded, rt->learned);
1296 rspamd_upstream_ok (rt->selected);
1297
1298 /* Save learn count in mempool variable */
1299 gint64 *learns_cnt;
1300 const gchar *var_name;
1301
1302 if (rt->stcf->is_spam) {
1303 var_name = RSPAMD_MEMPOOL_SPAM_LEARNS;
1304 }
1305 else {
1306 var_name = RSPAMD_MEMPOOL_HAM_LEARNS;
1307 }
1308
1309 learns_cnt = rspamd_mempool_get_variable (task->task_pool,
1310 var_name);
1311
1312 if (learns_cnt) {
1313 (*learns_cnt) += rt->learned;
1314 }
1315 else {
1316 learns_cnt = rspamd_mempool_alloc (task->task_pool,
1317 sizeof (*learns_cnt));
1318 *learns_cnt = rt->learned;
1319 rspamd_mempool_set_variable (task->task_pool,
1320 var_name,
1321 learns_cnt, NULL);
1322 }
1323
1324 if (rt->learned >= rt->stcf->clcf->min_learns && rt->learned > 0) {
1325 rspamd_fstring_t *query = rspamd_redis_tokens_to_query (
1326 task,
1327 rt,
1328 rt->tokens,
1329 rt->ctx->new_schema ? "HGET" : "HMGET",
1330 rt->redis_object_expanded, FALSE, -1,
1331 rt->stcf->clcf->flags & RSPAMD_FLAG_CLASSIFIER_INTEGER);
1332 g_assert (query != NULL);
1333 rspamd_mempool_add_destructor (task->task_pool,
1334 (rspamd_mempool_destruct_t)rspamd_fstring_free, query);
1335
1336 int ret = redisAsyncFormattedCommand (rt->redis,
1337 rspamd_redis_processed, rt,
1338 query->str, query->len);
1339
1340 if (ret != REDIS_OK) {
1341 msg_err_task ("call to redis failed: %s", rt->redis->errstr);
1342 }
1343 else {
1344 /* Further is handled by rspamd_redis_processed */
1345 final = FALSE;
1346 /* Restart timeout */
1347 if (ev_can_stop (&rt->timeout_event)) {
1348 rt->timeout_event.repeat = rt->ctx->timeout;
1349 ev_timer_again (task->event_loop, &rt->timeout_event);
1350 }
1351 else {
1352 rt->timeout_event.data = rt;
1353 ev_timer_init (&rt->timeout_event, rspamd_redis_timeout,
1354 rt->ctx->timeout, 0.);
1355 ev_timer_start (task->event_loop, &rt->timeout_event);
1356 }
1357 }
1358 }
1359 else {
1360 msg_warn_task ("skip obtaining bayes tokens for %s of classifier "
1361 "%s: not enough learns %d; %d required",
1362 rt->stcf->symbol, rt->stcf->clcf->name,
1363 (int)rt->learned, rt->stcf->clcf->min_learns);
1364 }
1365 }
1366 }
1367 else if (rt->has_event) {
1368 msg_err_task ("error getting reply from redis server %s: %s",
1369 rspamd_upstream_name (rt->selected), c->errstr);
1370 rspamd_upstream_fail (rt->selected, FALSE, c->errstr);
1371
1372 if (!rt->err) {
1373 g_set_error (&rt->err, rspamd_redis_stat_quark (), c->err,
1374 "error getting reply from redis server %s: %s",
1375 rspamd_upstream_name (rt->selected), c->errstr);
1376 }
1377 }
1378
1379 if (final && rt->has_event) {
1380 rt->has_event = FALSE;
1381 rspamd_session_remove_event (task->s, NULL, rt);
1382 }
1383 }
1384
1385 /* Called when we have set tokens during learning */
1386 static void
rspamd_redis_learned(redisAsyncContext * c,gpointer r,gpointer priv)1387 rspamd_redis_learned (redisAsyncContext *c, gpointer r, gpointer priv)
1388 {
1389 struct redis_stat_runtime *rt = REDIS_RUNTIME (priv);
1390 struct rspamd_task *task;
1391
1392 task = rt->task;
1393
1394 if (c->err == 0) {
1395 rspamd_upstream_ok (rt->selected);
1396 }
1397 else {
1398 msg_err_task_check ("error getting reply from redis server %s: %s",
1399 rspamd_upstream_name (rt->selected), c->errstr);
1400
1401 if (rt->redis) {
1402 rspamd_upstream_fail (rt->selected, FALSE, c->errstr);
1403 }
1404
1405 if (!rt->err) {
1406 g_set_error (&rt->err, rspamd_redis_stat_quark (), c->err,
1407 "cannot get learned: error getting reply from redis server %s: %s",
1408 rspamd_upstream_name (rt->selected), c->errstr);
1409 }
1410 }
1411
1412 if (rt->has_event) {
1413 rt->has_event = FALSE;
1414 rspamd_session_remove_event (task->s, NULL, rt);
1415 }
1416 }
1417 static void
rspamd_redis_parse_classifier_opts(struct redis_stat_ctx * backend,const ucl_object_t * obj,struct rspamd_config * cfg)1418 rspamd_redis_parse_classifier_opts (struct redis_stat_ctx *backend,
1419 const ucl_object_t *obj,
1420 struct rspamd_config *cfg)
1421 {
1422 const gchar *lua_script;
1423 const ucl_object_t *elt, *users_enabled;
1424
1425 users_enabled = ucl_object_lookup_any (obj, "per_user",
1426 "users_enabled", NULL);
1427
1428 if (users_enabled != NULL) {
1429 if (ucl_object_type (users_enabled) == UCL_BOOLEAN) {
1430 backend->enable_users = ucl_object_toboolean (users_enabled);
1431 backend->cbref_user = -1;
1432 }
1433 else if (ucl_object_type (users_enabled) == UCL_STRING) {
1434 lua_script = ucl_object_tostring (users_enabled);
1435
1436 if (luaL_dostring (cfg->lua_state, lua_script) != 0) {
1437 msg_err_config ("cannot execute lua script for users "
1438 "extraction: %s", lua_tostring (cfg->lua_state, -1));
1439 }
1440 else {
1441 if (lua_type (cfg->lua_state, -1) == LUA_TFUNCTION) {
1442 backend->enable_users = TRUE;
1443 backend->cbref_user = luaL_ref (cfg->lua_state,
1444 LUA_REGISTRYINDEX);
1445 }
1446 else {
1447 msg_err_config ("lua script must return "
1448 "function(task) and not %s",
1449 lua_typename (cfg->lua_state, lua_type (
1450 cfg->lua_state, -1)));
1451 }
1452 }
1453 }
1454 }
1455 else {
1456 backend->enable_users = FALSE;
1457 backend->cbref_user = -1;
1458 }
1459
1460 elt = ucl_object_lookup (obj, "prefix");
1461 if (elt == NULL || ucl_object_type (elt) != UCL_STRING) {
1462 /* Default non-users statistics */
1463 if (backend->enable_users || backend->cbref_user != -1) {
1464 backend->redis_object = REDIS_DEFAULT_USERS_OBJECT;
1465 }
1466 else {
1467 backend->redis_object = REDIS_DEFAULT_OBJECT;
1468 }
1469 }
1470 else {
1471 /* XXX: sanity check */
1472 backend->redis_object = ucl_object_tostring (elt);
1473 }
1474
1475 elt = ucl_object_lookup (obj, "store_tokens");
1476 if (elt) {
1477 backend->store_tokens = ucl_object_toboolean (elt);
1478 }
1479 else {
1480 backend->store_tokens = FALSE;
1481 }
1482
1483 elt = ucl_object_lookup (obj, "new_schema");
1484 if (elt) {
1485 backend->new_schema = ucl_object_toboolean (elt);
1486 }
1487 else {
1488 backend->new_schema = FALSE;
1489
1490 msg_warn_config ("you are using old bayes schema for redis statistics, "
1491 "please consider converting it to a new one "
1492 "by using 'rspamadm configwizard statistics'");
1493 }
1494
1495 elt = ucl_object_lookup (obj, "signatures");
1496 if (elt) {
1497 backend->enable_signatures = ucl_object_toboolean (elt);
1498 }
1499 else {
1500 backend->enable_signatures = FALSE;
1501 }
1502
1503 elt = ucl_object_lookup_any (obj, "expiry", "expire", NULL);
1504 if (elt) {
1505 backend->expiry = ucl_object_toint (elt);
1506 }
1507 else {
1508 backend->expiry = 0;
1509 }
1510 }
1511
1512 gpointer
rspamd_redis_init(struct rspamd_stat_ctx * ctx,struct rspamd_config * cfg,struct rspamd_statfile * st)1513 rspamd_redis_init (struct rspamd_stat_ctx *ctx,
1514 struct rspamd_config *cfg, struct rspamd_statfile *st)
1515 {
1516 struct redis_stat_ctx *backend;
1517 struct rspamd_statfile_config *stf = st->stcf;
1518 struct rspamd_redis_stat_elt *st_elt;
1519 const ucl_object_t *obj;
1520 gboolean ret = FALSE;
1521 gint conf_ref = -1;
1522 lua_State *L = (lua_State *)cfg->lua_state;
1523
1524 backend = g_malloc0 (sizeof (*backend));
1525 backend->L = L;
1526 backend->timeout = REDIS_DEFAULT_TIMEOUT;
1527
1528 /* First search in backend configuration */
1529 obj = ucl_object_lookup (st->classifier->cfg->opts, "backend");
1530 if (obj != NULL && ucl_object_type (obj) == UCL_OBJECT) {
1531 ret = rspamd_lua_try_load_redis (L, obj, cfg, &conf_ref);
1532 }
1533
1534 /* Now try statfiles config */
1535 if (!ret && stf->opts) {
1536 ret = rspamd_lua_try_load_redis (L, stf->opts, cfg, &conf_ref);
1537 }
1538
1539 /* Now try classifier config */
1540 if (!ret && st->classifier->cfg->opts) {
1541 ret = rspamd_lua_try_load_redis (L, st->classifier->cfg->opts, cfg, &conf_ref);
1542 }
1543
1544 /* Now try global redis settings */
1545 if (!ret) {
1546 obj = ucl_object_lookup (cfg->rcl_obj, "redis");
1547
1548 if (obj) {
1549 const ucl_object_t *specific_obj;
1550
1551 specific_obj = ucl_object_lookup (obj, "statistics");
1552
1553 if (specific_obj) {
1554 ret = rspamd_lua_try_load_redis (L,
1555 specific_obj, cfg, &conf_ref);
1556 }
1557 else {
1558 ret = rspamd_lua_try_load_redis (L,
1559 obj, cfg, &conf_ref);
1560 }
1561 }
1562 }
1563
1564 if (!ret) {
1565 msg_err_config ("cannot init redis backend for %s", stf->symbol);
1566 g_free (backend);
1567 return NULL;
1568 }
1569
1570 backend->conf_ref = conf_ref;
1571
1572 /* Check some common table values */
1573 lua_rawgeti (L, LUA_REGISTRYINDEX, conf_ref);
1574
1575 lua_pushstring (L, "timeout");
1576 lua_gettable (L, -2);
1577 if (lua_type (L, -1) == LUA_TNUMBER) {
1578 backend->timeout = lua_tonumber (L, -1);
1579 }
1580 lua_pop (L, 1);
1581
1582 lua_pushstring (L, "db");
1583 lua_gettable (L, -2);
1584 if (lua_type (L, -1) == LUA_TSTRING) {
1585 backend->dbname = rspamd_mempool_strdup (cfg->cfg_pool,
1586 lua_tostring (L, -1));
1587 }
1588 lua_pop (L, 1);
1589
1590 lua_pushstring (L, "password");
1591 lua_gettable (L, -2);
1592 if (lua_type (L, -1) == LUA_TSTRING) {
1593 backend->password = rspamd_mempool_strdup (cfg->cfg_pool,
1594 lua_tostring (L, -1));
1595 }
1596 lua_pop (L, 1);
1597
1598 lua_settop (L, 0);
1599
1600 rspamd_redis_parse_classifier_opts (backend, st->classifier->cfg->opts, cfg);
1601 stf->clcf->flags |= RSPAMD_FLAG_CLASSIFIER_INCREMENTING_BACKEND;
1602 backend->stcf = stf;
1603
1604 st_elt = g_malloc0 (sizeof (*st_elt));
1605 st_elt->event_loop = ctx->event_loop;
1606 st_elt->ctx = backend;
1607 backend->stat_elt = rspamd_stat_ctx_register_async (
1608 rspamd_redis_async_stat_cb,
1609 rspamd_redis_async_stat_fin,
1610 st_elt,
1611 REDIS_STAT_TIMEOUT);
1612 st_elt->async = backend->stat_elt;
1613
1614 return (gpointer)backend;
1615 }
1616
1617 /*
1618 * This callback is called when Redis is disconnected somehow, and the structure
1619 * itself is usually freed by hiredis itself
1620 */
1621 static void
rspamd_stat_redis_on_disconnect(const struct redisAsyncContext * ac,int status)1622 rspamd_stat_redis_on_disconnect(const struct redisAsyncContext *ac, int status)
1623 {
1624 struct redis_stat_runtime *rt = (struct redis_stat_runtime *)ac->data;
1625
1626 if (ev_can_stop (&rt->timeout_event)) {
1627 ev_timer_stop (rt->task->event_loop, &rt->timeout_event);
1628 }
1629 rt->redis = NULL;
1630 }
1631
1632 static void
rspamd_stat_redis_on_connect(const struct redisAsyncContext * ac,int status)1633 rspamd_stat_redis_on_connect(const struct redisAsyncContext *ac, int status)
1634 {
1635 struct redis_stat_runtime *rt = (struct redis_stat_runtime *)ac->data;
1636
1637
1638 if (status == REDIS_ERR) {
1639 /*
1640 * We also need to reset rt->redis as it will be subsequently freed without
1641 * calling for redis_on_disconnect callback...
1642 */
1643 if (ev_can_stop (&rt->timeout_event)) {
1644 ev_timer_stop (rt->task->event_loop, &rt->timeout_event);
1645 }
1646 rt->redis = NULL;
1647 }
1648 }
1649
1650 gpointer
rspamd_redis_runtime(struct rspamd_task * task,struct rspamd_statfile_config * stcf,gboolean learn,gpointer c)1651 rspamd_redis_runtime (struct rspamd_task *task,
1652 struct rspamd_statfile_config *stcf,
1653 gboolean learn, gpointer c)
1654 {
1655 struct redis_stat_ctx *ctx = REDIS_CTX (c);
1656 struct redis_stat_runtime *rt;
1657 struct upstream *up;
1658 struct upstream_list *ups;
1659 char *object_expanded = NULL;
1660 rspamd_inet_addr_t *addr;
1661
1662 g_assert (ctx != NULL);
1663 g_assert (stcf != NULL);
1664
1665 if (learn) {
1666 ups = rspamd_redis_get_servers (ctx, "write_servers");
1667
1668 if (!ups) {
1669 msg_err_task ("no write servers defined for %s, cannot learn",
1670 stcf->symbol);
1671 return NULL;
1672 }
1673 up = rspamd_upstream_get (ups,
1674 RSPAMD_UPSTREAM_MASTER_SLAVE,
1675 NULL,
1676 0);
1677 }
1678 else {
1679 ups = rspamd_redis_get_servers (ctx, "read_servers");
1680
1681 if (!ups) {
1682 msg_err_task ("no read servers defined for %s, cannot stat",
1683 stcf->symbol);
1684 return NULL;
1685 }
1686 up = rspamd_upstream_get (ups,
1687 RSPAMD_UPSTREAM_ROUND_ROBIN,
1688 NULL,
1689 0);
1690 }
1691
1692 if (up == NULL) {
1693 msg_err_task ("no upstreams reachable");
1694 return NULL;
1695 }
1696
1697 if (rspamd_redis_expand_object (ctx->redis_object, ctx, task,
1698 &object_expanded) == 0) {
1699 msg_err_task ("expansion for %s failed for symbol %s "
1700 "(maybe learning per user classifier with no user or recipient)",
1701 learn ? "learning" : "classifying",
1702 stcf->symbol);
1703 return NULL;
1704 }
1705
1706 rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt));
1707 rt->selected = up;
1708 rt->task = task;
1709 rt->ctx = ctx;
1710 rt->stcf = stcf;
1711 rt->redis_object_expanded = object_expanded;
1712
1713 addr = rspamd_upstream_addr_next (up);
1714 g_assert (addr != NULL);
1715
1716 if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
1717 rt->redis = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
1718 }
1719 else {
1720 rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
1721 rspamd_inet_address_get_port (addr));
1722 }
1723
1724 if (rt->redis == NULL) {
1725 msg_warn_task ("cannot connect to redis server %s: %s",
1726 rspamd_inet_address_to_string_pretty (addr),
1727 strerror (errno));
1728 return NULL;
1729 }
1730 else if (rt->redis->err != REDIS_OK) {
1731 msg_warn_task ("cannot connect to redis server %s: %s",
1732 rspamd_inet_address_to_string_pretty (addr),
1733 rt->redis->errstr);
1734 redisAsyncFree (rt->redis);
1735 rt->redis = NULL;
1736
1737 return NULL;
1738 }
1739
1740 redisLibevAttach (task->event_loop, rt->redis);
1741 rspamd_redis_maybe_auth (ctx, rt->redis);
1742 rt->redis->data = rt;
1743 redisAsyncSetDisconnectCallback (rt->redis, rspamd_stat_redis_on_disconnect);
1744 redisAsyncSetConnectCallback (rt->redis, rspamd_stat_redis_on_connect);
1745
1746 rspamd_mempool_add_destructor (task->task_pool, rspamd_redis_fin, rt);
1747
1748 return rt;
1749 }
1750
1751 void
rspamd_redis_close(gpointer p)1752 rspamd_redis_close (gpointer p)
1753 {
1754 struct redis_stat_ctx *ctx = REDIS_CTX (p);
1755 lua_State *L = ctx->L;
1756
1757 if (ctx->conf_ref) {
1758 luaL_unref (L, LUA_REGISTRYINDEX, ctx->conf_ref);
1759 }
1760
1761 g_free (ctx);
1762 }
1763
1764 gboolean
rspamd_redis_process_tokens(struct rspamd_task * task,GPtrArray * tokens,gint id,gpointer p)1765 rspamd_redis_process_tokens (struct rspamd_task *task,
1766 GPtrArray *tokens,
1767 gint id, gpointer p)
1768 {
1769 struct redis_stat_runtime *rt = REDIS_RUNTIME (p);
1770 const gchar *learned_key = "learns";
1771
1772 if (rspamd_session_blocked (task->s)) {
1773 return FALSE;
1774 }
1775
1776 if (tokens == NULL || tokens->len == 0 || rt->redis == NULL) {
1777 return FALSE;
1778 }
1779
1780 rt->id = id;
1781
1782 if (rt->ctx->new_schema) {
1783 if (rt->ctx->stcf->is_spam) {
1784 learned_key = "learns_spam";
1785 }
1786 else {
1787 learned_key = "learns_ham";
1788 }
1789 }
1790
1791 if (redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s",
1792 rt->redis_object_expanded, learned_key) == REDIS_OK) {
1793
1794 rspamd_session_add_event (task->s, NULL, rt, M);
1795 rt->has_event = TRUE;
1796 rt->tokens = g_ptr_array_ref (tokens);
1797
1798 if (ev_can_stop (&rt->timeout_event)) {
1799 rt->timeout_event.repeat = rt->ctx->timeout;
1800 ev_timer_again (task->event_loop, &rt->timeout_event);
1801 }
1802 else {
1803 rt->timeout_event.data = rt;
1804 ev_timer_init (&rt->timeout_event, rspamd_redis_timeout,
1805 rt->ctx->timeout, 0.);
1806 ev_timer_start (task->event_loop, &rt->timeout_event);
1807 }
1808 }
1809
1810 return FALSE;
1811 }
1812
1813 gboolean
rspamd_redis_finalize_process(struct rspamd_task * task,gpointer runtime,gpointer ctx)1814 rspamd_redis_finalize_process (struct rspamd_task *task, gpointer runtime,
1815 gpointer ctx)
1816 {
1817 struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
1818
1819 if (rt->err) {
1820 msg_info_task ("cannot retrieve stat tokens from Redis: %e", rt->err);
1821 g_error_free (rt->err);
1822 rt->err = NULL;
1823 rspamd_redis_fin (rt);
1824
1825 return FALSE;
1826 }
1827
1828 rspamd_redis_fin (rt);
1829
1830 return TRUE;
1831 }
1832
1833 gboolean
rspamd_redis_learn_tokens(struct rspamd_task * task,GPtrArray * tokens,gint id,gpointer p)1834 rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
1835 gint id, gpointer p)
1836 {
1837 struct redis_stat_runtime *rt = REDIS_RUNTIME (p);
1838 rspamd_fstring_t *query;
1839 const gchar *redis_cmd;
1840 rspamd_token_t *tok;
1841 gint ret;
1842 goffset off;
1843 const gchar *learned_key = "learns";
1844
1845 if (rspamd_session_blocked (task->s)) {
1846 return FALSE;
1847 }
1848
1849 if (rt->ctx->new_schema) {
1850 if (rt->ctx->stcf->is_spam) {
1851 learned_key = "learns_spam";
1852 }
1853 else {
1854 learned_key = "learns_ham";
1855 }
1856 }
1857
1858 /*
1859 * Add the current key to the set of learned keys
1860 */
1861 redisAsyncCommand (rt->redis, NULL, NULL, "SADD %s_keys %s",
1862 rt->stcf->symbol, rt->redis_object_expanded);
1863
1864 if (rt->ctx->new_schema) {
1865 redisAsyncCommand (rt->redis, NULL, NULL, "HSET %s version 2",
1866 rt->redis_object_expanded);
1867 }
1868
1869 if (rt->stcf->clcf->flags & RSPAMD_FLAG_CLASSIFIER_INTEGER) {
1870 redis_cmd = "HINCRBY";
1871 }
1872 else {
1873 redis_cmd = "HINCRBYFLOAT";
1874 }
1875
1876 rt->id = id;
1877 query = rspamd_redis_tokens_to_query (task, rt, tokens,
1878 redis_cmd, rt->redis_object_expanded, TRUE, id,
1879 rt->stcf->clcf->flags & RSPAMD_FLAG_CLASSIFIER_INTEGER);
1880 g_assert (query != NULL);
1881 query->len = 0;
1882
1883 /*
1884 * XXX:
1885 * Dirty hack: we get a token and check if it's value is -1 or 1, so
1886 * we could understand that we are learning or unlearning
1887 */
1888
1889 tok = g_ptr_array_index (task->tokens, 0);
1890
1891 if (tok->values[id] > 0) {
1892 rspamd_printf_fstring (&query, ""
1893 "*4\r\n"
1894 "$7\r\n"
1895 "HINCRBY\r\n"
1896 "$%d\r\n"
1897 "%s\r\n"
1898 "$%d\r\n"
1899 "%s\r\n" /* Learned key */
1900 "$1\r\n"
1901 "1\r\n",
1902 (gint)strlen (rt->redis_object_expanded),
1903 rt->redis_object_expanded,
1904 (gint)strlen (learned_key),
1905 learned_key);
1906 }
1907 else {
1908 rspamd_printf_fstring (&query, ""
1909 "*4\r\n"
1910 "$7\r\n"
1911 "HINCRBY\r\n"
1912 "$%d\r\n"
1913 "%s\r\n"
1914 "$%d\r\n"
1915 "%s\r\n" /* Learned key */
1916 "$2\r\n"
1917 "-1\r\n",
1918 (gint)strlen (rt->redis_object_expanded),
1919 rt->redis_object_expanded,
1920 (gint)strlen (learned_key),
1921 learned_key);
1922 }
1923
1924 ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL,
1925 query->str, query->len);
1926
1927 if (ret != REDIS_OK) {
1928 msg_err_task ("call to redis failed: %s", rt->redis->errstr);
1929 rspamd_fstring_free (query);
1930
1931 return FALSE;
1932 }
1933
1934 off = query->len;
1935 ret = rspamd_printf_fstring (&query, "*1\r\n$4\r\nEXEC\r\n");
1936 ret = redisAsyncFormattedCommand (rt->redis, rspamd_redis_learned, rt,
1937 query->str + off, ret);
1938 rspamd_mempool_add_destructor (task->task_pool,
1939 (rspamd_mempool_destruct_t)rspamd_fstring_free, query);
1940
1941 if (ret == REDIS_OK) {
1942
1943 /* Add signature if needed */
1944 if (rt->ctx->enable_signatures) {
1945 rspamd_redis_store_stat_signature (task, rt, tokens,
1946 "RSIG");
1947 }
1948
1949 rspamd_session_add_event (task->s, NULL, rt, M);
1950 rt->has_event = TRUE;
1951
1952 /* Set timeout */
1953 if (ev_can_stop (&rt->timeout_event)) {
1954 rt->timeout_event.repeat = rt->ctx->timeout;
1955 ev_timer_again (task->event_loop, &rt->timeout_event);
1956 }
1957 else {
1958 rt->timeout_event.data = rt;
1959 ev_timer_init (&rt->timeout_event, rspamd_redis_timeout,
1960 rt->ctx->timeout, 0.);
1961 ev_timer_start (task->event_loop, &rt->timeout_event);
1962 }
1963
1964 return TRUE;
1965 }
1966 else {
1967 msg_err_task ("call to redis failed: %s", rt->redis->errstr);
1968 }
1969
1970 return FALSE;
1971 }
1972
1973
1974 gboolean
rspamd_redis_finalize_learn(struct rspamd_task * task,gpointer runtime,gpointer ctx,GError ** err)1975 rspamd_redis_finalize_learn (struct rspamd_task *task, gpointer runtime,
1976 gpointer ctx, GError **err)
1977 {
1978 struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
1979
1980 if (rt->err) {
1981 g_propagate_error (err, rt->err);
1982 rt->err = NULL;
1983 rspamd_redis_fin (rt);
1984
1985 return FALSE;
1986 }
1987
1988 rspamd_redis_fin (rt);
1989
1990 return TRUE;
1991 }
1992
1993 gulong
rspamd_redis_total_learns(struct rspamd_task * task,gpointer runtime,gpointer ctx)1994 rspamd_redis_total_learns (struct rspamd_task *task, gpointer runtime,
1995 gpointer ctx)
1996 {
1997 struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
1998
1999 return rt->learned;
2000 }
2001
2002 gulong
rspamd_redis_inc_learns(struct rspamd_task * task,gpointer runtime,gpointer ctx)2003 rspamd_redis_inc_learns (struct rspamd_task *task, gpointer runtime,
2004 gpointer ctx)
2005 {
2006 struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
2007
2008 /* XXX: may cause races */
2009 return rt->learned + 1;
2010 }
2011
2012 gulong
rspamd_redis_dec_learns(struct rspamd_task * task,gpointer runtime,gpointer ctx)2013 rspamd_redis_dec_learns (struct rspamd_task *task, gpointer runtime,
2014 gpointer ctx)
2015 {
2016 struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
2017
2018 /* XXX: may cause races */
2019 return rt->learned + 1;
2020 }
2021
2022 gulong
rspamd_redis_learns(struct rspamd_task * task,gpointer runtime,gpointer ctx)2023 rspamd_redis_learns (struct rspamd_task *task, gpointer runtime,
2024 gpointer ctx)
2025 {
2026 struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
2027
2028 return rt->learned;
2029 }
2030
2031 ucl_object_t *
rspamd_redis_get_stat(gpointer runtime,gpointer ctx)2032 rspamd_redis_get_stat (gpointer runtime,
2033 gpointer ctx)
2034 {
2035 struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
2036 struct rspamd_redis_stat_elt *st;
2037 redisAsyncContext *redis;
2038
2039 if (rt->ctx->stat_elt) {
2040 st = rt->ctx->stat_elt->ud;
2041
2042 if (rt->redis) {
2043 redis = rt->redis;
2044 rt->redis = NULL;
2045 redisAsyncFree (redis);
2046 }
2047
2048 if (st->stat) {
2049 return ucl_object_ref (st->stat);
2050 }
2051 }
2052
2053 return NULL;
2054 }
2055
2056 gpointer
rspamd_redis_load_tokenizer_config(gpointer runtime,gsize * len)2057 rspamd_redis_load_tokenizer_config (gpointer runtime,
2058 gsize *len)
2059 {
2060 return NULL;
2061 }
2062