1 /*
2 * mod_pgsql for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3  * Copyright (C) 2005-2019, Anthony Minessale II <anthm@freeswitch.org>
4 *
5 * Version: MPL 1.1
6 *
7 * The contents of this file are subject to the Mozilla Public License Version
8 * 1.1 (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 * http://www.mozilla.org/MPL/
11 *
12 * Software distributed under the License is distributed on an "AS IS" basis,
13 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14 * for the specific language governing rights and limitations under the
15 * License.
16 *
17 * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18 *
19 * The Initial Developer of the Original Code is
20 * Anthony Minessale II <anthm@freeswitch.org>
21 * Portions created by the Initial Developer are Copyright (C)
22 * the Initial Developer. All Rights Reserved.
23 *
24 * Contributor(s):
25 * Anthony Minessale II <anthm@freeswitch.org>
26 * Eliot Gable <egable@gmail.com>
27 * Seven Du <dujinfang@gmail.com>
28 * Andrey Volk <andywolk@gmail.com>
29 *
30 * mod_pgsql.c -- PostgreSQL FreeSWITCH module
31 *
32 */
33 
34 #define SWITCH_PGSQL_H
35 
36 #include <switch.h>
37 
38 #include <libpq-fe.h>
39 
40 #ifndef _WIN32
41 #include <poll.h>
42 #else
43 #include <WinSock2.h>
44 #endif
45 
46 switch_loadable_module_interface_t *MODULE_INTERFACE;
47 static char *supported_prefixes[4] = { 0 };
48 
49 SWITCH_MODULE_LOAD_FUNCTION(mod_pgsql_load);
50 SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_pgsql_shutdown);
51 SWITCH_MODULE_DEFINITION(mod_pgsql, mod_pgsql_load, mod_pgsql_shutdown, NULL);
52 
53 #define DEFAULT_PGSQL_RETRIES 120
54 
55 typedef enum {
56 	SWITCH_PGSQL_STATE_INIT,
57 	SWITCH_PGSQL_STATE_DOWN,
58 	SWITCH_PGSQL_STATE_CONNECTED,
59 	SWITCH_PGSQL_STATE_ERROR
60 } switch_pgsql_state_t;
61 
62 struct switch_pgsql_handle {
63 	char *dsn;
64 	char *sql;
65 	PGconn* con;
66 	int sock;
67 	switch_pgsql_state_t state;
68 	int affected_rows;
69 	int num_retries;
70 	switch_bool_t auto_commit;
71 	switch_bool_t in_txn;
72 };
73 
74 struct switch_pgsql_result {
75 	PGresult *result;
76 	ExecStatusType status;
77 	char *err;
78 	int rows;
79 	int cols;
80 };
81 
82 typedef struct switch_pgsql_handle switch_pgsql_handle_t;
83 typedef struct switch_pgsql_result switch_pgsql_result_t;
84 
85 switch_status_t pgsql_handle_connect(switch_pgsql_handle_t *handle);
86 switch_status_t pgsql_handle_destroy(switch_database_interface_handle_t **dih);
87 switch_status_t pgsql_cancel_real(const char *file, const char *func, int line, switch_pgsql_handle_t *handle);
88 switch_status_t pgsql_next_result_timed(switch_pgsql_handle_t *handle, switch_pgsql_result_t **result_out, int msec);
89 
90 #define pgsql_handle_exec_base(handle, sql, err) pgsql_handle_exec_base_detailed(__FILE__, (char * )__SWITCH_FUNC__, __LINE__, handle, sql, err)
91 #define pgsql_next_result(h, r) pgsql_next_result_timed(h, r, 10000)
92 #define pgsql_finish_results(handle) pgsql_finish_results_real(__FILE__, (char * )__SWITCH_FUNC__, __LINE__, handle)
93 #define pgsql_cancel(handle) pgsql_cancel_real(__FILE__, (char * )__SWITCH_FUNC__, __LINE__, handle)
94 
pgsql_handle_get_error(switch_pgsql_handle_t * handle)95 char * pgsql_handle_get_error(switch_pgsql_handle_t *handle)
96 {
97 	char * err_str;
98 
99 	if (!handle) {
100 		return NULL;
101 	}
102 
103 	switch_strdup(err_str, PQerrorMessage(handle->con));
104 
105 	return err_str;
106 }
107 
db_is_up(switch_pgsql_handle_t * handle)108 static int db_is_up(switch_pgsql_handle_t *handle)
109 {
110 	int ret = 0;
111 	switch_event_t *event;
112 	char *err_str = NULL;
113 	int max_tries = DEFAULT_PGSQL_RETRIES;
114 	int code = 0;
115 	int recon = 0;
116 
117 	if (handle) {
118 		max_tries = handle->num_retries;
119 		if (max_tries < 1)
120 			max_tries = DEFAULT_PGSQL_RETRIES;
121 	}
122 
123 top:
124 
125 	if (!handle) {
126 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "No DB Handle\n");
127 		goto done;
128 	}
129 	if (!handle->con) {
130 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "No DB Connection\n");
131 		goto done;
132 	}
133 
134 	/* Try a non-blocking read on the connection to gobble up any EOF from a closed connection and mark the connection BAD if it is closed. */
135 	PQconsumeInput(handle->con);
136 
137 	if (PQstatus(handle->con) == CONNECTION_BAD) {
138 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "PQstatus returned bad connection; reconnecting...\n");
139 		handle->state = SWITCH_PGSQL_STATE_ERROR;
140 		PQreset(handle->con);
141 		if (PQstatus(handle->con) == CONNECTION_BAD) {
142 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "PQstatus returned bad connection -- reconnection failed!\n");
143 			goto error;
144 		}
145 		handle->state = SWITCH_PGSQL_STATE_CONNECTED;
146 		handle->sock = PQsocket(handle->con);
147 	}
148 
149 	ret = 1;
150 	goto done;
151 
152 error:
153 	err_str = pgsql_handle_get_error(handle);
154 
155 	if (PQstatus(handle->con) == CONNECTION_BAD) {
156 		handle->state = SWITCH_PGSQL_STATE_ERROR;
157 		PQreset(handle->con);
158 		if (PQstatus(handle->con) == CONNECTION_OK) {
159 			handle->state = SWITCH_PGSQL_STATE_CONNECTED;
160 			recon = SWITCH_STATUS_SUCCESS;
161 			handle->sock = PQsocket(handle->con);
162 		}
163 	}
164 
165 	max_tries--;
166 
167 	if (switch_event_create(&event, SWITCH_EVENT_TRAP) == SWITCH_STATUS_SUCCESS) {
168 		switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Failure-Message", "The sql server is not responding for DSN %s [%s][%d]",
169 			switch_str_nil(handle->dsn), switch_str_nil(err_str), code);
170 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "The sql server is not responding for DSN %s [%s][%d]\n",
171 			switch_str_nil(handle->dsn), switch_str_nil(err_str), code);
172 
173 		if (recon == SWITCH_STATUS_SUCCESS) {
174 			switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Additional-Info", "The connection has been re-established");
175 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "The connection has been re-established\n");
176 		} else {
177 			switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Additional-Info", "The connection could not be re-established");
178 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "The connection could not be re-established\n");
179 		}
180 		if (!max_tries) {
181 			switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Additional-Info", "Giving up!");
182 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Giving up!\n");
183 		}
184 
185 		switch_event_fire(&event);
186 	}
187 
188 	if (!max_tries) {
189 		goto done;
190 	}
191 
192 	switch_safe_free(err_str);
193 	switch_yield(1000000);
194 	goto top;
195 
196 done:
197 
198 	switch_safe_free(err_str);
199 
200 	return ret;
201 }
202 
pgsql_free_result(switch_pgsql_result_t ** result)203 void pgsql_free_result(switch_pgsql_result_t **result)
204 {
205 	if (!*result) {
206 		return;
207 	}
208 
209 	if ((*result)->result) {
210 		PQclear((*result)->result);
211 	}
212 	free(*result);
213 	*result = NULL;
214 }
215 
pgsql_finish_results_real(const char * file,const char * func,int line,switch_pgsql_handle_t * handle)216 switch_status_t pgsql_finish_results_real(const char* file, const char* func, int line, switch_pgsql_handle_t *handle)
217 {
218 	switch_pgsql_result_t *res = NULL;
219 	switch_status_t final_status = SWITCH_STATUS_SUCCESS;
220 	int done = 0;
221 
222 	do {
223 		pgsql_next_result(handle, &res);
224 		if (res && res->err && !switch_stristr("already exists", res->err) && !switch_stristr("duplicate key name", res->err)) {
225 			switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_ERROR, "Error executing query:\n%s\n", res->err);
226 			final_status = SWITCH_STATUS_FALSE;
227 		}
228 
229 		if (!res) {
230 			done = 1;
231 		} else if (res->result) {
232 			char *affected_rows = PQcmdTuples(res->result);
233 
234 			if (!zstr(affected_rows)) {
235 				handle->affected_rows = atoi(affected_rows);
236 			}
237 		}
238 
239 		pgsql_free_result(&res);
240 	} while (!done);
241 
242 	return final_status;
243 }
244 
pgsql_handle_affected_rows(switch_database_interface_handle_t * dih,int * affected_rows)245 switch_status_t pgsql_handle_affected_rows(switch_database_interface_handle_t *dih, int *affected_rows)
246 {
247 	switch_pgsql_handle_t *handle = NULL;
248 
249 	if (!dih) {
250 		return SWITCH_STATUS_FALSE;
251 	}
252 
253 	handle = dih->handle;
254 
255 	if (!handle)
256 		return SWITCH_STATUS_FALSE;
257 
258 	*affected_rows = handle->affected_rows;
259 
260 	return SWITCH_STATUS_SUCCESS;
261 }
262 
pgsql_handle_new(switch_cache_db_database_interface_options_t database_interface_options,switch_database_interface_handle_t ** dih)263 switch_status_t pgsql_handle_new(switch_cache_db_database_interface_options_t database_interface_options, switch_database_interface_handle_t **dih)
264 {
265 	switch_pgsql_handle_t *new_handle = NULL;
266 
267 	if (!(*dih = malloc(sizeof(**dih)))) {
268 		goto err;
269 	}
270 
271 	if (!(new_handle = malloc(sizeof(*new_handle)))) {
272 		goto err;
273 	}
274 
275 	memset(new_handle, 0, sizeof(*new_handle));
276 
277 	if (!strcasecmp(database_interface_options.prefix, "postgresql") || !strcasecmp(database_interface_options.prefix, "postgres")) {
278 		new_handle->dsn = strdup(database_interface_options.original_dsn);
279 	} else if (!strcasecmp(database_interface_options.prefix, "pgsql")) {
280 		new_handle->dsn = strdup(database_interface_options.connection_string);
281 	}
282 
283 	if (!new_handle->dsn) {
284 		goto err;
285 	}
286 
287 	new_handle->sock = 0;
288 	new_handle->state = SWITCH_PGSQL_STATE_INIT;
289 	new_handle->con = NULL;
290 	new_handle->affected_rows = 0;
291 	new_handle->num_retries = DEFAULT_PGSQL_RETRIES;
292 	new_handle->auto_commit = SWITCH_TRUE;
293 	new_handle->in_txn = SWITCH_FALSE;
294 
295 	(*dih)->handle = new_handle;
296 
297 	if (pgsql_handle_connect(new_handle) != SWITCH_STATUS_SUCCESS) {
298 		if (pgsql_handle_destroy(dih) != SWITCH_STATUS_SUCCESS) {
299 			goto err;
300 		}
301 	}
302 
303 	return SWITCH_STATUS_SUCCESS;
304 
305 err:
306 	switch_safe_free(*dih);
307 
308 	if (new_handle) {
309 		switch_safe_free(new_handle->dsn);
310 		switch_safe_free(new_handle);
311 	}
312 
313 	return SWITCH_STATUS_FALSE;
314 }
315 
pgsql_handle_disconnect(switch_pgsql_handle_t * handle)316 switch_status_t pgsql_handle_disconnect(switch_pgsql_handle_t *handle)
317 {
318 	if (!handle) {
319 		return SWITCH_STATUS_FALSE;
320 	}
321 
322 	if (handle->state == SWITCH_PGSQL_STATE_CONNECTED) {
323 		PQfinish(handle->con);
324 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Disconnected from [%s]\n", handle->dsn);
325 	}
326 	switch_safe_free(handle->sql);
327 	handle->state = SWITCH_PGSQL_STATE_DOWN;
328 
329 	return SWITCH_STATUS_SUCCESS;
330 }
331 
pgsql_handle_connect(switch_pgsql_handle_t * handle)332 switch_status_t pgsql_handle_connect(switch_pgsql_handle_t *handle)
333 {
334 	if (!handle) {
335 		return SWITCH_STATUS_FALSE;
336 	}
337 
338 	if (handle->state == SWITCH_PGSQL_STATE_CONNECTED) {
339 		pgsql_handle_disconnect(handle);
340 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "Re-connecting %s\n", handle->dsn);
341 	}
342 
343 	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "Connecting %s\n", handle->dsn);
344 	PQinitSSL(0);
345 
346 	handle->con = PQconnectdb(handle->dsn);
347 	if (PQstatus(handle->con) != CONNECTION_OK) {
348 		char *err_str;
349 
350 		if ((err_str = pgsql_handle_get_error(handle))) {
351 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%s\n", err_str);
352 			switch_safe_free(err_str);
353 		} else {
354 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to connect to the database [%s]\n", handle->dsn);
355 			pgsql_handle_disconnect(handle);
356 		}
357 
358 		return SWITCH_STATUS_FALSE;
359 	}
360 
361 	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "Connected to [%s]\n", handle->dsn);
362 	handle->state = SWITCH_PGSQL_STATE_CONNECTED;
363 	handle->sock = PQsocket(handle->con);
364 
365 	return SWITCH_STATUS_SUCCESS;
366 }
367 
pgsql_handle_destroy(switch_database_interface_handle_t ** dih)368 switch_status_t pgsql_handle_destroy(switch_database_interface_handle_t **dih)
369 {
370 	switch_pgsql_handle_t *handle = NULL;
371 
372 	if (!dih) {
373 		return SWITCH_STATUS_FALSE;
374 	}
375 
376 	handle = (*dih)->handle;
377 
378 	if (handle) {
379 		pgsql_handle_disconnect(handle);
380 
381 		switch_safe_free(handle->dsn);
382 		free(handle);
383 	}
384 
385 	switch_safe_free(*dih);
386 
387 	return SWITCH_STATUS_SUCCESS;
388 }
389 
pgsql_flush(switch_pgsql_handle_t * handle)390 switch_status_t pgsql_flush(switch_pgsql_handle_t *handle)
391 {
392 	PGresult *tmp = NULL;
393 	int x = 0;
394 
395 	if (!handle) {
396 		return SWITCH_STATUS_FALSE;
397 	}
398 
399 	/* Make sure the query is fully cleared */
400 	while ((tmp = PQgetResult(handle->con)) != NULL) {
401 		PQclear(tmp);
402 		x++;
403 	}
404 
405 	if (x) {
406 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Flushing %d results\n", x);
407 	}
408 
409 	return SWITCH_STATUS_SUCCESS;
410 }
411 
database_flush(switch_database_interface_handle_t * dih)412 switch_status_t database_flush(switch_database_interface_handle_t *dih)
413 {
414 	switch_pgsql_handle_t *handle;
415 
416 	if (!dih) {
417 		return SWITCH_STATUS_FALSE;
418 	}
419 
420 	handle = dih->handle;
421 
422 	return pgsql_flush(handle);
423 }
424 
pgsql_send_query(switch_pgsql_handle_t * handle,const char * sql)425 switch_status_t pgsql_send_query(switch_pgsql_handle_t *handle, const char* sql)
426 {
427 	char *err_str;
428 
429 	switch_safe_free(handle->sql);
430 	handle->sql = strdup(sql);
431 	if (!PQsendQuery(handle->con, sql)) {
432 		err_str = pgsql_handle_get_error(handle);
433 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failed to send query (%s) to database: %s\n", sql, err_str);
434 		switch_safe_free(err_str);
435 		pgsql_finish_results(handle);
436 		goto error;
437 	}
438 
439 	return SWITCH_STATUS_SUCCESS;
440 error:
441 	return SWITCH_STATUS_FALSE;
442 }
443 
pgsql_handle_exec_base_detailed(const char * file,const char * func,int line,switch_pgsql_handle_t * handle,const char * sql,char ** err)444 switch_status_t pgsql_handle_exec_base_detailed(const char *file, const char *func, int line,
445 	switch_pgsql_handle_t *handle, const char *sql, char **err)
446 {
447 	char *err_str = NULL;
448 	char *er = NULL;
449 
450 	pgsql_flush(handle);
451 	handle->affected_rows = 0;
452 
453 	if (!db_is_up(handle)) {
454 		er = strdup("Database is not up!");
455 		goto error;
456 	}
457 
458 	if (handle->auto_commit == SWITCH_FALSE && handle->in_txn == SWITCH_FALSE) {
459 		if (pgsql_send_query(handle, "BEGIN") != SWITCH_STATUS_SUCCESS) {
460 			er = strdup("Error sending BEGIN!");
461 			if (pgsql_finish_results(handle) != SWITCH_STATUS_SUCCESS) {
462 				db_is_up(handle); /* If finish_results failed, maybe the db went dead */
463 			}
464 			goto error;
465 		}
466 
467 		if (pgsql_finish_results(handle) != SWITCH_STATUS_SUCCESS) {
468 			db_is_up(handle);
469 			er = strdup("Error sending BEGIN!");
470 			goto error;
471 		}
472 		handle->in_txn = SWITCH_TRUE;
473 	}
474 
475 	if (pgsql_send_query(handle, sql) != SWITCH_STATUS_SUCCESS) {
476 		er = strdup("Error sending query!");
477 		if (pgsql_finish_results(handle) != SWITCH_STATUS_SUCCESS) {
478 			db_is_up(handle);
479 		}
480 		goto error;
481 	}
482 
483 	return SWITCH_STATUS_SUCCESS;
484 
485 error:
486 	err_str = pgsql_handle_get_error(handle);
487 
488 	if (zstr(err_str)) {
489 		if (zstr(er)) {
490 			err_str = strdup((char *)"SQL ERROR!");
491 		} else {
492 			err_str = er;
493 		}
494 	} else {
495 		if (!zstr(er)) {
496 			free(er);
497 		}
498 	}
499 
500 	if (err_str) {
501 		if (!switch_stristr("already exists", err_str) && !switch_stristr("duplicate key name", err_str)) {
502 			switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_ERROR, "ERR: [%s]\n[%s]\n", sql, switch_str_nil(err_str));
503 		}
504 
505 		if (err) {
506 			*err = err_str;
507 		} else {
508 			free(err_str);
509 		}
510 	}
511 
512 	return SWITCH_STATUS_FALSE;
513 }
514 
515 
pgsql_handle_exec_detailed(const char * file,const char * func,int line,switch_pgsql_handle_t * handle,const char * sql,char ** err)516 switch_status_t pgsql_handle_exec_detailed(const char *file, const char *func, int line,
517 	switch_pgsql_handle_t *handle, const char *sql, char **err)
518 {
519 	if (pgsql_handle_exec_base_detailed(file, func, line, handle, sql, err) == SWITCH_STATUS_FALSE) {
520 		goto error;
521 	}
522 
523 	return pgsql_finish_results(handle);
524 error:
525 	return SWITCH_STATUS_FALSE;
526 }
527 
database_handle_exec_detailed(const char * file,const char * func,int line,switch_database_interface_handle_t * dih,const char * sql,char ** err)528 switch_status_t database_handle_exec_detailed(const char *file, const char *func, int line,
529 	switch_database_interface_handle_t *dih, const char *sql, char **err)
530 {
531 	switch_pgsql_handle_t *handle;
532 
533 	if (!dih) {
534 		return SWITCH_STATUS_FALSE;
535 	}
536 
537 	handle = dih->handle;
538 
539 	return pgsql_handle_exec_detailed(file, func, line, handle, sql, err);
540 }
541 
database_handle_exec_string(switch_database_interface_handle_t * dih,const char * sql,char * resbuf,size_t len,char ** err)542 switch_status_t database_handle_exec_string(switch_database_interface_handle_t *dih, const char *sql, char *resbuf, size_t len, char **err)
543 {
544 	switch_pgsql_handle_t *handle;
545 	switch_status_t sstatus = SWITCH_STATUS_SUCCESS;
546 	char *val = NULL;
547 	switch_pgsql_result_t *result = NULL;
548 
549 	if (!dih) {
550 		return SWITCH_STATUS_FALSE;
551 	}
552 
553 	handle = dih->handle;
554 
555 	if (!handle)
556 		return SWITCH_STATUS_FALSE;
557 
558 	handle->affected_rows = 0;
559 
560 	if (pgsql_handle_exec_base(handle, sql, err) == SWITCH_STATUS_FALSE) {
561 		goto error;
562 	}
563 
564 	if (pgsql_next_result(handle, &result) == SWITCH_STATUS_FALSE) {
565 		goto error;
566 	}
567 
568 	if (!result) {
569 		goto done;
570 	} else {
571 		switch (result->status) {
572 #if POSTGRESQL_MAJOR_VERSION >= 9 && POSTGRESQL_MINOR_VERSION >= 2
573 		case PGRES_SINGLE_TUPLE:
574 			/* Added in PostgreSQL 9.2 */
575 #endif
576 		case PGRES_COMMAND_OK:
577 		case PGRES_TUPLES_OK:
578 			break;
579 		default:
580 			sstatus = SWITCH_STATUS_FALSE;
581 			goto done;
582 		}
583 	}
584 
585 	if (handle->affected_rows <= 0) {
586 		goto done;
587 	}
588 
589 	val = PQgetvalue(result->result, 0, 0);
590 	strncpy(resbuf, val, len);
591 
592 done:
593 
594 	pgsql_free_result(&result);
595 	if (pgsql_finish_results(handle) != SWITCH_STATUS_SUCCESS) {
596 		sstatus = SWITCH_STATUS_FALSE;
597 	}
598 
599 	return sstatus;
600 
601 error:
602 
603 	return SWITCH_STATUS_FALSE;
604 }
605 
pgsql_next_result_timed(switch_pgsql_handle_t * handle,switch_pgsql_result_t ** result_out,int msec)606 switch_status_t pgsql_next_result_timed(switch_pgsql_handle_t *handle, switch_pgsql_result_t **result_out, int msec)
607 {
608 	switch_pgsql_result_t *res;
609 	switch_time_t start;
610 	switch_time_t ctime;
611 	unsigned int usec = msec * 1000;
612 	char *err_str;
613 #ifndef _WIN32
614 	struct pollfd fds[2] = { { 0 } };
615 #else
616 	fd_set rs, es;
617 #endif
618 	int poll_res = 0;
619 
620 	if (!handle) {
621 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "**BUG** Null handle passed to pgsql_next_result.\n");
622 		return SWITCH_STATUS_FALSE;
623 	}
624 
625 	/* Try to consume input that might be waiting right away */
626 	if (PQconsumeInput(handle->con)) {
627 		/* And check to see if we have a full result ready for reading */
628 		if (PQisBusy(handle->con)) {
629 
630 			/* Wait for a result to become available, up to msec milliseconds */
631 			start = switch_micro_time_now();
632 			while ((ctime = switch_micro_time_now()) - start <= usec) {
633 				switch_time_t wait_time = (usec - (ctime - start)) / 1000;
634 				/* Wait for the PostgreSQL socket to be ready for data reads. */
635 #ifndef _WIN32
636 				fds[0].fd = handle->sock;
637 				fds[0].events |= POLLIN;
638 				fds[0].events |= POLLERR;
639 				fds[0].events |= POLLNVAL;
640 				fds[0].events |= POLLHUP;
641 				fds[0].events |= POLLPRI;
642 				fds[0].events |= POLLRDNORM;
643 				fds[0].events |= POLLRDBAND;
644 
645 				poll_res = poll(&fds[0], 1, wait_time);
646 #else
647 				struct timeval wait = { (long)wait_time * 1000, 0 };
648 				FD_ZERO(&rs);
649 				FD_SET(handle->sock, &rs);
650 				FD_ZERO(&es);
651 				FD_SET(handle->sock, &es);
652 				poll_res = select(0, &rs, 0, &es, &wait);
653 #endif
654 				if (poll_res > 0) {
655 #ifndef _WIN32
656 					if (fds[0].revents & POLLHUP || fds[0].revents & POLLNVAL) {
657 						switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "PGSQL socket closed or invalid while waiting for result for query (%s)\n", handle->sql);
658 						goto error;
659 					} else if (fds[0].revents & POLLERR) {
660 						switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Poll error trying to read PGSQL socket for query (%s)\n", handle->sql);
661 						goto error;
662 					} else if (fds[0].revents & POLLIN || fds[0].revents & POLLPRI || fds[0].revents & POLLRDNORM || fds[0].revents & POLLRDBAND) {
663 #else
664 					if (FD_ISSET(handle->sock, &rs)) {
665 #endif
666 						/* Then try to consume any input waiting. */
667 						if (PQconsumeInput(handle->con)) {
668 							if (PQstatus(handle->con) == CONNECTION_BAD) {
669 								switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Connection terminated while waiting for result.\n");
670 								handle->state = SWITCH_PGSQL_STATE_ERROR;
671 								goto error;
672 							}
673 
674 							/* And check to see if we have a full result ready for reading */
675 							if (!PQisBusy(handle->con)) {
676 								/* If we can pull a full result without blocking, then break this loop */
677 								break;
678 							}
679 						} else {
680 							/* If we had an error trying to consume input, report it and cancel the query. */
681 							err_str = pgsql_handle_get_error(handle);
682 							switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "An error occurred trying to consume input for query (%s): %s\n", handle->sql, err_str);
683 							switch_safe_free(err_str);
684 							pgsql_cancel(handle);
685 							goto error;
686 						}
687 					}
688 				} else if (poll_res == -1) {
689 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Poll failed trying to read PGSQL socket for query (%s)\n", handle->sql);
690 					goto error;
691 				}
692 			}
693 
694 			/* If we broke the loop above because of a timeout, report that and cancel the query. */
695 			if (ctime - start > usec) {
696 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Query (%s) took too long to complete or database not responding.\n", handle->sql);
697 				pgsql_cancel(handle);
698 				goto error;
699 			}
700 		}
701 	} else {
702 		/* If we had an error trying to consume input, report it and cancel the query. */
703 		err_str = pgsql_handle_get_error(handle);
704 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "An error occurred trying to consume input for query (%s): %s\n", handle->sql, err_str);
705 		switch_safe_free(err_str);
706 		/* pgsql_cancel(handle); */
707 		goto error;
708 	}
709 
710 	/* At this point, we know we can read a full result without blocking. */
711 	if (!(res = malloc(sizeof(switch_pgsql_result_t)))) {
712 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Malloc failed!\n");
713 		goto error;
714 	}
715 
716 	memset(res, 0, sizeof(switch_pgsql_result_t));
717 
718 	res->result = PQgetResult(handle->con);
719 	if (res->result) {
720 		*result_out = res;
721 		res->status = PQresultStatus(res->result);
722 		switch (res->status) {
723 //#if (POSTGRESQL_MAJOR_VERSION == 9 && POSTGRESQL_MINOR_VERSION >= 2) || POSTGRESQL_MAJOR_VERSION > 9
724 		case PGRES_SINGLE_TUPLE:
725 			/* Added in PostgreSQL 9.2 */
726 //#endif
727 		case PGRES_TUPLES_OK:
728 		{
729 			res->rows = PQntuples(res->result);
730 			handle->affected_rows = res->rows;
731 			res->cols = PQnfields(res->result);
732 		}
733 		break;
734 //#if (POSTGRESQL_MAJOR_VERSION == 9 && POSTGRESQL_MINOR_VERSION >= 1) || POSTGRESQL_MAJOR_VERSION > 9
735 		case PGRES_COPY_BOTH:
736 			/* Added in PostgreSQL 9.1 */
737 //#endif
738 		case PGRES_COPY_OUT:
739 		case PGRES_COPY_IN:
740 		case PGRES_COMMAND_OK:
741 			break;
742 		case PGRES_EMPTY_QUERY:
743 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Query (%s) returned PGRES_EMPTY_QUERY\n", handle->sql);
744 		case PGRES_BAD_RESPONSE:
745 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Query (%s) returned PGRES_BAD_RESPONSE\n", handle->sql);
746 		case PGRES_NONFATAL_ERROR:
747 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Query (%s) returned PGRES_NONFATAL_ERROR\n", handle->sql);
748 		case PGRES_FATAL_ERROR:
749 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Query (%s) returned PGRES_FATAL_ERROR\n", handle->sql);
750 			res->err = PQresultErrorMessage(res->result);
751 			goto error;
752 			break;
753 		}
754 	} else {
755 		free(res);
756 		res = NULL;
757 		*result_out = NULL;
758 	}
759 
760 	return SWITCH_STATUS_SUCCESS;
761 error:
762 
763 	/* Make sure the failed connection does not have any transactions marked as in progress */
764 	pgsql_flush(handle);
765 
766 	/* Try to reconnect to the DB if we were dropped */
767 	db_is_up(handle);
768 
769 	return SWITCH_STATUS_FALSE;
770 }
771 
772 switch_status_t pgsql_cancel_real(const char *file, const char *func, int line, switch_pgsql_handle_t *handle)
773 {
774 	switch_status_t ret = SWITCH_STATUS_SUCCESS;
775 	char err_buf[256];
776 	PGcancel *cancel = NULL;
777 
778 	memset(err_buf, 0, 256);
779 	cancel = PQgetCancel(handle->con);
780 
781 	if (!PQcancel(cancel, err_buf, 256)) {
782 		switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_CRIT, "Failed to cancel long-running query (%s): %s\n", handle->sql, err_buf);
783 		ret = SWITCH_STATUS_FALSE;
784 	}
785 
786 	PQfreeCancel(cancel);
787 	pgsql_flush(handle);
788 
789 	return ret;
790 }
791 
792 switch_status_t pgsql_SQLSetAutoCommitAttr(switch_database_interface_handle_t *dih, switch_bool_t on)
793 {
794 	switch_pgsql_handle_t *handle;
795 
796 	if (!dih) {
797 		return SWITCH_STATUS_FALSE;
798 	}
799 
800 	handle = dih->handle;
801 
802 	if (!handle)
803 		return SWITCH_STATUS_FALSE;
804 
805 	if (on) {
806 		handle->auto_commit = SWITCH_TRUE;
807 	} else {
808 		handle->auto_commit = SWITCH_FALSE;
809 	}
810 
811 	return SWITCH_STATUS_SUCCESS;
812 }
813 
814 switch_status_t pgsql_SQLEndTran(switch_pgsql_handle_t *handle, switch_bool_t commit)
815 {
816 	char * err_str = NULL;
817 
818 	if (!handle) {
819 		return SWITCH_STATUS_FALSE;
820 	}
821 
822 	if (commit) {
823 		if (!PQsendQuery(handle->con, "COMMIT")) {
824 			err_str = pgsql_handle_get_error(handle);
825 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Could not commit transaction: %s\n", err_str);
826 			switch_safe_free(err_str);
827 			return SWITCH_STATUS_FALSE;
828 		}
829 	} else {
830 		if (!PQsendQuery(handle->con, "ROLLBACK")) {
831 			err_str = pgsql_handle_get_error(handle);
832 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Could not rollback transaction: %s\n", err_str);
833 			switch_safe_free(err_str);
834 			return SWITCH_STATUS_FALSE;
835 		}
836 	}
837 	handle->in_txn = SWITCH_FALSE;
838 
839 	return SWITCH_STATUS_SUCCESS;
840 }
841 
842 switch_status_t database_commit(switch_database_interface_handle_t *dih)
843 {
844 	switch_status_t result;
845 
846 	switch_pgsql_handle_t *handle;
847 
848 	if (!dih) {
849 		return SWITCH_STATUS_FALSE;
850 	}
851 
852 	handle = dih->handle;
853 
854 	if (!handle)
855 		return SWITCH_STATUS_FALSE;
856 
857 	result = pgsql_SQLEndTran(handle, SWITCH_TRUE);
858 	result = pgsql_SQLSetAutoCommitAttr(dih, SWITCH_TRUE) && result;
859 	result = pgsql_finish_results(handle) && result;
860 
861 	return result;
862 }
863 
864 switch_status_t database_rollback(switch_database_interface_handle_t *dih)
865 {
866 	switch_pgsql_handle_t *handle;
867 	switch_status_t result;
868 
869 	if (!dih) {
870 		return SWITCH_STATUS_FALSE;
871 	}
872 
873 	handle = dih->handle;
874 
875 	if (!handle)
876 		return SWITCH_STATUS_FALSE;
877 
878 	result = pgsql_SQLEndTran(handle, SWITCH_FALSE);
879 	result = pgsql_SQLSetAutoCommitAttr(dih, SWITCH_TRUE) && result;
880 	result = pgsql_finish_results(handle) && result;
881 
882 	return result;
883 }
884 
885 switch_status_t pgsql_handle_callback_exec_detailed(const char *file, const char *func, int line,
886 	switch_database_interface_handle_t *dih, const char *sql, switch_core_db_callback_func_t callback, void *pdata, char **err)
887 {
888 	char *err_str = NULL;
889 	int row = 0, col = 0, err_cnt = 0;
890 	switch_pgsql_result_t *result = NULL;
891 
892 	switch_pgsql_handle_t *handle;
893 
894 	if (!dih) {
895 		return SWITCH_STATUS_FALSE;
896 	}
897 
898 	handle = dih->handle;
899 
900 	if (!handle) {
901 		return SWITCH_STATUS_FALSE;
902 	}
903 
904 	handle->affected_rows = 0;
905 
906 	switch_assert(callback != NULL);
907 
908 	if (pgsql_handle_exec_base(handle, sql, err) == SWITCH_STATUS_FALSE) {
909 		goto error;
910 	}
911 
912 	if (pgsql_next_result(handle, &result) == SWITCH_STATUS_FALSE) {
913 		err_cnt++;
914 		err_str = pgsql_handle_get_error(handle);
915 
916 		if (result && !zstr(result->err)) {
917 			switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_ERROR, "ERR: [%s]\n[%s]\n", sql, switch_str_nil(result->err));
918 		}
919 
920 		if (!zstr(err_str)) {
921 			switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_ERROR, "ERR: [%s]\n[%s]\n", sql, switch_str_nil(err_str));
922 		}
923 
924 		switch_safe_free(err_str);
925 	}
926 
927 	while (result != NULL) {
928 		/*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Processing result with %d rows and %d columns.\n", result->rows, result->cols);*/
929 		for (row = 0; row < result->rows; ++row) {
930 			char **names;
931 			char **vals;
932 
933 			names = calloc(result->cols, sizeof(*names));
934 			vals = calloc(result->cols, sizeof(*vals));
935 
936 			switch_assert(names && vals);
937 
938 			for (col = 0; col < result->cols; ++col) {
939 				char * tmp;
940 				size_t len;
941 
942 				tmp = PQfname(result->result, col);
943 				if (tmp) {
944 					len = strlen(tmp);
945 					names[col] = malloc(len + 1);
946 					snprintf(names[col], len + 1, "%s", tmp);
947 
948 					len = PQgetlength(result->result, row, col);
949 					vals[col] = malloc(len + 1);
950 					tmp = PQgetvalue(result->result, row, col);
951 					snprintf(vals[col], len + 1, "%s", tmp);
952 					/*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Processing result row %d, col %d: %s => %s\n", row, col, names[col], vals[col]);*/
953 				} else {
954 					/*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Processing result row %d, col %d.\n", row, col);*/
955 					switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_ERROR, "ERR: Column number %d out of range\n", col);
956 				}
957 			}
958 
959 			/*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Executing callback for row %d...\n", row);*/
960 			if (callback(pdata, result->cols, vals, names)) {
961 				pgsql_finish_results(handle); /* Makes sure next call to switch_pgsql_next_result will return NULL */
962 				row = result->rows;                  /* Makes us exit the for loop */
963 			}
964 
965 			for (col = 0; col < result->cols; ++col) {
966 				free(names[col]);
967 				free(vals[col]);
968 			}
969 
970 			free(names);
971 			free(vals);
972 		}
973 
974 		pgsql_free_result(&result);
975 
976 		if (pgsql_next_result(handle, &result) == SWITCH_STATUS_FALSE) {
977 			err_cnt++;
978 			err_str = pgsql_handle_get_error(handle);
979 
980 			if (result && !zstr(result->err)) {
981 				switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_ERROR, "ERR: [%s]\n[%s]\n", sql, switch_str_nil(result->err));
982 			}
983 
984 			if (!zstr(err_str)) {
985 				switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_ERROR, "ERR: [%s]\n[%s]\n", sql, switch_str_nil(err_str));
986 			}
987 			switch_safe_free(err_str);
988 		}
989 	}
990 
991 	if (err_cnt) {
992 		goto error;
993 	}
994 
995 	return SWITCH_STATUS_SUCCESS;
996 error:
997 
998 	return SWITCH_STATUS_FALSE;
999 }
1000 
1001 SWITCH_MODULE_LOAD_FUNCTION(mod_pgsql_load)
1002 {
1003 	switch_database_interface_t *database_interface;
1004 
1005 	supported_prefixes[0] = (char *)"pgsql";
1006 	supported_prefixes[1] = (char *)"postgres";
1007 	supported_prefixes[2] = (char *)"postgresql";
1008 
1009 	/* connect my internal structure to the blank pointer passed to me */
1010 	*module_interface = switch_loadable_module_create_module_interface(pool, modname);
1011 	MODULE_INTERFACE = *module_interface;
1012 
1013 	database_interface = (switch_database_interface_t *)switch_loadable_module_create_interface(*module_interface, SWITCH_DATABASE_INTERFACE);
1014 	database_interface->flags = 0;
1015 	database_interface->interface_name = modname;
1016 	database_interface->prefixes = supported_prefixes;
1017 	database_interface->handle_new = pgsql_handle_new;
1018 	database_interface->handle_destroy = pgsql_handle_destroy;
1019 	database_interface->flush = database_flush;
1020 	database_interface->exec_detailed = database_handle_exec_detailed;
1021 	database_interface->exec_string = database_handle_exec_string;
1022 	database_interface->affected_rows = pgsql_handle_affected_rows;
1023 	database_interface->sql_set_auto_commit_attr = pgsql_SQLSetAutoCommitAttr;
1024 	database_interface->commit = database_commit;
1025 	database_interface->rollback = database_rollback;
1026 	database_interface->callback_exec_detailed = pgsql_handle_callback_exec_detailed;
1027 
1028 	/* indicate that the module should continue to be loaded */
1029 	return SWITCH_STATUS_SUCCESS;
1030 }
1031 
1032 SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_pgsql_shutdown)
1033 {
1034 	return SWITCH_STATUS_UNLOAD;
1035 }
1036 
1037 /* For Emacs:
1038  * Local Variables:
1039  * mode:c
1040  * indent-tabs-mode:t
1041  * tab-width:4
1042  * c-basic-offset:4
1043  * End:
1044  * For VIM:
1045  * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
1046  */
1047