1 /*-
2  * Copyright 2016 Vsevolod Stakhov
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "lua_common.h"
17 #include "lua_thread_pool.h"
18 #include "utlist.h"
19 
20 #include "contrib/hiredis/hiredis.h"
21 #include "contrib/hiredis/async.h"
22 
23 #define REDIS_DEFAULT_TIMEOUT 1.0
24 
25 static const gchar *M = "rspamd lua redis";
26 static void *redis_null;
27 
28 /***
29  * @module rspamd_redis
 * This module implements an asynchronous redis client for the rspamd Lua API.
 * Here is an example of using this module:
32  * @example
33 local rspamd_redis = require "rspamd_redis"
34 local rspamd_logger = require "rspamd_logger"
35 
36 local function symbol_callback(task)
37 	local redis_key = 'some_key'
38 	local function redis_cb(err, data)
39 		if not err then
40 			rspamd_logger.infox('redis returned %1=%2', redis_key, data)
41 		end
42 	end
43 
44 	rspamd_redis.make_request(task, "127.0.0.1:6379", redis_cb,
45 		'GET', {redis_key})
46 	-- or in table form:
	-- rspamd_redis.make_request({task=task, host="127.0.0.1:6379",
48 	--	callback=redis_cb, timeout=2.0, cmd='GET', args={redis_key}})
49 end
50  */
51 
/*
 * Forward declarations of the Lua-visible entry points of this module.
 * LUA_FUNCTION_DEF is a project macro defined outside this file; the names it
 * produces appear to follow the `lua_redis_<name>` pattern used by the
 * definitions and by the `__gc` entry below — confirm in lua_common.h.
 */
LUA_FUNCTION_DEF (redis, make_request);
LUA_FUNCTION_DEF (redis, make_request_sync);
LUA_FUNCTION_DEF (redis, connect);
LUA_FUNCTION_DEF (redis, connect_sync);
LUA_FUNCTION_DEF (redis, add_cmd);
LUA_FUNCTION_DEF (redis, exec);
LUA_FUNCTION_DEF (redis, gc);
59 
/*
 * Module-level function table: entry points that do not take an existing
 * redis object as receiver. The list is terminated by a {NULL, NULL} sentinel.
 */
static const struct luaL_reg redislib_f[] = {
	LUA_INTERFACE_DEF (redis, make_request),
	LUA_INTERFACE_DEF (redis, make_request_sync),
	LUA_INTERFACE_DEF (redis, connect),
	LUA_INTERFACE_DEF (redis, connect_sync),
	{NULL, NULL}
};
67 
/*
 * Method table for redis connection objects: command queueing/execution plus
 * the `__gc` finalizer (drops one reference on the context) and `__tostring`.
 * The list is terminated by a {NULL, NULL} sentinel.
 */
static const struct luaL_reg redislib_m[] = {
	LUA_INTERFACE_DEF (redis, add_cmd),
	LUA_INTERFACE_DEF (redis, exec),
	{"__gc", lua_redis_gc},
	{"__tostring", rspamd_lua_class_tostring},
	{NULL, NULL}
};
75 
/*
 * Reference counting helpers for struct lua_redis_ctx.
 *
 * REDIS_DEBUG_REFS is explicitly undefined here, so by default the helpers
 * are plain aliases of REF_RETAIN / REF_RELEASE. Replacing the #undef with a
 * #define enables the variants that log the pointer and its current refcount
 * (at error level) before every retain/release.
 */
#undef REDIS_DEBUG_REFS
#ifdef REDIS_DEBUG_REFS
#define REDIS_RETAIN(x) do { \
	msg_err ("retain ref %p, refcount: %d", (x), (x)->ref.refcount); \
	REF_RETAIN(x);	\
} while (0)

#define REDIS_RELEASE(x) do { \
	msg_err ("release ref %p, refcount: %d", (x), (x)->ref.refcount); \
	REF_RELEASE(x);	\
} while (0)
#else
#define REDIS_RETAIN REF_RETAIN
#define REDIS_RELEASE REF_RELEASE
#endif
91 
92 #ifdef WITH_HIREDIS
struct lua_redis_request_specific_userdata;
/**
 * Per-connection state shared by all requests issued through one redis
 * context (embedded in struct lua_redis_ctx as `async`).
 */
struct lua_redis_userdata {
	redisAsyncContext *ctx; /* hiredis connection; reset to NULL once it is released to the pool */
	struct rspamd_task *task; /* originating task, NULL when created from config/ev_base */
	struct rspamd_symcache_item *item; /* symcache item current at creation time (task mode only) */
	struct rspamd_async_session *s; /* async session used to track pending requests, may be NULL */
	struct ev_loop *event_loop; /* event loop that runs the request timers */
	struct rspamd_config *cfg; /* configuration; provides lua_state and lua_thread_pool */
	struct rspamd_redis_pool *pool; /* connection pool the connection is taken from/released to */
	gchar *server;
	gchar log_tag[RSPAMD_LOG_ID_LEN + 1]; /* tag used by msg_debug_lua_redis */
	struct lua_redis_request_specific_userdata *specific; /* singly linked list of requests */
	gdouble timeout; /* request timeout in seconds */
	guint16 port;
	guint16 terminated; /* set to 1 once the connection has been released */
};
112 
/*
 * Debug logging helper for this module. NOTE: the expansion references
 * `ud->log_tag`, so a variable named `ud` (struct lua_redis_userdata *) must
 * be in scope wherever this macro is used.
 */
#define msg_debug_lua_redis(...)  rspamd_conditional_debug_fast (NULL, NULL, \
        rspamd_lua_redis_log_id, "lua_redis", ud->log_tag, \
        G_STRFUNC, \
        __VA_ARGS__)
INIT_LOG_MODULE(lua_redis)

/* Per-request flags (lua_redis_request_specific_userdata.flags) */
/* a reply or an error has already been delivered for the request */
#define LUA_REDIS_SPECIFIC_REPLIED (1 << 0)
/* session was finished */
#define LUA_REDIS_SPECIFIC_FINISHED (1 << 1)
/* Context flags (lua_redis_ctx.flags) */
#define LUA_REDIS_ASYNC (1 << 0)
/* push string replies as rspamd{text} instead of Lua strings */
#define LUA_REDIS_TEXTDATA (1 << 1)
#define LUA_REDIS_TERMINATED (1 << 2)
/* release the connection with RSPAMD_REDIS_RELEASE_ENFORCE instead of the default mode */
#define LUA_REDIS_NO_POOL (1 << 3)
/* NOTE(review): in the visible code this bit is set and tested on per-request flags */
#define LUA_REDIS_SUBSCRIBED (1 << 4)
#define IS_ASYNC(ctx) ((ctx)->flags & LUA_REDIS_ASYNC)
128 
/**
 * State of a single redis command issued through a context.
 */
struct lua_redis_request_specific_userdata {
	gint cbref; /* registry reference of the Lua callback, -1 if none */
	guint nargs; /* number of entries in args/arglens */
	gchar **args; /* command name followed by its arguments */
	gsize *arglens; /* byte length of each entry in args */
	struct lua_redis_userdata *c; /* shared connection state */
	struct lua_redis_ctx *ctx; /* owning context */
	struct lua_redis_request_specific_userdata *next; /* link in lua_redis_userdata.specific */
	ev_timer timeout_ev; /* per-request timeout timer */
	guint flags; /* LUA_REDIS_SPECIFIC_* and LUA_REDIS_SUBSCRIBED bits */
};
140 
/**
 * Reference counted redis context exposed to Lua as `rspamd{redis}`;
 * destroyed by lua_redis_dtor when the last reference is released.
 */
struct lua_redis_ctx {
	guint flags; /* LUA_REDIS_* context flags */
	struct lua_redis_userdata async; /* connection state (used for both async and sync modes) */
	guint cmds_pending; /* commands sent but not yet answered */
	ref_entry_t ref;
	GQueue *replies; /* for sync connection only */
	GQueue *events_cleanup; /* for sync connection only */
	struct thread_entry *thread; /* for sync mode, set only if there was yield */
};
150 
/**
 * One queued reply of a sync connection, kept until it is handed to Lua
 * and its session event is cleaned up.
 */
struct lua_redis_result {
	gboolean is_error; /* TRUE when result_ref refers to an error string */
	gint result_ref; /* registry reference of the reply value or error string */
	struct rspamd_symcache_item *item; /* copied from the connection state at reply time */
	struct rspamd_async_session *s; /* session to remove the event from, may be NULL */
	struct rspamd_task *task;
	struct lua_redis_request_specific_userdata *sp_ud; /* request this result belongs to */
};
159 
160 static struct lua_redis_ctx *
lua_check_redis(lua_State * L,gint pos)161 lua_check_redis (lua_State * L, gint pos)
162 {
163 	void *ud = rspamd_lua_check_udata (L, pos, "rspamd{redis}");
164 	luaL_argcheck (L, ud != NULL, pos, "'redis' expected");
165 	return ud ? *((struct lua_redis_ctx **)ud) : NULL;
166 }
167 
168 static void
lua_redis_free_args(char ** args,gsize * arglens,guint nargs)169 lua_redis_free_args (char **args, gsize *arglens, guint nargs)
170 {
171 	guint i;
172 
173 	if (args) {
174 		for (i = 0; i < nargs; i ++) {
175 			g_free (args[i]);
176 		}
177 
178 		g_free (args);
179 		g_free (arglens);
180 	}
181 }
182 
183 static void
lua_redis_dtor(struct lua_redis_ctx * ctx)184 lua_redis_dtor (struct lua_redis_ctx *ctx)
185 {
186 	struct lua_redis_userdata *ud;
187 	struct lua_redis_request_specific_userdata *cur, *tmp;
188 	gboolean is_successful = TRUE;
189 	struct redisAsyncContext *ac;
190 
191 	ud = &ctx->async;
192 	msg_debug_lua_redis ("desctructing %p", ctx);
193 
194 	if (ud->ctx) {
195 
196 		LL_FOREACH_SAFE (ud->specific, cur, tmp) {
197 			ev_timer_stop (ud->event_loop, &cur->timeout_ev);
198 
199 			if (!(cur->flags & LUA_REDIS_SPECIFIC_REPLIED)) {
200 				is_successful = FALSE;
201 			}
202 
203 			cur->flags |= LUA_REDIS_SPECIFIC_FINISHED;
204 		}
205 
206 		ctx->flags |= LUA_REDIS_TERMINATED;
207 
208 		ud->terminated = 1;
209 		ac = ud->ctx;
210 		ud->ctx = NULL;
211 
212 		if (!is_successful) {
213 			rspamd_redis_pool_release_connection (ud->pool, ac,
214 					RSPAMD_REDIS_RELEASE_FATAL);
215 		}
216 		else {
217 			rspamd_redis_pool_release_connection (ud->pool, ac,
218 					(ctx->flags & LUA_REDIS_NO_POOL) ?
219 					RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT);
220 		}
221 
222 	}
223 
224 	LL_FOREACH_SAFE (ud->specific, cur, tmp) {
225 		lua_redis_free_args (cur->args, cur->arglens, cur->nargs);
226 
227 		if (cur->cbref != -1) {
228 			luaL_unref (ud->cfg->lua_state, LUA_REGISTRYINDEX, cur->cbref);
229 		}
230 
231 		g_free (cur);
232 	}
233 
234 	if (ctx->events_cleanup) {
235 		g_queue_free (ctx->events_cleanup);
236 		ctx->events_cleanup = NULL;
237 	}
238 	if (ctx->replies) {
239 		g_queue_free (ctx->replies);
240 		ctx->replies = NULL;
241 	}
242 
243 	g_free (ctx);
244 }
245 
246 static gint
lua_redis_gc(lua_State * L)247 lua_redis_gc (lua_State *L)
248 {
249 	struct lua_redis_ctx *ctx = lua_check_redis (L, 1);
250 
251 	if (ctx) {
252 		REDIS_RELEASE (ctx);
253 	}
254 
255 	return 0;
256 }
257 
258 static void
lua_redis_fin(void * arg)259 lua_redis_fin (void *arg)
260 {
261 	struct lua_redis_request_specific_userdata *sp_ud = arg;
262 	struct lua_redis_userdata *ud;
263 	struct lua_redis_ctx *ctx;
264 
265 	ctx = sp_ud->ctx;
266 	ud = sp_ud->c;
267 
268 	if (ev_can_stop (&sp_ud->timeout_ev)) {
269 		ev_timer_stop (sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev);
270 	}
271 
272 	msg_debug_lua_redis ("finished redis query %p from session %p; refcount=%d",
273 			sp_ud, ctx, ctx->ref.refcount);
274 	sp_ud->flags |= LUA_REDIS_SPECIFIC_FINISHED;
275 
276 	REDIS_RELEASE (ctx);
277 }
278 
279 /**
280  * Push error of redis request to lua callback
281  * @param code
282  * @param ud
283  */
284 static void
lua_redis_push_error(const gchar * err,struct lua_redis_ctx * ctx,struct lua_redis_request_specific_userdata * sp_ud,gboolean connected)285 lua_redis_push_error (const gchar *err,
286 	struct lua_redis_ctx *ctx,
287 	struct lua_redis_request_specific_userdata *sp_ud,
288 	gboolean connected)
289 {
290 	struct lua_redis_userdata *ud = sp_ud->c;
291 	struct lua_callback_state cbs;
292 	lua_State *L;
293 
294 	if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) {
295 		if (sp_ud->cbref != -1) {
296 
297 			lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs);
298 			L = cbs.L;
299 
300 			lua_pushcfunction (L, &rspamd_lua_traceback);
301 			int err_idx = lua_gettop (L);
302 			/* Push error */
303 			lua_rawgeti (cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref);
304 
305 			/* String of error */
306 			lua_pushstring (cbs.L, err);
307 			/* Data is nil */
308 			lua_pushnil (cbs.L);
309 
310 			if (ud->item) {
311 				rspamd_symcache_set_cur_item (ud->task, ud->item);
312 			}
313 
314 			if (lua_pcall (cbs.L, 2, 0, err_idx) != 0) {
315 				msg_info ("call to callback failed: %s", lua_tostring (cbs.L, -1));
316 			}
317 
318 			lua_settop (L, err_idx - 1);
319 			lua_thread_pool_restore_callback (&cbs);
320 		}
321 
322 		sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
323 
324 		if (connected && ud->s) {
325 			if (ud->item) {
326 				rspamd_symcache_item_async_dec_check (ud->task, ud->item, M);
327 			}
328 
329 			rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
330 		}
331 		else {
332 			lua_redis_fin (sp_ud);
333 		}
334 	}
335 }
336 
337 static void
lua_redis_push_reply(lua_State * L,const redisReply * r,gboolean text_data)338 lua_redis_push_reply (lua_State *L, const redisReply *r, gboolean text_data)
339 {
340 	guint i;
341 	struct rspamd_lua_text *t;
342 
343 	switch (r->type) {
344 	case REDIS_REPLY_INTEGER:
345 		lua_pushinteger (L, r->integer);
346 		break;
347 	case REDIS_REPLY_NIL:
348 		lua_getfield (L, LUA_REGISTRYINDEX, "redis.null");
349 		break;
350 	case REDIS_REPLY_STRING:
351 	case REDIS_REPLY_STATUS:
352 		if (text_data) {
353 			t = lua_newuserdata (L, sizeof (*t));
354 			rspamd_lua_setclass (L, "rspamd{text}", -1);
355 			t->flags = 0;
356 			t->start = r->str;
357 			t->len = r->len;
358 		}
359 		else {
360 			lua_pushlstring (L, r->str, r->len);
361 		}
362 		break;
363 	case REDIS_REPLY_ARRAY:
364 		lua_createtable (L, r->elements, 0);
365 		for (i = 0; i < r->elements; ++i) {
366 			lua_redis_push_reply (L, r->element[i], text_data);
367 			lua_rawseti (L, -2, i + 1); /* Store sub-reply */
368 		}
369 		break;
370 	default: /* should not happen */
371 		msg_info ("unknown reply type: %d", r->type);
372 		break;
373 	}
374 }
375 
376 /**
377  * Push data of redis request to lua callback
378  * @param r redis reply data
379  * @param ud
380  */
381 static void
lua_redis_push_data(const redisReply * r,struct lua_redis_ctx * ctx,struct lua_redis_request_specific_userdata * sp_ud)382 lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
383 		struct lua_redis_request_specific_userdata *sp_ud)
384 {
385 	struct lua_redis_userdata *ud = sp_ud->c;
386 	struct lua_callback_state cbs;
387 	lua_State *L;
388 
389 	if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED)) ||
390 			(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
391 		if (sp_ud->cbref != -1) {
392 			lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs);
393 			L = cbs.L;
394 
395 			lua_pushcfunction (L, &rspamd_lua_traceback);
396 			int err_idx = lua_gettop (L);
397 			/* Push error */
398 			lua_rawgeti (cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref);
399 			/* Error is nil */
400 			lua_pushnil (cbs.L);
401 			/* Data */
402 			lua_redis_push_reply (cbs.L, r, ctx->flags & LUA_REDIS_TEXTDATA);
403 
404 			if (ud->item) {
405 				rspamd_symcache_set_cur_item (ud->task, ud->item);
406 			}
407 
408 			gint ret = lua_pcall (cbs.L, 2, 0, err_idx);
409 
410 			if (ret != 0) {
411 				msg_info ("call to lua_redis callback failed (%d): %s",
412 						ret, lua_tostring (cbs.L, -1));
413 			}
414 
415 			lua_settop (L, err_idx - 1);
416 			lua_thread_pool_restore_callback (&cbs);
417 		}
418 
419 		if (sp_ud->flags & LUA_REDIS_SUBSCRIBED) {
420 			if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_REPLIED)) {
421 				if (ev_can_stop (&sp_ud->timeout_ev)) {
422 					ev_timer_stop (sp_ud->ctx->async.event_loop,
423 							&sp_ud->timeout_ev);
424 				}
425 			}
426 		}
427 
428 		sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
429 
430 		if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
431 			if (ud->s) {
432 				if (ud->item) {
433 					rspamd_symcache_item_async_dec_check (ud->task,
434 							ud->item, M);
435 				}
436 
437 				rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
438 			}
439 			else {
440 				lua_redis_fin (sp_ud);
441 			}
442 		}
443 	}
444 }
445 
446 /**
447  * Callback for redis replies
448  * @param c context of redis connection
449  * @param r redis reply
450  * @param priv userdata
451  */
452 static void
lua_redis_callback(redisAsyncContext * c,gpointer r,gpointer priv)453 lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
454 {
455 	redisReply *reply = r;
456 	struct lua_redis_request_specific_userdata *sp_ud = priv;
457 	struct lua_redis_ctx *ctx;
458 	struct lua_redis_userdata *ud;
459 	redisAsyncContext *ac;
460 
461 	ctx = sp_ud->ctx;
462 	ud = sp_ud->c;
463 
464 	if (ud->terminated) {
465 		/* We are already at the termination stage, just go out */
466 		return;
467 	}
468 
469 	msg_debug_lua_redis ("got reply from redis %p for query %p", sp_ud->c->ctx,
470 			sp_ud);
471 
472 	REDIS_RETAIN (ctx);
473 
474 	/* If session is finished, we cannot call lua callbacks */
475 	if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) ||
476 			(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
477 		if (c->err == 0) {
478 			if (r != NULL) {
479 				if (reply->type != REDIS_REPLY_ERROR) {
480 					lua_redis_push_data (reply, ctx, sp_ud);
481 				}
482 				else {
483 					lua_redis_push_error (reply->str, ctx, sp_ud, TRUE);
484 				}
485 			}
486 			else {
487 				lua_redis_push_error ("received no data from server", ctx, sp_ud, TRUE);
488 			}
489 		}
490 		else {
491 			if (c->err == REDIS_ERR_IO) {
492 				lua_redis_push_error (strerror (errno), ctx, sp_ud, TRUE);
493 			}
494 			else {
495 				lua_redis_push_error (c->errstr, ctx, sp_ud, TRUE);
496 			}
497 		}
498 	}
499 
500 	if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
501 		ctx->cmds_pending--;
502 
503 		if (ctx->cmds_pending == 0 && !ud->terminated) {
504 			/* Disconnect redis early as we don't need it anymore */
505 			ud->terminated = 1;
506 			ac = ud->ctx;
507 			ud->ctx = NULL;
508 
509 			if (ac) {
510 				msg_debug_lua_redis ("release redis connection ud=%p; ctx=%p; refcount=%d",
511 						ud, ctx, ctx->ref.refcount);
512 				rspamd_redis_pool_release_connection (ud->pool, ac,
513 						(ctx->flags & LUA_REDIS_NO_POOL) ?
514 						RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT);
515 			}
516 		}
517 	}
518 
519 	REDIS_RELEASE (ctx);
520 }
521 
522 static gint
lua_redis_push_results(struct lua_redis_ctx * ctx,lua_State * L)523 lua_redis_push_results (struct lua_redis_ctx *ctx, lua_State *L)
524 {
525 	gint results = g_queue_get_length (ctx->replies);
526 	gint i;
527 	gboolean can_use_lua = TRUE;
528 
529 	results = g_queue_get_length (ctx->replies);
530 
531 	if (!lua_checkstack (L, (results * 2) + 1)) {
532 		luaL_error (L, "cannot resize stack to fit %d commands",
533 				ctx->cmds_pending);
534 
535 		can_use_lua = FALSE;
536 	}
537 
538 	for (i = 0; i < results; i ++) {
539 		struct lua_redis_result *result = g_queue_pop_head (ctx->replies);
540 
541 		if (can_use_lua) {
542 			lua_pushboolean (L, !result->is_error);
543 			lua_rawgeti (L, LUA_REGISTRYINDEX, result->result_ref);
544 		}
545 
546 		luaL_unref (L, LUA_REGISTRYINDEX, result->result_ref);
547 
548 		g_queue_push_tail (ctx->events_cleanup, result);
549 	}
550 
551 	return can_use_lua ? results * 2 : 0;
552 }
553 
554 static void
lua_redis_cleanup_events(struct lua_redis_ctx * ctx)555 lua_redis_cleanup_events (struct lua_redis_ctx *ctx)
556 {
557 	REDIS_RETAIN (ctx); /* To avoid preliminary destruction */
558 
559 	while (!g_queue_is_empty (ctx->events_cleanup)) {
560 		struct lua_redis_result *result = g_queue_pop_head (ctx->events_cleanup);
561 
562 		if (result->item) {
563 			rspamd_symcache_item_async_dec_check (result->task, result->item, M);
564 		}
565 
566 		if (result->s) {
567 			rspamd_session_remove_event (result->s, lua_redis_fin, result->sp_ud);
568 		}
569 		else {
570 			lua_redis_fin (result->sp_ud);
571 		}
572 
573 		g_free (result);
574 	}
575 
576 	REDIS_RELEASE (ctx);
577 }
578 
579 /**
580  * Callback for redis replies
581  * @param c context of redis connection
582  * @param r redis reply
583  * @param priv userdata
584  */
585 static void
lua_redis_callback_sync(redisAsyncContext * ac,gpointer r,gpointer priv)586 lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv)
587 {
588 	redisReply *reply = r;
589 
590 	struct lua_redis_request_specific_userdata *sp_ud = priv;
591 	struct lua_redis_ctx *ctx;
592 	struct lua_redis_userdata *ud;
593 	struct thread_entry* thread;
594 	gint results;
595 
596 	ctx = sp_ud->ctx;
597 	ud = sp_ud->c;
598 	lua_State *L = ctx->async.cfg->lua_state;
599 
600 	sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
601 
602 	if (ud->terminated) {
603 		/* We are already at the termination stage, just go out */
604 		/* TODO:
605 		   if somebody is waiting for us (ctx->thread), return result,
606 		   otherwise, indeed, ignore
607 		 */
608 		return;
609 	}
610 
611 	if (ev_can_stop ( &sp_ud->timeout_ev)) {
612 		ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev);
613 	}
614 
615 	if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
616 		msg_debug_lua_redis ("got reply from redis: %p for query %p", ac, sp_ud);
617 
618 		struct lua_redis_result *result = g_malloc0 (sizeof *result);
619 
620 		if (ac->err == 0) {
621 			if (r != NULL) {
622 				if (reply->type != REDIS_REPLY_ERROR) {
623 					result->is_error = FALSE;
624 					lua_redis_push_reply (L, reply, ctx->flags & LUA_REDIS_TEXTDATA);
625 				}
626 				else {
627 					result->is_error = TRUE;
628 					lua_pushstring (L, reply->str);
629 				}
630 			}
631 			else {
632 				result->is_error = TRUE;
633 				lua_pushliteral (L, "received no data from server");
634 			}
635 		}
636 		else {
637 			result->is_error = TRUE;
638 			if (ac->err == REDIS_ERR_IO) {
639 				lua_pushstring (L, strerror (errno));
640 			}
641 			else {
642 				lua_pushstring (L, ac->errstr);
643 			}
644 		}
645 
646 		/* if error happened, we should terminate the connection,
647 		   and release it */
648 
649 		if (result->is_error && sp_ud->c->ctx) {
650 			ac = sp_ud->c->ctx;
651 			/* Set to NULL to avoid double free in dtor */
652 			sp_ud->c->ctx = NULL;
653 			ctx->flags |= LUA_REDIS_TERMINATED;
654 
655 			/*
656 			 * This will call all callbacks pending so the entire context
657 			 * will be destructed
658 			 */
659 			rspamd_redis_pool_release_connection (sp_ud->c->pool, ac,
660 					RSPAMD_REDIS_RELEASE_FATAL);
661 		}
662 
663 		result->result_ref = luaL_ref (L, LUA_REGISTRYINDEX);
664 		result->s = ud->s;
665 		result->item = ud->item;
666 		result->task = ud->task;
667 		result->sp_ud = sp_ud;
668 
669 		g_queue_push_tail (ctx->replies, result);
670 
671 	}
672 
673 	ctx->cmds_pending --;
674 
675 	if (ctx->cmds_pending == 0) {
676 		if (ctx->thread) {
677 			if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
678 				/* somebody yielded and waits for results */
679 				thread = ctx->thread;
680 				ctx->thread = NULL;
681 
682 				results = lua_redis_push_results(ctx, thread->lua_state);
683 				lua_thread_resume (thread, results);
684 				lua_redis_cleanup_events(ctx);
685 			}
686 			else {
687 				/* We cannot resume the thread as the associated task has gone */
688 				lua_thread_pool_terminate_entry_full (ud->cfg->lua_thread_pool,
689 						ctx->thread, G_STRLOC, true);
690 				ctx->thread = NULL;
691 			}
692 		}
693 	}
694 
695 }
696 
697 static void
lua_redis_timeout_sync(EV_P_ ev_timer * w,int revents)698 lua_redis_timeout_sync (EV_P_ ev_timer *w, int revents)
699 {
700 	struct lua_redis_request_specific_userdata *sp_ud =
701 			(struct lua_redis_request_specific_userdata *)w->data;
702 	struct lua_redis_ctx *ctx;
703 	struct lua_redis_userdata *ud;
704 	redisAsyncContext *ac;
705 
706 	if (sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) {
707 		return;
708 	}
709 
710 	ud = sp_ud->c;
711 	ctx = sp_ud->ctx;
712 	msg_debug_lua_redis ("timeout while querying redis server: %p, redis: %p", sp_ud,
713 			sp_ud->c->ctx);
714 
715 	if (sp_ud->c->ctx) {
716 		ac = sp_ud->c->ctx;
717 
718 		/* Set to NULL to avoid double free in dtor */
719 		sp_ud->c->ctx = NULL;
720 		ac->err = REDIS_ERR_IO;
721 		errno = ETIMEDOUT;
722 		ctx->flags |= LUA_REDIS_TERMINATED;
723 
724 		/*
725 		 * This will call all callbacks pending so the entire context
726 		 * will be destructed
727 		 */
728 		rspamd_redis_pool_release_connection (sp_ud->c->pool, ac,
729 				RSPAMD_REDIS_RELEASE_FATAL);
730 	}
731 }
732 
733 static void
lua_redis_timeout(EV_P_ ev_timer * w,int revents)734 lua_redis_timeout (EV_P_ ev_timer *w, int revents)
735 {
736 	struct lua_redis_request_specific_userdata *sp_ud =
737 			(struct lua_redis_request_specific_userdata *)w->data;
738 	struct lua_redis_userdata *ud;
739 	struct lua_redis_ctx *ctx;
740 	redisAsyncContext *ac;
741 
742 	if (sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) {
743 		return;
744 	}
745 
746 	ctx = sp_ud->ctx;
747 	ud = sp_ud->c;
748 
749 	REDIS_RETAIN (ctx);
750 	msg_debug_lua_redis ("timeout while querying redis server: %p, redis: %p", sp_ud,
751 			sp_ud->c->ctx);
752 	lua_redis_push_error ("timeout while connecting the server", ctx, sp_ud, TRUE);
753 
754 	if (sp_ud->c->ctx) {
755 		ac = sp_ud->c->ctx;
756 		/* Set to NULL to avoid double free in dtor */
757 		sp_ud->c->ctx = NULL;
758 		ac->err = REDIS_ERR_IO;
759 		errno = ETIMEDOUT;
760 		/*
761 		 * This will call all callbacks pending so the entire context
762 		 * will be destructed
763 		 */
764 		rspamd_redis_pool_release_connection (sp_ud->c->pool, ac,
765 				RSPAMD_REDIS_RELEASE_FATAL);
766 	}
767 
768 	REDIS_RELEASE (ctx);
769 }
770 
771 
772 static void
lua_redis_parse_args(lua_State * L,gint idx,const gchar * cmd,gchar *** pargs,gsize ** parglens,guint * nargs)773 lua_redis_parse_args (lua_State *L, gint idx, const gchar *cmd,
774 		gchar ***pargs, gsize **parglens, guint *nargs)
775 {
776 	gchar **args = NULL;
777 	gsize *arglens;
778 	gint top;
779 
780 	if (idx != 0 && lua_type (L, idx) == LUA_TTABLE) {
781 		/* Get all arguments */
782 		lua_pushvalue (L, idx);
783 		lua_pushnil (L);
784 		top = 0;
785 
786 		while (lua_next (L, -2) != 0) {
787 			gint type = lua_type (L, -1);
788 
789 			if (type == LUA_TNUMBER || type == LUA_TSTRING ||
790 					type == LUA_TUSERDATA) {
791 				top ++;
792 			}
793 			lua_pop (L, 1);
794 		}
795 
796 		args = g_malloc ((top + 1) * sizeof (gchar *));
797 		arglens = g_malloc ((top + 1) * sizeof (gsize));
798 		arglens[0] = strlen (cmd);
799 		args[0] = g_malloc (arglens[0]);
800 		memcpy (args[0], cmd, arglens[0]);
801 		top = 1;
802 		lua_pushnil (L);
803 
804 		while (lua_next (L, -2) != 0) {
805 			gint type = lua_type (L, -1);
806 
807 			if (type == LUA_TSTRING) {
808 				const gchar *s;
809 
810 				s = lua_tolstring (L, -1, &arglens[top]);
811 				args[top] = g_malloc (arglens[top]);
812 				memcpy (args[top], s, arglens[top]);
813 				top ++;
814 			}
815 			else if (type == LUA_TUSERDATA) {
816 				struct rspamd_lua_text *t;
817 
818 				t = lua_check_text (L, -1);
819 
820 				if (t && t->start) {
821 					arglens[top] = t->len;
822 					args[top] = g_malloc (arglens[top]);
823 					memcpy (args[top], t->start, arglens[top]);
824 					top ++;
825 				}
826 			}
827 			else if (type == LUA_TNUMBER) {
828 				gdouble val = lua_tonumber (L, -1);
829 				gint r;
830 				gchar numbuf[64];
831 
832 				if (val == (gdouble)((gint64)val)) {
833 					r = rspamd_snprintf (numbuf, sizeof (numbuf), "%L",
834 							(gint64)val);
835 				}
836 				else {
837 					r = rspamd_snprintf (numbuf, sizeof (numbuf), "%f",
838 							val);
839 				}
840 
841 				arglens[top] = r;
842 				args[top] = g_malloc (arglens[top]);
843 				memcpy (args[top], numbuf, arglens[top]);
844 				top ++;
845 			}
846 
847 			lua_pop (L, 1);
848 		}
849 
850 		lua_pop (L, 1);
851 	}
852 	else {
853 		/* Use merely cmd */
854 
855 		args = g_malloc (sizeof (gchar *));
856 		arglens = g_malloc (sizeof (gsize));
857 		arglens[0] = strlen (cmd);
858 		args[0] = g_malloc (arglens[0]);
859 		memcpy (args[0], cmd, arglens[0]);
860 		top = 1;
861 	}
862 
863 	*pargs = args;
864 	*parglens = arglens;
865 	*nargs = top;
866 }
867 
868 static struct lua_redis_ctx *
rspamd_lua_redis_prepare_connection(lua_State * L,gint * pcbref,gboolean is_async)869 rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref, gboolean is_async)
870 {
871 	struct lua_redis_ctx *ctx = NULL;
872 	rspamd_inet_addr_t *ip = NULL;
873 	struct lua_redis_userdata *ud = NULL;
874 	struct rspamd_lua_ip *addr = NULL;
875 	struct rspamd_task *task = NULL;
876 	const gchar *host = NULL;
877 	const gchar *password = NULL, *dbname = NULL, *log_tag = NULL;
878 	gint cbref = -1;
879 	struct rspamd_config *cfg = NULL;
880 	struct rspamd_async_session *session = NULL;
881 	struct ev_loop *ev_base = NULL;
882 	gboolean ret = FALSE;
883 	guint flags = 0;
884 
885 	if (lua_istable (L, 1)) {
886 		/* Table version */
887 		lua_pushvalue (L, 1);
888 		lua_pushstring (L, "task");
889 		lua_gettable (L, -2);
890 		if (lua_type (L, -1) == LUA_TUSERDATA) {
891 			task = lua_check_task_maybe (L, -1);
892 		}
893 		lua_pop (L, 1);
894 
895 		if (!task) {
896 			/* We need to get ev_base, config and session separately */
897 			lua_pushstring (L, "config");
898 			lua_gettable (L, -2);
899 			if (lua_type (L, -1) == LUA_TUSERDATA) {
900 				cfg = lua_check_config (L, -1);
901 			}
902 			lua_pop (L, 1);
903 
904 			lua_pushstring (L, "session");
905 			lua_gettable (L, -2);
906 			if (lua_type (L, -1) == LUA_TUSERDATA) {
907 				session = lua_check_session (L, -1);
908 			}
909 			lua_pop (L, 1);
910 
911 			lua_pushstring (L, "ev_base");
912 			lua_gettable (L, -2);
913 			if (lua_type (L, -1) == LUA_TUSERDATA) {
914 				ev_base = lua_check_ev_base (L, -1);
915 			}
916 			lua_pop (L, 1);
917 
918 			if (cfg && ev_base) {
919 				ret = TRUE;
920 			}
921 			else if (!cfg) {
922 				msg_err_task_check ("config is not passed");
923 			}
924 			else {
925 				msg_err_task_check ("ev_base is not set");
926 			}
927 		}
928 		else {
929 			cfg = task->cfg;
930 			session = task->s;
931 			ev_base = task->event_loop;
932 			log_tag = task->task_pool->tag.uid;
933 			ret = TRUE;
934 
935 		}
936 
937 		if (pcbref) {
938 			lua_pushstring (L, "callback");
939 			lua_gettable (L, -2);
940 			if (lua_type (L, -1) == LUA_TFUNCTION) {
941 				/* This also pops function from the stack */
942 				cbref = luaL_ref (L, LUA_REGISTRYINDEX);
943 				*pcbref = cbref;
944 			}
945 			else {
946 				*pcbref = -1;
947 				lua_pop (L, 1);
948 			}
949 		}
950 
951 		lua_pushstring (L, "host");
952 		lua_gettable (L, -2);
953 
954 		if (lua_type (L, -1) == LUA_TUSERDATA) {
955 			addr = lua_check_ip (L, -1);
956 			host = rspamd_inet_address_to_string_pretty (addr->addr);
957 		}
958 		else if (lua_type (L, -1) == LUA_TSTRING) {
959 			host = lua_tostring (L, -1);
960 
961 			if (rspamd_parse_inet_address (&ip,
962 					host, strlen (host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) {
963 				addr = g_alloca (sizeof (*addr));
964 				addr->addr = ip;
965 
966 				if (rspamd_inet_address_get_port (ip) == 0) {
967 					rspamd_inet_address_set_port (ip, 6379);
968 				}
969 			}
970 		}
971 		lua_pop (L, 1);
972 
973 		lua_pushstring (L, "password");
974 		lua_gettable (L, -2);
975 		if (lua_type (L, -1) == LUA_TSTRING) {
976 			password = lua_tostring (L, -1);
977 		}
978 		lua_pop (L, 1);
979 
980 		lua_pushstring (L, "dbname");
981 		lua_gettable (L, -2);
982 		if (lua_type (L, -1) == LUA_TSTRING) {
983 			dbname = lua_tostring (L, -1);
984 		}
985 		lua_pop (L, 1);
986 
987 		lua_pushstring (L, "opaque_data");
988 		lua_gettable (L, -2);
989 		if (!!lua_toboolean (L, -1)) {
990 			flags |= LUA_REDIS_TEXTDATA;
991 		}
992 		lua_pop (L, 1);
993 
994 		lua_pushstring (L, "no_pool");
995 		lua_gettable (L, -2);
996 		if (!!lua_toboolean (L, -1)) {
997 			flags |= LUA_REDIS_NO_POOL;
998 		}
999 		lua_pop (L, 1);
1000 
1001 		lua_pop (L, 1); /* table */
1002 
1003 		if (session && rspamd_session_blocked (session)) {
1004 			msg_err_task_check ("Session is being destroying");
1005 			ret = FALSE;
1006 		}
1007 
1008 		if (ret && addr != NULL) {
1009 			ctx = g_malloc0 (sizeof (struct lua_redis_ctx));
1010 			REF_INIT_RETAIN (ctx, lua_redis_dtor);
1011 			if (is_async) {
1012 				ctx->flags |= flags | LUA_REDIS_ASYNC;
1013 				ud = &ctx->async;
1014 			}
1015 			else {
1016 				ud = &ctx->async;
1017 				ctx->replies = g_queue_new ();
1018 				ctx->events_cleanup = g_queue_new ();
1019 
1020 			}
1021 
1022 			ud->s = session;
1023 			ud->cfg = cfg;
1024 			ud->pool = cfg->redis_pool;
1025 			ud->event_loop = ev_base;
1026 			ud->task = task;
1027 
1028 			if (log_tag) {
1029 				rspamd_strlcpy (ud->log_tag, log_tag, sizeof (ud->log_tag));
1030 			}
1031 			else {
1032 				/* Use pointer itself as a tag */
1033 				rspamd_snprintf (ud->log_tag, sizeof (ud->log_tag),
1034 						"%ud",
1035 						(int)rspamd_cryptobox_fast_hash (&ud, sizeof (ud), 0));
1036 			}
1037 
1038 			if (task) {
1039 				ud->item = rspamd_symcache_get_cur_item (task);
1040 			}
1041 
1042 			ret = TRUE;
1043 		}
1044 		else {
1045 			if (cbref != -1) {
1046 				luaL_unref (L, LUA_REGISTRYINDEX, cbref);
1047 			}
1048 
1049 			msg_err_task_check ("incorrect function invocation");
1050 			ret = FALSE;
1051 		}
1052 	}
1053 
1054 	if (ret) {
1055 		ud->terminated = 0;
1056 		ud->ctx = rspamd_redis_pool_connect (ud->pool,
1057 				dbname, password,
1058 				rspamd_inet_address_to_string (addr->addr),
1059 				rspamd_inet_address_get_port (addr->addr));
1060 
1061 		if (ip) {
1062 			rspamd_inet_address_free (ip);
1063 		}
1064 
1065 		if (ud->ctx == NULL || ud->ctx->err) {
1066 			if (ud->ctx) {
1067 				msg_err_task_check ("cannot connect to redis: %s",
1068 						ud->ctx->errstr);
1069 				rspamd_redis_pool_release_connection (ud->pool, ud->ctx,
1070 						RSPAMD_REDIS_RELEASE_FATAL);
1071 				ud->ctx = NULL;
1072 			}
1073 			else {
1074 				msg_err_task_check ("cannot connect to redis (OS error): %s",
1075 						strerror (errno));
1076 			}
1077 
1078 			REDIS_RELEASE (ctx);
1079 
1080 			return NULL;
1081 		}
1082 
1083 		msg_debug_lua_redis ("opened redis connection host=%s; ctx=%p; ud=%p",
1084 				host, ctx, ud);
1085 
1086 		return ctx;
1087 	}
1088 
1089 	if (ip) {
1090 		rspamd_inet_address_free (ip);
1091 	}
1092 
1093 	return NULL;
1094 }
1095 
1096 /***
1097  * @function rspamd_redis.make_request({params})
1098  * Make request to redis server, params is a table of key=value arguments in any order
1099  * @param {task} task worker task object
1100  * @param {ip|string} host server address
 * @param {function} callback callback to be called in form `function (err, data)`
1102  * @param {string} cmd command to be sent to redis
1103  * @param {table} args numeric array of strings used as redis arguments
1104  * @param {number} timeout timeout in seconds for request (1.0 by default)
1105  * @return {boolean} `true` if a request has been scheduled
1106  */
1107 static int
lua_redis_make_request(lua_State * L)1108 lua_redis_make_request (lua_State *L)
1109 {
1110 	LUA_TRACE_POINT;
1111 	struct lua_redis_request_specific_userdata *sp_ud;
1112 	struct lua_redis_userdata *ud;
1113 	struct lua_redis_ctx *ctx, **pctx;
1114 	const gchar *cmd = NULL;
1115 	gdouble timeout = REDIS_DEFAULT_TIMEOUT;
1116 	gint cbref = -1;
1117 	gboolean ret = FALSE;
1118 
1119 	ctx = rspamd_lua_redis_prepare_connection (L, &cbref, TRUE);
1120 
1121 	if (ctx) {
1122 		ud = &ctx->async;
1123 		sp_ud = g_malloc0 (sizeof (*sp_ud));
1124 		sp_ud->cbref = cbref;
1125 		sp_ud->c = ud;
1126 		sp_ud->ctx = ctx;
1127 
1128 		lua_pushstring (L, "cmd");
1129 		lua_gettable (L, -2);
1130 		cmd = lua_tostring (L, -1);
1131 		lua_pop (L, 1);
1132 
1133 		lua_pushstring (L, "timeout");
1134 		lua_gettable (L, 1);
1135 		if (lua_type (L, -1) == LUA_TNUMBER) {
1136 			timeout = lua_tonumber (L, -1);
1137 		}
1138 		lua_pop (L, 1);
1139 		ud->timeout = timeout;
1140 
1141 
1142 		lua_pushstring (L, "args");
1143 		lua_gettable (L, 1);
1144 		lua_redis_parse_args (L, -1, cmd, &sp_ud->args, &sp_ud->arglens,
1145 				&sp_ud->nargs);
1146 		lua_pop (L, 1);
1147 		LL_PREPEND (ud->specific, sp_ud);
1148 
1149 		ret = redisAsyncCommandArgv (ud->ctx,
1150 				lua_redis_callback,
1151 				sp_ud,
1152 				sp_ud->nargs,
1153 				(const gchar **)sp_ud->args,
1154 				sp_ud->arglens);
1155 
1156 		if (ret == REDIS_OK) {
1157 			if (ud->s) {
1158 				rspamd_session_add_event (ud->s,
1159 						lua_redis_fin, sp_ud,
1160 						M);
1161 
1162 				if (ud->item) {
1163 					rspamd_symcache_item_async_inc (ud->task, ud->item, M);
1164 				}
1165 			}
1166 
1167 			REDIS_RETAIN (ctx); /* Cleared by fin event */
1168 			ctx->cmds_pending ++;
1169 
1170 			if (ud->ctx->c.flags & REDIS_SUBSCRIBED) {
1171 				msg_debug_lua_redis ("subscribe command, never unref/timeout");
1172 				sp_ud->flags |= LUA_REDIS_SUBSCRIBED;
1173 			}
1174 
1175 			sp_ud->timeout_ev.data = sp_ud;
1176 			ev_now_update_if_cheap ((struct ev_loop *)ud->event_loop);
1177 			ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout, timeout, 0.0);
1178 			ev_timer_start (ud->event_loop, &sp_ud->timeout_ev);
1179 
1180 			ret = TRUE;
1181 		}
1182 		else {
1183 			msg_info ("call to redis failed: %s", ud->ctx->errstr);
1184 			rspamd_redis_pool_release_connection (ud->pool, ud->ctx,
1185 					RSPAMD_REDIS_RELEASE_FATAL);
1186 			ud->ctx = NULL;
1187 			REDIS_RELEASE (ctx);
1188 			ret = FALSE;
1189 		}
1190 	}
1191 	else {
1192 		lua_pushboolean (L, FALSE);
1193 		lua_pushnil (L);
1194 
1195 		return 2;
1196 	}
1197 
1198 	lua_pushboolean (L, ret);
1199 
1200 	if (ret) {
1201 		pctx = lua_newuserdata (L, sizeof (ctx));
1202 		*pctx = ctx;
1203 		rspamd_lua_setclass (L, "rspamd{redis}", -1);
1204 	}
1205 	else {
1206 		lua_pushnil (L);
1207 	}
1208 
1209 	return 2;
1210 }
1211 
1212 /***
1213  * @function rspamd_redis.make_request_sync({params})
1214  * Make blocking request to redis server, params is a table of key=value arguments in any order
1215  * @param {ip|string} host server address
1216  * @param {string} cmd command to be sent to redis
1217  * @param {table} args numeric array of strings used as redis arguments
1218  * @param {number} timeout timeout in seconds for request (1.0 by default)
1219  * @return {boolean + result} `true` and a result if a request has been successful
1220  */
1221 static int
lua_redis_make_request_sync(lua_State * L)1222 lua_redis_make_request_sync (lua_State *L)
1223 {
1224 	LUA_TRACE_POINT;
1225 	struct rspamd_lua_ip *addr = NULL;
1226 	rspamd_inet_addr_t *ip = NULL;
1227 	const gchar *cmd = NULL, *host;
1228 	struct timeval tv;
1229 	gboolean ret = FALSE;
1230 	gdouble timeout = REDIS_DEFAULT_TIMEOUT;
1231 	gchar **args = NULL;
1232 	gsize *arglens = NULL;
1233 	guint nargs = 0, flags = 0;
1234 	redisContext *ctx;
1235 	redisReply *r;
1236 
1237 	if (lua_istable (L, 1)) {
1238 		lua_pushvalue (L, 1);
1239 
1240 		lua_pushstring (L, "cmd");
1241 		lua_gettable (L, -2);
1242 		cmd = lua_tostring (L, -1);
1243 		lua_pop (L, 1);
1244 
1245 		lua_pushstring (L, "host");
1246 		lua_gettable (L, -2);
1247 		if (lua_type (L, -1) == LUA_TUSERDATA) {
1248 			addr = lua_check_ip (L, -1);
1249 		}
1250 		else if (lua_type (L, -1) == LUA_TSTRING) {
1251 			host = lua_tostring (L, -1);
1252 			if (rspamd_parse_inet_address (&ip,
1253 					host, strlen (host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) {
1254 				addr = g_alloca (sizeof (*addr));
1255 				addr->addr = ip;
1256 
1257 				if (rspamd_inet_address_get_port (ip) == 0) {
1258 					rspamd_inet_address_set_port (ip, 6379);
1259 				}
1260 			}
1261 		}
1262 		lua_pop (L, 1);
1263 
1264 		lua_pushstring (L, "timeout");
1265 		lua_gettable (L, -2);
1266 		if (lua_type (L, -1) == LUA_TNUMBER) {
1267 			timeout = lua_tonumber (L, -1);
1268 		}
1269 		lua_pop (L, 1);
1270 
1271 		lua_pushstring (L, "opaque_data");
1272 		lua_gettable (L, -2);
1273 		if (!!lua_toboolean (L, -1)) {
1274 			flags |= LUA_REDIS_TEXTDATA;
1275 		}
1276 		lua_pop (L, 1);
1277 
1278 
1279 		if (cmd) {
1280 			lua_pushstring (L, "args");
1281 			lua_gettable (L, -2);
1282 			lua_redis_parse_args (L, -1, cmd, &args, &arglens, &nargs);
1283 			lua_pop (L, 1);
1284 		}
1285 
1286 		lua_pop (L, 1);
1287 
1288 		if (addr && cmd) {
1289 			ret = TRUE;
1290 		}
1291 	}
1292 
1293 	if (ret) {
1294 		double_to_tv (timeout, &tv);
1295 
1296 		if (rspamd_inet_address_get_af (addr->addr) == AF_UNIX) {
1297 			ctx = redisConnectUnixWithTimeout (
1298 					rspamd_inet_address_to_string (addr->addr), tv);
1299 		}
1300 		else {
1301 			ctx = redisConnectWithTimeout (
1302 					rspamd_inet_address_to_string (addr->addr),
1303 					rspamd_inet_address_get_port (addr->addr), tv);
1304 		}
1305 
1306 		if (ip) {
1307 			rspamd_inet_address_free (ip);
1308 		}
1309 
1310 		if (ctx == NULL || ctx->err) {
1311 			redisFree (ctx);
1312 			lua_redis_free_args (args, arglens, nargs);
1313 			lua_pushboolean (L, FALSE);
1314 
1315 			return 1;
1316 		}
1317 
1318 		r = redisCommandArgv (ctx,
1319 					nargs,
1320 					(const gchar **)args,
1321 					arglens);
1322 
1323 		if (r != NULL) {
1324 			if (r->type != REDIS_REPLY_ERROR) {
1325 				lua_pushboolean (L, TRUE);
1326 				lua_redis_push_reply (L, r, flags & LUA_REDIS_TEXTDATA);
1327 			}
1328 			else {
1329 				lua_pushboolean (L, FALSE);
1330 				lua_pushstring (L, r->str);
1331 			}
1332 
1333 			freeReplyObject (r);
1334 			redisFree (ctx);
1335 			lua_redis_free_args (args, arglens, nargs);
1336 
1337 			return 2;
1338 		}
1339 		else {
1340 			msg_info ("call to redis failed: %s", ctx->errstr);
1341 			redisFree (ctx);
1342 			lua_redis_free_args (args, arglens, nargs);
1343 			lua_pushboolean (L, FALSE);
1344 		}
1345 	}
1346 	else {
1347 		if (ip) {
1348 			rspamd_inet_address_free (ip);
1349 		}
1350 		msg_err ("bad arguments for redis request");
1351 		lua_redis_free_args (args, arglens, nargs);
1352 
1353 		lua_pushboolean (L, FALSE);
1354 	}
1355 
1356 	return 1;
1357 }
1358 
1359 /***
1360  * @function rspamd_redis.connect({params})
1361  * Make request to redis server, params is a table of key=value arguments in any order
1362  * @param {task} task worker task object
1363  * @param {ip|string} host server address
1364  * @param {number} timeout timeout in seconds for request (1.0 by default)
1365  * @return {boolean,redis} new connection object or nil if connection failed
1366  */
1367 static int
lua_redis_connect(lua_State * L)1368 lua_redis_connect (lua_State *L)
1369 {
1370 	LUA_TRACE_POINT;
1371 	struct lua_redis_userdata *ud;
1372 	struct lua_redis_ctx *ctx, **pctx;
1373 	gdouble timeout = REDIS_DEFAULT_TIMEOUT;
1374 
1375 	ctx = rspamd_lua_redis_prepare_connection (L, NULL, TRUE);
1376 
1377 	if (ctx) {
1378 		ud = &ctx->async;
1379 
1380 		lua_pushstring (L, "timeout");
1381 		lua_gettable (L, 1);
1382 		if (lua_type (L, -1) == LUA_TNUMBER) {
1383 			timeout = lua_tonumber (L, -1);
1384 		}
1385 
1386 		lua_pop (L, 1);
1387 		ud->timeout = timeout;
1388 	}
1389 	else {
1390 		lua_pushboolean (L, FALSE);
1391 		lua_pushnil (L);
1392 
1393 		return 2;
1394 	}
1395 
1396 	lua_pushboolean (L, TRUE);
1397 	pctx = lua_newuserdata (L, sizeof (ctx));
1398 	*pctx = ctx;
1399 	rspamd_lua_setclass (L, "rspamd{redis}", -1);
1400 
1401 	return 2;
1402 }
1403 
1404 /***
1405  * @function rspamd_redis.connect_sync({params})
1406  * Make blocking request to redis server, params is a table of key=value arguments in any order
1407  * @param {ip|string} host server address
1408  * @param {number} timeout timeout in seconds for request (1.0 by default)
1409  * @return {redis} redis object if a request has been successful
1410  */
1411 static int
lua_redis_connect_sync(lua_State * L)1412 lua_redis_connect_sync (lua_State *L)
1413 {
1414 	LUA_TRACE_POINT;
1415 	gdouble timeout = REDIS_DEFAULT_TIMEOUT;
1416 	struct lua_redis_ctx *ctx, **pctx;
1417 
1418 	ctx = rspamd_lua_redis_prepare_connection (L, NULL, FALSE);
1419 
1420 	if (ctx) {
1421 		if (lua_istable (L, 1)) {
1422 			lua_pushstring (L, "timeout");
1423 			lua_gettable (L, 1);
1424 			if (lua_type (L, -1) == LUA_TNUMBER) {
1425 				timeout = lua_tonumber (L, -1);
1426 			}
1427 			lua_pop (L, 1);
1428 		}
1429 
1430 		ctx->async.timeout = timeout;
1431 
1432 		lua_pushboolean (L, TRUE);
1433 		pctx = lua_newuserdata (L, sizeof (ctx));
1434 		*pctx = ctx;
1435 		rspamd_lua_setclass (L, "rspamd{redis}", -1);
1436 	}
1437 	else {
1438 		lua_pushboolean (L, FALSE);
1439 		lua_pushstring (L, "bad arguments for redis request");
1440 		return 2;
1441 	}
1442 
1443 	return 2;
1444 }
1445 
1446 /***
1447  * @method rspamd_redis:add_cmd(cmd, {args})
1448  * Append new cmd to redis pipeline
1449  * @param {string} cmd command to be sent to redis
1450  * @param {table} args array of strings used as redis arguments
1451  * @return {boolean} `true` if a request has been successful
1452  */
1453 static int
lua_redis_add_cmd(lua_State * L)1454 lua_redis_add_cmd (lua_State *L)
1455 {
1456 	LUA_TRACE_POINT;
1457 	struct lua_redis_ctx *ctx = lua_check_redis (L, 1);
1458 	struct lua_redis_request_specific_userdata *sp_ud;
1459 	struct lua_redis_userdata *ud;
1460 	const gchar *cmd = NULL;
1461 	gint args_pos = 2;
1462 	gint cbref = -1, ret;
1463 
1464 	if (ctx) {
1465 		if (ctx->flags & LUA_REDIS_TERMINATED) {
1466 			lua_pushboolean (L, FALSE);
1467 			lua_pushstring (L, "Connection is terminated");
1468 
1469 			return 2;
1470 		}
1471 
1472 		/* Async version */
1473 		if (lua_type (L, 2) == LUA_TSTRING) {
1474 			/* No callback version */
1475 			cmd = lua_tostring (L, 2);
1476 			args_pos = 3;
1477 		}
1478 		else if (lua_type (L, 2) == LUA_TFUNCTION) {
1479 			lua_pushvalue (L, 2);
1480 			cbref = luaL_ref (L, LUA_REGISTRYINDEX);
1481 			cmd = lua_tostring (L, 3);
1482 			args_pos = 4;
1483 		}
1484 		else {
1485 			return luaL_error (L, "invalid arguments");
1486 		}
1487 
1488 		sp_ud = g_malloc0 (sizeof (*sp_ud));
1489 		if (IS_ASYNC (ctx)) {
1490 			sp_ud->c = &ctx->async;
1491 			ud = &ctx->async;
1492 			sp_ud->cbref = cbref;
1493 		}
1494 		else {
1495 			sp_ud->c = &ctx->async;
1496 			ud = &ctx->async;
1497 		}
1498 		sp_ud->ctx = ctx;
1499 
1500 		lua_redis_parse_args (L, args_pos, cmd, &sp_ud->args,
1501 					&sp_ud->arglens, &sp_ud->nargs);
1502 
1503 		LL_PREPEND (sp_ud->c->specific, sp_ud);
1504 
1505 		if (ud->s && rspamd_session_blocked (ud->s)) {
1506 			lua_pushboolean (L, 0);
1507 			lua_pushstring (L, "session is terminating");
1508 
1509 			return 2;
1510 		}
1511 
1512 		if (IS_ASYNC (ctx)) {
1513 			ret = redisAsyncCommandArgv (sp_ud->c->ctx,
1514 					lua_redis_callback,
1515 					sp_ud,
1516 					sp_ud->nargs,
1517 					(const gchar **)sp_ud->args,
1518 					sp_ud->arglens);
1519 		}
1520 		else {
1521 			ret = redisAsyncCommandArgv (sp_ud->c->ctx,
1522 					lua_redis_callback_sync,
1523 					sp_ud,
1524 					sp_ud->nargs,
1525 					(const gchar **)sp_ud->args,
1526 					sp_ud->arglens);
1527 		}
1528 
1529 		if (ret == REDIS_OK) {
1530 			if (ud->s) {
1531 				rspamd_session_add_event (ud->s,
1532 						lua_redis_fin,
1533 						sp_ud,
1534 						M);
1535 
1536 				if (ud->item) {
1537 					rspamd_symcache_item_async_inc (ud->task, ud->item, M);
1538 				}
1539 			}
1540 
1541 			sp_ud->timeout_ev.data = sp_ud;
1542 
1543 			if (IS_ASYNC (ctx)) {
1544 				ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout,
1545 						sp_ud->c->timeout, 0.0);
1546 			}
1547 			else {
1548 				ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout_sync,
1549 						sp_ud->c->timeout, 0.0);
1550 			}
1551 
1552 			ev_timer_start (ud->event_loop, &sp_ud->timeout_ev);
1553 			REDIS_RETAIN (ctx);
1554 			ctx->cmds_pending ++;
1555 		}
1556 		else {
1557 			msg_info ("call to redis failed: %s",
1558 					sp_ud->c->ctx->errstr);
1559 			lua_pushboolean (L, 0);
1560 			lua_pushstring (L, sp_ud->c->ctx->errstr);
1561 
1562 			return 2;
1563 		}
1564 	}
1565 
1566 	lua_pushboolean (L, true);
1567 
1568 	return 1;
1569 }
1570 
1571 /***
1572  * @method rspamd_redis:exec()
1573  * Executes pending commands (suitable for blocking IO only for now)
1574  * @return {boolean}, {table}, ...: pairs in format [bool, result] for each request pending
1575  */
1576 static int
lua_redis_exec(lua_State * L)1577 lua_redis_exec (lua_State *L)
1578 {
1579 	LUA_TRACE_POINT;
1580 	struct lua_redis_ctx *ctx = lua_check_redis (L, 1);
1581 
1582 	if (ctx == NULL) {
1583 		lua_error (L);
1584 
1585 		return 1;
1586 	}
1587 
1588 	if (IS_ASYNC (ctx)) {
1589 		lua_pushstring (L, "Async redis pipelining is not implemented");
1590 		lua_error (L);
1591 		return 0;
1592 	}
1593 	else {
1594 		if (false /* !ctx->d.sync */) {
1595 			lua_pushstring (L, "cannot exec commands when not connected");
1596 			lua_error (L);
1597 			return 0;
1598 		}
1599 		else {
1600 			if (ctx->cmds_pending == 0 && g_queue_get_length (ctx->replies) == 0) {
1601 				lua_pushstring (L, "No pending commands to execute");
1602 				lua_error (L);
1603 			}
1604 			if (ctx->cmds_pending == 0 && g_queue_get_length (ctx->replies) > 0) {
1605 				gint results = lua_redis_push_results (ctx, L);
1606 				return results;
1607 			}
1608 			else {
1609 				ctx->thread = lua_thread_pool_get_running_entry (ctx->async.cfg->lua_thread_pool);
1610 				return lua_thread_yield (ctx->thread, 0);
1611 			}
1612 		}
1613 	}
1614 }
1615 #else
1616 static int
lua_redis_make_request(lua_State * L)1617 lua_redis_make_request (lua_State *L)
1618 {
1619 	msg_warn ("rspamd is compiled with no redis support");
1620 
1621 	lua_pushboolean (L, FALSE);
1622 
1623 	return 1;
1624 }
1625 static int
lua_redis_make_request_sync(lua_State * L)1626 lua_redis_make_request_sync (lua_State *L)
1627 {
1628 	msg_warn ("rspamd is compiled with no redis support");
1629 
1630 	lua_pushboolean (L, FALSE);
1631 
1632 	return 1;
1633 }
1634 static int
lua_redis_connect(lua_State * L)1635 lua_redis_connect (lua_State *L)
1636 {
1637 	msg_warn ("rspamd is compiled with no redis support");
1638 
1639 	lua_pushboolean (L, FALSE);
1640 
1641 	return 1;
1642 }
1643 static int
lua_redis_connect_sync(lua_State * L)1644 lua_redis_connect_sync (lua_State *L)
1645 {
1646 	msg_warn ("rspamd is compiled with no redis support");
1647 
1648 	lua_pushboolean (L, FALSE);
1649 
1650 	return 1;
1651 }
1652 static int
lua_redis_add_cmd(lua_State * L)1653 lua_redis_add_cmd (lua_State *L)
1654 {
1655 	msg_warn ("rspamd is compiled with no redis support");
1656 
1657 	lua_pushboolean (L, FALSE);
1658 
1659 	return 1;
1660 }
1661 static int
lua_redis_exec(lua_State * L)1662 lua_redis_exec (lua_State *L)
1663 {
1664 	msg_warn ("rspamd is compiled with no redis support");
1665 
1666 	lua_pushboolean (L, FALSE);
1667 
1668 	return 1;
1669 }
1670 static int
lua_redis_gc(lua_State * L)1671 lua_redis_gc (lua_State *L)
1672 {
1673 	return 0;
1674 }
1675 #endif
1676 
1677 static gint
lua_load_redis(lua_State * L)1678 lua_load_redis (lua_State * L)
1679 {
1680 	lua_newtable (L);
1681 	luaL_register (L, NULL, redislib_f);
1682 
1683 	return 1;
1684 }
1685 
1686 static gint
lua_redis_null_idx(lua_State * L)1687 lua_redis_null_idx (lua_State *L)
1688 {
1689 	lua_pushnil (L);
1690 
1691 	return 1;
1692 }
1693 
1694 static void
lua_redis_null_mt(lua_State * L)1695 lua_redis_null_mt (lua_State *L)
1696 {
1697 	luaL_newmetatable (L, "redis{null}");
1698 
1699 	lua_pushcfunction (L, lua_redis_null_idx);
1700 	lua_setfield (L, -2, "__index");
1701 	lua_pushcfunction (L, lua_redis_null_idx);
1702 	lua_setfield (L, -2, "__tostring");
1703 
1704 	lua_pop (L, 1);
1705 }
1706 
1707 /**
1708  * Open redis library
1709  * @param L lua stack
1710  * @return
1711  */
1712 void
luaopen_redis(lua_State * L)1713 luaopen_redis (lua_State * L)
1714 {
1715 	rspamd_lua_new_class (L, "rspamd{redis}", redislib_m);
1716 	lua_pop (L, 1);
1717 	rspamd_lua_add_preload (L, "rspamd_redis", lua_load_redis);
1718 
1719 	/* Set null element */
1720 	lua_redis_null_mt (L);
1721 	redis_null = lua_newuserdata (L, 0);
1722 	luaL_getmetatable (L, "redis{null}");
1723 	lua_setmetatable (L, -2);
1724 	lua_setfield (L, LUA_REGISTRYINDEX, "redis.null");
1725 }
1726