1 /*
2  * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3  * Copyright (C) 2005-2014, Anthony Minessale II <anthm@freeswitch.org>
4  *
5  * Version: MPL 1.1
6  *
7  * The contents of this file are subject to the Mozilla Public License Version
8  * 1.1 (the "License"); you may not use this file except in compliance with
9  * the License. You may obtain a copy of the License at
10  * http://www.mozilla.org/MPL/
11  *
12  * Software distributed under the License is distributed on an "AS IS" basis,
13  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14  * for the specific language governing rights and limitations under the
15  * License.
16  *
17  * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18  *
19  * The Initial Developer of the Original Code is
20  * Anthony Minessale II <anthm@freeswitch.org>
21  * Portions created by the Initial Developer are Copyright (C)
22  * the Initial Developer. All Rights Reserved.
23  *
24  * Contributor(s):
25  *
26  * Anthony Minessale II <anthm@freeswitch.org>
27  * Ken Rice <krice at suspicious dot org
28  * Mathieu Rene <mathieu.rene@gmail.com>
29  * Bret McDanel <trixter AT 0xdecafbad.com>
30  * Rupa Schomaker <rupa@rupa.com>
31  *
32  * mod_hash.c -- Hash api, hash backend for limit
33  *
34  */
35 
36 #include <switch.h>
37 #include "esl.h"
38 
/* Delay between two runs of the scheduled limit-hash cleanup task.
 * The cleanup callback adds it to switch_epoch_time_now() when it reschedules
 * itself, so the unit is presumably seconds (900 = 15 minutes) -- TODO confirm. */
#define LIMIT_HASH_CLEANUP_INTERVAL 900

/* Module entry points: a load and a shutdown handler are registered, the last slot is NULL. */
SWITCH_MODULE_LOAD_FUNCTION(mod_hash_load);
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_hash_shutdown);
SWITCH_MODULE_DEFINITION(mod_hash, mod_hash_load, mod_hash_shutdown, NULL);
44 
/* CORE STUFF */
/* Module-wide state. Each hash table is paired with the rwlock that guards it. */
static struct {
	switch_memory_pool_t *pool;					/* < Pool handed to the module load function */
	switch_thread_rwlock_t *limit_hash_rwlock;	/* < Guards limit_hash */
	switch_hash_t *limit_hash;					/* < "<realm>_<resource>" -> limit_hash_item_t (local usage) */
	switch_thread_rwlock_t *db_hash_rwlock;		/* < Guards db_hash */
	switch_hash_t *db_hash;						/* < "<realm>_<key>" -> heap-allocated string value */
	switch_thread_rwlock_t *remote_hash_rwlock;	/* < Guards remote_hash */
	switch_hash_t *remote_hash;					/* < remote name -> limit_remote_t */
} globals;
55 
/* Usage counters for one "<realm>_<resource>" key.
 * Instances live in globals.limit_hash (local usage) and in the index of each
 * limit_remote_t (usage mirrored from a remote instance). */
typedef struct {
	uint32_t total_usage;	/* < Total */
	uint32_t rate_usage;	/* < Current rate usage */
	time_t last_check;		/* < Last rate check */
	uint32_t interval;		/* < Interval used on last rate check */
	switch_time_t last_update;	/* < Last updated timestamp (rate or total); only written by the remote poller in the code shown here */
} limit_hash_item_t;
63 
/* Output buffer plus match counter.
 * NOTE(review): nothing in the visible part of this file references this type;
 * it looks like a leftover from a query-callback implementation -- confirm before removing. */
struct callback {
	char *buf;		/* < Destination buffer */
	size_t len;		/* < Presumably the size of buf -- TODO confirm */
	int matches;	/* < Presumably the number of matches seen -- TODO confirm */
};

typedef struct callback callback_t;
71 
/* HASH STUFF */
/* Per-channel private data, stored on the channel under the key "limit_hash".
 * hash maps every limit key this channel currently holds to the shared
 * limit_hash_item_t, so the usage can be given back on release. */
typedef struct {
	switch_hash_t *hash;
} limit_hash_private_t;
76 
typedef enum {
	REMOTE_OFF = 0,	/* < Thread not running */
	REMOTE_DOWN, 	/* < Cannot connect to remote instance */
	REMOTE_UP		/* < All good */
} limit_remote_state_t;

/* \brief Maps a remote connection state to its display name
 * \param state state to describe
 * \return "Off", "Down" or "Up"; an empty string for any other value
 */
static inline const char *state_str(limit_remote_state_t state)
{
	if (state == REMOTE_OFF) {
		return "Off";
	}

	if (state == REMOTE_DOWN) {
		return "Down";
	}

	if (state == REMOTE_UP) {
		return "Up";
	}

	return "";
}
94 
/* One remote FreeSWITCH instance whose limit usage is mirrored locally.
 * The structure and its strings are allocated from its own pool. */
typedef struct {
	const char *name;		/* < Key of this remote in globals.remote_hash */
	const char *host;		/* < Host passed to esl_connect_timeout() */
	const char *username;	/* < ESL credentials */
	const char *password;
	int port;				/* < ESL port */

	int interval;			/* < Poll period; multiplied by 1000 and passed to switch_yield() by the poller */

	esl_handle_t handle;	/* < ESL connection used by the poller thread */

	switch_hash_t *index;			/* < limit key -> limit_hash_item_t reported by the remote */
	switch_thread_rwlock_t *rwlock;	/* < Guards index */
	switch_memory_pool_t *pool;		/* < Pool owning this structure */

	switch_bool_t running;	/* < NOTE(review): not read or written in the visible part of this file */
	switch_thread_t *thread;	/* < Poller thread, reset to NULL by the thread itself when it exits */

	limit_remote_state_t state;	/* < Connection state; REMOTE_OFF asks the poller to stop */
} limit_remote_t;
115 
/* Forward declarations for helpers defined further down but used earlier in the file */
static limit_hash_item_t get_remote_usage(const char *key);
void limit_remote_destroy(limit_remote_t **r);
static void do_config(switch_bool_t reload);
119 
120 
/* \brief Enforces limit_hash restrictions
 * \param session current session
 * \param realm limit realm
 * \param resource limit id; combined with realm into the key "<realm>_<resource>"
 * \param max maximum count (or maximum rate when interval > 0); a negative value disables the check
 * \param interval interval for rate limiting; when it is not > 0 the total usage is limited instead
 * \return SWITCH_STATUS_SUCCESS if the access is allowed, SWITCH_STATUS_GENERR if it isnt
 *
 * The whole check runs under the limit_hash write lock. On success the usage
 * and rate are also published as channel variables (limit_usage, limit_rate
 * and their "_<key>" variants); on refusal those variables are left untouched.
 */
SWITCH_LIMIT_INCR(limit_incr_hash)
{
	switch_channel_t *channel = switch_core_session_get_channel(session);
	char *hashkey = NULL;
	switch_status_t status = SWITCH_STATUS_SUCCESS;
	limit_hash_item_t *item = NULL;
	time_t now = switch_epoch_time_now(NULL);
	limit_hash_private_t *pvt = NULL;
	uint8_t increment = 1;
	limit_hash_item_t remote_usage;

	hashkey = switch_core_session_sprintf(session, "%s_%s", realm, resource);

	switch_thread_rwlock_wrlock(globals.limit_hash_rwlock);
	/* Check if that realm+resource has ever been checked */
	if (!(item = (limit_hash_item_t *) switch_core_hash_find(globals.limit_hash, hashkey))) {
		/* No, create an empty structure and add it, then continue like as if it existed */
		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG10, "Creating new limit structure: key: %s\n", hashkey);
		item = (limit_hash_item_t *) malloc(sizeof(limit_hash_item_t));
		switch_assert(item);
		memset(item, 0, sizeof(limit_hash_item_t));
		switch_core_hash_insert(globals.limit_hash, hashkey, item);
	}

	/* Lazily create the per-channel bookkeeping that remembers which keys this channel holds */
	if (!(pvt = switch_channel_get_private(channel, "limit_hash"))) {
		pvt = (limit_hash_private_t *) switch_core_session_alloc(session, sizeof(limit_hash_private_t));
		memset(pvt, 0, sizeof(limit_hash_private_t));
		switch_channel_set_private(channel, "limit_hash", pvt);
	}
	if (!(pvt->hash)) {
		switch_core_hash_init(&pvt->hash);
	}
	/* Only count the channel once per key: increment is 0 when the channel already holds it */
	increment = !switch_core_hash_find(pvt->hash, hashkey);
	remote_usage = get_remote_usage(hashkey);

	if (interval > 0) {
		/* Rate limiting: only the local rate counter is checked here */
		item->interval = interval;
		if (item->last_check <= (now - interval)) {
			/* The previous window has elapsed, start a new one with this request */
			item->rate_usage = 1;
			item->last_check = now;
			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG10, "Usage for %s reset to 1\n",
							  hashkey);
		} else {
			/* Always increment rate when its checked as it doesnt depend on the channel */
			item->rate_usage++;

			if ((max >= 0) && (item->rate_usage > (uint32_t) max)) {
				switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "Usage for %s exceeds maximum rate of %d/%ds, now at %d\n",
								  hashkey, max, interval, item->rate_usage);
				status = SWITCH_STATUS_GENERR;
				goto end;
			}
		}
	} else if ((max >= 0) && (item->total_usage + increment + remote_usage.total_usage > (uint32_t) max)) {
		/* Total limiting: local usage plus the usage reported by remote instances */
		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "Usage for %s is already at max value (%d)\n", hashkey, item->total_usage);
		status = SWITCH_STATUS_GENERR;
		goto end;
	}

	if (increment) {
		item->total_usage++;

		/* Remember the item on the channel so limit_release_hash can give the usage back */
		switch_core_hash_insert(pvt->hash, hashkey, item);

		if (max == -1) {
			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Usage for %s is now %d\n", hashkey, item->total_usage + remote_usage.total_usage);
		} else if (interval == 0) {
			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Usage for %s is now %d/%d\n", hashkey, item->total_usage + remote_usage.total_usage, max);
		} else {
			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Usage for %s is now %d/%d for the last %d seconds\n", hashkey,
							  item->rate_usage, max, interval);
		}

		switch_limit_fire_event("hash", realm, resource, item->total_usage, item->rate_usage, max, max >= 0 ? (uint32_t) max : 0);
	}

	/* Save current usage & rate into channel variables so it can be used later in the dialplan, or added to CDR records */
	{
		const char *susage = switch_core_session_sprintf(session, "%d", item->total_usage);
		const char *srate = switch_core_session_sprintf(session, "%d", item->rate_usage);

		switch_channel_set_variable(channel, "limit_usage", susage);
		switch_channel_set_variable(channel, switch_core_session_sprintf(session, "limit_usage_%s", hashkey), susage);

		switch_channel_set_variable(channel, "limit_rate", srate);
		switch_channel_set_variable(channel, switch_core_session_sprintf(session, "limit_rate_%s", hashkey), srate);
	}

  end:
	switch_thread_rwlock_unlock(globals.limit_hash_rwlock);
	return status;
}
221 
222 /* !\brief Determines whether a given entry is ready to be removed. */
SWITCH_HASH_DELETE_FUNC(limit_hash_cleanup_delete_callback)223 SWITCH_HASH_DELETE_FUNC(limit_hash_cleanup_delete_callback) {
224 	limit_hash_item_t *item = (limit_hash_item_t *) val;
225 	time_t now = switch_epoch_time_now(NULL);
226 
227 	/* reset to 0 if window has passed so we can clean it up */
228 	if (item->rate_usage > 0 && (item->last_check <= (now - item->interval))) {
229 		item->rate_usage = 0;
230 	}
231 
232 	if (item->total_usage == 0 && item->rate_usage == 0) {
233 		/* Noone is using this item anymore */
234 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Freeing limit item: %s\n", (const char *) key);
235 
236 		free(item);
237 		return SWITCH_TRUE;
238 	}
239 
240 	return SWITCH_FALSE;
241 }
242 
SWITCH_HASH_DELETE_FUNC(limit_hash_remote_cleanup_callback)243 SWITCH_HASH_DELETE_FUNC(limit_hash_remote_cleanup_callback)
244 {
245 	limit_hash_item_t *item = (limit_hash_item_t *) val;
246 	switch_time_t now = (switch_time_t)(intptr_t)pData;
247 
248 	if (item->last_update != now) {
249 		return SWITCH_TRUE;
250 	}
251 
252 	return SWITCH_FALSE;
253 }
254 
255 /* !\brief Periodically checks for unused limit entries and frees them */
SWITCH_STANDARD_SCHED_FUNC(limit_hash_cleanup_callback)256 SWITCH_STANDARD_SCHED_FUNC(limit_hash_cleanup_callback)
257 {
258 	switch_thread_rwlock_wrlock(globals.limit_hash_rwlock);
259 	if (globals.limit_hash) {
260 		switch_core_hash_delete_multi(globals.limit_hash, limit_hash_cleanup_delete_callback, NULL);
261 	}
262 	switch_thread_rwlock_unlock(globals.limit_hash_rwlock);
263 
264 	if (globals.limit_hash) {
265 		task->runtime = switch_epoch_time_now(NULL) + LIMIT_HASH_CLEANUP_INTERVAL;
266 	}
267 }
268 
/* !\brief Releases usage of a limit_hash-controlled resource
 * \param session session whose channel holds the usage
 * \param realm limit realm, or NULL together with resource to release everything the channel holds
 * \param resource limit id, or NULL together with realm to release everything the channel holds
 * \return always SWITCH_STATUS_SUCCESS, also when the channel holds nothing
 *
 * Runs under the limit_hash write lock. An item whose total and rate usage
 * both reach 0 is removed from the global limit hash and freed right away.
 */
SWITCH_LIMIT_RELEASE(limit_release_hash)
{
	switch_channel_t *channel = switch_core_session_get_channel(session);
	limit_hash_private_t *pvt = switch_channel_get_private(channel, "limit_hash");
	limit_hash_item_t *item = NULL;

	switch_thread_rwlock_wrlock(globals.limit_hash_rwlock);

	/* Nothing was ever acquired on this channel */
	if (!pvt || !pvt->hash) {
		switch_thread_rwlock_unlock(globals.limit_hash_rwlock);
		return SWITCH_STATUS_SUCCESS;
	}

	/* clear for uuid */
	if (realm == NULL && resource == NULL) {
		switch_hash_index_t *hi = NULL;
		/* Loop through the channel's hashtable which contains mapping to all the limit_hash_item_t referenced by that channel */
		/* Each pass restarts from the first entry because the current entry is deleted at the end of the pass */
		while ((hi = switch_core_hash_first_iter(pvt->hash, hi))) {
			void *val = NULL;
			const void *key;
			switch_ssize_t keylen;

			switch_core_hash_this(hi, &key, &keylen, &val);

			item = (limit_hash_item_t *) val;
			item->total_usage--;
			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Usage for %s is now %d\n", (const char *) key, item->total_usage);

			if (item->total_usage == 0 && item->rate_usage == 0) {
				/* Noone is using this item anymore */
				switch_core_hash_delete(globals.limit_hash, (const char *) key);
				free(item);
			}

			/* Must come last: key is the key of the channel-hash entry being removed here */
			switch_core_hash_delete(pvt->hash, (const char *) key);
		}
		switch_core_hash_destroy(&pvt->hash);
	} else {
		char *hashkey = switch_core_session_sprintf(session, "%s_%s", realm, resource);

		/* Only release what this channel actually holds */
		if ((item = (limit_hash_item_t *) switch_core_hash_find(pvt->hash, hashkey))) {
			item->total_usage--;
			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Usage for %s is now %d\n", (const char *) hashkey, item->total_usage);

			switch_core_hash_delete(pvt->hash, hashkey);

			if (item->total_usage == 0 && item->rate_usage == 0) {
				/* Noone is using this item anymore */
				switch_core_hash_delete(globals.limit_hash, (const char *) hashkey);
				free(item);
			}
		}
	}

	switch_thread_rwlock_unlock(globals.limit_hash_rwlock);

	return SWITCH_STATUS_SUCCESS;
}
328 
SWITCH_LIMIT_USAGE(limit_usage_hash)329 SWITCH_LIMIT_USAGE(limit_usage_hash)
330 {
331 	char *hash_key = NULL;
332 	limit_hash_item_t *item = NULL;
333 	int count = 0;
334 	limit_hash_item_t remote_usage;
335 
336 	switch_thread_rwlock_rdlock(globals.limit_hash_rwlock);
337 
338 	hash_key = switch_mprintf("%s_%s", realm, resource);
339 	remote_usage = get_remote_usage(hash_key);
340 
341 	count = remote_usage.total_usage;
342 	*rcount = remote_usage.rate_usage;
343 
344 	if ((item = switch_core_hash_find(globals.limit_hash, hash_key))) {
345 		count += item->total_usage;
346 		*rcount += item->rate_usage;
347 	}
348 
349  	switch_safe_free(hash_key);
350 	switch_thread_rwlock_unlock(globals.limit_hash_rwlock);
351 
352 	return count;
353 }
354 
/* !\brief Reset hook of the hash backend: not implemented, always reports SWITCH_STATUS_GENERR */
SWITCH_LIMIT_RESET(limit_reset_hash)
{
	return SWITCH_STATUS_GENERR;
}
359 
SWITCH_LIMIT_INTERVAL_RESET(limit_interval_reset_hash)360 SWITCH_LIMIT_INTERVAL_RESET(limit_interval_reset_hash)
361 {
362 	char *hash_key = NULL;
363 	limit_hash_item_t *item = NULL;
364 
365 	switch_thread_rwlock_rdlock(globals.limit_hash_rwlock);
366 
367 	hash_key = switch_mprintf("%s_%s", realm, resource);
368 	if ((item = switch_core_hash_find(globals.limit_hash, hash_key))) {
369 		item->rate_usage = 0;
370 		item->last_check = switch_epoch_time_now(NULL);
371 	}
372 
373  	switch_safe_free(hash_key);
374 	switch_thread_rwlock_unlock(globals.limit_hash_rwlock);
375 	return SWITCH_STATUS_SUCCESS;
376 }
377 
/* !\brief Status hook of the hash backend: currently disabled.
 *
 * Returns an error string allocated with strdup(). The commented-out body is
 * the earlier implementation that counted the tracked elements; per the
 * returned message it was disabled because of locking problems.
 */
SWITCH_LIMIT_STATUS(limit_status_hash)
{
	/*
	switch_hash_index_t *hi = NULL;
	int count = 0;
	char *ret = NULL;

	switch_thread_rwlock_rdlock(globals.limit_hash_rwlock);

	for (hi = switch_core_hash_first(globals.limit_hash); hi; switch_core_hash_next(hi)) {
		count++;
	}

	switch_thread_rwlock_unlock(globals.limit_hash_rwlock);

	ret = switch_mprintf("There are %d elements being tracked.", count);
	return ret;
	*/
	return strdup("-ERR not supported yet (locking problems).");
}
398 
399 /* APP/API STUFF */
400 
401 /* CORE HASH STUFF */
402 
403 #define HASH_USAGE "[insert|insert_ifempty|delete|delete_ifmatch]/<realm>/<key>/<val>"
404 #define HASH_DESC "save data"
405 
SWITCH_STANDARD_APP(hash_function)406 SWITCH_STANDARD_APP(hash_function)
407 {
408 	int argc = 0;
409 	char *argv[4] = { 0 };
410 	char *mydata = NULL;
411 	char *hash_key = NULL;
412 	char *value = NULL;
413 
414 	switch_thread_rwlock_wrlock(globals.db_hash_rwlock);
415 
416 	if (!zstr(data)) {
417 		mydata = strdup(data);
418 		switch_assert(mydata);
419 		argc = switch_separate_string(mydata, '/', argv, (sizeof(argv) / sizeof(argv[0])));
420 	}
421 
422 	if (argc < 3 || !argv[0]) {
423 		goto usage;
424 	}
425 
426 	hash_key = switch_mprintf("%s_%s", argv[1], argv[2]);
427 
428 	if (!strcasecmp(argv[0], "insert")) {
429 		if (argc < 4) {
430 			goto usage;
431 		}
432 		if ((value = switch_core_hash_find(globals.db_hash, hash_key))) {
433 			free(value);
434 			switch_core_hash_delete(globals.db_hash, hash_key);
435 		}
436 		value = strdup(argv[3]);
437 		switch_assert(value);
438 		switch_core_hash_insert(globals.db_hash, hash_key, value);
439 	} else if (!strcasecmp(argv[0], "insert_ifempty")) {
440 		if (argc < 4) {
441 			goto usage;
442 		}
443 		if (!(value = switch_core_hash_find(globals.db_hash, hash_key))) {
444 			value = strdup(argv[3]);
445 			switch_assert(value);
446 			switch_core_hash_insert(globals.db_hash, hash_key, value);
447 		}
448 
449 	} else if (!strcasecmp(argv[0], "delete")) {
450 		if ((value = switch_core_hash_find(globals.db_hash, hash_key))) {
451 			switch_safe_free(value);
452 			switch_core_hash_delete(globals.db_hash, hash_key);
453 		}
454 	} else if (!strcasecmp(argv[0], "delete_ifmatch")) {
455 		if (argc < 4) {
456 			goto usage;
457 		}
458 		if ((value = switch_core_hash_find(globals.db_hash, hash_key))) {
459 			if(!strcmp(argv[3], value)) {
460 				switch_safe_free(value);
461 				switch_core_hash_delete(globals.db_hash, hash_key);
462 			}
463 		}
464 	} else {
465 		goto usage;
466 	}
467 
468 	goto done;
469 
470   usage:
471 	switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "USAGE: hash %s\n", HASH_USAGE);
472 
473   done:
474 	switch_thread_rwlock_unlock(globals.db_hash_rwlock);
475 	switch_safe_free(mydata);
476 	switch_safe_free(hash_key);
477 }
478 
479 #define HASH_API_USAGE "insert|insert_ifempty|select|delete|delete_ifmatch/realm/key[/value]"
SWITCH_STANDARD_API(hash_api_function)480 SWITCH_STANDARD_API(hash_api_function)
481 {
482 	int argc = 0;
483 	char *argv[4] = { 0 };
484 	char *mydata = NULL;
485 	char *value = NULL;
486 	char *hash_key = NULL;
487 
488 	if (!zstr(cmd)) {
489 		mydata = strdup(cmd);
490 		switch_assert(mydata);
491 		argc = switch_separate_string(mydata, '/', argv, (sizeof(argv) / sizeof(argv[0])));
492 	}
493 
494 	if (argc < 3 || !argv[0]) {
495 		goto usage;
496 	}
497 
498 	hash_key = switch_mprintf("%s_%s", argv[1], argv[2]);
499 
500 	if (!strcasecmp(argv[0], "insert")) {
501 		if (argc < 4) {
502 			goto usage;
503 		}
504 		switch_thread_rwlock_wrlock(globals.db_hash_rwlock);
505 		if ((value = switch_core_hash_find(globals.db_hash, hash_key))) {
506 			switch_safe_free(value);
507 			switch_core_hash_delete(globals.db_hash, hash_key);
508 		}
509 		value = strdup(argv[3]);
510 		switch_assert(value);
511 		switch_core_hash_insert(globals.db_hash, hash_key, value);
512 		stream->write_function(stream, "+OK\n");
513 		switch_thread_rwlock_unlock(globals.db_hash_rwlock);
514 	} else if (!strcasecmp(argv[0], "insert_ifempty")) {
515 		if (argc < 4) {
516 			goto usage;
517 		}
518 		switch_thread_rwlock_wrlock(globals.db_hash_rwlock);
519 		if ((value = switch_core_hash_find(globals.db_hash, hash_key))) {
520 			stream->write_function(stream, "-ERR key already exists\n");
521 		} else {
522 			value = strdup(argv[3]);
523 			switch_assert(value);
524 			switch_core_hash_insert(globals.db_hash, hash_key, value);
525 			stream->write_function(stream, "+OK\n");
526 		}
527 		switch_thread_rwlock_unlock(globals.db_hash_rwlock);
528 	} else if (!strcasecmp(argv[0], "delete")) {
529 		switch_thread_rwlock_wrlock(globals.db_hash_rwlock);
530 		if ((value = switch_core_hash_find(globals.db_hash, hash_key))) {
531 			switch_safe_free(value);
532 			switch_core_hash_delete(globals.db_hash, hash_key);
533 			stream->write_function(stream, "+OK\n");
534 		} else {
535 			stream->write_function(stream, "-ERR Not found\n");
536 		}
537 		switch_thread_rwlock_unlock(globals.db_hash_rwlock);
538 	} else if (!strcasecmp(argv[0], "delete_ifmatch")) {
539 		if (argc < 4) {
540 			goto usage;
541 		}
542 		switch_thread_rwlock_wrlock(globals.db_hash_rwlock);
543 		if ((value = switch_core_hash_find(globals.db_hash, hash_key))) {
544 			if(!strcmp(argv[3],value)) {
545 				switch_safe_free(value);
546 				switch_core_hash_delete(globals.db_hash, hash_key);
547 				stream->write_function(stream, "+OK\n");
548 			} else {
549 				stream->write_function(stream, "-ERR Doesn't match\n");
550 			}
551 		} else {
552 			stream->write_function(stream, "-ERR Not found\n");
553 		}
554 		switch_thread_rwlock_unlock(globals.db_hash_rwlock);
555 	} else if (!strcasecmp(argv[0], "select")) {
556 		switch_thread_rwlock_rdlock(globals.db_hash_rwlock);
557 		if ((value = switch_core_hash_find(globals.db_hash, hash_key))) {
558 			stream->write_function(stream, "%s", value);
559 		}
560 		switch_thread_rwlock_unlock(globals.db_hash_rwlock);
561 	} else {
562 		goto usage;
563 	}
564 
565 	goto done;
566 
567   usage:
568 	stream->write_function(stream, "-ERR Usage: hash %s\n", HASH_API_USAGE);
569 
570   done:
571 
572 	switch_safe_free(mydata);
573 	switch_safe_free(hash_key);
574 
575 	return SWITCH_STATUS_SUCCESS;
576 }
577 
578 #define HASH_DUMP_SYNTAX "all|limit|db [<realm>]"
SWITCH_STANDARD_API(hash_dump_function)579 SWITCH_STANDARD_API(hash_dump_function)
580 {
581 	int mode;
582 	switch_hash_index_t *hi;
583 	int argc = 0;
584 	char *argv[4] = { 0 };
585 	char *mydata = NULL;
586 	int realm = 0;
587 	char *realmvalue = NULL;
588 
589 	if (zstr(cmd)) {
590 		stream->write_function(stream, "Usage: "HASH_DUMP_SYNTAX"\n");
591 		goto done;
592 	}
593 
594 	mydata = strdup(cmd);
595 	switch_assert(mydata);
596 	argc = switch_separate_string(mydata, ' ', argv, (sizeof(argv) / sizeof(argv[0])));
597 	cmd = argv[0];
598 
599 	if (argc == 2) {
600 		realm = 1;
601 		realmvalue = switch_mprintf("%s_", argv[1]);
602 	}
603 
604 	if (!strcmp(cmd, "all")) {
605 		mode = 3;
606 	} else if (!strcmp(cmd, "limit")) {
607 		mode = 1;
608 	} else if (!strcmp(cmd, "db")) {
609 		mode = 2;
610 	} else {
611 		stream->write_function(stream, "Usage: "HASH_DUMP_SYNTAX"\n");
612 		goto done;
613 	}
614 
615 	if (mode & 1) {
616 		switch_thread_rwlock_rdlock(globals.limit_hash_rwlock);
617 		for (hi = switch_core_hash_first(globals.limit_hash); hi; hi = switch_core_hash_next(&hi)) {
618 			void *val = NULL;
619 			const void *key;
620 			switch_ssize_t keylen;
621 			limit_hash_item_t *item;
622 			switch_core_hash_this(hi, &key, &keylen, &val);
623 
624 			item = (limit_hash_item_t *)val;
625 
626 			stream->write_function(stream, "L/%s/%d/%d/%d/%d\n", key, item->total_usage, item->rate_usage, item->interval, item->last_check);
627 		}
628 		switch_thread_rwlock_unlock(globals.limit_hash_rwlock);
629 	}
630 
631 	if (mode & 2) {
632 		switch_thread_rwlock_rdlock(globals.db_hash_rwlock);
633 		for (hi = switch_core_hash_first(globals.db_hash); hi; hi = switch_core_hash_next(&hi)) {
634 			void *val = NULL;
635 			const void *key;
636 			switch_ssize_t keylen;
637 			switch_core_hash_this(hi, &key, &keylen, &val);
638 			if (realm) {
639 				if (strstr(key, realmvalue)) {
640 					stream->write_function(stream, "D/%s/%s\n", key, (char*)val);
641 				}
642 			} else {
643 				stream->write_function(stream, "D/%s/%s\n", key, (char*)val);
644 			}
645 		}
646 		switch_thread_rwlock_unlock(globals.db_hash_rwlock);
647 	}
648 
649  done:
650 	switch_safe_free(mydata);
651 	switch_safe_free(realmvalue);
652 
653 	return SWITCH_STATUS_SUCCESS;
654 }
655 
656 #define HASH_REMOTE_SYNTAX "list|kill [name]|rescan"
SWITCH_STANDARD_API(hash_remote_function)657 SWITCH_STANDARD_API(hash_remote_function)
658 {
659 	//int argc;
660 	char *argv[10];
661 	char *dup = NULL;
662 
663 	if (zstr(cmd)) {
664 		stream->write_function(stream, "-ERR Usage: "HASH_REMOTE_SYNTAX"\n");
665 		return SWITCH_STATUS_SUCCESS;
666 	}
667 
668 	dup = strdup(cmd);
669 
670 	switch_split(dup, ' ', argv);
671 	if (argv[0] && !strcmp(argv[0], "list")) {
672 		switch_hash_index_t *hi;
673 		stream->write_function(stream, "Remote connections:\nName\t\t\tState\n");
674 
675 		switch_thread_rwlock_rdlock(globals.remote_hash_rwlock);
676 		for (hi = switch_core_hash_first(globals.remote_hash); hi; hi = switch_core_hash_next(&hi)) {
677 			void *val;
678 			const void *key;
679 			switch_ssize_t keylen;
680 			limit_remote_t *item;
681 			switch_core_hash_this(hi, &key, &keylen, &val);
682 
683 			item = (limit_remote_t *)val;
684 			stream->write_function(stream, "%s\t\t\t%s\n", item->name, state_str(item->state));
685 		}
686 		switch_thread_rwlock_unlock(globals.remote_hash_rwlock);
687 		stream->write_function(stream, "+OK\n");
688 
689 	} else if (argv[0] && !strcmp(argv[0], "kill")) {
690 		const char *name = argv[1];
691 		limit_remote_t *remote;
692 		if (zstr(name)) {
693 			stream->write_function(stream, "-ERR Usage: "HASH_REMOTE_SYNTAX"\n");
694 			goto done;
695 		}
696 		switch_thread_rwlock_rdlock(globals.remote_hash_rwlock);
697 		remote = switch_core_hash_find(globals.remote_hash, name);
698 		switch_thread_rwlock_unlock(globals.remote_hash_rwlock);
699 
700 		if (remote) {
701 			limit_remote_destroy(&remote);
702 
703 			switch_thread_rwlock_wrlock(globals.remote_hash_rwlock);
704 			switch_core_hash_delete(globals.remote_hash, name);
705 			switch_thread_rwlock_unlock(globals.remote_hash_rwlock);
706 
707 			stream->write_function(stream, "+OK\n");
708 		} else {
709 			stream->write_function(stream, "-ERR No such remote instance %s\n", name);
710 		}
711 	} else if (argv[0] && !strcmp(argv[0], "rescan")) {
712 		do_config(SWITCH_TRUE);
713 		stream->write_function(stream, "+OK\n");
714 	} else {
715 		stream->write_function(stream, "-ERR Usage: "HASH_REMOTE_SYNTAX"\n");
716 
717 	}
718 
719 done:
720 
721 	switch_safe_free(dup);
722 
723 	return SWITCH_STATUS_SUCCESS;
724 }
725 
limit_remote_create(const char * name,const char * host,uint16_t port,const char * username,const char * password,int interval)726 limit_remote_t *limit_remote_create(const char *name, const char *host, uint16_t port, const char *username, const char *password, int interval)
727 {
728 	limit_remote_t *r;
729 	switch_memory_pool_t *pool;
730 
731 	switch_thread_rwlock_rdlock(globals.remote_hash_rwlock);
732 	if (switch_core_hash_find(globals.remote_hash, name)) {
733 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Already have a remote instance named %s\n", name);
734 			switch_thread_rwlock_unlock(globals.remote_hash_rwlock);
735 			return NULL;
736 	}
737 	switch_thread_rwlock_unlock(globals.remote_hash_rwlock);
738 
739 	if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
740 		return NULL;
741 	}
742 
743 	r = switch_core_alloc(pool, sizeof(limit_remote_t));
744 	r->pool = pool;
745 	r->name = switch_core_strdup(r->pool, name);
746 	r->host = switch_core_strdup(r->pool, host);
747 	r->port = port;
748 	r->username = switch_core_strdup(r->pool, username);
749 	r->password = switch_core_strdup(r->pool, password);
750 	r->interval = interval;
751 
752 	switch_thread_rwlock_create(&r->rwlock, pool);
753 	switch_core_hash_init(&r->index);
754 
755 	switch_thread_rwlock_rdlock(globals.remote_hash_rwlock);
756 	switch_core_hash_insert(globals.remote_hash, name, r);
757 	switch_thread_rwlock_unlock(globals.remote_hash_rwlock);
758 
759 	return r;
760 }
761 
/* !\brief Stops the poller thread of a remote and releases everything it owns
 * \param r address of the remote pointer; set to NULL on return. A NULL r or
 *          a NULL *r is a no-op.
 *
 * Does not remove the remote from globals.remote_hash; the caller has to do that.
 */
void limit_remote_destroy(limit_remote_t **r)
{
	limit_remote_t *remote;
	switch_thread_t *thread;
	switch_memory_pool_t *pool;

	if (!r || !*r) {
		return;
	}

	remote = *r;

	/* Ask the poller loop to stop, then wait for it */
	remote->state = REMOTE_OFF;

	/* Read the handle once: the thread resets remote->thread itself when it exits */
	thread = remote->thread;
	if (thread) {
		switch_status_t retval;
		switch_thread_join(&retval, thread);
	}

	switch_thread_rwlock_wrlock(remote->rwlock);

	/* Free hashtable data */
	switch_core_hash_destroy(&remote->index);

	switch_thread_rwlock_unlock(remote->rwlock);
	switch_thread_rwlock_destroy(remote->rwlock);

	/* The structure itself lives in its pool. Destroy the pool through a local
	 * copy of the pointer so the destroy call is never handed an address that
	 * sits inside the memory being released */
	pool = remote->pool;
	*r = NULL;
	switch_core_destroy_memory_pool(&pool);
}
784 
785 /* Compute the usage sum of a resource on remote boxes */
get_remote_usage(const char * key)786 static limit_hash_item_t get_remote_usage(const char *key) {
787 	limit_hash_item_t usage = { 0 };
788 	switch_hash_index_t *hi;
789 
790 	switch_thread_rwlock_rdlock(globals.remote_hash_rwlock);
791 	for (hi = switch_core_hash_first(globals.remote_hash); hi; hi = switch_core_hash_next(&hi)) {
792 		void *val;
793 		const void *hashkey;
794 		switch_ssize_t keylen;
795 		limit_remote_t *remote;
796 		limit_hash_item_t *item;
797 		switch_core_hash_this(hi, &hashkey, &keylen, &val);
798 
799 		remote = (limit_remote_t *)val;
800 		if (remote->state != REMOTE_UP) {
801 			continue;
802 		}
803 
804 		switch_thread_rwlock_rdlock(remote->rwlock);
805 		if ((item = switch_core_hash_find(remote->index, key))) {
806 			usage.total_usage += item->total_usage;
807 			usage.rate_usage += item->rate_usage;
808 			if (!usage.last_check) {
809 				usage.last_check = item->last_check;
810 			}
811 		}
812 		switch_thread_rwlock_unlock(remote->rwlock);
813 	}
814 
815 	switch_thread_rwlock_unlock(globals.remote_hash_rwlock);
816 
817 	return usage;
818 }
819 
limit_remote_thread(switch_thread_t * thread,void * obj)820 static void *SWITCH_THREAD_FUNC limit_remote_thread(switch_thread_t *thread, void *obj)
821 {
822 	limit_remote_t *remote = (limit_remote_t*)obj;
823 	while (remote->state > REMOTE_OFF) {
824 		if (remote->state != REMOTE_UP) {
825 			if  (esl_connect_timeout(&remote->handle, remote->host, (esl_port_t)remote->port, remote->username, remote->password, 5000) == ESL_SUCCESS) {
826 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Connected to remote FreeSWITCH (%s) at %s:%d\n",
827 					remote->name, remote->host, remote->port);
828 
829 				remote->state = REMOTE_UP;
830 			} else {
831 				esl_disconnect(&remote->handle);
832 				memset(&remote->handle, 0, sizeof(remote->handle));
833 			}
834 		} else {
835 			if (esl_send_recv_timed(&remote->handle, "api hash_dump limit", 5000) != ESL_SUCCESS) {
836 				esl_disconnect(&remote->handle);
837 				memset(&remote->handle, 0, sizeof(remote->handle));
838 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Disconnected from remote FreeSWITCH (%s) at %s:%d\n",
839 					remote->name, remote->host, remote->port);
840 				memset(&remote->handle, 0, sizeof(remote->handle));
841 				remote->state = REMOTE_DOWN;
842 				/* Delete all remote tracking entries */
843 				switch_thread_rwlock_wrlock(remote->rwlock);
844 				switch_core_hash_delete_multi(remote->index, limit_hash_remote_cleanup_callback, NULL);
845 				switch_thread_rwlock_unlock(remote->rwlock);
846 			} else {
847 				if (!zstr(remote->handle.last_sr_event->body)) {
848 					char *data = strdup(remote->handle.last_sr_event->body);
849 					char *p = data, *p2;
850 					switch_time_t now = switch_epoch_time_now(NULL);
851 					while (p && *p) {
852 						/* We are getting the limit data as:
853 							L/key/usage/rate/interval/last_checked
854 						*/
855 						if ((p2 = strchr(p, '\n'))) {
856 							*p2++ = '\0';
857 						}
858 
859 						/* Now p points at the beginning of the current line,
860 						p2 at the start of the next one */
861 						if (*p == 'L') { /* Limit data */
862 							char *argv[5];
863 							int argc = switch_split(p+2, '/', argv);
864 
865 							if (argc < 5) {
866 								switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "[%s] Protocol error: missing argument in line: %s\n",
867 									remote->name, p);
868 							} else {
869 								limit_hash_item_t *item;
870 								switch_thread_rwlock_wrlock(remote->rwlock);
871 								if (!(item = switch_core_hash_find(remote->index, argv[0]))) {
872 									switch_zmalloc(item, sizeof(*item));
873 									switch_core_hash_insert_auto_free(remote->index, argv[0], item);
874 								}
875 								item->total_usage = atoi(argv[1]);
876 								item->rate_usage = atoi(argv[2]);
877 								item->interval = atoi(argv[3]);
878 								item->last_check = atoi(argv[4]);
879 								item->last_update = now;
880 								switch_thread_rwlock_unlock(remote->rwlock);
881 							}
882 						}
883 
884 						p = p2;
885 					}
886 					free(data);
887 
888 					/* Now free up anything that wasn't in this update since it means their usage is 0 */
889 					switch_thread_rwlock_wrlock(remote->rwlock);
890 					switch_core_hash_delete_multi(remote->index, limit_hash_remote_cleanup_callback, (void*)(intptr_t)now);
891 					switch_thread_rwlock_unlock(remote->rwlock);
892 				}
893 			}
894 		}
895 
896 		switch_yield(remote->interval * 1000);
897 	}
898 
899 	remote->thread = NULL;
900 
901 	return NULL;
902 }
903 
/* !\brief Reads hash.conf and starts a poller thread for every configured remote
 * \param reload SWITCH_TRUE to skip remotes that are already registered
 *
 * Each <remote> under <remotes> provides name, host, port, username, password
 * and interval attributes. Entries without a name, and entries that cannot be
 * created (for instance a duplicate name), are skipped with a log message.
 */
static void do_config(switch_bool_t reload)
{
	switch_xml_t xml = NULL, x_lists = NULL, x_list = NULL, cfg = NULL;

	if (!(xml = switch_xml_open_cfg("hash.conf", &cfg, NULL))) {
		return;
	}

	if ((x_lists = switch_xml_child(cfg, "remotes"))) {
		for (x_list = switch_xml_child(x_lists, "remote"); x_list; x_list = x_list->next) {
			const char *name = switch_xml_attr(x_list, "name");
			const char *host = switch_xml_attr(x_list, "host");
			const char *szport = switch_xml_attr(x_list, "port");
			const char *username = switch_xml_attr(x_list, "username");
			const char *password = switch_xml_attr(x_list, "password");
			const char *szinterval = switch_xml_attr(x_list, "interval");
			uint16_t port = 0;
			int	interval = 0;
			limit_remote_t *remote;
			switch_threadattr_t *thd_attr = NULL;

			/* The name is the hash key of the remote, nothing works without it */
			if (zstr(name)) {
				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Ignoring remote without a name in hash.conf\n");
				continue;
			}

			if (reload) {
				switch_thread_rwlock_rdlock(globals.remote_hash_rwlock);
				if (switch_core_hash_find(globals.remote_hash, name)) {
					switch_thread_rwlock_unlock(globals.remote_hash_rwlock);
					continue;
				}
				switch_thread_rwlock_unlock(globals.remote_hash_rwlock);
			}

			if (!zstr(szport)) {
				port = (uint16_t)atoi(szport);
			}

			/* NOTE(review): interval stays 0 when the attribute is missing, and the poller
			 * passes interval * 1000 to switch_yield() -- consider enforcing a minimum */
			if (!zstr(szinterval)) {
				interval = atoi(szinterval);
			}

			/* Creation fails for a duplicate name or when no pool can be allocated */
			if (!(remote = limit_remote_create(name, host, port, username, password, interval))) {
				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unable to create remote instance %s\n", name);
				continue;
			}

			remote->state = REMOTE_DOWN;

			switch_threadattr_create(&thd_attr, remote->pool);
			switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
			switch_thread_create(&remote->thread, thd_attr, limit_remote_thread, remote, remote->pool);
		}
	}

	switch_xml_free(xml);
}
950 
951 /* INIT/DEINIT STUFF */
SWITCH_MODULE_LOAD_FUNCTION(mod_hash_load)952 SWITCH_MODULE_LOAD_FUNCTION(mod_hash_load)
953 {
954 	switch_application_interface_t *app_interface;
955 	switch_api_interface_t *commands_api_interface;
956 	switch_limit_interface_t *limit_interface;
957 	switch_status_t status;
958 
959 	memset(&globals, 0, sizeof(globals));
960 	globals.pool = pool;
961 
962 	status = switch_event_reserve_subclass(LIMIT_EVENT_USAGE);
963 	if (status != SWITCH_STATUS_SUCCESS && status != SWITCH_STATUS_INUSE) {
964 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't register event subclass \"%s\" (%d)\n", LIMIT_EVENT_USAGE, status);
965 		return SWITCH_STATUS_FALSE;
966 	}
967 
968 	switch_thread_rwlock_create(&globals.limit_hash_rwlock, globals.pool);
969 	switch_thread_rwlock_create(&globals.db_hash_rwlock, globals.pool);
970 	switch_thread_rwlock_create(&globals.remote_hash_rwlock, globals.pool);
971 	switch_core_hash_init(&globals.limit_hash);
972 	switch_core_hash_init(&globals.db_hash);
973 	switch_core_hash_init(&globals.remote_hash);
974 
975 	/* connect my internal structure to the blank pointer passed to me */
976 	*module_interface = switch_loadable_module_create_module_interface(pool, modname);
977 
978 	/* register limit interfaces */
979 	SWITCH_ADD_LIMIT(limit_interface, "hash", limit_incr_hash, limit_release_hash, limit_usage_hash, limit_reset_hash, limit_status_hash, limit_interval_reset_hash);
980 
981 	switch_scheduler_add_task(switch_epoch_time_now(NULL) + LIMIT_HASH_CLEANUP_INTERVAL, limit_hash_cleanup_callback, "limit_hash_cleanup", "mod_hash", 0, NULL,
982 						  SSHF_NONE);
983 
984 	SWITCH_ADD_APP(app_interface, "hash", "Insert into the hashtable", HASH_DESC, hash_function, HASH_USAGE, SAF_SUPPORT_NOMEDIA | SAF_ZOMBIE_EXEC)
985 	SWITCH_ADD_API(commands_api_interface, "hash", "hash get/set", hash_api_function, "[insert|delete|select]/<realm>/<key>/<value>");
986 	SWITCH_ADD_API(commands_api_interface, "hash_dump", "dump hash/limit_hash data (used for synchronization)", hash_dump_function, HASH_DUMP_SYNTAX);
987 	SWITCH_ADD_API(commands_api_interface, "hash_remote", "hash remote", hash_remote_function, HASH_REMOTE_SYNTAX);
988 
989 	switch_console_set_complete("add hash insert");
990 	switch_console_set_complete("add hash delete");
991 	switch_console_set_complete("add hash select");
992 
993 	switch_console_set_complete("add hash_remote list");
994 	switch_console_set_complete("add hash_remote kill");
995 	switch_console_set_complete("add hash_remote rescan");
996 
997 	do_config(SWITCH_FALSE);
998 
999 	/* indicate that the module should continue to be loaded */
1000 	return SWITCH_STATUS_SUCCESS;
1001 }
1002 
1003 
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_hash_shutdown)1004 SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_hash_shutdown)
1005 {
1006 	switch_hash_index_t *hi = NULL;
1007 	switch_bool_t remote_clean = SWITCH_TRUE;
1008 
1009 	switch_scheduler_del_task_group("mod_hash");
1010 
1011 	/* Kill remote connections, destroy needs a wrlock so we unlock after finding a pointer */
1012 	while(remote_clean) {
1013 		void *val;
1014 		const void *key = NULL;
1015 		switch_ssize_t keylen;
1016 		limit_remote_t *item = NULL;
1017 
1018 		switch_thread_rwlock_rdlock(globals.remote_hash_rwlock);
1019 		if ((hi = switch_core_hash_first(globals.remote_hash))) {
1020 			switch_core_hash_this(hi, &key, &keylen, &val);
1021 			item = (limit_remote_t *)val;
1022 		}
1023 		switch_thread_rwlock_unlock(globals.remote_hash_rwlock);
1024 
1025 		if (!item) {
1026 			remote_clean = SWITCH_FALSE;
1027 		} else {
1028 			limit_remote_destroy(&item);
1029 			switch_thread_rwlock_wrlock(globals.remote_hash_rwlock);
1030 			switch_core_hash_delete(globals.remote_hash, key);
1031 			switch_thread_rwlock_unlock(globals.remote_hash_rwlock);
1032 		}
1033 	}
1034 
1035 	switch_thread_rwlock_wrlock(globals.limit_hash_rwlock);
1036 	switch_thread_rwlock_wrlock(globals.db_hash_rwlock);
1037 
1038 	while ((hi = switch_core_hash_first_iter( globals.limit_hash, hi))) {
1039 		void *val = NULL;
1040 		const void *key;
1041 		switch_ssize_t keylen;
1042 		switch_core_hash_this(hi, &key, &keylen, &val);
1043 		free(val);
1044 		switch_core_hash_delete(globals.limit_hash, key);
1045 	}
1046 
1047 	while ((hi = switch_core_hash_first_iter( globals.db_hash, hi))) {
1048 		void *val = NULL;
1049 		const void *key;
1050 		switch_ssize_t keylen;
1051 		switch_core_hash_this(hi, &key, &keylen, &val);
1052 		free(val);
1053 		switch_core_hash_delete(globals.db_hash, key);
1054 	}
1055 
1056 	switch_core_hash_destroy(&globals.limit_hash);
1057 	switch_core_hash_destroy(&globals.db_hash);
1058 	switch_core_hash_destroy(&globals.remote_hash);
1059 
1060 	switch_thread_rwlock_unlock(globals.limit_hash_rwlock);
1061 	switch_thread_rwlock_unlock(globals.db_hash_rwlock);
1062 
1063 	switch_thread_rwlock_destroy(globals.db_hash_rwlock);
1064 	switch_thread_rwlock_destroy(globals.limit_hash_rwlock);
1065 	switch_thread_rwlock_destroy(globals.remote_hash_rwlock);
1066 
1067 
1068 	return SWITCH_STATUS_SUCCESS;
1069 }
1070 
1071 /* For Emacs:
1072  * Local Variables:
1073  * mode:c
1074  * indent-tabs-mode:t
1075  * tab-width:4
1076  * c-basic-offset:4
1077  * End:
1078  * For VIM:
1079  * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
1080  */
1081