1 /* ----------------------------------------------------------------------
2  * dbutils.c
3  *
4  *	Database utility functions for Slony-I
5  *
6  *	Copyright (c) 2003-2009, PostgreSQL Global Development Group
7  *	Author: Jan Wieck, Afilias USA INC.
8  *
9  *
10  * ----------------------------------------------------------------------
11  */
12 
13 
14 #include <pthread.h>
15 
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <stdarg.h>
19 #include <string.h>
20 #include <errno.h>
21 #include <signal.h>
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #ifndef WIN32
25 #include <sys/time.h>
26 #include <unistd.h>
27 #include <netinet/tcp.h>
28 #include <netinet/in.h>
29 #endif
30 
31 #include "slon.h"
32 
33 bool		keep_alive;
34 bool		enable_version_check;
35 int			keep_alive_idle;
36 int			keep_alive_count;
37 int			keep_alive_interval;
38 
39 static int	slon_appendquery_int(SlonDString * dsp, char *fmt, va_list ap);
40 static int	db_get_version(PGconn *conn);
41 
42 #if (PG_VERSION_MAJOR < 8)
43 /* ----
44  * This mutex is used to wrap around PQconnectdb. There's a problem that
45  * occurs when your libpq is compiled with libkrb (kerberos) which is not
46  * threadsafe.	It is especially odd because I'm not using kerberos.
47  *
48  * This is fixed in libpq in 8.0, but for now (and for older versions we'll just
49  * use this mutex.
50  * ----
51  */
52 static pthread_mutex_t slon_connect_lock = PTHREAD_MUTEX_INITIALIZER;
53 #endif
54 
55 
56 /* ----------
57  * slon_connectdb
58  * ----------
59  */
60 SlonConn *
slon_connectdb(char * conninfo,char * symname)61 slon_connectdb(char *conninfo, char *symname)
62 {
63 	PGconn	   *dbconn;
64 	SlonConn   *conn;
65 	PGresult   *res;
66 	SlonDString query;
67 	int			connpid = -1;
68 
69 	/*
70 	 * Create the native database connection
71 	 */
72 #if (PG_VERSION_MAJOR < 8)
73 	pthread_mutex_lock(&slon_connect_lock);
74 	dbconn = PQconnectdb(conninfo);
75 	pthread_mutex_unlock(&slon_connect_lock);
76 #else
77 	dbconn = PQconnectdb(conninfo);
78 #endif
79 	if (dbconn == NULL)
80 	{
81 		slon_log(SLON_ERROR,
82 				 "slon_connectdb: PQconnectdb(\"%s\") failed\n",
83 				 conninfo);
84 		return NULL;
85 	}
86 	if (PQstatus(dbconn) != CONNECTION_OK)
87 	{
88 		slon_log(SLON_ERROR,
89 				 "slon_connectdb: PQconnectdb(\"%s\") failed - %s",
90 				 conninfo, PQerrorMessage(dbconn));
91 		PQfinish(dbconn);
92 		return NULL;
93 	}
94 
95 	setsockopt(PQsocket(dbconn), SOL_SOCKET, SO_KEEPALIVE, &keep_alive,
96 			   sizeof(int));
97 #ifndef WIN32
98 	if (keep_alive)
99 	{
100 
101 		if (keep_alive_idle > 0)
102 #ifdef TCP_KEEPIDLE
103 			setsockopt(PQsocket(dbconn), IPPROTO_TCP, TCP_KEEPIDLE,
104 					   &keep_alive_idle, sizeof(keep_alive_idle));
105 #else
106 			slon_log(SLON_WARN, "keep_alive_idle is not supported on this platform");
107 #endif
108 		if (keep_alive_interval > 0)
109 #ifdef TCP_KEEPINTVL
110 			setsockopt(PQsocket(dbconn), IPPROTO_TCP, TCP_KEEPINTVL,
111 					   &keep_alive_interval, sizeof(keep_alive_interval));
112 #else
113 			slon_log(SLON_WARN, "keep_alive_interval is not supported on this platform");
114 #endif
115 		if (keep_alive_count > 0)
116 #ifdef TCP_KEEPCNT
117 			setsockopt(PQsocket(dbconn), IPPROTO_TCP, TCP_KEEPCNT,
118 					   &keep_alive_count, sizeof(keep_alive_count));
119 #else
120 			slon_log(SLON_WARN, "keep_alive_count is not supported on this platform");
121 #endif
122 
123 	}
124 #else
125 	/**
126 	 * Win32 does not support the setsockopt calls for setting keep alive
127 	 * parameters.	On Win32 this can be adjusted via the registry.
128 	 * libpq 9.0 and above provide functions for doing this.
129 	 * If we ever require libpq9.0 or above we could start to use them.
130 	 * Alternativly someone could re-implement that functionality inside
131 	 * of slony.
132 	 */
133 	if (keep_alive)
134 	{
135 		if (keep_alive_idle > 0)
136 			slon_log(SLON_WARN, "keep_alive_idle is not supported by Slony on Win32");
137 		if (keep_alive_interval > 0)
138 			slon_log(SLON_WARN, "keep_alive_interval is not supported by Slony on Win32");
139 		if (keep_alive_count > 0)
140 			slon_log(SLON_WARN, "keep_alive_count is not supported by Slony Win32");
141 
142 	}
143 #endif
144 
145 	dstring_init(&query);
146 
147 	if (sql_on_connection != NULL)
148 	{
149 
150 		slon_mkquery(&query, "%s", sql_on_connection);
151 		res = PQexec(dbconn, dstring_data(&query));
152 		if (!((PQresultStatus(res) == PGRES_TUPLES_OK) ||
153 			  (PQresultStatus(res) == PGRES_COMMAND_OK)))
154 		{
155 			slon_log(SLON_ERROR,
156 					 "query %s failed\n",
157 					 dstring_data(&query));
158 		}
159 		PQclear(res);
160 	}
161 
162 	/* set the datestyle to ISO */
163 	slon_mkquery(&query, "set datestyle to 'ISO'");
164 	res = PQexec(dbconn, dstring_data(&query));
165 	if (!(PQresultStatus(res) == PGRES_COMMAND_OK))
166 	{
167 		slon_log(SLON_ERROR, "Unable to set the datestyle to ISO\n");
168 	}
169 	PQclear(res);
170 
171 	/* Find PID for connection */
172 	slon_mkquery(&query, "select pg_catalog.pg_backend_pid();");
173 	res = PQexec(dbconn, dstring_data(&query));
174 	if (!(PQresultStatus(res) == PGRES_TUPLES_OK))
175 	{
176 		slon_log(SLON_ERROR, "Unable to check connection PID\n");
177 	}
178 	else
179 	{
180 		connpid = strtol(PQgetvalue(res, 0, 0), NULL, 10);
181 	}
182 	PQclear(res);
183 
184 	/*
185 	 * Embed it into a SlonConn structure used to exchange it with the
186 	 * scheduler. On return this new connection object is locked.
187 	 */
188 	conn = slon_make_dummyconn(symname);
189 	conn->dbconn = dbconn;
190 	conn->pg_version = db_get_version(dbconn);
191 	conn->conn_pid = connpid;
192 	if (conn->pg_version < 80300)
193 	{
194 		slon_log(SLON_ERROR,
195 				 "slon_connectdb: PQconnectdb(\"%s\") PostgreSQL version not supported\n",
196 				 conninfo);
197 		PQfinish(dbconn);
198 		return NULL;
199 	}
200 
201 	slon_log(SLON_CONFIG,
202 			 "version for \"%s\" is %d\n", conninfo, conn->pg_version);
203 
204 	if (conn->pg_version >= 80300)
205 	{
206 		slon_mkquery(&query, "set escape_string_warning to 'off'");
207 		res = PQexec(dbconn, dstring_data(&query));
208 		if (!(PQresultStatus(res) == PGRES_COMMAND_OK))
209 		{
210 			slon_log(SLON_ERROR, "Unable to set escape_string_warning to off\n");
211 		}
212 		PQclear(res);
213 
214 		slon_mkquery(&query, "set standard_conforming_strings to 'off'");
215 		res = PQexec(dbconn, dstring_data(&query));
216 		if (!(PQresultStatus(res) == PGRES_COMMAND_OK))
217 		{
218 			slon_log(SLON_ERROR, "Unable to set the standard_conforming_strings to off\n");
219 		}
220 		PQclear(res);
221 	}
222 
223 	slon_mkquery(&query, "select %s.store_application_name('slon.%s');",
224 				 rtcfg_namespace, symname);
225 	res = PQexec(dbconn, dstring_data(&query));
226 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
227 	{
228 		slon_log(SLON_ERROR, "Unable to submit application_name store request\n");
229 	}
230 	PQclear(res);
231 
232 
233 	if (slon_log_level >= SLON_DEBUG1)
234 	{
235 		slon_mkquery(&query, "select pg_backend_pid()");
236 		res = PQexec(dbconn, dstring_data(&query));
237 		if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
238 		{
239 			slon_log(SLON_ERROR, "%s: Unable to get backend pid - %s\n",
240 					 symname, PQresultErrorMessage(res));
241 		}
242 		else
243 		{
244 			slon_log(SLON_DEBUG1, "%s \"%s\": backend pid = %s\n",
245 					 symname, conninfo, PQgetvalue(res, 0, 0));
246 		}
247 		PQclear(res);
248 	}
249 
250 	dstring_free(&query);
251 	return conn;
252 }
253 
254 
255 /* ----------
256  * slon_disconnectdb
257  * ----------
258  */
259 void
slon_disconnectdb(SlonConn * conn)260 slon_disconnectdb(SlonConn * conn)
261 {
262 	/*
263 	 * Disconnect the native database connection
264 	 */
265 	PQfinish(conn->dbconn);
266 #ifdef SLON_MEMDEBUG
267 	conn->dbconn = NULL;
268 #endif
269 
270 	/*
271 	 * Unlock and destroy the condition and mutex variables and free memory.
272 	 */
273 	slon_free_dummyconn(conn);
274 }
275 
276 
277 /* ----------
278  * slon_make_dummyconn
279  * ----------
280  */
281 SlonConn *
slon_make_dummyconn(char * symname)282 slon_make_dummyconn(char *symname)
283 {
284 	SlonConn   *conn;
285 
286 	/*
287 	 * Allocate and initialize the SlonConn structure
288 	 */
289 	conn = (SlonConn *) malloc(sizeof(SlonConn));
290 	if (conn == NULL)
291 	{
292 		perror("slon_make_dummyconn: malloc()");
293 		slon_retry();
294 	}
295 	memset(conn, 0, sizeof(SlonConn));
296 	conn->symname = strdup(symname);
297 
298 	/*
299 	 * Initialize and lock the condition and mutex variables
300 	 */
301 	pthread_mutex_init(&(conn->conn_lock), NULL);
302 	pthread_cond_init(&(conn->conn_cond), NULL);
303 	pthread_mutex_lock(&(conn->conn_lock));
304 
305 	return conn;
306 }
307 
308 
309 /* ----------
310  * slon_free_dummyconn
311  * ----------
312  */
313 void
slon_free_dummyconn(SlonConn * conn)314 slon_free_dummyconn(SlonConn * conn)
315 {
316 	/*
317 	 * Destroy and unlock the condition and mutex variables
318 	 */
319 	pthread_cond_destroy(&(conn->conn_cond));
320 	pthread_mutex_unlock(&(conn->conn_lock));
321 	pthread_mutex_destroy(&(conn->conn_lock));
322 
323 	/*
324 	 * Free allocated memory
325 	 */
326 	free(conn->symname);
327 #ifdef SLON_MEMDEBUG
328 	conn->symname = NULL;
329 #endif
330 	free(conn);
331 }
332 
333 
334 /* ----------
335  * db_getLocalNodeId
336  *
337  * Query a connection for the value of sequence sl_local_node_id
338  * ----------
339  */
340 int
db_getLocalNodeId(PGconn * conn)341 db_getLocalNodeId(PGconn *conn)
342 {
343 	char		query[1024];
344 	PGresult   *res;
345 	int			retval;
346 
347 	/*
348 	 * Select the last_value from the sl_local_node_id sequence
349 	 */
350 	snprintf(query, 1024, "select last_value::int4 from %s.sl_local_node_id",
351 			 rtcfg_namespace);
352 	res = PQexec(conn, query);
353 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
354 	{
355 		slon_log(SLON_ERROR,
356 				 "cannot get sl_local_node_id - %s",
357 				 PQresultErrorMessage(res));
358 		PQclear(res);
359 		return -1;
360 	}
361 	if (PQntuples(res) != 1)
362 	{
363 		slon_log(SLON_ERROR,
364 				 "query '%s' returned %d rows (expected 1)\n",
365 				 query, PQntuples(res));
366 		PQclear(res);
367 		return -1;
368 	}
369 
370 	/*
371 	 * Return the result as an integer value
372 	 */
373 	retval = strtol(PQgetvalue(res, 0, 0), NULL, 10);
374 	PQclear(res);
375 
376 	return retval;
377 }
378 
379 
380 /* ----------
381  * db_checkSchemaVersion
382  *
383  *	Check the Slony schema on a connection for the correct version number
384  * ----------
385  */
386 int
db_checkSchemaVersion(PGconn * conn)387 db_checkSchemaVersion(PGconn *conn)
388 {
389 	char		query[1024];
390 	PGresult   *res;
391 	int			retval = 0;
392 
393 	if (! enable_version_check)
394 	{
395 		return 0;
396 	}
397 
398 	/*
399 	 * Select the version number from the schema
400 	 */
401 	snprintf(query, 1024, "select %s.slonyVersion()",
402 			 rtcfg_namespace);
403 	res = PQexec(conn, query);
404 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
405 	{
406 		slon_log(SLON_ERROR,
407 				 "cannot get Slony-I schema version - %s",
408 				 PQresultErrorMessage(res));
409 		slon_log(SLON_ERROR,
410 				 "please upgrade Slony-I schema to version %s\n",
411 				 SLONY_I_VERSION_STRING);
412 		PQclear(res);
413 		return -1;
414 	}
415 	if (PQntuples(res) != 1)
416 	{
417 		slon_log(SLON_ERROR,
418 				 "query '%s' returned %d rows (expected 1)\n",
419 				 query, PQntuples(res));
420 		PQclear(res);
421 		return -1;
422 	}
423 
424 	/*
425 	 * Check the version string of the schema
426 	 */
427 	if (strcmp(PQgetvalue(res, 0, 0), SLONY_I_VERSION_STRING) != 0)
428 	{
429 		slon_log(SLON_ERROR,
430 				 "Slony-I schema version is %s\n",
431 				 PQgetvalue(res, 0, 0));
432 		slon_log(SLON_ERROR,
433 				 "please upgrade Slony-I schema to version %s\n",
434 				 SLONY_I_VERSION_STRING);
435 		retval = -1;
436 	}
437 	PQclear(res);
438 
439 	/*
440 	 * Select the version number from the module
441 	 */
442 	snprintf(query, 1024, "select %s.getModuleVersion()",
443 			 rtcfg_namespace);
444 	res = PQexec(conn, query);
445 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
446 	{
447 		slon_log(SLON_ERROR,
448 				 "cannot get Slony-I module version - %s",
449 				 PQresultErrorMessage(res));
450 		slon_log(SLON_ERROR,
451 				 "please upgrade Slony-I shared module to version %s\n",
452 				 SLONY_I_VERSION_STRING);
453 		PQclear(res);
454 		return -1;
455 	}
456 	if (PQntuples(res) != 1)
457 	{
458 		slon_log(SLON_ERROR,
459 				 "query '%s' returned %d rows (expected 1)\n",
460 				 query, PQntuples(res));
461 		PQclear(res);
462 		return -1;
463 	}
464 
465 	/*
466 	 * Check the version string of the module
467 	 */
468 	if (strcmp(PQgetvalue(res, 0, 0), SLONY_I_VERSION_STRING) != 0)
469 	{
470 		slon_log(SLON_ERROR,
471 				 "Slony-I module version is %s\n",
472 				 PQgetvalue(res, 0, 0));
473 		slon_log(SLON_ERROR,
474 				 "please upgrade Slony-I shared module to version %s\n",
475 				 SLONY_I_VERSION_STRING);
476 		retval = -1;
477 	}
478 	PQclear(res);
479 
480 	return retval;
481 }
482 
483 
484 /* ----------
485  * slon_mkquery
486  *
487  * A simple query formatting and quoting function using dynamic string buffer
488  * allocation. Similar to sprintf() it uses formatting symbols:
489  *	   %s	String argument
490  *	   %q	Quoted literal (\ and ' will be escaped)
491  *	   %d	Integer argument
492  * ----------
493  */
494 void
slon_mkquery(SlonDString * dsp,char * fmt,...)495 slon_mkquery(SlonDString * dsp, char *fmt,...)
496 {
497 	va_list		ap;
498 
499 	dstring_reset(dsp);
500 
501 	va_start(ap, fmt);
502 	slon_appendquery_int(dsp, fmt, ap);
503 	va_end(ap);
504 
505 	dstring_terminate(dsp);
506 }
507 
508 
509 /* ----------
510  * slon_appendquery
511  *
512  * Append query string material to an existing dynamic string.
513  * ----------
514  */
515 void
slon_appendquery(SlonDString * dsp,char * fmt,...)516 slon_appendquery(SlonDString * dsp, char *fmt,...)
517 {
518 	va_list		ap;
519 
520 	va_start(ap, fmt);
521 	slon_appendquery_int(dsp, fmt, ap);
522 	va_end(ap);
523 
524 	dstring_terminate(dsp);
525 }
526 
527 
528 /* ----------
529  * slon_appendquery_int
530  *
531  * Implementation of slon_mkquery() and slon_appendquery().
532  * ----------
533  */
534 static int
slon_appendquery_int(SlonDString * dsp,char * fmt,va_list ap)535 slon_appendquery_int(SlonDString * dsp, char *fmt, va_list ap)
536 {
537 	char	   *s;
538 	char		buf[64];
539 
540 	while (*fmt)
541 	{
542 		switch (*fmt)
543 		{
544 			case '%':
545 				fmt++;
546 				switch (*fmt)
547 				{
548 					case 's':
549 						s = va_arg(ap, char *);
550 						dstring_append(dsp, s);
551 						fmt++;
552 						break;
553 
554 					case 'q':
555 						s = va_arg(ap, char *);
556 						while (s && *s != '\0')
557 						{
558 							switch (*s)
559 							{
560 								case '\'':
561 									dstring_addchar(dsp, '\'');
562 									break;
563 								case '\\':
564 									dstring_addchar(dsp, '\\');
565 									break;
566 								default:
567 									break;
568 							}
569 							dstring_addchar(dsp, *s);
570 							s++;
571 						}
572 						fmt++;
573 						break;
574 
575 					case 'd':
576 						sprintf(buf, "%d", va_arg(ap, int));
577 						dstring_append(dsp, buf);
578 						fmt++;
579 						break;
580 
581 					case 'L':
582 						sprintf(buf, INT64_FORMAT, va_arg(ap, int64));
583 						dstring_append(dsp, buf);
584 						fmt++;
585 						break;
586 
587 					default:
588 						dstring_addchar(dsp, '%');
589 						dstring_addchar(dsp, *fmt);
590 						fmt++;
591 						break;
592 				}
593 				break;
594 
595 			case '\\':
596 				fmt++;
597 				dstring_addchar(dsp, *fmt);
598 				fmt++;
599 				break;
600 
601 			default:
602 				dstring_addchar(dsp, *fmt);
603 				fmt++;
604 				break;
605 		}
606 	}
607 
608 	dstring_terminate(dsp);
609 
610 	return 0;
611 }
612 
613 static int
db_get_version(PGconn * conn)614 db_get_version(PGconn *conn)
615 {
616 	PGresult   *res;
617 	SlonDString query;
618 	char		versionstr[7];
619 	int			version = 0;
620 	int			major = 0;
621 	int			minor = 0;
622 	int			patch = 0;
623 	int			scanres=0;
624 
625 	dstring_init(&query);
626 	slon_mkquery(&query, "SELECT version();");
627 	res = PQexec(conn, dstring_data(&query));
628 
629 	if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
630 	{
631 		PQclear(res);
632 		return -1;
633 	}
634 	scanres=sscanf(PQgetvalue(res, 0, 0), "PostgreSQL %d.%d.%d", &major, &minor, &patch);
635 	if(scanres < 1)
636 	{
637 		scanres=sscanf(PQgetvalue(res, 0, 0), "EnterpriseDB %d.%d.%d", &major, &minor, &patch);
638 	}
639 	if ( scanres < 1)
640 	{
641 		PQclear(res);
642 		return -1;
643 	}
644 	PQclear(res);
645 	snprintf(versionstr, 7, "%.2d%.2d%.2d", major, minor, patch);
646 	version = atoi(versionstr);
647 	dstring_free(&query);
648 	return version;
649 }
650 
651 /*
652  * Local Variables:
653  *	tab-width: 4
654  *	c-indent-level: 4
655  *	c-basic-offset: 4
656  * End:
657  */
658