1 /* ----------------------------------------------------------------------
2 * dbutils.c
3 *
4 * Database utility functions for Slony-I
5 *
6 * Copyright (c) 2003-2009, PostgreSQL Global Development Group
7 * Author: Jan Wieck, Afilias USA INC.
8 *
9 *
10 * ----------------------------------------------------------------------
11 */
12
13
14 #include <pthread.h>
15
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <stdarg.h>
19 #include <string.h>
20 #include <errno.h>
21 #include <signal.h>
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #ifndef WIN32
25 #include <sys/time.h>
26 #include <unistd.h>
27 #include <netinet/tcp.h>
28 #include <netinet/in.h>
29 #endif
30
31 #include "slon.h"
32
33 bool keep_alive;
34 bool enable_version_check;
35 int keep_alive_idle;
36 int keep_alive_count;
37 int keep_alive_interval;
38
39 static int slon_appendquery_int(SlonDString * dsp, char *fmt, va_list ap);
40 static int db_get_version(PGconn *conn);
41
42 #if (PG_VERSION_MAJOR < 8)
43 /* ----
44 * This mutex is used to wrap around PQconnectdb. There's a problem that
45 * occurs when your libpq is compiled with libkrb (kerberos) which is not
46 * threadsafe. It is especially odd because I'm not using kerberos.
47 *
48 * This is fixed in libpq in 8.0, but for now (and for older versions we'll just
49 * use this mutex.
50 * ----
51 */
52 static pthread_mutex_t slon_connect_lock = PTHREAD_MUTEX_INITIALIZER;
53 #endif
54
55
56 /* ----------
57 * slon_connectdb
58 * ----------
59 */
60 SlonConn *
slon_connectdb(char * conninfo,char * symname)61 slon_connectdb(char *conninfo, char *symname)
62 {
63 PGconn *dbconn;
64 SlonConn *conn;
65 PGresult *res;
66 SlonDString query;
67 int connpid = -1;
68
69 /*
70 * Create the native database connection
71 */
72 #if (PG_VERSION_MAJOR < 8)
73 pthread_mutex_lock(&slon_connect_lock);
74 dbconn = PQconnectdb(conninfo);
75 pthread_mutex_unlock(&slon_connect_lock);
76 #else
77 dbconn = PQconnectdb(conninfo);
78 #endif
79 if (dbconn == NULL)
80 {
81 slon_log(SLON_ERROR,
82 "slon_connectdb: PQconnectdb(\"%s\") failed\n",
83 conninfo);
84 return NULL;
85 }
86 if (PQstatus(dbconn) != CONNECTION_OK)
87 {
88 slon_log(SLON_ERROR,
89 "slon_connectdb: PQconnectdb(\"%s\") failed - %s",
90 conninfo, PQerrorMessage(dbconn));
91 PQfinish(dbconn);
92 return NULL;
93 }
94
95 setsockopt(PQsocket(dbconn), SOL_SOCKET, SO_KEEPALIVE, &keep_alive,
96 sizeof(int));
97 #ifndef WIN32
98 if (keep_alive)
99 {
100
101 if (keep_alive_idle > 0)
102 #ifdef TCP_KEEPIDLE
103 setsockopt(PQsocket(dbconn), IPPROTO_TCP, TCP_KEEPIDLE,
104 &keep_alive_idle, sizeof(keep_alive_idle));
105 #else
106 slon_log(SLON_WARN, "keep_alive_idle is not supported on this platform");
107 #endif
108 if (keep_alive_interval > 0)
109 #ifdef TCP_KEEPINTVL
110 setsockopt(PQsocket(dbconn), IPPROTO_TCP, TCP_KEEPINTVL,
111 &keep_alive_interval, sizeof(keep_alive_interval));
112 #else
113 slon_log(SLON_WARN, "keep_alive_interval is not supported on this platform");
114 #endif
115 if (keep_alive_count > 0)
116 #ifdef TCP_KEEPCNT
117 setsockopt(PQsocket(dbconn), IPPROTO_TCP, TCP_KEEPCNT,
118 &keep_alive_count, sizeof(keep_alive_count));
119 #else
120 slon_log(SLON_WARN, "keep_alive_count is not supported on this platform");
121 #endif
122
123 }
124 #else
125 /**
126 * Win32 does not support the setsockopt calls for setting keep alive
127 * parameters. On Win32 this can be adjusted via the registry.
128 * libpq 9.0 and above provide functions for doing this.
129 * If we ever require libpq9.0 or above we could start to use them.
130 * Alternativly someone could re-implement that functionality inside
131 * of slony.
132 */
133 if (keep_alive)
134 {
135 if (keep_alive_idle > 0)
136 slon_log(SLON_WARN, "keep_alive_idle is not supported by Slony on Win32");
137 if (keep_alive_interval > 0)
138 slon_log(SLON_WARN, "keep_alive_interval is not supported by Slony on Win32");
139 if (keep_alive_count > 0)
140 slon_log(SLON_WARN, "keep_alive_count is not supported by Slony Win32");
141
142 }
143 #endif
144
145 dstring_init(&query);
146
147 if (sql_on_connection != NULL)
148 {
149
150 slon_mkquery(&query, "%s", sql_on_connection);
151 res = PQexec(dbconn, dstring_data(&query));
152 if (!((PQresultStatus(res) == PGRES_TUPLES_OK) ||
153 (PQresultStatus(res) == PGRES_COMMAND_OK)))
154 {
155 slon_log(SLON_ERROR,
156 "query %s failed\n",
157 dstring_data(&query));
158 }
159 PQclear(res);
160 }
161
162 /* set the datestyle to ISO */
163 slon_mkquery(&query, "set datestyle to 'ISO'");
164 res = PQexec(dbconn, dstring_data(&query));
165 if (!(PQresultStatus(res) == PGRES_COMMAND_OK))
166 {
167 slon_log(SLON_ERROR, "Unable to set the datestyle to ISO\n");
168 }
169 PQclear(res);
170
171 /* Find PID for connection */
172 slon_mkquery(&query, "select pg_catalog.pg_backend_pid();");
173 res = PQexec(dbconn, dstring_data(&query));
174 if (!(PQresultStatus(res) == PGRES_TUPLES_OK))
175 {
176 slon_log(SLON_ERROR, "Unable to check connection PID\n");
177 }
178 else
179 {
180 connpid = strtol(PQgetvalue(res, 0, 0), NULL, 10);
181 }
182 PQclear(res);
183
184 /*
185 * Embed it into a SlonConn structure used to exchange it with the
186 * scheduler. On return this new connection object is locked.
187 */
188 conn = slon_make_dummyconn(symname);
189 conn->dbconn = dbconn;
190 conn->pg_version = db_get_version(dbconn);
191 conn->conn_pid = connpid;
192 if (conn->pg_version < 80300)
193 {
194 slon_log(SLON_ERROR,
195 "slon_connectdb: PQconnectdb(\"%s\") PostgreSQL version not supported\n",
196 conninfo);
197 PQfinish(dbconn);
198 return NULL;
199 }
200
201 slon_log(SLON_CONFIG,
202 "version for \"%s\" is %d\n", conninfo, conn->pg_version);
203
204 if (conn->pg_version >= 80300)
205 {
206 slon_mkquery(&query, "set escape_string_warning to 'off'");
207 res = PQexec(dbconn, dstring_data(&query));
208 if (!(PQresultStatus(res) == PGRES_COMMAND_OK))
209 {
210 slon_log(SLON_ERROR, "Unable to set escape_string_warning to off\n");
211 }
212 PQclear(res);
213
214 slon_mkquery(&query, "set standard_conforming_strings to 'off'");
215 res = PQexec(dbconn, dstring_data(&query));
216 if (!(PQresultStatus(res) == PGRES_COMMAND_OK))
217 {
218 slon_log(SLON_ERROR, "Unable to set the standard_conforming_strings to off\n");
219 }
220 PQclear(res);
221 }
222
223 slon_mkquery(&query, "select %s.store_application_name('slon.%s');",
224 rtcfg_namespace, symname);
225 res = PQexec(dbconn, dstring_data(&query));
226 if (PQresultStatus(res) != PGRES_TUPLES_OK)
227 {
228 slon_log(SLON_ERROR, "Unable to submit application_name store request\n");
229 }
230 PQclear(res);
231
232
233 if (slon_log_level >= SLON_DEBUG1)
234 {
235 slon_mkquery(&query, "select pg_backend_pid()");
236 res = PQexec(dbconn, dstring_data(&query));
237 if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
238 {
239 slon_log(SLON_ERROR, "%s: Unable to get backend pid - %s\n",
240 symname, PQresultErrorMessage(res));
241 }
242 else
243 {
244 slon_log(SLON_DEBUG1, "%s \"%s\": backend pid = %s\n",
245 symname, conninfo, PQgetvalue(res, 0, 0));
246 }
247 PQclear(res);
248 }
249
250 dstring_free(&query);
251 return conn;
252 }
253
254
255 /* ----------
256 * slon_disconnectdb
257 * ----------
258 */
259 void
slon_disconnectdb(SlonConn * conn)260 slon_disconnectdb(SlonConn * conn)
261 {
262 /*
263 * Disconnect the native database connection
264 */
265 PQfinish(conn->dbconn);
266 #ifdef SLON_MEMDEBUG
267 conn->dbconn = NULL;
268 #endif
269
270 /*
271 * Unlock and destroy the condition and mutex variables and free memory.
272 */
273 slon_free_dummyconn(conn);
274 }
275
276
277 /* ----------
278 * slon_make_dummyconn
279 * ----------
280 */
281 SlonConn *
slon_make_dummyconn(char * symname)282 slon_make_dummyconn(char *symname)
283 {
284 SlonConn *conn;
285
286 /*
287 * Allocate and initialize the SlonConn structure
288 */
289 conn = (SlonConn *) malloc(sizeof(SlonConn));
290 if (conn == NULL)
291 {
292 perror("slon_make_dummyconn: malloc()");
293 slon_retry();
294 }
295 memset(conn, 0, sizeof(SlonConn));
296 conn->symname = strdup(symname);
297
298 /*
299 * Initialize and lock the condition and mutex variables
300 */
301 pthread_mutex_init(&(conn->conn_lock), NULL);
302 pthread_cond_init(&(conn->conn_cond), NULL);
303 pthread_mutex_lock(&(conn->conn_lock));
304
305 return conn;
306 }
307
308
309 /* ----------
310 * slon_free_dummyconn
311 * ----------
312 */
313 void
slon_free_dummyconn(SlonConn * conn)314 slon_free_dummyconn(SlonConn * conn)
315 {
316 /*
317 * Destroy and unlock the condition and mutex variables
318 */
319 pthread_cond_destroy(&(conn->conn_cond));
320 pthread_mutex_unlock(&(conn->conn_lock));
321 pthread_mutex_destroy(&(conn->conn_lock));
322
323 /*
324 * Free allocated memory
325 */
326 free(conn->symname);
327 #ifdef SLON_MEMDEBUG
328 conn->symname = NULL;
329 #endif
330 free(conn);
331 }
332
333
334 /* ----------
335 * db_getLocalNodeId
336 *
337 * Query a connection for the value of sequence sl_local_node_id
338 * ----------
339 */
340 int
db_getLocalNodeId(PGconn * conn)341 db_getLocalNodeId(PGconn *conn)
342 {
343 char query[1024];
344 PGresult *res;
345 int retval;
346
347 /*
348 * Select the last_value from the sl_local_node_id sequence
349 */
350 snprintf(query, 1024, "select last_value::int4 from %s.sl_local_node_id",
351 rtcfg_namespace);
352 res = PQexec(conn, query);
353 if (PQresultStatus(res) != PGRES_TUPLES_OK)
354 {
355 slon_log(SLON_ERROR,
356 "cannot get sl_local_node_id - %s",
357 PQresultErrorMessage(res));
358 PQclear(res);
359 return -1;
360 }
361 if (PQntuples(res) != 1)
362 {
363 slon_log(SLON_ERROR,
364 "query '%s' returned %d rows (expected 1)\n",
365 query, PQntuples(res));
366 PQclear(res);
367 return -1;
368 }
369
370 /*
371 * Return the result as an integer value
372 */
373 retval = strtol(PQgetvalue(res, 0, 0), NULL, 10);
374 PQclear(res);
375
376 return retval;
377 }
378
379
380 /* ----------
381 * db_checkSchemaVersion
382 *
383 * Check the Slony schema on a connection for the correct version number
384 * ----------
385 */
386 int
db_checkSchemaVersion(PGconn * conn)387 db_checkSchemaVersion(PGconn *conn)
388 {
389 char query[1024];
390 PGresult *res;
391 int retval = 0;
392
393 if (! enable_version_check)
394 {
395 return 0;
396 }
397
398 /*
399 * Select the version number from the schema
400 */
401 snprintf(query, 1024, "select %s.slonyVersion()",
402 rtcfg_namespace);
403 res = PQexec(conn, query);
404 if (PQresultStatus(res) != PGRES_TUPLES_OK)
405 {
406 slon_log(SLON_ERROR,
407 "cannot get Slony-I schema version - %s",
408 PQresultErrorMessage(res));
409 slon_log(SLON_ERROR,
410 "please upgrade Slony-I schema to version %s\n",
411 SLONY_I_VERSION_STRING);
412 PQclear(res);
413 return -1;
414 }
415 if (PQntuples(res) != 1)
416 {
417 slon_log(SLON_ERROR,
418 "query '%s' returned %d rows (expected 1)\n",
419 query, PQntuples(res));
420 PQclear(res);
421 return -1;
422 }
423
424 /*
425 * Check the version string of the schema
426 */
427 if (strcmp(PQgetvalue(res, 0, 0), SLONY_I_VERSION_STRING) != 0)
428 {
429 slon_log(SLON_ERROR,
430 "Slony-I schema version is %s\n",
431 PQgetvalue(res, 0, 0));
432 slon_log(SLON_ERROR,
433 "please upgrade Slony-I schema to version %s\n",
434 SLONY_I_VERSION_STRING);
435 retval = -1;
436 }
437 PQclear(res);
438
439 /*
440 * Select the version number from the module
441 */
442 snprintf(query, 1024, "select %s.getModuleVersion()",
443 rtcfg_namespace);
444 res = PQexec(conn, query);
445 if (PQresultStatus(res) != PGRES_TUPLES_OK)
446 {
447 slon_log(SLON_ERROR,
448 "cannot get Slony-I module version - %s",
449 PQresultErrorMessage(res));
450 slon_log(SLON_ERROR,
451 "please upgrade Slony-I shared module to version %s\n",
452 SLONY_I_VERSION_STRING);
453 PQclear(res);
454 return -1;
455 }
456 if (PQntuples(res) != 1)
457 {
458 slon_log(SLON_ERROR,
459 "query '%s' returned %d rows (expected 1)\n",
460 query, PQntuples(res));
461 PQclear(res);
462 return -1;
463 }
464
465 /*
466 * Check the version string of the module
467 */
468 if (strcmp(PQgetvalue(res, 0, 0), SLONY_I_VERSION_STRING) != 0)
469 {
470 slon_log(SLON_ERROR,
471 "Slony-I module version is %s\n",
472 PQgetvalue(res, 0, 0));
473 slon_log(SLON_ERROR,
474 "please upgrade Slony-I shared module to version %s\n",
475 SLONY_I_VERSION_STRING);
476 retval = -1;
477 }
478 PQclear(res);
479
480 return retval;
481 }
482
483
484 /* ----------
485 * slon_mkquery
486 *
487 * A simple query formatting and quoting function using dynamic string buffer
488 * allocation. Similar to sprintf() it uses formatting symbols:
489 * %s String argument
490 * %q Quoted literal (\ and ' will be escaped)
491 * %d Integer argument
492 * ----------
493 */
494 void
slon_mkquery(SlonDString * dsp,char * fmt,...)495 slon_mkquery(SlonDString * dsp, char *fmt,...)
496 {
497 va_list ap;
498
499 dstring_reset(dsp);
500
501 va_start(ap, fmt);
502 slon_appendquery_int(dsp, fmt, ap);
503 va_end(ap);
504
505 dstring_terminate(dsp);
506 }
507
508
509 /* ----------
510 * slon_appendquery
511 *
512 * Append query string material to an existing dynamic string.
513 * ----------
514 */
515 void
slon_appendquery(SlonDString * dsp,char * fmt,...)516 slon_appendquery(SlonDString * dsp, char *fmt,...)
517 {
518 va_list ap;
519
520 va_start(ap, fmt);
521 slon_appendquery_int(dsp, fmt, ap);
522 va_end(ap);
523
524 dstring_terminate(dsp);
525 }
526
527
528 /* ----------
529 * slon_appendquery_int
530 *
531 * Implementation of slon_mkquery() and slon_appendquery().
532 * ----------
533 */
534 static int
slon_appendquery_int(SlonDString * dsp,char * fmt,va_list ap)535 slon_appendquery_int(SlonDString * dsp, char *fmt, va_list ap)
536 {
537 char *s;
538 char buf[64];
539
540 while (*fmt)
541 {
542 switch (*fmt)
543 {
544 case '%':
545 fmt++;
546 switch (*fmt)
547 {
548 case 's':
549 s = va_arg(ap, char *);
550 dstring_append(dsp, s);
551 fmt++;
552 break;
553
554 case 'q':
555 s = va_arg(ap, char *);
556 while (s && *s != '\0')
557 {
558 switch (*s)
559 {
560 case '\'':
561 dstring_addchar(dsp, '\'');
562 break;
563 case '\\':
564 dstring_addchar(dsp, '\\');
565 break;
566 default:
567 break;
568 }
569 dstring_addchar(dsp, *s);
570 s++;
571 }
572 fmt++;
573 break;
574
575 case 'd':
576 sprintf(buf, "%d", va_arg(ap, int));
577 dstring_append(dsp, buf);
578 fmt++;
579 break;
580
581 case 'L':
582 sprintf(buf, INT64_FORMAT, va_arg(ap, int64));
583 dstring_append(dsp, buf);
584 fmt++;
585 break;
586
587 default:
588 dstring_addchar(dsp, '%');
589 dstring_addchar(dsp, *fmt);
590 fmt++;
591 break;
592 }
593 break;
594
595 case '\\':
596 fmt++;
597 dstring_addchar(dsp, *fmt);
598 fmt++;
599 break;
600
601 default:
602 dstring_addchar(dsp, *fmt);
603 fmt++;
604 break;
605 }
606 }
607
608 dstring_terminate(dsp);
609
610 return 0;
611 }
612
613 static int
db_get_version(PGconn * conn)614 db_get_version(PGconn *conn)
615 {
616 PGresult *res;
617 SlonDString query;
618 char versionstr[7];
619 int version = 0;
620 int major = 0;
621 int minor = 0;
622 int patch = 0;
623 int scanres=0;
624
625 dstring_init(&query);
626 slon_mkquery(&query, "SELECT version();");
627 res = PQexec(conn, dstring_data(&query));
628
629 if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
630 {
631 PQclear(res);
632 return -1;
633 }
634 scanres=sscanf(PQgetvalue(res, 0, 0), "PostgreSQL %d.%d.%d", &major, &minor, &patch);
635 if(scanres < 1)
636 {
637 scanres=sscanf(PQgetvalue(res, 0, 0), "EnterpriseDB %d.%d.%d", &major, &minor, &patch);
638 }
639 if ( scanres < 1)
640 {
641 PQclear(res);
642 return -1;
643 }
644 PQclear(res);
645 snprintf(versionstr, 7, "%.2d%.2d%.2d", major, minor, patch);
646 version = atoi(versionstr);
647 dstring_free(&query);
648 return version;
649 }
650
651 /*
652 * Local Variables:
653 * tab-width: 4
654 * c-indent-level: 4
655 * c-basic-offset: 4
656 * End:
657 */
658