1 /* -------------------------------------------------------------------------
2 *
3 * pglogical_create_subscriber.c
4 * Initialize a new pglogical subscriber from a physical base backup
5 *
6 * Copyright (C) 2012-2016, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * pglogical_create_subscriber.c
10 *
11 * -------------------------------------------------------------------------
12 */
13
14 /* dirent.h on port/win32_msvc expects MAX_PATH to be defined */
15 #if defined(_WIN32)
16 #define WIN32_LEAN_AND_MEAN
17 #include <windows.h>
18 #endif
19
20 #include <dirent.h>
21 #include <fcntl.h>
22 #include <locale.h>
23 #include <signal.h>
24 #include <time.h>
25 #include <sys/types.h>
26 #include <sys/wait.h>
27 #include <sys/stat.h>
28 #include <unistd.h>
29 #include <stdlib.h>
30
31 /* Note the order is important for debian here. */
32 #if !defined(pg_attribute_printf)
33
34 /* GCC and XLC support format attributes */
35 #if defined(__GNUC__) || defined(__IBMC__)
36 #define pg_attribute_format_arg(a) __attribute__((format_arg(a)))
37 #define pg_attribute_printf(f,a) __attribute__((format(PG_PRINTF_ATTRIBUTE, f, a)))
38 #else
39 #define pg_attribute_format_arg(a)
40 #define pg_attribute_printf(f,a)
41 #endif
42
43 #endif
44
45 #include "libpq-fe.h"
46 #include "postgres_fe.h"
47 #include "pqexpbuffer.h"
48
49 #include "getopt_long.h"
50
51 #include "miscadmin.h"
52
53 #include "access/timeline.h"
54 #include "access/xlog_internal.h"
55 #include "catalog/pg_control.h"
56
57 #include "pglogical_fe.h"
58
59 #define MAX_APPLY_DELAY 86400
60
/*
 * Description of the remote (provider) node, filled in by get_remote_info()
 * from one row of pglogical.pglogical_node_info().
 */
typedef struct RemoteInfo {
    Oid nodeid;                 /* node_id column, parsed as an Oid */
    char *node_name;            /* node_name column */
    char *sysid;                /* sysid column, kept as text so it can be
                                 * compared with the local data directory */
    char *dbname;               /* dbname column */
    char *replication_sets;     /* replication_sets column, kept as text */
} RemoteInfo;
68
/*
 * Output verbosity. The level starts at VERBOSITY_NORMAL and is raised by
 * one for every -v on the command line; print_msg() emits a message only
 * when the current level is at least the message's level.
 */
typedef enum {
    VERBOSITY_NORMAL,           /* default progress messages */
    VERBOSITY_VERBOSE,          /* also passes -v to pg_basebackup */
    VERBOSITY_DEBUG             /* also prints the commands being run and
                                 * stops passing -s to pg_ctl */
} VerbosityLevelEnum;
74
/* argv[0] as received by main(); used to locate sibling executables */
static char *argv0 = NULL;
/* program name derived from argv[0] via get_progname() */
static const char *progname;
/* target data directory (-D / --pgdata) */
static char *data_dir = NULL;
/* "<data_dir>/postmaster.pid"; filled in by main() after the data dir is set up */
static char pid_file[MAXPGPATH];
/* time at which main() started */
static time_t start_time;
/* current output verbosity, raised by -v */
static VerbosityLevelEnum verbosity = VERBOSITY_NORMAL;

/* defined as static so that die() can close them */
static PGconn *subscriber_conn = NULL;
static PGconn *provider_conn = NULL;
85
/* Signal handling, help output and message helpers */
static void signal_handler(int sig);
static void usage(void);
static void die(const char *fmt,...)
pg_attribute_printf(1, 2);
static void print_msg(VerbosityLevelEnum level, const char *fmt,...)
pg_attribute_printf(2, 3);

/* Running external PostgreSQL binaries and waiting for the server */
static int run_pg_ctl(const char *arg);
static void run_basebackup(const char *provider_connstr, const char *data_dir,
                           const char *extra_basebackup_args);
static void wait_postmaster_connection(const char *connstr);
static void wait_primary_connection(const char *connstr);
static void wait_postmaster_shutdown(void);

static char *validate_replication_set_input(char *replication_sets);

/* SQL-level setup steps executed over a libpq connection */
static void remove_unwanted_data(PGconn *conn);
static void initialize_replication_origin(PGconn *conn, char *origin_name, char *remote_lsn);
static char *create_restore_point(PGconn *conn, char *restore_point_name);
static char *initialize_replication_slot(PGconn *conn, char *dbname,
                                         char *provider_node_name, char *subscription_name,
                                         bool drop_slot_if_exists);
static void pglogical_subscribe(PGconn *conn, char *subscriber_name,
                                char *subscriber_dsn,
                                char *provider_connstr,
                                char *replication_sets,
                                int apply_delay,
                                bool force_text_transfer);

static RemoteInfo *get_remote_info(PGconn* conn);

static bool extension_exists(PGconn *conn, const char *extname);
static void install_extension(PGconn *conn, const char *extname);

/* Data directory preparation and inspection */
static void initialize_data_dir(char *data_dir, char *connstr,
                                char *postgresql_conf, char *pg_hba_conf,
                                char *extra_basebackup_args);
static bool check_data_dir(char *data_dir, RemoteInfo *remoteinfo);

static char *read_sysid(const char *data_dir);

/* Configuration file handling */
static void WriteRecoveryConf(PQExpBuffer contents);
static void CopyConfFile(char *fromfile, char *tofile, bool append);

/* Connection string helpers */
static char *get_connstr_dbname(char *connstr);
static char *get_connstr(char *connstr, char *dbname);
static char *PQconninfoParamsToConnstr(const char *const * keywords, const char *const * values);
static void appendPQExpBufferConnstrValue(PQExpBuffer buf, const char *str);

/* Miscellaneous file, process and naming utilities */
static bool file_exists(const char *path);
static bool is_pg_dir(const char *path);
static void copy_file(char *fromfile, char *tofile, bool append);
static char *find_other_exec_or_die(const char *argv0, const char *target);
static bool postmaster_is_alive(pid_t pid);
static long get_pgpid(void);
static char **get_database_list(char *databases, int *n_databases);
static char *generate_restore_point_name(void);
143
144 static PGconn *
connectdb(const char * connstr)145 connectdb(const char *connstr)
146 {
147 PGconn *conn;
148
149 conn = PQconnectdb(connstr);
150 if (PQstatus(conn) != CONNECTION_OK)
151 die(_("Connection to database failed: %s, connection string was: %s\n"), PQerrorMessage(conn), connstr);
152
153 return conn;
154 }
155
/*
 * Signal handler installed for SIGINT by main(): aborts the run via die().
 * Any other signal number is ignored.
 *
 * NOTE(review): die() uses stdio and runs pg_ctl, which is not
 * async-signal-safe; confirm this is acceptable for this tool.
 */
static void
signal_handler(int sig)
{
    if (sig != SIGINT)
        return;

    die(_("\nCanceling...\n"));
}
163
164
165 int
main(int argc,char ** argv)166 main(int argc, char **argv)
167 {
168 int i;
169 int c;
170 PQExpBuffer recoveryconfcontents = createPQExpBuffer();
171 RemoteInfo *remote_info;
172 char *remote_lsn;
173 bool stop = false;
174 bool drop_slot_if_exists = false;
175 int optindex;
176 char *subscriber_name = NULL;
177 char *base_sub_connstr = NULL;
178 char *base_prov_connstr = NULL;
179 char *replication_sets = NULL;
180 char *databases = NULL;
181 char *postgresql_conf = NULL,
182 *pg_hba_conf = NULL,
183 *recovery_conf = NULL;
184 int apply_delay = 0;
185 bool force_text_transfer = false;
186 char **slot_names;
187 char *sub_connstr;
188 char *prov_connstr;
189 char **database_list = { NULL };
190 int n_databases = 1;
191 int dbnum;
192 bool use_existing_data_dir = false;
193 int pg_ctl_ret,
194 logfd;
195 char *restore_point_name = NULL;
196 char *extra_basebackup_args = NULL;
197
198 static struct option long_options[] = {
199 {"subscriber-name", required_argument, NULL, 'n'},
200 {"pgdata", required_argument, NULL, 'D'},
201 {"provider-dsn", required_argument, NULL, 1},
202 {"subscriber-dsn", required_argument, NULL, 2},
203 {"replication-sets", required_argument, NULL, 3},
204 {"postgresql-conf", required_argument, NULL, 4},
205 {"hba-conf", required_argument, NULL, 5},
206 {"recovery-conf", required_argument, NULL, 6},
207 {"stop", no_argument, NULL, 's'},
208 {"drop-slot-if-exists", no_argument, NULL, 7},
209 {"apply-delay", required_argument, NULL, 8},
210 {"databases", required_argument, NULL, 9},
211 {"extra-basebackup-args", required_argument, NULL, 10},
212 {"text-types", no_argument, NULL, 11},
213 {NULL, 0, NULL, 0}
214 };
215
216 argv0 = argv[0];
217 progname = get_progname(argv[0]);
218 start_time = time(NULL);
219 signal(SIGINT, signal_handler);
220
221 /* check for --help */
222 if (argc > 1)
223 {
224 for (i = 1; i < argc; i++)
225 {
226 if (strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "-?") == 0)
227 {
228 usage();
229 exit(0);
230 }
231 }
232 }
233
234 /* Option parsing and validation */
235 while ((c = getopt_long(argc, argv, "D:n:sv", long_options, &optindex)) != -1)
236 {
237 switch (c)
238 {
239 case 'D':
240 data_dir = pg_strdup(optarg);
241 break;
242 case 'n':
243 subscriber_name = pg_strdup(optarg);
244 break;
245 case 1:
246 base_prov_connstr = pg_strdup(optarg);
247 break;
248 case 2:
249 base_sub_connstr = pg_strdup(optarg);
250 break;
251 case 3:
252 replication_sets = validate_replication_set_input(pg_strdup(optarg));
253 break;
254 case 4:
255 {
256 postgresql_conf = pg_strdup(optarg);
257 if (postgresql_conf != NULL && !file_exists(postgresql_conf))
258 die(_("The specified postgresql.conf file does not exist."));
259 break;
260 }
261 case 5:
262 {
263 pg_hba_conf = pg_strdup(optarg);
264 if (pg_hba_conf != NULL && !file_exists(pg_hba_conf))
265 die(_("The specified pg_hba.conf file does not exist."));
266 break;
267 }
268 case 6:
269 {
270 recovery_conf = pg_strdup(optarg);
271 if (recovery_conf != NULL && !file_exists(recovery_conf))
272 die(_("The specified recovery configuration file does not exist."));
273 break;
274 }
275 case 'v':
276 verbosity++;
277 break;
278 case 's':
279 stop = true;
280 break;
281 case 7:
282 drop_slot_if_exists = true;
283 break;
284 case 8:
285 apply_delay = atoi(optarg);
286 break;
287 case 9:
288 databases = pg_strdup(optarg);
289 break;
290 case 10:
291 extra_basebackup_args = pg_strdup(optarg);
292 break;
293 case 11:
294 force_text_transfer = true;
295 break;
296 default:
297 fprintf(stderr, _("Unknown option\n"));
298 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
299 exit(1);
300 }
301 }
302
303 /*
304 * Sanity checks
305 */
306
307 if (data_dir == NULL)
308 {
309 fprintf(stderr, _("No data directory specified\n"));
310 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
311 exit(1);
312 }
313 else if (subscriber_name == NULL)
314 {
315 fprintf(stderr, _("No subscriber name specified\n"));
316 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
317 exit(1);
318 }
319
320 if (!base_prov_connstr || !strlen(base_prov_connstr))
321 die(_("Provider connection string must be specified.\n"));
322 if (!base_sub_connstr || !strlen(base_sub_connstr))
323 die(_("Subscriber connection string must be specified.\n"));
324
325 if (apply_delay < 0)
326 die(_("Apply delay cannot be negative.\n"));
327
328 if (apply_delay > MAX_APPLY_DELAY)
329 die(_("Apply delay cannot be more than %d.\n"), MAX_APPLY_DELAY);
330
331 if (!replication_sets || !strlen(replication_sets))
332 replication_sets = "default,default_insert_only,ddl_sql";
333
334 /* Init random numbers used for slot suffixes, etc */
335 srand(time(NULL));
336
337 /* Parse database list or connection string. */
338 if (databases != NULL)
339 {
340 database_list = get_database_list(databases, &n_databases);
341 }
342 else
343 {
344 char *dbname = get_connstr_dbname(base_prov_connstr);
345
346 if (!dbname)
347 die(_("Either provider connection string must contain database "
348 "name or --databases option must be specified.\n"));
349
350 n_databases = 1;
351 database_list = palloc(n_databases * sizeof(char *));
352 database_list[0] = dbname;
353 }
354
355 slot_names = palloc(n_databases * sizeof(char *));
356
357 /*
358 * Check connection strings for validity before doing anything
359 * expensive.
360 */
361 for (dbnum = 0; dbnum < n_databases; dbnum++)
362 {
363 char *db = database_list[dbnum];
364
365 prov_connstr = get_connstr(base_prov_connstr, db);
366 if (!prov_connstr || !strlen(prov_connstr))
367 die(_("Provider connection string is not valid.\n"));
368
369 sub_connstr = get_connstr(base_sub_connstr, db);
370 if (!sub_connstr || !strlen(sub_connstr))
371 die(_("Subscriber connection string is not valid.\n"));
372 }
373
374 /*
375 * Create log file where new postgres instance will log to while being
376 * initialized.
377 */
378 logfd = open("pglogical_create_subscriber_postgres.log", O_CREAT | O_RDWR,
379 S_IRUSR | S_IWUSR);
380 if (logfd == -1)
381 {
382 die(_("Creating pglogical_create_subscriber_postgres.log failed: %s"),
383 strerror(errno));
384 }
385 /* Safe to close() unchecked, we didn't write */
386 (void) close(logfd);
387
388 /* Let's start the real work... */
389 print_msg(VERBOSITY_NORMAL, _("%s: starting ...\n"), progname);
390
391 for (dbnum = 0; dbnum < n_databases; dbnum++)
392 {
393 char *db = database_list[dbnum];
394
395 prov_connstr = get_connstr(base_prov_connstr, db);
396 if (!prov_connstr || !strlen(prov_connstr))
397 die(_("Provider connection string is not valid.\n"));
398
399 /* Read the remote server indetification. */
400 print_msg(VERBOSITY_NORMAL,
401 _("Getting information for database %s ...\n"), db);
402 provider_conn = connectdb(prov_connstr);
403 remote_info = get_remote_info(provider_conn);
404
405 /* only need to do this piece once */
406
407 if (dbnum == 0)
408 {
409 use_existing_data_dir = check_data_dir(data_dir, remote_info);
410
411 if (use_existing_data_dir &&
412 strcmp(remote_info->sysid, read_sysid(data_dir)) != 0)
413 die(_("Subscriber data directory is not basebackup of remote node.\n"));
414 }
415
416 /*
417 * Create replication slots on remote node.
418 */
419 print_msg(VERBOSITY_NORMAL,
420 _("Creating replication slot in database %s ...\n"), db);
421 slot_names[dbnum] = initialize_replication_slot(provider_conn,
422 remote_info->dbname,
423 remote_info->node_name,
424 subscriber_name,
425 drop_slot_if_exists);
426 PQfinish(provider_conn);
427 provider_conn = NULL;
428 }
429
430 /*
431 * Create basebackup or use existing one
432 */
433 prov_connstr = get_connstr(base_prov_connstr, database_list[0]);
434 sub_connstr = get_connstr(base_sub_connstr, database_list[0]);
435
436 initialize_data_dir(data_dir,
437 use_existing_data_dir ? NULL : prov_connstr,
438 postgresql_conf, pg_hba_conf,
439 extra_basebackup_args);
440 snprintf(pid_file, MAXPGPATH, "%s/postmaster.pid", data_dir);
441
442 restore_point_name = generate_restore_point_name();
443
444 print_msg(VERBOSITY_NORMAL, _("Creating restore point \"%s\" on remote node ...\n"),
445 restore_point_name);
446 provider_conn = connectdb(prov_connstr);
447 remote_lsn = create_restore_point(provider_conn, restore_point_name);
448 PQfinish(provider_conn);
449 provider_conn = NULL;
450
451 /*
452 * Get subscriber db to consistent state (for lsn after slot creation).
453 */
454 print_msg(VERBOSITY_NORMAL,
455 _("Bringing subscriber node to the restore point ...\n"));
456 if (recovery_conf)
457 {
458 #if PG_VERSION_NUM >= 120000
459 CopyConfFile(recovery_conf, "postgresql.auto.conf", true);
460 #else
461 CopyConfFile(recovery_conf, "recovery.conf", false);
462 #endif
463 }
464 else
465 {
466 #if PG_VERSION_NUM < 120000
467 appendPQExpBuffer(recoveryconfcontents, "standby_mode = 'on'\n");
468 #endif
469 appendPQExpBuffer(recoveryconfcontents, "primary_conninfo = '%s'\n",
470 escape_single_quotes_ascii(prov_connstr));
471 }
472 appendPQExpBuffer(recoveryconfcontents, "recovery_target_name = '%s'\n", restore_point_name);
473 appendPQExpBuffer(recoveryconfcontents, "recovery_target_inclusive = true\n");
474 #if PG_VERSION_NUM >= 90500
475 appendPQExpBuffer(recoveryconfcontents, "recovery_target_action = promote\n");
476 #else
477 appendPQExpBuffer(recoveryconfcontents, "pause_at_recovery_target = false\n");
478 #endif
479 WriteRecoveryConf(recoveryconfcontents);
480
481 free(restore_point_name);
482 restore_point_name = NULL;
483
484 /*
485 * Start subscriber node with pglogical disabled, and wait until it starts
486 * accepting connections which means it has caught up to the restore point.
487 */
488 pg_ctl_ret = run_pg_ctl("start -l \"pglogical_create_subscriber_postgres.log\" -o \"-c shared_preload_libraries=''\"");
489 if (pg_ctl_ret != 0)
490 die(_("Postgres startup for restore point catchup failed with %d. See pglogical_create_subscriber_postgres.log."), pg_ctl_ret);
491
492 wait_primary_connection(sub_connstr);
493
494 /*
495 * Clean any per-node data that were copied by pg_basebackup.
496 */
497 print_msg(VERBOSITY_VERBOSE,
498 _("Removing old pglogical configuration ...\n"));
499
500 for (dbnum = 0; dbnum < n_databases; dbnum++)
501 {
502 char *db = database_list[dbnum];
503
504 sub_connstr = get_connstr(base_sub_connstr, db);
505
506 if (!sub_connstr || !strlen(sub_connstr))
507 die(_("Subscriber connection string is not valid.\n"));
508
509 subscriber_conn = connectdb(sub_connstr);
510 remove_unwanted_data(subscriber_conn);
511 PQfinish(subscriber_conn);
512 subscriber_conn = NULL;
513 }
514
515 /* Stop Postgres so we can reset system id and start it with pglogical loaded. */
516 pg_ctl_ret = run_pg_ctl("stop");
517 if (pg_ctl_ret != 0)
518 die(_("Postgres stop after restore point catchup failed with %d. See pglogical_create_subscriber_postgres.log."), pg_ctl_ret);
519 wait_postmaster_shutdown();
520
521 /*
522 * Start the node again, now with pglogical active so that we can start the
523 * logical replication. This is final start, so don't log to to special log
524 * file anymore.
525 */
526 print_msg(VERBOSITY_NORMAL,
527 _("Initializing pglogical on the subscriber node:\n"));
528
529 pg_ctl_ret = run_pg_ctl("start");
530 if (pg_ctl_ret != 0)
531 die(_("Postgres restart with pglogical enabled failed with %d."), pg_ctl_ret);
532 wait_postmaster_connection(base_sub_connstr);
533
534 for (dbnum = 0; dbnum < n_databases; dbnum++)
535 {
536 char *db = database_list[dbnum];
537
538 sub_connstr = get_connstr(base_sub_connstr, db);
539 prov_connstr = get_connstr(base_prov_connstr, db);
540
541 subscriber_conn = connectdb(sub_connstr);
542
543 /* Create the extension. */
544 print_msg(VERBOSITY_VERBOSE,
545 _("Creating pglogical extension for database %s...\n"), db);
546 if (PQserverVersion(subscriber_conn) < 90500)
547 install_extension(subscriber_conn, "pglogical_origin");
548 install_extension(subscriber_conn, "pglogical");
549
550 /*
551 * Create the identifier which is setup with the position to which we
552 * already caught up using physical replication.
553 */
554 print_msg(VERBOSITY_VERBOSE,
555 _("Creating replication origin for database %s...\n"), db);
556 initialize_replication_origin(subscriber_conn, slot_names[dbnum], remote_lsn);
557
558 /*
559 * And finally add the node to the cluster.
560 */
561 print_msg(VERBOSITY_NORMAL, _("Creating subscriber %s for database %s...\n"),
562 subscriber_name, db);
563 print_msg(VERBOSITY_VERBOSE, _("Replication sets: %s\n"), replication_sets);
564
565 pglogical_subscribe(subscriber_conn, subscriber_name, sub_connstr,
566 prov_connstr, replication_sets, apply_delay,
567 force_text_transfer);
568
569 PQfinish(subscriber_conn);
570 subscriber_conn = NULL;
571 }
572
573 /* If user does not want the node to be running at the end, stop it. */
574 if (stop)
575 {
576 print_msg(VERBOSITY_NORMAL, _("Stopping the subscriber node ...\n"));
577 pg_ctl_ret = run_pg_ctl("stop");
578 if (pg_ctl_ret != 0)
579 die(_("Stopping postgres after successful subscribtion failed with %d."), pg_ctl_ret);
580 wait_postmaster_shutdown();
581 }
582
583 print_msg(VERBOSITY_NORMAL, _("All done\n"));
584
585 return 0;
586 }
587
588
589 /*
590 * Print help.
591 */
592 static void
usage(void)593 usage(void)
594 {
595 printf(_("%s create new pglogical subscriber from basebackup of provider.\n\n"), progname);
596 printf(_("Usage:\n"));
597 printf(_(" %s [OPTION]...\n"), progname);
598 printf(_("\nGeneral options:\n"));
599 printf(_(" -D, --pgdata=DIRECTORY data directory to be used for new node,\n"));
600 printf(_(" can be either empty/non-existing directory,\n"));
601 printf(_(" or directory populated using\n"));
602 printf(_(" pg_basebackup -X stream command\n"));
603 printf(_(" --databases optional list of databases to replicate\n"));
604 printf(_(" -n, --subscriber-name=NAME name of the newly created subscriber\n"));
605 printf(_(" --subscriber-dsn=CONNSTR connection string to the newly created subscriber\n"));
606 printf(_(" --provider-dsn=CONNSTR connection string to the provider\n"));
607 printf(_(" --replication-sets=SETS comma separated list of replication set names\n"));
608 printf(_(" --apply-delay=DELAY apply delay in seconds (by default 0)\n"));
609 printf(_(" --drop-slot-if-exists drop replication slot of conflicting name\n"));
610 printf(_(" -s, --stop stop the server once the initialization is done\n"));
611 printf(_(" -v increase logging verbosity\n"));
612 printf(_(" --extra-basebackup-args additional arguments to pass to pg_basebackup.\n"));
613 printf(_(" Safe options: -T, -c, --xlogdir/--waldir\n"));
614 printf(_("\nConfiguration files override:\n"));
615 printf(_(" --hba-conf path to the new pg_hba.conf\n"));
616 printf(_(" --postgresql-conf path to the new postgresql.conf\n"));
617 printf(_(" --recovery-conf path to the template recovery configuration\n"));
618 }
619
620 /*
621 * Print error and exit.
622 */
623 static void
die(const char * fmt,...)624 die(const char *fmt,...)
625 {
626 va_list argptr;
627 va_start(argptr, fmt);
628 vfprintf(stderr, fmt, argptr);
629 va_end(argptr);
630
631 if (subscriber_conn)
632 PQfinish(subscriber_conn);
633 if (provider_conn)
634 PQfinish(provider_conn);
635
636 if (get_pgpid())
637 {
638 if (!run_pg_ctl("stop -s"))
639 {
640 fprintf(stderr, _("WARNING: postgres seems to be running, but could not be stopped\n"));
641 }
642 }
643
644 exit(1);
645 }
646
647 /*
648 * Print message to stdout and flush
649 */
650 static void
print_msg(VerbosityLevelEnum level,const char * fmt,...)651 print_msg(VerbosityLevelEnum level, const char *fmt,...)
652 {
653 if (verbosity >= level)
654 {
655 va_list argptr;
656 va_start(argptr, fmt);
657 vfprintf(stdout, fmt, argptr);
658 va_end(argptr);
659 fflush(stdout);
660 }
661 }
662
663
664 /*
665 * Start pg_ctl with given argument(s) - used to start/stop postgres
666 *
667 * Returns the exit code reported by pg_ctl. If pg_ctl exits due to a
668 * signal this call will die and not return.
669 */
670 static int
run_pg_ctl(const char * arg)671 run_pg_ctl(const char *arg)
672 {
673 int ret;
674 PQExpBuffer cmd = createPQExpBuffer();
675 char *exec_path = find_other_exec_or_die(argv0, "pg_ctl");
676
677 appendPQExpBuffer(cmd, "%s %s -D \"%s\"", exec_path, arg, data_dir);
678
679 /* Run pg_ctl in silent mode unless we run in debug mode. */
680 if (verbosity < VERBOSITY_DEBUG)
681 appendPQExpBuffer(cmd, " -s");
682
683 print_msg(VERBOSITY_DEBUG, _("Running pg_ctl: %s.\n"), cmd->data);
684 ret = system(cmd->data);
685
686 destroyPQExpBuffer(cmd);
687
688 if (WIFEXITED(ret))
689 return WEXITSTATUS(ret);
690 else if (WIFSIGNALED(ret))
691 die(_("pg_ctl exited with signal %d"), WTERMSIG(ret));
692 else
693 die(_("pg_ctl exited for an unknown reason (system() returned %d)"), ret);
694
695 return -1;
696 }
697
698
699 /*
700 * Run pg_basebackup to create the copy of the origin node.
701 */
702 static void
run_basebackup(const char * provider_connstr,const char * data_dir,const char * extra_basebackup_args)703 run_basebackup(const char *provider_connstr, const char *data_dir,
704 const char *extra_basebackup_args)
705 {
706 int ret;
707 PQExpBuffer cmd = createPQExpBuffer();
708 char *exec_path = find_other_exec_or_die(argv0, "pg_basebackup");
709
710 appendPQExpBuffer(cmd, "%s -D \"%s\" -d \"%s\" -X s -P", exec_path, data_dir, provider_connstr);
711
712 /* Run pg_basebackup in verbose mode if we are running in verbose mode. */
713 if (verbosity >= VERBOSITY_VERBOSE)
714 appendPQExpBuffer(cmd, " -v");
715
716 if (extra_basebackup_args != NULL)
717 appendPQExpBuffer(cmd, "%s", extra_basebackup_args);
718
719 print_msg(VERBOSITY_DEBUG, _("Running pg_basebackup: %s.\n"), cmd->data);
720 ret = system(cmd->data);
721
722 destroyPQExpBuffer(cmd);
723
724 if (WIFEXITED(ret) && WEXITSTATUS(ret) == 0)
725 return;
726 if (WIFEXITED(ret))
727 die(_("pg_basebackup failed with exit status %d, cannot continue.\n"), WEXITSTATUS(ret));
728 else if (WIFSIGNALED(ret))
729 die(_("pg_basebackup exited with signal %d, cannot continue"), WTERMSIG(ret));
730 else
731 die(_("pg_basebackup exited for an unknown reason (system() returned %d)"), ret);
732 }
733
734 /*
735 * Init the datadir
736 *
737 * This function can either ensure provided datadir is a postgres datadir,
738 * or create it using pg_basebackup.
739 *
740 * In any case, new postresql.conf and pg_hba.conf will be copied to the
741 * datadir if they are provided.
742 */
743 static void
initialize_data_dir(char * data_dir,char * connstr,char * postgresql_conf,char * pg_hba_conf,char * extra_basebackup_args)744 initialize_data_dir(char *data_dir, char *connstr,
745 char *postgresql_conf, char *pg_hba_conf,
746 char *extra_basebackup_args)
747 {
748 if (connstr)
749 {
750 print_msg(VERBOSITY_NORMAL,
751 _("Creating base backup of the remote node...\n"));
752 run_basebackup(connstr, data_dir, extra_basebackup_args);
753 }
754
755 if (postgresql_conf)
756 CopyConfFile(postgresql_conf, "postgresql.conf", false);
757 if (pg_hba_conf)
758 CopyConfFile(pg_hba_conf, "pg_hba.conf", false);
759 }
760
761 /*
762 * This function checks if provided datadir is clone of the remote node
763 * described by the remote info, or if it's emtpy directory that can be used
764 * as new datadir.
765 */
766 static bool
check_data_dir(char * data_dir,RemoteInfo * remoteinfo)767 check_data_dir(char *data_dir, RemoteInfo *remoteinfo)
768 {
769 /* Run basebackup as needed. */
770 switch (pg_check_dir(data_dir))
771 {
772 case 0: /* Does not exist */
773 case 1: /* Exists, empty */
774 return false;
775 case 2:
776 case 3: /* Exists, not empty */
777 case 4:
778 {
779 if (!is_pg_dir(data_dir))
780 die(_("Directory \"%s\" exists but is not valid postgres data directory.\n"),
781 data_dir);
782 return true;
783 }
784 case -1: /* Access problem */
785 die(_("Could not access directory \"%s\": %s.\n"),
786 data_dir, strerror(errno));
787 }
788
789 /* Unreachable */
790 die(_("Unexpected result from pg_check_dir() call"));
791 return false;
792 }
793
794 /*
795 * Initialize replication slots
796 */
797 static char *
initialize_replication_slot(PGconn * conn,char * dbname,char * provider_node_name,char * subscription_name,bool drop_slot_if_exists)798 initialize_replication_slot(PGconn *conn, char *dbname,
799 char *provider_node_name, char *subscription_name,
800 bool drop_slot_if_exists)
801 {
802 PQExpBufferData query;
803 char *slot_name;
804 PGresult *res;
805
806 /* Generate the slot name. */
807 initPQExpBuffer(&query);
808 printfPQExpBuffer(&query,
809 "SELECT pglogical.pglogical_gen_slot_name(%s, %s, %s)",
810 PQescapeLiteral(conn, dbname, strlen(dbname)),
811 PQescapeLiteral(conn, provider_node_name,
812 strlen(provider_node_name)),
813 PQescapeLiteral(conn, subscription_name,
814 strlen(subscription_name)));
815
816 res = PQexec(conn, query.data);
817 if (PQresultStatus(res) != PGRES_TUPLES_OK)
818 die(_("Could generate slot name: %s"), PQerrorMessage(conn));
819
820 slot_name = pstrdup(PQgetvalue(res, 0, 0));
821
822 PQclear(res);
823 resetPQExpBuffer(&query);
824
825 /* Check if the current slot exists. */
826 printfPQExpBuffer(&query,
827 "SELECT 1 FROM pg_catalog.pg_replication_slots WHERE slot_name = %s",
828 PQescapeLiteral(conn, slot_name, strlen(slot_name)));
829
830 res = PQexec(conn, query.data);
831 if (PQresultStatus(res) != PGRES_TUPLES_OK)
832 die(_("Could not fetch existing slot information: %s"), PQerrorMessage(conn));
833
834 /* Drop the existing slot when asked for it or error if it already exists. */
835 if (PQntuples(res) > 0)
836 {
837 PQclear(res);
838 resetPQExpBuffer(&query);
839
840 if (!drop_slot_if_exists)
841 die(_("Slot %s already exists, drop it or use --drop-slot-if-exists to drop it automatically.\n"),
842 slot_name);
843
844 print_msg(VERBOSITY_VERBOSE,
845 _("Droping existing slot %s ...\n"), slot_name);
846
847 printfPQExpBuffer(&query,
848 "SELECT pg_catalog.pg_drop_replication_slot(%s)",
849 PQescapeLiteral(conn, slot_name, strlen(slot_name)));
850
851 res = PQexec(conn, query.data);
852 if (PQresultStatus(res) != PGRES_TUPLES_OK)
853 die(_("Could not drop existing slot %s: %s"), slot_name,
854 PQerrorMessage(conn));
855 }
856
857 PQclear(res);
858 resetPQExpBuffer(&query);
859
860 /* And finally, create the slot. */
861 appendPQExpBuffer(&query, "SELECT pg_create_logical_replication_slot(%s, '%s');",
862 PQescapeLiteral(conn, slot_name, strlen(slot_name)),
863 "pglogical_output");
864
865 res = PQexec(conn, query.data);
866 if (PQresultStatus(res) != PGRES_TUPLES_OK)
867 {
868 die(_("Could not create replication slot, status %s: %s\n"),
869 PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
870 }
871
872 PQclear(res);
873 termPQExpBuffer(&query);
874
875 return slot_name;
876 }
877
878 /*
879 * Read replication info about remote connection
880 *
881 * TODO: unify with pglogical_remote_node_info in pglogical_rpc
882 */
883 static RemoteInfo *
get_remote_info(PGconn * conn)884 get_remote_info(PGconn* conn)
885 {
886 RemoteInfo *ri = (RemoteInfo *)pg_malloc0(sizeof(RemoteInfo));
887 PGresult *res;
888
889 if (!extension_exists(conn, "pglogical"))
890 die(_("The remote node is not configured as a pglogical provider.\n"));
891
892 res = PQexec(conn, "SELECT node_id, node_name, sysid, dbname, replication_sets FROM pglogical.pglogical_node_info()");
893 if (PQresultStatus(res) != PGRES_TUPLES_OK)
894 die(_("could not fetch remote node info: %s\n"), PQerrorMessage(conn));
895
896 /* No nodes found? */
897 if (PQntuples(res) == 0)
898 die(_("The remote database is not configured as a pglogical node.\n"));
899
900 if (PQntuples(res) > 1)
901 die(_("The remote database has multiple nodes configured. That is not supported with current version of pglogical.\n"));
902
903 #define atooid(x) ((Oid) strtoul((x), NULL, 10))
904
905 ri->nodeid = atooid(PQgetvalue(res, 0, 0));
906 ri->node_name = pstrdup(PQgetvalue(res, 0, 1));
907 ri->sysid = pstrdup(PQgetvalue(res, 0, 2));
908 ri->dbname = pstrdup(PQgetvalue(res, 0, 3));
909 ri->replication_sets = pstrdup(PQgetvalue(res, 0, 4));
910
911 PQclear(res);
912
913 return ri;
914 }
915
916 /*
917 * Check if extension exists.
918 */
919 static bool
extension_exists(PGconn * conn,const char * extname)920 extension_exists(PGconn *conn, const char *extname)
921 {
922 PQExpBuffer query = createPQExpBuffer();
923 PGresult *res;
924 bool ret;
925
926 printfPQExpBuffer(query, "SELECT 1 FROM pg_catalog.pg_extension WHERE extname = %s;",
927 PQescapeLiteral(conn, extname, strlen(extname)));
928 res = PQexec(conn, query->data);
929
930 if (PQresultStatus(res) != PGRES_TUPLES_OK)
931 {
932 PQclear(res);
933 die(_("Could not read extension info: %s\n"), PQerrorMessage(conn));
934 }
935
936 ret = PQntuples(res) == 1;
937
938 PQclear(res);
939 destroyPQExpBuffer(query);
940
941 return ret;
942 }
943
944 /*
945 * Create extension.
946 */
947 static void
install_extension(PGconn * conn,const char * extname)948 install_extension(PGconn *conn, const char *extname)
949 {
950 PQExpBuffer query = createPQExpBuffer();
951 PGresult *res;
952
953 printfPQExpBuffer(query, "CREATE EXTENSION IF NOT EXISTS %s;",
954 PQescapeIdentifier(conn, extname, strlen(extname)));
955 res = PQexec(conn, query->data);
956
957 if (PQresultStatus(res) != PGRES_COMMAND_OK)
958 {
959 PQclear(res);
960 die(_("Could not install %s extension: %s\n"), extname, PQerrorMessage(conn));
961 }
962
963 PQclear(res);
964 destroyPQExpBuffer(query);
965 }
966
967 /*
968 * Clean all the data that was copied from remote node but we don't
969 * want it here (currently shared security labels and replication identifiers).
970 */
971 static void
remove_unwanted_data(PGconn * conn)972 remove_unwanted_data(PGconn *conn)
973 {
974 PGresult *res;
975
976 /*
977 * Remove replication identifiers (9.4 will get them removed by dropping
978 * the extension later as we emulate them there).
979 */
980 if (PQserverVersion(conn) >= 90500)
981 {
982 res = PQexec(conn, "SELECT pg_replication_origin_drop(external_id) FROM pg_replication_origin_status;");
983 if (PQresultStatus(res) != PGRES_TUPLES_OK)
984 {
985 PQclear(res);
986 die(_("Could not remove existing replication origins: %s\n"), PQerrorMessage(conn));
987 }
988 PQclear(res);
989 }
990
991 res = PQexec(conn, "DROP EXTENSION pglogical CASCADE;");
992 if (PQresultStatus(res) != PGRES_COMMAND_OK)
993 {
994 die(_("Could not clean the pglogical extension, status %s: %s\n"),
995 PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
996 }
997 PQclear(res);
998 }
999
1000 /*
1001 * Initialize new remote identifier to specific position.
1002 */
1003 static void
initialize_replication_origin(PGconn * conn,char * origin_name,char * remote_lsn)1004 initialize_replication_origin(PGconn *conn, char *origin_name, char *remote_lsn)
1005 {
1006 PGresult *res;
1007 PQExpBuffer query = createPQExpBuffer();
1008
1009 if (PQserverVersion(conn) >= 90500)
1010 {
1011 printfPQExpBuffer(query, "SELECT pg_replication_origin_create(%s)",
1012 PQescapeLiteral(conn, origin_name, strlen(origin_name)));
1013
1014 res = PQexec(conn, query->data);
1015
1016 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1017 {
1018 die(_("Could not create replication origin \"%s\": status %s: %s\n"),
1019 query->data,
1020 PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
1021 }
1022 PQclear(res);
1023
1024 if (remote_lsn)
1025 {
1026 printfPQExpBuffer(query, "SELECT pg_replication_origin_advance(%s, '%s')",
1027 PQescapeLiteral(conn, origin_name, strlen(origin_name)),
1028 remote_lsn);
1029
1030 res = PQexec(conn, query->data);
1031
1032 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1033 {
1034 die(_("Could not advance replication origin \"%s\": status %s: %s\n"),
1035 query->data,
1036 PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
1037 }
1038 PQclear(res);
1039 }
1040 }
1041 else
1042 {
1043 printfPQExpBuffer(query, "INSERT INTO pglogical_origin.replication_origin (roident, roname, roremote_lsn) SELECT COALESCE(MAX(roident::int), 0) + 1, %s, %s FROM pglogical_origin.replication_origin",
1044 PQescapeLiteral(conn, origin_name, strlen(origin_name)),
1045 remote_lsn ? PQescapeLiteral(conn, remote_lsn, strlen(remote_lsn)) : "0");
1046
1047 res = PQexec(conn, query->data);
1048
1049 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1050 {
1051 die(_("Could not create replication origin \"%s\": status %s: %s\n"),
1052 query->data,
1053 PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
1054 }
1055 PQclear(res);
1056 }
1057
1058 destroyPQExpBuffer(query);
1059 }
1060
1061
1062 /*
1063 * Create remote restore point which will be used to get into synchronized
1064 * state through physical replay.
1065 */
1066 static char *
create_restore_point(PGconn * conn,char * restore_point_name)1067 create_restore_point(PGconn *conn, char *restore_point_name)
1068 {
1069 PQExpBuffer query = createPQExpBuffer();
1070 PGresult *res;
1071 char *remote_lsn = NULL;
1072
1073 printfPQExpBuffer(query, "SELECT pg_create_restore_point('%s')", restore_point_name);
1074 res = PQexec(conn, query->data);
1075 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1076 {
1077 die(_("Could not create restore point, status %s: %s\n"),
1078 PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
1079 }
1080 remote_lsn = pstrdup(PQgetvalue(res, 0, 0));
1081
1082 PQclear(res);
1083 destroyPQExpBuffer(query);
1084
1085 return remote_lsn;
1086 }
1087
1088 static void
pglogical_subscribe(PGconn * conn,char * subscriber_name,char * subscriber_dsn,char * provider_dsn,char * replication_sets,int apply_delay,bool force_text_transfer)1089 pglogical_subscribe(PGconn *conn, char *subscriber_name, char *subscriber_dsn,
1090 char *provider_dsn, char *replication_sets,
1091 int apply_delay, bool force_text_transfer)
1092 {
1093 PQExpBufferData query;
1094 PQExpBufferData repsets;
1095 PGresult *res;
1096
1097 initPQExpBuffer(&query);
1098 printfPQExpBuffer(&query,
1099 "SELECT pglogical.create_node(node_name := %s, dsn := %s);",
1100 PQescapeLiteral(conn, subscriber_name, strlen(subscriber_name)),
1101 PQescapeLiteral(conn, subscriber_dsn, strlen(subscriber_dsn)));
1102
1103 res = PQexec(conn, query.data);
1104 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1105 {
1106 die(_("Could not create local node, status %s: %s\n"),
1107 PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
1108 }
1109 PQclear(res);
1110
1111 resetPQExpBuffer(&query);
1112 initPQExpBuffer(&repsets);
1113
1114 printfPQExpBuffer(&repsets, "{%s}", replication_sets);
1115 printfPQExpBuffer(&query,
1116 "SELECT pglogical.create_subscription("
1117 "subscription_name := %s, provider_dsn := %s, "
1118 "replication_sets := %s, "
1119 "apply_delay := '%d seconds'::interval, "
1120 "synchronize_structure := false, "
1121 "synchronize_data := false, "
1122 "force_text_transfer := '%s');",
1123 PQescapeLiteral(conn, subscriber_name, strlen(subscriber_name)),
1124 PQescapeLiteral(conn, provider_dsn, strlen(provider_dsn)),
1125 PQescapeLiteral(conn, repsets.data, repsets.len),
1126 apply_delay, (force_text_transfer ? "t" : "f"));
1127
1128 res = PQexec(conn, query.data);
1129 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1130 {
1131 die(_("Could not create subscription, status %s: %s\n"),
1132 PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
1133 }
1134 PQclear(res);
1135
1136 /* TODO */
1137 res = PQexec(conn, "UPDATE pglogical.local_sync_status SET sync_status = 'r'");
1138 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1139 {
1140 die(_("Could not update subscription, status %s: %s\n"),
1141 PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
1142 }
1143
1144 PQclear(res);
1145
1146 termPQExpBuffer(&repsets);
1147 termPQExpBuffer(&query);
1148 }
1149
1150
1151 /*
1152 * Validates input of the replication sets and returns normalized data.
1153 */
1154 static char *
validate_replication_set_input(char * replication_sets)1155 validate_replication_set_input(char *replication_sets)
1156 {
1157 char *name;
1158 PQExpBuffer retbuf = createPQExpBuffer();
1159 char *ret;
1160 bool first = true;
1161
1162 if (!replication_sets)
1163 return NULL;
1164
1165 name = strtok(replication_sets, " ,");
1166 while (name != NULL)
1167 {
1168 const char *cp;
1169
1170 if (strlen(name) == 0)
1171 die(_("Replication set name \"%s\" is too short\n"), name);
1172
1173 if (strlen(name) > NAMEDATALEN)
1174 die(_("Replication set name \"%s\" is too long\n"), name);
1175
1176 for (cp = name; *cp; cp++)
1177 {
1178 if (!((*cp >= 'a' && *cp <= 'z')
1179 || (*cp >= '0' && *cp <= '9')
1180 || (*cp == '_')
1181 || (*cp == '-')))
1182 {
1183 die(_("Replication set name \"%s\" contains invalid character\n"),
1184 name);
1185 }
1186 }
1187
1188 if (first)
1189 first = false;
1190 else
1191 appendPQExpBufferStr(retbuf, ", ");
1192 appendPQExpBufferStr(retbuf, name);
1193
1194 name = strtok(NULL, " ,");
1195 }
1196
1197 ret = pg_strdup(retbuf->data);
1198 destroyPQExpBuffer(retbuf);
1199
1200 return ret;
1201 }
1202
1203 static char *
get_connstr_dbname(char * connstr)1204 get_connstr_dbname(char *connstr)
1205 {
1206 PQconninfoOption *conn_opts = NULL;
1207 PQconninfoOption *conn_opt;
1208 char *err_msg = NULL;
1209 char *ret = NULL;
1210
1211 conn_opts = PQconninfoParse(connstr, &err_msg);
1212 if (conn_opts == NULL)
1213 {
1214 die(_("Invalid connection string: %s\n"), err_msg);
1215 }
1216
1217 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
1218 {
1219 if (strcmp(conn_opt->keyword, "dbname") == 0)
1220 {
1221 ret = pstrdup(conn_opt->val);
1222 break;
1223 }
1224 }
1225
1226 PQconninfoFree(conn_opts);
1227
1228 return ret;
1229 }
1230
1231
1232 /*
1233 * Build connection string from individual parameter.
1234 *
1235 * dbname can be specified in connstr parameter
1236 */
1237 static char *
get_connstr(char * connstr,char * dbname)1238 get_connstr(char *connstr, char *dbname)
1239 {
1240 char *ret;
1241 int argcount = 4; /* dbname, host, user, port */
1242 int i;
1243 const char **keywords;
1244 const char **values;
1245 PQconninfoOption *conn_opts = NULL;
1246 PQconninfoOption *conn_opt;
1247 char *err_msg = NULL;
1248
1249 /*
1250 * Merge the connection info inputs given in form of connection string
1251 * and options
1252 */
1253 i = 0;
1254 if (connstr &&
1255 (strncmp(connstr, "postgresql://", 13) == 0 ||
1256 strncmp(connstr, "postgres://", 11) == 0 ||
1257 strchr(connstr, '=') != NULL))
1258 {
1259 conn_opts = PQconninfoParse(connstr, &err_msg);
1260 if (conn_opts == NULL)
1261 {
1262 die(_("Invalid connection string: %s\n"), err_msg);
1263 }
1264
1265 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
1266 {
1267 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
1268 argcount++;
1269 }
1270
1271 keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
1272 values = pg_malloc0((argcount + 1) * sizeof(*values));
1273
1274 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
1275 {
1276 /* If db* parameters were provided, we'll fill them later. */
1277 if (dbname && strcmp(conn_opt->keyword, "dbname") == 0)
1278 continue;
1279
1280 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
1281 {
1282 keywords[i] = conn_opt->keyword;
1283 values[i] = conn_opt->val;
1284 i++;
1285 }
1286 }
1287 }
1288 else
1289 {
1290 keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
1291 values = pg_malloc0((argcount + 1) * sizeof(*values));
1292
1293 /*
1294 * If connstr was provided but it's not in connection string format and
1295 * the dbname wasn't provided then connstr is actually dbname.
1296 */
1297 if (connstr && !dbname)
1298 dbname = connstr;
1299 }
1300
1301 if (dbname)
1302 {
1303 keywords[i] = "dbname";
1304 values[i] = dbname;
1305 i++;
1306 }
1307
1308 ret = PQconninfoParamsToConnstr(keywords, values);
1309
1310 /* Connection ok! */
1311 pg_free(values);
1312 pg_free(keywords);
1313 if (conn_opts)
1314 PQconninfoFree(conn_opts);
1315
1316 return ret;
1317 }
1318
1319
1320 /*
1321 * Reads the pg_control file of the existing data dir.
1322 */
1323 static char *
read_sysid(const char * data_dir)1324 read_sysid(const char *data_dir)
1325 {
1326 ControlFileData ControlFile;
1327 int fd;
1328 char ControlFilePath[MAXPGPATH];
1329 char *res = (char *) pg_malloc0(33);
1330
1331 snprintf(ControlFilePath, MAXPGPATH, "%s/global/pg_control", data_dir);
1332
1333 if ((fd = open(ControlFilePath, O_RDONLY | PG_BINARY, 0)) == -1)
1334 die(_("%s: could not open file \"%s\" for reading: %s\n"),
1335 progname, ControlFilePath, strerror(errno));
1336
1337 if (read(fd, &ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
1338 die(_("%s: could not read file \"%s\": %s\n"),
1339 progname, ControlFilePath, strerror(errno));
1340
1341 close(fd);
1342
1343 snprintf(res, 33, UINT64_FORMAT, ControlFile.system_identifier);
1344 return res;
1345 }
1346
1347 /*
1348 * Write contents of recovery.conf or postgresql.auto.conf
1349 */
1350 static void
WriteRecoveryConf(PQExpBuffer contents)1351 WriteRecoveryConf(PQExpBuffer contents)
1352 {
1353 char filename[MAXPGPATH];
1354 FILE *cf;
1355
1356 #if PG_VERSION_NUM >= 120000
1357 sprintf(filename, "%s/postgresql.auto.conf", data_dir);
1358
1359 cf = fopen(filename, "a");
1360 #else
1361 sprintf(filename, "%s/recovery.conf", data_dir);
1362
1363 cf = fopen(filename, "w");
1364 #endif
1365 if (cf == NULL)
1366 {
1367 die(_("%s: could not create file \"%s\": %s\n"), progname, filename, strerror(errno));
1368 }
1369
1370 if (fwrite(contents->data, contents->len, 1, cf) != 1)
1371 {
1372 die(_("%s: could not write to file \"%s\": %s\n"),
1373 progname, filename, strerror(errno));
1374 }
1375
1376 fclose(cf);
1377
1378 #if PG_VERSION_NUM >= 120000
1379 {
1380 sprintf(filename, "%s/standby.signal", data_dir);
1381 cf = fopen(filename, "w");
1382 if (cf == NULL)
1383 {
1384 die(_("%s: could not create file \"%s\": %s\n"), progname, filename, strerror(errno));
1385 }
1386
1387 fclose(cf);
1388 }
1389 #endif
1390 }
1391
1392 /*
1393 * Copy file to data
1394 */
1395 static void
CopyConfFile(char * fromfile,char * tofile,bool append)1396 CopyConfFile(char *fromfile, char *tofile, bool append)
1397 {
1398 char filename[MAXPGPATH];
1399
1400 sprintf(filename, "%s/%s", data_dir, tofile);
1401
1402 print_msg(VERBOSITY_DEBUG, _("Copying \"%s\" to \"%s\".\n"),
1403 fromfile, filename);
1404 copy_file(fromfile, filename, append);
1405 }
1406
1407
1408 /*
1409 * Convert PQconninfoOption array into conninfo string
1410 */
1411 static char *
PQconninfoParamsToConnstr(const char * const * keywords,const char * const * values)1412 PQconninfoParamsToConnstr(const char *const * keywords, const char *const * values)
1413 {
1414 PQExpBuffer retbuf = createPQExpBuffer();
1415 char *ret;
1416 int i = 0;
1417
1418 for (i = 0; keywords[i] != NULL; i++)
1419 {
1420 if (i > 0)
1421 appendPQExpBufferChar(retbuf, ' ');
1422 appendPQExpBuffer(retbuf, "%s=", keywords[i]);
1423 appendPQExpBufferConnstrValue(retbuf, values[i]);
1424 }
1425
1426 ret = pg_strdup(retbuf->data);
1427 destroyPQExpBuffer(retbuf);
1428
1429 return ret;
1430 }
1431
1432 /*
1433 * Escape connection info value
1434 */
1435 static void
appendPQExpBufferConnstrValue(PQExpBuffer buf,const char * str)1436 appendPQExpBufferConnstrValue(PQExpBuffer buf, const char *str)
1437 {
1438 const char *s;
1439 bool needquotes;
1440
1441 /*
1442 * If the string consists entirely of plain ASCII characters, no need to
1443 * quote it. This is quite conservative, but better safe than sorry.
1444 */
1445 needquotes = false;
1446 for (s = str; *s; s++)
1447 {
1448 if (!((*s >= 'a' && *s <= 'z') || (*s >= 'A' && *s <= 'Z') ||
1449 (*s >= '0' && *s <= '9') || *s == '_' || *s == '.'))
1450 {
1451 needquotes = true;
1452 break;
1453 }
1454 }
1455
1456 if (needquotes)
1457 {
1458 appendPQExpBufferChar(buf, '\'');
1459 while (*str)
1460 {
1461 /* ' and \ must be escaped by to \' and \\ */
1462 if (*str == '\'' || *str == '\\')
1463 appendPQExpBufferChar(buf, '\\');
1464
1465 appendPQExpBufferChar(buf, *str);
1466 str++;
1467 }
1468 appendPQExpBufferChar(buf, '\'');
1469 }
1470 else
1471 appendPQExpBufferStr(buf, str);
1472 }
1473
1474
1475 /*
1476 * Find the pgport and try a connection
1477 */
1478 static void
wait_postmaster_connection(const char * connstr)1479 wait_postmaster_connection(const char *connstr)
1480 {
1481 PGPing res;
1482 long pmpid = 0;
1483
1484 print_msg(VERBOSITY_VERBOSE, "Waiting for PostgreSQL to accept connections ...");
1485
1486 /* First wait for Postmaster to come up. */
1487 for (;;)
1488 {
1489 if ((pmpid = get_pgpid()) != 0 &&
1490 postmaster_is_alive((pid_t) pmpid))
1491 break;
1492
1493 pg_usleep(1000000); /* 1 sec */
1494 print_msg(VERBOSITY_VERBOSE, ".");
1495 }
1496
1497 /* Now wait for Postmaster to either accept connections or die. */
1498 for (;;)
1499 {
1500 res = PQping(connstr);
1501 if (res == PQPING_OK)
1502 break;
1503 else if (res == PQPING_NO_ATTEMPT)
1504 break;
1505
1506 /*
1507 * Check if the process is still alive. This covers cases where the
1508 * postmaster successfully created the pidfile but then crashed without
1509 * removing it.
1510 */
1511 if (!postmaster_is_alive((pid_t) pmpid))
1512 break;
1513
1514 /* No response; wait */
1515 pg_usleep(1000000); /* 1 sec */
1516 print_msg(VERBOSITY_VERBOSE, ".");
1517 }
1518
1519 print_msg(VERBOSITY_VERBOSE, "\n");
1520 }
1521
1522
1523 /*
1524 * Wait for PostgreSQL to leave recovery/standby mode
1525 */
1526 static void
wait_primary_connection(const char * connstr)1527 wait_primary_connection(const char *connstr)
1528 {
1529 bool ispri = false;
1530 PGconn *conn = NULL;
1531 PGresult *res;
1532
1533 wait_postmaster_connection(connstr);
1534
1535 print_msg(VERBOSITY_VERBOSE, "Waiting for PostgreSQL to become primary...");
1536
1537 while (!ispri)
1538 {
1539 if (!conn || PQstatus(conn) != CONNECTION_OK)
1540 {
1541 if (conn)
1542 PQfinish(conn);
1543 wait_postmaster_connection(connstr);
1544 conn = connectdb(connstr);
1545 }
1546
1547 res = PQexec(conn, "SELECT pg_is_in_recovery()");
1548 if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) == 1 && *PQgetvalue(res, 0, 0) == 'f')
1549 ispri = true;
1550 else
1551 {
1552 pg_usleep(1000000); /* 1 sec */
1553 print_msg(VERBOSITY_VERBOSE, ".");
1554 }
1555
1556 PQclear(res);
1557 }
1558
1559 PQfinish(conn);
1560 print_msg(VERBOSITY_VERBOSE, "\n");
1561 }
1562
1563 /*
1564 * Wait for postmaster to die
1565 */
1566 static void
wait_postmaster_shutdown(void)1567 wait_postmaster_shutdown(void)
1568 {
1569 long pid;
1570
1571 print_msg(VERBOSITY_VERBOSE, "Waiting for PostgreSQL to shutdown ...");
1572
1573 for (;;)
1574 {
1575 if ((pid = get_pgpid()) != 0)
1576 {
1577 pg_usleep(1000000); /* 1 sec */
1578 print_msg(VERBOSITY_NORMAL, ".");
1579 }
1580 else
1581 break;
1582 }
1583
1584 print_msg(VERBOSITY_VERBOSE, "\n");
1585 }
1586
/*
 * Report whether stat() succeeds on path.
 *
 * Any stat() failure, whatever its cause, yields false.
 */
static bool
file_exists(const char *path)
{
	struct stat st;

	return stat(path, &st) == 0;
}
1597
1598 static bool
is_pg_dir(const char * path)1599 is_pg_dir(const char *path)
1600 {
1601 struct stat statbuf;
1602 char version_file[MAXPGPATH];
1603
1604 if (stat(path, &statbuf) != 0)
1605 return false;
1606
1607 snprintf(version_file, MAXPGPATH, "%s/PG_VERSION", data_dir);
1608 if (stat(version_file, &statbuf) != 0 && errno == ENOENT)
1609 {
1610 return false;
1611 }
1612
1613 return true;
1614 }
1615
1616 /*
1617 * copy one file
1618 */
1619 static void
copy_file(char * fromfile,char * tofile,bool append)1620 copy_file(char *fromfile, char *tofile, bool append)
1621 {
1622 char *buffer;
1623 int srcfd;
1624 int dstfd;
1625 int nbytes;
1626 off_t offset;
1627
1628 #define COPY_BUF_SIZE (8 * BLCKSZ)
1629
1630 buffer = malloc(COPY_BUF_SIZE);
1631
1632 /*
1633 * Open the files
1634 */
1635 srcfd = open(fromfile, O_RDONLY | PG_BINARY, 0);
1636 if (srcfd < 0)
1637 die(_("could not open file \"%s\""), fromfile);
1638
1639 dstfd = open(tofile, O_RDWR | O_CREAT | (append ? O_APPEND : O_TRUNC) | PG_BINARY,
1640 S_IRUSR | S_IWUSR);
1641 if (dstfd < 0)
1642 die(_("could not create file \"%s\""), tofile);
1643
1644 /*
1645 * Do the data copying.
1646 */
1647 for (offset = 0;; offset += nbytes)
1648 {
1649 nbytes = read(srcfd, buffer, COPY_BUF_SIZE);
1650 if (nbytes < 0)
1651 die(_("could not read file \"%s\""), fromfile);
1652 if (nbytes == 0)
1653 break;
1654 errno = 0;
1655 if ((int) write(dstfd, buffer, nbytes) != nbytes)
1656 {
1657 /* if write didn't set errno, assume problem is no disk space */
1658 if (errno == 0)
1659 errno = ENOSPC;
1660 die(_("could not write to file \"%s\""), tofile);
1661 }
1662 }
1663
1664 if (close(dstfd))
1665 die(_("could not close file \"%s\""), tofile);
1666
1667 /* we don't care about errors here */
1668 close(srcfd);
1669
1670 free(buffer);
1671 }
1672
1673
1674 static char *
find_other_exec_or_die(const char * argv0,const char * target)1675 find_other_exec_or_die(const char *argv0, const char *target)
1676 {
1677 int ret;
1678 char *found_path;
1679 uint32 bin_version;
1680
1681 found_path = pg_malloc(MAXPGPATH);
1682
1683 ret = find_other_exec_version(argv0, target, &bin_version, found_path);
1684
1685 if (ret < 0)
1686 {
1687 char full_path[MAXPGPATH];
1688
1689 if (find_my_exec(argv0, full_path) < 0)
1690 strlcpy(full_path, progname, sizeof(full_path));
1691
1692 if (ret == -1)
1693 die(_("The program \"%s\" is needed by %s "
1694 "but was not found in the\n"
1695 "same directory as \"%s\".\n"
1696 "Check your installation.\n"),
1697 target, progname, full_path);
1698 else
1699 die(_("The program \"%s\" was found by \"%s\"\n"
1700 "but was not the same version as %s.\n"
1701 "Check your installation.\n"),
1702 target, full_path, progname);
1703 }
1704 else
1705 {
1706 char full_path[MAXPGPATH];
1707
1708 if (find_my_exec(argv0, full_path) < 0)
1709 strlcpy(full_path, progname, sizeof(full_path));
1710
1711 if (bin_version / 100 != PG_VERSION_NUM / 100)
1712 die(_("The program \"%s\" was found by \"%s\"\n"
1713 "but was not the same version as %s.\n"
1714 "Check your installation.\n"),
1715 target, full_path, progname);
1716
1717 }
1718
1719 return found_path;
1720 }
1721
/*
 * Test whether a process with the given PID could be the postmaster.
 *
 * Returns true only when kill(pid, 0) succeeds. An EPERM failure is not
 * treated as "alive": it means the PID belongs to some other userid, and
 * given the permissions on $PGDATA that cannot be the postmaster we are
 * after.
 *
 * Our own PID is never accepted, and neither is the parent's PID except on
 * Windows, which has no getppid().
 */
static bool
postmaster_is_alive(pid_t pid)
{
	bool		is_ours;

	is_ours = (pid == getpid());
#ifndef WIN32
	is_ours = is_ours || (pid == getppid());
#endif

	return !is_ours && kill(pid, 0) == 0;
}
1745
1746 static long
get_pgpid(void)1747 get_pgpid(void)
1748 {
1749 FILE *pidf;
1750 long pid;
1751
1752 pidf = fopen(pid_file, "r");
1753 if (pidf == NULL)
1754 {
1755 return 0;
1756 }
1757 if (fscanf(pidf, "%ld", &pid) != 1)
1758 {
1759 return 0;
1760 }
1761 fclose(pidf);
1762 return pid;
1763 }
1764
/*
 * Split a comma-separated list of database names.
 *
 * databases   - the list; it is not modified (a copy is tokenized)
 * n_databases - receives the number of names stored in the result
 *
 * Returns a palloc'ed array of pstrdup'ed names. strtok() skips empty
 * items (as in "a,,b" or a trailing comma), so the count is taken from the
 * names actually stored rather than from the number of commas; every entry
 * below *n_databases is therefore initialized.
 */
static char **
get_database_list(char *databases, int *n_databases)
{
	char	   *c;
	char	   *copy;
	char	  **result;
	int			num = 1;

	/* Upper bound on the number of names: one more than the comma count. */
	for (c = databases; *c; c++)
		if (*c == ',')
			num++;

	result = palloc(num * sizeof(char *));

	/* clone the argument so we don't destroy it with strtok */
	copy = pstrdup(databases);

	num = 0;
	for (c = strtok(copy, ","); c != NULL; c = strtok(NULL, ","))
		result[num++] = pstrdup(c);

	pfree(copy);

	*n_databases = num;
	return result;
}
1789
1790 static char *
generate_restore_point_name(void)1791 generate_restore_point_name(void)
1792 {
1793 char *rpn = malloc(NAMEDATALEN);
1794 snprintf(rpn, NAMEDATALEN-1, "pglogical_create_subscriber_%lx", random());
1795 return rpn;
1796 }
1797