1 /*
2 * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3 * Copyright (C) 2005-2016, Anthony Minessale II <anthm@freeswitch.org>
4 *
5 * Version: MPL 1.1
6 *
7 * The contents of this file are subject to the Mozilla Public License Version
8 * 1.1 (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 * http://www.mozilla.org/MPL/
11 *
12 * Software distributed under the License is distributed on an "AS IS" basis,
13 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14 * for the specific language governing rights and limitations under the
15 * License.
16 *
17 * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18 *
19 * The Initial Developer of the Original Code is
20 * Anthony Minessale II <anthm@freeswitch.org>
21 * Portions created by the Initial Developer are Copyright (C)
22 * the Initial Developer. All Rights Reserved.
23 *
24 * Contributor(s):
25 * William King <william.king@quentustech.com>
26 * Chris Rienzo <chris.rienzo@citrix.com>
27 *
28 * mod_hiredis.c -- Redis DB access module
29 *
30 */
31 
32 #include "mod_hiredis.h"
33 
/* Module-wide state; zeroed and populated (pool, limit_pvt_mutex, profiles hash) in mod_hiredis_load */
mod_hiredis_global_t mod_hiredis_globals;

/*
 * Module entry points and module definition.
 * NOTE(review): the trailing NULL is presumably the (unused) runtime function slot - confirm against SWITCH_MODULE_DEFINITION.
 */
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_hiredis_shutdown);
SWITCH_MODULE_LOAD_FUNCTION(mod_hiredis_load);
SWITCH_MODULE_DEFINITION(mod_hiredis, mod_hiredis_load, mod_hiredis_shutdown, NULL);

/*
 * Redis script used when a profile has delete_when_zero set: decrements KEYS[1],
 * deletes the key if the new value is <= 0, and returns the new value.
 */
#define DECR_DEL_SCRIPT "local v=redis.call(\"decr\",KEYS[1]);if v <= 0 then redis.call(\"del\",KEYS[1]) end;return v;"
41 
42 /**
43  * Get exclusive access to limit_pvt, if it exists
44  */
get_limit_pvt(switch_core_session_t * session)45 static hiredis_limit_pvt_t *get_limit_pvt(switch_core_session_t *session)
46 {
47 	switch_channel_t *channel = switch_core_session_get_channel(session);
48 
49 	hiredis_limit_pvt_t *limit_pvt = switch_channel_get_private(channel, "hiredis_limit_pvt");
50 	if (limit_pvt) {
51 		/* pvt already exists, return it */
52 		switch_mutex_lock(limit_pvt->mutex);
53 		return limit_pvt;
54 	}
55 	return NULL;
56 }
57 
58 /**
59  * Add limit_pvt and get exclusive access to it
60  */
add_limit_pvt(switch_core_session_t * session)61 static hiredis_limit_pvt_t *add_limit_pvt(switch_core_session_t *session)
62 {
63 	switch_channel_t *channel = switch_core_session_get_channel(session);
64 
65 	hiredis_limit_pvt_t *limit_pvt = switch_channel_get_private(channel, "hiredis_limit_pvt");
66 	if (limit_pvt) {
67 		/* pvt already exists, return it */
68 		switch_mutex_lock(limit_pvt->mutex);
69 		return limit_pvt;
70 	}
71 
72 	/* not created yet, add it - NOTE a channel mutex would be better here if we had access to it */
73 	switch_mutex_lock(mod_hiredis_globals.limit_pvt_mutex);
74 	limit_pvt = switch_channel_get_private(channel, "hiredis_limit_pvt");
75 	if (limit_pvt) {
76 		/* was just added by another thread */
77 		switch_mutex_unlock(mod_hiredis_globals.limit_pvt_mutex);
78 		switch_mutex_lock(limit_pvt->mutex);
79 		return limit_pvt;
80 	}
81 
82 	/* still not created yet, add it */
83 	limit_pvt = switch_core_session_alloc(session, sizeof(*limit_pvt));
84 	switch_mutex_init(&limit_pvt->mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session));
85 	limit_pvt->first = NULL;
86 	switch_channel_set_private(channel, "hiredis_limit_pvt", limit_pvt);
87 	switch_mutex_unlock(mod_hiredis_globals.limit_pvt_mutex);
88 	switch_mutex_lock(limit_pvt->mutex);
89 	return limit_pvt;
90 }
91 
92 /**
93  * Release exclusive acess to limit_pvt
94  */
release_limit_pvt(hiredis_limit_pvt_t * limit_pvt)95 static void release_limit_pvt(hiredis_limit_pvt_t *limit_pvt)
96 {
97 	if (limit_pvt) {
98 		switch_mutex_unlock(limit_pvt->mutex);
99 	}
100 }
101 
SWITCH_STANDARD_APP(raw_app)102 SWITCH_STANDARD_APP(raw_app)
103 {
104 	switch_channel_t *channel = switch_core_session_get_channel(session);
105 	char *response = NULL, *profile_name = NULL, *cmd = NULL;
106 	hiredis_profile_t *profile = NULL;
107 
108 	if ( !zstr(data) ) {
109 		profile_name = strdup(data);
110 	} else {
111 		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: invalid data! Use the format 'default set keyname value' \n");
112 		goto done;
113 	}
114 
115 	if ( (cmd = strchr(profile_name, ' '))) {
116 		*cmd = '\0';
117 		cmd++;
118 	} else {
119 		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: invalid data! Use the format 'default set keyname value' \n");
120 		goto done;
121 	}
122 
123 	profile = switch_core_hash_find(mod_hiredis_globals.profiles, profile_name);
124 
125 	if ( !profile ) {
126 		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: Unable to locate profile[%s]\n", profile_name);
127 		return;
128 	}
129 
130 	if ( hiredis_profile_execute_sync(profile, session, &response, cmd) != SWITCH_STATUS_SUCCESS) {
131 		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: profile[%s] error executing [%s] because [%s]\n", profile_name, cmd, response ? response : "");
132 	}
133 
134 	switch_channel_set_variable(channel, "hiredis_raw_response", response ? response : "");
135 
136  done:
137 	switch_safe_free(profile_name);
138 	switch_safe_free(response);
139 	return;
140 }
141 
SWITCH_STANDARD_API(raw_api)142 SWITCH_STANDARD_API(raw_api)
143 {
144 	hiredis_profile_t *profile = NULL;
145 	char *data = NULL, *input = NULL, *response = NULL;
146 	switch_status_t status = SWITCH_STATUS_SUCCESS;
147 
148 	if ( !zstr(cmd) ) {
149 		input = strdup(cmd);
150 	} else {
151 		switch_goto_status(SWITCH_STATUS_GENERR, done);
152 	}
153 
154 	if ( (data = strchr(input, ' '))) {
155 		*data = '\0';
156 		data++;
157 	}
158 
159 	switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "hiredis: debug: profile[%s] for command [%s]\n", input, data);
160 
161 	profile = switch_core_hash_find(mod_hiredis_globals.profiles, input);
162 
163 	if ( !profile ) {
164 		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: Unable to locate profile[%s]\n", input);
165 		switch_goto_status(SWITCH_STATUS_GENERR, done);
166 	}
167 
168 	if ( hiredis_profile_execute_sync(profile, session, &response, data) != SWITCH_STATUS_SUCCESS) {
169 		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: profile[%s] error executing [%s] reason:[%s]\n", input, data, response ? response : "");
170 		switch_goto_status(SWITCH_STATUS_GENERR, done);
171 	}
172 
173 	if (response) {
174 		stream->write_function(stream, response);
175 	}
176  done:
177 	switch_safe_free(input);
178 	switch_safe_free(response);
179 	return status;
180 }
181 
182 /*
183 SWITCH_LIMIT_INCR(name) static switch_status_t name (switch_core_session_t *session, const char *realm, const char *resource,
184                                                      const int max, const int interval)
185 */
SWITCH_LIMIT_INCR(hiredis_limit_incr)186 SWITCH_LIMIT_INCR(hiredis_limit_incr)
187 {
188 	switch_channel_t *channel = switch_core_session_get_channel(session);
189 	hiredis_profile_t *profile = NULL;
190 	char *response = NULL, *limit_key = NULL;
191 	int64_t count = 0; /* Redis defines the incr action as to be performed on a 64 bit signed integer */
192 	time_t now = switch_epoch_time_now(NULL);
193 	switch_status_t status = SWITCH_STATUS_SUCCESS;
194 	hiredis_limit_pvt_t *limit_pvt = NULL;
195 	hiredis_limit_pvt_node_t *limit_pvt_node = NULL;
196 	switch_memory_pool_t *session_pool = switch_core_session_get_pool(session);
197 
198 	if ( zstr(realm) ) {
199 		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: realm must be defined\n");
200 		switch_goto_status(SWITCH_STATUS_GENERR, done);
201 	}
202 
203 	if ( interval < 0 ) {
204 		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: interval must be >= 0\n");
205 		switch_goto_status(SWITCH_STATUS_GENERR, done);
206 	}
207 
208 	profile = switch_core_hash_find(mod_hiredis_globals.profiles, realm);
209 
210 	if ( !profile ) {
211 		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: Unable to locate profile[%s]\n", realm);
212 		switch_goto_status(SWITCH_STATUS_GENERR, done);
213 	}
214 
215 	if ( interval ) {
216 		limit_key = switch_core_session_sprintf(session, "%s_%d", resource, now / interval);
217 	} else {
218 		limit_key = switch_core_session_sprintf(session, "%s", resource);
219 	}
220 
221 	if ( (status = hiredis_profile_execute_pipeline_printf(profile, session, &response, "incr %s", limit_key) ) != SWITCH_STATUS_SUCCESS ) {
222 		if ( status == SWITCH_STATUS_SOCKERR && profile->ignore_connect_fail) {
223 			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "hiredis: ignoring profile[%s] connection error incrementing [%s]\n", realm, limit_key);
224 			switch_goto_status(SWITCH_STATUS_SUCCESS, done);
225 		} else if ( profile->ignore_error ) {
226 			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "hiredis: ignoring profile[%s] general error incrementing [%s]\n", realm, limit_key);
227 			switch_goto_status(SWITCH_STATUS_SUCCESS, done);
228 		}
229 		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: profile[%s] error incrementing [%s] because [%s]\n", realm, limit_key, response ? response : "");
230 		switch_channel_set_variable(channel, "hiredis_raw_response", response ? response : "");
231 		switch_goto_status(SWITCH_STATUS_GENERR, done);
232 	}
233 
234 	/* set expiration for interval on first increment */
235 	if ( interval && !strcmp("1", response ? response : "") ) {
236 		hiredis_profile_execute_pipeline_printf(profile, session, NULL, "expire %s %d", limit_key, interval);
237 	}
238 
239 	switch_channel_set_variable(channel, "hiredis_raw_response", response ? response : "");
240 
241 	count = atoll(response ? response : "");
242 
243 	if ( switch_is_number(response ? response : "") && count <= 0 ) {
244 		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "limit not positive after increment, resource = %s, val = %s\n", limit_key, response ? response : "");
245 	} else {
246 		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "resource = %s, response = %s\n", limit_key, response ? response : "");
247 	}
248 
249 	if ( !switch_is_number(response ? response : "") && !profile->ignore_error ) {
250 		/* got response error */
251 		switch_goto_status(SWITCH_STATUS_GENERR, done);
252 	} else if ( max > 0 && count > 0 && count > max ) {
253 		switch_channel_set_variable(channel, "hiredis_limit_exceeded", "true");
254 		if ( !interval ) { /* don't need to decrement intervals if limit exceeded since the interval keys are named w/ timestamp */
255 			if ( profile->delete_when_zero ) {
256 				hiredis_profile_eval_pipeline(profile, session, NULL, DECR_DEL_SCRIPT, 1, limit_key);
257 			} else {
258 				hiredis_profile_execute_pipeline_printf(profile, session, NULL, "decr %s", limit_key);
259 			}
260 		}
261 		switch_goto_status(SWITCH_STATUS_GENERR, done);
262 	}
263 
264 	if ( !interval && count > 0 ) {
265 		/* only non-interval limits need to be released on session destroy */
266 		limit_pvt_node = switch_core_alloc(session_pool, sizeof(*limit_pvt_node));
267 		limit_pvt_node->realm = switch_core_strdup(session_pool, realm);
268 		limit_pvt_node->resource = switch_core_strdup(session_pool, resource);
269 		limit_pvt_node->limit_key = limit_key;
270 		limit_pvt_node->inc = 1;
271 		limit_pvt_node->interval = interval;
272 		limit_pvt = add_limit_pvt(session);
273 		limit_pvt_node->next = limit_pvt->first;
274 		limit_pvt->first = limit_pvt_node;
275 		release_limit_pvt(limit_pvt);
276 	}
277 
278  done:
279 	switch_safe_free(response);
280 	return status;
281 }
282 
283 /*
284   SWITCH_LIMIT_RELEASE(name) static switch_status_t name (switch_core_session_t *session, const char *realm, const char *resource)
285 */
SWITCH_LIMIT_RELEASE(hiredis_limit_release)286 SWITCH_LIMIT_RELEASE(hiredis_limit_release)
287 {
288 	switch_channel_t *channel = switch_core_session_get_channel(session);
289 	hiredis_profile_t *profile = NULL;
290 	char *response = NULL;
291 	switch_status_t status = SWITCH_STATUS_SUCCESS;
292 	hiredis_limit_pvt_t *limit_pvt = get_limit_pvt(session);
293 
294 	if (!limit_pvt) {
295 		/* nothing to release */
296 		return SWITCH_STATUS_SUCCESS;
297 	}
298 
299 	/* If realm and resource are NULL, then clear all of the limits */
300 	if ( zstr(realm) && zstr(resource) ) {
301 		hiredis_limit_pvt_node_t *cur = NULL;
302 
303 		for ( cur = limit_pvt->first; cur; cur = cur->next ) {
304 			/* Rate limited resources are not auto-decremented, they will expire. */
305 			if ( !cur->interval && cur->inc ) {
306 				switch_status_t result;
307 				cur->inc = 0; /* mark as released */
308 				profile = switch_core_hash_find(mod_hiredis_globals.profiles, cur->realm);
309 				if ( profile->delete_when_zero ) {
310 					result = hiredis_profile_eval_pipeline(profile, session, &response, DECR_DEL_SCRIPT, 1, cur->limit_key);
311 				} else {
312 					result = hiredis_profile_execute_pipeline_printf(profile, session, &response, "decr %s", cur->limit_key);
313 				}
314 				if ( result != SWITCH_STATUS_SUCCESS ) {
315 					switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: profile[%s] error decrementing [%s] because [%s]\n",
316 									  cur->realm, cur->limit_key, response ? response : "");
317 				}
318 				switch_safe_free(response);
319 				response = NULL;
320 			}
321 		}
322 	} else if (!zstr(resource) ) {
323 		/* clear single non-interval resource */
324 		hiredis_limit_pvt_node_t *cur = NULL;
325 		for (cur = limit_pvt->first; cur; cur = cur->next ) {
326 			if ( !cur->interval && cur->inc && !strcmp(cur->resource, resource) && (zstr(realm) || !strcmp(cur->realm, realm)) ) {
327 				/* found the resource to clear */
328 				cur->inc = 0; /* mark as released */
329 				profile = switch_core_hash_find(mod_hiredis_globals.profiles, cur->realm);
330 				if (profile) {
331 					if ( profile->delete_when_zero ) {
332 						status = hiredis_profile_eval_pipeline(profile, session, &response, DECR_DEL_SCRIPT, 1, cur->limit_key);
333 					} else {
334 						status = hiredis_profile_execute_pipeline_printf(profile, session, &response, "decr %s", cur->limit_key);
335 					}
336 					if ( status != SWITCH_STATUS_SUCCESS ) {
337 						if ( status == SWITCH_STATUS_SOCKERR && profile->ignore_connect_fail ) {
338 							switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "hiredis: ignoring profile[%s] connection error decrementing [%s]\n", cur->realm, cur->limit_key);
339 							switch_goto_status(SWITCH_STATUS_SUCCESS, done);
340 						} else if ( profile->ignore_error ) {
341 							switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "hiredis: ignoring profile[%s] general error decrementing [%s]\n", realm, cur->limit_key);
342 							switch_goto_status(SWITCH_STATUS_SUCCESS, done);
343 						}
344 						switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: profile[%s] error decrementing [%s] because [%s]\n", realm, cur->limit_key, response ? response : "");
345 						switch_channel_set_variable(channel, "hiredis_raw_response", response ? response : "");
346 						switch_goto_status(SWITCH_STATUS_GENERR, done);
347 					}
348 
349 					switch_channel_set_variable(channel, "hiredis_raw_response", response ? response : "");
350 				}
351 				break;
352 			}
353 		}
354 	}
355 
356  done:
357 	release_limit_pvt(limit_pvt);
358 	switch_safe_free(response);
359 	return status;
360 }
361 
362 /*
363 SWITCH_LIMIT_USAGE(name) static int name (const char *realm, const char *resource, uint32_t *rcount)
364  */
SWITCH_LIMIT_USAGE(hiredis_limit_usage)365 SWITCH_LIMIT_USAGE(hiredis_limit_usage)
366 {
367 	hiredis_profile_t *profile = switch_core_hash_find(mod_hiredis_globals.profiles, realm);
368 	int64_t count = 0; /* Redis defines the incr action as to be performed on a 64 bit signed integer */
369 	char *response = NULL;
370 
371 	if ( zstr(realm) ) {
372 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: realm must be defined\n");
373 		goto err;
374 	}
375 
376 	if ( !profile ) {
377 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: Unable to locate profile[%s]\n", realm);
378 		goto err;
379 	}
380 
381 	if ( hiredis_profile_execute_pipeline_printf(profile, NULL, &response, "get %s", resource) != SWITCH_STATUS_SUCCESS ) {
382 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: profile[%s] error querying [%s] because [%s]\n", realm, resource, response ? response : "");
383 		goto err;
384 	}
385 
386 	count = atoll(response ? response : "");
387 
388 	switch_safe_free(response);
389 	return count;
390 
391  err:
392 	switch_safe_free(response);
393 	return -1;
394 }
395 
396 /*
397 SWITCH_LIMIT_RESET(name) static switch_status_t name (void)
398  */
SWITCH_LIMIT_RESET(hiredis_limit_reset)399 SWITCH_LIMIT_RESET(hiredis_limit_reset)
400 {
401 	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: unable to globally reset hiredis limit resources. Use 'hiredis_raw set resource_name 0'\n");
402 	return SWITCH_STATUS_NOTIMPL;
403 }
404 
405 /*
406   SWITCH_LIMIT_INTERVAL_RESET(name) static switch_status_t name (const char *realm, const char *resource)
407 */
SWITCH_LIMIT_INTERVAL_RESET(hiredis_limit_interval_reset)408 SWITCH_LIMIT_INTERVAL_RESET(hiredis_limit_interval_reset)
409 {
410 	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: unable to reset hiredis interval limit resources.\n");
411 	return SWITCH_STATUS_NOTIMPL;
412 }
413 
/*
SWITCH_LIMIT_STATUS(name) static char * name (void)
 */
/**
 * Status reporting is not supported by this backend.
 *
 * @return a strdup()'d copy of "-ERR not supported"
 *         (NOTE(review): presumably freed by the caller - confirm)
 */
SWITCH_LIMIT_STATUS(hiredis_limit_status)
{
	static const char not_supported[] = "-ERR not supported";

	return strdup(not_supported);
}
421 
SWITCH_MODULE_LOAD_FUNCTION(mod_hiredis_load)422 SWITCH_MODULE_LOAD_FUNCTION(mod_hiredis_load)
423 {
424 	switch_application_interface_t *app_interface;
425 	switch_api_interface_t *api_interface;
426 	switch_limit_interface_t *limit_interface;
427 
428 	memset(&mod_hiredis_globals, 0, sizeof(mod_hiredis_globals));
429 	*module_interface = switch_loadable_module_create_module_interface(pool, modname);
430 	mod_hiredis_globals.pool = pool;
431 	switch_mutex_init(&mod_hiredis_globals.limit_pvt_mutex, SWITCH_MUTEX_NESTED, pool);
432 
433 	switch_core_hash_init(&(mod_hiredis_globals.profiles));
434 
435 	if ( mod_hiredis_do_config() != SWITCH_STATUS_SUCCESS ) {
436 		return SWITCH_STATUS_GENERR;
437 	}
438 
439 	SWITCH_ADD_LIMIT(limit_interface, "hiredis", hiredis_limit_incr, hiredis_limit_release, hiredis_limit_usage,
440 					 hiredis_limit_reset, hiredis_limit_status, hiredis_limit_interval_reset);
441 	SWITCH_ADD_APP(app_interface, "hiredis_raw", "hiredis_raw", "hiredis_raw", raw_app, "", SAF_SUPPORT_NOMEDIA | SAF_ROUTING_EXEC | SAF_ZOMBIE_EXEC);
442 	SWITCH_ADD_API(api_interface, "hiredis_raw", "hiredis_raw", raw_api, "");
443 
444 	return SWITCH_STATUS_SUCCESS;
445 }
446 
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_hiredis_shutdown)447 SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_hiredis_shutdown)
448 {
449 	switch_hash_index_t *hi;
450 	hiredis_profile_t *profile = NULL;
451 	/* loop through profiles, and destroy them */
452 
453 	while ((hi = switch_core_hash_first(mod_hiredis_globals.profiles))) {
454 		switch_core_hash_this(hi, NULL, NULL, (void **)&profile);
455 		hiredis_profile_destroy(&profile);
456 		switch_safe_free(hi);
457 	}
458 
459 	switch_core_hash_destroy(&(mod_hiredis_globals.profiles));
460 
461 	return SWITCH_STATUS_SUCCESS;
462 }
463 
464 
465 /* For Emacs:
466  * Local Variables:
467  * mode:c
468  * indent-tabs-mode:t
469  * tab-width:4
470  * c-basic-offset:4
471  * End:
472  * For VIM:
473  * vim:set softtabstop=4 shiftwidth=4 tabstop=4
474  */
475