1 /*-
2 * Copyright 2016 Vsevolod Stakhov
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "lua_common.h"
17 #include "lua_thread_pool.h"
18 #include "utlist.h"
19
20 #include "contrib/hiredis/hiredis.h"
21 #include "contrib/hiredis/async.h"
22
23 #define REDIS_DEFAULT_TIMEOUT 1.0
24
25 static const gchar *M = "rspamd lua redis";
26 static void *redis_null;
27
28 /***
29 * @module rspamd_redis
 * This module implements an asynchronous Redis client for the rspamd Lua API.
 * Here is an example of using this module:
32 * @example
33 local rspamd_redis = require "rspamd_redis"
34 local rspamd_logger = require "rspamd_logger"
35
36 local function symbol_callback(task)
37 local redis_key = 'some_key'
38 local function redis_cb(err, data)
39 if not err then
40 rspamd_logger.infox('redis returned %1=%2', redis_key, data)
41 end
42 end
43
44 rspamd_redis.make_request(task, "127.0.0.1:6379", redis_cb,
45 'GET', {redis_key})
46 -- or in table form:
	-- rspamd_redis.make_request({task=task, host="127.0.0.1:6379",
48 -- callback=redis_cb, timeout=2.0, cmd='GET', args={redis_key}})
49 end
50 */
51
52 LUA_FUNCTION_DEF (redis, make_request);
53 LUA_FUNCTION_DEF (redis, make_request_sync);
54 LUA_FUNCTION_DEF (redis, connect);
55 LUA_FUNCTION_DEF (redis, connect_sync);
56 LUA_FUNCTION_DEF (redis, add_cmd);
57 LUA_FUNCTION_DEF (redis, exec);
58 LUA_FUNCTION_DEF (redis, gc);
59
60 static const struct luaL_reg redislib_f[] = {
61 LUA_INTERFACE_DEF (redis, make_request),
62 LUA_INTERFACE_DEF (redis, make_request_sync),
63 LUA_INTERFACE_DEF (redis, connect),
64 LUA_INTERFACE_DEF (redis, connect_sync),
65 {NULL, NULL}
66 };
67
68 static const struct luaL_reg redislib_m[] = {
69 LUA_INTERFACE_DEF (redis, add_cmd),
70 LUA_INTERFACE_DEF (redis, exec),
71 {"__gc", lua_redis_gc},
72 {"__tostring", rspamd_lua_class_tostring},
73 {NULL, NULL}
74 };
75
/*
 * Reference counting wrappers for the redis context.
 * Replace the #undef below with a #define of REDIS_DEBUG_REFS to log every
 * retain/release together with the current reference count.
 */
#undef REDIS_DEBUG_REFS
#ifndef REDIS_DEBUG_REFS
#define REDIS_RETAIN REF_RETAIN
#define REDIS_RELEASE REF_RELEASE
#else
#define REDIS_RETAIN(x) do { \
	msg_err ("retain ref %p, refcount: %d", (x), (x)->ref.refcount); \
	REF_RETAIN(x); \
} while (0)

#define REDIS_RELEASE(x) do { \
	msg_err ("release ref %p, refcount: %d", (x), (x)->ref.refcount); \
	REF_RELEASE(x); \
} while (0)
#endif
91
92 #ifdef WITH_HIREDIS
93 struct lua_redis_request_specific_userdata;
94 /**
95 * Struct for userdata representation
96 */
97 struct lua_redis_userdata {
98 redisAsyncContext *ctx;
99 struct rspamd_task *task;
100 struct rspamd_symcache_item *item;
101 struct rspamd_async_session *s;
102 struct ev_loop *event_loop;
103 struct rspamd_config *cfg;
104 struct rspamd_redis_pool *pool;
105 gchar *server;
106 gchar log_tag[RSPAMD_LOG_ID_LEN + 1];
107 struct lua_redis_request_specific_userdata *specific;
108 gdouble timeout;
109 guint16 port;
110 guint16 terminated;
111 };
112
113 #define msg_debug_lua_redis(...) rspamd_conditional_debug_fast (NULL, NULL, \
114 rspamd_lua_redis_log_id, "lua_redis", ud->log_tag, \
115 G_STRFUNC, \
116 __VA_ARGS__)
117 INIT_LOG_MODULE(lua_redis)
118
119 #define LUA_REDIS_SPECIFIC_REPLIED (1 << 0)
120 /* session was finished */
121 #define LUA_REDIS_SPECIFIC_FINISHED (1 << 1)
122 #define LUA_REDIS_ASYNC (1 << 0)
123 #define LUA_REDIS_TEXTDATA (1 << 1)
124 #define LUA_REDIS_TERMINATED (1 << 2)
125 #define LUA_REDIS_NO_POOL (1 << 3)
126 #define LUA_REDIS_SUBSCRIBED (1 << 4)
127 #define IS_ASYNC(ctx) ((ctx)->flags & LUA_REDIS_ASYNC)
128
129 struct lua_redis_request_specific_userdata {
130 gint cbref;
131 guint nargs;
132 gchar **args;
133 gsize *arglens;
134 struct lua_redis_userdata *c;
135 struct lua_redis_ctx *ctx;
136 struct lua_redis_request_specific_userdata *next;
137 ev_timer timeout_ev;
138 guint flags;
139 };
140
141 struct lua_redis_ctx {
142 guint flags;
143 struct lua_redis_userdata async;
144 guint cmds_pending;
145 ref_entry_t ref;
146 GQueue *replies; /* for sync connection only */
147 GQueue *events_cleanup; /* for sync connection only */
148 struct thread_entry *thread; /* for sync mode, set only if there was yield */
149 };
150
151 struct lua_redis_result {
152 gboolean is_error;
153 gint result_ref;
154 struct rspamd_symcache_item *item;
155 struct rspamd_async_session *s;
156 struct rspamd_task *task;
157 struct lua_redis_request_specific_userdata *sp_ud;
158 };
159
160 static struct lua_redis_ctx *
lua_check_redis(lua_State * L,gint pos)161 lua_check_redis (lua_State * L, gint pos)
162 {
163 void *ud = rspamd_lua_check_udata (L, pos, "rspamd{redis}");
164 luaL_argcheck (L, ud != NULL, pos, "'redis' expected");
165 return ud ? *((struct lua_redis_ctx **)ud) : NULL;
166 }
167
168 static void
lua_redis_free_args(char ** args,gsize * arglens,guint nargs)169 lua_redis_free_args (char **args, gsize *arglens, guint nargs)
170 {
171 guint i;
172
173 if (args) {
174 for (i = 0; i < nargs; i ++) {
175 g_free (args[i]);
176 }
177
178 g_free (args);
179 g_free (arglens);
180 }
181 }
182
183 static void
lua_redis_dtor(struct lua_redis_ctx * ctx)184 lua_redis_dtor (struct lua_redis_ctx *ctx)
185 {
186 struct lua_redis_userdata *ud;
187 struct lua_redis_request_specific_userdata *cur, *tmp;
188 gboolean is_successful = TRUE;
189 struct redisAsyncContext *ac;
190
191 ud = &ctx->async;
192 msg_debug_lua_redis ("desctructing %p", ctx);
193
194 if (ud->ctx) {
195
196 LL_FOREACH_SAFE (ud->specific, cur, tmp) {
197 ev_timer_stop (ud->event_loop, &cur->timeout_ev);
198
199 if (!(cur->flags & LUA_REDIS_SPECIFIC_REPLIED)) {
200 is_successful = FALSE;
201 }
202
203 cur->flags |= LUA_REDIS_SPECIFIC_FINISHED;
204 }
205
206 ctx->flags |= LUA_REDIS_TERMINATED;
207
208 ud->terminated = 1;
209 ac = ud->ctx;
210 ud->ctx = NULL;
211
212 if (!is_successful) {
213 rspamd_redis_pool_release_connection (ud->pool, ac,
214 RSPAMD_REDIS_RELEASE_FATAL);
215 }
216 else {
217 rspamd_redis_pool_release_connection (ud->pool, ac,
218 (ctx->flags & LUA_REDIS_NO_POOL) ?
219 RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT);
220 }
221
222 }
223
224 LL_FOREACH_SAFE (ud->specific, cur, tmp) {
225 lua_redis_free_args (cur->args, cur->arglens, cur->nargs);
226
227 if (cur->cbref != -1) {
228 luaL_unref (ud->cfg->lua_state, LUA_REGISTRYINDEX, cur->cbref);
229 }
230
231 g_free (cur);
232 }
233
234 if (ctx->events_cleanup) {
235 g_queue_free (ctx->events_cleanup);
236 ctx->events_cleanup = NULL;
237 }
238 if (ctx->replies) {
239 g_queue_free (ctx->replies);
240 ctx->replies = NULL;
241 }
242
243 g_free (ctx);
244 }
245
246 static gint
lua_redis_gc(lua_State * L)247 lua_redis_gc (lua_State *L)
248 {
249 struct lua_redis_ctx *ctx = lua_check_redis (L, 1);
250
251 if (ctx) {
252 REDIS_RELEASE (ctx);
253 }
254
255 return 0;
256 }
257
258 static void
lua_redis_fin(void * arg)259 lua_redis_fin (void *arg)
260 {
261 struct lua_redis_request_specific_userdata *sp_ud = arg;
262 struct lua_redis_userdata *ud;
263 struct lua_redis_ctx *ctx;
264
265 ctx = sp_ud->ctx;
266 ud = sp_ud->c;
267
268 if (ev_can_stop (&sp_ud->timeout_ev)) {
269 ev_timer_stop (sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev);
270 }
271
272 msg_debug_lua_redis ("finished redis query %p from session %p; refcount=%d",
273 sp_ud, ctx, ctx->ref.refcount);
274 sp_ud->flags |= LUA_REDIS_SPECIFIC_FINISHED;
275
276 REDIS_RELEASE (ctx);
277 }
278
279 /**
280 * Push error of redis request to lua callback
281 * @param code
282 * @param ud
283 */
284 static void
lua_redis_push_error(const gchar * err,struct lua_redis_ctx * ctx,struct lua_redis_request_specific_userdata * sp_ud,gboolean connected)285 lua_redis_push_error (const gchar *err,
286 struct lua_redis_ctx *ctx,
287 struct lua_redis_request_specific_userdata *sp_ud,
288 gboolean connected)
289 {
290 struct lua_redis_userdata *ud = sp_ud->c;
291 struct lua_callback_state cbs;
292 lua_State *L;
293
294 if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) {
295 if (sp_ud->cbref != -1) {
296
297 lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs);
298 L = cbs.L;
299
300 lua_pushcfunction (L, &rspamd_lua_traceback);
301 int err_idx = lua_gettop (L);
302 /* Push error */
303 lua_rawgeti (cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref);
304
305 /* String of error */
306 lua_pushstring (cbs.L, err);
307 /* Data is nil */
308 lua_pushnil (cbs.L);
309
310 if (ud->item) {
311 rspamd_symcache_set_cur_item (ud->task, ud->item);
312 }
313
314 if (lua_pcall (cbs.L, 2, 0, err_idx) != 0) {
315 msg_info ("call to callback failed: %s", lua_tostring (cbs.L, -1));
316 }
317
318 lua_settop (L, err_idx - 1);
319 lua_thread_pool_restore_callback (&cbs);
320 }
321
322 sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
323
324 if (connected && ud->s) {
325 if (ud->item) {
326 rspamd_symcache_item_async_dec_check (ud->task, ud->item, M);
327 }
328
329 rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
330 }
331 else {
332 lua_redis_fin (sp_ud);
333 }
334 }
335 }
336
337 static void
lua_redis_push_reply(lua_State * L,const redisReply * r,gboolean text_data)338 lua_redis_push_reply (lua_State *L, const redisReply *r, gboolean text_data)
339 {
340 guint i;
341 struct rspamd_lua_text *t;
342
343 switch (r->type) {
344 case REDIS_REPLY_INTEGER:
345 lua_pushinteger (L, r->integer);
346 break;
347 case REDIS_REPLY_NIL:
348 lua_getfield (L, LUA_REGISTRYINDEX, "redis.null");
349 break;
350 case REDIS_REPLY_STRING:
351 case REDIS_REPLY_STATUS:
352 if (text_data) {
353 t = lua_newuserdata (L, sizeof (*t));
354 rspamd_lua_setclass (L, "rspamd{text}", -1);
355 t->flags = 0;
356 t->start = r->str;
357 t->len = r->len;
358 }
359 else {
360 lua_pushlstring (L, r->str, r->len);
361 }
362 break;
363 case REDIS_REPLY_ARRAY:
364 lua_createtable (L, r->elements, 0);
365 for (i = 0; i < r->elements; ++i) {
366 lua_redis_push_reply (L, r->element[i], text_data);
367 lua_rawseti (L, -2, i + 1); /* Store sub-reply */
368 }
369 break;
370 default: /* should not happen */
371 msg_info ("unknown reply type: %d", r->type);
372 break;
373 }
374 }
375
376 /**
377 * Push data of redis request to lua callback
378 * @param r redis reply data
379 * @param ud
380 */
381 static void
lua_redis_push_data(const redisReply * r,struct lua_redis_ctx * ctx,struct lua_redis_request_specific_userdata * sp_ud)382 lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
383 struct lua_redis_request_specific_userdata *sp_ud)
384 {
385 struct lua_redis_userdata *ud = sp_ud->c;
386 struct lua_callback_state cbs;
387 lua_State *L;
388
389 if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED)) ||
390 (sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
391 if (sp_ud->cbref != -1) {
392 lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs);
393 L = cbs.L;
394
395 lua_pushcfunction (L, &rspamd_lua_traceback);
396 int err_idx = lua_gettop (L);
397 /* Push error */
398 lua_rawgeti (cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref);
399 /* Error is nil */
400 lua_pushnil (cbs.L);
401 /* Data */
402 lua_redis_push_reply (cbs.L, r, ctx->flags & LUA_REDIS_TEXTDATA);
403
404 if (ud->item) {
405 rspamd_symcache_set_cur_item (ud->task, ud->item);
406 }
407
408 gint ret = lua_pcall (cbs.L, 2, 0, err_idx);
409
410 if (ret != 0) {
411 msg_info ("call to lua_redis callback failed (%d): %s",
412 ret, lua_tostring (cbs.L, -1));
413 }
414
415 lua_settop (L, err_idx - 1);
416 lua_thread_pool_restore_callback (&cbs);
417 }
418
419 if (sp_ud->flags & LUA_REDIS_SUBSCRIBED) {
420 if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_REPLIED)) {
421 if (ev_can_stop (&sp_ud->timeout_ev)) {
422 ev_timer_stop (sp_ud->ctx->async.event_loop,
423 &sp_ud->timeout_ev);
424 }
425 }
426 }
427
428 sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
429
430 if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
431 if (ud->s) {
432 if (ud->item) {
433 rspamd_symcache_item_async_dec_check (ud->task,
434 ud->item, M);
435 }
436
437 rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
438 }
439 else {
440 lua_redis_fin (sp_ud);
441 }
442 }
443 }
444 }
445
446 /**
447 * Callback for redis replies
448 * @param c context of redis connection
449 * @param r redis reply
450 * @param priv userdata
451 */
452 static void
lua_redis_callback(redisAsyncContext * c,gpointer r,gpointer priv)453 lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
454 {
455 redisReply *reply = r;
456 struct lua_redis_request_specific_userdata *sp_ud = priv;
457 struct lua_redis_ctx *ctx;
458 struct lua_redis_userdata *ud;
459 redisAsyncContext *ac;
460
461 ctx = sp_ud->ctx;
462 ud = sp_ud->c;
463
464 if (ud->terminated) {
465 /* We are already at the termination stage, just go out */
466 return;
467 }
468
469 msg_debug_lua_redis ("got reply from redis %p for query %p", sp_ud->c->ctx,
470 sp_ud);
471
472 REDIS_RETAIN (ctx);
473
474 /* If session is finished, we cannot call lua callbacks */
475 if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) ||
476 (sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
477 if (c->err == 0) {
478 if (r != NULL) {
479 if (reply->type != REDIS_REPLY_ERROR) {
480 lua_redis_push_data (reply, ctx, sp_ud);
481 }
482 else {
483 lua_redis_push_error (reply->str, ctx, sp_ud, TRUE);
484 }
485 }
486 else {
487 lua_redis_push_error ("received no data from server", ctx, sp_ud, TRUE);
488 }
489 }
490 else {
491 if (c->err == REDIS_ERR_IO) {
492 lua_redis_push_error (strerror (errno), ctx, sp_ud, TRUE);
493 }
494 else {
495 lua_redis_push_error (c->errstr, ctx, sp_ud, TRUE);
496 }
497 }
498 }
499
500 if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
501 ctx->cmds_pending--;
502
503 if (ctx->cmds_pending == 0 && !ud->terminated) {
504 /* Disconnect redis early as we don't need it anymore */
505 ud->terminated = 1;
506 ac = ud->ctx;
507 ud->ctx = NULL;
508
509 if (ac) {
510 msg_debug_lua_redis ("release redis connection ud=%p; ctx=%p; refcount=%d",
511 ud, ctx, ctx->ref.refcount);
512 rspamd_redis_pool_release_connection (ud->pool, ac,
513 (ctx->flags & LUA_REDIS_NO_POOL) ?
514 RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT);
515 }
516 }
517 }
518
519 REDIS_RELEASE (ctx);
520 }
521
522 static gint
lua_redis_push_results(struct lua_redis_ctx * ctx,lua_State * L)523 lua_redis_push_results (struct lua_redis_ctx *ctx, lua_State *L)
524 {
525 gint results = g_queue_get_length (ctx->replies);
526 gint i;
527 gboolean can_use_lua = TRUE;
528
529 results = g_queue_get_length (ctx->replies);
530
531 if (!lua_checkstack (L, (results * 2) + 1)) {
532 luaL_error (L, "cannot resize stack to fit %d commands",
533 ctx->cmds_pending);
534
535 can_use_lua = FALSE;
536 }
537
538 for (i = 0; i < results; i ++) {
539 struct lua_redis_result *result = g_queue_pop_head (ctx->replies);
540
541 if (can_use_lua) {
542 lua_pushboolean (L, !result->is_error);
543 lua_rawgeti (L, LUA_REGISTRYINDEX, result->result_ref);
544 }
545
546 luaL_unref (L, LUA_REGISTRYINDEX, result->result_ref);
547
548 g_queue_push_tail (ctx->events_cleanup, result);
549 }
550
551 return can_use_lua ? results * 2 : 0;
552 }
553
554 static void
lua_redis_cleanup_events(struct lua_redis_ctx * ctx)555 lua_redis_cleanup_events (struct lua_redis_ctx *ctx)
556 {
557 REDIS_RETAIN (ctx); /* To avoid preliminary destruction */
558
559 while (!g_queue_is_empty (ctx->events_cleanup)) {
560 struct lua_redis_result *result = g_queue_pop_head (ctx->events_cleanup);
561
562 if (result->item) {
563 rspamd_symcache_item_async_dec_check (result->task, result->item, M);
564 }
565
566 if (result->s) {
567 rspamd_session_remove_event (result->s, lua_redis_fin, result->sp_ud);
568 }
569 else {
570 lua_redis_fin (result->sp_ud);
571 }
572
573 g_free (result);
574 }
575
576 REDIS_RELEASE (ctx);
577 }
578
579 /**
580 * Callback for redis replies
581 * @param c context of redis connection
582 * @param r redis reply
583 * @param priv userdata
584 */
585 static void
lua_redis_callback_sync(redisAsyncContext * ac,gpointer r,gpointer priv)586 lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv)
587 {
588 redisReply *reply = r;
589
590 struct lua_redis_request_specific_userdata *sp_ud = priv;
591 struct lua_redis_ctx *ctx;
592 struct lua_redis_userdata *ud;
593 struct thread_entry* thread;
594 gint results;
595
596 ctx = sp_ud->ctx;
597 ud = sp_ud->c;
598 lua_State *L = ctx->async.cfg->lua_state;
599
600 sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
601
602 if (ud->terminated) {
603 /* We are already at the termination stage, just go out */
604 /* TODO:
605 if somebody is waiting for us (ctx->thread), return result,
606 otherwise, indeed, ignore
607 */
608 return;
609 }
610
611 if (ev_can_stop ( &sp_ud->timeout_ev)) {
612 ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev);
613 }
614
615 if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
616 msg_debug_lua_redis ("got reply from redis: %p for query %p", ac, sp_ud);
617
618 struct lua_redis_result *result = g_malloc0 (sizeof *result);
619
620 if (ac->err == 0) {
621 if (r != NULL) {
622 if (reply->type != REDIS_REPLY_ERROR) {
623 result->is_error = FALSE;
624 lua_redis_push_reply (L, reply, ctx->flags & LUA_REDIS_TEXTDATA);
625 }
626 else {
627 result->is_error = TRUE;
628 lua_pushstring (L, reply->str);
629 }
630 }
631 else {
632 result->is_error = TRUE;
633 lua_pushliteral (L, "received no data from server");
634 }
635 }
636 else {
637 result->is_error = TRUE;
638 if (ac->err == REDIS_ERR_IO) {
639 lua_pushstring (L, strerror (errno));
640 }
641 else {
642 lua_pushstring (L, ac->errstr);
643 }
644 }
645
646 /* if error happened, we should terminate the connection,
647 and release it */
648
649 if (result->is_error && sp_ud->c->ctx) {
650 ac = sp_ud->c->ctx;
651 /* Set to NULL to avoid double free in dtor */
652 sp_ud->c->ctx = NULL;
653 ctx->flags |= LUA_REDIS_TERMINATED;
654
655 /*
656 * This will call all callbacks pending so the entire context
657 * will be destructed
658 */
659 rspamd_redis_pool_release_connection (sp_ud->c->pool, ac,
660 RSPAMD_REDIS_RELEASE_FATAL);
661 }
662
663 result->result_ref = luaL_ref (L, LUA_REGISTRYINDEX);
664 result->s = ud->s;
665 result->item = ud->item;
666 result->task = ud->task;
667 result->sp_ud = sp_ud;
668
669 g_queue_push_tail (ctx->replies, result);
670
671 }
672
673 ctx->cmds_pending --;
674
675 if (ctx->cmds_pending == 0) {
676 if (ctx->thread) {
677 if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
678 /* somebody yielded and waits for results */
679 thread = ctx->thread;
680 ctx->thread = NULL;
681
682 results = lua_redis_push_results(ctx, thread->lua_state);
683 lua_thread_resume (thread, results);
684 lua_redis_cleanup_events(ctx);
685 }
686 else {
687 /* We cannot resume the thread as the associated task has gone */
688 lua_thread_pool_terminate_entry_full (ud->cfg->lua_thread_pool,
689 ctx->thread, G_STRLOC, true);
690 ctx->thread = NULL;
691 }
692 }
693 }
694
695 }
696
697 static void
lua_redis_timeout_sync(EV_P_ ev_timer * w,int revents)698 lua_redis_timeout_sync (EV_P_ ev_timer *w, int revents)
699 {
700 struct lua_redis_request_specific_userdata *sp_ud =
701 (struct lua_redis_request_specific_userdata *)w->data;
702 struct lua_redis_ctx *ctx;
703 struct lua_redis_userdata *ud;
704 redisAsyncContext *ac;
705
706 if (sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) {
707 return;
708 }
709
710 ud = sp_ud->c;
711 ctx = sp_ud->ctx;
712 msg_debug_lua_redis ("timeout while querying redis server: %p, redis: %p", sp_ud,
713 sp_ud->c->ctx);
714
715 if (sp_ud->c->ctx) {
716 ac = sp_ud->c->ctx;
717
718 /* Set to NULL to avoid double free in dtor */
719 sp_ud->c->ctx = NULL;
720 ac->err = REDIS_ERR_IO;
721 errno = ETIMEDOUT;
722 ctx->flags |= LUA_REDIS_TERMINATED;
723
724 /*
725 * This will call all callbacks pending so the entire context
726 * will be destructed
727 */
728 rspamd_redis_pool_release_connection (sp_ud->c->pool, ac,
729 RSPAMD_REDIS_RELEASE_FATAL);
730 }
731 }
732
733 static void
lua_redis_timeout(EV_P_ ev_timer * w,int revents)734 lua_redis_timeout (EV_P_ ev_timer *w, int revents)
735 {
736 struct lua_redis_request_specific_userdata *sp_ud =
737 (struct lua_redis_request_specific_userdata *)w->data;
738 struct lua_redis_userdata *ud;
739 struct lua_redis_ctx *ctx;
740 redisAsyncContext *ac;
741
742 if (sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) {
743 return;
744 }
745
746 ctx = sp_ud->ctx;
747 ud = sp_ud->c;
748
749 REDIS_RETAIN (ctx);
750 msg_debug_lua_redis ("timeout while querying redis server: %p, redis: %p", sp_ud,
751 sp_ud->c->ctx);
752 lua_redis_push_error ("timeout while connecting the server", ctx, sp_ud, TRUE);
753
754 if (sp_ud->c->ctx) {
755 ac = sp_ud->c->ctx;
756 /* Set to NULL to avoid double free in dtor */
757 sp_ud->c->ctx = NULL;
758 ac->err = REDIS_ERR_IO;
759 errno = ETIMEDOUT;
760 /*
761 * This will call all callbacks pending so the entire context
762 * will be destructed
763 */
764 rspamd_redis_pool_release_connection (sp_ud->c->pool, ac,
765 RSPAMD_REDIS_RELEASE_FATAL);
766 }
767
768 REDIS_RELEASE (ctx);
769 }
770
771
772 static void
lua_redis_parse_args(lua_State * L,gint idx,const gchar * cmd,gchar *** pargs,gsize ** parglens,guint * nargs)773 lua_redis_parse_args (lua_State *L, gint idx, const gchar *cmd,
774 gchar ***pargs, gsize **parglens, guint *nargs)
775 {
776 gchar **args = NULL;
777 gsize *arglens;
778 gint top;
779
780 if (idx != 0 && lua_type (L, idx) == LUA_TTABLE) {
781 /* Get all arguments */
782 lua_pushvalue (L, idx);
783 lua_pushnil (L);
784 top = 0;
785
786 while (lua_next (L, -2) != 0) {
787 gint type = lua_type (L, -1);
788
789 if (type == LUA_TNUMBER || type == LUA_TSTRING ||
790 type == LUA_TUSERDATA) {
791 top ++;
792 }
793 lua_pop (L, 1);
794 }
795
796 args = g_malloc ((top + 1) * sizeof (gchar *));
797 arglens = g_malloc ((top + 1) * sizeof (gsize));
798 arglens[0] = strlen (cmd);
799 args[0] = g_malloc (arglens[0]);
800 memcpy (args[0], cmd, arglens[0]);
801 top = 1;
802 lua_pushnil (L);
803
804 while (lua_next (L, -2) != 0) {
805 gint type = lua_type (L, -1);
806
807 if (type == LUA_TSTRING) {
808 const gchar *s;
809
810 s = lua_tolstring (L, -1, &arglens[top]);
811 args[top] = g_malloc (arglens[top]);
812 memcpy (args[top], s, arglens[top]);
813 top ++;
814 }
815 else if (type == LUA_TUSERDATA) {
816 struct rspamd_lua_text *t;
817
818 t = lua_check_text (L, -1);
819
820 if (t && t->start) {
821 arglens[top] = t->len;
822 args[top] = g_malloc (arglens[top]);
823 memcpy (args[top], t->start, arglens[top]);
824 top ++;
825 }
826 }
827 else if (type == LUA_TNUMBER) {
828 gdouble val = lua_tonumber (L, -1);
829 gint r;
830 gchar numbuf[64];
831
832 if (val == (gdouble)((gint64)val)) {
833 r = rspamd_snprintf (numbuf, sizeof (numbuf), "%L",
834 (gint64)val);
835 }
836 else {
837 r = rspamd_snprintf (numbuf, sizeof (numbuf), "%f",
838 val);
839 }
840
841 arglens[top] = r;
842 args[top] = g_malloc (arglens[top]);
843 memcpy (args[top], numbuf, arglens[top]);
844 top ++;
845 }
846
847 lua_pop (L, 1);
848 }
849
850 lua_pop (L, 1);
851 }
852 else {
853 /* Use merely cmd */
854
855 args = g_malloc (sizeof (gchar *));
856 arglens = g_malloc (sizeof (gsize));
857 arglens[0] = strlen (cmd);
858 args[0] = g_malloc (arglens[0]);
859 memcpy (args[0], cmd, arglens[0]);
860 top = 1;
861 }
862
863 *pargs = args;
864 *parglens = arglens;
865 *nargs = top;
866 }
867
868 static struct lua_redis_ctx *
rspamd_lua_redis_prepare_connection(lua_State * L,gint * pcbref,gboolean is_async)869 rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref, gboolean is_async)
870 {
871 struct lua_redis_ctx *ctx = NULL;
872 rspamd_inet_addr_t *ip = NULL;
873 struct lua_redis_userdata *ud = NULL;
874 struct rspamd_lua_ip *addr = NULL;
875 struct rspamd_task *task = NULL;
876 const gchar *host = NULL;
877 const gchar *password = NULL, *dbname = NULL, *log_tag = NULL;
878 gint cbref = -1;
879 struct rspamd_config *cfg = NULL;
880 struct rspamd_async_session *session = NULL;
881 struct ev_loop *ev_base = NULL;
882 gboolean ret = FALSE;
883 guint flags = 0;
884
885 if (lua_istable (L, 1)) {
886 /* Table version */
887 lua_pushvalue (L, 1);
888 lua_pushstring (L, "task");
889 lua_gettable (L, -2);
890 if (lua_type (L, -1) == LUA_TUSERDATA) {
891 task = lua_check_task_maybe (L, -1);
892 }
893 lua_pop (L, 1);
894
895 if (!task) {
896 /* We need to get ev_base, config and session separately */
897 lua_pushstring (L, "config");
898 lua_gettable (L, -2);
899 if (lua_type (L, -1) == LUA_TUSERDATA) {
900 cfg = lua_check_config (L, -1);
901 }
902 lua_pop (L, 1);
903
904 lua_pushstring (L, "session");
905 lua_gettable (L, -2);
906 if (lua_type (L, -1) == LUA_TUSERDATA) {
907 session = lua_check_session (L, -1);
908 }
909 lua_pop (L, 1);
910
911 lua_pushstring (L, "ev_base");
912 lua_gettable (L, -2);
913 if (lua_type (L, -1) == LUA_TUSERDATA) {
914 ev_base = lua_check_ev_base (L, -1);
915 }
916 lua_pop (L, 1);
917
918 if (cfg && ev_base) {
919 ret = TRUE;
920 }
921 else if (!cfg) {
922 msg_err_task_check ("config is not passed");
923 }
924 else {
925 msg_err_task_check ("ev_base is not set");
926 }
927 }
928 else {
929 cfg = task->cfg;
930 session = task->s;
931 ev_base = task->event_loop;
932 log_tag = task->task_pool->tag.uid;
933 ret = TRUE;
934
935 }
936
937 if (pcbref) {
938 lua_pushstring (L, "callback");
939 lua_gettable (L, -2);
940 if (lua_type (L, -1) == LUA_TFUNCTION) {
941 /* This also pops function from the stack */
942 cbref = luaL_ref (L, LUA_REGISTRYINDEX);
943 *pcbref = cbref;
944 }
945 else {
946 *pcbref = -1;
947 lua_pop (L, 1);
948 }
949 }
950
951 lua_pushstring (L, "host");
952 lua_gettable (L, -2);
953
954 if (lua_type (L, -1) == LUA_TUSERDATA) {
955 addr = lua_check_ip (L, -1);
956 host = rspamd_inet_address_to_string_pretty (addr->addr);
957 }
958 else if (lua_type (L, -1) == LUA_TSTRING) {
959 host = lua_tostring (L, -1);
960
961 if (rspamd_parse_inet_address (&ip,
962 host, strlen (host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) {
963 addr = g_alloca (sizeof (*addr));
964 addr->addr = ip;
965
966 if (rspamd_inet_address_get_port (ip) == 0) {
967 rspamd_inet_address_set_port (ip, 6379);
968 }
969 }
970 }
971 lua_pop (L, 1);
972
973 lua_pushstring (L, "password");
974 lua_gettable (L, -2);
975 if (lua_type (L, -1) == LUA_TSTRING) {
976 password = lua_tostring (L, -1);
977 }
978 lua_pop (L, 1);
979
980 lua_pushstring (L, "dbname");
981 lua_gettable (L, -2);
982 if (lua_type (L, -1) == LUA_TSTRING) {
983 dbname = lua_tostring (L, -1);
984 }
985 lua_pop (L, 1);
986
987 lua_pushstring (L, "opaque_data");
988 lua_gettable (L, -2);
989 if (!!lua_toboolean (L, -1)) {
990 flags |= LUA_REDIS_TEXTDATA;
991 }
992 lua_pop (L, 1);
993
994 lua_pushstring (L, "no_pool");
995 lua_gettable (L, -2);
996 if (!!lua_toboolean (L, -1)) {
997 flags |= LUA_REDIS_NO_POOL;
998 }
999 lua_pop (L, 1);
1000
1001 lua_pop (L, 1); /* table */
1002
1003 if (session && rspamd_session_blocked (session)) {
1004 msg_err_task_check ("Session is being destroying");
1005 ret = FALSE;
1006 }
1007
1008 if (ret && addr != NULL) {
1009 ctx = g_malloc0 (sizeof (struct lua_redis_ctx));
1010 REF_INIT_RETAIN (ctx, lua_redis_dtor);
1011 if (is_async) {
1012 ctx->flags |= flags | LUA_REDIS_ASYNC;
1013 ud = &ctx->async;
1014 }
1015 else {
1016 ud = &ctx->async;
1017 ctx->replies = g_queue_new ();
1018 ctx->events_cleanup = g_queue_new ();
1019
1020 }
1021
1022 ud->s = session;
1023 ud->cfg = cfg;
1024 ud->pool = cfg->redis_pool;
1025 ud->event_loop = ev_base;
1026 ud->task = task;
1027
1028 if (log_tag) {
1029 rspamd_strlcpy (ud->log_tag, log_tag, sizeof (ud->log_tag));
1030 }
1031 else {
1032 /* Use pointer itself as a tag */
1033 rspamd_snprintf (ud->log_tag, sizeof (ud->log_tag),
1034 "%ud",
1035 (int)rspamd_cryptobox_fast_hash (&ud, sizeof (ud), 0));
1036 }
1037
1038 if (task) {
1039 ud->item = rspamd_symcache_get_cur_item (task);
1040 }
1041
1042 ret = TRUE;
1043 }
1044 else {
1045 if (cbref != -1) {
1046 luaL_unref (L, LUA_REGISTRYINDEX, cbref);
1047 }
1048
1049 msg_err_task_check ("incorrect function invocation");
1050 ret = FALSE;
1051 }
1052 }
1053
1054 if (ret) {
1055 ud->terminated = 0;
1056 ud->ctx = rspamd_redis_pool_connect (ud->pool,
1057 dbname, password,
1058 rspamd_inet_address_to_string (addr->addr),
1059 rspamd_inet_address_get_port (addr->addr));
1060
1061 if (ip) {
1062 rspamd_inet_address_free (ip);
1063 }
1064
1065 if (ud->ctx == NULL || ud->ctx->err) {
1066 if (ud->ctx) {
1067 msg_err_task_check ("cannot connect to redis: %s",
1068 ud->ctx->errstr);
1069 rspamd_redis_pool_release_connection (ud->pool, ud->ctx,
1070 RSPAMD_REDIS_RELEASE_FATAL);
1071 ud->ctx = NULL;
1072 }
1073 else {
1074 msg_err_task_check ("cannot connect to redis (OS error): %s",
1075 strerror (errno));
1076 }
1077
1078 REDIS_RELEASE (ctx);
1079
1080 return NULL;
1081 }
1082
1083 msg_debug_lua_redis ("opened redis connection host=%s; ctx=%p; ud=%p",
1084 host, ctx, ud);
1085
1086 return ctx;
1087 }
1088
1089 if (ip) {
1090 rspamd_inet_address_free (ip);
1091 }
1092
1093 return NULL;
1094 }
1095
1096 /***
1097 * @function rspamd_redis.make_request({params})
1098 * Make request to redis server, params is a table of key=value arguments in any order
1099 * @param {task} task worker task object
1100 * @param {ip|string} host server address
1101 * @param {function} callback callback to be called in form `function (task, err, data)`
1102 * @param {string} cmd command to be sent to redis
1103 * @param {table} args numeric array of strings used as redis arguments
1104 * @param {number} timeout timeout in seconds for request (1.0 by default)
1105 * @return {boolean} `true` if a request has been scheduled
1106 */
1107 static int
lua_redis_make_request(lua_State * L)1108 lua_redis_make_request (lua_State *L)
1109 {
1110 LUA_TRACE_POINT;
1111 struct lua_redis_request_specific_userdata *sp_ud;
1112 struct lua_redis_userdata *ud;
1113 struct lua_redis_ctx *ctx, **pctx;
1114 const gchar *cmd = NULL;
1115 gdouble timeout = REDIS_DEFAULT_TIMEOUT;
1116 gint cbref = -1;
1117 gboolean ret = FALSE;
1118
1119 ctx = rspamd_lua_redis_prepare_connection (L, &cbref, TRUE);
1120
1121 if (ctx) {
1122 ud = &ctx->async;
1123 sp_ud = g_malloc0 (sizeof (*sp_ud));
1124 sp_ud->cbref = cbref;
1125 sp_ud->c = ud;
1126 sp_ud->ctx = ctx;
1127
1128 lua_pushstring (L, "cmd");
1129 lua_gettable (L, -2);
1130 cmd = lua_tostring (L, -1);
1131 lua_pop (L, 1);
1132
1133 lua_pushstring (L, "timeout");
1134 lua_gettable (L, 1);
1135 if (lua_type (L, -1) == LUA_TNUMBER) {
1136 timeout = lua_tonumber (L, -1);
1137 }
1138 lua_pop (L, 1);
1139 ud->timeout = timeout;
1140
1141
1142 lua_pushstring (L, "args");
1143 lua_gettable (L, 1);
1144 lua_redis_parse_args (L, -1, cmd, &sp_ud->args, &sp_ud->arglens,
1145 &sp_ud->nargs);
1146 lua_pop (L, 1);
1147 LL_PREPEND (ud->specific, sp_ud);
1148
1149 ret = redisAsyncCommandArgv (ud->ctx,
1150 lua_redis_callback,
1151 sp_ud,
1152 sp_ud->nargs,
1153 (const gchar **)sp_ud->args,
1154 sp_ud->arglens);
1155
1156 if (ret == REDIS_OK) {
1157 if (ud->s) {
1158 rspamd_session_add_event (ud->s,
1159 lua_redis_fin, sp_ud,
1160 M);
1161
1162 if (ud->item) {
1163 rspamd_symcache_item_async_inc (ud->task, ud->item, M);
1164 }
1165 }
1166
1167 REDIS_RETAIN (ctx); /* Cleared by fin event */
1168 ctx->cmds_pending ++;
1169
1170 if (ud->ctx->c.flags & REDIS_SUBSCRIBED) {
1171 msg_debug_lua_redis ("subscribe command, never unref/timeout");
1172 sp_ud->flags |= LUA_REDIS_SUBSCRIBED;
1173 }
1174
1175 sp_ud->timeout_ev.data = sp_ud;
1176 ev_now_update_if_cheap ((struct ev_loop *)ud->event_loop);
1177 ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout, timeout, 0.0);
1178 ev_timer_start (ud->event_loop, &sp_ud->timeout_ev);
1179
1180 ret = TRUE;
1181 }
1182 else {
1183 msg_info ("call to redis failed: %s", ud->ctx->errstr);
1184 rspamd_redis_pool_release_connection (ud->pool, ud->ctx,
1185 RSPAMD_REDIS_RELEASE_FATAL);
1186 ud->ctx = NULL;
1187 REDIS_RELEASE (ctx);
1188 ret = FALSE;
1189 }
1190 }
1191 else {
1192 lua_pushboolean (L, FALSE);
1193 lua_pushnil (L);
1194
1195 return 2;
1196 }
1197
1198 lua_pushboolean (L, ret);
1199
1200 if (ret) {
1201 pctx = lua_newuserdata (L, sizeof (ctx));
1202 *pctx = ctx;
1203 rspamd_lua_setclass (L, "rspamd{redis}", -1);
1204 }
1205 else {
1206 lua_pushnil (L);
1207 }
1208
1209 return 2;
1210 }
1211
1212 /***
1213 * @function rspamd_redis.make_request_sync({params})
1214 * Make blocking request to redis server, params is a table of key=value arguments in any order
1215 * @param {ip|string} host server address
1216 * @param {string} cmd command to be sent to redis
1217 * @param {table} args numeric array of strings used as redis arguments
1218 * @param {number} timeout timeout in seconds for request (1.0 by default)
1219 * @return {boolean + result} `true` and a result if a request has been successful
1220 */
1221 static int
lua_redis_make_request_sync(lua_State * L)1222 lua_redis_make_request_sync (lua_State *L)
1223 {
1224 LUA_TRACE_POINT;
1225 struct rspamd_lua_ip *addr = NULL;
1226 rspamd_inet_addr_t *ip = NULL;
1227 const gchar *cmd = NULL, *host;
1228 struct timeval tv;
1229 gboolean ret = FALSE;
1230 gdouble timeout = REDIS_DEFAULT_TIMEOUT;
1231 gchar **args = NULL;
1232 gsize *arglens = NULL;
1233 guint nargs = 0, flags = 0;
1234 redisContext *ctx;
1235 redisReply *r;
1236
1237 if (lua_istable (L, 1)) {
1238 lua_pushvalue (L, 1);
1239
1240 lua_pushstring (L, "cmd");
1241 lua_gettable (L, -2);
1242 cmd = lua_tostring (L, -1);
1243 lua_pop (L, 1);
1244
1245 lua_pushstring (L, "host");
1246 lua_gettable (L, -2);
1247 if (lua_type (L, -1) == LUA_TUSERDATA) {
1248 addr = lua_check_ip (L, -1);
1249 }
1250 else if (lua_type (L, -1) == LUA_TSTRING) {
1251 host = lua_tostring (L, -1);
1252 if (rspamd_parse_inet_address (&ip,
1253 host, strlen (host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) {
1254 addr = g_alloca (sizeof (*addr));
1255 addr->addr = ip;
1256
1257 if (rspamd_inet_address_get_port (ip) == 0) {
1258 rspamd_inet_address_set_port (ip, 6379);
1259 }
1260 }
1261 }
1262 lua_pop (L, 1);
1263
1264 lua_pushstring (L, "timeout");
1265 lua_gettable (L, -2);
1266 if (lua_type (L, -1) == LUA_TNUMBER) {
1267 timeout = lua_tonumber (L, -1);
1268 }
1269 lua_pop (L, 1);
1270
1271 lua_pushstring (L, "opaque_data");
1272 lua_gettable (L, -2);
1273 if (!!lua_toboolean (L, -1)) {
1274 flags |= LUA_REDIS_TEXTDATA;
1275 }
1276 lua_pop (L, 1);
1277
1278
1279 if (cmd) {
1280 lua_pushstring (L, "args");
1281 lua_gettable (L, -2);
1282 lua_redis_parse_args (L, -1, cmd, &args, &arglens, &nargs);
1283 lua_pop (L, 1);
1284 }
1285
1286 lua_pop (L, 1);
1287
1288 if (addr && cmd) {
1289 ret = TRUE;
1290 }
1291 }
1292
1293 if (ret) {
1294 double_to_tv (timeout, &tv);
1295
1296 if (rspamd_inet_address_get_af (addr->addr) == AF_UNIX) {
1297 ctx = redisConnectUnixWithTimeout (
1298 rspamd_inet_address_to_string (addr->addr), tv);
1299 }
1300 else {
1301 ctx = redisConnectWithTimeout (
1302 rspamd_inet_address_to_string (addr->addr),
1303 rspamd_inet_address_get_port (addr->addr), tv);
1304 }
1305
1306 if (ip) {
1307 rspamd_inet_address_free (ip);
1308 }
1309
1310 if (ctx == NULL || ctx->err) {
1311 redisFree (ctx);
1312 lua_redis_free_args (args, arglens, nargs);
1313 lua_pushboolean (L, FALSE);
1314
1315 return 1;
1316 }
1317
1318 r = redisCommandArgv (ctx,
1319 nargs,
1320 (const gchar **)args,
1321 arglens);
1322
1323 if (r != NULL) {
1324 if (r->type != REDIS_REPLY_ERROR) {
1325 lua_pushboolean (L, TRUE);
1326 lua_redis_push_reply (L, r, flags & LUA_REDIS_TEXTDATA);
1327 }
1328 else {
1329 lua_pushboolean (L, FALSE);
1330 lua_pushstring (L, r->str);
1331 }
1332
1333 freeReplyObject (r);
1334 redisFree (ctx);
1335 lua_redis_free_args (args, arglens, nargs);
1336
1337 return 2;
1338 }
1339 else {
1340 msg_info ("call to redis failed: %s", ctx->errstr);
1341 redisFree (ctx);
1342 lua_redis_free_args (args, arglens, nargs);
1343 lua_pushboolean (L, FALSE);
1344 }
1345 }
1346 else {
1347 if (ip) {
1348 rspamd_inet_address_free (ip);
1349 }
1350 msg_err ("bad arguments for redis request");
1351 lua_redis_free_args (args, arglens, nargs);
1352
1353 lua_pushboolean (L, FALSE);
1354 }
1355
1356 return 1;
1357 }
1358
1359 /***
1360 * @function rspamd_redis.connect({params})
1361 * Make request to redis server, params is a table of key=value arguments in any order
1362 * @param {task} task worker task object
1363 * @param {ip|string} host server address
1364 * @param {number} timeout timeout in seconds for request (1.0 by default)
1365 * @return {boolean,redis} new connection object or nil if connection failed
1366 */
1367 static int
lua_redis_connect(lua_State * L)1368 lua_redis_connect (lua_State *L)
1369 {
1370 LUA_TRACE_POINT;
1371 struct lua_redis_userdata *ud;
1372 struct lua_redis_ctx *ctx, **pctx;
1373 gdouble timeout = REDIS_DEFAULT_TIMEOUT;
1374
1375 ctx = rspamd_lua_redis_prepare_connection (L, NULL, TRUE);
1376
1377 if (ctx) {
1378 ud = &ctx->async;
1379
1380 lua_pushstring (L, "timeout");
1381 lua_gettable (L, 1);
1382 if (lua_type (L, -1) == LUA_TNUMBER) {
1383 timeout = lua_tonumber (L, -1);
1384 }
1385
1386 lua_pop (L, 1);
1387 ud->timeout = timeout;
1388 }
1389 else {
1390 lua_pushboolean (L, FALSE);
1391 lua_pushnil (L);
1392
1393 return 2;
1394 }
1395
1396 lua_pushboolean (L, TRUE);
1397 pctx = lua_newuserdata (L, sizeof (ctx));
1398 *pctx = ctx;
1399 rspamd_lua_setclass (L, "rspamd{redis}", -1);
1400
1401 return 2;
1402 }
1403
1404 /***
1405 * @function rspamd_redis.connect_sync({params})
1406 * Make blocking request to redis server, params is a table of key=value arguments in any order
1407 * @param {ip|string} host server address
1408 * @param {number} timeout timeout in seconds for request (1.0 by default)
1409 * @return {redis} redis object if a request has been successful
1410 */
1411 static int
lua_redis_connect_sync(lua_State * L)1412 lua_redis_connect_sync (lua_State *L)
1413 {
1414 LUA_TRACE_POINT;
1415 gdouble timeout = REDIS_DEFAULT_TIMEOUT;
1416 struct lua_redis_ctx *ctx, **pctx;
1417
1418 ctx = rspamd_lua_redis_prepare_connection (L, NULL, FALSE);
1419
1420 if (ctx) {
1421 if (lua_istable (L, 1)) {
1422 lua_pushstring (L, "timeout");
1423 lua_gettable (L, 1);
1424 if (lua_type (L, -1) == LUA_TNUMBER) {
1425 timeout = lua_tonumber (L, -1);
1426 }
1427 lua_pop (L, 1);
1428 }
1429
1430 ctx->async.timeout = timeout;
1431
1432 lua_pushboolean (L, TRUE);
1433 pctx = lua_newuserdata (L, sizeof (ctx));
1434 *pctx = ctx;
1435 rspamd_lua_setclass (L, "rspamd{redis}", -1);
1436 }
1437 else {
1438 lua_pushboolean (L, FALSE);
1439 lua_pushstring (L, "bad arguments for redis request");
1440 return 2;
1441 }
1442
1443 return 2;
1444 }
1445
1446 /***
1447 * @method rspamd_redis:add_cmd(cmd, {args})
1448 * Append new cmd to redis pipeline
1449 * @param {string} cmd command to be sent to redis
1450 * @param {table} args array of strings used as redis arguments
1451 * @return {boolean} `true` if a request has been successful
1452 */
1453 static int
lua_redis_add_cmd(lua_State * L)1454 lua_redis_add_cmd (lua_State *L)
1455 {
1456 LUA_TRACE_POINT;
1457 struct lua_redis_ctx *ctx = lua_check_redis (L, 1);
1458 struct lua_redis_request_specific_userdata *sp_ud;
1459 struct lua_redis_userdata *ud;
1460 const gchar *cmd = NULL;
1461 gint args_pos = 2;
1462 gint cbref = -1, ret;
1463
1464 if (ctx) {
1465 if (ctx->flags & LUA_REDIS_TERMINATED) {
1466 lua_pushboolean (L, FALSE);
1467 lua_pushstring (L, "Connection is terminated");
1468
1469 return 2;
1470 }
1471
1472 /* Async version */
1473 if (lua_type (L, 2) == LUA_TSTRING) {
1474 /* No callback version */
1475 cmd = lua_tostring (L, 2);
1476 args_pos = 3;
1477 }
1478 else if (lua_type (L, 2) == LUA_TFUNCTION) {
1479 lua_pushvalue (L, 2);
1480 cbref = luaL_ref (L, LUA_REGISTRYINDEX);
1481 cmd = lua_tostring (L, 3);
1482 args_pos = 4;
1483 }
1484 else {
1485 return luaL_error (L, "invalid arguments");
1486 }
1487
1488 sp_ud = g_malloc0 (sizeof (*sp_ud));
1489 if (IS_ASYNC (ctx)) {
1490 sp_ud->c = &ctx->async;
1491 ud = &ctx->async;
1492 sp_ud->cbref = cbref;
1493 }
1494 else {
1495 sp_ud->c = &ctx->async;
1496 ud = &ctx->async;
1497 }
1498 sp_ud->ctx = ctx;
1499
1500 lua_redis_parse_args (L, args_pos, cmd, &sp_ud->args,
1501 &sp_ud->arglens, &sp_ud->nargs);
1502
1503 LL_PREPEND (sp_ud->c->specific, sp_ud);
1504
1505 if (ud->s && rspamd_session_blocked (ud->s)) {
1506 lua_pushboolean (L, 0);
1507 lua_pushstring (L, "session is terminating");
1508
1509 return 2;
1510 }
1511
1512 if (IS_ASYNC (ctx)) {
1513 ret = redisAsyncCommandArgv (sp_ud->c->ctx,
1514 lua_redis_callback,
1515 sp_ud,
1516 sp_ud->nargs,
1517 (const gchar **)sp_ud->args,
1518 sp_ud->arglens);
1519 }
1520 else {
1521 ret = redisAsyncCommandArgv (sp_ud->c->ctx,
1522 lua_redis_callback_sync,
1523 sp_ud,
1524 sp_ud->nargs,
1525 (const gchar **)sp_ud->args,
1526 sp_ud->arglens);
1527 }
1528
1529 if (ret == REDIS_OK) {
1530 if (ud->s) {
1531 rspamd_session_add_event (ud->s,
1532 lua_redis_fin,
1533 sp_ud,
1534 M);
1535
1536 if (ud->item) {
1537 rspamd_symcache_item_async_inc (ud->task, ud->item, M);
1538 }
1539 }
1540
1541 sp_ud->timeout_ev.data = sp_ud;
1542
1543 if (IS_ASYNC (ctx)) {
1544 ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout,
1545 sp_ud->c->timeout, 0.0);
1546 }
1547 else {
1548 ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout_sync,
1549 sp_ud->c->timeout, 0.0);
1550 }
1551
1552 ev_timer_start (ud->event_loop, &sp_ud->timeout_ev);
1553 REDIS_RETAIN (ctx);
1554 ctx->cmds_pending ++;
1555 }
1556 else {
1557 msg_info ("call to redis failed: %s",
1558 sp_ud->c->ctx->errstr);
1559 lua_pushboolean (L, 0);
1560 lua_pushstring (L, sp_ud->c->ctx->errstr);
1561
1562 return 2;
1563 }
1564 }
1565
1566 lua_pushboolean (L, true);
1567
1568 return 1;
1569 }
1570
1571 /***
1572 * @method rspamd_redis:exec()
1573 * Executes pending commands (suitable for blocking IO only for now)
1574 * @return {boolean}, {table}, ...: pairs in format [bool, result] for each request pending
1575 */
1576 static int
lua_redis_exec(lua_State * L)1577 lua_redis_exec (lua_State *L)
1578 {
1579 LUA_TRACE_POINT;
1580 struct lua_redis_ctx *ctx = lua_check_redis (L, 1);
1581
1582 if (ctx == NULL) {
1583 lua_error (L);
1584
1585 return 1;
1586 }
1587
1588 if (IS_ASYNC (ctx)) {
1589 lua_pushstring (L, "Async redis pipelining is not implemented");
1590 lua_error (L);
1591 return 0;
1592 }
1593 else {
1594 if (false /* !ctx->d.sync */) {
1595 lua_pushstring (L, "cannot exec commands when not connected");
1596 lua_error (L);
1597 return 0;
1598 }
1599 else {
1600 if (ctx->cmds_pending == 0 && g_queue_get_length (ctx->replies) == 0) {
1601 lua_pushstring (L, "No pending commands to execute");
1602 lua_error (L);
1603 }
1604 if (ctx->cmds_pending == 0 && g_queue_get_length (ctx->replies) > 0) {
1605 gint results = lua_redis_push_results (ctx, L);
1606 return results;
1607 }
1608 else {
1609 ctx->thread = lua_thread_pool_get_running_entry (ctx->async.cfg->lua_thread_pool);
1610 return lua_thread_yield (ctx->thread, 0);
1611 }
1612 }
1613 }
1614 }
1615 #else
1616 static int
lua_redis_make_request(lua_State * L)1617 lua_redis_make_request (lua_State *L)
1618 {
1619 msg_warn ("rspamd is compiled with no redis support");
1620
1621 lua_pushboolean (L, FALSE);
1622
1623 return 1;
1624 }
1625 static int
lua_redis_make_request_sync(lua_State * L)1626 lua_redis_make_request_sync (lua_State *L)
1627 {
1628 msg_warn ("rspamd is compiled with no redis support");
1629
1630 lua_pushboolean (L, FALSE);
1631
1632 return 1;
1633 }
1634 static int
lua_redis_connect(lua_State * L)1635 lua_redis_connect (lua_State *L)
1636 {
1637 msg_warn ("rspamd is compiled with no redis support");
1638
1639 lua_pushboolean (L, FALSE);
1640
1641 return 1;
1642 }
1643 static int
lua_redis_connect_sync(lua_State * L)1644 lua_redis_connect_sync (lua_State *L)
1645 {
1646 msg_warn ("rspamd is compiled with no redis support");
1647
1648 lua_pushboolean (L, FALSE);
1649
1650 return 1;
1651 }
1652 static int
lua_redis_add_cmd(lua_State * L)1653 lua_redis_add_cmd (lua_State *L)
1654 {
1655 msg_warn ("rspamd is compiled with no redis support");
1656
1657 lua_pushboolean (L, FALSE);
1658
1659 return 1;
1660 }
1661 static int
lua_redis_exec(lua_State * L)1662 lua_redis_exec (lua_State *L)
1663 {
1664 msg_warn ("rspamd is compiled with no redis support");
1665
1666 lua_pushboolean (L, FALSE);
1667
1668 return 1;
1669 }
1670 static int
lua_redis_gc(lua_State * L)1671 lua_redis_gc (lua_State *L)
1672 {
1673 return 0;
1674 }
1675 #endif
1676
1677 static gint
lua_load_redis(lua_State * L)1678 lua_load_redis (lua_State * L)
1679 {
1680 lua_newtable (L);
1681 luaL_register (L, NULL, redislib_f);
1682
1683 return 1;
1684 }
1685
1686 static gint
lua_redis_null_idx(lua_State * L)1687 lua_redis_null_idx (lua_State *L)
1688 {
1689 lua_pushnil (L);
1690
1691 return 1;
1692 }
1693
1694 static void
lua_redis_null_mt(lua_State * L)1695 lua_redis_null_mt (lua_State *L)
1696 {
1697 luaL_newmetatable (L, "redis{null}");
1698
1699 lua_pushcfunction (L, lua_redis_null_idx);
1700 lua_setfield (L, -2, "__index");
1701 lua_pushcfunction (L, lua_redis_null_idx);
1702 lua_setfield (L, -2, "__tostring");
1703
1704 lua_pop (L, 1);
1705 }
1706
1707 /**
1708 * Open redis library
1709 * @param L lua stack
1710 * @return
1711 */
1712 void
luaopen_redis(lua_State * L)1713 luaopen_redis (lua_State * L)
1714 {
1715 rspamd_lua_new_class (L, "rspamd{redis}", redislib_m);
1716 lua_pop (L, 1);
1717 rspamd_lua_add_preload (L, "rspamd_redis", lua_load_redis);
1718
1719 /* Set null element */
1720 lua_redis_null_mt (L);
1721 redis_null = lua_newuserdata (L, 0);
1722 luaL_getmetatable (L, "redis{null}");
1723 lua_setmetatable (L, -2);
1724 lua_setfield (L, LUA_REGISTRYINDEX, "redis.null");
1725 }
1726