1 /*
2  * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3  * Copyright (C) 2005-2014, Anthony Minessale II <anthm@freeswitch.org>
4  *
5  * Version: MPL 1.1
6  *
7  * The contents of this file are subject to the Mozilla Public License Version
8  * 1.1 (the "License"); you may not use this file except in compliance with
9  * the License. You may obtain a copy of the License at
10  * http://www.mozilla.org/MPL/
11  *
12  * Software distributed under the License is distributed on an "AS IS" basis,
13  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14  * for the specific language governing rights and limitations under the
15  * License.
16  *
17  * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18  *
19  * The Initial Developer of the Original Code is
20  * Anthony Minessale II <anthm@freeswitch.org>
21  * Portions created by the Initial Developer are Copyright (C)
22  * the Initial Developer. All Rights Reserved.
23  *
24  * Contributor(s):
25  *
26  * Anthony Minessale II <anthm@freeswitch.org>
27  * Michael Jerris <mike@jerris.com>
28  * Paul D. Tinsley <pdt at jackhammer.org>
29  * Emmanuel Schmidbauer <eschmidbauer@gmail.com>
30  * Andrey Volk <andywolk@gmail.com>
31  *
32  *
33  * switch_core_sqldb.c -- Main Core Library (statistics tracker)
34  *
35  */
36 
37 #include <switch.h>
38 #include "private/switch_core_pvt.h"
39 
/*
 * Length limits for the SQL queue.  Neither constant is referenced in this
 * part of the file.
 * NOTE(review): presumably SWITCH_SQL_QUEUE_PAUSE_LEN is the threshold at
 * which producers are throttled before SWITCH_SQL_QUEUE_LEN is reached --
 * confirm against the queue manager code.
 */
#define SWITCH_SQL_QUEUE_LEN 100000
#define SWITCH_SQL_QUEUE_PAUSE_LEN 90000
42 
/*!
  \brief One cached database connection, kept on the sql_manager.handle_pool list

  A handle is checked out by locking its mutex and bumping use_count, and is
  given back through switch_cache_db_release_db_handle().
*/
struct switch_cache_db_handle {
	/* Cache key string built by the caller of add_handle(); also used in log messages. */
	char name[CACHE_DB_LEN];
	/* Backend kind; selects which member of native_handle is meaningful. */
	switch_cache_db_handle_type_t type;
	/* The backend connection itself. */
	switch_cache_db_native_handle_t native_handle;
	/* Epoch time of the last acquire/release; sql_close() measures idleness against it. */
	time_t last_used;
	/* Nested mutex created in create_handle(); held while the handle is checked out. */
	switch_mutex_t *mutex;
	/* Optional mutex locked around statement execution when non-NULL; cleared on release. */
	switch_mutex_t *io_mutex;
	/* Memory pool this structure was allocated from; destroy_handle() destroys it. */
	switch_memory_pool_t *pool;
	/* Bit flags tested with switch_test_flag() (CDF_PRUNE, CDF_NONEXPIRING). */
	int32_t flags;
	/* Case-insensitive hash of the cache key string. */
	unsigned long hash;
	/* Hash of the thread string of the thread the handle is associated with. */
	unsigned long thread_hash;
	/* Call site string recorded when the handle was added to the pool. */
	char creator[CACHE_DB_LEN];
	/* Call site string recorded the last time the handle was reused from the pool. */
	char last_user[CACHE_DB_LEN];
	/* Number of outstanding acquisitions; incremented on acquire, decremented on release. */
	uint32_t use_count;
	/* Number of acquisitions over the lifetime of the handle. */
	uint64_t total_used_count;
	/* Next handle on the sql_manager.handle_pool singly linked list. */
	struct switch_cache_db_handle *next;
};
60 
/*!
  \brief File-scope state of the SQL subsystem

  Only manage, dbh_mutex, handle_pool, total_handles and total_used_handles
  are used by the handle-cache code in this part of the file; the remaining
  members are consumed elsewhere.
*/
static struct {
	switch_memory_pool_t *memory_pool;
	switch_thread_t *db_thread;
	int db_thread_running;
	/* When false, _switch_core_db_handle() refuses to hand out the core handle. */
	switch_bool_t manage;
	switch_mutex_t *io_mutex;
	/* Guards handle_pool and the two handle counters below. */
	switch_mutex_t *dbh_mutex;
	switch_mutex_t *ctl_mutex;
	/* Head of the singly linked list of cached handles. */
	switch_cache_db_handle_t *handle_pool;
	/* Number of handles currently on handle_pool. */
	uint32_t total_handles;
	/* Number of outstanding handle acquisitions across all handles. */
	uint32_t total_used_handles;
	switch_cache_db_handle_t *dbh;
	switch_sql_queue_manager_t *qm;
	int paused;
} sql_manager;
76 
77 
/* Defined later in the file. */
static void switch_core_sqldb_start_thread(void);
static void switch_core_sqldb_stop_thread(void);

/*
 * Convenience wrappers around a database interface's *_detailed entry points
 * that pass the current file, function and line along with the call.
 */
#define database_interface_handle_callback_exec(database_interface, dih, sql, callback, pdata, err) database_interface->callback_exec_detailed(__FILE__, (char *)__SWITCH_FUNC__, __LINE__, dih, sql, callback, pdata, err)
#define database_interface_handle_exec(database_interface, dih, sql, err) database_interface->exec_detailed(__FILE__, (char *)__SWITCH_FUNC__, __LINE__, dih, sql, err)
83 
create_handle(switch_cache_db_handle_type_t type)84 static switch_cache_db_handle_t *create_handle(switch_cache_db_handle_type_t type)
85 {
86 	switch_cache_db_handle_t *new_dbh = NULL;
87 	switch_memory_pool_t *pool = NULL;
88 
89 	switch_core_new_memory_pool(&pool);
90 	new_dbh = switch_core_alloc(pool, sizeof(*new_dbh));
91 	new_dbh->pool = pool;
92 	new_dbh->type = type;
93 	switch_mutex_init(&new_dbh->mutex, SWITCH_MUTEX_NESTED, new_dbh->pool);
94 
95 	return new_dbh;
96 }
97 
/*!
  \brief Destroy a handle by destroying the memory pool it was allocated from
  \param dbh address of the handle pointer; set to NULL once the pool is gone.
             Nothing happens when dbh, *dbh or the handle's pool is NULL.
*/
static void destroy_handle(switch_cache_db_handle_t **dbh)
{
	if (!dbh || !*dbh || !(*dbh)->pool) {
		return;
	}

	switch_core_destroy_memory_pool(&(*dbh)->pool);
	*dbh = NULL;
}
105 
/*!
  \brief Register a newly created handle in the pool, already checked out by the caller
  \param dbh the handle to register
  \param db_str cache key string; stored as the handle name and hashed
  \param db_callsite_str call site string stored as the handle's creator
  \param thread_str string identifying the calling thread; hashed into thread_hash

  On return the handle's own mutex is locked and its use counters reflect one
  acquisition.
*/
static void add_handle(switch_cache_db_handle_t *dbh, const char *db_str, const char *db_callsite_str, const char *thread_str)
{
	/* The same length variable feeds both hash calls; get_handle() does likewise,
	   so the two must stay in step. */
	switch_ssize_t key_len = -1;

	switch_mutex_lock(sql_manager.dbh_mutex);

	/* Identity of the handle. */
	switch_set_string(dbh->name, db_str);
	switch_set_string(dbh->creator, db_callsite_str);
	dbh->hash = switch_ci_hashfunc_default(db_str, &key_len);
	dbh->thread_hash = switch_ci_hashfunc_default(thread_str, &key_len);

	/* Account for the acquisition by the caller. */
	dbh->use_count += 1;
	dbh->total_used_count += 1;
	sql_manager.total_used_handles += 1;

	/* Push onto the head of the pool list. */
	dbh->next = sql_manager.handle_pool;
	sql_manager.handle_pool = dbh;
	sql_manager.total_handles += 1;

	/* Hand the handle to the caller locked, before anybody else can see it unlocked. */
	switch_mutex_lock(dbh->mutex);
	switch_mutex_unlock(sql_manager.dbh_mutex);
}
128 
/*!
  \brief Unlink a handle from the pool list
  \param dbh the handle to remove; the handle itself is not destroyed

  total_handles is decremented only when the handle was actually found.
*/
static void del_handle(switch_cache_db_handle_t *dbh)
{
	switch_cache_db_handle_t **link;

	switch_mutex_lock(sql_manager.dbh_mutex);

	/* Walk the links rather than the nodes so head and interior removal look alike. */
	for (link = &sql_manager.handle_pool; *link; link = &(*link)->next) {
		if (*link == dbh) {
			*link = dbh->next;
			sql_manager.total_handles--;
			break;
		}
	}

	switch_mutex_unlock(sql_manager.dbh_mutex);
}
149 
switch_cache_db_database_interface_flush_handles(switch_database_interface_t * database_interface)150 SWITCH_DECLARE(void) switch_cache_db_database_interface_flush_handles(switch_database_interface_t *database_interface)
151 {
152 	switch_cache_db_handle_t *dbh_ptr = NULL;
153 
154 	switch_mutex_lock(sql_manager.dbh_mutex);
155 
156 top:
157 
158 	for (dbh_ptr = sql_manager.handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) {
159 		if (switch_mutex_trylock(dbh_ptr->mutex) == SWITCH_STATUS_SUCCESS) {
160 			if (dbh_ptr->type != SCDB_TYPE_DATABASE_INTERFACE) {
161 				continue;
162 			}
163 
164 			if (dbh_ptr->native_handle.database_interface_dbh->connection_options.database_interface != database_interface) {
165 				continue;
166 			}
167 
168 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Dropping DB connection %s\n", dbh_ptr->name);
169 
170 			database_interface->handle_destroy(&dbh_ptr->native_handle.database_interface_dbh);
171 
172 			del_handle(dbh_ptr);
173 			switch_mutex_unlock(dbh_ptr->mutex);
174 			destroy_handle(&dbh_ptr);
175 			goto top;
176 		}
177 	}
178 
179 	switch_mutex_unlock(sql_manager.dbh_mutex);
180 }
181 
get_handle(const char * db_str,const char * user_str,const char * thread_str)182 static switch_cache_db_handle_t *get_handle(const char *db_str, const char *user_str, const char *thread_str)
183 {
184 	switch_ssize_t hlen = -1;
185 	unsigned long hash = 0, thread_hash = 0;
186 	switch_cache_db_handle_t *dbh_ptr, *r = NULL;
187 
188 	hash = switch_ci_hashfunc_default(db_str, &hlen);
189 	thread_hash = switch_ci_hashfunc_default(thread_str, &hlen);
190 
191 	switch_mutex_lock(sql_manager.dbh_mutex);
192 
193 	/* First loop allows a thread to use a handle multiple times sumiltaneously
194 	   but only if that handle is in use by the same thread. In that case use_count will be incremented.
195 	   This allows SQLite to read and write within a single thread, giving the same handle for both operations.
196 	*/
197 	for (dbh_ptr = sql_manager.handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) {
198 		if (dbh_ptr->thread_hash == thread_hash && dbh_ptr->hash == hash &&
199 			!switch_test_flag(dbh_ptr, CDF_PRUNE) && switch_mutex_trylock(dbh_ptr->mutex) == SWITCH_STATUS_SUCCESS) {
200 			r = dbh_ptr;
201 			break;
202 		}
203 	}
204 
205 	if (!r) {
206 		/* If a handle idles, take it and associate with the thread.
207 		   If a handle is in use, skip and create new one.
208 		*/
209 		for (dbh_ptr = sql_manager.handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) {
210 			if (dbh_ptr->hash == hash && !dbh_ptr->use_count && !switch_test_flag(dbh_ptr, CDF_PRUNE) &&
211 				switch_mutex_trylock(dbh_ptr->mutex) == SWITCH_STATUS_SUCCESS) {
212 				r = dbh_ptr;
213 				r->thread_hash = thread_hash;
214 				break;
215 			}
216 		}
217 	}
218 
219 	if (r) {
220 		r->use_count++;
221 		r->total_used_count++;
222 		sql_manager.total_used_handles++;
223 		switch_set_string(r->last_user, user_str);
224 	}
225 
226 	switch_mutex_unlock(sql_manager.dbh_mutex);
227 
228 	return r;
229 
230 }
231 
232 /*!
233   \brief Open the default system database
234 */
_switch_core_db_handle(switch_cache_db_handle_t ** dbh,const char * file,const char * func,int line)235 SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line)
236 {
237 	switch_status_t r;
238 	char *dsn;
239 
240 	if (!sql_manager.manage) {
241 		return SWITCH_STATUS_FALSE;
242 	}
243 
244 	if (!zstr(runtime.odbc_dsn)) {
245 		dsn = runtime.odbc_dsn;
246 	} else if (!zstr(runtime.dbname)) {
247 		dsn = runtime.dbname;
248 	} else {
249 		dsn = "core";
250 	}
251 
252 	if ((r = _switch_cache_db_get_db_handle_dsn_ex(dbh, dsn, SWITCH_TRUE, file, func, line)) != SWITCH_STATUS_SUCCESS) {
253 		*dbh = NULL;
254 	}
255 
256 	return r;
257 }
258 
/* Idle time after which sql_close() drops an unused cached handle; compared
   against differences of epoch timestamps. */
#define SQL_CACHE_TIMEOUT 30
/* Not referenced in this part of the file. */
#define SQL_REG_TIMEOUT 15
261 
262 
/*!
  \brief Close and destroy cached handles
  \param prune 0 to drop every handle, retrying handles that are still locked;
               a positive epoch timestamp to drop only handles that are unused
               and either flagged CDF_PRUNE or idle for at least
               SQL_CACHE_TIMEOUT relative to that timestamp.  Handles flagged
               CDF_NONEXPIRING are never dropped in that mode.
*/
static void sql_close(time_t prune)
{
	switch_cache_db_handle_t *cur = NULL;
	int busy = 0;
	int attempts_left = 10000;

	switch_mutex_lock(sql_manager.dbh_mutex);

	do {
		busy = 0;
		cur = sql_manager.handle_pool;

		while (cur) {
			if (prune > 0) {
				time_t idle = 0;

				if (prune > cur->last_used) {
					idle = (time_t) prune - cur->last_used;
				}

				/* In pruning mode leave busy, pinned and recently used handles alone. */
				if (cur->use_count || switch_test_flag(cur, CDF_NONEXPIRING) ||
					(idle < SQL_CACHE_TIMEOUT && !switch_test_flag(cur, CDF_PRUNE))) {
					cur = cur->next;
					continue;
				}
			}

			if (switch_mutex_trylock(cur->mutex) != SWITCH_STATUS_SUCCESS) {
				/* Somebody still holds it.  Only a full close waits for it. */
				if (!prune) {
					if (attempts_left) {
						busy++;
					} else {
						switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SANITY CHECK FAILED!  Handle %s (%s;%s) was not properly released.\n",
										  cur->name, cur->creator, cur->last_user);
					}
				}

				cur = cur->next;
				continue;
			}

			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Dropping idle DB connection %s\n", cur->name);

			switch (cur->type) {
			case SCDB_TYPE_DATABASE_INTERFACE:
				{
					switch_database_interface_t *database_interface = cur->native_handle.database_interface_dbh->connection_options.database_interface;
					database_interface->handle_destroy(&cur->native_handle.database_interface_dbh);
				}
				break;
			case SCDB_TYPE_ODBC:
				{
					switch_odbc_handle_destroy(&cur->native_handle.odbc_dbh);
				}
				break;
			case SCDB_TYPE_CORE_DB:
				{
					switch_core_db_close(cur->native_handle.core_db_dbh->handle);
					cur->native_handle.core_db_dbh->handle = NULL;
				}
				break;
			}

			del_handle(cur);
			switch_mutex_unlock(cur->mutex);
			destroy_handle(&cur);

			/* The list changed underneath us: forget what we counted and rescan from the head. */
			busy = 0;
			cur = sql_manager.handle_pool;
		}

		if (busy && !prune) {
			/* Give the holders a chance to release, a bounded number of times. */
			switch_cond_next();
			if (attempts_left) {
				attempts_left--;
			}
		}
	} while (busy);

	switch_mutex_unlock(sql_manager.dbh_mutex);
}
336 
337 
switch_cache_db_get_type(switch_cache_db_handle_t * dbh)338 SWITCH_DECLARE(switch_cache_db_handle_type_t) switch_cache_db_get_type(switch_cache_db_handle_t *dbh)
339 {
340 	return dbh->type;
341 }
342 
switch_cache_db_flush_handles(void)343 SWITCH_DECLARE(void) switch_cache_db_flush_handles(void)
344 {
345 	sql_close(switch_epoch_time_now(NULL) + SQL_CACHE_TIMEOUT + 1);
346 }
347 
348 
switch_cache_db_release_db_handle(switch_cache_db_handle_t ** dbh)349 SWITCH_DECLARE(void) switch_cache_db_release_db_handle(switch_cache_db_handle_t **dbh)
350 {
351 	if (dbh && *dbh) {
352 
353 		switch((*dbh)->type) {
354 		case SCDB_TYPE_DATABASE_INTERFACE:
355 			{
356 				switch_database_interface_t *database_interface = (*dbh)->native_handle.database_interface_dbh->connection_options.database_interface;
357 				database_interface->flush((*dbh)->native_handle.database_interface_dbh);
358 			}
359 		break;
360 		default:
361 			break;
362 		}
363 
364 		switch_mutex_lock(sql_manager.dbh_mutex);
365 		(*dbh)->last_used = switch_epoch_time_now(NULL);
366 
367 		(*dbh)->io_mutex = NULL;
368 
369 		if ((*dbh)->use_count) {
370 			--(*dbh)->use_count;
371 		}
372 		switch_mutex_unlock((*dbh)->mutex);
373 		sql_manager.total_used_handles--;
374 		*dbh = NULL;
375 		switch_mutex_unlock(sql_manager.dbh_mutex);
376 	}
377 }
378 
379 
switch_cache_db_dismiss_db_handle(switch_cache_db_handle_t ** dbh)380 SWITCH_DECLARE(void) switch_cache_db_dismiss_db_handle(switch_cache_db_handle_t **dbh)
381 {
382 	switch_cache_db_release_db_handle(dbh);
383 }
384 
/* Smaller of two values, defined only if no MIN is already available.
   Function-like macro: each argument may be evaluated twice, so do not pass
   expressions with side effects. */
#ifndef MIN
#define MIN(a,b) (((a) < (b)) ? (a) : (b))
#endif
388 
switch_database_available(char * dsn)389 SWITCH_DECLARE(switch_status_t) switch_database_available(char* dsn)
390 {
391 	switch_status_t status = SWITCH_STATUS_FALSE;
392 	switch_database_interface_t *database_interface;
393 
394 	if (!dsn) {
395 		status = SWITCH_STATUS_SUCCESS;
396 	}
397 	else {
398 		char *colon_slashes = NULL;
399 		if (NULL != (colon_slashes = strstr(dsn, "://")))
400 		{
401 			char prefix[16] = "";
402 			strncpy(prefix, dsn, MIN(colon_slashes - dsn, 15));
403 
404 			if (!strncasecmp(prefix, "odbc", 4)) {
405 				if (switch_odbc_available()) status = SWITCH_STATUS_SUCCESS;
406 			}
407 			else if (!strncasecmp(prefix, "sqlite", 6)) {
408 				status = SWITCH_STATUS_SUCCESS;
409 			}
410 			else if ((database_interface = switch_loadable_module_get_database_interface(prefix, NULL))) {
411 				status = SWITCH_STATUS_SUCCESS;
412 				UNPROTECT_INTERFACE(database_interface);
413 			}
414 		}
415 		else if (strchr(dsn + 2, ':')) {
416 			status = SWITCH_STATUS_SUCCESS;
417 		}
418 	}
419 
420 	return status;
421 }
422 
switch_core_check_core_db_dsn(void)423 SWITCH_DECLARE(switch_status_t) switch_core_check_core_db_dsn(void)
424 {
425 	return switch_database_available(runtime.odbc_dsn);
426 }
427 
_switch_cache_db_get_db_handle_dsn(switch_cache_db_handle_t ** dbh,const char * dsn,const char * file,const char * func,int line)428 SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle_dsn(switch_cache_db_handle_t **dbh, const char *dsn,
429 	const char *file, const char *func, int line)
430 {
431 	return _switch_cache_db_get_db_handle_dsn_ex(dbh, dsn, SWITCH_FALSE, file, func, line);
432 }
433 
_switch_cache_db_get_db_handle_dsn_ex(switch_cache_db_handle_t ** dbh,const char * dsn,switch_bool_t make_module_no_unloadable,const char * file,const char * func,int line)434 SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle_dsn_ex(switch_cache_db_handle_t **dbh, const char *dsn, switch_bool_t make_module_no_unloadable,
435 																   const char *file, const char *func, int line)
436 {
437 	switch_cache_db_connection_options_t connection_options = { {0} };
438 	switch_cache_db_handle_type_t type = SCDB_TYPE_CORE_DB;
439 	switch_database_interface_t *database_interface = NULL;
440 	char tmp[256] = "";
441 	char *p;
442 	switch_status_t status = SWITCH_STATUS_FALSE;
443 	int i;
444 
445 	char *colon_slashes = NULL;
446 	if ( NULL != (colon_slashes = strstr(dsn, "://")) )
447 	{
448 		char prefix[16] = "";
449 		strncpy(prefix, dsn, MIN(colon_slashes - dsn, 15));
450 
451 		if ((database_interface = switch_loadable_module_get_database_interface(prefix, NULL))) {
452 			type = SCDB_TYPE_DATABASE_INTERFACE;
453 			connection_options.database_interface_options.make_module_no_unloadable = make_module_no_unloadable;
454 			connection_options.database_interface_options.database_interface = database_interface;
455 			connection_options.database_interface_options.original_dsn = dsn;
456 			connection_options.database_interface_options.connection_string = colon_slashes + 3;
457 			strcpy(connection_options.database_interface_options.prefix, prefix);
458 			UNPROTECT_INTERFACE(database_interface);
459 		}
460 	}
461 
462 	if (!connection_options.database_interface_options.connection_string)
463 	{
464 		if (!strncasecmp(dsn, "sqlite://", 9)) {
465 			type = SCDB_TYPE_CORE_DB;
466 			connection_options.core_db_options.db_path = (char *)(dsn + 9);
467 			if (!strncasecmp(connection_options.core_db_options.db_path, "memory://", 9)) {
468 				connection_options.core_db_options.in_memory = SWITCH_TRUE;
469 				connection_options.core_db_options.db_path = (char *)(connection_options.core_db_options.db_path + 9);
470 			}
471 		}
472 		else if ((!(i = strncasecmp(dsn, "odbc://", 7))) || (strchr(dsn + 2, ':') && !colon_slashes)) {
473 			type = SCDB_TYPE_ODBC;
474 
475 			if (i) {
476 				switch_set_string(tmp, dsn);
477 			}
478 			else {
479 				switch_set_string(tmp, dsn + 7);
480 			}
481 
482 			connection_options.odbc_options.dsn = tmp;
483 
484 			if ((p = strchr(tmp, ':'))) {
485 				*p++ = '\0';
486 				connection_options.odbc_options.user = p;
487 
488 				if ((p = strchr(connection_options.odbc_options.user, ':'))) {
489 					*p++ = '\0';
490 					connection_options.odbc_options.pass = p;
491 				}
492 			}
493 		}
494 		else {
495 			type = SCDB_TYPE_CORE_DB;
496 			connection_options.core_db_options.db_path = (char *)dsn;
497 		}
498 	}
499 
500 	status = _switch_cache_db_get_db_handle(dbh, type, &connection_options, file, func, line);
501 
502 	if (status != SWITCH_STATUS_SUCCESS) *dbh = NULL;
503 
504 	return status;
505 }
506 
507 
_switch_cache_db_get_db_handle(switch_cache_db_handle_t ** dbh,switch_cache_db_handle_type_t type,switch_cache_db_connection_options_t * connection_options,const char * file,const char * func,int line)508 SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_handle_t **dbh,
509 															   switch_cache_db_handle_type_t type,
510 															   switch_cache_db_connection_options_t *connection_options,
511 															   const char *file, const char *func, int line)
512 {
513 	switch_thread_id_t self = switch_thread_self();
514 	char thread_str[CACHE_DB_LEN] = "";
515 	char db_str[CACHE_DB_LEN] = "";
516 	char db_callsite_str[CACHE_DB_LEN] = "";
517 	switch_cache_db_handle_t *new_dbh = NULL;
518 	int waiting = 0;
519 	uint32_t yield_len = 100000, total_yield = 0;
520 
521 	const char *db_name = NULL;
522 	const char *odbc_user = NULL;
523 	const char *odbc_pass = NULL;
524 	const char *db_type = NULL;
525 
526 	while(runtime.max_db_handles && sql_manager.total_handles >= runtime.max_db_handles && sql_manager.total_used_handles >= sql_manager.total_handles) {
527 		if (!waiting++) {
528 			switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_WARNING, "Max handles %u exceeded, blocking....\n",
529 							  runtime.max_db_handles);
530 		}
531 
532 		switch_yield(yield_len);
533 		total_yield += yield_len;
534 
535 		if (runtime.db_handle_timeout && total_yield > runtime.db_handle_timeout) {
536 			switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_ERROR, "Error connecting\n");
537 			*dbh = NULL;
538 			return SWITCH_STATUS_FALSE;
539 		}
540 	}
541 
542 	switch (type) {
543 	case SCDB_TYPE_DATABASE_INTERFACE:
544 		{
545 			db_name = connection_options->database_interface_options.connection_string;
546 			odbc_user = NULL;
547 			odbc_pass = NULL;
548 			db_type = "database_interface";
549 		}
550 		break;
551 	case SCDB_TYPE_ODBC:
552 		{
553 			db_name = connection_options->odbc_options.dsn;
554 			odbc_user = connection_options->odbc_options.user;
555 			odbc_pass = connection_options->odbc_options.pass;
556 			db_type = "odbc";
557 		}
558 		break;
559 	case SCDB_TYPE_CORE_DB:
560 		{
561 			db_name = connection_options->core_db_options.db_path;
562 			odbc_user = NULL;
563 			odbc_pass = NULL;
564 			db_type = "core_db";
565 		}
566 		break;
567 	}
568 
569 	if (!db_name) {
570 		return SWITCH_STATUS_FALSE;
571 	}
572 
573 	if (odbc_user || odbc_pass) {
574 		snprintf(db_str, sizeof(db_str) - 1, "db=\"%s\";type=\"%s\"user=\"%s\";pass=\"%s\"", db_name, db_type, odbc_user, odbc_pass);
575 	} else {
576 		snprintf(db_str, sizeof(db_str) - 1, "db=\"%s\",type=\"%s\"", db_name, db_type);
577 	}
578 	snprintf(db_callsite_str, sizeof(db_callsite_str) - 1, "%s:%d", file, line);
579 	snprintf(thread_str, sizeof(thread_str) - 1, "thread=\"%lu\"",  (unsigned long) (intptr_t) self);
580 
581 	if ((new_dbh = get_handle(db_str, db_callsite_str, thread_str))) {
582 		if (type == SCDB_TYPE_DATABASE_INTERFACE) {
583 			switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_DEBUG10,
584 				"Reuse Unused Cached DB handle %s [Database interface prefix: %s]\n", new_dbh->name, connection_options->database_interface_options.prefix);
585 		} else {
586 			switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_DEBUG10,
587 				"Reuse Unused Cached DB handle %s [%s]\n", new_dbh->name, switch_cache_db_type_name(new_dbh->type));
588 		}
589 	} else {
590 		switch_core_db_t *db = NULL;
591 		switch_odbc_handle_t *odbc_dbh = NULL;
592 		switch_database_interface_handle_t *database_interface_dbh = NULL;
593 
594 		switch (type) {
595 		case SCDB_TYPE_DATABASE_INTERFACE:
596 			{
597 				switch_database_interface_t *database_interface = connection_options->database_interface_options.database_interface;
598 
599 				if (SWITCH_STATUS_SUCCESS != database_interface->handle_new(connection_options->database_interface_options, &database_interface_dbh)) {
600 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! Can't create new handle! Can't connect to DSN %s\n", connection_options->database_interface_options.original_dsn);
601 					goto end;
602 				}
603 
604 				if (database_interface_dbh) {
605 					database_interface_dbh->connection_options = connection_options->database_interface_options;
606 
607 					if (connection_options->database_interface_options.make_module_no_unloadable == SWITCH_TRUE)
608 					{
609 						PROTECT_INTERFACE(database_interface)
610 							switch_loadable_module_protect(database_interface->parent->module_name);
611 						UNPROTECT_INTERFACE(database_interface)
612 					}
613 				}
614 			}
615 			break;
616 		case SCDB_TYPE_ODBC:
617 			{
618 				if (!switch_odbc_available()) {
619 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! ODBC NOT AVAILABLE! Can't connect to DSN %s\n", connection_options->odbc_options.dsn);
620 					goto end;
621 				}
622 
623 				if ((odbc_dbh = switch_odbc_handle_new(connection_options->odbc_options.dsn,
624 													   connection_options->odbc_options.user, connection_options->odbc_options.pass))) {
625 					if (switch_odbc_handle_connect(odbc_dbh) != SWITCH_ODBC_SUCCESS) {
626 						switch_odbc_handle_destroy(&odbc_dbh);
627 					}
628 				}
629 			}
630 			break;
631 		case SCDB_TYPE_CORE_DB:
632 			{
633 				if (!connection_options->core_db_options.in_memory) {
634 					db = switch_core_db_open_file(connection_options->core_db_options.db_path);
635 				} else {
636 					db = switch_core_db_open_in_memory(connection_options->core_db_options.db_path);
637 				}
638 			}
639 			break;
640 
641 		default:
642 			goto end;
643 		}
644 
645 		if (!db && !odbc_dbh && !database_interface_dbh) {
646 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure to connect to %s %s!\n", switch_cache_db_type_name(type), db_name);
647 			goto end;
648 		}
649 
650 		new_dbh = create_handle(type);
651 
652 		switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_DEBUG10,
653 						  "Create Cached DB handle %s [%s] %s:%d\n", new_dbh->name, switch_cache_db_type_name(type), file, line);
654 
655 		if (database_interface_dbh) {
656 			new_dbh->native_handle.database_interface_dbh = database_interface_dbh;
657 		} else if (db) {
658 			if (!(new_dbh->native_handle.core_db_dbh = switch_core_alloc(new_dbh->pool, sizeof(*new_dbh->native_handle.core_db_dbh)))) {
659 				destroy_handle(&new_dbh);
660 				switch_core_db_close(db);
661 				goto end;
662 			}
663 			new_dbh->native_handle.core_db_dbh->handle = db;
664 			new_dbh->native_handle.core_db_dbh->in_memory = connection_options->core_db_options.in_memory;
665 		} else if (odbc_dbh) {
666 			new_dbh->native_handle.odbc_dbh = odbc_dbh;
667 		}
668 
669 		add_handle(new_dbh, db_str, db_callsite_str, thread_str);
670 	}
671 
672  end:
673 
674 	if (new_dbh) {
675 		new_dbh->last_used = switch_epoch_time_now(NULL);
676 	}
677 
678 	*dbh = new_dbh;
679 
680 	return *dbh ? SWITCH_STATUS_SUCCESS : SWITCH_STATUS_FALSE;
681 }
682 
683 
switch_cache_db_execute_sql_real(switch_cache_db_handle_t * dbh,const char * sql,char ** err)684 static switch_status_t switch_cache_db_execute_sql_real(switch_cache_db_handle_t *dbh, const char *sql, char **err)
685 {
686 	switch_status_t status = SWITCH_STATUS_FALSE;
687 	char *errmsg = NULL;
688 	char *tmp = NULL;
689 	char *type = NULL;
690 	switch_mutex_t *io_mutex = dbh->io_mutex;
691 
692 	if (io_mutex) switch_mutex_lock(io_mutex);
693 
694 	if (err) {
695 		*err = NULL;
696 	}
697 
698 	switch (dbh->type) {
699 	case SCDB_TYPE_DATABASE_INTERFACE:
700 		{
701 			switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
702 			type = (char *)dbh->native_handle.database_interface_dbh->connection_options.prefix;
703 			status = database_interface_handle_exec(database_interface, dbh->native_handle.database_interface_dbh, sql, &errmsg);
704 		}
705 		break;
706 	case SCDB_TYPE_ODBC:
707 		{
708 			type = "ODBC";
709 			status = switch_odbc_handle_exec(dbh->native_handle.odbc_dbh, sql, NULL, &errmsg);
710 		}
711 		break;
712 	case SCDB_TYPE_CORE_DB:
713 		{
714 			int ret = switch_core_db_exec(dbh->native_handle.core_db_dbh->handle, sql, NULL, NULL, &errmsg);
715 			type = "NATIVE";
716 
717 			if (ret == SWITCH_CORE_DB_OK) {
718 				status = SWITCH_STATUS_SUCCESS;
719 			}
720 
721 			if (errmsg) {
722 				switch_strdup(tmp, errmsg);
723 				switch_core_db_free(errmsg);
724 				errmsg = tmp;
725 			}
726 		}
727 		break;
728 	}
729 
730 	if (errmsg) {
731 		if (!switch_stristr("already exists", errmsg) && !switch_stristr("duplicate key name", errmsg)) {
732 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "[%s] %s SQL ERR [%s]\n%s\n", dbh->name, (type ? type : "Unknown"), errmsg, sql);
733 		}
734 		if (err) {
735 			*err = errmsg;
736 		} else {
737 			switch_safe_free(errmsg);
738 		}
739 	}
740 
741 
742 	if (io_mutex) switch_mutex_unlock(io_mutex);
743 
744 	return status;
745 }
746 
747 /**
748    OMFG you cruel bastards.  Who chooses 64k as a max buffer len for a sql statement, have you ever heard of transactions?
749 **/
switch_cache_db_execute_sql_chunked(switch_cache_db_handle_t * dbh,char * sql,uint32_t chunk_size,char ** err)750 static switch_status_t switch_cache_db_execute_sql_chunked(switch_cache_db_handle_t *dbh, char *sql, uint32_t chunk_size, char **err)
751 {
752 	switch_status_t status = SWITCH_STATUS_FALSE;
753 	char *p, *s, *e;
754 	switch_size_t chunk_count;
755 	switch_size_t len;
756 
757 	switch_assert(chunk_size);
758 
759 	if (err)
760 		*err = NULL;
761 
762 	len = strlen(sql);
763 
764 	if (chunk_size > len) {
765 		return switch_cache_db_execute_sql_real(dbh, sql, err);
766 	}
767 
768 	if (!(chunk_count = strlen(sql) / chunk_size)) {
769 		return SWITCH_STATUS_FALSE;
770 	}
771 
772 	e = end_of_p(sql);
773 	s = sql;
774 
775 	while (s && s < e) {
776 		p = s + chunk_size;
777 
778 		if (p > e) {
779 			p = e;
780 		}
781 
782 		while (p > s) {
783 			if (*p == '\n' && *(p - 1) == ';') {
784 				*p = '\0';
785 				*(p - 1) = '\0';
786 				p++;
787 				break;
788 			}
789 
790 			p--;
791 		}
792 
793 		if (p <= s)
794 			break;
795 
796 
797 		status = switch_cache_db_execute_sql_real(dbh, s, err);
798 		if (status != SWITCH_STATUS_SUCCESS || (err && *err)) {
799 			break;
800 		}
801 
802 		s = p;
803 
804 	}
805 
806 	return status;
807 
808 }
809 
810 
switch_cache_db_execute_sql(switch_cache_db_handle_t * dbh,char * sql,char ** err)811 SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql(switch_cache_db_handle_t *dbh, char *sql, char **err)
812 {
813 	switch_status_t status = SWITCH_STATUS_FALSE;
814 	switch_mutex_t *io_mutex = dbh->io_mutex;
815 
816 	if (io_mutex) switch_mutex_lock(io_mutex);
817 
818 	switch (dbh->type) {
819 	default:
820 		{
821 			status = switch_cache_db_execute_sql_chunked(dbh, (char *) sql, 32768, err);
822 		}
823 		break;
824 	}
825 
826 	if (io_mutex) switch_mutex_unlock(io_mutex);
827 
828 	return status;
829 
830 }
831 
832 
switch_cache_db_affected_rows(switch_cache_db_handle_t * dbh)833 SWITCH_DECLARE(int) switch_cache_db_affected_rows(switch_cache_db_handle_t *dbh)
834 {
835 	switch (dbh->type) {
836 	case SCDB_TYPE_CORE_DB:
837 		{
838 			return switch_core_db_changes(dbh->native_handle.core_db_dbh->handle);
839 		}
840 		break;
841 	case SCDB_TYPE_ODBC:
842 		{
843 			return switch_odbc_handle_affected_rows(dbh->native_handle.odbc_dbh);
844 		}
845 		break;
846 	case SCDB_TYPE_DATABASE_INTERFACE:
847 		{
848 			switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
849 			int affected_rows = 0;
850 			database_interface->affected_rows(dbh->native_handle.database_interface_dbh, &affected_rows);
851 			return affected_rows;
852 		}
853 		break;
854 	}
855 	return 0;
856 }
857 
switch_cache_db_load_extension(switch_cache_db_handle_t * dbh,const char * extension)858 SWITCH_DECLARE(int) switch_cache_db_load_extension(switch_cache_db_handle_t *dbh, const char *extension)
859 {
860 	switch (dbh->type) {
861 	case SCDB_TYPE_CORE_DB:
862 		{
863 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "try to load extension [%s]!\n", extension);
864 			return switch_core_db_load_extension(dbh->native_handle.core_db_dbh->handle, extension);
865 		}
866 		break;
867 	case SCDB_TYPE_ODBC:
868 		{
869 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "load extension not supported by type ODBC!\n");
870 		}
871 		break;
872 	case SCDB_TYPE_DATABASE_INTERFACE:
873 		{
874 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "load extension not supported by type DATABASE_INTERFACE!\n");
875 		}
876 		break;
877 	}
878 	return 0;
879 }
880 
881 
switch_cache_db_execute_sql2str(switch_cache_db_handle_t * dbh,char * sql,char * str,size_t len,char ** err)882 SWITCH_DECLARE(char *) switch_cache_db_execute_sql2str(switch_cache_db_handle_t *dbh, char *sql, char *str, size_t len, char **err)
883 {
884 	switch_status_t status = SWITCH_STATUS_FALSE;
885 	switch_mutex_t *io_mutex = dbh->io_mutex;
886 
887 	if (io_mutex) switch_mutex_lock(io_mutex);
888 
889 	memset(str, 0, len);
890 
891 	switch (dbh->type) {
892 	case SCDB_TYPE_CORE_DB:
893 		{
894 			switch_core_db_stmt_t *stmt;
895 
896 			if (switch_core_db_prepare(dbh->native_handle.core_db_dbh->handle, sql, -1, &stmt, 0)) {
897 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Statement Error [%s]!\n", sql);
898 				goto end;
899 			} else {
900 				int running = 1;
901 				int colcount;
902 
903 				while (running < 5000) {
904 					int result = switch_core_db_step(stmt);
905 					const unsigned char *txt;
906 
907 					if (result == SWITCH_CORE_DB_ROW) {
908 						if ((colcount = switch_core_db_column_count(stmt)) > 0) {
909 							if ((txt = switch_core_db_column_text(stmt, 0))) {
910 								switch_copy_string(str, (char *) txt, len);
911 								status = SWITCH_STATUS_SUCCESS;
912 							}
913 						}
914 						break;
915 					} else if (result == SWITCH_CORE_DB_BUSY) {
916 						running++;
917 						switch_cond_next();
918 						continue;
919 					}
920 					break;
921 				}
922 
923 				switch_core_db_finalize(stmt);
924 			}
925 		}
926 		break;
927 	case SCDB_TYPE_ODBC:
928 		{
929 			status = switch_odbc_handle_exec_string(dbh->native_handle.odbc_dbh, sql, str, len, err);
930 		}
931 		break;
932 	case SCDB_TYPE_DATABASE_INTERFACE:
933 		{
934 			switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
935 			status = database_interface->exec_string(dbh->native_handle.database_interface_dbh, sql, str, len, err);
936 		}
937 		break;
938 	}
939 
940  end:
941 
942 	if (io_mutex) switch_mutex_unlock(io_mutex);
943 
944 	return status == SWITCH_STATUS_SUCCESS ? str : NULL;
945 
946 }
947 
switch_cache_db_persistant_execute(switch_cache_db_handle_t * dbh,const char * sql,uint32_t retries)948 SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute(switch_cache_db_handle_t *dbh, const char *sql, uint32_t retries)
949 {
950 	char *errmsg = NULL;
951 	switch_status_t status = SWITCH_STATUS_FALSE;
952 	uint8_t forever = 0;
953 	switch_mutex_t *io_mutex = dbh->io_mutex;
954 
955 	if (!retries) {
956 		forever = 1;
957 		retries = 1000;
958 	}
959 
960 	while (retries > 0) {
961 
962 		if (io_mutex) switch_mutex_lock(io_mutex);
963 		switch_cache_db_execute_sql_real(dbh, sql, &errmsg);
964 		if (io_mutex) switch_mutex_unlock(io_mutex);
965 
966 
967 		if (errmsg) {
968 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR [%s]\n", errmsg);
969 			switch_safe_free(errmsg);
970 			switch_yield(100000);
971 			retries--;
972 			if (retries == 0 && forever) {
973 				retries = 1000;
974 				continue;
975 			}
976 		} else {
977 			status = SWITCH_STATUS_SUCCESS;
978 			break;
979 		}
980 	}
981 
982 	return status;
983 }
984 
985 
switch_cache_db_persistant_execute_trans_full(switch_cache_db_handle_t * dbh,char * sql,uint32_t retries,const char * pre_trans_execute,const char * post_trans_execute,const char * inner_pre_trans_execute,const char * inner_post_trans_execute)986 SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans_full(switch_cache_db_handle_t *dbh,
987 																			  char *sql, uint32_t retries,
988 																			  const char *pre_trans_execute,
989 																			  const char *post_trans_execute,
990 																			  const char *inner_pre_trans_execute,
991 																			  const char *inner_post_trans_execute)
992 {
993 	char *errmsg = NULL;
994 	switch_status_t status = SWITCH_STATUS_FALSE;
995 	uint8_t forever = 0;
996 	unsigned begin_retries = 100;
997 	uint8_t again = 0;
998 	switch_mutex_t *io_mutex = dbh->io_mutex;
999 
1000 	if (!retries) {
1001 		forever = 1;
1002 		retries = 1000;
1003 	}
1004 
1005 	if (io_mutex) switch_mutex_lock(io_mutex);
1006 
1007 	if (!zstr(pre_trans_execute)) {
1008 		switch_cache_db_execute_sql_real(dbh, pre_trans_execute, &errmsg);
1009 		if (errmsg) {
1010 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", pre_trans_execute, errmsg);
1011 			switch_safe_free(errmsg);
1012 		}
1013 	}
1014 
1015  again:
1016 
1017 	while (begin_retries > 0) {
1018 		again = 0;
1019 
1020 		switch(dbh->type) {
1021 		case SCDB_TYPE_CORE_DB:
1022 			{
1023 				switch_cache_db_execute_sql_real(dbh, "BEGIN EXCLUSIVE", &errmsg);
1024 			}
1025 			break;
1026 		case SCDB_TYPE_ODBC:
1027 			{
1028 				switch_odbc_status_t result;
1029 
1030 				if ((result = switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 0)) != SWITCH_ODBC_SUCCESS) {
1031 					char tmp[100];
1032 					switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
1033 					errmsg = strdup(tmp);
1034 				}
1035 			}
1036 			break;
1037 		case SCDB_TYPE_DATABASE_INTERFACE:
1038 			{
1039 				switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
1040 				switch_status_t result;
1041 
1042 				if ((result = database_interface->sql_set_auto_commit_attr(dbh->native_handle.database_interface_dbh, 0)) != SWITCH_STATUS_SUCCESS) {
1043 					char tmp[100];
1044 					switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
1045 					errmsg = strdup(tmp);
1046 				}
1047 			}
1048 			break;
1049 		}
1050 
1051 		if (errmsg) {
1052 			begin_retries--;
1053 			if (strstr(errmsg, "cannot start a transaction within a transaction")) {
1054 				again = 1;
1055 			} else {
1056 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "SQL Retry [%s]\n", errmsg);
1057 			}
1058 			switch_safe_free(errmsg);
1059 
1060 			if (again) {
1061 				switch(dbh->type) {
1062 				case SCDB_TYPE_CORE_DB:
1063 					{
1064 						switch_cache_db_execute_sql_real(dbh, "COMMIT", NULL);
1065 					}
1066 					break;
1067 				case SCDB_TYPE_ODBC:
1068 					{
1069 						switch_odbc_SQLEndTran(dbh->native_handle.odbc_dbh, 1);
1070 						switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 1);
1071 					}
1072 					break;
1073 				case SCDB_TYPE_DATABASE_INTERFACE:
1074 					{
1075 						switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
1076 						switch_status_t result;
1077 
1078 						if ((result = database_interface->commit(dbh->native_handle.database_interface_dbh)) != SWITCH_STATUS_SUCCESS) {
1079 							char tmp[100];
1080 							switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to commit transaction", result);
1081 						}
1082 					}
1083 					break;
1084 				}
1085 
1086 				goto again;
1087 			}
1088 
1089 			switch_yield(100000);
1090 
1091 			if (begin_retries == 0) {
1092 				goto done;
1093 			}
1094 
1095 			continue;
1096 		}
1097 
1098 		break;
1099 	}
1100 
1101 
1102 	if (!zstr(inner_pre_trans_execute)) {
1103 		switch_cache_db_execute_sql_real(dbh, inner_pre_trans_execute, &errmsg);
1104 		if (errmsg) {
1105 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", inner_pre_trans_execute, errmsg);
1106 			switch_safe_free(errmsg);
1107 		}
1108 	}
1109 
1110 	while (retries > 0) {
1111 
1112 		switch_cache_db_execute_sql(dbh, sql, &errmsg);
1113 
1114 		if (errmsg) {
1115 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR [%s]\n", errmsg);
1116 			switch_safe_free(errmsg);
1117 			errmsg = NULL;
1118 			switch_yield(100000);
1119 			retries--;
1120 			if (retries == 0 && forever) {
1121 				retries = 1000;
1122 				continue;
1123 			}
1124 		} else {
1125 			status = SWITCH_STATUS_SUCCESS;
1126 			break;
1127 		}
1128 	}
1129 
1130 	if (!zstr(inner_post_trans_execute)) {
1131 		switch_cache_db_execute_sql_real(dbh, inner_post_trans_execute, &errmsg);
1132 		if (errmsg) {
1133 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", inner_post_trans_execute, errmsg);
1134 			switch_safe_free(errmsg);
1135 		}
1136 	}
1137 
1138  done:
1139 
1140 	switch(dbh->type) {
1141 	case SCDB_TYPE_CORE_DB:
1142 		{
1143 			switch_cache_db_execute_sql_real(dbh, "COMMIT", NULL);
1144 		}
1145 		break;
1146 	case SCDB_TYPE_ODBC:
1147 		{
1148 			switch_odbc_SQLEndTran(dbh->native_handle.odbc_dbh, 1);
1149 			switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 1);
1150 		}
1151 		break;
1152 	case SCDB_TYPE_DATABASE_INTERFACE:
1153 		{
1154 			switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
1155 			switch_status_t result;
1156 
1157 			if ((result = database_interface->commit(dbh->native_handle.database_interface_dbh)) != SWITCH_STATUS_SUCCESS) {
1158 				char tmp[100];
1159 				switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to commit transaction", result);
1160 			}
1161 		}
1162 		break;
1163 	}
1164 
1165 	if (!zstr(post_trans_execute)) {
1166 		switch_cache_db_execute_sql_real(dbh, post_trans_execute, &errmsg);
1167 		if (errmsg) {
1168 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", post_trans_execute, errmsg);
1169 			switch_safe_free(errmsg);
1170 		}
1171 	}
1172 
1173 	if (io_mutex) switch_mutex_unlock(io_mutex);
1174 
1175 	return status;
1176 }
1177 
/* Context handed (as pArg) to helper_callback: the event-style callback to run and its user data. */
struct helper {
	switch_core_db_event_callback_func_t callback;	/* invoked by helper_callback with the event built from a row */
	void *pdata;	/* passed as the first argument to callback */
};
1182 
helper_callback(void * pArg,int argc,char ** argv,char ** columnNames)1183 static int helper_callback(void *pArg, int argc, char **argv, char **columnNames)
1184 {
1185 	struct helper *h = (struct helper *) pArg;
1186 	int r = 0;
1187 	switch_event_t *event;
1188 
1189 	switch_event_create_array_pair(&event, columnNames, argv, argc);
1190 
1191 	r = h->callback(h->pdata, event);
1192 
1193 	switch_event_destroy(&event);
1194 
1195 	return r;
1196 }
1197 
switch_cache_db_execute_sql_event_callback(switch_cache_db_handle_t * dbh,const char * sql,switch_core_db_event_callback_func_t callback,void * pdata,char ** err)1198 SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback(switch_cache_db_handle_t *dbh,
1199 																	 const char *sql, switch_core_db_event_callback_func_t callback, void *pdata, char **err)
1200 {
1201 	switch_status_t status = SWITCH_STATUS_FALSE;
1202 	char *errmsg = NULL;
1203 	switch_mutex_t *io_mutex = dbh->io_mutex;
1204 	struct helper h = {0};
1205 
1206 
1207 	if (err) {
1208 		*err = NULL;
1209 	}
1210 
1211 	if (io_mutex) switch_mutex_lock(io_mutex);
1212 
1213 	h.callback = callback;
1214 	h.pdata = pdata;
1215 
1216 	switch (dbh->type) {
1217 		case SCDB_TYPE_DATABASE_INTERFACE:
1218 		{
1219 			switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
1220 
1221 			if ((status = database_interface_handle_callback_exec(database_interface, dbh->native_handle.database_interface_dbh, sql, helper_callback, &h, err)) != SWITCH_STATUS_SUCCESS) {
1222 				char tmp[100];
1223 				switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to execute_sql_event_callback", status);
1224 			}
1225 		}
1226 		break;
1227 	case SCDB_TYPE_ODBC:
1228 		{
1229 			status = switch_odbc_handle_callback_exec(dbh->native_handle.odbc_dbh, sql, helper_callback, &h, err);
1230 		}
1231 		break;
1232 	case SCDB_TYPE_CORE_DB:
1233 		{
1234 			int ret = switch_core_db_exec(dbh->native_handle.core_db_dbh->handle, sql, helper_callback, &h, &errmsg);
1235 
1236 			if (ret == SWITCH_CORE_DB_OK || ret == SWITCH_CORE_DB_ABORT) {
1237 				status = SWITCH_STATUS_SUCCESS;
1238 			}
1239 
1240 			if (errmsg) {
1241 				dbh->last_used = switch_epoch_time_now(NULL) - (SQL_CACHE_TIMEOUT * 2);
1242 				if (!strstr(errmsg, "query abort")) {
1243 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg);
1244 				}
1245 				switch_core_db_free(errmsg);
1246 			}
1247 		}
1248 		break;
1249 	}
1250 
1251 	if (io_mutex) switch_mutex_unlock(io_mutex);
1252 
1253 	return status;
1254 }
1255 
switch_cache_db_execute_sql_event_callback_err(switch_cache_db_handle_t * dbh,const char * sql,switch_core_db_event_callback_func_t callback,switch_core_db_err_callback_func_t err_callback,void * pdata,char ** err)1256 SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback_err(switch_cache_db_handle_t *dbh, const char *sql,
1257 																			   switch_core_db_event_callback_func_t callback,
1258 																			   switch_core_db_err_callback_func_t err_callback,
1259 																			   void *pdata, char **err)
1260 {
1261 	switch_status_t status = SWITCH_STATUS_FALSE;
1262 	char *errmsg = NULL;
1263 	switch_mutex_t *io_mutex = dbh->io_mutex;
1264 	struct helper h;
1265 
1266 
1267 	if (err) {
1268 		*err = NULL;
1269 	}
1270 
1271 	if (io_mutex) switch_mutex_lock(io_mutex);
1272 
1273 	h.callback = callback;
1274 	h.pdata = pdata;
1275 
1276 	switch (dbh->type) {
1277 	case SCDB_TYPE_DATABASE_INTERFACE:
1278 		{
1279 			switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
1280 
1281 			if ((status = database_interface_handle_callback_exec(database_interface, dbh->native_handle.database_interface_dbh, sql, helper_callback, &h, err)) != SWITCH_STATUS_SUCCESS) {
1282 				char tmp[100];
1283 				switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to execute_sql_event_callback_err", status);
1284 			}
1285 
1286 			if (err && *err) {
1287 				(*err_callback)(pdata, (const char*)*err);
1288 			}
1289 		}
1290 		break;
1291 	case SCDB_TYPE_ODBC:
1292 		{
1293 			status = switch_odbc_handle_callback_exec(dbh->native_handle.odbc_dbh, sql, helper_callback, &h, err);
1294 			if (err && *err) {
1295 				(*err_callback)(pdata, (const char*)*err);
1296 			}
1297 		}
1298 		break;
1299 	case SCDB_TYPE_CORE_DB:
1300 		{
1301 			int ret = switch_core_db_exec(dbh->native_handle.core_db_dbh->handle, sql, helper_callback, &h, &errmsg);
1302 
1303 			if (ret == SWITCH_CORE_DB_OK || ret == SWITCH_CORE_DB_ABORT) {
1304 				status = SWITCH_STATUS_SUCCESS;
1305 			}
1306 
1307 			if (errmsg) {
1308 				dbh->last_used = switch_epoch_time_now(NULL) - (SQL_CACHE_TIMEOUT * 2);
1309 				if (!strstr(errmsg, "query abort")) {
1310 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg);
1311 				}
1312 			}
1313 			if ((ret == SWITCH_CORE_DB_ABORT || errmsg) && err_callback) {
1314 				(*err_callback)(pdata, errmsg);
1315 			}
1316 			if (errmsg) {
1317 				switch_core_db_free(errmsg);
1318 			}
1319 		}
1320 		break;
1321 	}
1322 
1323 	if (io_mutex) switch_mutex_unlock(io_mutex);
1324 
1325 	return status;
1326 }
1327 
switch_cache_db_execute_sql_callback(switch_cache_db_handle_t * dbh,const char * sql,switch_core_db_callback_func_t callback,void * pdata,char ** err)1328 SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cache_db_handle_t *dbh,
1329 																	 const char *sql, switch_core_db_callback_func_t callback, void *pdata, char **err)
1330 {
1331 	switch_status_t status = SWITCH_STATUS_FALSE;
1332 	char *errmsg = NULL;
1333 	switch_mutex_t *io_mutex = dbh->io_mutex;
1334 
1335 	if (err) {
1336 		*err = NULL;
1337 	}
1338 
1339 	if (io_mutex) switch_mutex_lock(io_mutex);
1340 
1341 
1342 	switch (dbh->type) {
1343 		case SCDB_TYPE_DATABASE_INTERFACE:
1344 		{
1345 			switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
1346 
1347 			if ((status = database_interface_handle_callback_exec(database_interface, dbh->native_handle.database_interface_dbh, sql, callback, pdata, err)) != SWITCH_STATUS_SUCCESS) {
1348 				char tmp[100];
1349 				switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to execute_sql_callback", status);
1350 			}
1351 		}
1352 		break;
1353 	case SCDB_TYPE_ODBC:
1354 		{
1355 			status = switch_odbc_handle_callback_exec(dbh->native_handle.odbc_dbh, sql, callback, pdata, err);
1356 		}
1357 		break;
1358 	case SCDB_TYPE_CORE_DB:
1359 		{
1360 			int ret = switch_core_db_exec(dbh->native_handle.core_db_dbh->handle, sql, callback, pdata, &errmsg);
1361 
1362 			if (ret == SWITCH_CORE_DB_OK || ret == SWITCH_CORE_DB_ABORT) {
1363 				status = SWITCH_STATUS_SUCCESS;
1364 			}
1365 
1366 			if (errmsg) {
1367 				dbh->last_used = switch_epoch_time_now(NULL) - (SQL_CACHE_TIMEOUT * 2);
1368 				if (!strstr(errmsg, "query abort")) {
1369 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg);
1370 				}
1371 				switch_core_db_free(errmsg);
1372 			}
1373 		}
1374 		break;
1375 	}
1376 
1377 	if (io_mutex) switch_mutex_unlock(io_mutex);
1378 
1379 	return status;
1380 }
1381 
switch_cache_db_execute_sql_callback_err(switch_cache_db_handle_t * dbh,const char * sql,switch_core_db_callback_func_t callback,switch_core_db_err_callback_func_t err_callback,void * pdata,char ** err)1382 SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback_err(switch_cache_db_handle_t *dbh, const char *sql,
1383 																		 switch_core_db_callback_func_t callback,
1384 																		 switch_core_db_err_callback_func_t err_callback, void *pdata, char **err)
1385 {
1386 	switch_status_t status = SWITCH_STATUS_FALSE;
1387 	char *errmsg = NULL;
1388 	switch_mutex_t *io_mutex = dbh->io_mutex;
1389 
1390 	if (err) {
1391 		*err = NULL;
1392 	}
1393 
1394 	if (io_mutex) switch_mutex_lock(io_mutex);
1395 
1396 
1397 	switch (dbh->type) {
1398 	case SCDB_TYPE_DATABASE_INTERFACE:
1399 		{
1400 			switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
1401 
1402 			if ((status = database_interface_handle_callback_exec(database_interface, dbh->native_handle.database_interface_dbh, sql, callback, pdata, err)) != SWITCH_STATUS_SUCCESS) {
1403 				char tmp[100];
1404 				switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to execute_sql_callback_err", status);
1405 			}
1406 
1407 			if (err && *err) {
1408 				(*err_callback)(pdata, (const char*)*err);
1409 			}
1410 		}
1411 		break;
1412 	case SCDB_TYPE_ODBC:
1413 		{
1414 			status = switch_odbc_handle_callback_exec(dbh->native_handle.odbc_dbh, sql, callback, pdata, err);
1415 			if (err && *err) {
1416 				(*err_callback)(pdata, (const char*)*err);
1417 			}
1418 		}
1419 		break;
1420 	case SCDB_TYPE_CORE_DB:
1421 		{
1422 			int ret = switch_core_db_exec(dbh->native_handle.core_db_dbh->handle, sql, callback, pdata, &errmsg);
1423 
1424 			if (ret == SWITCH_CORE_DB_OK || ret == SWITCH_CORE_DB_ABORT) {
1425 				status = SWITCH_STATUS_SUCCESS;
1426 			}
1427 
1428 			if (errmsg) {
1429 				dbh->last_used = switch_epoch_time_now(NULL) - (SQL_CACHE_TIMEOUT * 2);
1430 				if (!strstr(errmsg, "query abort")) {
1431 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg);
1432 				}
1433 			}
1434 			if ((ret == SWITCH_CORE_DB_ABORT || errmsg) && err_callback) {
1435 				(*err_callback)(pdata, errmsg);
1436 			}
1437 			if (errmsg) {
1438 				switch_core_db_free(errmsg);
1439 			}
1440 		}
1441 		break;
1442 	}
1443 
1444 	if (io_mutex) switch_mutex_unlock(io_mutex);
1445 
1446 	return status;
1447 }
1448 
switch_cache_db_create_schema(switch_cache_db_handle_t * dbh,char * sql,char ** err)1449 SWITCH_DECLARE(switch_status_t) switch_cache_db_create_schema(switch_cache_db_handle_t *dbh, char *sql, char **err)
1450 {
1451 	switch_status_t r = SWITCH_STATUS_SUCCESS;
1452 
1453 	switch_assert(sql != NULL);
1454 
1455 	if (switch_test_flag((&runtime), SCF_AUTO_SCHEMAS)) {
1456 		r = switch_cache_db_execute_sql(dbh, sql, err);
1457 	}
1458 
1459 	return r;
1460 }
1461 
/*!
 * \brief Performs test_sql and if it fails performs drop_sql and reactive_sql.
 *
 * If auto-clear-sql is disabled, then this function will do nothing and it is
 * assumed that the queries are not needed. If auto-create-schemas is disabled,
 * then just test_sql is executed, but drop_sql and reactive_sql are not.
 *
 * Otherwise, test_sql gets executed. If that succeeds, then there is nothing to
 * do. Otherwise drop_sql is executed (its result is ignored) and then finally
 * reactive_sql is executed.
 *
 * \return SWITCH_TRUE if auto-clear-sql is disabled. If auto-create-schemas is
 * disabled, SWITCH_TRUE is returned if test_sql succeeds, SWITCH_FALSE
 * otherwise. If auto-create-schemas is enabled, SWITCH_TRUE is returned if
 * test_sql succeeds or if reactive_sql is then executed successfully;
 * otherwise SWITCH_FALSE is returned.
 */
switch_cache_db_test_reactive(switch_cache_db_handle_t * dbh,const char * test_sql,const char * drop_sql,const char * reactive_sql)1477 SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive(switch_cache_db_handle_t *dbh,
1478 															const char *test_sql, const char *drop_sql, const char *reactive_sql)
1479 {
1480 	return switch_cache_db_test_reactive_ex(dbh, test_sql, drop_sql, reactive_sql, NULL);
1481 }
1482 
switch_cache_db_test_reactive_ex(switch_cache_db_handle_t * dbh,const char * test_sql,const char * drop_sql,const char * reactive_sql,const char * row_size_limited_reactive_sql)1483 SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive_ex(switch_cache_db_handle_t *dbh,
1484 															const char *test_sql, const char *drop_sql, const char *reactive_sql, const char *row_size_limited_reactive_sql)
1485 {
1486 	switch_bool_t r = SWITCH_TRUE;
1487 	switch_mutex_t *io_mutex = dbh->io_mutex;
1488 
1489 	switch_assert(test_sql != NULL);
1490 	switch_assert(reactive_sql != NULL);
1491 
1492 	if (!switch_test_flag((&runtime), SCF_CLEAR_SQL)) {
1493 		return SWITCH_TRUE;
1494 	}
1495 
1496 	if (!switch_test_flag((&runtime), SCF_AUTO_SCHEMAS)) {
1497 		switch_status_t status = switch_cache_db_execute_sql(dbh, (char *)test_sql, NULL);
1498 
1499 		return (status == SWITCH_STATUS_SUCCESS) ? SWITCH_TRUE : SWITCH_FALSE;
1500 	}
1501 
1502 	if (io_mutex) switch_mutex_lock(io_mutex);
1503 
1504 	switch (dbh->type) {
1505 	case SCDB_TYPE_DATABASE_INTERFACE:
1506 		{
1507 			switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
1508 			switch_status_t result;
1509 
1510 			if ((result = database_interface_handle_exec(database_interface, dbh->native_handle.database_interface_dbh, test_sql, NULL)) != SWITCH_STATUS_SUCCESS) {
1511 				char tmp[100];
1512 				switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to test_reactive with test_sql", result);
1513 
1514 				if (drop_sql) {
1515 					if ((result = database_interface_handle_exec(database_interface, dbh->native_handle.database_interface_dbh, drop_sql, NULL)) != SWITCH_STATUS_SUCCESS) {
1516 						char tmp[100];
1517 						switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to test_reactive with drop_sql", result);
1518 					}
1519 				}
1520 
1521 				if ((result = database_interface_handle_exec(database_interface, dbh->native_handle.database_interface_dbh, reactive_sql, NULL)) != SWITCH_STATUS_SUCCESS) {
1522 					char tmp[100];
1523 					switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to test_reactive with reactive_sql", result);
1524 
1525 					if (row_size_limited_reactive_sql && switch_test_flag(database_interface, SWITCH_DATABASE_FLAG_ROW_SIZE_LIMIT)) {
1526 						if ((result = database_interface_handle_exec(database_interface, dbh->native_handle.database_interface_dbh, row_size_limited_reactive_sql, NULL)) != SWITCH_STATUS_SUCCESS) {
1527 							switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to test_reactive with row_size_limited_reactive_sql", result);
1528 						}
1529 					}
1530 				}
1531 
1532 				r = (result == SWITCH_STATUS_SUCCESS);
1533 			}
1534 		}
1535 		break;
1536 	case SCDB_TYPE_ODBC:
1537 		{
1538 			if (switch_odbc_handle_exec(dbh->native_handle.odbc_dbh, test_sql, NULL, NULL) != SWITCH_ODBC_SUCCESS) {
1539 				if (drop_sql) {
1540 					switch_odbc_handle_exec(dbh->native_handle.odbc_dbh, drop_sql, NULL, NULL);
1541 				}
1542 				r = switch_odbc_handle_exec(dbh->native_handle.odbc_dbh, reactive_sql, NULL, NULL) == SWITCH_ODBC_SUCCESS;
1543 			}
1544 		}
1545 		break;
1546 	case SCDB_TYPE_CORE_DB:
1547 		{
1548 			char *errmsg = NULL;
1549 			switch_core_db_exec(dbh->native_handle.core_db_dbh->handle, test_sql, NULL, NULL, &errmsg);
1550 
1551 			if (errmsg) {
1552 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "SQL ERR [%s]\n[%s]\nAuto Generating Table!\n", errmsg, test_sql);
1553 				switch_core_db_free(errmsg);
1554 				errmsg = NULL;
1555 				if (drop_sql) {
1556 					switch_core_db_exec(dbh->native_handle.core_db_dbh->handle, drop_sql, NULL, NULL, &errmsg);
1557 				}
1558 				if (errmsg) {
1559 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Ignoring SQL ERR [%s]\n[%s]\n", errmsg, drop_sql);
1560 					switch_core_db_free(errmsg);
1561 					errmsg = NULL;
1562 				}
1563 				switch_core_db_exec(dbh->native_handle.core_db_dbh->handle, reactive_sql, NULL, NULL, &errmsg);
1564 				if (errmsg) {
1565 					r = SWITCH_FALSE;
1566 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "SQL ERR [%s]\n[%s]\n", errmsg, reactive_sql);
1567 					switch_core_db_free(errmsg);
1568 					errmsg = NULL;
1569 				} else {
1570 					r = SWITCH_TRUE;
1571 				}
1572 			}
1573 		}
1574 		break;
1575 	}
1576 
1577 
1578 	if (io_mutex) switch_mutex_unlock(io_mutex);
1579 
1580 	return r;
1581 }
1582 
1583 
switch_core_sql_db_thread(switch_thread_t * thread,void * obj)1584 static void *SWITCH_THREAD_FUNC switch_core_sql_db_thread(switch_thread_t *thread, void *obj)
1585 {
1586 	int sec = 0, reg_sec = 0;;
1587 
1588 	sql_manager.db_thread_running = 1;
1589 
1590 	while (sql_manager.db_thread_running == 1) {
1591 		if (++sec == SQL_CACHE_TIMEOUT) {
1592 			sql_close(switch_epoch_time_now(NULL));
1593 			sec = 0;
1594 		}
1595 
1596 		if (switch_test_flag((&runtime), SCF_USE_SQL) && ++reg_sec == SQL_REG_TIMEOUT) {
1597 			switch_core_expire_registration(0);
1598 			reg_sec = 0;
1599 		}
1600 		switch_yield(1000000);
1601 	}
1602 
1603 
1604 	return NULL;
1605 }
1606 
1607 
/* Forward declaration of a thread function. */
static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, void *obj);

/* State of one SQL queue manager: a set of SQL queues plus the locks and settings that go with them. */
struct switch_sql_queue_manager {
	const char *name;
	switch_cache_db_handle_t *event_db;
	switch_queue_t **sql_queue;	/* array of numq queues, indexed 0..numq-1 */
	uint32_t *pre_written;	/* NOTE(review): presumably per-queue counters -- confirm against the writer code */
	uint32_t *written;	/* NOTE(review): presumably per-queue counters -- confirm against the writer code */
	uint32_t numq;	/* number of entries in sql_queue */
	char *dsn;	/* DSN used by sql_in_thread to obtain a db handle for a job */
	switch_thread_t *thread;
	int thread_initiated;
	int thread_running;
	switch_thread_cond_t *cond;	/* signalled by qm_wake while it holds cond_mutex */
	switch_mutex_t *cond_mutex;	/* try-locked by qm_wake before signalling cond */
	switch_mutex_t *cond2_mutex;	/* probed by qm_wake when cond_mutex could not be taken */
	switch_mutex_t *mutex;	/* held while changing paused and while flushing a queue */
	/* NOTE(review): the four statements below presumably map onto the identically named
	 * parameters of switch_cache_db_persistant_execute_trans_full -- confirm at the call site. */
	char *pre_trans_execute;
	char *post_trans_execute;
	char *inner_pre_trans_execute;
	char *inner_post_trans_execute;
	switch_memory_pool_t *pool;
	uint32_t max_trans;
	uint32_t confirm;
	uint8_t paused;	/* set to 1 by switch_sql_queue_manager_pause, 0 by switch_sql_queue_manager_resume */
};
1634 
/*
 * Tries to signal the queue manager's condition variable.
 *
 * Each round first try-locks cond_mutex; on success the condition is signalled
 * and 1 is returned. Otherwise cond2_mutex is try-locked: if that succeeds it
 * is released again and 0 is returned immediately. If neither lock could be
 * taken the round is repeated after switch_cond_next(), up to 10 rounds in
 * total, after which 0 is returned.
 */
static int qm_wake(switch_sql_queue_manager_t *qm)
{
	int attempts = 0;

	for (;;) {
		if (switch_mutex_trylock(qm->cond_mutex) == SWITCH_STATUS_SUCCESS) {
			switch_thread_cond_signal(qm->cond);
			switch_mutex_unlock(qm->cond_mutex);
			return 1;
		}

		if (switch_mutex_trylock(qm->cond2_mutex) == SWITCH_STATUS_SUCCESS) {
			switch_mutex_unlock(qm->cond2_mutex);
			return 0;
		}

		if (++attempts >= 10) {
			return 0;
		}

		switch_cond_next();
	}
}
1661 
/* Returns the sum of switch_queue_size() over all numq queues of the manager. */
static uint32_t qm_ttl(switch_sql_queue_manager_t *qm)
{
	uint32_t total = 0;
	uint32_t idx = 0;

	while (idx < qm->numq) {
		total += switch_queue_size(qm->sql_queue[idx]);
		idx++;
	}

	return total;
}
1673 
/* One asynchronous query, built by new_job and consumed by sql_in_thread. */
struct db_job {
	switch_sql_queue_manager_t *qm;	/* manager whose dsn is used to obtain the db handle */
	char *sql;	/* copy of the statement, allocated from pool */
	switch_core_db_callback_func_t callback;	/* row callback; when set, the event callbacks are left unset by new_job */
	switch_core_db_err_callback_func_t err_callback;	/* optional error callback paired with callback */
	switch_core_db_event_callback_func_t event_callback;	/* event-style row callback, stored only when callback is NULL */
	switch_core_db_err_callback_func_t event_err_callback;	/* optional error callback paired with event_callback */
	void *pdata;	/* user data passed to the callbacks */
	int event;
	switch_memory_pool_t *pool;	/* pool the job was allocated from; destroyed by sql_in_thread */
};
1685 
sql_in_thread(switch_thread_t * thread,void * obj)1686 static void *SWITCH_THREAD_FUNC sql_in_thread (switch_thread_t *thread, void *obj)
1687 {
1688 	struct db_job *job = (struct db_job *) obj;
1689 	switch_memory_pool_t *pool = job->pool;
1690 	char *err = NULL;
1691 	switch_cache_db_handle_t *dbh;
1692 
1693 
1694 	if (switch_cache_db_get_db_handle_dsn(&dbh, job->qm->dsn) != SWITCH_STATUS_SUCCESS) {
1695 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot connect DSN %s\n", job->qm->dsn);
1696 		return NULL;
1697 	}
1698 
1699 	if (job->callback && !job->err_callback) {
1700 		switch_cache_db_execute_sql_callback(dbh, job->sql, job->callback, job->pdata, &err);
1701 	} else if (job->callback && job->err_callback) {
1702 		switch_cache_db_execute_sql_callback_err(dbh, job->sql, job->callback, job->err_callback, job->pdata, &err);
1703 	} else if (job->event_callback && !job->event_err_callback) {
1704 		switch_cache_db_execute_sql_event_callback(dbh, job->sql, job->event_callback, job->pdata, &err);
1705 	} else if (job->event_callback && job->event_err_callback) {
1706 		switch_cache_db_execute_sql_event_callback_err(dbh, job->sql, job->event_callback, job->event_err_callback, job->pdata, &err);
1707 	}
1708 
1709 	if (err) {
1710 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", job->sql, err);
1711 		switch_safe_free(err);
1712 	}
1713 
1714 	switch_cache_db_release_db_handle(&dbh);
1715 
1716 	if (pool) {
1717 		switch_core_destroy_memory_pool(&pool);
1718 	}
1719 
1720 	return NULL;
1721 }
1722 
new_job(switch_sql_queue_manager_t * qm,const char * sql,switch_core_db_callback_func_t callback,switch_core_db_err_callback_func_t err_callback,switch_core_db_event_callback_func_t event_callback,switch_core_db_err_callback_func_t event_err_callback,void * pdata)1723 static switch_thread_data_t *new_job(switch_sql_queue_manager_t *qm, const char *sql,
1724 									 switch_core_db_callback_func_t callback,
1725 									 switch_core_db_err_callback_func_t err_callback,
1726 									 switch_core_db_event_callback_func_t event_callback,
1727 									 switch_core_db_err_callback_func_t event_err_callback,
1728 									 void *pdata)
1729 {
1730 	switch_memory_pool_t *pool;
1731 	switch_thread_data_t *td;
1732 	struct db_job *job;
1733 	switch_core_new_memory_pool(&pool);
1734 
1735 	td = switch_core_alloc(pool, sizeof(*td));
1736 	job = switch_core_alloc(pool, sizeof(*job));
1737 
1738 	td->func = sql_in_thread;
1739 	td->obj = job;
1740 
1741 	job->sql = switch_core_strdup(pool, sql);
1742 	job->qm = qm;
1743 
1744 	if (callback) {
1745 		job->callback = callback;
1746 		job->err_callback = err_callback;
1747 	} else if (event_callback) {
1748 		job->event_callback = event_callback;
1749 		job->event_err_callback = event_err_callback;
1750 	}
1751 
1752 	job->pdata = pdata;
1753 	job->pool = pool;
1754 
1755 	return td;
1756 }
1757 
1758 
switch_sql_queue_manager_execute_sql_callback(switch_sql_queue_manager_t * qm,const char * sql,switch_core_db_callback_func_t callback,void * pdata)1759 SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_callback(switch_sql_queue_manager_t *qm,
1760 																   const char *sql, switch_core_db_callback_func_t callback, void *pdata)
1761 {
1762 
1763 	switch_thread_data_t *td;
1764 	if ((td = new_job(qm, sql, callback, NULL, NULL, NULL, pdata))) {
1765 		switch_thread_pool_launch_thread(&td);
1766 	}
1767 }
1768 
switch_sql_queue_manager_execute_sql_callback_err(switch_sql_queue_manager_t * qm,const char * sql,switch_core_db_callback_func_t callback,switch_core_db_err_callback_func_t err_callback,void * pdata)1769 SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_callback_err(switch_sql_queue_manager_t *qm, const char *sql,
1770 																	   switch_core_db_callback_func_t callback,
1771 																	   switch_core_db_err_callback_func_t err_callback, void *pdata)
1772 {
1773 
1774 	switch_thread_data_t *td;
1775 	if ((td = new_job(qm, sql, callback, err_callback, NULL, NULL, pdata))) {
1776 		switch_thread_pool_launch_thread(&td);
1777 	}
1778 }
1779 
switch_sql_queue_manager_execute_sql_event_callback(switch_sql_queue_manager_t * qm,const char * sql,switch_core_db_event_callback_func_t callback,void * pdata)1780 SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_event_callback(switch_sql_queue_manager_t *qm,
1781 																		 const char *sql, switch_core_db_event_callback_func_t callback, void *pdata)
1782 {
1783 
1784 	switch_thread_data_t *td;
1785 	if ((td = new_job(qm, sql, NULL, NULL, callback, NULL, pdata))) {
1786 		switch_thread_pool_launch_thread(&td);
1787 	}
1788 }
1789 
switch_sql_queue_manager_execute_sql_event_callback_err(switch_sql_queue_manager_t * qm,const char * sql,switch_core_db_event_callback_func_t callback,switch_core_db_err_callback_func_t err_callback,void * pdata)1790 SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_event_callback_err(switch_sql_queue_manager_t *qm, const char *sql,
1791 																			 switch_core_db_event_callback_func_t callback,
1792 																			 switch_core_db_err_callback_func_t err_callback,
1793 																			 void *pdata)
1794 {
1795 
1796 	switch_thread_data_t *td;
1797 	if ((td = new_job(qm, sql, NULL, NULL, callback, err_callback, pdata))) {
1798 		switch_thread_pool_launch_thread(&td);
1799 	}
1800 }
1801 
1802 
/*
 * Drain queue number i of the manager.
 *
 * Every entry that can be popped without blocking is removed.  When dbh is
 * non-NULL each non-NULL entry is executed on that handle before being
 * freed; when dbh is NULL the entries are simply discarded.  The manager
 * mutex is held for the whole drain.
 */
static void do_flush(switch_sql_queue_manager_t *qm, int i, switch_cache_db_handle_t *dbh)
{
	switch_queue_t *queue = qm->sql_queue[i];
	void *item = NULL;

	switch_mutex_lock(qm->mutex);

	for (;;) {
		if (switch_queue_trypop(queue, &item) != SWITCH_STATUS_SUCCESS) {
			break;
		}

		/* NULL entries carry no statement; skip them. */
		if (!item) {
			continue;
		}

		if (dbh) {
			switch_cache_db_execute_sql(dbh, (char *) item, NULL);
		}

		switch_safe_free(item);
	}

	switch_mutex_unlock(qm->mutex);
}
1820 
1821 
switch_sql_queue_manager_resume(switch_sql_queue_manager_t * qm)1822 SWITCH_DECLARE(void) switch_sql_queue_manager_resume(switch_sql_queue_manager_t *qm)
1823 {
1824 	switch_mutex_lock(qm->mutex);
1825 	qm->paused = 0;
1826 	switch_mutex_unlock(qm->mutex);
1827 
1828 	qm_wake(qm);
1829 
1830 }
1831 
switch_sql_queue_manager_pause(switch_sql_queue_manager_t * qm,switch_bool_t flush)1832 SWITCH_DECLARE(void) switch_sql_queue_manager_pause(switch_sql_queue_manager_t *qm, switch_bool_t flush)
1833 {
1834 	uint32_t i;
1835 
1836 	switch_mutex_lock(qm->mutex);
1837 	qm->paused = 1;
1838 	switch_mutex_unlock(qm->mutex);
1839 
1840 	if (flush) {
1841 		for(i = 0; i < qm->numq; i++) {
1842 			do_flush(qm, i, NULL);
1843 		}
1844 	}
1845 
1846 }
1847 
switch_sql_queue_manager_size(switch_sql_queue_manager_t * qm,uint32_t index)1848 SWITCH_DECLARE(int) switch_sql_queue_manager_size(switch_sql_queue_manager_t *qm, uint32_t index)
1849 {
1850 	int size = 0;
1851 
1852 	switch_mutex_lock(qm->mutex);
1853 	if (index < qm->numq) {
1854 		size = switch_queue_size(qm->sql_queue[index]);
1855 	}
1856 	switch_mutex_unlock(qm->mutex);
1857 
1858 	return size;
1859 }
1860 
switch_sql_queue_manager_stop(switch_sql_queue_manager_t * qm)1861 SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_stop(switch_sql_queue_manager_t *qm)
1862 {
1863 	switch_status_t status = SWITCH_STATUS_FALSE;
1864 	uint32_t i, sanity = 100;
1865 
1866 	if (qm->thread_running == 1) {
1867 		qm->thread_running = -1;
1868 
1869 		while(--sanity && qm->thread_running == -1) {
1870 			for(i = 0; i < qm->numq; i++) {
1871 				switch_queue_push(qm->sql_queue[i], NULL);
1872 				switch_queue_interrupt_all(qm->sql_queue[i]);
1873 			}
1874 			qm_wake(qm);
1875 
1876 			if (qm->thread_running == -1) {
1877 				switch_yield(100000);
1878 			}
1879 		}
1880 		status = SWITCH_STATUS_SUCCESS;
1881 	}
1882 
1883 	if (qm->thread) {
1884 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Stopping SQL thread.\n", qm->name);
1885 		qm_wake(qm);
1886 		switch_thread_join(&status, qm->thread);
1887 		qm->thread = NULL;
1888 		status = SWITCH_STATUS_SUCCESS;
1889 	}
1890 
1891 	return status;
1892 }
1893 
switch_sql_queue_manager_start(switch_sql_queue_manager_t * qm)1894 SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_manager_t *qm)
1895 {
1896 	switch_threadattr_t *thd_attr;
1897 
1898 	if (!qm->thread_running) {
1899 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Starting SQL thread.\n", qm->name);
1900 		switch_threadattr_create(&thd_attr, qm->pool);
1901 		switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
1902 		switch_threadattr_priority_set(thd_attr, SWITCH_PRI_NORMAL);
1903 		if (switch_thread_create(&qm->thread, thd_attr, switch_user_sql_thread, qm, qm->pool) == SWITCH_STATUS_SUCCESS) {
1904 			while (!qm->thread_initiated) {
1905 				switch_cond_next();
1906 			}
1907 
1908 			if (qm->event_db) {
1909 				return SWITCH_STATUS_SUCCESS;
1910 			}
1911 		}
1912 	}
1913 
1914 	return SWITCH_STATUS_FALSE;
1915 }
1916 
switch_sql_queue_manager_destroy(switch_sql_queue_manager_t ** qmp)1917 SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_destroy(switch_sql_queue_manager_t **qmp)
1918 {
1919 	switch_sql_queue_manager_t *qm;
1920 	switch_status_t status = SWITCH_STATUS_SUCCESS;
1921 	switch_memory_pool_t *pool;
1922 	uint32_t i;
1923 
1924 	switch_assert(qmp);
1925 	qm = *qmp;
1926 	if (!qm) {
1927 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "No SQL queue to destroy.\n");
1928 		return SWITCH_STATUS_NOOP;
1929 	}
1930 
1931 	*qmp = NULL;
1932 
1933 	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Destroying SQL queue.\n", qm->name);
1934 
1935 	switch_sql_queue_manager_stop(qm);
1936 
1937 
1938 
1939 	for(i = 0; i < qm->numq; i++) {
1940 		do_flush(qm, i, NULL);
1941 	}
1942 
1943 	pool = qm->pool;
1944 	switch_core_destroy_memory_pool(&pool);
1945 
1946 	return status;
1947 }
1948 
switch_sql_queue_manager_push(switch_sql_queue_manager_t * qm,const char * sql,uint32_t pos,switch_bool_t dup)1949 SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup)
1950 {
1951 	char *sqlptr = NULL;
1952 	switch_status_t status;
1953 	int x = 0;
1954 
1955 	if (sql_manager.paused || qm->thread_running != 1) {
1956 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "DROP [%s]\n", sql);
1957 		if (!dup) free((char *)sql);
1958 		qm_wake(qm);
1959 		return SWITCH_STATUS_SUCCESS;
1960 	}
1961 
1962 	if (qm->thread_running != 1) {
1963 		if (!dup) free((char *)sql);
1964 		return SWITCH_STATUS_FALSE;
1965 	}
1966 
1967 	if (pos > qm->numq - 1) {
1968 		pos = 0;
1969 	}
1970 
1971 	sqlptr = dup ? strdup(sql) : (char *)sql;
1972 
1973 	do {
1974 		switch_mutex_lock(qm->mutex);
1975 		status = switch_queue_trypush(qm->sql_queue[pos], sqlptr);
1976 		switch_mutex_unlock(qm->mutex);
1977 		if (status != SWITCH_STATUS_SUCCESS) {
1978 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "Delay %d sending sql\n", x);
1979 			if (x++) {
1980 				switch_yield(1000000 * x);
1981 			}
1982 		}
1983 	} while(status != SWITCH_STATUS_SUCCESS);
1984 
1985 	qm_wake(qm);
1986 
1987 	return SWITCH_STATUS_SUCCESS;
1988 }
1989 
1990 
switch_sql_queue_manager_push_confirm(switch_sql_queue_manager_t * qm,const char * sql,uint32_t pos,switch_bool_t dup)1991 SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push_confirm(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup)
1992 {
1993 #define EXEC_NOW
1994 #ifdef EXEC_NOW
1995 	switch_cache_db_handle_t *dbh;
1996 
1997 	if (sql_manager.paused || qm->thread_running != 1) {
1998 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "DROP [%s]\n", sql);
1999 		if (!dup) free((char *)sql);
2000 		qm_wake(qm);
2001 		return SWITCH_STATUS_SUCCESS;
2002 	}
2003 
2004 	if (switch_cache_db_get_db_handle_dsn(&dbh, qm->dsn) == SWITCH_STATUS_SUCCESS) {
2005 		switch_cache_db_execute_sql(dbh, (char *)sql, NULL);
2006 		switch_cache_db_release_db_handle(&dbh);
2007 	}
2008 
2009 	if (!dup) free((char *)sql);
2010 
2011 #else
2012 
2013 	int size, x = 0, sanity = 0;
2014 	uint32_t written, want;
2015 
2016 	if (sql_manager.paused) {
2017 		if (!dup) free((char *)sql);
2018 		qm_wake(qm);
2019 		return SWITCH_STATUS_SUCCESS;
2020 	}
2021 
2022 	if (qm->thread_running != 1) {
2023 		if (!dup) free((char *)sql);
2024 		return SWITCH_STATUS_FALSE;
2025 	}
2026 
2027 	if (pos > qm->numq - 1) {
2028 		pos = 0;
2029 	}
2030 
2031 	switch_mutex_lock(qm->mutex);
2032 	qm->confirm++;
2033 	switch_queue_push(qm->sql_queue[pos], dup ? strdup(sql) : (char *)sql);
2034 	written = qm->pre_written[pos];
2035 	size = switch_sql_queue_manager_size(qm, pos);
2036 	want = written + size;
2037 	switch_mutex_unlock(qm->mutex);
2038 
2039 	qm_wake(qm);
2040 
2041 	while((qm->written[pos] < want) || (qm->written[pos] >= written && want < written && qm->written[pos] > want)) {
2042 		switch_yield(5000);
2043 
2044 		if (++x == 200) {
2045 			qm_wake(qm);
2046 			x = 0;
2047 			if (++sanity == 20) {
2048 				break;
2049 			}
2050 		}
2051 	}
2052 
2053 	switch_mutex_lock(qm->mutex);
2054 	qm->confirm--;
2055 	switch_mutex_unlock(qm->mutex);
2056 #endif
2057 
2058 	return SWITCH_STATUS_SUCCESS;
2059 }
2060 
2061 
2062 
2063 
2064 
switch_sql_queue_manager_init_name(const char * name,switch_sql_queue_manager_t ** qmp,uint32_t numq,const char * dsn,uint32_t max_trans,const char * pre_trans_execute,const char * post_trans_execute,const char * inner_pre_trans_execute,const char * inner_post_trans_execute)2065 SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *name,
2066 																   switch_sql_queue_manager_t **qmp,
2067 																   uint32_t numq, const char *dsn, uint32_t max_trans,
2068 																   const char *pre_trans_execute,
2069 																   const char *post_trans_execute,
2070 																   const char *inner_pre_trans_execute,
2071 																   const char *inner_post_trans_execute)
2072 {
2073 	switch_memory_pool_t *pool;
2074 	switch_sql_queue_manager_t *qm;
2075 	uint32_t i;
2076 
2077 	if (!numq) numq = 1;
2078 
2079 	switch_core_new_memory_pool(&pool);
2080 	qm = switch_core_alloc(pool, sizeof(*qm));
2081 
2082 	qm->pool = pool;
2083 	qm->numq = numq;
2084 	qm->dsn = switch_core_strdup(qm->pool, dsn);
2085 	qm->name = switch_core_strdup(qm->pool, name);
2086 	qm->max_trans = max_trans;
2087 
2088 	switch_mutex_init(&qm->cond_mutex, SWITCH_MUTEX_NESTED, qm->pool);
2089 	switch_mutex_init(&qm->cond2_mutex, SWITCH_MUTEX_NESTED, qm->pool);
2090 	switch_mutex_init(&qm->mutex, SWITCH_MUTEX_NESTED, qm->pool);
2091 	switch_thread_cond_create(&qm->cond, qm->pool);
2092 
2093 	qm->sql_queue = switch_core_alloc(qm->pool, sizeof(switch_queue_t *) * numq);
2094 	qm->written = switch_core_alloc(qm->pool, sizeof(uint32_t) * numq);
2095 	qm->pre_written = switch_core_alloc(qm->pool, sizeof(uint32_t) * numq);
2096 
2097 	for (i = 0; i < qm->numq; i++) {
2098 		switch_queue_create(&qm->sql_queue[i], SWITCH_SQL_QUEUE_LEN, qm->pool);
2099 	}
2100 
2101 	if (pre_trans_execute) {
2102 		qm->pre_trans_execute = switch_core_strdup(qm->pool, pre_trans_execute);
2103 	}
2104 	if (post_trans_execute) {
2105 		qm->post_trans_execute = switch_core_strdup(qm->pool, post_trans_execute);
2106 	}
2107 	if (inner_pre_trans_execute) {
2108 		qm->inner_pre_trans_execute = switch_core_strdup(qm->pool, inner_pre_trans_execute);
2109 	}
2110 	if (inner_post_trans_execute) {
2111 		qm->inner_post_trans_execute = switch_core_strdup(qm->pool, inner_post_trans_execute);
2112 	}
2113 
2114 	*qmp = qm;
2115 
2116 	return SWITCH_STATUS_SUCCESS;
2117 
2118 }
2119 
do_trans(switch_sql_queue_manager_t * qm)2120 static uint32_t do_trans(switch_sql_queue_manager_t *qm)
2121 {
2122 	char *errmsg = NULL;
2123 	void *pop;
2124 	switch_status_t status;
2125 	uint32_t ttl = 0;
2126 	switch_mutex_t *io_mutex = qm->event_db->io_mutex;
2127 	uint32_t i;
2128 
2129 	if (io_mutex) switch_mutex_lock(io_mutex);
2130 
2131 	if (!zstr(qm->pre_trans_execute)) {
2132 		switch_cache_db_execute_sql_real(qm->event_db, qm->pre_trans_execute, &errmsg);
2133 		if (errmsg) {
2134 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", qm->pre_trans_execute, errmsg);
2135 			switch_safe_free(errmsg);
2136 		}
2137 	}
2138 
2139 	switch(qm->event_db->type) {
2140 	case SCDB_TYPE_CORE_DB:
2141 		{
2142 			switch_cache_db_execute_sql_real(qm->event_db, "BEGIN EXCLUSIVE", &errmsg);
2143 		}
2144 		break;
2145 	case SCDB_TYPE_ODBC:
2146 		{
2147 			switch_odbc_status_t result;
2148 
2149 			if ((result = switch_odbc_SQLSetAutoCommitAttr(qm->event_db->native_handle.odbc_dbh, 0)) != SWITCH_ODBC_SUCCESS) {
2150 				char tmp[100];
2151 				switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
2152 				errmsg = strdup(tmp);
2153 			}
2154 		}
2155 		break;
2156 	case SCDB_TYPE_DATABASE_INTERFACE:
2157 		{
2158 			switch_database_interface_t *database_interface = qm->event_db->native_handle.database_interface_dbh->connection_options.database_interface;
2159 			switch_status_t result;
2160 
2161 			if ((result = database_interface->sql_set_auto_commit_attr(qm->event_db->native_handle.database_interface_dbh, 0)) != SWITCH_STATUS_SUCCESS) {
2162 				char tmp[100];
2163 				switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
2164 				errmsg = strdup(tmp);
2165 			}
2166 		}
2167 		break;
2168 	}
2169 
2170 	if (errmsg) {
2171 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "ERROR [%s], [%s]\n", errmsg, qm->event_db->name);
2172 		switch_safe_free(errmsg);
2173 		goto end;
2174 	}
2175 
2176 
2177 	if (!zstr(qm->inner_pre_trans_execute)) {
2178 		switch_cache_db_execute_sql_real(qm->event_db, qm->inner_pre_trans_execute, &errmsg);
2179 		if (errmsg) {
2180 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", qm->inner_pre_trans_execute, errmsg);
2181 			switch_safe_free(errmsg);
2182 		}
2183 	}
2184 
2185 
2186 	while(qm->max_trans == 0 || ttl <= qm->max_trans) {
2187 		pop = NULL;
2188 
2189 		for (i = 0; (qm->max_trans == 0 || ttl <= qm->max_trans) && (i < qm->numq); i++) {
2190 			switch_mutex_lock(qm->mutex);
2191 			switch_queue_trypop(qm->sql_queue[i], &pop);
2192 			switch_mutex_unlock(qm->mutex);
2193 			if (pop) break;
2194 		}
2195 
2196 		if (pop) {
2197 			if ((status = switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL)) == SWITCH_STATUS_SUCCESS) {
2198 				switch_mutex_lock(qm->mutex);
2199 				qm->pre_written[i]++;
2200 				switch_mutex_unlock(qm->mutex);
2201 				ttl++;
2202 			}
2203 			switch_safe_free(pop);
2204 			if (status != SWITCH_STATUS_SUCCESS) break;
2205 		} else {
2206 			break;
2207 		}
2208 	}
2209 
2210 	if (!zstr(qm->inner_post_trans_execute)) {
2211 		switch_cache_db_execute_sql_real(qm->event_db, qm->inner_post_trans_execute, &errmsg);
2212 		if (errmsg) {
2213 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", qm->inner_post_trans_execute, errmsg);
2214 			switch_safe_free(errmsg);
2215 		}
2216 	}
2217 
2218 
2219  end:
2220 
2221 	switch(qm->event_db->type) {
2222 	case SCDB_TYPE_CORE_DB:
2223 		{
2224 			switch_cache_db_execute_sql_real(qm->event_db, "COMMIT", NULL);
2225 		}
2226 		break;
2227 	case SCDB_TYPE_ODBC:
2228 		{
2229 			switch_odbc_SQLEndTran(qm->event_db->native_handle.odbc_dbh, 1);
2230 			switch_odbc_SQLSetAutoCommitAttr(qm->event_db->native_handle.odbc_dbh, 1);
2231 		}
2232 		break;
2233 	case SCDB_TYPE_DATABASE_INTERFACE:
2234 		{
2235 			switch_database_interface_t *database_interface = qm->event_db->native_handle.database_interface_dbh->connection_options.database_interface;
2236 			switch_status_t result;
2237 
2238 			if ((result = database_interface->commit(qm->event_db->native_handle.database_interface_dbh)) != SWITCH_STATUS_SUCCESS) {
2239 				char tmp[100];
2240 				switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to commit transaction", result);
2241 			}
2242 		}
2243 		break;
2244 	}
2245 
2246 
2247 	if (!zstr(qm->post_trans_execute)) {
2248 		switch_cache_db_execute_sql_real(qm->event_db, qm->post_trans_execute, &errmsg);
2249 		if (errmsg) {
2250 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", qm->post_trans_execute, errmsg);
2251 			switch_safe_free(errmsg);
2252 		}
2253 	}
2254 
2255 
2256 	switch_mutex_lock(qm->mutex);
2257 	for (i = 0; i < qm->numq; i++) {
2258 		qm->written[i] = qm->pre_written[i];
2259 	}
2260 	switch_mutex_unlock(qm->mutex);
2261 
2262 
2263 	if (io_mutex) switch_mutex_unlock(io_mutex);
2264 
2265 	return ttl;
2266 }
2267 
switch_user_sql_thread(switch_thread_t * thread,void * obj)2268 static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, void *obj)
2269 {
2270 
2271 	uint32_t sanity = 120;
2272 	switch_sql_queue_manager_t *qm = (switch_sql_queue_manager_t *) obj;
2273 	uint32_t i;
2274 
2275 	while (sanity && !qm->event_db) {
2276 		if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db)
2277 			break;
2278 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s Error getting db handle, Retrying\n", qm->name);
2279 		switch_yield(500000);
2280 		sanity--;
2281 	}
2282 
2283 	if (!qm->event_db) {
2284 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s Error getting db handle\n", qm->name);
2285 		qm->thread_initiated = 1;
2286 		return NULL;
2287 	}
2288 
2289 	switch_mutex_lock(qm->cond_mutex);
2290 
2291 	switch (qm->event_db->type) {
2292 	case SCDB_TYPE_DATABASE_INTERFACE:
2293 		break;
2294 	case SCDB_TYPE_ODBC:
2295 		break;
2296 	case SCDB_TYPE_CORE_DB:
2297 		{
2298 			switch_cache_db_execute_sql(qm->event_db, "PRAGMA synchronous=OFF;", NULL);
2299 			switch_cache_db_execute_sql(qm->event_db, "PRAGMA count_changes=OFF;", NULL);
2300 			switch_cache_db_execute_sql(qm->event_db, "PRAGMA temp_store=MEMORY;", NULL);
2301 			switch_cache_db_execute_sql(qm->event_db, "PRAGMA journal_mode=OFF;", NULL);
2302 		}
2303 		break;
2304 	}
2305 
2306 	qm->thread_initiated = 1;
2307 	qm->thread_running = 1;
2308 
2309 	while (qm->thread_running == 1) {
2310 		uint32_t i, lc;
2311 		uint32_t written = 0, iterations = 0;
2312 
2313 		if (qm->paused) {
2314 			goto check;
2315 		}
2316 
2317 		if (sql_manager.paused) {
2318 			for (i = 0; i < qm->numq; i++) {
2319 				do_flush(qm, i, NULL);
2320 			}
2321 			goto check;
2322 		}
2323 
2324 		do {
2325 			if (!qm_ttl(qm)) {
2326 				goto check;
2327 			}
2328 			written = do_trans(qm);
2329 			iterations += written;
2330 		} while(written == qm->max_trans);
2331 
2332 		if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
2333 			char line[128] = "";
2334 			switch_size_t l;
2335 
2336 			switch_snprintf(line, sizeof(line), "%s RUN QUEUE [", qm->name);
2337 
2338 			for (i = 0; i < qm->numq; i++) {
2339 				l = strlen(line);
2340 				switch_snprintf(line + l, sizeof(line) - l, "%d%s", switch_queue_size(qm->sql_queue[i]), i == qm->numq - 1 ? "" : "|");
2341 			}
2342 
2343 			l = strlen(line);
2344 			switch_snprintf(line + l, sizeof(line) - l, "]--[%d]\n", iterations);
2345 
2346 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s", line);
2347 
2348 		}
2349 
2350 	check:
2351 
2352 		if ((lc = qm_ttl(qm)) == 0) {
2353 			switch_mutex_lock(qm->cond2_mutex);
2354 			switch_thread_cond_wait(qm->cond, qm->cond_mutex);
2355 			switch_mutex_unlock(qm->cond2_mutex);
2356 		}
2357 
2358 		i = 40;
2359 
2360 		while (--i > 0 && (lc = qm_ttl(qm)) < 500) {
2361 			switch_yield(5000);
2362 		}
2363 
2364 
2365 	}
2366 
2367 	switch_mutex_unlock(qm->cond_mutex);
2368 
2369 	for(i = 0; i < qm->numq; i++) {
2370 		do_flush(qm, i, qm->event_db);
2371 	}
2372 
2373 	switch_cache_db_release_db_handle(&qm->event_db);
2374 
2375 	qm->thread_running = 0;
2376 
2377 	return NULL;
2378 }
2379 
2380 
/*
 * Build the extra "col=value" assignments for a channels UPDATE from an
 * event.
 *
 * The event header "presence-data-cols" holds a ':'-separated list of
 * column names.  For each name NAME the value is taken from the header
 * "PD-NAME"; a missing or empty value is written as NAME=NULL, otherwise as
 * NAME='value'.  Assignments are joined with ',' (no trailing comma).
 *
 * Returns a heap string the caller must free(), or NULL when the header is
 * absent/empty, when memory for the working copy cannot be obtained, or when
 * no assignment was produced.  Returning NULL instead of an empty string
 * lets callers take their "no extra columns" path rather than splice an
 * empty fragment after a comma.
 */
static char *parse_presence_data_cols(switch_event_t *event)
{
	char *cols[128] = { 0 };
	int col_count = 0;
	char *data_copy;
	switch_stream_handle_t stream = { 0 };
	int i;
	char *r;
	char col_name[128] = "";
	const char *data = switch_event_get_header(event, "presence-data-cols");

	if (zstr(data)) {
		return NULL;
	}

	/* switch_split() modifies its input, so work on a private copy. */
	if (!(data_copy = strdup(data))) {
		return NULL;
	}

	col_count = switch_split(data_copy, ':', cols);

	SWITCH_STANDARD_STREAM(stream);

	for (i = 0; i < col_count; i++) {
		const char *val = NULL;

		/* An empty name (e.g. from "a::b") cannot form a valid assignment. */
		if (zstr(cols[i])) {
			continue;
		}

		switch_snprintfv(col_name, sizeof(col_name), "PD-%q", cols[i]);
		val = switch_event_get_header_nil(event, col_name);
		if (zstr(val)) {
			stream.write_function(&stream, "%q=NULL,", cols[i]);
		} else {
			stream.write_function(&stream, "%q='%q',", cols[i], val);
		}
	}

	r = (char *) stream.data;

	if (zstr(r)) {
		/* Nothing was written: release the buffer and report "no columns". */
		switch_safe_free(r);
	} else if (end_of(r) == ',') {
		/* Drop the separator left behind by the last assignment. */
		end_of(r) = '\0';
	}

	switch_safe_free(data_copy);

	return r;

}
2425 
2426 
/*
 * Helpers for core_event_handler().
 *
 * MAX_SQL is the size of the handler's local sql[] array.
 *
 * new_sql() and new_sql_a() are used as the left-hand side of an assignment,
 * e.g. `new_sql() = switch_mprintf(...);`.  Both first assert that another
 * slot is available and then expand to `sql[sql_idx++]`.  new_sql() guards
 * the slot with `if (exists)`, so when exists is 0 the whole assignment
 * (including evaluation of the right-hand side) is skipped; new_sql_a()
 * always stores.
 *
 * Each macro expands to two statements and relies on the locals sql, sql_idx
 * and (for new_sql) exists being in scope, so do not use one as the sole
 * body of an unbraced if/else.
 */
#define MAX_SQL 5
#define new_sql()   switch_assert(sql_idx+1 < MAX_SQL); if (exists) sql[sql_idx++]
#define new_sql_a() switch_assert(sql_idx+1 < MAX_SQL); sql[sql_idx++]
2430 
core_event_handler(switch_event_t * event)2431 static void core_event_handler(switch_event_t *event)
2432 {
2433 	char *sql[MAX_SQL] = { 0 };
2434 	int sql_idx = 0;
2435 	char *extra_cols;
2436 	int exists = 1;
2437 	char *uuid = NULL;
2438 
2439 	switch_assert(event);
2440 
2441 	switch (event->event_id) {
2442 	case SWITCH_EVENT_CHANNEL_UUID:
2443 	case SWITCH_EVENT_CHANNEL_CREATE:
2444 	case SWITCH_EVENT_CHANNEL_ANSWER:
2445 	case SWITCH_EVENT_CHANNEL_PROGRESS_MEDIA:
2446 	case SWITCH_EVENT_CHANNEL_HOLD:
2447 	case SWITCH_EVENT_CHANNEL_UNHOLD:
2448 	case SWITCH_EVENT_CHANNEL_EXECUTE:
2449 	case SWITCH_EVENT_CHANNEL_ORIGINATE:
2450 	case SWITCH_EVENT_CALL_UPDATE:
2451 	case SWITCH_EVENT_CHANNEL_CALLSTATE:
2452 	case SWITCH_EVENT_CHANNEL_STATE:
2453 	case SWITCH_EVENT_CHANNEL_BRIDGE:
2454 	case SWITCH_EVENT_CHANNEL_UNBRIDGE:
2455 	case SWITCH_EVENT_CALL_SECURE:
2456 		{
2457 			if ((uuid = switch_event_get_header(event, "unique-id"))) {
2458 				exists = switch_ivr_uuid_exists(uuid);
2459 			}
2460 		}
2461 		break;
2462 	default:
2463 		break;
2464 	}
2465 
2466 	switch (event->event_id) {
2467 	case SWITCH_EVENT_ADD_SCHEDULE:
2468 		{
2469 			const char *id = switch_event_get_header(event, "task-id");
2470 			const char *manager = switch_event_get_header(event, "task-sql_manager");
2471 
2472 			if (id) {
2473 				new_sql() = switch_mprintf("insert into tasks (task_id, task_desc, task_group, task_runtime, task_sql_manager, hostname) "
2474 										   "values(%q,'%q','%q',%q,%q,'%q')",
2475 										   id,
2476 										   switch_event_get_header_nil(event, "task-desc"),
2477 										   switch_event_get_header_nil(event, "task-group"),
2478 										   switch_event_get_header_nil(event, "task-runtime"),
2479 										   manager ? manager : "0",
2480 										   switch_core_get_hostname()
2481 										   );
2482 			}
2483 		}
2484 		break;
2485 	case SWITCH_EVENT_DEL_SCHEDULE:
2486 	case SWITCH_EVENT_EXE_SCHEDULE:
2487 		new_sql() = switch_mprintf("delete from tasks where task_id=%q and hostname='%q'",
2488 								   switch_event_get_header_nil(event, "task-id"), switch_core_get_hostname());
2489 		break;
2490 	case SWITCH_EVENT_RE_SCHEDULE:
2491 		{
2492 			const char *id = switch_event_get_header(event, "task-id");
2493 			const char *manager = switch_event_get_header(event, "task-sql_manager");
2494 
2495 			if (id) {
2496 				new_sql() = switch_mprintf("update tasks set task_desc='%q',task_group='%q', task_runtime=%q, task_sql_manager=%q where task_id=%q and hostname='%q'",
2497 										   switch_event_get_header_nil(event, "task-desc"),
2498 										   switch_event_get_header_nil(event, "task-group"),
2499 										   switch_event_get_header_nil(event, "task-runtime"),
2500 										   manager ? manager : "0",
2501 										   id,
2502 										   switch_core_get_hostname()
2503 										   );
2504 			}
2505 		}
2506 		break;
2507 	case SWITCH_EVENT_CHANNEL_DESTROY:
2508 		{
2509 			const char *uuid = switch_event_get_header(event, "unique-id");
2510 
2511 			if (uuid) {
2512 				new_sql() = switch_mprintf("delete from channels where uuid='%q'",
2513 										   uuid);
2514 
2515 				new_sql() = switch_mprintf("delete from calls where (caller_uuid='%q' or callee_uuid='%q')",
2516 										   uuid, uuid);
2517 
2518 			}
2519 		}
2520 		break;
2521 	case SWITCH_EVENT_CHANNEL_UUID:
2522 		{
2523 			new_sql() = switch_mprintf("update channels set uuid='%q' where uuid='%q'",
2524 									   switch_event_get_header_nil(event, "unique-id"),
2525 									   switch_event_get_header_nil(event, "old-unique-id")
2526 									   );
2527 
2528 			new_sql() = switch_mprintf("update channels set call_uuid='%q' where call_uuid='%q'",
2529 									   switch_event_get_header_nil(event, "unique-id"),
2530 									   switch_event_get_header_nil(event, "old-unique-id")
2531 									   );
2532 			break;
2533 		}
2534 	case SWITCH_EVENT_CHANNEL_CREATE:
2535 		new_sql() = switch_mprintf("insert into channels (uuid,direction,created,created_epoch, name,state,callstate,dialplan,context,hostname,initial_cid_name,initial_cid_num,initial_ip_addr,initial_dest,initial_dialplan,initial_context) "
2536 								   "values('%q','%q','%q','%ld','%q','%q','%q','%q','%q','%q','%q','%q','%q','%q','%q','%q')",
2537 								   switch_event_get_header_nil(event, "unique-id"),
2538 								   switch_event_get_header_nil(event, "call-direction"),
2539 								   switch_event_get_header_nil(event, "event-date-local"),
2540 								   (long) switch_epoch_time_now(NULL),
2541 								   switch_event_get_header_nil(event, "channel-name"),
2542 								   switch_event_get_header_nil(event, "channel-state"),
2543 								   switch_event_get_header_nil(event, "channel-call-state"),
2544 								   switch_event_get_header_nil(event, "caller-dialplan"),
2545 								   switch_event_get_header_nil(event, "caller-context"), switch_core_get_switchname(),
2546 								   switch_event_get_header_nil(event, "caller-caller-id-name"),
2547 								   switch_event_get_header_nil(event, "caller-caller-id-number"),
2548 								   switch_event_get_header_nil(event, "caller-network-addr"),
2549 								   switch_event_get_header_nil(event, "caller-destination-number"),
2550 								   switch_event_get_header_nil(event, "caller-dialplan"),
2551 								   switch_event_get_header_nil(event, "caller-context")
2552 								   );
2553 		break;
2554 	case SWITCH_EVENT_CHANNEL_ANSWER:
2555 	case SWITCH_EVENT_CHANNEL_PROGRESS_MEDIA:
2556 	case SWITCH_EVENT_CODEC:
2557 		new_sql() =
2558 			switch_mprintf
2559 			("update channels set read_codec='%q',read_rate='%q',read_bit_rate='%q',write_codec='%q',write_rate='%q',write_bit_rate='%q' where uuid='%q'",
2560 			 switch_event_get_header_nil(event, "channel-read-codec-name"),
2561 			 switch_event_get_header_nil(event, "channel-read-codec-rate"),
2562 			 switch_event_get_header_nil(event, "channel-read-codec-bit-rate"),
2563 			 switch_event_get_header_nil(event, "channel-write-codec-name"),
2564 			 switch_event_get_header_nil(event, "channel-write-codec-rate"),
2565 			 switch_event_get_header_nil(event, "channel-write-codec-bit-rate"),
2566 			 switch_event_get_header_nil(event, "unique-id"));
2567 		break;
2568 	case SWITCH_EVENT_CHANNEL_HOLD:
2569 	case SWITCH_EVENT_CHANNEL_UNHOLD:
2570 	case SWITCH_EVENT_CHANNEL_EXECUTE: {
2571 
2572 		new_sql() = switch_mprintf("update channels set application='%q',application_data='%q',"
2573 								   "presence_id='%q',presence_data='%q',accountcode='%q' where uuid='%q'",
2574 								   switch_event_get_header_nil(event, "application"),
2575 								   switch_event_get_header_nil(event, "application-data"),
2576 								   switch_event_get_header_nil(event, "channel-presence-id"),
2577 								   switch_event_get_header_nil(event, "channel-presence-data"),
2578 								   switch_event_get_header_nil(event, "variable_accountcode"),
2579 								   switch_event_get_header_nil(event, "unique-id")
2580 								   );
2581 
2582 	}
2583 		break;
2584 
2585 	case SWITCH_EVENT_CHANNEL_ORIGINATE:
2586 		{
2587 			if ((extra_cols = parse_presence_data_cols(event))) {
2588 				new_sql() = switch_mprintf("update channels set "
2589 										   "presence_id='%q',presence_data='%q',accountcode='%q',call_uuid='%q',%s where uuid='%q'",
2590 										   switch_event_get_header_nil(event, "channel-presence-id"),
2591 										   switch_event_get_header_nil(event, "channel-presence-data"),
2592 										   switch_event_get_header_nil(event, "variable_accountcode"),
2593 										   switch_event_get_header_nil(event, "channel-call-uuid"),
2594 										   extra_cols,
2595 										   switch_event_get_header_nil(event, "unique-id"));
2596 				free(extra_cols);
2597 			} else {
2598 				new_sql() = switch_mprintf("update channels set "
2599 										   "presence_id='%q',presence_data='%q',accountcode='%q',call_uuid='%q' where uuid='%q'",
2600 										   switch_event_get_header_nil(event, "channel-presence-id"),
2601 										   switch_event_get_header_nil(event, "channel-presence-data"),
2602 										   switch_event_get_header_nil(event, "variable_accountcode"),
2603 										   switch_event_get_header_nil(event, "channel-call-uuid"),
2604 										   switch_event_get_header_nil(event, "unique-id"));
2605 			}
2606 
2607 		}
2608 
2609 		break;
2610 	case SWITCH_EVENT_CALL_UPDATE:
2611 		{
2612 			new_sql() = switch_mprintf("update channels set callee_name='%q',callee_num='%q',sent_callee_name='%q',sent_callee_num='%q',callee_direction='%q',"
2613 									   "cid_name='%q',cid_num='%q' where uuid='%q'",
2614 									   switch_event_get_header_nil(event, "caller-callee-id-name"),
2615 									   switch_event_get_header_nil(event, "caller-callee-id-number"),
2616 									   switch_event_get_header_nil(event, "sent-callee-id-name"),
2617 									   switch_event_get_header_nil(event, "sent-callee-id-number"),
2618 									   switch_event_get_header_nil(event, "direction"),
2619 									   switch_event_get_header_nil(event, "caller-caller-id-name"),
2620 									   switch_event_get_header_nil(event, "caller-caller-id-number"),
2621 									   switch_event_get_header_nil(event, "unique-id")
2622 									   );
2623 		}
2624 		break;
2625 	case SWITCH_EVENT_CHANNEL_CALLSTATE:
2626 		{
2627 			char *num = switch_event_get_header_nil(event, "channel-call-state-number");
2628 			switch_channel_callstate_t callstate = CCS_DOWN;
2629 
2630 			if (num) {
2631 				callstate = atoi(num);
2632 			}
2633 
2634 			if (callstate != CCS_DOWN && callstate != CCS_HANGUP) {
2635 				if ((extra_cols = parse_presence_data_cols(event))) {
2636 					new_sql() = switch_mprintf("update channels set callstate='%q',%s where uuid='%q'",
2637 											   switch_event_get_header_nil(event, "channel-call-state"),
2638 											   extra_cols,
2639 											   switch_event_get_header_nil(event, "unique-id"));
2640 					free(extra_cols);
2641 				} else {
2642 					new_sql() = switch_mprintf("update channels set callstate='%q' where uuid='%q'",
2643 											   switch_event_get_header_nil(event, "channel-call-state"),
2644 											   switch_event_get_header_nil(event, "unique-id"));
2645 				}
2646 			}
2647 
2648 		}
2649 		break;
2650 	case SWITCH_EVENT_CHANNEL_STATE:
2651 		{
2652 			char *state = switch_event_get_header_nil(event, "channel-state-number");
2653 			switch_channel_state_t state_i = CS_DESTROY;
2654 
2655 			if (!zstr(state)) {
2656 				state_i = atoi(state);
2657 			}
2658 
2659 			switch (state_i) {
2660 			case CS_NEW:
2661 			case CS_DESTROY:
2662 			case CS_REPORTING:
2663 #ifndef SWITCH_DEPRECATED_CORE_DB
2664 			case CS_HANGUP: /* marked for deprication */
2665 #endif
2666 			case CS_INIT:
2667 				break;
2668 #ifdef SWITCH_DEPRECATED_CORE_DB
2669 			case CS_HANGUP: /* marked for deprication */
2670 				new_sql_a() = switch_mprintf("update channels set state='%q' where uuid='%q'",
2671 											 switch_event_get_header_nil(event, "channel-state"),
2672 											 switch_event_get_header_nil(event, "unique-id"));
2673 				break;
2674 #endif
2675 			case CS_EXECUTE:
2676 				if ((extra_cols = parse_presence_data_cols(event))) {
2677 					new_sql() = switch_mprintf("update channels set state='%q',%s where uuid='%q'",
2678 											   switch_event_get_header_nil(event, "channel-state"),
2679 											   extra_cols,
2680 											   switch_event_get_header_nil(event, "unique-id"));
2681 					free(extra_cols);
2682 
2683 				} else {
2684 					new_sql() = switch_mprintf("update channels set state='%q' where uuid='%q'",
2685 											   switch_event_get_header_nil(event, "channel-state"),
2686 											   switch_event_get_header_nil(event, "unique-id"));
2687 				}
2688 				break;
2689 			case CS_ROUTING:
2690 				if ((extra_cols = parse_presence_data_cols(event))) {
2691 					new_sql() = switch_mprintf("update channels set state='%q',cid_name='%q',cid_num='%q',callee_name='%q',callee_num='%q',"
2692 											   "sent_callee_name='%q',sent_callee_num='%q',"
2693 											   "ip_addr='%q',dest='%q',dialplan='%q',context='%q',presence_id='%q',presence_data='%q',accountcode='%q',%s "
2694 											   "where uuid='%q'",
2695 											   switch_event_get_header_nil(event, "channel-state"),
2696 											   switch_event_get_header_nil(event, "caller-caller-id-name"),
2697 											   switch_event_get_header_nil(event, "caller-caller-id-number"),
2698 											   switch_event_get_header_nil(event, "caller-callee-id-name"),
2699 											   switch_event_get_header_nil(event, "caller-callee-id-number"),
2700 											   switch_event_get_header_nil(event, "sent-callee-id-name"),
2701 											   switch_event_get_header_nil(event, "sent-callee-id-number"),
2702 											   switch_event_get_header_nil(event, "caller-network-addr"),
2703 											   switch_event_get_header_nil(event, "caller-destination-number"),
2704 											   switch_event_get_header_nil(event, "caller-dialplan"),
2705 											   switch_event_get_header_nil(event, "caller-context"),
2706 											   switch_event_get_header_nil(event, "channel-presence-id"),
2707 											   switch_event_get_header_nil(event, "channel-presence-data"),
2708 											   switch_event_get_header_nil(event, "variable_accountcode"),
2709 											   extra_cols,
2710 											   switch_event_get_header_nil(event, "unique-id"));
2711 					free(extra_cols);
2712 				} else {
2713 					new_sql() = switch_mprintf("update channels set state='%q',cid_name='%q',cid_num='%q',callee_name='%q',callee_num='%q',"
2714 											   "sent_callee_name='%q',sent_callee_num='%q',"
2715 											   "ip_addr='%q',dest='%q',dialplan='%q',context='%q',presence_id='%q',presence_data='%q',accountcode='%q' "
2716 											   "where uuid='%q'",
2717 											   switch_event_get_header_nil(event, "channel-state"),
2718 											   switch_event_get_header_nil(event, "caller-caller-id-name"),
2719 											   switch_event_get_header_nil(event, "caller-caller-id-number"),
2720 											   switch_event_get_header_nil(event, "caller-callee-id-name"),
2721 											   switch_event_get_header_nil(event, "caller-callee-id-number"),
2722 											   switch_event_get_header_nil(event, "sent-callee-id-name"),
2723 											   switch_event_get_header_nil(event, "sent-callee-id-number"),
2724 											   switch_event_get_header_nil(event, "caller-network-addr"),
2725 											   switch_event_get_header_nil(event, "caller-destination-number"),
2726 											   switch_event_get_header_nil(event, "caller-dialplan"),
2727 											   switch_event_get_header_nil(event, "caller-context"),
2728 											   switch_event_get_header_nil(event, "channel-presence-id"),
2729 											   switch_event_get_header_nil(event, "channel-presence-data"),
2730 											   switch_event_get_header_nil(event, "variable_accountcode"),
2731 											   switch_event_get_header_nil(event, "unique-id"));
2732 				}
2733 				break;
2734 			default:
2735 				new_sql() = switch_mprintf("update channels set state='%q' where uuid='%q'",
2736 										   switch_event_get_header_nil(event, "channel-state"),
2737 										   switch_event_get_header_nil(event, "unique-id"));
2738 				break;
2739 			}
2740 
2741 			break;
2742 
2743 
2744 		}
2745 	case SWITCH_EVENT_CHANNEL_BRIDGE:
2746 		{
2747 			const char *a_uuid, *b_uuid, *uuid;
2748 
2749 			a_uuid = switch_event_get_header(event, "Bridge-A-Unique-ID");
2750 			b_uuid = switch_event_get_header(event, "Bridge-B-Unique-ID");
2751 			uuid = switch_event_get_header(event, "unique-id");
2752 
2753 			if (zstr(a_uuid) || zstr(b_uuid)) {
2754 				a_uuid = switch_event_get_header_nil(event, "caller-unique-id");
2755 				b_uuid = switch_event_get_header_nil(event, "other-leg-unique-id");
2756 			}
2757 
2758 			if (uuid && (extra_cols = parse_presence_data_cols(event))) {
2759 				new_sql() = switch_mprintf("update channels set %s where uuid='%q'", extra_cols, uuid);
2760 				switch_safe_free(extra_cols);
2761 			}
2762 
2763 			new_sql() = switch_mprintf("update channels set call_uuid='%q' where uuid='%q' or uuid='%q'",
2764 									   switch_event_get_header_nil(event, "channel-call-uuid"), a_uuid, b_uuid);
2765 
2766 
2767 			new_sql() = switch_mprintf("insert into calls (call_uuid,call_created,call_created_epoch,"
2768 									   "caller_uuid,callee_uuid,hostname) "
2769 									   "values ('%q','%q','%ld','%q','%q','%q')",
2770 									   switch_event_get_header_nil(event, "channel-call-uuid"),
2771 									   switch_event_get_header_nil(event, "event-date-local"),
2772 									   (long) switch_epoch_time_now(NULL),
2773 									   a_uuid,
2774 									   b_uuid,
2775 									   switch_core_get_switchname()
2776 									   );
2777 		}
2778 		break;
2779 	case SWITCH_EVENT_CHANNEL_UNBRIDGE:
2780 		{
2781 			char *cuuid = switch_event_get_header_nil(event, "caller-unique-id");
2782 			char *uuid = switch_event_get_header(event, "unique-id");
2783 
2784 			if (uuid && (extra_cols = parse_presence_data_cols(event))) {
2785 				new_sql() = switch_mprintf("update channels set %s where uuid='%q'", extra_cols, uuid);
2786 				switch_safe_free(extra_cols);
2787 			}
2788 
2789 			new_sql() = switch_mprintf("update channels set call_uuid=uuid where call_uuid='%q'",
2790 									   switch_event_get_header_nil(event, "channel-call-uuid"));
2791 
2792 			new_sql() = switch_mprintf("delete from calls where (caller_uuid='%q' or callee_uuid='%q')",
2793 									   cuuid, cuuid);
2794 			break;
2795 		}
2796 	case SWITCH_EVENT_SHUTDOWN:
2797 		new_sql() = switch_mprintf("delete from channels where hostname='%q';"
2798 								   "delete from interfaces where hostname='%q';"
2799 								   "delete from calls where hostname='%q'",
2800 								   switch_core_get_switchname(), switch_core_get_hostname(), switch_core_get_switchname()
2801 								   );
2802 		break;
2803 	case SWITCH_EVENT_LOG:
2804 		return;
2805 	case SWITCH_EVENT_MODULE_LOAD:
2806 		{
2807 			const char *type = switch_event_get_header_nil(event, "type");
2808 			const char *name = switch_event_get_header_nil(event, "name");
2809 			const char *description = switch_event_get_header_nil(event, "description");
2810 			const char *syntax = switch_event_get_header_nil(event, "syntax");
2811 			const char *key = switch_event_get_header_nil(event, "key");
2812 			const char *filename = switch_event_get_header_nil(event, "filename");
2813 			if (!zstr(type) && !zstr(name)) {
2814 				new_sql() =
2815 					switch_mprintf
2816 					("insert into interfaces (type,name,description,syntax,ikey,filename,hostname) values('%q','%q','%q','%q','%q','%q','%q')", type, name,
2817 					 switch_str_nil(description), switch_str_nil(syntax), switch_str_nil(key), switch_str_nil(filename),
2818 					 switch_core_get_hostname()
2819 					 );
2820 			}
2821 			break;
2822 		}
2823 	case SWITCH_EVENT_MODULE_UNLOAD:
2824 		{
2825 			const char *type = switch_event_get_header_nil(event, "type");
2826 			const char *name = switch_event_get_header_nil(event, "name");
2827 			if (!zstr(type) && !zstr(name)) {
2828 				new_sql() = switch_mprintf("delete from interfaces where type='%q' and name='%q' and hostname='%q'", type, name,
2829 										   switch_core_get_hostname());
2830 			}
2831 			break;
2832 		}
2833 	case SWITCH_EVENT_CALL_SECURE:
2834 		{
2835 			const char *type = switch_event_get_header_nil(event, "secure_type");
2836 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Secure Type: %s\n", type);
2837 			if (zstr(type)) {
2838 				break;
2839 			}
2840 			new_sql() = switch_mprintf("update channels set secure='%q' where uuid='%q'",
2841 									   type, switch_event_get_header_nil(event, "caller-unique-id")
2842 									   );
2843 			break;
2844 		}
2845 	case SWITCH_EVENT_NAT:
2846 		{
2847 			const char *op = switch_event_get_header_nil(event, "op");
2848 			switch_bool_t sticky = switch_true(switch_event_get_header_nil(event, "sticky"));
2849 			if (!strcmp("add", op)) {
2850 				new_sql() = switch_mprintf("insert into nat (port, proto, sticky, hostname) values (%q, %q, %d,'%q')",
2851 										   switch_event_get_header_nil(event, "port"),
2852 										   switch_event_get_header_nil(event, "proto"), sticky, switch_core_get_hostname()
2853 										   );
2854 			} else if (!strcmp("del", op)) {
2855 				new_sql() = switch_mprintf("delete from nat where port=%q and proto=%q and hostname='%q'",
2856 										   switch_event_get_header_nil(event, "port"),
2857 										   switch_event_get_header_nil(event, "proto"), switch_core_get_hostname());
2858 			} else if (!strcmp("status", op)) {
2859 				/* call show nat api */
2860 			} else if (!strcmp("status_response", op)) {
2861 				/* ignore */
2862 			} else {
2863 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unknown op for SWITCH_EVENT_NAT: %s\n", op);
2864 			}
2865 			break;
2866 		}
2867 	default:
2868 		break;
2869 	}
2870 
2871 	if (sql_idx) {
2872 		int i = 0;
2873 
2874 
2875 		for (i = 0; i < sql_idx; i++) {
2876 			if (switch_stristr("update channels", sql[i]) || switch_stristr("delete from channels", sql[i])) {
2877 				switch_sql_queue_manager_push(sql_manager.qm, sql[i], 1, SWITCH_FALSE);
2878 			} else {
2879 				switch_sql_queue_manager_push(sql_manager.qm, sql[i], 0, SWITCH_FALSE);
2880 			}
2881 			sql[i] = NULL;
2882 		}
2883 	}
2884 }
2885 
2886 
/*
 * DDL statement that creates the `complete` table: an INTEGER `sticky`
 * column, ten VARCHAR(128) columns named a1 through a10, and a
 * VARCHAR(256) `hostname` column.
 */
static char create_complete_sql[] =
	"CREATE TABLE complete (\n"
	"   sticky  INTEGER,\n"
	"   a1  VARCHAR(128),\n"
	"   a2  VARCHAR(128),\n"
	"   a3  VARCHAR(128),\n"
	"   a4  VARCHAR(128),\n"
	"   a5  VARCHAR(128),\n"
	"   a6  VARCHAR(128),\n"
	"   a7  VARCHAR(128),\n"
	"   a8  VARCHAR(128),\n"
	"   a9  VARCHAR(128),\n"
	"   a10 VARCHAR(128),\n"
	"   hostname VARCHAR(256)\n"
	");\n";
2902 
/*
 * DDL statement that creates the `aliases` table: an INTEGER `sticky`
 * column, the alias text, the command text (up to 4096 characters) and a
 * `hostname` column.
 */
static char create_alias_sql[] =
	"CREATE TABLE aliases (\n"
	"   sticky  INTEGER,\n"
	"   alias  VARCHAR(128),\n"
	"   command  VARCHAR(4096),\n"
	"   hostname VARCHAR(256)\n"
	");\n";
2910 
/*
 * DDL statement that creates the `channels` table.
 *
 * Every column is a VARCHAR except `created_epoch`, which is INTEGER.
 * The column list is the same as in create_row_size_limited_channels_sql;
 * here `application_data` and `presence_data` are declared VARCHAR(4096).
 */
static char create_channels_sql[] =
	"CREATE TABLE channels (\n"
	"   uuid  VARCHAR(256),\n"
	"   direction  VARCHAR(32),\n"
	"   created  VARCHAR(128),\n"
	"   created_epoch  INTEGER,\n"
	"   name  VARCHAR(1024),\n"
	"   state  VARCHAR(64),\n"
	"   cid_name  VARCHAR(1024),\n"
	"   cid_num  VARCHAR(256),\n"
	"   ip_addr  VARCHAR(256),\n"
	"   dest  VARCHAR(1024),\n"
	"   application  VARCHAR(128),\n"
	"   application_data  VARCHAR(4096),\n"
	"   dialplan VARCHAR(128),\n"
	"   context VARCHAR(128),\n"
	"   read_codec  VARCHAR(128),\n"
	"   read_rate  VARCHAR(32),\n"
	"   read_bit_rate  VARCHAR(32),\n"
	"   write_codec  VARCHAR(128),\n"
	"   write_rate  VARCHAR(32),\n"
	"   write_bit_rate  VARCHAR(32),\n"
	"   secure VARCHAR(64),\n"
	"   hostname VARCHAR(256),\n"
	"   presence_id VARCHAR(4096),\n"
	"   presence_data VARCHAR(4096),\n"
	"   accountcode VARCHAR(256),\n"
	"   callstate  VARCHAR(64),\n"
	"   callee_name  VARCHAR(1024),\n"
	"   callee_num  VARCHAR(256),\n"
	"   callee_direction  VARCHAR(5),\n"
	"   call_uuid  VARCHAR(256),\n"
	"   sent_callee_name  VARCHAR(1024),\n"
	"   sent_callee_num  VARCHAR(256),\n"
	"   initial_cid_name  VARCHAR(1024),\n"
	"   initial_cid_num  VARCHAR(256),\n"
	"   initial_ip_addr  VARCHAR(256),\n"
	"   initial_dest  VARCHAR(1024),\n"
	"   initial_dialplan  VARCHAR(128),\n"
	"   initial_context  VARCHAR(128)\n"
	");\n";
2952 
/*
 * Alternative DDL statement for the `channels` table.
 *
 * Identical to create_channels_sql except that `application_data` and
 * `presence_data` are declared TEXT instead of VARCHAR(4096).
 * NOTE(review): the name suggests this variant is for databases that
 * enforce a maximum row size -- confirm against the code that selects it.
 */
static char create_row_size_limited_channels_sql[] =
	"CREATE TABLE channels (\n"
	"   uuid  VARCHAR(256),\n"
	"   direction  VARCHAR(32),\n"
	"   created  VARCHAR(128),\n"
	"   created_epoch  INTEGER,\n"
	"   name  VARCHAR(1024),\n"
	"   state  VARCHAR(64),\n"
	"   cid_name  VARCHAR(1024),\n"
	"   cid_num  VARCHAR(256),\n"
	"   ip_addr  VARCHAR(256),\n"
	"   dest  VARCHAR(1024),\n"
	"   application  VARCHAR(128),\n"
	"   application_data TEXT,\n"
	"   dialplan VARCHAR(128),\n"
	"   context VARCHAR(128),\n"
	"   read_codec  VARCHAR(128),\n"
	"   read_rate  VARCHAR(32),\n"
	"   read_bit_rate  VARCHAR(32),\n"
	"   write_codec  VARCHAR(128),\n"
	"   write_rate  VARCHAR(32),\n"
	"   write_bit_rate  VARCHAR(32),\n"
	"   secure VARCHAR(64),\n"
	"   hostname VARCHAR(256),\n"
	"   presence_id VARCHAR(4096),\n"
	"   presence_data TEXT,\n"
	"   accountcode VARCHAR(256),\n"
	"   callstate  VARCHAR(64),\n"
	"   callee_name  VARCHAR(1024),\n"
	"   callee_num  VARCHAR(256),\n"
	"   callee_direction  VARCHAR(5),\n"
	"   call_uuid  VARCHAR(256),\n"
	"   sent_callee_name  VARCHAR(1024),\n"
	"   sent_callee_num  VARCHAR(256),\n"
	"   initial_cid_name  VARCHAR(1024),\n"
	"   initial_cid_num  VARCHAR(256),\n"
	"   initial_ip_addr  VARCHAR(256),\n"
	"   initial_dest  VARCHAR(1024),\n"
	"   initial_dialplan  VARCHAR(128),\n"
	"   initial_context  VARCHAR(128)\n"
");\n";
2994 
/*
 * DDL statement that creates the `calls` table: the call uuid, its creation
 * time as text and as an INTEGER epoch, the caller and callee channel uuids,
 * and a `hostname` column.
 * NOTE(review): call_uuid is VARCHAR(255) here but VARCHAR(256) in the
 * channels table -- confirm whether the difference is intentional.
 */
static char create_calls_sql[] =
	"CREATE TABLE calls (\n"
	"   call_uuid  VARCHAR(255),\n"
	"   call_created  VARCHAR(128),\n"
	"   call_created_epoch  INTEGER,\n"
	"   caller_uuid      VARCHAR(256),\n"
	"   callee_uuid      VARCHAR(256),\n"
	"   hostname VARCHAR(256)\n"
	");\n";
3004 
/*
 * DDL statement that creates the `interfaces` table, with columns for type,
 * name, description, ikey, filename, syntax and hostname (all VARCHAR).
 */
static char create_interfaces_sql[] =
	"CREATE TABLE interfaces (\n"
	"   type             VARCHAR(128),\n"
	"   name             VARCHAR(1024),\n"
	"   description      VARCHAR(4096),\n"
	"   ikey             VARCHAR(1024),\n"
	"   filename         VARCHAR(4096),\n"
	"   syntax           VARCHAR(4096),\n"
	"   hostname VARCHAR(256)\n"
	");\n";
3015 
/*
 * DDL statement that creates the `tasks` table: an INTEGER task id, a
 * description, a group, a BIGINT runtime, an INTEGER `task_sql_manager`
 * column and a `hostname` column.
 */
static char create_tasks_sql[] =
	"CREATE TABLE tasks (\n"
	"   task_id             INTEGER,\n"
	"   task_desc           VARCHAR(4096),\n"
	"   task_group          VARCHAR(1024),\n"
	"   task_runtime        BIGINT,\n"
	"   task_sql_manager    INTEGER,\n"
	"   hostname            VARCHAR(256)\n"
	");\n";
3025 
/*
 * DDL statement that creates the `nat` table: INTEGER `sticky`, `port` and
 * `proto` columns plus a VARCHAR(256) `hostname` column.
 */
static char create_nat_sql[] =
	"CREATE TABLE nat (\n"
	"   sticky  INTEGER,\n"
	"	port	INTEGER,\n"
	"	proto	INTEGER,\n"
	"   hostname VARCHAR(256)\n"
	");\n";
3033 
3034 
/*
 * DDL statement that creates the `registrations` table: user, realm, token,
 * url (TEXT), an INTEGER expiry, the network ip/port/proto, hostname and a
 * metadata column.
 */
static char create_registrations_sql[] =
	"CREATE TABLE registrations (\n"
	"   reg_user      VARCHAR(256),\n"
	"   realm     VARCHAR(256),\n"
	"   token     VARCHAR(256),\n"
	/* If the url column definition is modified, check the code in switch_core_sqldb_start: it has dependencies on it for MSSQL. */
	"   url      TEXT,\n"
	"   expires  INTEGER,\n"
	"   network_ip VARCHAR(256),\n"
	"   network_port VARCHAR(256),\n"
	"   network_proto VARCHAR(256),\n"
	"   hostname VARCHAR(256),\n"
	"   metadata VARCHAR(256)\n"
	");\n";
3049 
3050 
3051 
3052 
/*
 * Statement that creates the `detailed_calls` view.
 *
 * The view selects from channels (alias a), left-joins calls (alias c) where
 * a is the call's caller on the same hostname, and left-joins channels again
 * (alias b) for that call's callee.  Columns from a keep their own names;
 * columns from b are exposed with a "b_" prefix; call_created_epoch comes
 * from c.  The where clause keeps a row when a is the caller of the joined
 * call, or when a's uuid does not appear as a callee_uuid in calls.
 *
 * The initial_* columns of the channels table are not part of this view.
 */
static char detailed_calls_sql[] =
	"create view detailed_calls as select "
	/* columns of the a channel */
	"a.uuid as uuid,"
	"a.direction as direction,"
	"a.created as created,"
	"a.created_epoch as created_epoch,"
	"a.name as name,"
	"a.state as state,"
	"a.cid_name as cid_name,"
	"a.cid_num as cid_num,"
	"a.ip_addr as ip_addr,"
	"a.dest as dest,"
	"a.application as application,"
	"a.application_data as application_data,"
	"a.dialplan as dialplan,"
	"a.context as context,"
	"a.read_codec as read_codec,"
	"a.read_rate as read_rate,"
	"a.read_bit_rate as read_bit_rate,"
	"a.write_codec as write_codec,"
	"a.write_rate as write_rate,"
	"a.write_bit_rate as write_bit_rate,"
	"a.secure as secure,"
	"a.hostname as hostname,"
	"a.presence_id as presence_id,"
	"a.presence_data as presence_data,"
	"a.accountcode as accountcode,"
	"a.callstate as callstate,"
	"a.callee_name as callee_name,"
	"a.callee_num as callee_num,"
	"a.callee_direction as callee_direction,"
	"a.call_uuid as call_uuid,"
	"a.sent_callee_name as sent_callee_name,"
	"a.sent_callee_num as sent_callee_num,"
	/* the same columns of the b channel, prefixed with b_ */
	"b.uuid as b_uuid,"
	"b.direction as b_direction,"
	"b.created as b_created,"
	"b.created_epoch as b_created_epoch,"
	"b.name as b_name,"
	"b.state as b_state,"
	"b.cid_name as b_cid_name,"
	"b.cid_num as b_cid_num,"
	"b.ip_addr as b_ip_addr,"
	"b.dest as b_dest,"
	"b.application as b_application,"
	"b.application_data as b_application_data,"
	"b.dialplan as b_dialplan,"
	"b.context as b_context,"
	"b.read_codec as b_read_codec,"
	"b.read_rate as b_read_rate,"
	"b.read_bit_rate as b_read_bit_rate,"
	"b.write_codec as b_write_codec,"
	"b.write_rate as b_write_rate,"
	"b.write_bit_rate as b_write_bit_rate,"
	"b.secure as b_secure,"
	"b.hostname as b_hostname,"
	"b.presence_id as b_presence_id,"
	"b.presence_data as b_presence_data,"
	"b.accountcode as b_accountcode,"
	"b.callstate as b_callstate,"
	"b.callee_name as b_callee_name,"
	"b.callee_num as b_callee_num,"
	"b.callee_direction as b_callee_direction,"
	"b.call_uuid as b_call_uuid,"
	"b.sent_callee_name as b_sent_callee_name,"
	"b.sent_callee_num as b_sent_callee_num,"
	/* from the joined calls row */
	"c.call_created_epoch as call_created_epoch "
	"from channels a "
	"left join calls c on a.uuid = c.caller_uuid and a.hostname = c.hostname "
	"left join channels b on b.uuid = c.callee_uuid and b.hostname = c.hostname "
	"where a.uuid = c.caller_uuid or a.uuid not in (select callee_uuid from calls)";
3124 
3125 
/*
 * DDL statement that creates the `recovery` table.  Each row carries the
 * runtime uuid, technology, profile name, hostname and uuid it was written
 * with, plus a text `metadata` column (the recovery code in this file stores
 * the XML CDR text of the session there).
 */
static char recovery_sql[] =
	"CREATE TABLE recovery (\n"
	"   runtime_uuid    VARCHAR(255),\n"
	"   technology      VARCHAR(255),\n"
	"   profile_name    VARCHAR(255),\n"
	"   hostname        VARCHAR(255),\n"
	"   uuid            VARCHAR(255),\n"
	"   metadata        text\n"
	");\n";
3135 
/*
 * Statement that creates the `basic_calls` view.
 *
 * It uses the same joins and where clause as the detailed_calls view
 * (channels a, calls c matched on caller_uuid, channels b matched on
 * callee_uuid) but exposes a smaller set of columns: the application,
 * dialplan, context, codec and secure columns are left out on both sides,
 * and the b side additionally omits hostname and call_uuid.
 */
static char basic_calls_sql[] =
	"create view basic_calls as select "
	/* columns of the a channel */
	"a.uuid as uuid,"
	"a.direction as direction,"
	"a.created as created,"
	"a.created_epoch as created_epoch,"
	"a.name as name,"
	"a.state as state,"
	"a.cid_name as cid_name,"
	"a.cid_num as cid_num,"
	"a.ip_addr as ip_addr,"
	"a.dest as dest,"

	"a.presence_id as presence_id,"
	"a.presence_data as presence_data,"
	"a.accountcode as accountcode,"
	"a.callstate as callstate,"
	"a.callee_name as callee_name,"
	"a.callee_num as callee_num,"
	"a.callee_direction as callee_direction,"
	"a.call_uuid as call_uuid,"
	"a.hostname as hostname,"
	"a.sent_callee_name as sent_callee_name,"
	"a.sent_callee_num as sent_callee_num,"


	/* columns of the b channel, prefixed with b_ */
	"b.uuid as b_uuid,"
	"b.direction as b_direction,"
	"b.created as b_created,"
	"b.created_epoch as b_created_epoch,"
	"b.name as b_name,"
	"b.state as b_state,"
	"b.cid_name as b_cid_name,"
	"b.cid_num as b_cid_num,"
	"b.ip_addr as b_ip_addr,"
	"b.dest as b_dest,"

	"b.presence_id as b_presence_id,"
	"b.presence_data as b_presence_data,"
	"b.accountcode as b_accountcode,"
	"b.callstate as b_callstate,"
	"b.callee_name as b_callee_name,"
	"b.callee_num as b_callee_num,"
	"b.callee_direction as b_callee_direction,"
	"b.sent_callee_name as b_sent_callee_name,"
	"b.sent_callee_num as b_sent_callee_num,"
	/* from the joined calls row */
	"c.call_created_epoch as call_created_epoch "

	"from channels a "
	"left join calls c on a.uuid = c.caller_uuid and a.hostname = c.hostname "
	"left join channels b on b.uuid = c.callee_uuid and b.hostname = c.hostname "
	"where a.uuid = c.caller_uuid or a.uuid not in (select callee_uuid from calls)";
3188 
3189 
3190 
switch_core_recovery_flush(const char * technology,const char * profile_name)3191 SWITCH_DECLARE(void) switch_core_recovery_flush(const char *technology, const char *profile_name)
3192 {
3193 	char *sql = NULL;
3194 	switch_cache_db_handle_t *dbh;
3195 
3196 	if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
3197 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
3198 		return;
3199 	}
3200 
3201 	if (zstr(technology)) {
3202 
3203 		if (zstr(profile_name)) {
3204 			sql = switch_mprintf("delete from recovery");
3205 		} else {
3206 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "INVALID\n");
3207 		}
3208 
3209 	} else {
3210 		if (zstr(profile_name)) {
3211 			sql = switch_mprintf("delete from recovery where technology='%q' ", technology);
3212 		} else {
3213 			sql = switch_mprintf("delete from recovery where technology='%q' and profile_name='%q'", technology, profile_name);
3214 		}
3215 	}
3216 
3217 	if (sql) {
3218 		switch_cache_db_execute_sql(dbh, sql, NULL);
3219 		switch_safe_free(sql);
3220 	}
3221 
3222 	switch_cache_db_release_db_handle(&dbh);
3223 }
3224 
3225 
recover_callback(void * pArg,int argc,char ** argv,char ** columnNames)3226 static int recover_callback(void *pArg, int argc, char **argv, char **columnNames)
3227 {
3228 	int *rp = (int *) pArg;
3229 	switch_xml_t xml;
3230 	switch_endpoint_interface_t *ep;
3231 	switch_core_session_t *session;
3232 
3233 	if (argc < 4) {
3234 		return 0;
3235 	}
3236 
3237 	if (!(xml = switch_xml_parse_str_dynamic(argv[4], SWITCH_TRUE))) {
3238 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "XML ERROR\n");
3239 		return 0;
3240 	}
3241 
3242 	if (!(ep = switch_loadable_module_get_endpoint_interface(argv[0]))) {
3243 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "EP ERROR\n");
3244 		return 0;
3245 	}
3246 
3247 	if (!(session = switch_core_session_request_xml(ep, NULL, xml))) {
3248 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid cdr data, call not recovered\n");
3249 		goto end;
3250 	}
3251 
3252 	if (ep->recover_callback) {
3253 		switch_caller_extension_t *extension = NULL;
3254 		switch_channel_t *channel = switch_core_session_get_channel(session);
3255 		int r = 0;
3256 
3257 		if ((r = ep->recover_callback(session)) > 0) {
3258 			const char *cbname;
3259 
3260 			switch_channel_set_flag(session->channel, CF_RECOVERING);
3261 
3262 
3263 			if (switch_channel_get_partner_uuid(channel)) {
3264 				switch_channel_set_flag(channel, CF_RECOVERING_BRIDGE);
3265 			}
3266 
3267 			switch_core_media_recover_session(session);
3268 
3269 			if ((cbname = switch_channel_get_variable(channel, "secondary_recovery_module"))) {
3270 				switch_core_recover_callback_t recover_callback;
3271 
3272 				if ((recover_callback = switch_core_get_secondary_recover_callback(cbname))) {
3273 					r = recover_callback(session);
3274 				}
3275 			}
3276 
3277 
3278 		}
3279 
3280 		if (r > 0) {
3281 
3282 			if (!switch_channel_test_flag(channel, CF_RECOVERING_BRIDGE)) {
3283 				switch_xml_t callflow, param, x_extension;
3284 				if ((extension = switch_caller_extension_new(session, "recovery", "recovery")) == 0) {
3285 					abort();
3286 				}
3287 
3288 				if ((callflow = switch_xml_child(xml, "callflow")) && (x_extension = switch_xml_child(callflow, "extension"))) {
3289 					for (param = switch_xml_child(x_extension, "application"); param; param = param->next) {
3290 						const char *var = switch_xml_attr_soft(param, "app_name");
3291 						const char *val = switch_xml_attr_soft(param, "app_data");
3292 						/* skip announcement type apps */
3293 						if (strcasecmp(var, "speak") && strcasecmp(var, "playback") && strcasecmp(var, "gentones") && strcasecmp(var, "say")) {
3294 							switch_caller_extension_add_application(session, extension, var, val);
3295 						}
3296 					}
3297 				}
3298 
3299 				switch_channel_set_caller_extension(channel, extension);
3300 			}
3301 
3302 			switch_channel_set_state(channel, CS_INIT);
3303 			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_NOTICE,
3304 							  "Resurrecting fallen channel %s\n", switch_channel_get_name(channel));
3305 			switch_core_session_thread_launch(session);
3306 
3307 			*rp = (*rp) + 1;
3308 
3309 		}
3310 
3311 	} else {
3312 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Endpoint %s has no recovery function\n", argv[0]);
3313 	}
3314 
3315 
3316  end:
3317 
3318 	UNPROTECT_INTERFACE(ep);
3319 
3320 	switch_xml_free(xml);
3321 
3322 	return 0;
3323 }
3324 
switch_core_recovery_recover(const char * technology,const char * profile_name)3325 SWITCH_DECLARE(int) switch_core_recovery_recover(const char *technology, const char *profile_name)
3326 
3327 {
3328 	char *sql = NULL;
3329 	char *errmsg = NULL;
3330 	switch_cache_db_handle_t *dbh;
3331 	int r = 0;
3332 
3333 	if (!sql_manager.manage) {
3334 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "DATABASE NOT AVAIALBLE, REVCOVERY NOT POSSIBLE\n");
3335 		return 0;
3336 	}
3337 
3338 	if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
3339 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
3340 		return 0;
3341 	}
3342 
3343 	if (zstr(technology)) {
3344 
3345 		if (zstr(profile_name)) {
3346 			sql = switch_mprintf("select technology, profile_name, hostname, uuid, metadata "
3347 								 "from recovery where runtime_uuid!='%q'",
3348 								 switch_core_get_uuid());
3349 		} else {
3350 			sql = switch_mprintf("select technology, profile_name, hostname, uuid, metadata "
3351 								 "from recovery where runtime_uuid!='%q' and profile_name='%q'",
3352 								 switch_core_get_uuid(), profile_name);
3353 		}
3354 
3355 	} else {
3356 
3357 		if (zstr(profile_name)) {
3358 			sql = switch_mprintf("select technology, profile_name, hostname, uuid, metadata "
3359 								 "from recovery where technology='%q' and runtime_uuid!='%q'",
3360 								 technology, switch_core_get_uuid());
3361 		} else {
3362 			sql = switch_mprintf("select technology, profile_name, hostname, uuid, metadata "
3363 								 "from recovery where technology='%q' and runtime_uuid!='%q' and profile_name='%q'",
3364 								 technology, switch_core_get_uuid(), profile_name);
3365 		}
3366 	}
3367 
3368 
3369 	switch_cache_db_execute_sql_callback(dbh, sql, recover_callback, &r, &errmsg);
3370 
3371 	if (errmsg) {
3372 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg);
3373 		switch_safe_free(errmsg);
3374 	}
3375 
3376 	switch_safe_free(sql);
3377 
3378 	if (zstr(technology)) {
3379 		if (zstr(profile_name)) {
3380 			sql = switch_mprintf("delete from recovery where runtime_uuid!='%q'",
3381 								 switch_core_get_uuid());
3382 		} else {
3383 			sql = switch_mprintf("delete from recovery where runtime_uuid!='%q' and profile_name='%q'",
3384 								 switch_core_get_uuid(), profile_name);
3385 		}
3386 	} else {
3387 		if (zstr(profile_name)) {
3388 			sql = switch_mprintf("delete from recovery where runtime_uuid!='%q' and technology='%q' ",
3389 								 switch_core_get_uuid(), technology);
3390 		} else {
3391 			sql = switch_mprintf("delete from recovery where runtime_uuid!='%q' and technology='%q' and profile_name='%q'",
3392 								 switch_core_get_uuid(), technology, profile_name);
3393 		}
3394 	}
3395 
3396 	switch_cache_db_execute_sql(dbh, sql, NULL);
3397 	switch_safe_free(sql);
3398 
3399 	switch_cache_db_release_db_handle(&dbh);
3400 
3401 	return r;
3402 
3403 }
3404 
switch_core_dbtype(void)3405 SWITCH_DECLARE(switch_cache_db_handle_type_t) switch_core_dbtype(void)
3406 {
3407 	switch_cache_db_handle_type_t type = SCDB_TYPE_CORE_DB;
3408 
3409 	switch_mutex_lock(sql_manager.ctl_mutex);
3410 	if (sql_manager.qm && sql_manager.qm->event_db) {
3411 		type = sql_manager.qm->event_db->type;
3412 	}
3413 	switch_mutex_unlock(sql_manager.ctl_mutex);
3414 
3415 	return type;
3416 }
3417 
switch_core_sql_exec(const char * sql)3418 SWITCH_DECLARE(void) switch_core_sql_exec(const char *sql)
3419 {
3420 	if (!sql_manager.manage) {
3421 		return;
3422 	}
3423 
3424 	if (!switch_test_flag((&runtime), SCF_USE_SQL)) {
3425 		return;
3426 	}
3427 
3428 
3429 	switch_sql_queue_manager_push(sql_manager.qm, sql, 3, SWITCH_TRUE);
3430 }
3431 
switch_core_recovery_untrack(switch_core_session_t * session,switch_bool_t force)3432 SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session, switch_bool_t force)
3433 {
3434 	char *sql = NULL;
3435 	switch_channel_t *channel = switch_core_session_get_channel(session);
3436 
3437 	if (!sql_manager.manage) {
3438 		return;
3439 	}
3440 
3441 	if (!switch_channel_test_flag(channel, CF_ANSWERED) || switch_channel_get_state(channel) < CS_SOFT_EXECUTE) {
3442 		return;
3443 	}
3444 
3445 	if (!switch_channel_test_flag(channel, CF_TRACKABLE)) {
3446 		return;
3447 	}
3448 
3449 	if ((switch_channel_test_flag(channel, CF_RECOVERING))) {
3450 		return;
3451 	}
3452 
3453 	if (switch_channel_test_flag(channel, CF_TRACKED) || force) {
3454 
3455 		if (force) {
3456 			sql = switch_mprintf("delete from recovery where uuid='%q'", switch_core_session_get_uuid(session));
3457 
3458 		} else {
3459 			sql = switch_mprintf("delete from recovery where runtime_uuid='%q' and uuid='%q'",
3460 								 switch_core_get_uuid(), switch_core_session_get_uuid(session));
3461 		}
3462 
3463 		switch_sql_queue_manager_push(sql_manager.qm, sql, 3, SWITCH_FALSE);
3464 
3465 		switch_channel_clear_flag(channel, CF_TRACKED);
3466 	}
3467 
3468 }
3469 
switch_core_recovery_track(switch_core_session_t * session)3470 SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session)
3471 {
3472 	switch_xml_t cdr = NULL;
3473 	char *xml_cdr_text = NULL;
3474 	char *sql = NULL;
3475 	switch_channel_t *channel = switch_core_session_get_channel(session);
3476 	const char *profile_name;
3477 	const char *technology;
3478 
3479 	if (!sql_manager.manage) {
3480 		return;
3481 	}
3482 
3483 	if (!switch_channel_test_flag(channel, CF_ANSWERED) || switch_channel_get_state(channel) < CS_SOFT_EXECUTE) {
3484 		return;
3485 	}
3486 
3487 	if (switch_channel_test_flag(channel, CF_RECOVERING) || !switch_channel_test_flag(channel, CF_TRACKABLE)) {
3488 		return;
3489 	}
3490 
3491 	profile_name = switch_channel_get_variable_dup(channel, "recovery_profile_name", SWITCH_FALSE, -1);
3492 	technology = session->endpoint_interface->interface_name;
3493 
3494 	if (switch_ivr_generate_xml_cdr(session, &cdr) == SWITCH_STATUS_SUCCESS) {
3495 		xml_cdr_text = switch_xml_toxml_nolock(cdr, SWITCH_FALSE);
3496 		switch_xml_free(cdr);
3497 	}
3498 
3499 	if (xml_cdr_text) {
3500 		if (switch_channel_test_flag(channel, CF_TRACKED)) {
3501 			sql = switch_mprintf("update recovery set metadata='%q' where uuid='%q'",  xml_cdr_text, switch_core_session_get_uuid(session));
3502 		} else {
3503 			sql = switch_mprintf("insert into recovery (runtime_uuid, technology, profile_name, hostname, uuid, metadata) "
3504 								 "values ('%q','%q','%q','%q','%q','%q')",
3505 								 switch_core_get_uuid(), switch_str_nil(technology),
3506 								 switch_str_nil(profile_name), switch_core_get_switchname(), switch_core_session_get_uuid(session), xml_cdr_text);
3507 		}
3508 
3509 		switch_sql_queue_manager_push(sql_manager.qm, sql, 2, SWITCH_FALSE);
3510 
3511 		switch_safe_free(xml_cdr_text);
3512 		switch_channel_set_flag(channel, CF_TRACKED);
3513 
3514 	}
3515 
3516 }
3517 
3518 
3519 
switch_core_add_registration(const char * user,const char * realm,const char * token,const char * url,uint32_t expires,const char * network_ip,const char * network_port,const char * network_proto,const char * metadata)3520 SWITCH_DECLARE(switch_status_t) switch_core_add_registration(const char *user, const char *realm, const char *token, const char *url, uint32_t expires,
3521 															 const char *network_ip, const char *network_port, const char *network_proto,
3522 															 const char *metadata)
3523 {
3524 	char *sql;
3525 
3526 	if (!switch_test_flag((&runtime), SCF_USE_SQL)) {
3527 		return SWITCH_STATUS_FALSE;
3528 	}
3529 
3530 	if (runtime.multiple_registrations) {
3531 		sql = switch_mprintf("delete from registrations where hostname='%q' and (url='%q' or token='%q')",
3532 							 switch_core_get_switchname(), url, switch_str_nil(token));
3533 	} else {
3534 		sql = switch_mprintf("delete from registrations where reg_user='%q' and realm='%q' and hostname='%q'",
3535 							 user, realm, switch_core_get_switchname());
3536 	}
3537 
3538 	switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE);
3539 
3540 	if ( !zstr(metadata) ) {
3541 		sql = switch_mprintf("insert into registrations (reg_user,realm,token,url,expires,network_ip,network_port,network_proto,hostname,metadata) "
3542 							 "values ('%q','%q','%q','%q',%ld,'%q','%q','%q','%q','%q')",
3543 							 switch_str_nil(user),
3544 							 switch_str_nil(realm),
3545 							 switch_str_nil(token),
3546 							 switch_str_nil(url),
3547 							 expires,
3548 							 switch_str_nil(network_ip),
3549 							 switch_str_nil(network_port),
3550 							 switch_str_nil(network_proto),
3551 							 switch_core_get_switchname(),
3552 							 metadata
3553 							 );
3554 	} else {
3555 		sql = switch_mprintf("insert into registrations (reg_user,realm,token,url,expires,network_ip,network_port,network_proto,hostname) "
3556 							 "values ('%q','%q','%q','%q',%ld,'%q','%q','%q','%q')",
3557 							 switch_str_nil(user),
3558 							 switch_str_nil(realm),
3559 							 switch_str_nil(token),
3560 							 switch_str_nil(url),
3561 							 expires,
3562 							 switch_str_nil(network_ip),
3563 							 switch_str_nil(network_port),
3564 							 switch_str_nil(network_proto),
3565 							 switch_core_get_switchname()
3566 							 );
3567 	}
3568 
3569 
3570 	switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE);
3571 
3572 	return SWITCH_STATUS_SUCCESS;
3573 }
3574 
switch_core_del_registration(const char * user,const char * realm,const char * token)3575 SWITCH_DECLARE(switch_status_t) switch_core_del_registration(const char *user, const char *realm, const char *token)
3576 {
3577 
3578 	char *sql;
3579 
3580 	if (!switch_test_flag((&runtime), SCF_USE_SQL)) {
3581 		return SWITCH_STATUS_FALSE;
3582 	}
3583 
3584 	if (!zstr(token) && runtime.multiple_registrations) {
3585 		sql = switch_mprintf("delete from registrations where reg_user='%q' and realm='%q' and hostname='%q' and token='%q'", user, realm, switch_core_get_switchname(), token);
3586 	} else {
3587 		sql = switch_mprintf("delete from registrations where reg_user='%q' and realm='%q' and hostname='%q'", user, realm, switch_core_get_switchname());
3588 	}
3589 
3590 	switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE);
3591 
3592 
3593 	return SWITCH_STATUS_SUCCESS;
3594 }
3595 
switch_core_expire_registration(int force)3596 SWITCH_DECLARE(switch_status_t) switch_core_expire_registration(int force)
3597 {
3598 
3599 	char *sql;
3600 	time_t now;
3601 
3602 	if (!switch_test_flag((&runtime), SCF_USE_SQL)) {
3603 		return SWITCH_STATUS_FALSE;
3604 	}
3605 
3606 	now = switch_epoch_time_now(NULL);
3607 
3608 	if (force) {
3609 		sql = switch_mprintf("delete from registrations where hostname='%q'", switch_core_get_switchname());
3610 	} else {
3611 		sql = switch_mprintf("delete from registrations where expires > 0 and expires <= %ld and hostname='%q'", now, switch_core_get_switchname());
3612 	}
3613 
3614 	switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE);
3615 
3616 	return SWITCH_STATUS_SUCCESS;
3617 
3618 }
3619 
switch_core_sqldb_start(switch_memory_pool_t * pool,switch_bool_t manage)3620 switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_t manage)
3621 {
3622 	switch_threadattr_t *thd_attr;
3623 
3624 	sql_manager.memory_pool = pool;
3625 	sql_manager.manage = manage;
3626 
3627 	switch_mutex_init(&sql_manager.dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
3628 	switch_mutex_init(&sql_manager.io_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
3629 	switch_mutex_init(&sql_manager.ctl_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
3630 
3631 	if (!sql_manager.manage) goto skip;
3632 
3633  top:
3634 
3635 	/* Activate SQL database */
3636 	if (switch_core_db_handle(&sql_manager.dbh) != SWITCH_STATUS_SUCCESS) {
3637 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
3638 
3639 		if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {
3640 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! ODBC IS REQUIRED!\n");
3641 			return SWITCH_STATUS_FALSE;
3642 		}
3643 
3644 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "CORE DATABASE INITIALIZATION FAILURE! CHECK `core-db-dsn`!\n");
3645 
3646 		switch_clear_flag((&runtime), SCF_USE_SQL);
3647 		return SWITCH_STATUS_FALSE;
3648 	}
3649 
3650 
3651 	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Opening DB\n");
3652 
3653 	switch (sql_manager.dbh->type) {
3654 	case SCDB_TYPE_DATABASE_INTERFACE:
3655 	case SCDB_TYPE_ODBC:
3656 		if (switch_test_flag((&runtime), SCF_CLEAR_SQL)) {
3657 			char sql[512] = "";
3658 			char *tables[] = { "channels", "calls", "tasks", NULL };
3659 			int i;
3660 			const char *hostname = switch_core_get_switchname();
3661 
3662 			for (i = 0; tables[i]; i++) {
3663 				switch_snprintfv(sql, sizeof(sql), "delete from %q where hostname='%q'", tables[i], hostname);
3664 				switch_cache_db_execute_sql(sql_manager.dbh, sql, NULL);
3665 			}
3666 		}
3667 		break;
3668 	case SCDB_TYPE_CORE_DB:
3669 		{
3670 			switch_cache_db_execute_sql(sql_manager.dbh, "drop table channels", NULL);
3671 			switch_cache_db_execute_sql(sql_manager.dbh, "drop table calls", NULL);
3672 			switch_cache_db_execute_sql(sql_manager.dbh, "drop view detailed_calls", NULL);
3673 			switch_cache_db_execute_sql(sql_manager.dbh, "drop view basic_calls", NULL);
3674 			switch_cache_db_execute_sql(sql_manager.dbh, "drop table interfaces", NULL);
3675 			switch_cache_db_execute_sql(sql_manager.dbh, "drop table tasks", NULL);
3676 			switch_cache_db_execute_sql(sql_manager.dbh, "PRAGMA synchronous=OFF;", NULL);
3677 			switch_cache_db_execute_sql(sql_manager.dbh, "PRAGMA count_changes=OFF;", NULL);
3678 			switch_cache_db_execute_sql(sql_manager.dbh, "PRAGMA default_cache_size=8000", NULL);
3679 			switch_cache_db_execute_sql(sql_manager.dbh, "PRAGMA temp_store=MEMORY;", NULL);
3680 			switch_cache_db_execute_sql(sql_manager.dbh, "PRAGMA journal_mode=OFF;", NULL);
3681 		}
3682 		break;
3683 	}
3684 
3685 	switch_cache_db_test_reactive(sql_manager.dbh, "select hostname from aliases", "DROP TABLE aliases", create_alias_sql);
3686 	switch_cache_db_test_reactive(sql_manager.dbh, "select hostname from complete", "DROP TABLE complete", create_complete_sql);
3687 	switch_cache_db_test_reactive(sql_manager.dbh, "select hostname from nat", "DROP TABLE nat", create_nat_sql);
3688 	switch_cache_db_test_reactive(sql_manager.dbh, "delete from registrations where reg_user=''",
3689 								  "DROP TABLE registrations", create_registrations_sql);
3690 
3691 	switch_cache_db_test_reactive(sql_manager.dbh, "select metadata from registrations", NULL, "ALTER TABLE registrations ADD COLUMN metadata VARCHAR(256)");
3692 
3693 
3694 	switch_cache_db_test_reactive(sql_manager.dbh, "select hostname from recovery", "DROP TABLE recovery", recovery_sql);
3695 	switch_cache_db_create_schema(sql_manager.dbh, "create index recovery1 on recovery(technology)", NULL);
3696 	switch_cache_db_create_schema(sql_manager.dbh, "create index recovery2 on recovery(profile_name)", NULL);
3697 	switch_cache_db_create_schema(sql_manager.dbh, "create index recovery3 on recovery(uuid)", NULL);
3698 	switch_cache_db_create_schema(sql_manager.dbh, "create index recovery3 on recovery(runtime_uuid)", NULL);
3699 
3700 
3701 
3702 
3703 	switch (sql_manager.dbh->type) {
3704 	case SCDB_TYPE_DATABASE_INTERFACE:
3705 	case SCDB_TYPE_ODBC:
3706 		{
3707 			char *err;
3708 			int result = 0;
3709 
3710 			switch_cache_db_test_reactive_ex(sql_manager.dbh, "select call_uuid, read_bit_rate, sent_callee_name, initial_cid_name, initial_cid_num, initial_ip_addr, initial_dest, initial_dialplan, initial_context, accountcode from channels", "DROP TABLE channels", create_channels_sql, create_row_size_limited_channels_sql);
3711 			switch_cache_db_test_reactive(sql_manager.dbh, "select call_uuid from calls", "DROP TABLE calls", create_calls_sql);
3712 			switch_cache_db_test_reactive(sql_manager.dbh, "select * from basic_calls where sent_callee_name=''", "DROP VIEW basic_calls", basic_calls_sql);
3713 			switch_cache_db_test_reactive(sql_manager.dbh, "select * from detailed_calls where sent_callee_name=''", "DROP VIEW detailed_calls", detailed_calls_sql);
3714 			if (runtime.odbc_dbtype == DBTYPE_DEFAULT) {
3715 				switch_cache_db_test_reactive(sql_manager.dbh, "delete from registrations where reg_user=''",
3716 											  "DROP TABLE registrations", create_registrations_sql);
3717 			} else {
3718 				char *tmp = switch_string_replace(create_registrations_sql, "url      TEXT", "url      VARCHAR(max)");
3719 				switch_cache_db_test_reactive(sql_manager.dbh, "delete from registrations where reg_user=''",
3720 											  "DROP TABLE registrations", tmp);
3721 				free(tmp);
3722 			}
3723 			switch_cache_db_test_reactive(sql_manager.dbh, "select ikey from interfaces", "DROP TABLE interfaces", create_interfaces_sql);
3724 			switch_cache_db_test_reactive(sql_manager.dbh, "select task_id, task_desc, task_group, task_runtime, task_sql_manager, hostname from tasks",
3725 										  "DROP TABLE tasks", create_tasks_sql);
3726 
3727 
3728 			switch(sql_manager.dbh->type) {
3729 			case SCDB_TYPE_CORE_DB:
3730 				{
3731 					switch_cache_db_execute_sql_real(sql_manager.dbh, "BEGIN EXCLUSIVE", &err);
3732 				}
3733 				break;
3734 			case SCDB_TYPE_ODBC:
3735 				{
3736 					switch_odbc_status_t result;
3737 
3738 					if ((result = switch_odbc_SQLSetAutoCommitAttr(sql_manager.dbh->native_handle.odbc_dbh, 0)) != SWITCH_ODBC_SUCCESS) {
3739 						char tmp[100];
3740 						switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
3741 						err = strdup(tmp);
3742 					}
3743 				}
3744 				break;
3745 			case SCDB_TYPE_DATABASE_INTERFACE:
3746 				{
3747 					switch_database_interface_t *database_interface = sql_manager.dbh->native_handle.database_interface_dbh->connection_options.database_interface;
3748 					switch_status_t result;
3749 
3750 					if ((result = database_interface->sql_set_auto_commit_attr(sql_manager.dbh->native_handle.database_interface_dbh, 0)) != SWITCH_STATUS_SUCCESS) {
3751 						char tmp[100];
3752 						switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
3753 						err = strdup(tmp);
3754 					}
3755 				}
3756 				break;
3757 			}
3758 
3759 			switch_cache_db_execute_sql(sql_manager.dbh, "delete from channels where hostname=''", &err);
3760 			if (!err) {
3761 				switch_cache_db_execute_sql(sql_manager.dbh, "delete from channels where hostname=''", &err);
3762 
3763 				switch(sql_manager.dbh->type) {
3764 				case SCDB_TYPE_CORE_DB:
3765 					{
3766 						switch_cache_db_execute_sql_real(sql_manager.dbh, "COMMIT", &err);
3767 					}
3768 					break;
3769 				case SCDB_TYPE_ODBC:
3770 					{
3771 						if (switch_odbc_SQLEndTran(sql_manager.dbh->native_handle.odbc_dbh, 1) != SWITCH_ODBC_SUCCESS ||
3772 							switch_odbc_SQLSetAutoCommitAttr(sql_manager.dbh->native_handle.odbc_dbh, 1) != SWITCH_ODBC_SUCCESS) {
3773 							char tmp[100];
3774 							switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to commit transaction.", result);
3775 							err = strdup(tmp);
3776 						}
3777 					}
3778 					break;
3779 				case SCDB_TYPE_DATABASE_INTERFACE:
3780 					{
3781 						switch_database_interface_t *database_interface = sql_manager.dbh->native_handle.database_interface_dbh->connection_options.database_interface;
3782 						switch_status_t result;
3783 
3784 						if ((result = database_interface->commit(sql_manager.dbh->native_handle.database_interface_dbh)) != SWITCH_STATUS_SUCCESS) {
3785 							char tmp[100];
3786 							switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to commit transaction", result);
3787 							err = strdup(tmp);
3788 						}
3789 					}
3790 					break;
3791 				}
3792 			}
3793 
3794 
3795 			if (err) {
3796 				//runtime.odbc_dsn = NULL;
3797 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Database Error [%s]\n", err);
3798 				//switch_cache_db_release_db_handle(&sql_manager.dbh);
3799 				if (switch_stristr("read-only", err)) {
3800 					switch_safe_free(err);
3801 				} else {
3802 					switch_safe_free(err);
3803 					goto top;
3804 				}
3805 			}
3806 		}
3807 		break;
3808 	case SCDB_TYPE_CORE_DB:
3809 		{
3810 			switch_cache_db_execute_sql(sql_manager.dbh, create_channels_sql, NULL);
3811 			switch_cache_db_execute_sql(sql_manager.dbh, create_calls_sql, NULL);
3812 			switch_cache_db_execute_sql(sql_manager.dbh, create_interfaces_sql, NULL);
3813 			switch_cache_db_execute_sql(sql_manager.dbh, create_tasks_sql, NULL);
3814 			switch_cache_db_execute_sql(sql_manager.dbh, detailed_calls_sql, NULL);
3815 			switch_cache_db_execute_sql(sql_manager.dbh, basic_calls_sql, NULL);
3816 
3817 			if (sql_manager.dbh->native_handle.core_db_dbh->in_memory == SWITCH_TRUE) {
3818 				switch_set_flag(sql_manager.dbh, CDF_NONEXPIRING);
3819 			}
3820 		}
3821 		break;
3822 	}
3823 
3824 	if (switch_test_flag((&runtime), SCF_CLEAR_SQL)) {
3825 		char sql[512] = "";
3826 		char *tables[] = { "complete", "aliases", "nat", NULL };
3827 		int i;
3828 		const char *hostname = switch_core_get_hostname();
3829 
3830 		for (i = 0; tables[i]; i++) {
3831 			switch_snprintfv(sql, sizeof(sql), "delete from %q where sticky=0 and hostname='%q'", tables[i], hostname);
3832 			switch_cache_db_execute_sql(sql_manager.dbh, sql, NULL);
3833 		}
3834 
3835 		switch_snprintfv(sql, sizeof(sql), "delete from interfaces where hostname='%q'", hostname);
3836 		switch_cache_db_execute_sql(sql_manager.dbh, sql, NULL);
3837 	}
3838 
3839 	switch_cache_db_create_schema(sql_manager.dbh, "create index alias1 on aliases (alias)", NULL);
3840 	switch_cache_db_create_schema(sql_manager.dbh, "create index tasks1 on tasks (hostname,task_id)", NULL);
3841 	switch_cache_db_create_schema(sql_manager.dbh, "create index complete1 on complete (a1,hostname)", NULL);
3842 	switch_cache_db_create_schema(sql_manager.dbh, "create index complete2 on complete (a2,hostname)", NULL);
3843 	switch_cache_db_create_schema(sql_manager.dbh, "create index complete3 on complete (a3,hostname)", NULL);
3844 	switch_cache_db_create_schema(sql_manager.dbh, "create index complete4 on complete (a4,hostname)", NULL);
3845 	switch_cache_db_create_schema(sql_manager.dbh, "create index complete5 on complete (a5,hostname)", NULL);
3846 	switch_cache_db_create_schema(sql_manager.dbh, "create index complete6 on complete (a6,hostname)", NULL);
3847 	switch_cache_db_create_schema(sql_manager.dbh, "create index complete7 on complete (a7,hostname)", NULL);
3848 	switch_cache_db_create_schema(sql_manager.dbh, "create index complete8 on complete (a8,hostname)", NULL);
3849 	switch_cache_db_create_schema(sql_manager.dbh, "create index complete9 on complete (a9,hostname)", NULL);
3850 	switch_cache_db_create_schema(sql_manager.dbh, "create index complete10 on complete (a10,hostname)", NULL);
3851 	switch_cache_db_create_schema(sql_manager.dbh, "create index complete11 on complete (a1,a2,a3,a4,a5,a6,a7,a8,a9,a10,hostname)", NULL);
3852 	switch_cache_db_create_schema(sql_manager.dbh, "create index nat_map_port_proto on nat (port,proto,hostname)", NULL);
3853 	switch_cache_db_create_schema(sql_manager.dbh, "create index chidx1 on channels (hostname)", NULL);
3854 	switch_cache_db_create_schema(sql_manager.dbh, "create index uuindex on channels (uuid, hostname)", NULL);
3855 	switch_cache_db_create_schema(sql_manager.dbh, "create index uuindex2 on channels (call_uuid)", NULL);
3856 	switch_cache_db_create_schema(sql_manager.dbh, "create index callsidx1 on calls (hostname)", NULL);
3857 	switch_cache_db_create_schema(sql_manager.dbh, "create index eruuindex on calls (caller_uuid, hostname)", NULL);
3858 	switch_cache_db_create_schema(sql_manager.dbh, "create index eeuuindex on calls (callee_uuid)", NULL);
3859 	switch_cache_db_create_schema(sql_manager.dbh, "create index eeuuindex2 on calls (call_uuid)", NULL);
3860 	switch_cache_db_create_schema(sql_manager.dbh, "create index regindex1 on registrations (reg_user,realm,hostname)", NULL);
3861 
3862 
3863  skip:
3864 
3865 	if (sql_manager.manage) {
3866 		/* Initiate switch_sql_queue_manager */
3867 		switch_threadattr_create(&thd_attr, sql_manager.memory_pool);
3868 		switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
3869 		switch_threadattr_priority_set(thd_attr, SWITCH_PRI_REALTIME);
3870 		switch_core_sqldb_start_thread();
3871 		switch_thread_create(&sql_manager.db_thread, thd_attr, switch_core_sql_db_thread, NULL, sql_manager.memory_pool);
3872 
3873 		/* switch_sql_queue_manager initiated, now we can bind to core_event_handler */
3874 #ifdef SWITCH_SQL_BIND_EVERY_EVENT
3875 		switch_event_bind("core_db", SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3876 #else
3877 		switch_event_bind("core_db", SWITCH_EVENT_ADD_SCHEDULE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3878 		switch_event_bind("core_db", SWITCH_EVENT_DEL_SCHEDULE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3879 		switch_event_bind("core_db", SWITCH_EVENT_EXE_SCHEDULE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3880 		switch_event_bind("core_db", SWITCH_EVENT_RE_SCHEDULE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3881 		switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_DESTROY, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3882 		switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_UUID, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3883 		switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_CREATE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3884 		switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_ANSWER, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3885 		switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_PROGRESS_MEDIA, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3886 		switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_HOLD, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3887 		switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_UNHOLD, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3888 		switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_EXECUTE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3889 		switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_ORIGINATE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3890 		switch_event_bind("core_db", SWITCH_EVENT_CALL_UPDATE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3891 		switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_CALLSTATE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3892 		switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_STATE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3893 		switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_BRIDGE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3894 		switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_UNBRIDGE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3895 		switch_event_bind("core_db", SWITCH_EVENT_SHUTDOWN, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3896 		switch_event_bind("core_db", SWITCH_EVENT_LOG, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3897 		switch_event_bind("core_db", SWITCH_EVENT_MODULE_LOAD, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3898 		switch_event_bind("core_db", SWITCH_EVENT_MODULE_UNLOAD, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3899 		switch_event_bind("core_db", SWITCH_EVENT_CALL_SECURE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3900 		switch_event_bind("core_db", SWITCH_EVENT_NAT, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3901 		switch_event_bind("core_db", SWITCH_EVENT_CODEC, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3902 #endif
3903 	}
3904 
3905 	switch_cache_db_release_db_handle(&sql_manager.dbh);
3906 
3907 	return SWITCH_STATUS_SUCCESS;
3908 }
3909 
switch_core_sqldb_pause(void)3910 SWITCH_DECLARE(void) switch_core_sqldb_pause(void)
3911 {
3912 	if (sql_manager.paused) {
3913 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL is already paused.\n");
3914 	}
3915 	sql_manager.paused = 1;
3916 }
3917 
switch_core_sqldb_resume(void)3918 SWITCH_DECLARE(void) switch_core_sqldb_resume(void)
3919 {
3920 	if (!sql_manager.paused) {
3921 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL is already running.\n");
3922 	}
3923 	sql_manager.paused = 0;
3924 }
3925 
3926 
switch_core_sqldb_stop_thread(void)3927 static void switch_core_sqldb_stop_thread(void)
3928 {
3929 	switch_mutex_lock(sql_manager.ctl_mutex);
3930 	if (sql_manager.manage) {
3931 		if (sql_manager.qm) {
3932 			switch_sql_queue_manager_destroy(&sql_manager.qm);
3933 		}
3934 	} else {
3935 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n");
3936 	}
3937 
3938 	switch_mutex_unlock(sql_manager.ctl_mutex);
3939 }
3940 
switch_core_sqldb_start_thread(void)3941 static void switch_core_sqldb_start_thread(void)
3942 {
3943 
3944 	switch_mutex_lock(sql_manager.ctl_mutex);
3945 	if (sql_manager.manage) {
3946 		if (!sql_manager.qm) {
3947 			char *dbname = runtime.odbc_dsn;
3948 
3949 			if (zstr(dbname)) {
3950 				dbname = runtime.dbname;
3951 				if (zstr(dbname)) {
3952 					dbname = "core";
3953 				}
3954 			}
3955 
3956 			switch_sql_queue_manager_init_name("CORE",
3957 											   &sql_manager.qm,
3958 											   4,
3959 											   dbname,
3960 											   SWITCH_MAX_TRANS,
3961 											   runtime.core_db_pre_trans_execute,
3962 											   runtime.core_db_post_trans_execute,
3963 											   runtime.core_db_inner_pre_trans_execute,
3964 											   runtime.core_db_inner_post_trans_execute);
3965 
3966 		}
3967 		switch_sql_queue_manager_start(sql_manager.qm);
3968 	} else {
3969 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n");
3970 	}
3971 	switch_mutex_unlock(sql_manager.ctl_mutex);
3972 }
3973 
switch_core_sqldb_stop(void)3974 void switch_core_sqldb_stop(void)
3975 {
3976 	switch_status_t st;
3977 
3978 	switch_event_unbind_callback(core_event_handler);
3979 
3980 	if (sql_manager.db_thread && sql_manager.db_thread_running) {
3981 		sql_manager.db_thread_running = -1;
3982 		switch_thread_join(&st, sql_manager.db_thread);
3983 	}
3984 
3985 	switch_core_sqldb_stop_thread();
3986 
3987 	switch_cache_db_flush_handles();
3988 	sql_close(0);
3989 }
3990 
switch_cache_db_status(switch_stream_handle_t * stream)3991 SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream)
3992 {
3993 	/* return some status info suitable for the cli */
3994 	switch_cache_db_handle_t *dbh = NULL;
3995 	switch_bool_t locked = SWITCH_FALSE;
3996 	time_t now = switch_epoch_time_now(NULL);
3997 	char cleankey_str[CACHE_DB_LEN];
3998 	char *pos1 = NULL;
3999 	char *pos2 = NULL;
4000 	int count = 0, used = 0;
4001 
4002 	switch_mutex_lock(sql_manager.dbh_mutex);
4003 
4004 	for (dbh = sql_manager.handle_pool; dbh; dbh = dbh->next) {
4005 		char *needles[3];
4006 		time_t diff = 0;
4007 		int i = 0;
4008 
4009 		needles[0] = "pass=\"";
4010 		needles[1] = "password=";
4011 		needles[2] = "password='";
4012 
4013 		diff = now - dbh->last_used;
4014 
4015 		if (switch_mutex_trylock(dbh->mutex) == SWITCH_STATUS_SUCCESS) {
4016 			switch_mutex_unlock(dbh->mutex);
4017 			locked = SWITCH_FALSE;
4018 		} else {
4019 			locked = SWITCH_TRUE;
4020 		}
4021 
4022 		/* sanitize password */
4023 		memset(cleankey_str, 0, sizeof(cleankey_str));
4024 		for (i = 0; i < 3; i++) {
4025 			if((pos1 = strstr(dbh->name, needles[i]))) {
4026 				pos1 += strlen(needles[i]);
4027 
4028 				if (!(pos2 = strstr(pos1, "\""))) {
4029 					if (!(pos2 = strstr(pos1, "'"))) {
4030 						if (!(pos2 = strstr(pos1, " "))) {
4031 							pos2 = pos1 + strlen(pos1);
4032 						}
4033 					}
4034 				}
4035 				strncpy(cleankey_str, dbh->name, pos1 - dbh->name);
4036 				strcpy(&cleankey_str[pos1 - dbh->name], pos2);
4037 				break;
4038 			}
4039 		}
4040 		if (i == 3) {
4041 			snprintf(cleankey_str, sizeof(cleankey_str), "%s", dbh->name);
4042 		}
4043 
4044 		count++;
4045 
4046 		if (dbh->use_count) {
4047 			used++;
4048 		}
4049 
4050 		stream->write_function(stream, "%s\n\tType: %s\n\tLast used: %d\n\tTotal used: %ld\n\tFlags: %s, %s(%d)%s\n"
4051 							   "\tCreator: %s\n\tLast User: %s\n",
4052 							   cleankey_str,
4053 							   switch_cache_db_type_name(dbh->type),
4054 							   diff,
4055 							   dbh->total_used_count,
4056 							   locked ? "Locked" : "Unlocked",
4057 							   dbh->use_count ? "Attached" : "Detached", dbh->use_count, switch_test_flag(dbh, CDF_NONEXPIRING) ? ", Non-expiring" : "", dbh->creator, dbh->last_user);
4058 	}
4059 
4060 	stream->write_function(stream, "%d total. %d in use.\n", count, used);
4061 
4062 	switch_mutex_unlock(sql_manager.dbh_mutex);
4063 }
4064 
switch_sql_concat(void)4065 SWITCH_DECLARE(char*)switch_sql_concat(void)
4066 {
4067 	if(runtime.odbc_dbtype == DBTYPE_MSSQL)
4068 		return "+";
4069 
4070 	return "||";
4071 }
4072 
4073 /* For Emacs:
4074  * Local Variables:
4075  * mode:c
4076  * indent-tabs-mode:t
4077  * tab-width:4
4078  * c-basic-offset:4
4079  * End:
4080  * For VIM:
4081  * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
4082  */
4083