1 /*
2 * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3 * Copyright (C) 2005-2014, Anthony Minessale II <anthm@freeswitch.org>
4 *
5 * Version: MPL 1.1
6 *
7 * The contents of this file are subject to the Mozilla Public License Version
8 * 1.1 (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 * http://www.mozilla.org/MPL/
11 *
12 * Software distributed under the License is distributed on an "AS IS" basis,
13 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14 * for the specific language governing rights and limitations under the
15 * License.
16 *
17 * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18 *
19 * The Initial Developer of the Original Code is
20 * Anthony Minessale II <anthm@freeswitch.org>
21 * Portions created by the Initial Developer are Copyright (C)
22 * the Initial Developer. All Rights Reserved.
23 *
24 * Contributor(s):
25 *
26 * Anthony Minessale II <anthm@freeswitch.org>
27 * Michael Jerris <mike@jerris.com>
28 * Paul D. Tinsley <pdt at jackhammer.org>
29 * Emmanuel Schmidbauer <eschmidbauer@gmail.com>
30 * Andrey Volk <andywolk@gmail.com>
31 *
32 *
33 * switch_core_sqldb.c -- Main Core Library (statistics tracker)
34 *
35 */
36
37 #include <switch.h>
38 #include "private/switch_core_pvt.h"
39
40 #define SWITCH_SQL_QUEUE_LEN 100000
41 #define SWITCH_SQL_QUEUE_PAUSE_LEN 90000
42
43 struct switch_cache_db_handle {
44 char name[CACHE_DB_LEN];
45 switch_cache_db_handle_type_t type;
46 switch_cache_db_native_handle_t native_handle;
47 time_t last_used;
48 switch_mutex_t *mutex;
49 switch_mutex_t *io_mutex;
50 switch_memory_pool_t *pool;
51 int32_t flags;
52 unsigned long hash;
53 unsigned long thread_hash;
54 char creator[CACHE_DB_LEN];
55 char last_user[CACHE_DB_LEN];
56 uint32_t use_count;
57 uint64_t total_used_count;
58 struct switch_cache_db_handle *next;
59 };
60
61 static struct {
62 switch_memory_pool_t *memory_pool;
63 switch_thread_t *db_thread;
64 int db_thread_running;
65 switch_bool_t manage;
66 switch_mutex_t *io_mutex;
67 switch_mutex_t *dbh_mutex;
68 switch_mutex_t *ctl_mutex;
69 switch_cache_db_handle_t *handle_pool;
70 uint32_t total_handles;
71 uint32_t total_used_handles;
72 switch_cache_db_handle_t *dbh;
73 switch_sql_queue_manager_t *qm;
74 int paused;
75 } sql_manager;
76
77
78 static void switch_core_sqldb_start_thread(void);
79 static void switch_core_sqldb_stop_thread(void);
80
81 #define database_interface_handle_callback_exec(database_interface, dih, sql, callback, pdata, err) database_interface->callback_exec_detailed(__FILE__, (char *)__SWITCH_FUNC__, __LINE__, dih, sql, callback, pdata, err)
82 #define database_interface_handle_exec(database_interface, dih, sql, err) database_interface->exec_detailed(__FILE__, (char *)__SWITCH_FUNC__, __LINE__, dih, sql, err)
83
create_handle(switch_cache_db_handle_type_t type)84 static switch_cache_db_handle_t *create_handle(switch_cache_db_handle_type_t type)
85 {
86 switch_cache_db_handle_t *new_dbh = NULL;
87 switch_memory_pool_t *pool = NULL;
88
89 switch_core_new_memory_pool(&pool);
90 new_dbh = switch_core_alloc(pool, sizeof(*new_dbh));
91 new_dbh->pool = pool;
92 new_dbh->type = type;
93 switch_mutex_init(&new_dbh->mutex, SWITCH_MUTEX_NESTED, new_dbh->pool);
94
95 return new_dbh;
96 }
97
destroy_handle(switch_cache_db_handle_t ** dbh)98 static void destroy_handle(switch_cache_db_handle_t **dbh)
99 {
100 if (dbh && *dbh && (*dbh)->pool) {
101 switch_core_destroy_memory_pool(&(*dbh)->pool);
102 *dbh = NULL;
103 }
104 }
105
add_handle(switch_cache_db_handle_t * dbh,const char * db_str,const char * db_callsite_str,const char * thread_str)106 static void add_handle(switch_cache_db_handle_t *dbh, const char *db_str, const char *db_callsite_str, const char *thread_str)
107 {
108 switch_ssize_t hlen = -1;
109
110 switch_mutex_lock(sql_manager.dbh_mutex);
111
112 switch_set_string(dbh->creator, db_callsite_str);
113
114 switch_set_string(dbh->name, db_str);
115 dbh->hash = switch_ci_hashfunc_default(db_str, &hlen);
116 dbh->thread_hash = switch_ci_hashfunc_default(thread_str, &hlen);
117
118 dbh->use_count++;
119 dbh->total_used_count++;
120 sql_manager.total_used_handles++;
121 dbh->next = sql_manager.handle_pool;
122
123 sql_manager.handle_pool = dbh;
124 sql_manager.total_handles++;
125 switch_mutex_lock(dbh->mutex);
126 switch_mutex_unlock(sql_manager.dbh_mutex);
127 }
128
del_handle(switch_cache_db_handle_t * dbh)129 static void del_handle(switch_cache_db_handle_t *dbh)
130 {
131 switch_cache_db_handle_t *dbh_ptr, *last = NULL;
132
133 switch_mutex_lock(sql_manager.dbh_mutex);
134 for (dbh_ptr = sql_manager.handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) {
135 if (dbh_ptr == dbh) {
136 if (last) {
137 last->next = dbh_ptr->next;
138 } else {
139 sql_manager.handle_pool = dbh_ptr->next;
140 }
141 sql_manager.total_handles--;
142 break;
143 }
144
145 last = dbh_ptr;
146 }
147 switch_mutex_unlock(sql_manager.dbh_mutex);
148 }
149
switch_cache_db_database_interface_flush_handles(switch_database_interface_t * database_interface)150 SWITCH_DECLARE(void) switch_cache_db_database_interface_flush_handles(switch_database_interface_t *database_interface)
151 {
152 switch_cache_db_handle_t *dbh_ptr = NULL;
153
154 switch_mutex_lock(sql_manager.dbh_mutex);
155
156 top:
157
158 for (dbh_ptr = sql_manager.handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) {
159 if (switch_mutex_trylock(dbh_ptr->mutex) == SWITCH_STATUS_SUCCESS) {
160 if (dbh_ptr->type != SCDB_TYPE_DATABASE_INTERFACE) {
161 continue;
162 }
163
164 if (dbh_ptr->native_handle.database_interface_dbh->connection_options.database_interface != database_interface) {
165 continue;
166 }
167
168 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Dropping DB connection %s\n", dbh_ptr->name);
169
170 database_interface->handle_destroy(&dbh_ptr->native_handle.database_interface_dbh);
171
172 del_handle(dbh_ptr);
173 switch_mutex_unlock(dbh_ptr->mutex);
174 destroy_handle(&dbh_ptr);
175 goto top;
176 }
177 }
178
179 switch_mutex_unlock(sql_manager.dbh_mutex);
180 }
181
get_handle(const char * db_str,const char * user_str,const char * thread_str)182 static switch_cache_db_handle_t *get_handle(const char *db_str, const char *user_str, const char *thread_str)
183 {
184 switch_ssize_t hlen = -1;
185 unsigned long hash = 0, thread_hash = 0;
186 switch_cache_db_handle_t *dbh_ptr, *r = NULL;
187
188 hash = switch_ci_hashfunc_default(db_str, &hlen);
189 thread_hash = switch_ci_hashfunc_default(thread_str, &hlen);
190
191 switch_mutex_lock(sql_manager.dbh_mutex);
192
193 /* First loop allows a thread to use a handle multiple times sumiltaneously
194 but only if that handle is in use by the same thread. In that case use_count will be incremented.
195 This allows SQLite to read and write within a single thread, giving the same handle for both operations.
196 */
197 for (dbh_ptr = sql_manager.handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) {
198 if (dbh_ptr->thread_hash == thread_hash && dbh_ptr->hash == hash &&
199 !switch_test_flag(dbh_ptr, CDF_PRUNE) && switch_mutex_trylock(dbh_ptr->mutex) == SWITCH_STATUS_SUCCESS) {
200 r = dbh_ptr;
201 break;
202 }
203 }
204
205 if (!r) {
206 /* If a handle idles, take it and associate with the thread.
207 If a handle is in use, skip and create new one.
208 */
209 for (dbh_ptr = sql_manager.handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) {
210 if (dbh_ptr->hash == hash && !dbh_ptr->use_count && !switch_test_flag(dbh_ptr, CDF_PRUNE) &&
211 switch_mutex_trylock(dbh_ptr->mutex) == SWITCH_STATUS_SUCCESS) {
212 r = dbh_ptr;
213 r->thread_hash = thread_hash;
214 break;
215 }
216 }
217 }
218
219 if (r) {
220 r->use_count++;
221 r->total_used_count++;
222 sql_manager.total_used_handles++;
223 switch_set_string(r->last_user, user_str);
224 }
225
226 switch_mutex_unlock(sql_manager.dbh_mutex);
227
228 return r;
229
230 }
231
232 /*!
233 \brief Open the default system database
234 */
_switch_core_db_handle(switch_cache_db_handle_t ** dbh,const char * file,const char * func,int line)235 SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line)
236 {
237 switch_status_t r;
238 char *dsn;
239
240 if (!sql_manager.manage) {
241 return SWITCH_STATUS_FALSE;
242 }
243
244 if (!zstr(runtime.odbc_dsn)) {
245 dsn = runtime.odbc_dsn;
246 } else if (!zstr(runtime.dbname)) {
247 dsn = runtime.dbname;
248 } else {
249 dsn = "core";
250 }
251
252 if ((r = _switch_cache_db_get_db_handle_dsn_ex(dbh, dsn, SWITCH_TRUE, file, func, line)) != SWITCH_STATUS_SUCCESS) {
253 *dbh = NULL;
254 }
255
256 return r;
257 }
258
259 #define SQL_CACHE_TIMEOUT 30
260 #define SQL_REG_TIMEOUT 15
261
262
sql_close(time_t prune)263 static void sql_close(time_t prune)
264 {
265 switch_cache_db_handle_t *dbh = NULL;
266 int locked = 0;
267 int sanity = 10000;
268
269 switch_mutex_lock(sql_manager.dbh_mutex);
270 top:
271 locked = 0;
272
273 for (dbh = sql_manager.handle_pool; dbh; dbh = dbh->next) {
274 time_t diff = 0;
275
276 if (prune > 0 && prune > dbh->last_used) {
277 diff = (time_t) prune - dbh->last_used;
278 }
279
280 if (prune > 0 && (dbh->use_count || switch_test_flag(dbh, CDF_NONEXPIRING) || (diff < SQL_CACHE_TIMEOUT && !switch_test_flag(dbh, CDF_PRUNE)))) {
281 continue;
282 }
283
284 if (switch_mutex_trylock(dbh->mutex) == SWITCH_STATUS_SUCCESS) {
285 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Dropping idle DB connection %s\n", dbh->name);
286
287 switch (dbh->type) {
288 case SCDB_TYPE_DATABASE_INTERFACE:
289 {
290 switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
291 database_interface->handle_destroy(&dbh->native_handle.database_interface_dbh);
292 }
293 break;
294 case SCDB_TYPE_ODBC:
295 {
296 switch_odbc_handle_destroy(&dbh->native_handle.odbc_dbh);
297 }
298 break;
299 case SCDB_TYPE_CORE_DB:
300 {
301 switch_core_db_close(dbh->native_handle.core_db_dbh->handle);
302 dbh->native_handle.core_db_dbh->handle = NULL;
303 }
304 break;
305 }
306
307 del_handle(dbh);
308 switch_mutex_unlock(dbh->mutex);
309 destroy_handle(&dbh);
310 goto top;
311
312 } else {
313 if (!prune) {
314 if (!sanity) {
315 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SANITY CHECK FAILED! Handle %s (%s;%s) was not properly released.\n",
316 dbh->name, dbh->creator, dbh->last_user);
317 } else {
318 locked++;
319 }
320 }
321 continue;
322 }
323
324 }
325
326 if (locked) {
327 if (!prune) {
328 switch_cond_next();
329 if (sanity) sanity--;
330 }
331 goto top;
332 }
333
334 switch_mutex_unlock(sql_manager.dbh_mutex);
335 }
336
337
switch_cache_db_get_type(switch_cache_db_handle_t * dbh)338 SWITCH_DECLARE(switch_cache_db_handle_type_t) switch_cache_db_get_type(switch_cache_db_handle_t *dbh)
339 {
340 return dbh->type;
341 }
342
switch_cache_db_flush_handles(void)343 SWITCH_DECLARE(void) switch_cache_db_flush_handles(void)
344 {
345 sql_close(switch_epoch_time_now(NULL) + SQL_CACHE_TIMEOUT + 1);
346 }
347
348
switch_cache_db_release_db_handle(switch_cache_db_handle_t ** dbh)349 SWITCH_DECLARE(void) switch_cache_db_release_db_handle(switch_cache_db_handle_t **dbh)
350 {
351 if (dbh && *dbh) {
352
353 switch((*dbh)->type) {
354 case SCDB_TYPE_DATABASE_INTERFACE:
355 {
356 switch_database_interface_t *database_interface = (*dbh)->native_handle.database_interface_dbh->connection_options.database_interface;
357 database_interface->flush((*dbh)->native_handle.database_interface_dbh);
358 }
359 break;
360 default:
361 break;
362 }
363
364 switch_mutex_lock(sql_manager.dbh_mutex);
365 (*dbh)->last_used = switch_epoch_time_now(NULL);
366
367 (*dbh)->io_mutex = NULL;
368
369 if ((*dbh)->use_count) {
370 --(*dbh)->use_count;
371 }
372 switch_mutex_unlock((*dbh)->mutex);
373 sql_manager.total_used_handles--;
374 *dbh = NULL;
375 switch_mutex_unlock(sql_manager.dbh_mutex);
376 }
377 }
378
379
switch_cache_db_dismiss_db_handle(switch_cache_db_handle_t ** dbh)380 SWITCH_DECLARE(void) switch_cache_db_dismiss_db_handle(switch_cache_db_handle_t **dbh)
381 {
382 switch_cache_db_release_db_handle(dbh);
383 }
384
385 #ifndef MIN
386 #define MIN(a,b) (((a) < (b)) ? (a) : (b))
387 #endif
388
switch_database_available(char * dsn)389 SWITCH_DECLARE(switch_status_t) switch_database_available(char* dsn)
390 {
391 switch_status_t status = SWITCH_STATUS_FALSE;
392 switch_database_interface_t *database_interface;
393
394 if (!dsn) {
395 status = SWITCH_STATUS_SUCCESS;
396 }
397 else {
398 char *colon_slashes = NULL;
399 if (NULL != (colon_slashes = strstr(dsn, "://")))
400 {
401 char prefix[16] = "";
402 strncpy(prefix, dsn, MIN(colon_slashes - dsn, 15));
403
404 if (!strncasecmp(prefix, "odbc", 4)) {
405 if (switch_odbc_available()) status = SWITCH_STATUS_SUCCESS;
406 }
407 else if (!strncasecmp(prefix, "sqlite", 6)) {
408 status = SWITCH_STATUS_SUCCESS;
409 }
410 else if ((database_interface = switch_loadable_module_get_database_interface(prefix, NULL))) {
411 status = SWITCH_STATUS_SUCCESS;
412 UNPROTECT_INTERFACE(database_interface);
413 }
414 }
415 else if (strchr(dsn + 2, ':')) {
416 status = SWITCH_STATUS_SUCCESS;
417 }
418 }
419
420 return status;
421 }
422
switch_core_check_core_db_dsn(void)423 SWITCH_DECLARE(switch_status_t) switch_core_check_core_db_dsn(void)
424 {
425 return switch_database_available(runtime.odbc_dsn);
426 }
427
_switch_cache_db_get_db_handle_dsn(switch_cache_db_handle_t ** dbh,const char * dsn,const char * file,const char * func,int line)428 SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle_dsn(switch_cache_db_handle_t **dbh, const char *dsn,
429 const char *file, const char *func, int line)
430 {
431 return _switch_cache_db_get_db_handle_dsn_ex(dbh, dsn, SWITCH_FALSE, file, func, line);
432 }
433
_switch_cache_db_get_db_handle_dsn_ex(switch_cache_db_handle_t ** dbh,const char * dsn,switch_bool_t make_module_no_unloadable,const char * file,const char * func,int line)434 SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle_dsn_ex(switch_cache_db_handle_t **dbh, const char *dsn, switch_bool_t make_module_no_unloadable,
435 const char *file, const char *func, int line)
436 {
437 switch_cache_db_connection_options_t connection_options = { {0} };
438 switch_cache_db_handle_type_t type = SCDB_TYPE_CORE_DB;
439 switch_database_interface_t *database_interface = NULL;
440 char tmp[256] = "";
441 char *p;
442 switch_status_t status = SWITCH_STATUS_FALSE;
443 int i;
444
445 char *colon_slashes = NULL;
446 if ( NULL != (colon_slashes = strstr(dsn, "://")) )
447 {
448 char prefix[16] = "";
449 strncpy(prefix, dsn, MIN(colon_slashes - dsn, 15));
450
451 if ((database_interface = switch_loadable_module_get_database_interface(prefix, NULL))) {
452 type = SCDB_TYPE_DATABASE_INTERFACE;
453 connection_options.database_interface_options.make_module_no_unloadable = make_module_no_unloadable;
454 connection_options.database_interface_options.database_interface = database_interface;
455 connection_options.database_interface_options.original_dsn = dsn;
456 connection_options.database_interface_options.connection_string = colon_slashes + 3;
457 strcpy(connection_options.database_interface_options.prefix, prefix);
458 UNPROTECT_INTERFACE(database_interface);
459 }
460 }
461
462 if (!connection_options.database_interface_options.connection_string)
463 {
464 if (!strncasecmp(dsn, "sqlite://", 9)) {
465 type = SCDB_TYPE_CORE_DB;
466 connection_options.core_db_options.db_path = (char *)(dsn + 9);
467 if (!strncasecmp(connection_options.core_db_options.db_path, "memory://", 9)) {
468 connection_options.core_db_options.in_memory = SWITCH_TRUE;
469 connection_options.core_db_options.db_path = (char *)(connection_options.core_db_options.db_path + 9);
470 }
471 }
472 else if ((!(i = strncasecmp(dsn, "odbc://", 7))) || (strchr(dsn + 2, ':') && !colon_slashes)) {
473 type = SCDB_TYPE_ODBC;
474
475 if (i) {
476 switch_set_string(tmp, dsn);
477 }
478 else {
479 switch_set_string(tmp, dsn + 7);
480 }
481
482 connection_options.odbc_options.dsn = tmp;
483
484 if ((p = strchr(tmp, ':'))) {
485 *p++ = '\0';
486 connection_options.odbc_options.user = p;
487
488 if ((p = strchr(connection_options.odbc_options.user, ':'))) {
489 *p++ = '\0';
490 connection_options.odbc_options.pass = p;
491 }
492 }
493 }
494 else {
495 type = SCDB_TYPE_CORE_DB;
496 connection_options.core_db_options.db_path = (char *)dsn;
497 }
498 }
499
500 status = _switch_cache_db_get_db_handle(dbh, type, &connection_options, file, func, line);
501
502 if (status != SWITCH_STATUS_SUCCESS) *dbh = NULL;
503
504 return status;
505 }
506
507
_switch_cache_db_get_db_handle(switch_cache_db_handle_t ** dbh,switch_cache_db_handle_type_t type,switch_cache_db_connection_options_t * connection_options,const char * file,const char * func,int line)508 SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_handle_t **dbh,
509 switch_cache_db_handle_type_t type,
510 switch_cache_db_connection_options_t *connection_options,
511 const char *file, const char *func, int line)
512 {
513 switch_thread_id_t self = switch_thread_self();
514 char thread_str[CACHE_DB_LEN] = "";
515 char db_str[CACHE_DB_LEN] = "";
516 char db_callsite_str[CACHE_DB_LEN] = "";
517 switch_cache_db_handle_t *new_dbh = NULL;
518 int waiting = 0;
519 uint32_t yield_len = 100000, total_yield = 0;
520
521 const char *db_name = NULL;
522 const char *odbc_user = NULL;
523 const char *odbc_pass = NULL;
524 const char *db_type = NULL;
525
526 while(runtime.max_db_handles && sql_manager.total_handles >= runtime.max_db_handles && sql_manager.total_used_handles >= sql_manager.total_handles) {
527 if (!waiting++) {
528 switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_WARNING, "Max handles %u exceeded, blocking....\n",
529 runtime.max_db_handles);
530 }
531
532 switch_yield(yield_len);
533 total_yield += yield_len;
534
535 if (runtime.db_handle_timeout && total_yield > runtime.db_handle_timeout) {
536 switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_ERROR, "Error connecting\n");
537 *dbh = NULL;
538 return SWITCH_STATUS_FALSE;
539 }
540 }
541
542 switch (type) {
543 case SCDB_TYPE_DATABASE_INTERFACE:
544 {
545 db_name = connection_options->database_interface_options.connection_string;
546 odbc_user = NULL;
547 odbc_pass = NULL;
548 db_type = "database_interface";
549 }
550 break;
551 case SCDB_TYPE_ODBC:
552 {
553 db_name = connection_options->odbc_options.dsn;
554 odbc_user = connection_options->odbc_options.user;
555 odbc_pass = connection_options->odbc_options.pass;
556 db_type = "odbc";
557 }
558 break;
559 case SCDB_TYPE_CORE_DB:
560 {
561 db_name = connection_options->core_db_options.db_path;
562 odbc_user = NULL;
563 odbc_pass = NULL;
564 db_type = "core_db";
565 }
566 break;
567 }
568
569 if (!db_name) {
570 return SWITCH_STATUS_FALSE;
571 }
572
573 if (odbc_user || odbc_pass) {
574 snprintf(db_str, sizeof(db_str) - 1, "db=\"%s\";type=\"%s\"user=\"%s\";pass=\"%s\"", db_name, db_type, odbc_user, odbc_pass);
575 } else {
576 snprintf(db_str, sizeof(db_str) - 1, "db=\"%s\",type=\"%s\"", db_name, db_type);
577 }
578 snprintf(db_callsite_str, sizeof(db_callsite_str) - 1, "%s:%d", file, line);
579 snprintf(thread_str, sizeof(thread_str) - 1, "thread=\"%lu\"", (unsigned long) (intptr_t) self);
580
581 if ((new_dbh = get_handle(db_str, db_callsite_str, thread_str))) {
582 if (type == SCDB_TYPE_DATABASE_INTERFACE) {
583 switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_DEBUG10,
584 "Reuse Unused Cached DB handle %s [Database interface prefix: %s]\n", new_dbh->name, connection_options->database_interface_options.prefix);
585 } else {
586 switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_DEBUG10,
587 "Reuse Unused Cached DB handle %s [%s]\n", new_dbh->name, switch_cache_db_type_name(new_dbh->type));
588 }
589 } else {
590 switch_core_db_t *db = NULL;
591 switch_odbc_handle_t *odbc_dbh = NULL;
592 switch_database_interface_handle_t *database_interface_dbh = NULL;
593
594 switch (type) {
595 case SCDB_TYPE_DATABASE_INTERFACE:
596 {
597 switch_database_interface_t *database_interface = connection_options->database_interface_options.database_interface;
598
599 if (SWITCH_STATUS_SUCCESS != database_interface->handle_new(connection_options->database_interface_options, &database_interface_dbh)) {
600 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! Can't create new handle! Can't connect to DSN %s\n", connection_options->database_interface_options.original_dsn);
601 goto end;
602 }
603
604 if (database_interface_dbh) {
605 database_interface_dbh->connection_options = connection_options->database_interface_options;
606
607 if (connection_options->database_interface_options.make_module_no_unloadable == SWITCH_TRUE)
608 {
609 PROTECT_INTERFACE(database_interface)
610 switch_loadable_module_protect(database_interface->parent->module_name);
611 UNPROTECT_INTERFACE(database_interface)
612 }
613 }
614 }
615 break;
616 case SCDB_TYPE_ODBC:
617 {
618 if (!switch_odbc_available()) {
619 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! ODBC NOT AVAILABLE! Can't connect to DSN %s\n", connection_options->odbc_options.dsn);
620 goto end;
621 }
622
623 if ((odbc_dbh = switch_odbc_handle_new(connection_options->odbc_options.dsn,
624 connection_options->odbc_options.user, connection_options->odbc_options.pass))) {
625 if (switch_odbc_handle_connect(odbc_dbh) != SWITCH_ODBC_SUCCESS) {
626 switch_odbc_handle_destroy(&odbc_dbh);
627 }
628 }
629 }
630 break;
631 case SCDB_TYPE_CORE_DB:
632 {
633 if (!connection_options->core_db_options.in_memory) {
634 db = switch_core_db_open_file(connection_options->core_db_options.db_path);
635 } else {
636 db = switch_core_db_open_in_memory(connection_options->core_db_options.db_path);
637 }
638 }
639 break;
640
641 default:
642 goto end;
643 }
644
645 if (!db && !odbc_dbh && !database_interface_dbh) {
646 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure to connect to %s %s!\n", switch_cache_db_type_name(type), db_name);
647 goto end;
648 }
649
650 new_dbh = create_handle(type);
651
652 switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_DEBUG10,
653 "Create Cached DB handle %s [%s] %s:%d\n", new_dbh->name, switch_cache_db_type_name(type), file, line);
654
655 if (database_interface_dbh) {
656 new_dbh->native_handle.database_interface_dbh = database_interface_dbh;
657 } else if (db) {
658 if (!(new_dbh->native_handle.core_db_dbh = switch_core_alloc(new_dbh->pool, sizeof(*new_dbh->native_handle.core_db_dbh)))) {
659 destroy_handle(&new_dbh);
660 switch_core_db_close(db);
661 goto end;
662 }
663 new_dbh->native_handle.core_db_dbh->handle = db;
664 new_dbh->native_handle.core_db_dbh->in_memory = connection_options->core_db_options.in_memory;
665 } else if (odbc_dbh) {
666 new_dbh->native_handle.odbc_dbh = odbc_dbh;
667 }
668
669 add_handle(new_dbh, db_str, db_callsite_str, thread_str);
670 }
671
672 end:
673
674 if (new_dbh) {
675 new_dbh->last_used = switch_epoch_time_now(NULL);
676 }
677
678 *dbh = new_dbh;
679
680 return *dbh ? SWITCH_STATUS_SUCCESS : SWITCH_STATUS_FALSE;
681 }
682
683
switch_cache_db_execute_sql_real(switch_cache_db_handle_t * dbh,const char * sql,char ** err)684 static switch_status_t switch_cache_db_execute_sql_real(switch_cache_db_handle_t *dbh, const char *sql, char **err)
685 {
686 switch_status_t status = SWITCH_STATUS_FALSE;
687 char *errmsg = NULL;
688 char *tmp = NULL;
689 char *type = NULL;
690 switch_mutex_t *io_mutex = dbh->io_mutex;
691
692 if (io_mutex) switch_mutex_lock(io_mutex);
693
694 if (err) {
695 *err = NULL;
696 }
697
698 switch (dbh->type) {
699 case SCDB_TYPE_DATABASE_INTERFACE:
700 {
701 switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
702 type = (char *)dbh->native_handle.database_interface_dbh->connection_options.prefix;
703 status = database_interface_handle_exec(database_interface, dbh->native_handle.database_interface_dbh, sql, &errmsg);
704 }
705 break;
706 case SCDB_TYPE_ODBC:
707 {
708 type = "ODBC";
709 status = switch_odbc_handle_exec(dbh->native_handle.odbc_dbh, sql, NULL, &errmsg);
710 }
711 break;
712 case SCDB_TYPE_CORE_DB:
713 {
714 int ret = switch_core_db_exec(dbh->native_handle.core_db_dbh->handle, sql, NULL, NULL, &errmsg);
715 type = "NATIVE";
716
717 if (ret == SWITCH_CORE_DB_OK) {
718 status = SWITCH_STATUS_SUCCESS;
719 }
720
721 if (errmsg) {
722 switch_strdup(tmp, errmsg);
723 switch_core_db_free(errmsg);
724 errmsg = tmp;
725 }
726 }
727 break;
728 }
729
730 if (errmsg) {
731 if (!switch_stristr("already exists", errmsg) && !switch_stristr("duplicate key name", errmsg)) {
732 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "[%s] %s SQL ERR [%s]\n%s\n", dbh->name, (type ? type : "Unknown"), errmsg, sql);
733 }
734 if (err) {
735 *err = errmsg;
736 } else {
737 switch_safe_free(errmsg);
738 }
739 }
740
741
742 if (io_mutex) switch_mutex_unlock(io_mutex);
743
744 return status;
745 }
746
747 /**
748 OMFG you cruel bastards. Who chooses 64k as a max buffer len for a sql statement, have you ever heard of transactions?
749 **/
switch_cache_db_execute_sql_chunked(switch_cache_db_handle_t * dbh,char * sql,uint32_t chunk_size,char ** err)750 static switch_status_t switch_cache_db_execute_sql_chunked(switch_cache_db_handle_t *dbh, char *sql, uint32_t chunk_size, char **err)
751 {
752 switch_status_t status = SWITCH_STATUS_FALSE;
753 char *p, *s, *e;
754 switch_size_t chunk_count;
755 switch_size_t len;
756
757 switch_assert(chunk_size);
758
759 if (err)
760 *err = NULL;
761
762 len = strlen(sql);
763
764 if (chunk_size > len) {
765 return switch_cache_db_execute_sql_real(dbh, sql, err);
766 }
767
768 if (!(chunk_count = strlen(sql) / chunk_size)) {
769 return SWITCH_STATUS_FALSE;
770 }
771
772 e = end_of_p(sql);
773 s = sql;
774
775 while (s && s < e) {
776 p = s + chunk_size;
777
778 if (p > e) {
779 p = e;
780 }
781
782 while (p > s) {
783 if (*p == '\n' && *(p - 1) == ';') {
784 *p = '\0';
785 *(p - 1) = '\0';
786 p++;
787 break;
788 }
789
790 p--;
791 }
792
793 if (p <= s)
794 break;
795
796
797 status = switch_cache_db_execute_sql_real(dbh, s, err);
798 if (status != SWITCH_STATUS_SUCCESS || (err && *err)) {
799 break;
800 }
801
802 s = p;
803
804 }
805
806 return status;
807
808 }
809
810
switch_cache_db_execute_sql(switch_cache_db_handle_t * dbh,char * sql,char ** err)811 SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql(switch_cache_db_handle_t *dbh, char *sql, char **err)
812 {
813 switch_status_t status = SWITCH_STATUS_FALSE;
814 switch_mutex_t *io_mutex = dbh->io_mutex;
815
816 if (io_mutex) switch_mutex_lock(io_mutex);
817
818 switch (dbh->type) {
819 default:
820 {
821 status = switch_cache_db_execute_sql_chunked(dbh, (char *) sql, 32768, err);
822 }
823 break;
824 }
825
826 if (io_mutex) switch_mutex_unlock(io_mutex);
827
828 return status;
829
830 }
831
832
switch_cache_db_affected_rows(switch_cache_db_handle_t * dbh)833 SWITCH_DECLARE(int) switch_cache_db_affected_rows(switch_cache_db_handle_t *dbh)
834 {
835 switch (dbh->type) {
836 case SCDB_TYPE_CORE_DB:
837 {
838 return switch_core_db_changes(dbh->native_handle.core_db_dbh->handle);
839 }
840 break;
841 case SCDB_TYPE_ODBC:
842 {
843 return switch_odbc_handle_affected_rows(dbh->native_handle.odbc_dbh);
844 }
845 break;
846 case SCDB_TYPE_DATABASE_INTERFACE:
847 {
848 switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
849 int affected_rows = 0;
850 database_interface->affected_rows(dbh->native_handle.database_interface_dbh, &affected_rows);
851 return affected_rows;
852 }
853 break;
854 }
855 return 0;
856 }
857
switch_cache_db_load_extension(switch_cache_db_handle_t * dbh,const char * extension)858 SWITCH_DECLARE(int) switch_cache_db_load_extension(switch_cache_db_handle_t *dbh, const char *extension)
859 {
860 switch (dbh->type) {
861 case SCDB_TYPE_CORE_DB:
862 {
863 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "try to load extension [%s]!\n", extension);
864 return switch_core_db_load_extension(dbh->native_handle.core_db_dbh->handle, extension);
865 }
866 break;
867 case SCDB_TYPE_ODBC:
868 {
869 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "load extension not supported by type ODBC!\n");
870 }
871 break;
872 case SCDB_TYPE_DATABASE_INTERFACE:
873 {
874 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "load extension not supported by type DATABASE_INTERFACE!\n");
875 }
876 break;
877 }
878 return 0;
879 }
880
881
switch_cache_db_execute_sql2str(switch_cache_db_handle_t * dbh,char * sql,char * str,size_t len,char ** err)882 SWITCH_DECLARE(char *) switch_cache_db_execute_sql2str(switch_cache_db_handle_t *dbh, char *sql, char *str, size_t len, char **err)
883 {
884 switch_status_t status = SWITCH_STATUS_FALSE;
885 switch_mutex_t *io_mutex = dbh->io_mutex;
886
887 if (io_mutex) switch_mutex_lock(io_mutex);
888
889 memset(str, 0, len);
890
891 switch (dbh->type) {
892 case SCDB_TYPE_CORE_DB:
893 {
894 switch_core_db_stmt_t *stmt;
895
896 if (switch_core_db_prepare(dbh->native_handle.core_db_dbh->handle, sql, -1, &stmt, 0)) {
897 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Statement Error [%s]!\n", sql);
898 goto end;
899 } else {
900 int running = 1;
901 int colcount;
902
903 while (running < 5000) {
904 int result = switch_core_db_step(stmt);
905 const unsigned char *txt;
906
907 if (result == SWITCH_CORE_DB_ROW) {
908 if ((colcount = switch_core_db_column_count(stmt)) > 0) {
909 if ((txt = switch_core_db_column_text(stmt, 0))) {
910 switch_copy_string(str, (char *) txt, len);
911 status = SWITCH_STATUS_SUCCESS;
912 }
913 }
914 break;
915 } else if (result == SWITCH_CORE_DB_BUSY) {
916 running++;
917 switch_cond_next();
918 continue;
919 }
920 break;
921 }
922
923 switch_core_db_finalize(stmt);
924 }
925 }
926 break;
927 case SCDB_TYPE_ODBC:
928 {
929 status = switch_odbc_handle_exec_string(dbh->native_handle.odbc_dbh, sql, str, len, err);
930 }
931 break;
932 case SCDB_TYPE_DATABASE_INTERFACE:
933 {
934 switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
935 status = database_interface->exec_string(dbh->native_handle.database_interface_dbh, sql, str, len, err);
936 }
937 break;
938 }
939
940 end:
941
942 if (io_mutex) switch_mutex_unlock(io_mutex);
943
944 return status == SWITCH_STATUS_SUCCESS ? str : NULL;
945
946 }
947
switch_cache_db_persistant_execute(switch_cache_db_handle_t * dbh,const char * sql,uint32_t retries)948 SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute(switch_cache_db_handle_t *dbh, const char *sql, uint32_t retries)
949 {
950 char *errmsg = NULL;
951 switch_status_t status = SWITCH_STATUS_FALSE;
952 uint8_t forever = 0;
953 switch_mutex_t *io_mutex = dbh->io_mutex;
954
955 if (!retries) {
956 forever = 1;
957 retries = 1000;
958 }
959
960 while (retries > 0) {
961
962 if (io_mutex) switch_mutex_lock(io_mutex);
963 switch_cache_db_execute_sql_real(dbh, sql, &errmsg);
964 if (io_mutex) switch_mutex_unlock(io_mutex);
965
966
967 if (errmsg) {
968 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR [%s]\n", errmsg);
969 switch_safe_free(errmsg);
970 switch_yield(100000);
971 retries--;
972 if (retries == 0 && forever) {
973 retries = 1000;
974 continue;
975 }
976 } else {
977 status = SWITCH_STATUS_SUCCESS;
978 break;
979 }
980 }
981
982 return status;
983 }
984
985
switch_cache_db_persistant_execute_trans_full(switch_cache_db_handle_t * dbh,char * sql,uint32_t retries,const char * pre_trans_execute,const char * post_trans_execute,const char * inner_pre_trans_execute,const char * inner_post_trans_execute)986 SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans_full(switch_cache_db_handle_t *dbh,
987 char *sql, uint32_t retries,
988 const char *pre_trans_execute,
989 const char *post_trans_execute,
990 const char *inner_pre_trans_execute,
991 const char *inner_post_trans_execute)
992 {
993 char *errmsg = NULL;
994 switch_status_t status = SWITCH_STATUS_FALSE;
995 uint8_t forever = 0;
996 unsigned begin_retries = 100;
997 uint8_t again = 0;
998 switch_mutex_t *io_mutex = dbh->io_mutex;
999
1000 if (!retries) {
1001 forever = 1;
1002 retries = 1000;
1003 }
1004
1005 if (io_mutex) switch_mutex_lock(io_mutex);
1006
1007 if (!zstr(pre_trans_execute)) {
1008 switch_cache_db_execute_sql_real(dbh, pre_trans_execute, &errmsg);
1009 if (errmsg) {
1010 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", pre_trans_execute, errmsg);
1011 switch_safe_free(errmsg);
1012 }
1013 }
1014
1015 again:
1016
1017 while (begin_retries > 0) {
1018 again = 0;
1019
1020 switch(dbh->type) {
1021 case SCDB_TYPE_CORE_DB:
1022 {
1023 switch_cache_db_execute_sql_real(dbh, "BEGIN EXCLUSIVE", &errmsg);
1024 }
1025 break;
1026 case SCDB_TYPE_ODBC:
1027 {
1028 switch_odbc_status_t result;
1029
1030 if ((result = switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 0)) != SWITCH_ODBC_SUCCESS) {
1031 char tmp[100];
1032 switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
1033 errmsg = strdup(tmp);
1034 }
1035 }
1036 break;
1037 case SCDB_TYPE_DATABASE_INTERFACE:
1038 {
1039 switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
1040 switch_status_t result;
1041
1042 if ((result = database_interface->sql_set_auto_commit_attr(dbh->native_handle.database_interface_dbh, 0)) != SWITCH_STATUS_SUCCESS) {
1043 char tmp[100];
1044 switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
1045 errmsg = strdup(tmp);
1046 }
1047 }
1048 break;
1049 }
1050
1051 if (errmsg) {
1052 begin_retries--;
1053 if (strstr(errmsg, "cannot start a transaction within a transaction")) {
1054 again = 1;
1055 } else {
1056 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "SQL Retry [%s]\n", errmsg);
1057 }
1058 switch_safe_free(errmsg);
1059
1060 if (again) {
1061 switch(dbh->type) {
1062 case SCDB_TYPE_CORE_DB:
1063 {
1064 switch_cache_db_execute_sql_real(dbh, "COMMIT", NULL);
1065 }
1066 break;
1067 case SCDB_TYPE_ODBC:
1068 {
1069 switch_odbc_SQLEndTran(dbh->native_handle.odbc_dbh, 1);
1070 switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 1);
1071 }
1072 break;
1073 case SCDB_TYPE_DATABASE_INTERFACE:
1074 {
1075 switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
1076 switch_status_t result;
1077
1078 if ((result = database_interface->commit(dbh->native_handle.database_interface_dbh)) != SWITCH_STATUS_SUCCESS) {
1079 char tmp[100];
1080 switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to commit transaction", result);
1081 }
1082 }
1083 break;
1084 }
1085
1086 goto again;
1087 }
1088
1089 switch_yield(100000);
1090
1091 if (begin_retries == 0) {
1092 goto done;
1093 }
1094
1095 continue;
1096 }
1097
1098 break;
1099 }
1100
1101
1102 if (!zstr(inner_pre_trans_execute)) {
1103 switch_cache_db_execute_sql_real(dbh, inner_pre_trans_execute, &errmsg);
1104 if (errmsg) {
1105 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", inner_pre_trans_execute, errmsg);
1106 switch_safe_free(errmsg);
1107 }
1108 }
1109
1110 while (retries > 0) {
1111
1112 switch_cache_db_execute_sql(dbh, sql, &errmsg);
1113
1114 if (errmsg) {
1115 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR [%s]\n", errmsg);
1116 switch_safe_free(errmsg);
1117 errmsg = NULL;
1118 switch_yield(100000);
1119 retries--;
1120 if (retries == 0 && forever) {
1121 retries = 1000;
1122 continue;
1123 }
1124 } else {
1125 status = SWITCH_STATUS_SUCCESS;
1126 break;
1127 }
1128 }
1129
1130 if (!zstr(inner_post_trans_execute)) {
1131 switch_cache_db_execute_sql_real(dbh, inner_post_trans_execute, &errmsg);
1132 if (errmsg) {
1133 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", inner_post_trans_execute, errmsg);
1134 switch_safe_free(errmsg);
1135 }
1136 }
1137
1138 done:
1139
1140 switch(dbh->type) {
1141 case SCDB_TYPE_CORE_DB:
1142 {
1143 switch_cache_db_execute_sql_real(dbh, "COMMIT", NULL);
1144 }
1145 break;
1146 case SCDB_TYPE_ODBC:
1147 {
1148 switch_odbc_SQLEndTran(dbh->native_handle.odbc_dbh, 1);
1149 switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 1);
1150 }
1151 break;
1152 case SCDB_TYPE_DATABASE_INTERFACE:
1153 {
1154 switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
1155 switch_status_t result;
1156
1157 if ((result = database_interface->commit(dbh->native_handle.database_interface_dbh)) != SWITCH_STATUS_SUCCESS) {
1158 char tmp[100];
1159 switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to commit transaction", result);
1160 }
1161 }
1162 break;
1163 }
1164
1165 if (!zstr(post_trans_execute)) {
1166 switch_cache_db_execute_sql_real(dbh, post_trans_execute, &errmsg);
1167 if (errmsg) {
1168 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", post_trans_execute, errmsg);
1169 switch_safe_free(errmsg);
1170 }
1171 }
1172
1173 if (io_mutex) switch_mutex_unlock(io_mutex);
1174
1175 return status;
1176 }
1177
1178 struct helper {
1179 switch_core_db_event_callback_func_t callback;
1180 void *pdata;
1181 };
1182
helper_callback(void * pArg,int argc,char ** argv,char ** columnNames)1183 static int helper_callback(void *pArg, int argc, char **argv, char **columnNames)
1184 {
1185 struct helper *h = (struct helper *) pArg;
1186 int r = 0;
1187 switch_event_t *event;
1188
1189 switch_event_create_array_pair(&event, columnNames, argv, argc);
1190
1191 r = h->callback(h->pdata, event);
1192
1193 switch_event_destroy(&event);
1194
1195 return r;
1196 }
1197
switch_cache_db_execute_sql_event_callback(switch_cache_db_handle_t * dbh,const char * sql,switch_core_db_event_callback_func_t callback,void * pdata,char ** err)1198 SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback(switch_cache_db_handle_t *dbh,
1199 const char *sql, switch_core_db_event_callback_func_t callback, void *pdata, char **err)
1200 {
1201 switch_status_t status = SWITCH_STATUS_FALSE;
1202 char *errmsg = NULL;
1203 switch_mutex_t *io_mutex = dbh->io_mutex;
1204 struct helper h = {0};
1205
1206
1207 if (err) {
1208 *err = NULL;
1209 }
1210
1211 if (io_mutex) switch_mutex_lock(io_mutex);
1212
1213 h.callback = callback;
1214 h.pdata = pdata;
1215
1216 switch (dbh->type) {
1217 case SCDB_TYPE_DATABASE_INTERFACE:
1218 {
1219 switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
1220
1221 if ((status = database_interface_handle_callback_exec(database_interface, dbh->native_handle.database_interface_dbh, sql, helper_callback, &h, err)) != SWITCH_STATUS_SUCCESS) {
1222 char tmp[100];
1223 switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to execute_sql_event_callback", status);
1224 }
1225 }
1226 break;
1227 case SCDB_TYPE_ODBC:
1228 {
1229 status = switch_odbc_handle_callback_exec(dbh->native_handle.odbc_dbh, sql, helper_callback, &h, err);
1230 }
1231 break;
1232 case SCDB_TYPE_CORE_DB:
1233 {
1234 int ret = switch_core_db_exec(dbh->native_handle.core_db_dbh->handle, sql, helper_callback, &h, &errmsg);
1235
1236 if (ret == SWITCH_CORE_DB_OK || ret == SWITCH_CORE_DB_ABORT) {
1237 status = SWITCH_STATUS_SUCCESS;
1238 }
1239
1240 if (errmsg) {
1241 dbh->last_used = switch_epoch_time_now(NULL) - (SQL_CACHE_TIMEOUT * 2);
1242 if (!strstr(errmsg, "query abort")) {
1243 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg);
1244 }
1245 switch_core_db_free(errmsg);
1246 }
1247 }
1248 break;
1249 }
1250
1251 if (io_mutex) switch_mutex_unlock(io_mutex);
1252
1253 return status;
1254 }
1255
switch_cache_db_execute_sql_event_callback_err(switch_cache_db_handle_t * dbh,const char * sql,switch_core_db_event_callback_func_t callback,switch_core_db_err_callback_func_t err_callback,void * pdata,char ** err)1256 SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback_err(switch_cache_db_handle_t *dbh, const char *sql,
1257 switch_core_db_event_callback_func_t callback,
1258 switch_core_db_err_callback_func_t err_callback,
1259 void *pdata, char **err)
1260 {
1261 switch_status_t status = SWITCH_STATUS_FALSE;
1262 char *errmsg = NULL;
1263 switch_mutex_t *io_mutex = dbh->io_mutex;
1264 struct helper h;
1265
1266
1267 if (err) {
1268 *err = NULL;
1269 }
1270
1271 if (io_mutex) switch_mutex_lock(io_mutex);
1272
1273 h.callback = callback;
1274 h.pdata = pdata;
1275
1276 switch (dbh->type) {
1277 case SCDB_TYPE_DATABASE_INTERFACE:
1278 {
1279 switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
1280
1281 if ((status = database_interface_handle_callback_exec(database_interface, dbh->native_handle.database_interface_dbh, sql, helper_callback, &h, err)) != SWITCH_STATUS_SUCCESS) {
1282 char tmp[100];
1283 switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to execute_sql_event_callback_err", status);
1284 }
1285
1286 if (err && *err) {
1287 (*err_callback)(pdata, (const char*)*err);
1288 }
1289 }
1290 break;
1291 case SCDB_TYPE_ODBC:
1292 {
1293 status = switch_odbc_handle_callback_exec(dbh->native_handle.odbc_dbh, sql, helper_callback, &h, err);
1294 if (err && *err) {
1295 (*err_callback)(pdata, (const char*)*err);
1296 }
1297 }
1298 break;
1299 case SCDB_TYPE_CORE_DB:
1300 {
1301 int ret = switch_core_db_exec(dbh->native_handle.core_db_dbh->handle, sql, helper_callback, &h, &errmsg);
1302
1303 if (ret == SWITCH_CORE_DB_OK || ret == SWITCH_CORE_DB_ABORT) {
1304 status = SWITCH_STATUS_SUCCESS;
1305 }
1306
1307 if (errmsg) {
1308 dbh->last_used = switch_epoch_time_now(NULL) - (SQL_CACHE_TIMEOUT * 2);
1309 if (!strstr(errmsg, "query abort")) {
1310 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg);
1311 }
1312 }
1313 if ((ret == SWITCH_CORE_DB_ABORT || errmsg) && err_callback) {
1314 (*err_callback)(pdata, errmsg);
1315 }
1316 if (errmsg) {
1317 switch_core_db_free(errmsg);
1318 }
1319 }
1320 break;
1321 }
1322
1323 if (io_mutex) switch_mutex_unlock(io_mutex);
1324
1325 return status;
1326 }
1327
switch_cache_db_execute_sql_callback(switch_cache_db_handle_t * dbh,const char * sql,switch_core_db_callback_func_t callback,void * pdata,char ** err)1328 SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cache_db_handle_t *dbh,
1329 const char *sql, switch_core_db_callback_func_t callback, void *pdata, char **err)
1330 {
1331 switch_status_t status = SWITCH_STATUS_FALSE;
1332 char *errmsg = NULL;
1333 switch_mutex_t *io_mutex = dbh->io_mutex;
1334
1335 if (err) {
1336 *err = NULL;
1337 }
1338
1339 if (io_mutex) switch_mutex_lock(io_mutex);
1340
1341
1342 switch (dbh->type) {
1343 case SCDB_TYPE_DATABASE_INTERFACE:
1344 {
1345 switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
1346
1347 if ((status = database_interface_handle_callback_exec(database_interface, dbh->native_handle.database_interface_dbh, sql, callback, pdata, err)) != SWITCH_STATUS_SUCCESS) {
1348 char tmp[100];
1349 switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to execute_sql_callback", status);
1350 }
1351 }
1352 break;
1353 case SCDB_TYPE_ODBC:
1354 {
1355 status = switch_odbc_handle_callback_exec(dbh->native_handle.odbc_dbh, sql, callback, pdata, err);
1356 }
1357 break;
1358 case SCDB_TYPE_CORE_DB:
1359 {
1360 int ret = switch_core_db_exec(dbh->native_handle.core_db_dbh->handle, sql, callback, pdata, &errmsg);
1361
1362 if (ret == SWITCH_CORE_DB_OK || ret == SWITCH_CORE_DB_ABORT) {
1363 status = SWITCH_STATUS_SUCCESS;
1364 }
1365
1366 if (errmsg) {
1367 dbh->last_used = switch_epoch_time_now(NULL) - (SQL_CACHE_TIMEOUT * 2);
1368 if (!strstr(errmsg, "query abort")) {
1369 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg);
1370 }
1371 switch_core_db_free(errmsg);
1372 }
1373 }
1374 break;
1375 }
1376
1377 if (io_mutex) switch_mutex_unlock(io_mutex);
1378
1379 return status;
1380 }
1381
switch_cache_db_execute_sql_callback_err(switch_cache_db_handle_t * dbh,const char * sql,switch_core_db_callback_func_t callback,switch_core_db_err_callback_func_t err_callback,void * pdata,char ** err)1382 SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback_err(switch_cache_db_handle_t *dbh, const char *sql,
1383 switch_core_db_callback_func_t callback,
1384 switch_core_db_err_callback_func_t err_callback, void *pdata, char **err)
1385 {
1386 switch_status_t status = SWITCH_STATUS_FALSE;
1387 char *errmsg = NULL;
1388 switch_mutex_t *io_mutex = dbh->io_mutex;
1389
1390 if (err) {
1391 *err = NULL;
1392 }
1393
1394 if (io_mutex) switch_mutex_lock(io_mutex);
1395
1396
1397 switch (dbh->type) {
1398 case SCDB_TYPE_DATABASE_INTERFACE:
1399 {
1400 switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
1401
1402 if ((status = database_interface_handle_callback_exec(database_interface, dbh->native_handle.database_interface_dbh, sql, callback, pdata, err)) != SWITCH_STATUS_SUCCESS) {
1403 char tmp[100];
1404 switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to execute_sql_callback_err", status);
1405 }
1406
1407 if (err && *err) {
1408 (*err_callback)(pdata, (const char*)*err);
1409 }
1410 }
1411 break;
1412 case SCDB_TYPE_ODBC:
1413 {
1414 status = switch_odbc_handle_callback_exec(dbh->native_handle.odbc_dbh, sql, callback, pdata, err);
1415 if (err && *err) {
1416 (*err_callback)(pdata, (const char*)*err);
1417 }
1418 }
1419 break;
1420 case SCDB_TYPE_CORE_DB:
1421 {
1422 int ret = switch_core_db_exec(dbh->native_handle.core_db_dbh->handle, sql, callback, pdata, &errmsg);
1423
1424 if (ret == SWITCH_CORE_DB_OK || ret == SWITCH_CORE_DB_ABORT) {
1425 status = SWITCH_STATUS_SUCCESS;
1426 }
1427
1428 if (errmsg) {
1429 dbh->last_used = switch_epoch_time_now(NULL) - (SQL_CACHE_TIMEOUT * 2);
1430 if (!strstr(errmsg, "query abort")) {
1431 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg);
1432 }
1433 }
1434 if ((ret == SWITCH_CORE_DB_ABORT || errmsg) && err_callback) {
1435 (*err_callback)(pdata, errmsg);
1436 }
1437 if (errmsg) {
1438 switch_core_db_free(errmsg);
1439 }
1440 }
1441 break;
1442 }
1443
1444 if (io_mutex) switch_mutex_unlock(io_mutex);
1445
1446 return status;
1447 }
1448
switch_cache_db_create_schema(switch_cache_db_handle_t * dbh,char * sql,char ** err)1449 SWITCH_DECLARE(switch_status_t) switch_cache_db_create_schema(switch_cache_db_handle_t *dbh, char *sql, char **err)
1450 {
1451 switch_status_t r = SWITCH_STATUS_SUCCESS;
1452
1453 switch_assert(sql != NULL);
1454
1455 if (switch_test_flag((&runtime), SCF_AUTO_SCHEMAS)) {
1456 r = switch_cache_db_execute_sql(dbh, sql, err);
1457 }
1458
1459 return r;
1460 }
1461
1462 /*!
1463 * \brief Performs test_sql and if it fails performs drop_sql and reactive_sql.
1464 *
1465 * If auto-clear-sql is disabled, then this function will do nothing and it is
1466 * assumed that the queries are not needed. If auto-create-schemas is disabled,
1467 * then just test_sql is executed, but drop_sql and reactive_sql are not.
1468 *
1469 * Otherwise, test_sql gets executed. If that succeeds, then there is nothing to
1470 * do. Otherwise drop_sql is executed (its result is ignored) and then finally
1471 * reactive_sql is executed.
1472 *
1473 * \return If auto-create-schemas is enabled, SWITCH_TRUE is returned if
1474 * test_sql succeeds, SWITCH_FALSE otherwise. If reactive_sql is executed
1475 * successfully SWITCH_TRUE is returned, otherwise SWITCH_FALSE is returned.
1476 */
switch_cache_db_test_reactive(switch_cache_db_handle_t * dbh,const char * test_sql,const char * drop_sql,const char * reactive_sql)1477 SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive(switch_cache_db_handle_t *dbh,
1478 const char *test_sql, const char *drop_sql, const char *reactive_sql)
1479 {
1480 return switch_cache_db_test_reactive_ex(dbh, test_sql, drop_sql, reactive_sql, NULL);
1481 }
1482
switch_cache_db_test_reactive_ex(switch_cache_db_handle_t * dbh,const char * test_sql,const char * drop_sql,const char * reactive_sql,const char * row_size_limited_reactive_sql)1483 SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive_ex(switch_cache_db_handle_t *dbh,
1484 const char *test_sql, const char *drop_sql, const char *reactive_sql, const char *row_size_limited_reactive_sql)
1485 {
1486 switch_bool_t r = SWITCH_TRUE;
1487 switch_mutex_t *io_mutex = dbh->io_mutex;
1488
1489 switch_assert(test_sql != NULL);
1490 switch_assert(reactive_sql != NULL);
1491
1492 if (!switch_test_flag((&runtime), SCF_CLEAR_SQL)) {
1493 return SWITCH_TRUE;
1494 }
1495
1496 if (!switch_test_flag((&runtime), SCF_AUTO_SCHEMAS)) {
1497 switch_status_t status = switch_cache_db_execute_sql(dbh, (char *)test_sql, NULL);
1498
1499 return (status == SWITCH_STATUS_SUCCESS) ? SWITCH_TRUE : SWITCH_FALSE;
1500 }
1501
1502 if (io_mutex) switch_mutex_lock(io_mutex);
1503
1504 switch (dbh->type) {
1505 case SCDB_TYPE_DATABASE_INTERFACE:
1506 {
1507 switch_database_interface_t *database_interface = dbh->native_handle.database_interface_dbh->connection_options.database_interface;
1508 switch_status_t result;
1509
1510 if ((result = database_interface_handle_exec(database_interface, dbh->native_handle.database_interface_dbh, test_sql, NULL)) != SWITCH_STATUS_SUCCESS) {
1511 char tmp[100];
1512 switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to test_reactive with test_sql", result);
1513
1514 if (drop_sql) {
1515 if ((result = database_interface_handle_exec(database_interface, dbh->native_handle.database_interface_dbh, drop_sql, NULL)) != SWITCH_STATUS_SUCCESS) {
1516 char tmp[100];
1517 switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to test_reactive with drop_sql", result);
1518 }
1519 }
1520
1521 if ((result = database_interface_handle_exec(database_interface, dbh->native_handle.database_interface_dbh, reactive_sql, NULL)) != SWITCH_STATUS_SUCCESS) {
1522 char tmp[100];
1523 switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to test_reactive with reactive_sql", result);
1524
1525 if (row_size_limited_reactive_sql && switch_test_flag(database_interface, SWITCH_DATABASE_FLAG_ROW_SIZE_LIMIT)) {
1526 if ((result = database_interface_handle_exec(database_interface, dbh->native_handle.database_interface_dbh, row_size_limited_reactive_sql, NULL)) != SWITCH_STATUS_SUCCESS) {
1527 switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to test_reactive with row_size_limited_reactive_sql", result);
1528 }
1529 }
1530 }
1531
1532 r = (result == SWITCH_STATUS_SUCCESS);
1533 }
1534 }
1535 break;
1536 case SCDB_TYPE_ODBC:
1537 {
1538 if (switch_odbc_handle_exec(dbh->native_handle.odbc_dbh, test_sql, NULL, NULL) != SWITCH_ODBC_SUCCESS) {
1539 if (drop_sql) {
1540 switch_odbc_handle_exec(dbh->native_handle.odbc_dbh, drop_sql, NULL, NULL);
1541 }
1542 r = switch_odbc_handle_exec(dbh->native_handle.odbc_dbh, reactive_sql, NULL, NULL) == SWITCH_ODBC_SUCCESS;
1543 }
1544 }
1545 break;
1546 case SCDB_TYPE_CORE_DB:
1547 {
1548 char *errmsg = NULL;
1549 switch_core_db_exec(dbh->native_handle.core_db_dbh->handle, test_sql, NULL, NULL, &errmsg);
1550
1551 if (errmsg) {
1552 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "SQL ERR [%s]\n[%s]\nAuto Generating Table!\n", errmsg, test_sql);
1553 switch_core_db_free(errmsg);
1554 errmsg = NULL;
1555 if (drop_sql) {
1556 switch_core_db_exec(dbh->native_handle.core_db_dbh->handle, drop_sql, NULL, NULL, &errmsg);
1557 }
1558 if (errmsg) {
1559 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Ignoring SQL ERR [%s]\n[%s]\n", errmsg, drop_sql);
1560 switch_core_db_free(errmsg);
1561 errmsg = NULL;
1562 }
1563 switch_core_db_exec(dbh->native_handle.core_db_dbh->handle, reactive_sql, NULL, NULL, &errmsg);
1564 if (errmsg) {
1565 r = SWITCH_FALSE;
1566 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "SQL ERR [%s]\n[%s]\n", errmsg, reactive_sql);
1567 switch_core_db_free(errmsg);
1568 errmsg = NULL;
1569 } else {
1570 r = SWITCH_TRUE;
1571 }
1572 }
1573 }
1574 break;
1575 }
1576
1577
1578 if (io_mutex) switch_mutex_unlock(io_mutex);
1579
1580 return r;
1581 }
1582
1583
switch_core_sql_db_thread(switch_thread_t * thread,void * obj)1584 static void *SWITCH_THREAD_FUNC switch_core_sql_db_thread(switch_thread_t *thread, void *obj)
1585 {
1586 int sec = 0, reg_sec = 0;;
1587
1588 sql_manager.db_thread_running = 1;
1589
1590 while (sql_manager.db_thread_running == 1) {
1591 if (++sec == SQL_CACHE_TIMEOUT) {
1592 sql_close(switch_epoch_time_now(NULL));
1593 sec = 0;
1594 }
1595
1596 if (switch_test_flag((&runtime), SCF_USE_SQL) && ++reg_sec == SQL_REG_TIMEOUT) {
1597 switch_core_expire_registration(0);
1598 reg_sec = 0;
1599 }
1600 switch_yield(1000000);
1601 }
1602
1603
1604 return NULL;
1605 }
1606
1607
1608 static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, void *obj);
1609
1610 struct switch_sql_queue_manager {
1611 const char *name;
1612 switch_cache_db_handle_t *event_db;
1613 switch_queue_t **sql_queue;
1614 uint32_t *pre_written;
1615 uint32_t *written;
1616 uint32_t numq;
1617 char *dsn;
1618 switch_thread_t *thread;
1619 int thread_initiated;
1620 int thread_running;
1621 switch_thread_cond_t *cond;
1622 switch_mutex_t *cond_mutex;
1623 switch_mutex_t *cond2_mutex;
1624 switch_mutex_t *mutex;
1625 char *pre_trans_execute;
1626 char *post_trans_execute;
1627 char *inner_pre_trans_execute;
1628 char *inner_post_trans_execute;
1629 switch_memory_pool_t *pool;
1630 uint32_t max_trans;
1631 uint32_t confirm;
1632 uint8_t paused;
1633 };
1634
qm_wake(switch_sql_queue_manager_t * qm)1635 static int qm_wake(switch_sql_queue_manager_t *qm)
1636 {
1637 switch_status_t status;
1638 int tries = 0;
1639
1640 top:
1641
1642 status = switch_mutex_trylock(qm->cond_mutex);
1643
1644 if (status == SWITCH_STATUS_SUCCESS) {
1645 switch_thread_cond_signal(qm->cond);
1646 switch_mutex_unlock(qm->cond_mutex);
1647 return 1;
1648 } else {
1649 if (switch_mutex_trylock(qm->cond2_mutex) == SWITCH_STATUS_SUCCESS) {
1650 switch_mutex_unlock(qm->cond2_mutex);
1651 } else {
1652 if (++tries < 10) {
1653 switch_cond_next();
1654 goto top;
1655 }
1656 }
1657 }
1658
1659 return 0;
1660 }
1661
qm_ttl(switch_sql_queue_manager_t * qm)1662 static uint32_t qm_ttl(switch_sql_queue_manager_t *qm)
1663 {
1664 uint32_t ttl = 0;
1665 uint32_t i;
1666
1667 for (i = 0; i < qm->numq; i++) {
1668 ttl += switch_queue_size(qm->sql_queue[i]);
1669 }
1670
1671 return ttl;
1672 }
1673
1674 struct db_job {
1675 switch_sql_queue_manager_t *qm;
1676 char *sql;
1677 switch_core_db_callback_func_t callback;
1678 switch_core_db_err_callback_func_t err_callback;
1679 switch_core_db_event_callback_func_t event_callback;
1680 switch_core_db_err_callback_func_t event_err_callback;
1681 void *pdata;
1682 int event;
1683 switch_memory_pool_t *pool;
1684 };
1685
sql_in_thread(switch_thread_t * thread,void * obj)1686 static void *SWITCH_THREAD_FUNC sql_in_thread (switch_thread_t *thread, void *obj)
1687 {
1688 struct db_job *job = (struct db_job *) obj;
1689 switch_memory_pool_t *pool = job->pool;
1690 char *err = NULL;
1691 switch_cache_db_handle_t *dbh;
1692
1693
1694 if (switch_cache_db_get_db_handle_dsn(&dbh, job->qm->dsn) != SWITCH_STATUS_SUCCESS) {
1695 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot connect DSN %s\n", job->qm->dsn);
1696 return NULL;
1697 }
1698
1699 if (job->callback && !job->err_callback) {
1700 switch_cache_db_execute_sql_callback(dbh, job->sql, job->callback, job->pdata, &err);
1701 } else if (job->callback && job->err_callback) {
1702 switch_cache_db_execute_sql_callback_err(dbh, job->sql, job->callback, job->err_callback, job->pdata, &err);
1703 } else if (job->event_callback && !job->event_err_callback) {
1704 switch_cache_db_execute_sql_event_callback(dbh, job->sql, job->event_callback, job->pdata, &err);
1705 } else if (job->event_callback && job->event_err_callback) {
1706 switch_cache_db_execute_sql_event_callback_err(dbh, job->sql, job->event_callback, job->event_err_callback, job->pdata, &err);
1707 }
1708
1709 if (err) {
1710 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", job->sql, err);
1711 switch_safe_free(err);
1712 }
1713
1714 switch_cache_db_release_db_handle(&dbh);
1715
1716 if (pool) {
1717 switch_core_destroy_memory_pool(&pool);
1718 }
1719
1720 return NULL;
1721 }
1722
new_job(switch_sql_queue_manager_t * qm,const char * sql,switch_core_db_callback_func_t callback,switch_core_db_err_callback_func_t err_callback,switch_core_db_event_callback_func_t event_callback,switch_core_db_err_callback_func_t event_err_callback,void * pdata)1723 static switch_thread_data_t *new_job(switch_sql_queue_manager_t *qm, const char *sql,
1724 switch_core_db_callback_func_t callback,
1725 switch_core_db_err_callback_func_t err_callback,
1726 switch_core_db_event_callback_func_t event_callback,
1727 switch_core_db_err_callback_func_t event_err_callback,
1728 void *pdata)
1729 {
1730 switch_memory_pool_t *pool;
1731 switch_thread_data_t *td;
1732 struct db_job *job;
1733 switch_core_new_memory_pool(&pool);
1734
1735 td = switch_core_alloc(pool, sizeof(*td));
1736 job = switch_core_alloc(pool, sizeof(*job));
1737
1738 td->func = sql_in_thread;
1739 td->obj = job;
1740
1741 job->sql = switch_core_strdup(pool, sql);
1742 job->qm = qm;
1743
1744 if (callback) {
1745 job->callback = callback;
1746 job->err_callback = err_callback;
1747 } else if (event_callback) {
1748 job->event_callback = event_callback;
1749 job->event_err_callback = event_err_callback;
1750 }
1751
1752 job->pdata = pdata;
1753 job->pool = pool;
1754
1755 return td;
1756 }
1757
1758
switch_sql_queue_manager_execute_sql_callback(switch_sql_queue_manager_t * qm,const char * sql,switch_core_db_callback_func_t callback,void * pdata)1759 SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_callback(switch_sql_queue_manager_t *qm,
1760 const char *sql, switch_core_db_callback_func_t callback, void *pdata)
1761 {
1762
1763 switch_thread_data_t *td;
1764 if ((td = new_job(qm, sql, callback, NULL, NULL, NULL, pdata))) {
1765 switch_thread_pool_launch_thread(&td);
1766 }
1767 }
1768
switch_sql_queue_manager_execute_sql_callback_err(switch_sql_queue_manager_t * qm,const char * sql,switch_core_db_callback_func_t callback,switch_core_db_err_callback_func_t err_callback,void * pdata)1769 SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_callback_err(switch_sql_queue_manager_t *qm, const char *sql,
1770 switch_core_db_callback_func_t callback,
1771 switch_core_db_err_callback_func_t err_callback, void *pdata)
1772 {
1773
1774 switch_thread_data_t *td;
1775 if ((td = new_job(qm, sql, callback, err_callback, NULL, NULL, pdata))) {
1776 switch_thread_pool_launch_thread(&td);
1777 }
1778 }
1779
switch_sql_queue_manager_execute_sql_event_callback(switch_sql_queue_manager_t * qm,const char * sql,switch_core_db_event_callback_func_t callback,void * pdata)1780 SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_event_callback(switch_sql_queue_manager_t *qm,
1781 const char *sql, switch_core_db_event_callback_func_t callback, void *pdata)
1782 {
1783
1784 switch_thread_data_t *td;
1785 if ((td = new_job(qm, sql, NULL, NULL, callback, NULL, pdata))) {
1786 switch_thread_pool_launch_thread(&td);
1787 }
1788 }
1789
switch_sql_queue_manager_execute_sql_event_callback_err(switch_sql_queue_manager_t * qm,const char * sql,switch_core_db_event_callback_func_t callback,switch_core_db_err_callback_func_t err_callback,void * pdata)1790 SWITCH_DECLARE(void) switch_sql_queue_manager_execute_sql_event_callback_err(switch_sql_queue_manager_t *qm, const char *sql,
1791 switch_core_db_event_callback_func_t callback,
1792 switch_core_db_err_callback_func_t err_callback,
1793 void *pdata)
1794 {
1795
1796 switch_thread_data_t *td;
1797 if ((td = new_job(qm, sql, NULL, NULL, callback, err_callback, pdata))) {
1798 switch_thread_pool_launch_thread(&td);
1799 }
1800 }
1801
1802
do_flush(switch_sql_queue_manager_t * qm,int i,switch_cache_db_handle_t * dbh)1803 static void do_flush(switch_sql_queue_manager_t *qm, int i, switch_cache_db_handle_t *dbh)
1804 {
1805 void *pop = NULL;
1806 switch_queue_t *q = qm->sql_queue[i];
1807
1808 switch_mutex_lock(qm->mutex);
1809 while (switch_queue_trypop(q, &pop) == SWITCH_STATUS_SUCCESS) {
1810 if (pop) {
1811 if (dbh) {
1812 switch_cache_db_execute_sql(dbh, (char *) pop, NULL);
1813 }
1814 switch_safe_free(pop);
1815 }
1816 }
1817 switch_mutex_unlock(qm->mutex);
1818
1819 }
1820
1821
switch_sql_queue_manager_resume(switch_sql_queue_manager_t * qm)1822 SWITCH_DECLARE(void) switch_sql_queue_manager_resume(switch_sql_queue_manager_t *qm)
1823 {
1824 switch_mutex_lock(qm->mutex);
1825 qm->paused = 0;
1826 switch_mutex_unlock(qm->mutex);
1827
1828 qm_wake(qm);
1829
1830 }
1831
switch_sql_queue_manager_pause(switch_sql_queue_manager_t * qm,switch_bool_t flush)1832 SWITCH_DECLARE(void) switch_sql_queue_manager_pause(switch_sql_queue_manager_t *qm, switch_bool_t flush)
1833 {
1834 uint32_t i;
1835
1836 switch_mutex_lock(qm->mutex);
1837 qm->paused = 1;
1838 switch_mutex_unlock(qm->mutex);
1839
1840 if (flush) {
1841 for(i = 0; i < qm->numq; i++) {
1842 do_flush(qm, i, NULL);
1843 }
1844 }
1845
1846 }
1847
switch_sql_queue_manager_size(switch_sql_queue_manager_t * qm,uint32_t index)1848 SWITCH_DECLARE(int) switch_sql_queue_manager_size(switch_sql_queue_manager_t *qm, uint32_t index)
1849 {
1850 int size = 0;
1851
1852 switch_mutex_lock(qm->mutex);
1853 if (index < qm->numq) {
1854 size = switch_queue_size(qm->sql_queue[index]);
1855 }
1856 switch_mutex_unlock(qm->mutex);
1857
1858 return size;
1859 }
1860
switch_sql_queue_manager_stop(switch_sql_queue_manager_t * qm)1861 SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_stop(switch_sql_queue_manager_t *qm)
1862 {
1863 switch_status_t status = SWITCH_STATUS_FALSE;
1864 uint32_t i, sanity = 100;
1865
1866 if (qm->thread_running == 1) {
1867 qm->thread_running = -1;
1868
1869 while(--sanity && qm->thread_running == -1) {
1870 for(i = 0; i < qm->numq; i++) {
1871 switch_queue_push(qm->sql_queue[i], NULL);
1872 switch_queue_interrupt_all(qm->sql_queue[i]);
1873 }
1874 qm_wake(qm);
1875
1876 if (qm->thread_running == -1) {
1877 switch_yield(100000);
1878 }
1879 }
1880 status = SWITCH_STATUS_SUCCESS;
1881 }
1882
1883 if (qm->thread) {
1884 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Stopping SQL thread.\n", qm->name);
1885 qm_wake(qm);
1886 switch_thread_join(&status, qm->thread);
1887 qm->thread = NULL;
1888 status = SWITCH_STATUS_SUCCESS;
1889 }
1890
1891 return status;
1892 }
1893
switch_sql_queue_manager_start(switch_sql_queue_manager_t * qm)1894 SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_manager_t *qm)
1895 {
1896 switch_threadattr_t *thd_attr;
1897
1898 if (!qm->thread_running) {
1899 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Starting SQL thread.\n", qm->name);
1900 switch_threadattr_create(&thd_attr, qm->pool);
1901 switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
1902 switch_threadattr_priority_set(thd_attr, SWITCH_PRI_NORMAL);
1903 if (switch_thread_create(&qm->thread, thd_attr, switch_user_sql_thread, qm, qm->pool) == SWITCH_STATUS_SUCCESS) {
1904 while (!qm->thread_initiated) {
1905 switch_cond_next();
1906 }
1907
1908 if (qm->event_db) {
1909 return SWITCH_STATUS_SUCCESS;
1910 }
1911 }
1912 }
1913
1914 return SWITCH_STATUS_FALSE;
1915 }
1916
switch_sql_queue_manager_destroy(switch_sql_queue_manager_t ** qmp)1917 SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_destroy(switch_sql_queue_manager_t **qmp)
1918 {
1919 switch_sql_queue_manager_t *qm;
1920 switch_status_t status = SWITCH_STATUS_SUCCESS;
1921 switch_memory_pool_t *pool;
1922 uint32_t i;
1923
1924 switch_assert(qmp);
1925 qm = *qmp;
1926 if (!qm) {
1927 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "No SQL queue to destroy.\n");
1928 return SWITCH_STATUS_NOOP;
1929 }
1930
1931 *qmp = NULL;
1932
1933 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Destroying SQL queue.\n", qm->name);
1934
1935 switch_sql_queue_manager_stop(qm);
1936
1937
1938
1939 for(i = 0; i < qm->numq; i++) {
1940 do_flush(qm, i, NULL);
1941 }
1942
1943 pool = qm->pool;
1944 switch_core_destroy_memory_pool(&pool);
1945
1946 return status;
1947 }
1948
switch_sql_queue_manager_push(switch_sql_queue_manager_t * qm,const char * sql,uint32_t pos,switch_bool_t dup)1949 SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup)
1950 {
1951 char *sqlptr = NULL;
1952 switch_status_t status;
1953 int x = 0;
1954
1955 if (sql_manager.paused || qm->thread_running != 1) {
1956 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "DROP [%s]\n", sql);
1957 if (!dup) free((char *)sql);
1958 qm_wake(qm);
1959 return SWITCH_STATUS_SUCCESS;
1960 }
1961
1962 if (qm->thread_running != 1) {
1963 if (!dup) free((char *)sql);
1964 return SWITCH_STATUS_FALSE;
1965 }
1966
1967 if (pos > qm->numq - 1) {
1968 pos = 0;
1969 }
1970
1971 sqlptr = dup ? strdup(sql) : (char *)sql;
1972
1973 do {
1974 switch_mutex_lock(qm->mutex);
1975 status = switch_queue_trypush(qm->sql_queue[pos], sqlptr);
1976 switch_mutex_unlock(qm->mutex);
1977 if (status != SWITCH_STATUS_SUCCESS) {
1978 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "Delay %d sending sql\n", x);
1979 if (x++) {
1980 switch_yield(1000000 * x);
1981 }
1982 }
1983 } while(status != SWITCH_STATUS_SUCCESS);
1984
1985 qm_wake(qm);
1986
1987 return SWITCH_STATUS_SUCCESS;
1988 }
1989
1990
switch_sql_queue_manager_push_confirm(switch_sql_queue_manager_t * qm,const char * sql,uint32_t pos,switch_bool_t dup)1991 SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push_confirm(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup)
1992 {
1993 #define EXEC_NOW
1994 #ifdef EXEC_NOW
1995 switch_cache_db_handle_t *dbh;
1996
1997 if (sql_manager.paused || qm->thread_running != 1) {
1998 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "DROP [%s]\n", sql);
1999 if (!dup) free((char *)sql);
2000 qm_wake(qm);
2001 return SWITCH_STATUS_SUCCESS;
2002 }
2003
2004 if (switch_cache_db_get_db_handle_dsn(&dbh, qm->dsn) == SWITCH_STATUS_SUCCESS) {
2005 switch_cache_db_execute_sql(dbh, (char *)sql, NULL);
2006 switch_cache_db_release_db_handle(&dbh);
2007 }
2008
2009 if (!dup) free((char *)sql);
2010
2011 #else
2012
2013 int size, x = 0, sanity = 0;
2014 uint32_t written, want;
2015
2016 if (sql_manager.paused) {
2017 if (!dup) free((char *)sql);
2018 qm_wake(qm);
2019 return SWITCH_STATUS_SUCCESS;
2020 }
2021
2022 if (qm->thread_running != 1) {
2023 if (!dup) free((char *)sql);
2024 return SWITCH_STATUS_FALSE;
2025 }
2026
2027 if (pos > qm->numq - 1) {
2028 pos = 0;
2029 }
2030
2031 switch_mutex_lock(qm->mutex);
2032 qm->confirm++;
2033 switch_queue_push(qm->sql_queue[pos], dup ? strdup(sql) : (char *)sql);
2034 written = qm->pre_written[pos];
2035 size = switch_sql_queue_manager_size(qm, pos);
2036 want = written + size;
2037 switch_mutex_unlock(qm->mutex);
2038
2039 qm_wake(qm);
2040
2041 while((qm->written[pos] < want) || (qm->written[pos] >= written && want < written && qm->written[pos] > want)) {
2042 switch_yield(5000);
2043
2044 if (++x == 200) {
2045 qm_wake(qm);
2046 x = 0;
2047 if (++sanity == 20) {
2048 break;
2049 }
2050 }
2051 }
2052
2053 switch_mutex_lock(qm->mutex);
2054 qm->confirm--;
2055 switch_mutex_unlock(qm->mutex);
2056 #endif
2057
2058 return SWITCH_STATUS_SUCCESS;
2059 }
2060
2061
2062
2063
2064
switch_sql_queue_manager_init_name(const char * name,switch_sql_queue_manager_t ** qmp,uint32_t numq,const char * dsn,uint32_t max_trans,const char * pre_trans_execute,const char * post_trans_execute,const char * inner_pre_trans_execute,const char * inner_post_trans_execute)2065 SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *name,
2066 switch_sql_queue_manager_t **qmp,
2067 uint32_t numq, const char *dsn, uint32_t max_trans,
2068 const char *pre_trans_execute,
2069 const char *post_trans_execute,
2070 const char *inner_pre_trans_execute,
2071 const char *inner_post_trans_execute)
2072 {
2073 switch_memory_pool_t *pool;
2074 switch_sql_queue_manager_t *qm;
2075 uint32_t i;
2076
2077 if (!numq) numq = 1;
2078
2079 switch_core_new_memory_pool(&pool);
2080 qm = switch_core_alloc(pool, sizeof(*qm));
2081
2082 qm->pool = pool;
2083 qm->numq = numq;
2084 qm->dsn = switch_core_strdup(qm->pool, dsn);
2085 qm->name = switch_core_strdup(qm->pool, name);
2086 qm->max_trans = max_trans;
2087
2088 switch_mutex_init(&qm->cond_mutex, SWITCH_MUTEX_NESTED, qm->pool);
2089 switch_mutex_init(&qm->cond2_mutex, SWITCH_MUTEX_NESTED, qm->pool);
2090 switch_mutex_init(&qm->mutex, SWITCH_MUTEX_NESTED, qm->pool);
2091 switch_thread_cond_create(&qm->cond, qm->pool);
2092
2093 qm->sql_queue = switch_core_alloc(qm->pool, sizeof(switch_queue_t *) * numq);
2094 qm->written = switch_core_alloc(qm->pool, sizeof(uint32_t) * numq);
2095 qm->pre_written = switch_core_alloc(qm->pool, sizeof(uint32_t) * numq);
2096
2097 for (i = 0; i < qm->numq; i++) {
2098 switch_queue_create(&qm->sql_queue[i], SWITCH_SQL_QUEUE_LEN, qm->pool);
2099 }
2100
2101 if (pre_trans_execute) {
2102 qm->pre_trans_execute = switch_core_strdup(qm->pool, pre_trans_execute);
2103 }
2104 if (post_trans_execute) {
2105 qm->post_trans_execute = switch_core_strdup(qm->pool, post_trans_execute);
2106 }
2107 if (inner_pre_trans_execute) {
2108 qm->inner_pre_trans_execute = switch_core_strdup(qm->pool, inner_pre_trans_execute);
2109 }
2110 if (inner_post_trans_execute) {
2111 qm->inner_post_trans_execute = switch_core_strdup(qm->pool, inner_post_trans_execute);
2112 }
2113
2114 *qmp = qm;
2115
2116 return SWITCH_STATUS_SUCCESS;
2117
2118 }
2119
do_trans(switch_sql_queue_manager_t * qm)2120 static uint32_t do_trans(switch_sql_queue_manager_t *qm)
2121 {
2122 char *errmsg = NULL;
2123 void *pop;
2124 switch_status_t status;
2125 uint32_t ttl = 0;
2126 switch_mutex_t *io_mutex = qm->event_db->io_mutex;
2127 uint32_t i;
2128
2129 if (io_mutex) switch_mutex_lock(io_mutex);
2130
2131 if (!zstr(qm->pre_trans_execute)) {
2132 switch_cache_db_execute_sql_real(qm->event_db, qm->pre_trans_execute, &errmsg);
2133 if (errmsg) {
2134 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", qm->pre_trans_execute, errmsg);
2135 switch_safe_free(errmsg);
2136 }
2137 }
2138
2139 switch(qm->event_db->type) {
2140 case SCDB_TYPE_CORE_DB:
2141 {
2142 switch_cache_db_execute_sql_real(qm->event_db, "BEGIN EXCLUSIVE", &errmsg);
2143 }
2144 break;
2145 case SCDB_TYPE_ODBC:
2146 {
2147 switch_odbc_status_t result;
2148
2149 if ((result = switch_odbc_SQLSetAutoCommitAttr(qm->event_db->native_handle.odbc_dbh, 0)) != SWITCH_ODBC_SUCCESS) {
2150 char tmp[100];
2151 switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
2152 errmsg = strdup(tmp);
2153 }
2154 }
2155 break;
2156 case SCDB_TYPE_DATABASE_INTERFACE:
2157 {
2158 switch_database_interface_t *database_interface = qm->event_db->native_handle.database_interface_dbh->connection_options.database_interface;
2159 switch_status_t result;
2160
2161 if ((result = database_interface->sql_set_auto_commit_attr(qm->event_db->native_handle.database_interface_dbh, 0)) != SWITCH_STATUS_SUCCESS) {
2162 char tmp[100];
2163 switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
2164 errmsg = strdup(tmp);
2165 }
2166 }
2167 break;
2168 }
2169
2170 if (errmsg) {
2171 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "ERROR [%s], [%s]\n", errmsg, qm->event_db->name);
2172 switch_safe_free(errmsg);
2173 goto end;
2174 }
2175
2176
2177 if (!zstr(qm->inner_pre_trans_execute)) {
2178 switch_cache_db_execute_sql_real(qm->event_db, qm->inner_pre_trans_execute, &errmsg);
2179 if (errmsg) {
2180 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL PRE TRANS EXEC %s [%s]\n", qm->inner_pre_trans_execute, errmsg);
2181 switch_safe_free(errmsg);
2182 }
2183 }
2184
2185
2186 while(qm->max_trans == 0 || ttl <= qm->max_trans) {
2187 pop = NULL;
2188
2189 for (i = 0; (qm->max_trans == 0 || ttl <= qm->max_trans) && (i < qm->numq); i++) {
2190 switch_mutex_lock(qm->mutex);
2191 switch_queue_trypop(qm->sql_queue[i], &pop);
2192 switch_mutex_unlock(qm->mutex);
2193 if (pop) break;
2194 }
2195
2196 if (pop) {
2197 if ((status = switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL)) == SWITCH_STATUS_SUCCESS) {
2198 switch_mutex_lock(qm->mutex);
2199 qm->pre_written[i]++;
2200 switch_mutex_unlock(qm->mutex);
2201 ttl++;
2202 }
2203 switch_safe_free(pop);
2204 if (status != SWITCH_STATUS_SUCCESS) break;
2205 } else {
2206 break;
2207 }
2208 }
2209
2210 if (!zstr(qm->inner_post_trans_execute)) {
2211 switch_cache_db_execute_sql_real(qm->event_db, qm->inner_post_trans_execute, &errmsg);
2212 if (errmsg) {
2213 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", qm->inner_post_trans_execute, errmsg);
2214 switch_safe_free(errmsg);
2215 }
2216 }
2217
2218
2219 end:
2220
2221 switch(qm->event_db->type) {
2222 case SCDB_TYPE_CORE_DB:
2223 {
2224 switch_cache_db_execute_sql_real(qm->event_db, "COMMIT", NULL);
2225 }
2226 break;
2227 case SCDB_TYPE_ODBC:
2228 {
2229 switch_odbc_SQLEndTran(qm->event_db->native_handle.odbc_dbh, 1);
2230 switch_odbc_SQLSetAutoCommitAttr(qm->event_db->native_handle.odbc_dbh, 1);
2231 }
2232 break;
2233 case SCDB_TYPE_DATABASE_INTERFACE:
2234 {
2235 switch_database_interface_t *database_interface = qm->event_db->native_handle.database_interface_dbh->connection_options.database_interface;
2236 switch_status_t result;
2237
2238 if ((result = database_interface->commit(qm->event_db->native_handle.database_interface_dbh)) != SWITCH_STATUS_SUCCESS) {
2239 char tmp[100];
2240 switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to commit transaction", result);
2241 }
2242 }
2243 break;
2244 }
2245
2246
2247 if (!zstr(qm->post_trans_execute)) {
2248 switch_cache_db_execute_sql_real(qm->event_db, qm->post_trans_execute, &errmsg);
2249 if (errmsg) {
2250 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL POST TRANS EXEC %s [%s]\n", qm->post_trans_execute, errmsg);
2251 switch_safe_free(errmsg);
2252 }
2253 }
2254
2255
2256 switch_mutex_lock(qm->mutex);
2257 for (i = 0; i < qm->numq; i++) {
2258 qm->written[i] = qm->pre_written[i];
2259 }
2260 switch_mutex_unlock(qm->mutex);
2261
2262
2263 if (io_mutex) switch_mutex_unlock(io_mutex);
2264
2265 return ttl;
2266 }
2267
switch_user_sql_thread(switch_thread_t * thread,void * obj)2268 static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, void *obj)
2269 {
2270
2271 uint32_t sanity = 120;
2272 switch_sql_queue_manager_t *qm = (switch_sql_queue_manager_t *) obj;
2273 uint32_t i;
2274
2275 while (sanity && !qm->event_db) {
2276 if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db)
2277 break;
2278 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s Error getting db handle, Retrying\n", qm->name);
2279 switch_yield(500000);
2280 sanity--;
2281 }
2282
2283 if (!qm->event_db) {
2284 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s Error getting db handle\n", qm->name);
2285 qm->thread_initiated = 1;
2286 return NULL;
2287 }
2288
2289 switch_mutex_lock(qm->cond_mutex);
2290
2291 switch (qm->event_db->type) {
2292 case SCDB_TYPE_DATABASE_INTERFACE:
2293 break;
2294 case SCDB_TYPE_ODBC:
2295 break;
2296 case SCDB_TYPE_CORE_DB:
2297 {
2298 switch_cache_db_execute_sql(qm->event_db, "PRAGMA synchronous=OFF;", NULL);
2299 switch_cache_db_execute_sql(qm->event_db, "PRAGMA count_changes=OFF;", NULL);
2300 switch_cache_db_execute_sql(qm->event_db, "PRAGMA temp_store=MEMORY;", NULL);
2301 switch_cache_db_execute_sql(qm->event_db, "PRAGMA journal_mode=OFF;", NULL);
2302 }
2303 break;
2304 }
2305
2306 qm->thread_initiated = 1;
2307 qm->thread_running = 1;
2308
2309 while (qm->thread_running == 1) {
2310 uint32_t i, lc;
2311 uint32_t written = 0, iterations = 0;
2312
2313 if (qm->paused) {
2314 goto check;
2315 }
2316
2317 if (sql_manager.paused) {
2318 for (i = 0; i < qm->numq; i++) {
2319 do_flush(qm, i, NULL);
2320 }
2321 goto check;
2322 }
2323
2324 do {
2325 if (!qm_ttl(qm)) {
2326 goto check;
2327 }
2328 written = do_trans(qm);
2329 iterations += written;
2330 } while(written == qm->max_trans);
2331
2332 if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
2333 char line[128] = "";
2334 switch_size_t l;
2335
2336 switch_snprintf(line, sizeof(line), "%s RUN QUEUE [", qm->name);
2337
2338 for (i = 0; i < qm->numq; i++) {
2339 l = strlen(line);
2340 switch_snprintf(line + l, sizeof(line) - l, "%d%s", switch_queue_size(qm->sql_queue[i]), i == qm->numq - 1 ? "" : "|");
2341 }
2342
2343 l = strlen(line);
2344 switch_snprintf(line + l, sizeof(line) - l, "]--[%d]\n", iterations);
2345
2346 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s", line);
2347
2348 }
2349
2350 check:
2351
2352 if ((lc = qm_ttl(qm)) == 0) {
2353 switch_mutex_lock(qm->cond2_mutex);
2354 switch_thread_cond_wait(qm->cond, qm->cond_mutex);
2355 switch_mutex_unlock(qm->cond2_mutex);
2356 }
2357
2358 i = 40;
2359
2360 while (--i > 0 && (lc = qm_ttl(qm)) < 500) {
2361 switch_yield(5000);
2362 }
2363
2364
2365 }
2366
2367 switch_mutex_unlock(qm->cond_mutex);
2368
2369 for(i = 0; i < qm->numq; i++) {
2370 do_flush(qm, i, qm->event_db);
2371 }
2372
2373 switch_cache_db_release_db_handle(&qm->event_db);
2374
2375 qm->thread_running = 0;
2376
2377 return NULL;
2378 }
2379
2380
parse_presence_data_cols(switch_event_t * event)2381 static char *parse_presence_data_cols(switch_event_t *event)
2382 {
2383 char *cols[128] = { 0 };
2384 int col_count = 0;
2385 char *data_copy;
2386 switch_stream_handle_t stream = { 0 };
2387 int i;
2388 char *r;
2389 char col_name[128] = "";
2390 const char *data = switch_event_get_header(event, "presence-data-cols");
2391
2392 if (zstr(data)) {
2393 return NULL;
2394 }
2395
2396 data_copy = strdup(data);
2397
2398 col_count = switch_split(data_copy, ':', cols);
2399
2400 SWITCH_STANDARD_STREAM(stream);
2401
2402 for (i = 0; i < col_count; i++) {
2403 const char *val = NULL;
2404
2405 switch_snprintfv(col_name, sizeof(col_name), "PD-%q", cols[i]);
2406 val = switch_event_get_header_nil(event, col_name);
2407 if (zstr(val)) {
2408 stream.write_function(&stream, "%q=NULL,", cols[i]);
2409 } else {
2410 stream.write_function(&stream, "%q='%q',", cols[i], val);
2411 }
2412 }
2413
2414 r = (char *) stream.data;
2415
2416 if (end_of(r) == ',') {
2417 end_of(r) = '\0';
2418 }
2419
2420 switch_safe_free(data_copy);
2421
2422 return r;
2423
2424 }
2425
2426
2427 #define MAX_SQL 5
2428 #define new_sql() switch_assert(sql_idx+1 < MAX_SQL); if (exists) sql[sql_idx++]
2429 #define new_sql_a() switch_assert(sql_idx+1 < MAX_SQL); sql[sql_idx++]
2430
core_event_handler(switch_event_t * event)2431 static void core_event_handler(switch_event_t *event)
2432 {
2433 char *sql[MAX_SQL] = { 0 };
2434 int sql_idx = 0;
2435 char *extra_cols;
2436 int exists = 1;
2437 char *uuid = NULL;
2438
2439 switch_assert(event);
2440
2441 switch (event->event_id) {
2442 case SWITCH_EVENT_CHANNEL_UUID:
2443 case SWITCH_EVENT_CHANNEL_CREATE:
2444 case SWITCH_EVENT_CHANNEL_ANSWER:
2445 case SWITCH_EVENT_CHANNEL_PROGRESS_MEDIA:
2446 case SWITCH_EVENT_CHANNEL_HOLD:
2447 case SWITCH_EVENT_CHANNEL_UNHOLD:
2448 case SWITCH_EVENT_CHANNEL_EXECUTE:
2449 case SWITCH_EVENT_CHANNEL_ORIGINATE:
2450 case SWITCH_EVENT_CALL_UPDATE:
2451 case SWITCH_EVENT_CHANNEL_CALLSTATE:
2452 case SWITCH_EVENT_CHANNEL_STATE:
2453 case SWITCH_EVENT_CHANNEL_BRIDGE:
2454 case SWITCH_EVENT_CHANNEL_UNBRIDGE:
2455 case SWITCH_EVENT_CALL_SECURE:
2456 {
2457 if ((uuid = switch_event_get_header(event, "unique-id"))) {
2458 exists = switch_ivr_uuid_exists(uuid);
2459 }
2460 }
2461 break;
2462 default:
2463 break;
2464 }
2465
2466 switch (event->event_id) {
2467 case SWITCH_EVENT_ADD_SCHEDULE:
2468 {
2469 const char *id = switch_event_get_header(event, "task-id");
2470 const char *manager = switch_event_get_header(event, "task-sql_manager");
2471
2472 if (id) {
2473 new_sql() = switch_mprintf("insert into tasks (task_id, task_desc, task_group, task_runtime, task_sql_manager, hostname) "
2474 "values(%q,'%q','%q',%q,%q,'%q')",
2475 id,
2476 switch_event_get_header_nil(event, "task-desc"),
2477 switch_event_get_header_nil(event, "task-group"),
2478 switch_event_get_header_nil(event, "task-runtime"),
2479 manager ? manager : "0",
2480 switch_core_get_hostname()
2481 );
2482 }
2483 }
2484 break;
2485 case SWITCH_EVENT_DEL_SCHEDULE:
2486 case SWITCH_EVENT_EXE_SCHEDULE:
2487 new_sql() = switch_mprintf("delete from tasks where task_id=%q and hostname='%q'",
2488 switch_event_get_header_nil(event, "task-id"), switch_core_get_hostname());
2489 break;
2490 case SWITCH_EVENT_RE_SCHEDULE:
2491 {
2492 const char *id = switch_event_get_header(event, "task-id");
2493 const char *manager = switch_event_get_header(event, "task-sql_manager");
2494
2495 if (id) {
2496 new_sql() = switch_mprintf("update tasks set task_desc='%q',task_group='%q', task_runtime=%q, task_sql_manager=%q where task_id=%q and hostname='%q'",
2497 switch_event_get_header_nil(event, "task-desc"),
2498 switch_event_get_header_nil(event, "task-group"),
2499 switch_event_get_header_nil(event, "task-runtime"),
2500 manager ? manager : "0",
2501 id,
2502 switch_core_get_hostname()
2503 );
2504 }
2505 }
2506 break;
2507 case SWITCH_EVENT_CHANNEL_DESTROY:
2508 {
2509 const char *uuid = switch_event_get_header(event, "unique-id");
2510
2511 if (uuid) {
2512 new_sql() = switch_mprintf("delete from channels where uuid='%q'",
2513 uuid);
2514
2515 new_sql() = switch_mprintf("delete from calls where (caller_uuid='%q' or callee_uuid='%q')",
2516 uuid, uuid);
2517
2518 }
2519 }
2520 break;
2521 case SWITCH_EVENT_CHANNEL_UUID:
2522 {
2523 new_sql() = switch_mprintf("update channels set uuid='%q' where uuid='%q'",
2524 switch_event_get_header_nil(event, "unique-id"),
2525 switch_event_get_header_nil(event, "old-unique-id")
2526 );
2527
2528 new_sql() = switch_mprintf("update channels set call_uuid='%q' where call_uuid='%q'",
2529 switch_event_get_header_nil(event, "unique-id"),
2530 switch_event_get_header_nil(event, "old-unique-id")
2531 );
2532 break;
2533 }
2534 case SWITCH_EVENT_CHANNEL_CREATE:
2535 new_sql() = switch_mprintf("insert into channels (uuid,direction,created,created_epoch, name,state,callstate,dialplan,context,hostname,initial_cid_name,initial_cid_num,initial_ip_addr,initial_dest,initial_dialplan,initial_context) "
2536 "values('%q','%q','%q','%ld','%q','%q','%q','%q','%q','%q','%q','%q','%q','%q','%q','%q')",
2537 switch_event_get_header_nil(event, "unique-id"),
2538 switch_event_get_header_nil(event, "call-direction"),
2539 switch_event_get_header_nil(event, "event-date-local"),
2540 (long) switch_epoch_time_now(NULL),
2541 switch_event_get_header_nil(event, "channel-name"),
2542 switch_event_get_header_nil(event, "channel-state"),
2543 switch_event_get_header_nil(event, "channel-call-state"),
2544 switch_event_get_header_nil(event, "caller-dialplan"),
2545 switch_event_get_header_nil(event, "caller-context"), switch_core_get_switchname(),
2546 switch_event_get_header_nil(event, "caller-caller-id-name"),
2547 switch_event_get_header_nil(event, "caller-caller-id-number"),
2548 switch_event_get_header_nil(event, "caller-network-addr"),
2549 switch_event_get_header_nil(event, "caller-destination-number"),
2550 switch_event_get_header_nil(event, "caller-dialplan"),
2551 switch_event_get_header_nil(event, "caller-context")
2552 );
2553 break;
2554 case SWITCH_EVENT_CHANNEL_ANSWER:
2555 case SWITCH_EVENT_CHANNEL_PROGRESS_MEDIA:
2556 case SWITCH_EVENT_CODEC:
2557 new_sql() =
2558 switch_mprintf
2559 ("update channels set read_codec='%q',read_rate='%q',read_bit_rate='%q',write_codec='%q',write_rate='%q',write_bit_rate='%q' where uuid='%q'",
2560 switch_event_get_header_nil(event, "channel-read-codec-name"),
2561 switch_event_get_header_nil(event, "channel-read-codec-rate"),
2562 switch_event_get_header_nil(event, "channel-read-codec-bit-rate"),
2563 switch_event_get_header_nil(event, "channel-write-codec-name"),
2564 switch_event_get_header_nil(event, "channel-write-codec-rate"),
2565 switch_event_get_header_nil(event, "channel-write-codec-bit-rate"),
2566 switch_event_get_header_nil(event, "unique-id"));
2567 break;
2568 case SWITCH_EVENT_CHANNEL_HOLD:
2569 case SWITCH_EVENT_CHANNEL_UNHOLD:
2570 case SWITCH_EVENT_CHANNEL_EXECUTE: {
2571
2572 new_sql() = switch_mprintf("update channels set application='%q',application_data='%q',"
2573 "presence_id='%q',presence_data='%q',accountcode='%q' where uuid='%q'",
2574 switch_event_get_header_nil(event, "application"),
2575 switch_event_get_header_nil(event, "application-data"),
2576 switch_event_get_header_nil(event, "channel-presence-id"),
2577 switch_event_get_header_nil(event, "channel-presence-data"),
2578 switch_event_get_header_nil(event, "variable_accountcode"),
2579 switch_event_get_header_nil(event, "unique-id")
2580 );
2581
2582 }
2583 break;
2584
2585 case SWITCH_EVENT_CHANNEL_ORIGINATE:
2586 {
2587 if ((extra_cols = parse_presence_data_cols(event))) {
2588 new_sql() = switch_mprintf("update channels set "
2589 "presence_id='%q',presence_data='%q',accountcode='%q',call_uuid='%q',%s where uuid='%q'",
2590 switch_event_get_header_nil(event, "channel-presence-id"),
2591 switch_event_get_header_nil(event, "channel-presence-data"),
2592 switch_event_get_header_nil(event, "variable_accountcode"),
2593 switch_event_get_header_nil(event, "channel-call-uuid"),
2594 extra_cols,
2595 switch_event_get_header_nil(event, "unique-id"));
2596 free(extra_cols);
2597 } else {
2598 new_sql() = switch_mprintf("update channels set "
2599 "presence_id='%q',presence_data='%q',accountcode='%q',call_uuid='%q' where uuid='%q'",
2600 switch_event_get_header_nil(event, "channel-presence-id"),
2601 switch_event_get_header_nil(event, "channel-presence-data"),
2602 switch_event_get_header_nil(event, "variable_accountcode"),
2603 switch_event_get_header_nil(event, "channel-call-uuid"),
2604 switch_event_get_header_nil(event, "unique-id"));
2605 }
2606
2607 }
2608
2609 break;
2610 case SWITCH_EVENT_CALL_UPDATE:
2611 {
2612 new_sql() = switch_mprintf("update channels set callee_name='%q',callee_num='%q',sent_callee_name='%q',sent_callee_num='%q',callee_direction='%q',"
2613 "cid_name='%q',cid_num='%q' where uuid='%q'",
2614 switch_event_get_header_nil(event, "caller-callee-id-name"),
2615 switch_event_get_header_nil(event, "caller-callee-id-number"),
2616 switch_event_get_header_nil(event, "sent-callee-id-name"),
2617 switch_event_get_header_nil(event, "sent-callee-id-number"),
2618 switch_event_get_header_nil(event, "direction"),
2619 switch_event_get_header_nil(event, "caller-caller-id-name"),
2620 switch_event_get_header_nil(event, "caller-caller-id-number"),
2621 switch_event_get_header_nil(event, "unique-id")
2622 );
2623 }
2624 break;
2625 case SWITCH_EVENT_CHANNEL_CALLSTATE:
2626 {
2627 char *num = switch_event_get_header_nil(event, "channel-call-state-number");
2628 switch_channel_callstate_t callstate = CCS_DOWN;
2629
2630 if (num) {
2631 callstate = atoi(num);
2632 }
2633
2634 if (callstate != CCS_DOWN && callstate != CCS_HANGUP) {
2635 if ((extra_cols = parse_presence_data_cols(event))) {
2636 new_sql() = switch_mprintf("update channels set callstate='%q',%s where uuid='%q'",
2637 switch_event_get_header_nil(event, "channel-call-state"),
2638 extra_cols,
2639 switch_event_get_header_nil(event, "unique-id"));
2640 free(extra_cols);
2641 } else {
2642 new_sql() = switch_mprintf("update channels set callstate='%q' where uuid='%q'",
2643 switch_event_get_header_nil(event, "channel-call-state"),
2644 switch_event_get_header_nil(event, "unique-id"));
2645 }
2646 }
2647
2648 }
2649 break;
2650 case SWITCH_EVENT_CHANNEL_STATE:
2651 {
2652 char *state = switch_event_get_header_nil(event, "channel-state-number");
2653 switch_channel_state_t state_i = CS_DESTROY;
2654
2655 if (!zstr(state)) {
2656 state_i = atoi(state);
2657 }
2658
2659 switch (state_i) {
2660 case CS_NEW:
2661 case CS_DESTROY:
2662 case CS_REPORTING:
2663 #ifndef SWITCH_DEPRECATED_CORE_DB
2664 case CS_HANGUP: /* marked for deprication */
2665 #endif
2666 case CS_INIT:
2667 break;
2668 #ifdef SWITCH_DEPRECATED_CORE_DB
2669 case CS_HANGUP: /* marked for deprication */
2670 new_sql_a() = switch_mprintf("update channels set state='%q' where uuid='%q'",
2671 switch_event_get_header_nil(event, "channel-state"),
2672 switch_event_get_header_nil(event, "unique-id"));
2673 break;
2674 #endif
2675 case CS_EXECUTE:
2676 if ((extra_cols = parse_presence_data_cols(event))) {
2677 new_sql() = switch_mprintf("update channels set state='%q',%s where uuid='%q'",
2678 switch_event_get_header_nil(event, "channel-state"),
2679 extra_cols,
2680 switch_event_get_header_nil(event, "unique-id"));
2681 free(extra_cols);
2682
2683 } else {
2684 new_sql() = switch_mprintf("update channels set state='%q' where uuid='%q'",
2685 switch_event_get_header_nil(event, "channel-state"),
2686 switch_event_get_header_nil(event, "unique-id"));
2687 }
2688 break;
2689 case CS_ROUTING:
2690 if ((extra_cols = parse_presence_data_cols(event))) {
2691 new_sql() = switch_mprintf("update channels set state='%q',cid_name='%q',cid_num='%q',callee_name='%q',callee_num='%q',"
2692 "sent_callee_name='%q',sent_callee_num='%q',"
2693 "ip_addr='%q',dest='%q',dialplan='%q',context='%q',presence_id='%q',presence_data='%q',accountcode='%q',%s "
2694 "where uuid='%q'",
2695 switch_event_get_header_nil(event, "channel-state"),
2696 switch_event_get_header_nil(event, "caller-caller-id-name"),
2697 switch_event_get_header_nil(event, "caller-caller-id-number"),
2698 switch_event_get_header_nil(event, "caller-callee-id-name"),
2699 switch_event_get_header_nil(event, "caller-callee-id-number"),
2700 switch_event_get_header_nil(event, "sent-callee-id-name"),
2701 switch_event_get_header_nil(event, "sent-callee-id-number"),
2702 switch_event_get_header_nil(event, "caller-network-addr"),
2703 switch_event_get_header_nil(event, "caller-destination-number"),
2704 switch_event_get_header_nil(event, "caller-dialplan"),
2705 switch_event_get_header_nil(event, "caller-context"),
2706 switch_event_get_header_nil(event, "channel-presence-id"),
2707 switch_event_get_header_nil(event, "channel-presence-data"),
2708 switch_event_get_header_nil(event, "variable_accountcode"),
2709 extra_cols,
2710 switch_event_get_header_nil(event, "unique-id"));
2711 free(extra_cols);
2712 } else {
2713 new_sql() = switch_mprintf("update channels set state='%q',cid_name='%q',cid_num='%q',callee_name='%q',callee_num='%q',"
2714 "sent_callee_name='%q',sent_callee_num='%q',"
2715 "ip_addr='%q',dest='%q',dialplan='%q',context='%q',presence_id='%q',presence_data='%q',accountcode='%q' "
2716 "where uuid='%q'",
2717 switch_event_get_header_nil(event, "channel-state"),
2718 switch_event_get_header_nil(event, "caller-caller-id-name"),
2719 switch_event_get_header_nil(event, "caller-caller-id-number"),
2720 switch_event_get_header_nil(event, "caller-callee-id-name"),
2721 switch_event_get_header_nil(event, "caller-callee-id-number"),
2722 switch_event_get_header_nil(event, "sent-callee-id-name"),
2723 switch_event_get_header_nil(event, "sent-callee-id-number"),
2724 switch_event_get_header_nil(event, "caller-network-addr"),
2725 switch_event_get_header_nil(event, "caller-destination-number"),
2726 switch_event_get_header_nil(event, "caller-dialplan"),
2727 switch_event_get_header_nil(event, "caller-context"),
2728 switch_event_get_header_nil(event, "channel-presence-id"),
2729 switch_event_get_header_nil(event, "channel-presence-data"),
2730 switch_event_get_header_nil(event, "variable_accountcode"),
2731 switch_event_get_header_nil(event, "unique-id"));
2732 }
2733 break;
2734 default:
2735 new_sql() = switch_mprintf("update channels set state='%q' where uuid='%q'",
2736 switch_event_get_header_nil(event, "channel-state"),
2737 switch_event_get_header_nil(event, "unique-id"));
2738 break;
2739 }
2740
2741 break;
2742
2743
2744 }
2745 case SWITCH_EVENT_CHANNEL_BRIDGE:
2746 {
2747 const char *a_uuid, *b_uuid, *uuid;
2748
2749 a_uuid = switch_event_get_header(event, "Bridge-A-Unique-ID");
2750 b_uuid = switch_event_get_header(event, "Bridge-B-Unique-ID");
2751 uuid = switch_event_get_header(event, "unique-id");
2752
2753 if (zstr(a_uuid) || zstr(b_uuid)) {
2754 a_uuid = switch_event_get_header_nil(event, "caller-unique-id");
2755 b_uuid = switch_event_get_header_nil(event, "other-leg-unique-id");
2756 }
2757
2758 if (uuid && (extra_cols = parse_presence_data_cols(event))) {
2759 new_sql() = switch_mprintf("update channels set %s where uuid='%q'", extra_cols, uuid);
2760 switch_safe_free(extra_cols);
2761 }
2762
2763 new_sql() = switch_mprintf("update channels set call_uuid='%q' where uuid='%q' or uuid='%q'",
2764 switch_event_get_header_nil(event, "channel-call-uuid"), a_uuid, b_uuid);
2765
2766
2767 new_sql() = switch_mprintf("insert into calls (call_uuid,call_created,call_created_epoch,"
2768 "caller_uuid,callee_uuid,hostname) "
2769 "values ('%q','%q','%ld','%q','%q','%q')",
2770 switch_event_get_header_nil(event, "channel-call-uuid"),
2771 switch_event_get_header_nil(event, "event-date-local"),
2772 (long) switch_epoch_time_now(NULL),
2773 a_uuid,
2774 b_uuid,
2775 switch_core_get_switchname()
2776 );
2777 }
2778 break;
2779 case SWITCH_EVENT_CHANNEL_UNBRIDGE:
2780 {
2781 char *cuuid = switch_event_get_header_nil(event, "caller-unique-id");
2782 char *uuid = switch_event_get_header(event, "unique-id");
2783
2784 if (uuid && (extra_cols = parse_presence_data_cols(event))) {
2785 new_sql() = switch_mprintf("update channels set %s where uuid='%q'", extra_cols, uuid);
2786 switch_safe_free(extra_cols);
2787 }
2788
2789 new_sql() = switch_mprintf("update channels set call_uuid=uuid where call_uuid='%q'",
2790 switch_event_get_header_nil(event, "channel-call-uuid"));
2791
2792 new_sql() = switch_mprintf("delete from calls where (caller_uuid='%q' or callee_uuid='%q')",
2793 cuuid, cuuid);
2794 break;
2795 }
2796 case SWITCH_EVENT_SHUTDOWN:
2797 new_sql() = switch_mprintf("delete from channels where hostname='%q';"
2798 "delete from interfaces where hostname='%q';"
2799 "delete from calls where hostname='%q'",
2800 switch_core_get_switchname(), switch_core_get_hostname(), switch_core_get_switchname()
2801 );
2802 break;
2803 case SWITCH_EVENT_LOG:
2804 return;
2805 case SWITCH_EVENT_MODULE_LOAD:
2806 {
2807 const char *type = switch_event_get_header_nil(event, "type");
2808 const char *name = switch_event_get_header_nil(event, "name");
2809 const char *description = switch_event_get_header_nil(event, "description");
2810 const char *syntax = switch_event_get_header_nil(event, "syntax");
2811 const char *key = switch_event_get_header_nil(event, "key");
2812 const char *filename = switch_event_get_header_nil(event, "filename");
2813 if (!zstr(type) && !zstr(name)) {
2814 new_sql() =
2815 switch_mprintf
2816 ("insert into interfaces (type,name,description,syntax,ikey,filename,hostname) values('%q','%q','%q','%q','%q','%q','%q')", type, name,
2817 switch_str_nil(description), switch_str_nil(syntax), switch_str_nil(key), switch_str_nil(filename),
2818 switch_core_get_hostname()
2819 );
2820 }
2821 break;
2822 }
2823 case SWITCH_EVENT_MODULE_UNLOAD:
2824 {
2825 const char *type = switch_event_get_header_nil(event, "type");
2826 const char *name = switch_event_get_header_nil(event, "name");
2827 if (!zstr(type) && !zstr(name)) {
2828 new_sql() = switch_mprintf("delete from interfaces where type='%q' and name='%q' and hostname='%q'", type, name,
2829 switch_core_get_hostname());
2830 }
2831 break;
2832 }
2833 case SWITCH_EVENT_CALL_SECURE:
2834 {
2835 const char *type = switch_event_get_header_nil(event, "secure_type");
2836 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Secure Type: %s\n", type);
2837 if (zstr(type)) {
2838 break;
2839 }
2840 new_sql() = switch_mprintf("update channels set secure='%q' where uuid='%q'",
2841 type, switch_event_get_header_nil(event, "caller-unique-id")
2842 );
2843 break;
2844 }
2845 case SWITCH_EVENT_NAT:
2846 {
2847 const char *op = switch_event_get_header_nil(event, "op");
2848 switch_bool_t sticky = switch_true(switch_event_get_header_nil(event, "sticky"));
2849 if (!strcmp("add", op)) {
2850 new_sql() = switch_mprintf("insert into nat (port, proto, sticky, hostname) values (%q, %q, %d,'%q')",
2851 switch_event_get_header_nil(event, "port"),
2852 switch_event_get_header_nil(event, "proto"), sticky, switch_core_get_hostname()
2853 );
2854 } else if (!strcmp("del", op)) {
2855 new_sql() = switch_mprintf("delete from nat where port=%q and proto=%q and hostname='%q'",
2856 switch_event_get_header_nil(event, "port"),
2857 switch_event_get_header_nil(event, "proto"), switch_core_get_hostname());
2858 } else if (!strcmp("status", op)) {
2859 /* call show nat api */
2860 } else if (!strcmp("status_response", op)) {
2861 /* ignore */
2862 } else {
2863 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unknown op for SWITCH_EVENT_NAT: %s\n", op);
2864 }
2865 break;
2866 }
2867 default:
2868 break;
2869 }
2870
2871 if (sql_idx) {
2872 int i = 0;
2873
2874
2875 for (i = 0; i < sql_idx; i++) {
2876 if (switch_stristr("update channels", sql[i]) || switch_stristr("delete from channels", sql[i])) {
2877 switch_sql_queue_manager_push(sql_manager.qm, sql[i], 1, SWITCH_FALSE);
2878 } else {
2879 switch_sql_queue_manager_push(sql_manager.qm, sql[i], 0, SWITCH_FALSE);
2880 }
2881 sql[i] = NULL;
2882 }
2883 }
2884 }
2885
2886
2887 static char create_complete_sql[] =
2888 "CREATE TABLE complete (\n"
2889 " sticky INTEGER,\n"
2890 " a1 VARCHAR(128),\n"
2891 " a2 VARCHAR(128),\n"
2892 " a3 VARCHAR(128),\n"
2893 " a4 VARCHAR(128),\n"
2894 " a5 VARCHAR(128),\n"
2895 " a6 VARCHAR(128),\n"
2896 " a7 VARCHAR(128),\n"
2897 " a8 VARCHAR(128),\n"
2898 " a9 VARCHAR(128),\n"
2899 " a10 VARCHAR(128),\n"
2900 " hostname VARCHAR(256)\n"
2901 ");\n";
2902
2903 static char create_alias_sql[] =
2904 "CREATE TABLE aliases (\n"
2905 " sticky INTEGER,\n"
2906 " alias VARCHAR(128),\n"
2907 " command VARCHAR(4096),\n"
2908 " hostname VARCHAR(256)\n"
2909 ");\n";
2910
2911 static char create_channels_sql[] =
2912 "CREATE TABLE channels (\n"
2913 " uuid VARCHAR(256),\n"
2914 " direction VARCHAR(32),\n"
2915 " created VARCHAR(128),\n"
2916 " created_epoch INTEGER,\n"
2917 " name VARCHAR(1024),\n"
2918 " state VARCHAR(64),\n"
2919 " cid_name VARCHAR(1024),\n"
2920 " cid_num VARCHAR(256),\n"
2921 " ip_addr VARCHAR(256),\n"
2922 " dest VARCHAR(1024),\n"
2923 " application VARCHAR(128),\n"
2924 " application_data VARCHAR(4096),\n"
2925 " dialplan VARCHAR(128),\n"
2926 " context VARCHAR(128),\n"
2927 " read_codec VARCHAR(128),\n"
2928 " read_rate VARCHAR(32),\n"
2929 " read_bit_rate VARCHAR(32),\n"
2930 " write_codec VARCHAR(128),\n"
2931 " write_rate VARCHAR(32),\n"
2932 " write_bit_rate VARCHAR(32),\n"
2933 " secure VARCHAR(64),\n"
2934 " hostname VARCHAR(256),\n"
2935 " presence_id VARCHAR(4096),\n"
2936 " presence_data VARCHAR(4096),\n"
2937 " accountcode VARCHAR(256),\n"
2938 " callstate VARCHAR(64),\n"
2939 " callee_name VARCHAR(1024),\n"
2940 " callee_num VARCHAR(256),\n"
2941 " callee_direction VARCHAR(5),\n"
2942 " call_uuid VARCHAR(256),\n"
2943 " sent_callee_name VARCHAR(1024),\n"
2944 " sent_callee_num VARCHAR(256),\n"
2945 " initial_cid_name VARCHAR(1024),\n"
2946 " initial_cid_num VARCHAR(256),\n"
2947 " initial_ip_addr VARCHAR(256),\n"
2948 " initial_dest VARCHAR(1024),\n"
2949 " initial_dialplan VARCHAR(128),\n"
2950 " initial_context VARCHAR(128)\n"
2951 ");\n";
2952
2953 static char create_row_size_limited_channels_sql[] =
2954 "CREATE TABLE channels (\n"
2955 " uuid VARCHAR(256),\n"
2956 " direction VARCHAR(32),\n"
2957 " created VARCHAR(128),\n"
2958 " created_epoch INTEGER,\n"
2959 " name VARCHAR(1024),\n"
2960 " state VARCHAR(64),\n"
2961 " cid_name VARCHAR(1024),\n"
2962 " cid_num VARCHAR(256),\n"
2963 " ip_addr VARCHAR(256),\n"
2964 " dest VARCHAR(1024),\n"
2965 " application VARCHAR(128),\n"
2966 " application_data TEXT,\n"
2967 " dialplan VARCHAR(128),\n"
2968 " context VARCHAR(128),\n"
2969 " read_codec VARCHAR(128),\n"
2970 " read_rate VARCHAR(32),\n"
2971 " read_bit_rate VARCHAR(32),\n"
2972 " write_codec VARCHAR(128),\n"
2973 " write_rate VARCHAR(32),\n"
2974 " write_bit_rate VARCHAR(32),\n"
2975 " secure VARCHAR(64),\n"
2976 " hostname VARCHAR(256),\n"
2977 " presence_id VARCHAR(4096),\n"
2978 " presence_data TEXT,\n"
2979 " accountcode VARCHAR(256),\n"
2980 " callstate VARCHAR(64),\n"
2981 " callee_name VARCHAR(1024),\n"
2982 " callee_num VARCHAR(256),\n"
2983 " callee_direction VARCHAR(5),\n"
2984 " call_uuid VARCHAR(256),\n"
2985 " sent_callee_name VARCHAR(1024),\n"
2986 " sent_callee_num VARCHAR(256),\n"
2987 " initial_cid_name VARCHAR(1024),\n"
2988 " initial_cid_num VARCHAR(256),\n"
2989 " initial_ip_addr VARCHAR(256),\n"
2990 " initial_dest VARCHAR(1024),\n"
2991 " initial_dialplan VARCHAR(128),\n"
2992 " initial_context VARCHAR(128)\n"
2993 ");\n";
2994
2995 static char create_calls_sql[] =
2996 "CREATE TABLE calls (\n"
2997 " call_uuid VARCHAR(255),\n"
2998 " call_created VARCHAR(128),\n"
2999 " call_created_epoch INTEGER,\n"
3000 " caller_uuid VARCHAR(256),\n"
3001 " callee_uuid VARCHAR(256),\n"
3002 " hostname VARCHAR(256)\n"
3003 ");\n";
3004
3005 static char create_interfaces_sql[] =
3006 "CREATE TABLE interfaces (\n"
3007 " type VARCHAR(128),\n"
3008 " name VARCHAR(1024),\n"
3009 " description VARCHAR(4096),\n"
3010 " ikey VARCHAR(1024),\n"
3011 " filename VARCHAR(4096),\n"
3012 " syntax VARCHAR(4096),\n"
3013 " hostname VARCHAR(256)\n"
3014 ");\n";
3015
3016 static char create_tasks_sql[] =
3017 "CREATE TABLE tasks (\n"
3018 " task_id INTEGER,\n"
3019 " task_desc VARCHAR(4096),\n"
3020 " task_group VARCHAR(1024),\n"
3021 " task_runtime BIGINT,\n"
3022 " task_sql_manager INTEGER,\n"
3023 " hostname VARCHAR(256)\n"
3024 ");\n";
3025
3026 static char create_nat_sql[] =
3027 "CREATE TABLE nat (\n"
3028 " sticky INTEGER,\n"
3029 " port INTEGER,\n"
3030 " proto INTEGER,\n"
3031 " hostname VARCHAR(256)\n"
3032 ");\n";
3033
3034
3035 static char create_registrations_sql[] =
3036 "CREATE TABLE registrations (\n"
3037 " reg_user VARCHAR(256),\n"
3038 " realm VARCHAR(256),\n"
3039 " token VARCHAR(256),\n"
3040 /* If url is modified please check for code in switch_core_sqldb_start for dependencies for MSSQL" */
3041 " url TEXT,\n"
3042 " expires INTEGER,\n"
3043 " network_ip VARCHAR(256),\n"
3044 " network_port VARCHAR(256),\n"
3045 " network_proto VARCHAR(256),\n"
3046 " hostname VARCHAR(256),\n"
3047 " metadata VARCHAR(256)\n"
3048 ");\n";
3049
3050
3051
3052
3053 static char detailed_calls_sql[] =
3054 "create view detailed_calls as select "
3055 "a.uuid as uuid,"
3056 "a.direction as direction,"
3057 "a.created as created,"
3058 "a.created_epoch as created_epoch,"
3059 "a.name as name,"
3060 "a.state as state,"
3061 "a.cid_name as cid_name,"
3062 "a.cid_num as cid_num,"
3063 "a.ip_addr as ip_addr,"
3064 "a.dest as dest,"
3065 "a.application as application,"
3066 "a.application_data as application_data,"
3067 "a.dialplan as dialplan,"
3068 "a.context as context,"
3069 "a.read_codec as read_codec,"
3070 "a.read_rate as read_rate,"
3071 "a.read_bit_rate as read_bit_rate,"
3072 "a.write_codec as write_codec,"
3073 "a.write_rate as write_rate,"
3074 "a.write_bit_rate as write_bit_rate,"
3075 "a.secure as secure,"
3076 "a.hostname as hostname,"
3077 "a.presence_id as presence_id,"
3078 "a.presence_data as presence_data,"
3079 "a.accountcode as accountcode,"
3080 "a.callstate as callstate,"
3081 "a.callee_name as callee_name,"
3082 "a.callee_num as callee_num,"
3083 "a.callee_direction as callee_direction,"
3084 "a.call_uuid as call_uuid,"
3085 "a.sent_callee_name as sent_callee_name,"
3086 "a.sent_callee_num as sent_callee_num,"
3087 "b.uuid as b_uuid,"
3088 "b.direction as b_direction,"
3089 "b.created as b_created,"
3090 "b.created_epoch as b_created_epoch,"
3091 "b.name as b_name,"
3092 "b.state as b_state,"
3093 "b.cid_name as b_cid_name,"
3094 "b.cid_num as b_cid_num,"
3095 "b.ip_addr as b_ip_addr,"
3096 "b.dest as b_dest,"
3097 "b.application as b_application,"
3098 "b.application_data as b_application_data,"
3099 "b.dialplan as b_dialplan,"
3100 "b.context as b_context,"
3101 "b.read_codec as b_read_codec,"
3102 "b.read_rate as b_read_rate,"
3103 "b.read_bit_rate as b_read_bit_rate,"
3104 "b.write_codec as b_write_codec,"
3105 "b.write_rate as b_write_rate,"
3106 "b.write_bit_rate as b_write_bit_rate,"
3107 "b.secure as b_secure,"
3108 "b.hostname as b_hostname,"
3109 "b.presence_id as b_presence_id,"
3110 "b.presence_data as b_presence_data,"
3111 "b.accountcode as b_accountcode,"
3112 "b.callstate as b_callstate,"
3113 "b.callee_name as b_callee_name,"
3114 "b.callee_num as b_callee_num,"
3115 "b.callee_direction as b_callee_direction,"
3116 "b.call_uuid as b_call_uuid,"
3117 "b.sent_callee_name as b_sent_callee_name,"
3118 "b.sent_callee_num as b_sent_callee_num,"
3119 "c.call_created_epoch as call_created_epoch "
3120 "from channels a "
3121 "left join calls c on a.uuid = c.caller_uuid and a.hostname = c.hostname "
3122 "left join channels b on b.uuid = c.callee_uuid and b.hostname = c.hostname "
3123 "where a.uuid = c.caller_uuid or a.uuid not in (select callee_uuid from calls)";
3124
3125
3126 static char recovery_sql[] =
3127 "CREATE TABLE recovery (\n"
3128 " runtime_uuid VARCHAR(255),\n"
3129 " technology VARCHAR(255),\n"
3130 " profile_name VARCHAR(255),\n"
3131 " hostname VARCHAR(255),\n"
3132 " uuid VARCHAR(255),\n"
3133 " metadata text\n"
3134 ");\n";
3135
3136 static char basic_calls_sql[] =
3137 "create view basic_calls as select "
3138 "a.uuid as uuid,"
3139 "a.direction as direction,"
3140 "a.created as created,"
3141 "a.created_epoch as created_epoch,"
3142 "a.name as name,"
3143 "a.state as state,"
3144 "a.cid_name as cid_name,"
3145 "a.cid_num as cid_num,"
3146 "a.ip_addr as ip_addr,"
3147 "a.dest as dest,"
3148
3149 "a.presence_id as presence_id,"
3150 "a.presence_data as presence_data,"
3151 "a.accountcode as accountcode,"
3152 "a.callstate as callstate,"
3153 "a.callee_name as callee_name,"
3154 "a.callee_num as callee_num,"
3155 "a.callee_direction as callee_direction,"
3156 "a.call_uuid as call_uuid,"
3157 "a.hostname as hostname,"
3158 "a.sent_callee_name as sent_callee_name,"
3159 "a.sent_callee_num as sent_callee_num,"
3160
3161
3162 "b.uuid as b_uuid,"
3163 "b.direction as b_direction,"
3164 "b.created as b_created,"
3165 "b.created_epoch as b_created_epoch,"
3166 "b.name as b_name,"
3167 "b.state as b_state,"
3168 "b.cid_name as b_cid_name,"
3169 "b.cid_num as b_cid_num,"
3170 "b.ip_addr as b_ip_addr,"
3171 "b.dest as b_dest,"
3172
3173 "b.presence_id as b_presence_id,"
3174 "b.presence_data as b_presence_data,"
3175 "b.accountcode as b_accountcode,"
3176 "b.callstate as b_callstate,"
3177 "b.callee_name as b_callee_name,"
3178 "b.callee_num as b_callee_num,"
3179 "b.callee_direction as b_callee_direction,"
3180 "b.sent_callee_name as b_sent_callee_name,"
3181 "b.sent_callee_num as b_sent_callee_num,"
3182 "c.call_created_epoch as call_created_epoch "
3183
3184 "from channels a "
3185 "left join calls c on a.uuid = c.caller_uuid and a.hostname = c.hostname "
3186 "left join channels b on b.uuid = c.callee_uuid and b.hostname = c.hostname "
3187 "where a.uuid = c.caller_uuid or a.uuid not in (select callee_uuid from calls)";
3188
3189
3190
switch_core_recovery_flush(const char * technology,const char * profile_name)3191 SWITCH_DECLARE(void) switch_core_recovery_flush(const char *technology, const char *profile_name)
3192 {
3193 char *sql = NULL;
3194 switch_cache_db_handle_t *dbh;
3195
3196 if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
3197 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
3198 return;
3199 }
3200
3201 if (zstr(technology)) {
3202
3203 if (zstr(profile_name)) {
3204 sql = switch_mprintf("delete from recovery");
3205 } else {
3206 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "INVALID\n");
3207 }
3208
3209 } else {
3210 if (zstr(profile_name)) {
3211 sql = switch_mprintf("delete from recovery where technology='%q' ", technology);
3212 } else {
3213 sql = switch_mprintf("delete from recovery where technology='%q' and profile_name='%q'", technology, profile_name);
3214 }
3215 }
3216
3217 if (sql) {
3218 switch_cache_db_execute_sql(dbh, sql, NULL);
3219 switch_safe_free(sql);
3220 }
3221
3222 switch_cache_db_release_db_handle(&dbh);
3223 }
3224
3225
recover_callback(void * pArg,int argc,char ** argv,char ** columnNames)3226 static int recover_callback(void *pArg, int argc, char **argv, char **columnNames)
3227 {
3228 int *rp = (int *) pArg;
3229 switch_xml_t xml;
3230 switch_endpoint_interface_t *ep;
3231 switch_core_session_t *session;
3232
3233 if (argc < 4) {
3234 return 0;
3235 }
3236
3237 if (!(xml = switch_xml_parse_str_dynamic(argv[4], SWITCH_TRUE))) {
3238 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "XML ERROR\n");
3239 return 0;
3240 }
3241
3242 if (!(ep = switch_loadable_module_get_endpoint_interface(argv[0]))) {
3243 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "EP ERROR\n");
3244 return 0;
3245 }
3246
3247 if (!(session = switch_core_session_request_xml(ep, NULL, xml))) {
3248 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid cdr data, call not recovered\n");
3249 goto end;
3250 }
3251
3252 if (ep->recover_callback) {
3253 switch_caller_extension_t *extension = NULL;
3254 switch_channel_t *channel = switch_core_session_get_channel(session);
3255 int r = 0;
3256
3257 if ((r = ep->recover_callback(session)) > 0) {
3258 const char *cbname;
3259
3260 switch_channel_set_flag(session->channel, CF_RECOVERING);
3261
3262
3263 if (switch_channel_get_partner_uuid(channel)) {
3264 switch_channel_set_flag(channel, CF_RECOVERING_BRIDGE);
3265 }
3266
3267 switch_core_media_recover_session(session);
3268
3269 if ((cbname = switch_channel_get_variable(channel, "secondary_recovery_module"))) {
3270 switch_core_recover_callback_t recover_callback;
3271
3272 if ((recover_callback = switch_core_get_secondary_recover_callback(cbname))) {
3273 r = recover_callback(session);
3274 }
3275 }
3276
3277
3278 }
3279
3280 if (r > 0) {
3281
3282 if (!switch_channel_test_flag(channel, CF_RECOVERING_BRIDGE)) {
3283 switch_xml_t callflow, param, x_extension;
3284 if ((extension = switch_caller_extension_new(session, "recovery", "recovery")) == 0) {
3285 abort();
3286 }
3287
3288 if ((callflow = switch_xml_child(xml, "callflow")) && (x_extension = switch_xml_child(callflow, "extension"))) {
3289 for (param = switch_xml_child(x_extension, "application"); param; param = param->next) {
3290 const char *var = switch_xml_attr_soft(param, "app_name");
3291 const char *val = switch_xml_attr_soft(param, "app_data");
3292 /* skip announcement type apps */
3293 if (strcasecmp(var, "speak") && strcasecmp(var, "playback") && strcasecmp(var, "gentones") && strcasecmp(var, "say")) {
3294 switch_caller_extension_add_application(session, extension, var, val);
3295 }
3296 }
3297 }
3298
3299 switch_channel_set_caller_extension(channel, extension);
3300 }
3301
3302 switch_channel_set_state(channel, CS_INIT);
3303 switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_NOTICE,
3304 "Resurrecting fallen channel %s\n", switch_channel_get_name(channel));
3305 switch_core_session_thread_launch(session);
3306
3307 *rp = (*rp) + 1;
3308
3309 }
3310
3311 } else {
3312 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Endpoint %s has no recovery function\n", argv[0]);
3313 }
3314
3315
3316 end:
3317
3318 UNPROTECT_INTERFACE(ep);
3319
3320 switch_xml_free(xml);
3321
3322 return 0;
3323 }
3324
switch_core_recovery_recover(const char * technology,const char * profile_name)3325 SWITCH_DECLARE(int) switch_core_recovery_recover(const char *technology, const char *profile_name)
3326
3327 {
3328 char *sql = NULL;
3329 char *errmsg = NULL;
3330 switch_cache_db_handle_t *dbh;
3331 int r = 0;
3332
3333 if (!sql_manager.manage) {
3334 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "DATABASE NOT AVAIALBLE, REVCOVERY NOT POSSIBLE\n");
3335 return 0;
3336 }
3337
3338 if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
3339 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
3340 return 0;
3341 }
3342
3343 if (zstr(technology)) {
3344
3345 if (zstr(profile_name)) {
3346 sql = switch_mprintf("select technology, profile_name, hostname, uuid, metadata "
3347 "from recovery where runtime_uuid!='%q'",
3348 switch_core_get_uuid());
3349 } else {
3350 sql = switch_mprintf("select technology, profile_name, hostname, uuid, metadata "
3351 "from recovery where runtime_uuid!='%q' and profile_name='%q'",
3352 switch_core_get_uuid(), profile_name);
3353 }
3354
3355 } else {
3356
3357 if (zstr(profile_name)) {
3358 sql = switch_mprintf("select technology, profile_name, hostname, uuid, metadata "
3359 "from recovery where technology='%q' and runtime_uuid!='%q'",
3360 technology, switch_core_get_uuid());
3361 } else {
3362 sql = switch_mprintf("select technology, profile_name, hostname, uuid, metadata "
3363 "from recovery where technology='%q' and runtime_uuid!='%q' and profile_name='%q'",
3364 technology, switch_core_get_uuid(), profile_name);
3365 }
3366 }
3367
3368
3369 switch_cache_db_execute_sql_callback(dbh, sql, recover_callback, &r, &errmsg);
3370
3371 if (errmsg) {
3372 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg);
3373 switch_safe_free(errmsg);
3374 }
3375
3376 switch_safe_free(sql);
3377
3378 if (zstr(technology)) {
3379 if (zstr(profile_name)) {
3380 sql = switch_mprintf("delete from recovery where runtime_uuid!='%q'",
3381 switch_core_get_uuid());
3382 } else {
3383 sql = switch_mprintf("delete from recovery where runtime_uuid!='%q' and profile_name='%q'",
3384 switch_core_get_uuid(), profile_name);
3385 }
3386 } else {
3387 if (zstr(profile_name)) {
3388 sql = switch_mprintf("delete from recovery where runtime_uuid!='%q' and technology='%q' ",
3389 switch_core_get_uuid(), technology);
3390 } else {
3391 sql = switch_mprintf("delete from recovery where runtime_uuid!='%q' and technology='%q' and profile_name='%q'",
3392 switch_core_get_uuid(), technology, profile_name);
3393 }
3394 }
3395
3396 switch_cache_db_execute_sql(dbh, sql, NULL);
3397 switch_safe_free(sql);
3398
3399 switch_cache_db_release_db_handle(&dbh);
3400
3401 return r;
3402
3403 }
3404
switch_core_dbtype(void)3405 SWITCH_DECLARE(switch_cache_db_handle_type_t) switch_core_dbtype(void)
3406 {
3407 switch_cache_db_handle_type_t type = SCDB_TYPE_CORE_DB;
3408
3409 switch_mutex_lock(sql_manager.ctl_mutex);
3410 if (sql_manager.qm && sql_manager.qm->event_db) {
3411 type = sql_manager.qm->event_db->type;
3412 }
3413 switch_mutex_unlock(sql_manager.ctl_mutex);
3414
3415 return type;
3416 }
3417
switch_core_sql_exec(const char * sql)3418 SWITCH_DECLARE(void) switch_core_sql_exec(const char *sql)
3419 {
3420 if (!sql_manager.manage) {
3421 return;
3422 }
3423
3424 if (!switch_test_flag((&runtime), SCF_USE_SQL)) {
3425 return;
3426 }
3427
3428
3429 switch_sql_queue_manager_push(sql_manager.qm, sql, 3, SWITCH_TRUE);
3430 }
3431
switch_core_recovery_untrack(switch_core_session_t * session,switch_bool_t force)3432 SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session, switch_bool_t force)
3433 {
3434 char *sql = NULL;
3435 switch_channel_t *channel = switch_core_session_get_channel(session);
3436
3437 if (!sql_manager.manage) {
3438 return;
3439 }
3440
3441 if (!switch_channel_test_flag(channel, CF_ANSWERED) || switch_channel_get_state(channel) < CS_SOFT_EXECUTE) {
3442 return;
3443 }
3444
3445 if (!switch_channel_test_flag(channel, CF_TRACKABLE)) {
3446 return;
3447 }
3448
3449 if ((switch_channel_test_flag(channel, CF_RECOVERING))) {
3450 return;
3451 }
3452
3453 if (switch_channel_test_flag(channel, CF_TRACKED) || force) {
3454
3455 if (force) {
3456 sql = switch_mprintf("delete from recovery where uuid='%q'", switch_core_session_get_uuid(session));
3457
3458 } else {
3459 sql = switch_mprintf("delete from recovery where runtime_uuid='%q' and uuid='%q'",
3460 switch_core_get_uuid(), switch_core_session_get_uuid(session));
3461 }
3462
3463 switch_sql_queue_manager_push(sql_manager.qm, sql, 3, SWITCH_FALSE);
3464
3465 switch_channel_clear_flag(channel, CF_TRACKED);
3466 }
3467
3468 }
3469
switch_core_recovery_track(switch_core_session_t * session)3470 SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session)
3471 {
3472 switch_xml_t cdr = NULL;
3473 char *xml_cdr_text = NULL;
3474 char *sql = NULL;
3475 switch_channel_t *channel = switch_core_session_get_channel(session);
3476 const char *profile_name;
3477 const char *technology;
3478
3479 if (!sql_manager.manage) {
3480 return;
3481 }
3482
3483 if (!switch_channel_test_flag(channel, CF_ANSWERED) || switch_channel_get_state(channel) < CS_SOFT_EXECUTE) {
3484 return;
3485 }
3486
3487 if (switch_channel_test_flag(channel, CF_RECOVERING) || !switch_channel_test_flag(channel, CF_TRACKABLE)) {
3488 return;
3489 }
3490
3491 profile_name = switch_channel_get_variable_dup(channel, "recovery_profile_name", SWITCH_FALSE, -1);
3492 technology = session->endpoint_interface->interface_name;
3493
3494 if (switch_ivr_generate_xml_cdr(session, &cdr) == SWITCH_STATUS_SUCCESS) {
3495 xml_cdr_text = switch_xml_toxml_nolock(cdr, SWITCH_FALSE);
3496 switch_xml_free(cdr);
3497 }
3498
3499 if (xml_cdr_text) {
3500 if (switch_channel_test_flag(channel, CF_TRACKED)) {
3501 sql = switch_mprintf("update recovery set metadata='%q' where uuid='%q'", xml_cdr_text, switch_core_session_get_uuid(session));
3502 } else {
3503 sql = switch_mprintf("insert into recovery (runtime_uuid, technology, profile_name, hostname, uuid, metadata) "
3504 "values ('%q','%q','%q','%q','%q','%q')",
3505 switch_core_get_uuid(), switch_str_nil(technology),
3506 switch_str_nil(profile_name), switch_core_get_switchname(), switch_core_session_get_uuid(session), xml_cdr_text);
3507 }
3508
3509 switch_sql_queue_manager_push(sql_manager.qm, sql, 2, SWITCH_FALSE);
3510
3511 switch_safe_free(xml_cdr_text);
3512 switch_channel_set_flag(channel, CF_TRACKED);
3513
3514 }
3515
3516 }
3517
3518
3519
switch_core_add_registration(const char * user,const char * realm,const char * token,const char * url,uint32_t expires,const char * network_ip,const char * network_port,const char * network_proto,const char * metadata)3520 SWITCH_DECLARE(switch_status_t) switch_core_add_registration(const char *user, const char *realm, const char *token, const char *url, uint32_t expires,
3521 const char *network_ip, const char *network_port, const char *network_proto,
3522 const char *metadata)
3523 {
3524 char *sql;
3525
3526 if (!switch_test_flag((&runtime), SCF_USE_SQL)) {
3527 return SWITCH_STATUS_FALSE;
3528 }
3529
3530 if (runtime.multiple_registrations) {
3531 sql = switch_mprintf("delete from registrations where hostname='%q' and (url='%q' or token='%q')",
3532 switch_core_get_switchname(), url, switch_str_nil(token));
3533 } else {
3534 sql = switch_mprintf("delete from registrations where reg_user='%q' and realm='%q' and hostname='%q'",
3535 user, realm, switch_core_get_switchname());
3536 }
3537
3538 switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE);
3539
3540 if ( !zstr(metadata) ) {
3541 sql = switch_mprintf("insert into registrations (reg_user,realm,token,url,expires,network_ip,network_port,network_proto,hostname,metadata) "
3542 "values ('%q','%q','%q','%q',%ld,'%q','%q','%q','%q','%q')",
3543 switch_str_nil(user),
3544 switch_str_nil(realm),
3545 switch_str_nil(token),
3546 switch_str_nil(url),
3547 expires,
3548 switch_str_nil(network_ip),
3549 switch_str_nil(network_port),
3550 switch_str_nil(network_proto),
3551 switch_core_get_switchname(),
3552 metadata
3553 );
3554 } else {
3555 sql = switch_mprintf("insert into registrations (reg_user,realm,token,url,expires,network_ip,network_port,network_proto,hostname) "
3556 "values ('%q','%q','%q','%q',%ld,'%q','%q','%q','%q')",
3557 switch_str_nil(user),
3558 switch_str_nil(realm),
3559 switch_str_nil(token),
3560 switch_str_nil(url),
3561 expires,
3562 switch_str_nil(network_ip),
3563 switch_str_nil(network_port),
3564 switch_str_nil(network_proto),
3565 switch_core_get_switchname()
3566 );
3567 }
3568
3569
3570 switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE);
3571
3572 return SWITCH_STATUS_SUCCESS;
3573 }
3574
switch_core_del_registration(const char * user,const char * realm,const char * token)3575 SWITCH_DECLARE(switch_status_t) switch_core_del_registration(const char *user, const char *realm, const char *token)
3576 {
3577
3578 char *sql;
3579
3580 if (!switch_test_flag((&runtime), SCF_USE_SQL)) {
3581 return SWITCH_STATUS_FALSE;
3582 }
3583
3584 if (!zstr(token) && runtime.multiple_registrations) {
3585 sql = switch_mprintf("delete from registrations where reg_user='%q' and realm='%q' and hostname='%q' and token='%q'", user, realm, switch_core_get_switchname(), token);
3586 } else {
3587 sql = switch_mprintf("delete from registrations where reg_user='%q' and realm='%q' and hostname='%q'", user, realm, switch_core_get_switchname());
3588 }
3589
3590 switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE);
3591
3592
3593 return SWITCH_STATUS_SUCCESS;
3594 }
3595
switch_core_expire_registration(int force)3596 SWITCH_DECLARE(switch_status_t) switch_core_expire_registration(int force)
3597 {
3598
3599 char *sql;
3600 time_t now;
3601
3602 if (!switch_test_flag((&runtime), SCF_USE_SQL)) {
3603 return SWITCH_STATUS_FALSE;
3604 }
3605
3606 now = switch_epoch_time_now(NULL);
3607
3608 if (force) {
3609 sql = switch_mprintf("delete from registrations where hostname='%q'", switch_core_get_switchname());
3610 } else {
3611 sql = switch_mprintf("delete from registrations where expires > 0 and expires <= %ld and hostname='%q'", now, switch_core_get_switchname());
3612 }
3613
3614 switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE);
3615
3616 return SWITCH_STATUS_SUCCESS;
3617
3618 }
3619
switch_core_sqldb_start(switch_memory_pool_t * pool,switch_bool_t manage)3620 switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_t manage)
3621 {
3622 switch_threadattr_t *thd_attr;
3623
3624 sql_manager.memory_pool = pool;
3625 sql_manager.manage = manage;
3626
3627 switch_mutex_init(&sql_manager.dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
3628 switch_mutex_init(&sql_manager.io_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
3629 switch_mutex_init(&sql_manager.ctl_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
3630
3631 if (!sql_manager.manage) goto skip;
3632
3633 top:
3634
3635 /* Activate SQL database */
3636 if (switch_core_db_handle(&sql_manager.dbh) != SWITCH_STATUS_SUCCESS) {
3637 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
3638
3639 if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {
3640 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! ODBC IS REQUIRED!\n");
3641 return SWITCH_STATUS_FALSE;
3642 }
3643
3644 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "CORE DATABASE INITIALIZATION FAILURE! CHECK `core-db-dsn`!\n");
3645
3646 switch_clear_flag((&runtime), SCF_USE_SQL);
3647 return SWITCH_STATUS_FALSE;
3648 }
3649
3650
3651 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Opening DB\n");
3652
3653 switch (sql_manager.dbh->type) {
3654 case SCDB_TYPE_DATABASE_INTERFACE:
3655 case SCDB_TYPE_ODBC:
3656 if (switch_test_flag((&runtime), SCF_CLEAR_SQL)) {
3657 char sql[512] = "";
3658 char *tables[] = { "channels", "calls", "tasks", NULL };
3659 int i;
3660 const char *hostname = switch_core_get_switchname();
3661
3662 for (i = 0; tables[i]; i++) {
3663 switch_snprintfv(sql, sizeof(sql), "delete from %q where hostname='%q'", tables[i], hostname);
3664 switch_cache_db_execute_sql(sql_manager.dbh, sql, NULL);
3665 }
3666 }
3667 break;
3668 case SCDB_TYPE_CORE_DB:
3669 {
3670 switch_cache_db_execute_sql(sql_manager.dbh, "drop table channels", NULL);
3671 switch_cache_db_execute_sql(sql_manager.dbh, "drop table calls", NULL);
3672 switch_cache_db_execute_sql(sql_manager.dbh, "drop view detailed_calls", NULL);
3673 switch_cache_db_execute_sql(sql_manager.dbh, "drop view basic_calls", NULL);
3674 switch_cache_db_execute_sql(sql_manager.dbh, "drop table interfaces", NULL);
3675 switch_cache_db_execute_sql(sql_manager.dbh, "drop table tasks", NULL);
3676 switch_cache_db_execute_sql(sql_manager.dbh, "PRAGMA synchronous=OFF;", NULL);
3677 switch_cache_db_execute_sql(sql_manager.dbh, "PRAGMA count_changes=OFF;", NULL);
3678 switch_cache_db_execute_sql(sql_manager.dbh, "PRAGMA default_cache_size=8000", NULL);
3679 switch_cache_db_execute_sql(sql_manager.dbh, "PRAGMA temp_store=MEMORY;", NULL);
3680 switch_cache_db_execute_sql(sql_manager.dbh, "PRAGMA journal_mode=OFF;", NULL);
3681 }
3682 break;
3683 }
3684
3685 switch_cache_db_test_reactive(sql_manager.dbh, "select hostname from aliases", "DROP TABLE aliases", create_alias_sql);
3686 switch_cache_db_test_reactive(sql_manager.dbh, "select hostname from complete", "DROP TABLE complete", create_complete_sql);
3687 switch_cache_db_test_reactive(sql_manager.dbh, "select hostname from nat", "DROP TABLE nat", create_nat_sql);
3688 switch_cache_db_test_reactive(sql_manager.dbh, "delete from registrations where reg_user=''",
3689 "DROP TABLE registrations", create_registrations_sql);
3690
3691 switch_cache_db_test_reactive(sql_manager.dbh, "select metadata from registrations", NULL, "ALTER TABLE registrations ADD COLUMN metadata VARCHAR(256)");
3692
3693
3694 switch_cache_db_test_reactive(sql_manager.dbh, "select hostname from recovery", "DROP TABLE recovery", recovery_sql);
3695 switch_cache_db_create_schema(sql_manager.dbh, "create index recovery1 on recovery(technology)", NULL);
3696 switch_cache_db_create_schema(sql_manager.dbh, "create index recovery2 on recovery(profile_name)", NULL);
3697 switch_cache_db_create_schema(sql_manager.dbh, "create index recovery3 on recovery(uuid)", NULL);
3698 switch_cache_db_create_schema(sql_manager.dbh, "create index recovery3 on recovery(runtime_uuid)", NULL);
3699
3700
3701
3702
3703 switch (sql_manager.dbh->type) {
3704 case SCDB_TYPE_DATABASE_INTERFACE:
3705 case SCDB_TYPE_ODBC:
3706 {
3707 char *err;
3708 int result = 0;
3709
3710 switch_cache_db_test_reactive_ex(sql_manager.dbh, "select call_uuid, read_bit_rate, sent_callee_name, initial_cid_name, initial_cid_num, initial_ip_addr, initial_dest, initial_dialplan, initial_context, accountcode from channels", "DROP TABLE channels", create_channels_sql, create_row_size_limited_channels_sql);
3711 switch_cache_db_test_reactive(sql_manager.dbh, "select call_uuid from calls", "DROP TABLE calls", create_calls_sql);
3712 switch_cache_db_test_reactive(sql_manager.dbh, "select * from basic_calls where sent_callee_name=''", "DROP VIEW basic_calls", basic_calls_sql);
3713 switch_cache_db_test_reactive(sql_manager.dbh, "select * from detailed_calls where sent_callee_name=''", "DROP VIEW detailed_calls", detailed_calls_sql);
3714 if (runtime.odbc_dbtype == DBTYPE_DEFAULT) {
3715 switch_cache_db_test_reactive(sql_manager.dbh, "delete from registrations where reg_user=''",
3716 "DROP TABLE registrations", create_registrations_sql);
3717 } else {
3718 char *tmp = switch_string_replace(create_registrations_sql, "url TEXT", "url VARCHAR(max)");
3719 switch_cache_db_test_reactive(sql_manager.dbh, "delete from registrations where reg_user=''",
3720 "DROP TABLE registrations", tmp);
3721 free(tmp);
3722 }
3723 switch_cache_db_test_reactive(sql_manager.dbh, "select ikey from interfaces", "DROP TABLE interfaces", create_interfaces_sql);
3724 switch_cache_db_test_reactive(sql_manager.dbh, "select task_id, task_desc, task_group, task_runtime, task_sql_manager, hostname from tasks",
3725 "DROP TABLE tasks", create_tasks_sql);
3726
3727
3728 switch(sql_manager.dbh->type) {
3729 case SCDB_TYPE_CORE_DB:
3730 {
3731 switch_cache_db_execute_sql_real(sql_manager.dbh, "BEGIN EXCLUSIVE", &err);
3732 }
3733 break;
3734 case SCDB_TYPE_ODBC:
3735 {
3736 switch_odbc_status_t result;
3737
3738 if ((result = switch_odbc_SQLSetAutoCommitAttr(sql_manager.dbh->native_handle.odbc_dbh, 0)) != SWITCH_ODBC_SUCCESS) {
3739 char tmp[100];
3740 switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
3741 err = strdup(tmp);
3742 }
3743 }
3744 break;
3745 case SCDB_TYPE_DATABASE_INTERFACE:
3746 {
3747 switch_database_interface_t *database_interface = sql_manager.dbh->native_handle.database_interface_dbh->connection_options.database_interface;
3748 switch_status_t result;
3749
3750 if ((result = database_interface->sql_set_auto_commit_attr(sql_manager.dbh->native_handle.database_interface_dbh, 0)) != SWITCH_STATUS_SUCCESS) {
3751 char tmp[100];
3752 switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
3753 err = strdup(tmp);
3754 }
3755 }
3756 break;
3757 }
3758
3759 switch_cache_db_execute_sql(sql_manager.dbh, "delete from channels where hostname=''", &err);
3760 if (!err) {
3761 switch_cache_db_execute_sql(sql_manager.dbh, "delete from channels where hostname=''", &err);
3762
3763 switch(sql_manager.dbh->type) {
3764 case SCDB_TYPE_CORE_DB:
3765 {
3766 switch_cache_db_execute_sql_real(sql_manager.dbh, "COMMIT", &err);
3767 }
3768 break;
3769 case SCDB_TYPE_ODBC:
3770 {
3771 if (switch_odbc_SQLEndTran(sql_manager.dbh->native_handle.odbc_dbh, 1) != SWITCH_ODBC_SUCCESS ||
3772 switch_odbc_SQLSetAutoCommitAttr(sql_manager.dbh->native_handle.odbc_dbh, 1) != SWITCH_ODBC_SUCCESS) {
3773 char tmp[100];
3774 switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to commit transaction.", result);
3775 err = strdup(tmp);
3776 }
3777 }
3778 break;
3779 case SCDB_TYPE_DATABASE_INTERFACE:
3780 {
3781 switch_database_interface_t *database_interface = sql_manager.dbh->native_handle.database_interface_dbh->connection_options.database_interface;
3782 switch_status_t result;
3783
3784 if ((result = database_interface->commit(sql_manager.dbh->native_handle.database_interface_dbh)) != SWITCH_STATUS_SUCCESS) {
3785 char tmp[100];
3786 switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to commit transaction", result);
3787 err = strdup(tmp);
3788 }
3789 }
3790 break;
3791 }
3792 }
3793
3794
3795 if (err) {
3796 //runtime.odbc_dsn = NULL;
3797 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Database Error [%s]\n", err);
3798 //switch_cache_db_release_db_handle(&sql_manager.dbh);
3799 if (switch_stristr("read-only", err)) {
3800 switch_safe_free(err);
3801 } else {
3802 switch_safe_free(err);
3803 goto top;
3804 }
3805 }
3806 }
3807 break;
3808 case SCDB_TYPE_CORE_DB:
3809 {
3810 switch_cache_db_execute_sql(sql_manager.dbh, create_channels_sql, NULL);
3811 switch_cache_db_execute_sql(sql_manager.dbh, create_calls_sql, NULL);
3812 switch_cache_db_execute_sql(sql_manager.dbh, create_interfaces_sql, NULL);
3813 switch_cache_db_execute_sql(sql_manager.dbh, create_tasks_sql, NULL);
3814 switch_cache_db_execute_sql(sql_manager.dbh, detailed_calls_sql, NULL);
3815 switch_cache_db_execute_sql(sql_manager.dbh, basic_calls_sql, NULL);
3816
3817 if (sql_manager.dbh->native_handle.core_db_dbh->in_memory == SWITCH_TRUE) {
3818 switch_set_flag(sql_manager.dbh, CDF_NONEXPIRING);
3819 }
3820 }
3821 break;
3822 }
3823
3824 if (switch_test_flag((&runtime), SCF_CLEAR_SQL)) {
3825 char sql[512] = "";
3826 char *tables[] = { "complete", "aliases", "nat", NULL };
3827 int i;
3828 const char *hostname = switch_core_get_hostname();
3829
3830 for (i = 0; tables[i]; i++) {
3831 switch_snprintfv(sql, sizeof(sql), "delete from %q where sticky=0 and hostname='%q'", tables[i], hostname);
3832 switch_cache_db_execute_sql(sql_manager.dbh, sql, NULL);
3833 }
3834
3835 switch_snprintfv(sql, sizeof(sql), "delete from interfaces where hostname='%q'", hostname);
3836 switch_cache_db_execute_sql(sql_manager.dbh, sql, NULL);
3837 }
3838
3839 switch_cache_db_create_schema(sql_manager.dbh, "create index alias1 on aliases (alias)", NULL);
3840 switch_cache_db_create_schema(sql_manager.dbh, "create index tasks1 on tasks (hostname,task_id)", NULL);
3841 switch_cache_db_create_schema(sql_manager.dbh, "create index complete1 on complete (a1,hostname)", NULL);
3842 switch_cache_db_create_schema(sql_manager.dbh, "create index complete2 on complete (a2,hostname)", NULL);
3843 switch_cache_db_create_schema(sql_manager.dbh, "create index complete3 on complete (a3,hostname)", NULL);
3844 switch_cache_db_create_schema(sql_manager.dbh, "create index complete4 on complete (a4,hostname)", NULL);
3845 switch_cache_db_create_schema(sql_manager.dbh, "create index complete5 on complete (a5,hostname)", NULL);
3846 switch_cache_db_create_schema(sql_manager.dbh, "create index complete6 on complete (a6,hostname)", NULL);
3847 switch_cache_db_create_schema(sql_manager.dbh, "create index complete7 on complete (a7,hostname)", NULL);
3848 switch_cache_db_create_schema(sql_manager.dbh, "create index complete8 on complete (a8,hostname)", NULL);
3849 switch_cache_db_create_schema(sql_manager.dbh, "create index complete9 on complete (a9,hostname)", NULL);
3850 switch_cache_db_create_schema(sql_manager.dbh, "create index complete10 on complete (a10,hostname)", NULL);
3851 switch_cache_db_create_schema(sql_manager.dbh, "create index complete11 on complete (a1,a2,a3,a4,a5,a6,a7,a8,a9,a10,hostname)", NULL);
3852 switch_cache_db_create_schema(sql_manager.dbh, "create index nat_map_port_proto on nat (port,proto,hostname)", NULL);
3853 switch_cache_db_create_schema(sql_manager.dbh, "create index chidx1 on channels (hostname)", NULL);
3854 switch_cache_db_create_schema(sql_manager.dbh, "create index uuindex on channels (uuid, hostname)", NULL);
3855 switch_cache_db_create_schema(sql_manager.dbh, "create index uuindex2 on channels (call_uuid)", NULL);
3856 switch_cache_db_create_schema(sql_manager.dbh, "create index callsidx1 on calls (hostname)", NULL);
3857 switch_cache_db_create_schema(sql_manager.dbh, "create index eruuindex on calls (caller_uuid, hostname)", NULL);
3858 switch_cache_db_create_schema(sql_manager.dbh, "create index eeuuindex on calls (callee_uuid)", NULL);
3859 switch_cache_db_create_schema(sql_manager.dbh, "create index eeuuindex2 on calls (call_uuid)", NULL);
3860 switch_cache_db_create_schema(sql_manager.dbh, "create index regindex1 on registrations (reg_user,realm,hostname)", NULL);
3861
3862
3863 skip:
3864
3865 if (sql_manager.manage) {
3866 /* Initiate switch_sql_queue_manager */
3867 switch_threadattr_create(&thd_attr, sql_manager.memory_pool);
3868 switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
3869 switch_threadattr_priority_set(thd_attr, SWITCH_PRI_REALTIME);
3870 switch_core_sqldb_start_thread();
3871 switch_thread_create(&sql_manager.db_thread, thd_attr, switch_core_sql_db_thread, NULL, sql_manager.memory_pool);
3872
3873 /* switch_sql_queue_manager initiated, now we can bind to core_event_handler */
3874 #ifdef SWITCH_SQL_BIND_EVERY_EVENT
3875 switch_event_bind("core_db", SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3876 #else
3877 switch_event_bind("core_db", SWITCH_EVENT_ADD_SCHEDULE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3878 switch_event_bind("core_db", SWITCH_EVENT_DEL_SCHEDULE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3879 switch_event_bind("core_db", SWITCH_EVENT_EXE_SCHEDULE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3880 switch_event_bind("core_db", SWITCH_EVENT_RE_SCHEDULE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3881 switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_DESTROY, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3882 switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_UUID, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3883 switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_CREATE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3884 switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_ANSWER, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3885 switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_PROGRESS_MEDIA, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3886 switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_HOLD, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3887 switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_UNHOLD, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3888 switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_EXECUTE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3889 switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_ORIGINATE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3890 switch_event_bind("core_db", SWITCH_EVENT_CALL_UPDATE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3891 switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_CALLSTATE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3892 switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_STATE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3893 switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_BRIDGE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3894 switch_event_bind("core_db", SWITCH_EVENT_CHANNEL_UNBRIDGE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3895 switch_event_bind("core_db", SWITCH_EVENT_SHUTDOWN, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3896 switch_event_bind("core_db", SWITCH_EVENT_LOG, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3897 switch_event_bind("core_db", SWITCH_EVENT_MODULE_LOAD, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3898 switch_event_bind("core_db", SWITCH_EVENT_MODULE_UNLOAD, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3899 switch_event_bind("core_db", SWITCH_EVENT_CALL_SECURE, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3900 switch_event_bind("core_db", SWITCH_EVENT_NAT, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3901 switch_event_bind("core_db", SWITCH_EVENT_CODEC, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
3902 #endif
3903 }
3904
3905 switch_cache_db_release_db_handle(&sql_manager.dbh);
3906
3907 return SWITCH_STATUS_SUCCESS;
3908 }
3909
switch_core_sqldb_pause(void)3910 SWITCH_DECLARE(void) switch_core_sqldb_pause(void)
3911 {
3912 if (sql_manager.paused) {
3913 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL is already paused.\n");
3914 }
3915 sql_manager.paused = 1;
3916 }
3917
switch_core_sqldb_resume(void)3918 SWITCH_DECLARE(void) switch_core_sqldb_resume(void)
3919 {
3920 if (!sql_manager.paused) {
3921 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL is already running.\n");
3922 }
3923 sql_manager.paused = 0;
3924 }
3925
3926
switch_core_sqldb_stop_thread(void)3927 static void switch_core_sqldb_stop_thread(void)
3928 {
3929 switch_mutex_lock(sql_manager.ctl_mutex);
3930 if (sql_manager.manage) {
3931 if (sql_manager.qm) {
3932 switch_sql_queue_manager_destroy(&sql_manager.qm);
3933 }
3934 } else {
3935 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n");
3936 }
3937
3938 switch_mutex_unlock(sql_manager.ctl_mutex);
3939 }
3940
switch_core_sqldb_start_thread(void)3941 static void switch_core_sqldb_start_thread(void)
3942 {
3943
3944 switch_mutex_lock(sql_manager.ctl_mutex);
3945 if (sql_manager.manage) {
3946 if (!sql_manager.qm) {
3947 char *dbname = runtime.odbc_dsn;
3948
3949 if (zstr(dbname)) {
3950 dbname = runtime.dbname;
3951 if (zstr(dbname)) {
3952 dbname = "core";
3953 }
3954 }
3955
3956 switch_sql_queue_manager_init_name("CORE",
3957 &sql_manager.qm,
3958 4,
3959 dbname,
3960 SWITCH_MAX_TRANS,
3961 runtime.core_db_pre_trans_execute,
3962 runtime.core_db_post_trans_execute,
3963 runtime.core_db_inner_pre_trans_execute,
3964 runtime.core_db_inner_post_trans_execute);
3965
3966 }
3967 switch_sql_queue_manager_start(sql_manager.qm);
3968 } else {
3969 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n");
3970 }
3971 switch_mutex_unlock(sql_manager.ctl_mutex);
3972 }
3973
switch_core_sqldb_stop(void)3974 void switch_core_sqldb_stop(void)
3975 {
3976 switch_status_t st;
3977
3978 switch_event_unbind_callback(core_event_handler);
3979
3980 if (sql_manager.db_thread && sql_manager.db_thread_running) {
3981 sql_manager.db_thread_running = -1;
3982 switch_thread_join(&st, sql_manager.db_thread);
3983 }
3984
3985 switch_core_sqldb_stop_thread();
3986
3987 switch_cache_db_flush_handles();
3988 sql_close(0);
3989 }
3990
switch_cache_db_status(switch_stream_handle_t * stream)3991 SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream)
3992 {
3993 /* return some status info suitable for the cli */
3994 switch_cache_db_handle_t *dbh = NULL;
3995 switch_bool_t locked = SWITCH_FALSE;
3996 time_t now = switch_epoch_time_now(NULL);
3997 char cleankey_str[CACHE_DB_LEN];
3998 char *pos1 = NULL;
3999 char *pos2 = NULL;
4000 int count = 0, used = 0;
4001
4002 switch_mutex_lock(sql_manager.dbh_mutex);
4003
4004 for (dbh = sql_manager.handle_pool; dbh; dbh = dbh->next) {
4005 char *needles[3];
4006 time_t diff = 0;
4007 int i = 0;
4008
4009 needles[0] = "pass=\"";
4010 needles[1] = "password=";
4011 needles[2] = "password='";
4012
4013 diff = now - dbh->last_used;
4014
4015 if (switch_mutex_trylock(dbh->mutex) == SWITCH_STATUS_SUCCESS) {
4016 switch_mutex_unlock(dbh->mutex);
4017 locked = SWITCH_FALSE;
4018 } else {
4019 locked = SWITCH_TRUE;
4020 }
4021
4022 /* sanitize password */
4023 memset(cleankey_str, 0, sizeof(cleankey_str));
4024 for (i = 0; i < 3; i++) {
4025 if((pos1 = strstr(dbh->name, needles[i]))) {
4026 pos1 += strlen(needles[i]);
4027
4028 if (!(pos2 = strstr(pos1, "\""))) {
4029 if (!(pos2 = strstr(pos1, "'"))) {
4030 if (!(pos2 = strstr(pos1, " "))) {
4031 pos2 = pos1 + strlen(pos1);
4032 }
4033 }
4034 }
4035 strncpy(cleankey_str, dbh->name, pos1 - dbh->name);
4036 strcpy(&cleankey_str[pos1 - dbh->name], pos2);
4037 break;
4038 }
4039 }
4040 if (i == 3) {
4041 snprintf(cleankey_str, sizeof(cleankey_str), "%s", dbh->name);
4042 }
4043
4044 count++;
4045
4046 if (dbh->use_count) {
4047 used++;
4048 }
4049
4050 stream->write_function(stream, "%s\n\tType: %s\n\tLast used: %d\n\tTotal used: %ld\n\tFlags: %s, %s(%d)%s\n"
4051 "\tCreator: %s\n\tLast User: %s\n",
4052 cleankey_str,
4053 switch_cache_db_type_name(dbh->type),
4054 diff,
4055 dbh->total_used_count,
4056 locked ? "Locked" : "Unlocked",
4057 dbh->use_count ? "Attached" : "Detached", dbh->use_count, switch_test_flag(dbh, CDF_NONEXPIRING) ? ", Non-expiring" : "", dbh->creator, dbh->last_user);
4058 }
4059
4060 stream->write_function(stream, "%d total. %d in use.\n", count, used);
4061
4062 switch_mutex_unlock(sql_manager.dbh_mutex);
4063 }
4064
switch_sql_concat(void)4065 SWITCH_DECLARE(char*)switch_sql_concat(void)
4066 {
4067 if(runtime.odbc_dbtype == DBTYPE_MSSQL)
4068 return "+";
4069
4070 return "||";
4071 }
4072
4073 /* For Emacs:
4074 * Local Variables:
4075 * mode:c
4076 * indent-tabs-mode:t
4077 * tab-width:4
4078 * c-basic-offset:4
4079 * End:
4080 * For VIM:
4081 * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
4082 */
4083