1 /*
2 * mod_pgsql for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3 * Copyright (C) 2005-2019, Anthony Minessale II <anthm@freeswitch.org>
4 *
5 * Version: MPL 1.1
6 *
7 * The contents of this file are subject to the Mozilla Public License Version
8 * 1.1 (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 * http://www.mozilla.org/MPL/
11 *
12 * Software distributed under the License is distributed on an "AS IS" basis,
13 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14 * for the specific language governing rights and limitations under the
15 * License.
16 *
17 * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18 *
19 * The Initial Developer of the Original Code is
20 * Anthony Minessale II <anthm@freeswitch.org>
21 * Portions created by the Initial Developer are Copyright (C)
22 * the Initial Developer. All Rights Reserved.
23 *
24 * Contributor(s):
25 * Anthony Minessale II <anthm@freeswitch.org>
26 * Eliot Gable <egable@gmail.com>
27 * Seven Du <dujinfang@gmail.com>
28 * Andrey Volk <andywolk@gmail.com>
29 *
30 * mod_pgsql.c -- PostgreSQL FreeSWITCH module
31 *
32 */
33
34 #define SWITCH_PGSQL_H
35
36 #include <switch.h>
37
38 #include <libpq-fe.h>
39
40 #ifndef _WIN32
41 #include <poll.h>
42 #else
43 #include <WinSock2.h>
44 #endif
45
46 switch_loadable_module_interface_t *MODULE_INTERFACE;
47 static char *supported_prefixes[4] = { 0 };
48
49 SWITCH_MODULE_LOAD_FUNCTION(mod_pgsql_load);
50 SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_pgsql_shutdown);
51 SWITCH_MODULE_DEFINITION(mod_pgsql, mod_pgsql_load, mod_pgsql_shutdown, NULL);
52
53 #define DEFAULT_PGSQL_RETRIES 120
54
55 typedef enum {
56 SWITCH_PGSQL_STATE_INIT,
57 SWITCH_PGSQL_STATE_DOWN,
58 SWITCH_PGSQL_STATE_CONNECTED,
59 SWITCH_PGSQL_STATE_ERROR
60 } switch_pgsql_state_t;
61
62 struct switch_pgsql_handle {
63 char *dsn;
64 char *sql;
65 PGconn* con;
66 int sock;
67 switch_pgsql_state_t state;
68 int affected_rows;
69 int num_retries;
70 switch_bool_t auto_commit;
71 switch_bool_t in_txn;
72 };
73
74 struct switch_pgsql_result {
75 PGresult *result;
76 ExecStatusType status;
77 char *err;
78 int rows;
79 int cols;
80 };
81
82 typedef struct switch_pgsql_handle switch_pgsql_handle_t;
83 typedef struct switch_pgsql_result switch_pgsql_result_t;
84
85 switch_status_t pgsql_handle_connect(switch_pgsql_handle_t *handle);
86 switch_status_t pgsql_handle_destroy(switch_database_interface_handle_t **dih);
87 switch_status_t pgsql_cancel_real(const char *file, const char *func, int line, switch_pgsql_handle_t *handle);
88 switch_status_t pgsql_next_result_timed(switch_pgsql_handle_t *handle, switch_pgsql_result_t **result_out, int msec);
89
90 #define pgsql_handle_exec_base(handle, sql, err) pgsql_handle_exec_base_detailed(__FILE__, (char * )__SWITCH_FUNC__, __LINE__, handle, sql, err)
91 #define pgsql_next_result(h, r) pgsql_next_result_timed(h, r, 10000)
92 #define pgsql_finish_results(handle) pgsql_finish_results_real(__FILE__, (char * )__SWITCH_FUNC__, __LINE__, handle)
93 #define pgsql_cancel(handle) pgsql_cancel_real(__FILE__, (char * )__SWITCH_FUNC__, __LINE__, handle)
94
pgsql_handle_get_error(switch_pgsql_handle_t * handle)95 char * pgsql_handle_get_error(switch_pgsql_handle_t *handle)
96 {
97 char * err_str;
98
99 if (!handle) {
100 return NULL;
101 }
102
103 switch_strdup(err_str, PQerrorMessage(handle->con));
104
105 return err_str;
106 }
107
db_is_up(switch_pgsql_handle_t * handle)108 static int db_is_up(switch_pgsql_handle_t *handle)
109 {
110 int ret = 0;
111 switch_event_t *event;
112 char *err_str = NULL;
113 int max_tries = DEFAULT_PGSQL_RETRIES;
114 int code = 0;
115 int recon = 0;
116
117 if (handle) {
118 max_tries = handle->num_retries;
119 if (max_tries < 1)
120 max_tries = DEFAULT_PGSQL_RETRIES;
121 }
122
123 top:
124
125 if (!handle) {
126 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "No DB Handle\n");
127 goto done;
128 }
129 if (!handle->con) {
130 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "No DB Connection\n");
131 goto done;
132 }
133
134 /* Try a non-blocking read on the connection to gobble up any EOF from a closed connection and mark the connection BAD if it is closed. */
135 PQconsumeInput(handle->con);
136
137 if (PQstatus(handle->con) == CONNECTION_BAD) {
138 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "PQstatus returned bad connection; reconnecting...\n");
139 handle->state = SWITCH_PGSQL_STATE_ERROR;
140 PQreset(handle->con);
141 if (PQstatus(handle->con) == CONNECTION_BAD) {
142 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "PQstatus returned bad connection -- reconnection failed!\n");
143 goto error;
144 }
145 handle->state = SWITCH_PGSQL_STATE_CONNECTED;
146 handle->sock = PQsocket(handle->con);
147 }
148
149 ret = 1;
150 goto done;
151
152 error:
153 err_str = pgsql_handle_get_error(handle);
154
155 if (PQstatus(handle->con) == CONNECTION_BAD) {
156 handle->state = SWITCH_PGSQL_STATE_ERROR;
157 PQreset(handle->con);
158 if (PQstatus(handle->con) == CONNECTION_OK) {
159 handle->state = SWITCH_PGSQL_STATE_CONNECTED;
160 recon = SWITCH_STATUS_SUCCESS;
161 handle->sock = PQsocket(handle->con);
162 }
163 }
164
165 max_tries--;
166
167 if (switch_event_create(&event, SWITCH_EVENT_TRAP) == SWITCH_STATUS_SUCCESS) {
168 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Failure-Message", "The sql server is not responding for DSN %s [%s][%d]",
169 switch_str_nil(handle->dsn), switch_str_nil(err_str), code);
170 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "The sql server is not responding for DSN %s [%s][%d]\n",
171 switch_str_nil(handle->dsn), switch_str_nil(err_str), code);
172
173 if (recon == SWITCH_STATUS_SUCCESS) {
174 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Additional-Info", "The connection has been re-established");
175 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "The connection has been re-established\n");
176 } else {
177 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Additional-Info", "The connection could not be re-established");
178 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "The connection could not be re-established\n");
179 }
180 if (!max_tries) {
181 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Additional-Info", "Giving up!");
182 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Giving up!\n");
183 }
184
185 switch_event_fire(&event);
186 }
187
188 if (!max_tries) {
189 goto done;
190 }
191
192 switch_safe_free(err_str);
193 switch_yield(1000000);
194 goto top;
195
196 done:
197
198 switch_safe_free(err_str);
199
200 return ret;
201 }
202
pgsql_free_result(switch_pgsql_result_t ** result)203 void pgsql_free_result(switch_pgsql_result_t **result)
204 {
205 if (!*result) {
206 return;
207 }
208
209 if ((*result)->result) {
210 PQclear((*result)->result);
211 }
212 free(*result);
213 *result = NULL;
214 }
215
pgsql_finish_results_real(const char * file,const char * func,int line,switch_pgsql_handle_t * handle)216 switch_status_t pgsql_finish_results_real(const char* file, const char* func, int line, switch_pgsql_handle_t *handle)
217 {
218 switch_pgsql_result_t *res = NULL;
219 switch_status_t final_status = SWITCH_STATUS_SUCCESS;
220 int done = 0;
221
222 do {
223 pgsql_next_result(handle, &res);
224 if (res && res->err && !switch_stristr("already exists", res->err) && !switch_stristr("duplicate key name", res->err)) {
225 switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_ERROR, "Error executing query:\n%s\n", res->err);
226 final_status = SWITCH_STATUS_FALSE;
227 }
228
229 if (!res) {
230 done = 1;
231 } else if (res->result) {
232 char *affected_rows = PQcmdTuples(res->result);
233
234 if (!zstr(affected_rows)) {
235 handle->affected_rows = atoi(affected_rows);
236 }
237 }
238
239 pgsql_free_result(&res);
240 } while (!done);
241
242 return final_status;
243 }
244
pgsql_handle_affected_rows(switch_database_interface_handle_t * dih,int * affected_rows)245 switch_status_t pgsql_handle_affected_rows(switch_database_interface_handle_t *dih, int *affected_rows)
246 {
247 switch_pgsql_handle_t *handle = NULL;
248
249 if (!dih) {
250 return SWITCH_STATUS_FALSE;
251 }
252
253 handle = dih->handle;
254
255 if (!handle)
256 return SWITCH_STATUS_FALSE;
257
258 *affected_rows = handle->affected_rows;
259
260 return SWITCH_STATUS_SUCCESS;
261 }
262
pgsql_handle_new(switch_cache_db_database_interface_options_t database_interface_options,switch_database_interface_handle_t ** dih)263 switch_status_t pgsql_handle_new(switch_cache_db_database_interface_options_t database_interface_options, switch_database_interface_handle_t **dih)
264 {
265 switch_pgsql_handle_t *new_handle = NULL;
266
267 if (!(*dih = malloc(sizeof(**dih)))) {
268 goto err;
269 }
270
271 if (!(new_handle = malloc(sizeof(*new_handle)))) {
272 goto err;
273 }
274
275 memset(new_handle, 0, sizeof(*new_handle));
276
277 if (!strcasecmp(database_interface_options.prefix, "postgresql") || !strcasecmp(database_interface_options.prefix, "postgres")) {
278 new_handle->dsn = strdup(database_interface_options.original_dsn);
279 } else if (!strcasecmp(database_interface_options.prefix, "pgsql")) {
280 new_handle->dsn = strdup(database_interface_options.connection_string);
281 }
282
283 if (!new_handle->dsn) {
284 goto err;
285 }
286
287 new_handle->sock = 0;
288 new_handle->state = SWITCH_PGSQL_STATE_INIT;
289 new_handle->con = NULL;
290 new_handle->affected_rows = 0;
291 new_handle->num_retries = DEFAULT_PGSQL_RETRIES;
292 new_handle->auto_commit = SWITCH_TRUE;
293 new_handle->in_txn = SWITCH_FALSE;
294
295 (*dih)->handle = new_handle;
296
297 if (pgsql_handle_connect(new_handle) != SWITCH_STATUS_SUCCESS) {
298 if (pgsql_handle_destroy(dih) != SWITCH_STATUS_SUCCESS) {
299 goto err;
300 }
301 }
302
303 return SWITCH_STATUS_SUCCESS;
304
305 err:
306 switch_safe_free(*dih);
307
308 if (new_handle) {
309 switch_safe_free(new_handle->dsn);
310 switch_safe_free(new_handle);
311 }
312
313 return SWITCH_STATUS_FALSE;
314 }
315
pgsql_handle_disconnect(switch_pgsql_handle_t * handle)316 switch_status_t pgsql_handle_disconnect(switch_pgsql_handle_t *handle)
317 {
318 if (!handle) {
319 return SWITCH_STATUS_FALSE;
320 }
321
322 if (handle->state == SWITCH_PGSQL_STATE_CONNECTED) {
323 PQfinish(handle->con);
324 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Disconnected from [%s]\n", handle->dsn);
325 }
326 switch_safe_free(handle->sql);
327 handle->state = SWITCH_PGSQL_STATE_DOWN;
328
329 return SWITCH_STATUS_SUCCESS;
330 }
331
pgsql_handle_connect(switch_pgsql_handle_t * handle)332 switch_status_t pgsql_handle_connect(switch_pgsql_handle_t *handle)
333 {
334 if (!handle) {
335 return SWITCH_STATUS_FALSE;
336 }
337
338 if (handle->state == SWITCH_PGSQL_STATE_CONNECTED) {
339 pgsql_handle_disconnect(handle);
340 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "Re-connecting %s\n", handle->dsn);
341 }
342
343 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "Connecting %s\n", handle->dsn);
344 PQinitSSL(0);
345
346 handle->con = PQconnectdb(handle->dsn);
347 if (PQstatus(handle->con) != CONNECTION_OK) {
348 char *err_str;
349
350 if ((err_str = pgsql_handle_get_error(handle))) {
351 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%s\n", err_str);
352 switch_safe_free(err_str);
353 } else {
354 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to connect to the database [%s]\n", handle->dsn);
355 pgsql_handle_disconnect(handle);
356 }
357
358 return SWITCH_STATUS_FALSE;
359 }
360
361 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "Connected to [%s]\n", handle->dsn);
362 handle->state = SWITCH_PGSQL_STATE_CONNECTED;
363 handle->sock = PQsocket(handle->con);
364
365 return SWITCH_STATUS_SUCCESS;
366 }
367
pgsql_handle_destroy(switch_database_interface_handle_t ** dih)368 switch_status_t pgsql_handle_destroy(switch_database_interface_handle_t **dih)
369 {
370 switch_pgsql_handle_t *handle = NULL;
371
372 if (!dih) {
373 return SWITCH_STATUS_FALSE;
374 }
375
376 handle = (*dih)->handle;
377
378 if (handle) {
379 pgsql_handle_disconnect(handle);
380
381 switch_safe_free(handle->dsn);
382 free(handle);
383 }
384
385 switch_safe_free(*dih);
386
387 return SWITCH_STATUS_SUCCESS;
388 }
389
pgsql_flush(switch_pgsql_handle_t * handle)390 switch_status_t pgsql_flush(switch_pgsql_handle_t *handle)
391 {
392 PGresult *tmp = NULL;
393 int x = 0;
394
395 if (!handle) {
396 return SWITCH_STATUS_FALSE;
397 }
398
399 /* Make sure the query is fully cleared */
400 while ((tmp = PQgetResult(handle->con)) != NULL) {
401 PQclear(tmp);
402 x++;
403 }
404
405 if (x) {
406 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Flushing %d results\n", x);
407 }
408
409 return SWITCH_STATUS_SUCCESS;
410 }
411
database_flush(switch_database_interface_handle_t * dih)412 switch_status_t database_flush(switch_database_interface_handle_t *dih)
413 {
414 switch_pgsql_handle_t *handle;
415
416 if (!dih) {
417 return SWITCH_STATUS_FALSE;
418 }
419
420 handle = dih->handle;
421
422 return pgsql_flush(handle);
423 }
424
pgsql_send_query(switch_pgsql_handle_t * handle,const char * sql)425 switch_status_t pgsql_send_query(switch_pgsql_handle_t *handle, const char* sql)
426 {
427 char *err_str;
428
429 switch_safe_free(handle->sql);
430 handle->sql = strdup(sql);
431 if (!PQsendQuery(handle->con, sql)) {
432 err_str = pgsql_handle_get_error(handle);
433 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failed to send query (%s) to database: %s\n", sql, err_str);
434 switch_safe_free(err_str);
435 pgsql_finish_results(handle);
436 goto error;
437 }
438
439 return SWITCH_STATUS_SUCCESS;
440 error:
441 return SWITCH_STATUS_FALSE;
442 }
443
pgsql_handle_exec_base_detailed(const char * file,const char * func,int line,switch_pgsql_handle_t * handle,const char * sql,char ** err)444 switch_status_t pgsql_handle_exec_base_detailed(const char *file, const char *func, int line,
445 switch_pgsql_handle_t *handle, const char *sql, char **err)
446 {
447 char *err_str = NULL;
448 char *er = NULL;
449
450 pgsql_flush(handle);
451 handle->affected_rows = 0;
452
453 if (!db_is_up(handle)) {
454 er = strdup("Database is not up!");
455 goto error;
456 }
457
458 if (handle->auto_commit == SWITCH_FALSE && handle->in_txn == SWITCH_FALSE) {
459 if (pgsql_send_query(handle, "BEGIN") != SWITCH_STATUS_SUCCESS) {
460 er = strdup("Error sending BEGIN!");
461 if (pgsql_finish_results(handle) != SWITCH_STATUS_SUCCESS) {
462 db_is_up(handle); /* If finish_results failed, maybe the db went dead */
463 }
464 goto error;
465 }
466
467 if (pgsql_finish_results(handle) != SWITCH_STATUS_SUCCESS) {
468 db_is_up(handle);
469 er = strdup("Error sending BEGIN!");
470 goto error;
471 }
472 handle->in_txn = SWITCH_TRUE;
473 }
474
475 if (pgsql_send_query(handle, sql) != SWITCH_STATUS_SUCCESS) {
476 er = strdup("Error sending query!");
477 if (pgsql_finish_results(handle) != SWITCH_STATUS_SUCCESS) {
478 db_is_up(handle);
479 }
480 goto error;
481 }
482
483 return SWITCH_STATUS_SUCCESS;
484
485 error:
486 err_str = pgsql_handle_get_error(handle);
487
488 if (zstr(err_str)) {
489 if (zstr(er)) {
490 err_str = strdup((char *)"SQL ERROR!");
491 } else {
492 err_str = er;
493 }
494 } else {
495 if (!zstr(er)) {
496 free(er);
497 }
498 }
499
500 if (err_str) {
501 if (!switch_stristr("already exists", err_str) && !switch_stristr("duplicate key name", err_str)) {
502 switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_ERROR, "ERR: [%s]\n[%s]\n", sql, switch_str_nil(err_str));
503 }
504
505 if (err) {
506 *err = err_str;
507 } else {
508 free(err_str);
509 }
510 }
511
512 return SWITCH_STATUS_FALSE;
513 }
514
515
pgsql_handle_exec_detailed(const char * file,const char * func,int line,switch_pgsql_handle_t * handle,const char * sql,char ** err)516 switch_status_t pgsql_handle_exec_detailed(const char *file, const char *func, int line,
517 switch_pgsql_handle_t *handle, const char *sql, char **err)
518 {
519 if (pgsql_handle_exec_base_detailed(file, func, line, handle, sql, err) == SWITCH_STATUS_FALSE) {
520 goto error;
521 }
522
523 return pgsql_finish_results(handle);
524 error:
525 return SWITCH_STATUS_FALSE;
526 }
527
database_handle_exec_detailed(const char * file,const char * func,int line,switch_database_interface_handle_t * dih,const char * sql,char ** err)528 switch_status_t database_handle_exec_detailed(const char *file, const char *func, int line,
529 switch_database_interface_handle_t *dih, const char *sql, char **err)
530 {
531 switch_pgsql_handle_t *handle;
532
533 if (!dih) {
534 return SWITCH_STATUS_FALSE;
535 }
536
537 handle = dih->handle;
538
539 return pgsql_handle_exec_detailed(file, func, line, handle, sql, err);
540 }
541
database_handle_exec_string(switch_database_interface_handle_t * dih,const char * sql,char * resbuf,size_t len,char ** err)542 switch_status_t database_handle_exec_string(switch_database_interface_handle_t *dih, const char *sql, char *resbuf, size_t len, char **err)
543 {
544 switch_pgsql_handle_t *handle;
545 switch_status_t sstatus = SWITCH_STATUS_SUCCESS;
546 char *val = NULL;
547 switch_pgsql_result_t *result = NULL;
548
549 if (!dih) {
550 return SWITCH_STATUS_FALSE;
551 }
552
553 handle = dih->handle;
554
555 if (!handle)
556 return SWITCH_STATUS_FALSE;
557
558 handle->affected_rows = 0;
559
560 if (pgsql_handle_exec_base(handle, sql, err) == SWITCH_STATUS_FALSE) {
561 goto error;
562 }
563
564 if (pgsql_next_result(handle, &result) == SWITCH_STATUS_FALSE) {
565 goto error;
566 }
567
568 if (!result) {
569 goto done;
570 } else {
571 switch (result->status) {
572 #if POSTGRESQL_MAJOR_VERSION >= 9 && POSTGRESQL_MINOR_VERSION >= 2
573 case PGRES_SINGLE_TUPLE:
574 /* Added in PostgreSQL 9.2 */
575 #endif
576 case PGRES_COMMAND_OK:
577 case PGRES_TUPLES_OK:
578 break;
579 default:
580 sstatus = SWITCH_STATUS_FALSE;
581 goto done;
582 }
583 }
584
585 if (handle->affected_rows <= 0) {
586 goto done;
587 }
588
589 val = PQgetvalue(result->result, 0, 0);
590 strncpy(resbuf, val, len);
591
592 done:
593
594 pgsql_free_result(&result);
595 if (pgsql_finish_results(handle) != SWITCH_STATUS_SUCCESS) {
596 sstatus = SWITCH_STATUS_FALSE;
597 }
598
599 return sstatus;
600
601 error:
602
603 return SWITCH_STATUS_FALSE;
604 }
605
pgsql_next_result_timed(switch_pgsql_handle_t * handle,switch_pgsql_result_t ** result_out,int msec)606 switch_status_t pgsql_next_result_timed(switch_pgsql_handle_t *handle, switch_pgsql_result_t **result_out, int msec)
607 {
608 switch_pgsql_result_t *res;
609 switch_time_t start;
610 switch_time_t ctime;
611 unsigned int usec = msec * 1000;
612 char *err_str;
613 #ifndef _WIN32
614 struct pollfd fds[2] = { { 0 } };
615 #else
616 fd_set rs, es;
617 #endif
618 int poll_res = 0;
619
620 if (!handle) {
621 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "**BUG** Null handle passed to pgsql_next_result.\n");
622 return SWITCH_STATUS_FALSE;
623 }
624
625 /* Try to consume input that might be waiting right away */
626 if (PQconsumeInput(handle->con)) {
627 /* And check to see if we have a full result ready for reading */
628 if (PQisBusy(handle->con)) {
629
630 /* Wait for a result to become available, up to msec milliseconds */
631 start = switch_micro_time_now();
632 while ((ctime = switch_micro_time_now()) - start <= usec) {
633 switch_time_t wait_time = (usec - (ctime - start)) / 1000;
634 /* Wait for the PostgreSQL socket to be ready for data reads. */
635 #ifndef _WIN32
636 fds[0].fd = handle->sock;
637 fds[0].events |= POLLIN;
638 fds[0].events |= POLLERR;
639 fds[0].events |= POLLNVAL;
640 fds[0].events |= POLLHUP;
641 fds[0].events |= POLLPRI;
642 fds[0].events |= POLLRDNORM;
643 fds[0].events |= POLLRDBAND;
644
645 poll_res = poll(&fds[0], 1, wait_time);
646 #else
647 struct timeval wait = { (long)wait_time * 1000, 0 };
648 FD_ZERO(&rs);
649 FD_SET(handle->sock, &rs);
650 FD_ZERO(&es);
651 FD_SET(handle->sock, &es);
652 poll_res = select(0, &rs, 0, &es, &wait);
653 #endif
654 if (poll_res > 0) {
655 #ifndef _WIN32
656 if (fds[0].revents & POLLHUP || fds[0].revents & POLLNVAL) {
657 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "PGSQL socket closed or invalid while waiting for result for query (%s)\n", handle->sql);
658 goto error;
659 } else if (fds[0].revents & POLLERR) {
660 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Poll error trying to read PGSQL socket for query (%s)\n", handle->sql);
661 goto error;
662 } else if (fds[0].revents & POLLIN || fds[0].revents & POLLPRI || fds[0].revents & POLLRDNORM || fds[0].revents & POLLRDBAND) {
663 #else
664 if (FD_ISSET(handle->sock, &rs)) {
665 #endif
666 /* Then try to consume any input waiting. */
667 if (PQconsumeInput(handle->con)) {
668 if (PQstatus(handle->con) == CONNECTION_BAD) {
669 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Connection terminated while waiting for result.\n");
670 handle->state = SWITCH_PGSQL_STATE_ERROR;
671 goto error;
672 }
673
674 /* And check to see if we have a full result ready for reading */
675 if (!PQisBusy(handle->con)) {
676 /* If we can pull a full result without blocking, then break this loop */
677 break;
678 }
679 } else {
680 /* If we had an error trying to consume input, report it and cancel the query. */
681 err_str = pgsql_handle_get_error(handle);
682 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "An error occurred trying to consume input for query (%s): %s\n", handle->sql, err_str);
683 switch_safe_free(err_str);
684 pgsql_cancel(handle);
685 goto error;
686 }
687 }
688 } else if (poll_res == -1) {
689 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Poll failed trying to read PGSQL socket for query (%s)\n", handle->sql);
690 goto error;
691 }
692 }
693
694 /* If we broke the loop above because of a timeout, report that and cancel the query. */
695 if (ctime - start > usec) {
696 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Query (%s) took too long to complete or database not responding.\n", handle->sql);
697 pgsql_cancel(handle);
698 goto error;
699 }
700 }
701 } else {
702 /* If we had an error trying to consume input, report it and cancel the query. */
703 err_str = pgsql_handle_get_error(handle);
704 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "An error occurred trying to consume input for query (%s): %s\n", handle->sql, err_str);
705 switch_safe_free(err_str);
706 /* pgsql_cancel(handle); */
707 goto error;
708 }
709
710 /* At this point, we know we can read a full result without blocking. */
711 if (!(res = malloc(sizeof(switch_pgsql_result_t)))) {
712 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Malloc failed!\n");
713 goto error;
714 }
715
716 memset(res, 0, sizeof(switch_pgsql_result_t));
717
718 res->result = PQgetResult(handle->con);
719 if (res->result) {
720 *result_out = res;
721 res->status = PQresultStatus(res->result);
722 switch (res->status) {
723 //#if (POSTGRESQL_MAJOR_VERSION == 9 && POSTGRESQL_MINOR_VERSION >= 2) || POSTGRESQL_MAJOR_VERSION > 9
724 case PGRES_SINGLE_TUPLE:
725 /* Added in PostgreSQL 9.2 */
726 //#endif
727 case PGRES_TUPLES_OK:
728 {
729 res->rows = PQntuples(res->result);
730 handle->affected_rows = res->rows;
731 res->cols = PQnfields(res->result);
732 }
733 break;
734 //#if (POSTGRESQL_MAJOR_VERSION == 9 && POSTGRESQL_MINOR_VERSION >= 1) || POSTGRESQL_MAJOR_VERSION > 9
735 case PGRES_COPY_BOTH:
736 /* Added in PostgreSQL 9.1 */
737 //#endif
738 case PGRES_COPY_OUT:
739 case PGRES_COPY_IN:
740 case PGRES_COMMAND_OK:
741 break;
742 case PGRES_EMPTY_QUERY:
743 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Query (%s) returned PGRES_EMPTY_QUERY\n", handle->sql);
744 case PGRES_BAD_RESPONSE:
745 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Query (%s) returned PGRES_BAD_RESPONSE\n", handle->sql);
746 case PGRES_NONFATAL_ERROR:
747 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Query (%s) returned PGRES_NONFATAL_ERROR\n", handle->sql);
748 case PGRES_FATAL_ERROR:
749 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Query (%s) returned PGRES_FATAL_ERROR\n", handle->sql);
750 res->err = PQresultErrorMessage(res->result);
751 goto error;
752 break;
753 }
754 } else {
755 free(res);
756 res = NULL;
757 *result_out = NULL;
758 }
759
760 return SWITCH_STATUS_SUCCESS;
761 error:
762
763 /* Make sure the failed connection does not have any transactions marked as in progress */
764 pgsql_flush(handle);
765
766 /* Try to reconnect to the DB if we were dropped */
767 db_is_up(handle);
768
769 return SWITCH_STATUS_FALSE;
770 }
771
772 switch_status_t pgsql_cancel_real(const char *file, const char *func, int line, switch_pgsql_handle_t *handle)
773 {
774 switch_status_t ret = SWITCH_STATUS_SUCCESS;
775 char err_buf[256];
776 PGcancel *cancel = NULL;
777
778 memset(err_buf, 0, 256);
779 cancel = PQgetCancel(handle->con);
780
781 if (!PQcancel(cancel, err_buf, 256)) {
782 switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_CRIT, "Failed to cancel long-running query (%s): %s\n", handle->sql, err_buf);
783 ret = SWITCH_STATUS_FALSE;
784 }
785
786 PQfreeCancel(cancel);
787 pgsql_flush(handle);
788
789 return ret;
790 }
791
792 switch_status_t pgsql_SQLSetAutoCommitAttr(switch_database_interface_handle_t *dih, switch_bool_t on)
793 {
794 switch_pgsql_handle_t *handle;
795
796 if (!dih) {
797 return SWITCH_STATUS_FALSE;
798 }
799
800 handle = dih->handle;
801
802 if (!handle)
803 return SWITCH_STATUS_FALSE;
804
805 if (on) {
806 handle->auto_commit = SWITCH_TRUE;
807 } else {
808 handle->auto_commit = SWITCH_FALSE;
809 }
810
811 return SWITCH_STATUS_SUCCESS;
812 }
813
814 switch_status_t pgsql_SQLEndTran(switch_pgsql_handle_t *handle, switch_bool_t commit)
815 {
816 char * err_str = NULL;
817
818 if (!handle) {
819 return SWITCH_STATUS_FALSE;
820 }
821
822 if (commit) {
823 if (!PQsendQuery(handle->con, "COMMIT")) {
824 err_str = pgsql_handle_get_error(handle);
825 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Could not commit transaction: %s\n", err_str);
826 switch_safe_free(err_str);
827 return SWITCH_STATUS_FALSE;
828 }
829 } else {
830 if (!PQsendQuery(handle->con, "ROLLBACK")) {
831 err_str = pgsql_handle_get_error(handle);
832 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Could not rollback transaction: %s\n", err_str);
833 switch_safe_free(err_str);
834 return SWITCH_STATUS_FALSE;
835 }
836 }
837 handle->in_txn = SWITCH_FALSE;
838
839 return SWITCH_STATUS_SUCCESS;
840 }
841
842 switch_status_t database_commit(switch_database_interface_handle_t *dih)
843 {
844 switch_status_t result;
845
846 switch_pgsql_handle_t *handle;
847
848 if (!dih) {
849 return SWITCH_STATUS_FALSE;
850 }
851
852 handle = dih->handle;
853
854 if (!handle)
855 return SWITCH_STATUS_FALSE;
856
857 result = pgsql_SQLEndTran(handle, SWITCH_TRUE);
858 result = pgsql_SQLSetAutoCommitAttr(dih, SWITCH_TRUE) && result;
859 result = pgsql_finish_results(handle) && result;
860
861 return result;
862 }
863
864 switch_status_t database_rollback(switch_database_interface_handle_t *dih)
865 {
866 switch_pgsql_handle_t *handle;
867 switch_status_t result;
868
869 if (!dih) {
870 return SWITCH_STATUS_FALSE;
871 }
872
873 handle = dih->handle;
874
875 if (!handle)
876 return SWITCH_STATUS_FALSE;
877
878 result = pgsql_SQLEndTran(handle, SWITCH_FALSE);
879 result = pgsql_SQLSetAutoCommitAttr(dih, SWITCH_TRUE) && result;
880 result = pgsql_finish_results(handle) && result;
881
882 return result;
883 }
884
885 switch_status_t pgsql_handle_callback_exec_detailed(const char *file, const char *func, int line,
886 switch_database_interface_handle_t *dih, const char *sql, switch_core_db_callback_func_t callback, void *pdata, char **err)
887 {
888 char *err_str = NULL;
889 int row = 0, col = 0, err_cnt = 0;
890 switch_pgsql_result_t *result = NULL;
891
892 switch_pgsql_handle_t *handle;
893
894 if (!dih) {
895 return SWITCH_STATUS_FALSE;
896 }
897
898 handle = dih->handle;
899
900 if (!handle) {
901 return SWITCH_STATUS_FALSE;
902 }
903
904 handle->affected_rows = 0;
905
906 switch_assert(callback != NULL);
907
908 if (pgsql_handle_exec_base(handle, sql, err) == SWITCH_STATUS_FALSE) {
909 goto error;
910 }
911
912 if (pgsql_next_result(handle, &result) == SWITCH_STATUS_FALSE) {
913 err_cnt++;
914 err_str = pgsql_handle_get_error(handle);
915
916 if (result && !zstr(result->err)) {
917 switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_ERROR, "ERR: [%s]\n[%s]\n", sql, switch_str_nil(result->err));
918 }
919
920 if (!zstr(err_str)) {
921 switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_ERROR, "ERR: [%s]\n[%s]\n", sql, switch_str_nil(err_str));
922 }
923
924 switch_safe_free(err_str);
925 }
926
927 while (result != NULL) {
928 /*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Processing result with %d rows and %d columns.\n", result->rows, result->cols);*/
929 for (row = 0; row < result->rows; ++row) {
930 char **names;
931 char **vals;
932
933 names = calloc(result->cols, sizeof(*names));
934 vals = calloc(result->cols, sizeof(*vals));
935
936 switch_assert(names && vals);
937
938 for (col = 0; col < result->cols; ++col) {
939 char * tmp;
940 size_t len;
941
942 tmp = PQfname(result->result, col);
943 if (tmp) {
944 len = strlen(tmp);
945 names[col] = malloc(len + 1);
946 snprintf(names[col], len + 1, "%s", tmp);
947
948 len = PQgetlength(result->result, row, col);
949 vals[col] = malloc(len + 1);
950 tmp = PQgetvalue(result->result, row, col);
951 snprintf(vals[col], len + 1, "%s", tmp);
952 /*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Processing result row %d, col %d: %s => %s\n", row, col, names[col], vals[col]);*/
953 } else {
954 /*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Processing result row %d, col %d.\n", row, col);*/
955 switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_ERROR, "ERR: Column number %d out of range\n", col);
956 }
957 }
958
959 /*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Executing callback for row %d...\n", row);*/
960 if (callback(pdata, result->cols, vals, names)) {
961 pgsql_finish_results(handle); /* Makes sure next call to switch_pgsql_next_result will return NULL */
962 row = result->rows; /* Makes us exit the for loop */
963 }
964
965 for (col = 0; col < result->cols; ++col) {
966 free(names[col]);
967 free(vals[col]);
968 }
969
970 free(names);
971 free(vals);
972 }
973
974 pgsql_free_result(&result);
975
976 if (pgsql_next_result(handle, &result) == SWITCH_STATUS_FALSE) {
977 err_cnt++;
978 err_str = pgsql_handle_get_error(handle);
979
980 if (result && !zstr(result->err)) {
981 switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_ERROR, "ERR: [%s]\n[%s]\n", sql, switch_str_nil(result->err));
982 }
983
984 if (!zstr(err_str)) {
985 switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_ERROR, "ERR: [%s]\n[%s]\n", sql, switch_str_nil(err_str));
986 }
987 switch_safe_free(err_str);
988 }
989 }
990
991 if (err_cnt) {
992 goto error;
993 }
994
995 return SWITCH_STATUS_SUCCESS;
996 error:
997
998 return SWITCH_STATUS_FALSE;
999 }
1000
1001 SWITCH_MODULE_LOAD_FUNCTION(mod_pgsql_load)
1002 {
1003 switch_database_interface_t *database_interface;
1004
1005 supported_prefixes[0] = (char *)"pgsql";
1006 supported_prefixes[1] = (char *)"postgres";
1007 supported_prefixes[2] = (char *)"postgresql";
1008
1009 /* connect my internal structure to the blank pointer passed to me */
1010 *module_interface = switch_loadable_module_create_module_interface(pool, modname);
1011 MODULE_INTERFACE = *module_interface;
1012
1013 database_interface = (switch_database_interface_t *)switch_loadable_module_create_interface(*module_interface, SWITCH_DATABASE_INTERFACE);
1014 database_interface->flags = 0;
1015 database_interface->interface_name = modname;
1016 database_interface->prefixes = supported_prefixes;
1017 database_interface->handle_new = pgsql_handle_new;
1018 database_interface->handle_destroy = pgsql_handle_destroy;
1019 database_interface->flush = database_flush;
1020 database_interface->exec_detailed = database_handle_exec_detailed;
1021 database_interface->exec_string = database_handle_exec_string;
1022 database_interface->affected_rows = pgsql_handle_affected_rows;
1023 database_interface->sql_set_auto_commit_attr = pgsql_SQLSetAutoCommitAttr;
1024 database_interface->commit = database_commit;
1025 database_interface->rollback = database_rollback;
1026 database_interface->callback_exec_detailed = pgsql_handle_callback_exec_detailed;
1027
1028 /* indicate that the module should continue to be loaded */
1029 return SWITCH_STATUS_SUCCESS;
1030 }
1031
1032 SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_pgsql_shutdown)
1033 {
1034 return SWITCH_STATUS_UNLOAD;
1035 }
1036
1037 /* For Emacs:
1038 * Local Variables:
1039 * mode:c
1040 * indent-tabs-mode:t
1041 * tab-width:4
1042 * c-basic-offset:4
1043 * End:
1044 * For VIM:
1045 * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
1046 */
1047