1 /* -------------------------------------------------------------------------
2  *
3  * pglogical_create_subscriber.c
4  *		Initialize a new pglogical subscriber from a physical base backup
5  *
6  * Copyright (C) 2012-2016, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		pglogical_create_subscriber.c
10  *
11  * -------------------------------------------------------------------------
12  */
13 
14 /* dirent.h on port/win32_msvc expects MAX_PATH to be defined */
15 #if defined(_WIN32)
16 #define WIN32_LEAN_AND_MEAN
17 #include <windows.h>
18 #endif
19 
20 #include <dirent.h>
21 #include <fcntl.h>
22 #include <locale.h>
23 #include <signal.h>
24 #include <time.h>
25 #include <sys/types.h>
26 #include <sys/wait.h>
27 #include <sys/stat.h>
28 #include <unistd.h>
29 #include <stdlib.h>
30 
31 /* Note the order is important for debian here. */
32 #if !defined(pg_attribute_printf)
33 
34 /* GCC and XLC support format attributes */
35 #if defined(__GNUC__) || defined(__IBMC__)
36 #define pg_attribute_format_arg(a) __attribute__((format_arg(a)))
37 #define pg_attribute_printf(f,a) __attribute__((format(PG_PRINTF_ATTRIBUTE, f, a)))
38 #else
39 #define pg_attribute_format_arg(a)
40 #define pg_attribute_printf(f,a)
41 #endif
42 
43 #endif
44 
45 #include "libpq-fe.h"
46 #include "postgres_fe.h"
47 #include "pqexpbuffer.h"
48 
49 #include "getopt_long.h"
50 
51 #include "miscadmin.h"
52 
53 #include "access/timeline.h"
54 #include "access/xlog_internal.h"
55 #include "catalog/pg_control.h"
56 
57 #include "pglogical_fe.h"
58 
59 #define MAX_APPLY_DELAY 86400
60 
61 typedef struct RemoteInfo {
62 	Oid			nodeid;
63 	char	   *node_name;
64 	char	   *sysid;
65 	char	   *dbname;
66 	char	   *replication_sets;
67 } RemoteInfo;
68 
/*
 * Output verbosity.  The levels are ordered; main() raises the level by one
 * for each -v given, and print_msg() prints a message when the current level
 * is at least the level of the message.
 */
typedef enum
{
	VERBOSITY_NORMAL = 0,		/* default */
	VERBOSITY_VERBOSE,			/* adds per-step details */
	VERBOSITY_DEBUG				/* adds the external commands being run */
} VerbosityLevelEnum;
74 
75 static char		   *argv0 = NULL;
76 static const char  *progname;
77 static char		   *data_dir = NULL;
78 static char			pid_file[MAXPGPATH];
79 static time_t		start_time;
80 static VerbosityLevelEnum	verbosity = VERBOSITY_NORMAL;
81 
82 /* defined as static so that die() can close them */
83 static PGconn		*subscriber_conn = NULL;
84 static PGconn		*provider_conn = NULL;
85 
86 static void signal_handler(int sig);
87 static void usage(void);
88 static void die(const char *fmt,...)
89 pg_attribute_printf(1, 2);
90 static void print_msg(VerbosityLevelEnum level, const char *fmt,...)
91 pg_attribute_printf(2, 3);
92 
93 static int run_pg_ctl(const char *arg);
94 static void run_basebackup(const char *provider_connstr, const char *data_dir,
95 	const char *extra_basebackup_args);
96 static void wait_postmaster_connection(const char *connstr);
97 static void wait_primary_connection(const char *connstr);
98 static void wait_postmaster_shutdown(void);
99 
100 static char *validate_replication_set_input(char *replication_sets);
101 
102 static void remove_unwanted_data(PGconn *conn);
103 static void initialize_replication_origin(PGconn *conn, char *origin_name, char *remote_lsn);
104 static char *create_restore_point(PGconn *conn, char *restore_point_name);
105 static char *initialize_replication_slot(PGconn *conn, char *dbname,
106 							char *provider_node_name, char *subscription_name,
107 							bool drop_slot_if_exists);
108 static void pglogical_subscribe(PGconn *conn, char *subscriber_name,
109 								char *subscriber_dsn,
110 								char *provider_connstr,
111 								char *replication_sets,
112 								int apply_delay,
113 								bool force_text_transfer);
114 
115 static RemoteInfo *get_remote_info(PGconn* conn);
116 
117 static bool extension_exists(PGconn *conn, const char *extname);
118 static void install_extension(PGconn *conn, const char *extname);
119 
120 static void initialize_data_dir(char *data_dir, char *connstr,
121 					char *postgresql_conf, char *pg_hba_conf,
122 					char *extra_basebackup_args);
123 static bool check_data_dir(char *data_dir, RemoteInfo *remoteinfo);
124 
125 static char *read_sysid(const char *data_dir);
126 
127 static void WriteRecoveryConf(PQExpBuffer contents);
128 static void CopyConfFile(char *fromfile, char *tofile, bool append);
129 
130 static char *get_connstr_dbname(char *connstr);
131 static char *get_connstr(char *connstr, char *dbname);
132 static char *PQconninfoParamsToConnstr(const char *const * keywords, const char *const * values);
133 static void appendPQExpBufferConnstrValue(PQExpBuffer buf, const char *str);
134 
135 static bool file_exists(const char *path);
136 static bool is_pg_dir(const char *path);
137 static void copy_file(char *fromfile, char *tofile, bool append);
138 static char *find_other_exec_or_die(const char *argv0, const char *target);
139 static bool postmaster_is_alive(pid_t pid);
140 static long get_pgpid(void);
141 static char **get_database_list(char *databases, int *n_databases);
142 static char *generate_restore_point_name(void);
143 
144 static PGconn *
connectdb(const char * connstr)145 connectdb(const char *connstr)
146 {
147 	PGconn *conn;
148 
149 	conn = PQconnectdb(connstr);
150 	if (PQstatus(conn) != CONNECTION_OK)
151 		die(_("Connection to database failed: %s, connection string was: %s\n"), PQerrorMessage(conn), connstr);
152 
153 	return conn;
154 }
155 
/*
 * Signal handler installed for SIGINT by main(): cancel the run via die().
 * Any other signal number is ignored.
 *
 * NOTE(review): die() calls vfprintf() and, through run_pg_ctl(), system();
 * neither is async-signal-safe, so this handler relies on behaviour POSIX
 * does not guarantee.
 */
void
signal_handler(int sig)
{
	if (sig != SIGINT)
		return;

	die(_("\nCanceling...\n"));
}
163 
164 
165 int
main(int argc,char ** argv)166 main(int argc, char **argv)
167 {
168 	int	i;
169 	int	c;
170 	PQExpBuffer recoveryconfcontents = createPQExpBuffer();
171 	RemoteInfo *remote_info;
172 	char	   *remote_lsn;
173 	bool		stop = false;
174 	bool		drop_slot_if_exists = false;
175 	int			optindex;
176 	char	   *subscriber_name = NULL;
177 	char	   *base_sub_connstr = NULL;
178 	char	   *base_prov_connstr = NULL;
179 	char	   *replication_sets = NULL;
180 	char       *databases = NULL;
181 	char	   *postgresql_conf = NULL,
182 			   *pg_hba_conf = NULL,
183 			   *recovery_conf = NULL;
184 	int			apply_delay = 0;
185 	bool		force_text_transfer = false;
186 	char	  **slot_names;
187 	char       *sub_connstr;
188 	char       *prov_connstr;
189 	char      **database_list = { NULL };
190 	int         n_databases = 1;
191 	int         dbnum;
192 	bool		use_existing_data_dir = false;
193 	int			pg_ctl_ret,
194 				logfd;
195 	char	   *restore_point_name = NULL;
196 	char	   *extra_basebackup_args = NULL;
197 
198 	static struct option long_options[] = {
199 		{"subscriber-name", required_argument, NULL, 'n'},
200 		{"pgdata", required_argument, NULL, 'D'},
201 		{"provider-dsn", required_argument, NULL, 1},
202 		{"subscriber-dsn", required_argument, NULL, 2},
203 		{"replication-sets", required_argument, NULL, 3},
204 		{"postgresql-conf", required_argument, NULL, 4},
205 		{"hba-conf", required_argument, NULL, 5},
206 		{"recovery-conf", required_argument, NULL, 6},
207 		{"stop", no_argument, NULL, 's'},
208 		{"drop-slot-if-exists", no_argument, NULL, 7},
209 		{"apply-delay", required_argument, NULL, 8},
210 		{"databases", required_argument, NULL, 9},
211 		{"extra-basebackup-args", required_argument, NULL, 10},
212 		{"text-types", no_argument, NULL, 11},
213 		{NULL, 0, NULL, 0}
214 	};
215 
216 	argv0 = argv[0];
217 	progname = get_progname(argv[0]);
218 	start_time = time(NULL);
219 	signal(SIGINT, signal_handler);
220 
221 	/* check for --help */
222 	if (argc > 1)
223 	{
224 		for (i = 1; i < argc; i++)
225 		{
226 			if (strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "-?") == 0)
227 			{
228 				usage();
229 				exit(0);
230 			}
231 		}
232 	}
233 
234 	/* Option parsing and validation */
235 	while ((c = getopt_long(argc, argv, "D:n:sv", long_options, &optindex)) != -1)
236 	{
237 		switch (c)
238 		{
239 			case 'D':
240 				data_dir = pg_strdup(optarg);
241 				break;
242 			case 'n':
243 				subscriber_name = pg_strdup(optarg);
244 				break;
245 			case 1:
246 				base_prov_connstr = pg_strdup(optarg);
247 				break;
248 			case 2:
249 				base_sub_connstr = pg_strdup(optarg);
250 				break;
251 			case 3:
252 				replication_sets = validate_replication_set_input(pg_strdup(optarg));
253 				break;
254 			case 4:
255 				{
256 					postgresql_conf = pg_strdup(optarg);
257 					if (postgresql_conf != NULL && !file_exists(postgresql_conf))
258 						die(_("The specified postgresql.conf file does not exist."));
259 					break;
260 				}
261 			case 5:
262 				{
263 					pg_hba_conf = pg_strdup(optarg);
264 					if (pg_hba_conf != NULL && !file_exists(pg_hba_conf))
265 						die(_("The specified pg_hba.conf file does not exist."));
266 					break;
267 				}
268 			case 6:
269 				{
270 					recovery_conf = pg_strdup(optarg);
271 					if (recovery_conf != NULL && !file_exists(recovery_conf))
272 						die(_("The specified recovery configuration file does not exist."));
273 					break;
274 				}
275 			case 'v':
276 				verbosity++;
277 				break;
278 			case 's':
279 				stop = true;
280 				break;
281 			case 7:
282 				drop_slot_if_exists = true;
283 				break;
284 			case 8:
285 				apply_delay = atoi(optarg);
286 				break;
287 			case 9:
288 				databases = pg_strdup(optarg);
289 				break;
290 			case 10:
291 				extra_basebackup_args = pg_strdup(optarg);
292 				break;
293 			case 11:
294 				force_text_transfer = true;
295 				break;
296 			default:
297 				fprintf(stderr, _("Unknown option\n"));
298 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
299 				exit(1);
300 		}
301 	}
302 
303 	/*
304 	 * Sanity checks
305 	 */
306 
307 	if (data_dir == NULL)
308 	{
309 		fprintf(stderr, _("No data directory specified\n"));
310 		fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
311 		exit(1);
312 	}
313 	else if (subscriber_name == NULL)
314 	{
315 		fprintf(stderr, _("No subscriber name specified\n"));
316 		fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
317 		exit(1);
318 	}
319 
320 	if (!base_prov_connstr || !strlen(base_prov_connstr))
321 		die(_("Provider connection string must be specified.\n"));
322 	if (!base_sub_connstr || !strlen(base_sub_connstr))
323 		die(_("Subscriber connection string must be specified.\n"));
324 
325 	if (apply_delay < 0)
326 		die(_("Apply delay cannot be negative.\n"));
327 
328 	if (apply_delay > MAX_APPLY_DELAY)
329 		die(_("Apply delay cannot be more than %d.\n"), MAX_APPLY_DELAY);
330 
331 	if (!replication_sets || !strlen(replication_sets))
332 		replication_sets = "default,default_insert_only,ddl_sql";
333 
334 	/* Init random numbers used for slot suffixes, etc */
335 	srand(time(NULL));
336 
337 	/* Parse database list or connection string. */
338 	if (databases != NULL)
339 	{
340 		database_list = get_database_list(databases, &n_databases);
341 	}
342 	else
343 	{
344 		char *dbname = get_connstr_dbname(base_prov_connstr);
345 
346 		if (!dbname)
347 			die(_("Either provider connection string must contain database "
348 				  "name or --databases option must be specified.\n"));
349 
350 		n_databases = 1;
351 		database_list = palloc(n_databases * sizeof(char *));
352 		database_list[0] = dbname;
353 	}
354 
355 	slot_names = palloc(n_databases * sizeof(char *));
356 
357 	/*
358 	 * Check connection strings for validity before doing anything
359 	 * expensive.
360 	 */
361 	for (dbnum = 0; dbnum < n_databases; dbnum++)
362 	{
363 		char *db = database_list[dbnum];
364 
365 		prov_connstr = get_connstr(base_prov_connstr, db);
366 		if (!prov_connstr || !strlen(prov_connstr))
367 			die(_("Provider connection string is not valid.\n"));
368 
369 		sub_connstr = get_connstr(base_sub_connstr, db);
370 		if (!sub_connstr || !strlen(sub_connstr))
371 			die(_("Subscriber connection string is not valid.\n"));
372 	}
373 
374 	/*
375 	 * Create log file where new postgres instance will log to while being
376 	 * initialized.
377 	 */
378 	logfd = open("pglogical_create_subscriber_postgres.log", O_CREAT | O_RDWR,
379 				 S_IRUSR | S_IWUSR);
380 	if (logfd == -1)
381 	{
382 		die(_("Creating pglogical_create_subscriber_postgres.log failed: %s"),
383 			strerror(errno));
384 	}
385 	/* Safe to close() unchecked, we didn't write */
386 	(void) close(logfd);
387 
388 	/* Let's start the real work... */
389 	print_msg(VERBOSITY_NORMAL, _("%s: starting ...\n"), progname);
390 
391 	for (dbnum = 0; dbnum < n_databases; dbnum++)
392 	{
393 		char *db = database_list[dbnum];
394 
395 		prov_connstr = get_connstr(base_prov_connstr, db);
396 		if (!prov_connstr || !strlen(prov_connstr))
397 			die(_("Provider connection string is not valid.\n"));
398 
399 		/* Read the remote server indetification. */
400 		print_msg(VERBOSITY_NORMAL,
401 				  _("Getting information for database %s ...\n"), db);
402 		provider_conn = connectdb(prov_connstr);
403 		remote_info = get_remote_info(provider_conn);
404 
405 		/* only need to do this piece once */
406 
407 		if (dbnum == 0)
408 		{
409 			use_existing_data_dir = check_data_dir(data_dir, remote_info);
410 
411 			if (use_existing_data_dir &&
412 				strcmp(remote_info->sysid, read_sysid(data_dir)) != 0)
413 				die(_("Subscriber data directory is not basebackup of remote node.\n"));
414 		}
415 
416 		/*
417 		 * Create replication slots on remote node.
418 		 */
419 		print_msg(VERBOSITY_NORMAL,
420 				  _("Creating replication slot in database %s ...\n"), db);
421 		slot_names[dbnum] = initialize_replication_slot(provider_conn,
422 														remote_info->dbname,
423 														remote_info->node_name,
424 														subscriber_name,
425 														drop_slot_if_exists);
426 		PQfinish(provider_conn);
427 		provider_conn = NULL;
428 	}
429 
430 	/*
431 	 * Create basebackup or use existing one
432 	 */
433 	prov_connstr = get_connstr(base_prov_connstr, database_list[0]);
434 	sub_connstr = get_connstr(base_sub_connstr, database_list[0]);
435 
436 	initialize_data_dir(data_dir,
437 						use_existing_data_dir ? NULL : prov_connstr,
438 						postgresql_conf, pg_hba_conf,
439 						extra_basebackup_args);
440 	snprintf(pid_file, MAXPGPATH, "%s/postmaster.pid", data_dir);
441 
442 	restore_point_name = generate_restore_point_name();
443 
444 	print_msg(VERBOSITY_NORMAL, _("Creating restore point \"%s\" on remote node ...\n"),
445 		restore_point_name);
446 	provider_conn = connectdb(prov_connstr);
447 	remote_lsn = create_restore_point(provider_conn, restore_point_name);
448 	PQfinish(provider_conn);
449 	provider_conn = NULL;
450 
451 	/*
452 	 * Get subscriber db to consistent state (for lsn after slot creation).
453 	 */
454 	print_msg(VERBOSITY_NORMAL,
455 			  _("Bringing subscriber node to the restore point ...\n"));
456 	if (recovery_conf)
457 	{
458 #if PG_VERSION_NUM >= 120000
459 		CopyConfFile(recovery_conf, "postgresql.auto.conf", true);
460 #else
461 		CopyConfFile(recovery_conf, "recovery.conf", false);
462 #endif
463 	}
464 	else
465 	{
466 #if PG_VERSION_NUM < 120000
467 		appendPQExpBuffer(recoveryconfcontents, "standby_mode = 'on'\n");
468 #endif
469 		appendPQExpBuffer(recoveryconfcontents, "primary_conninfo = '%s'\n",
470 								escape_single_quotes_ascii(prov_connstr));
471 	}
472 	appendPQExpBuffer(recoveryconfcontents, "recovery_target_name = '%s'\n", restore_point_name);
473 	appendPQExpBuffer(recoveryconfcontents, "recovery_target_inclusive = true\n");
474 #if PG_VERSION_NUM >= 90500
475 	appendPQExpBuffer(recoveryconfcontents, "recovery_target_action = promote\n");
476 #else
477 	appendPQExpBuffer(recoveryconfcontents, "pause_at_recovery_target = false\n");
478 #endif
479 	WriteRecoveryConf(recoveryconfcontents);
480 
481 	free(restore_point_name);
482 	restore_point_name = NULL;
483 
484 	/*
485 	 * Start subscriber node with pglogical disabled, and wait until it starts
486 	 * accepting connections which means it has caught up to the restore point.
487 	 */
488 	pg_ctl_ret = run_pg_ctl("start -l \"pglogical_create_subscriber_postgres.log\" -o \"-c shared_preload_libraries=''\"");
489 	if (pg_ctl_ret != 0)
490 		die(_("Postgres startup for restore point catchup failed with %d. See pglogical_create_subscriber_postgres.log."), pg_ctl_ret);
491 
492 	wait_primary_connection(sub_connstr);
493 
494 	/*
495 	 * Clean any per-node data that were copied by pg_basebackup.
496 	 */
497 	print_msg(VERBOSITY_VERBOSE,
498 			  _("Removing old pglogical configuration ...\n"));
499 
500 	for (dbnum = 0; dbnum < n_databases; dbnum++)
501 	{
502 		char *db = database_list[dbnum];
503 
504 		sub_connstr = get_connstr(base_sub_connstr, db);
505 
506 		if (!sub_connstr || !strlen(sub_connstr))
507 			die(_("Subscriber connection string is not valid.\n"));
508 
509 		subscriber_conn = connectdb(sub_connstr);
510 		remove_unwanted_data(subscriber_conn);
511 		PQfinish(subscriber_conn);
512 		subscriber_conn = NULL;
513 	}
514 
515 	/* Stop Postgres so we can reset system id and start it with pglogical loaded. */
516 	pg_ctl_ret = run_pg_ctl("stop");
517 	if (pg_ctl_ret != 0)
518 		die(_("Postgres stop after restore point catchup failed with %d. See pglogical_create_subscriber_postgres.log."), pg_ctl_ret);
519 	wait_postmaster_shutdown();
520 
521 	/*
522 	 * Start the node again, now with pglogical active so that we can start the
523 	 * logical replication. This is final start, so don't log to to special log
524 	 * file anymore.
525 	 */
526 	print_msg(VERBOSITY_NORMAL,
527 			  _("Initializing pglogical on the subscriber node:\n"));
528 
529 	pg_ctl_ret = run_pg_ctl("start");
530 	if (pg_ctl_ret != 0)
531 		die(_("Postgres restart with pglogical enabled failed with %d."), pg_ctl_ret);
532 	wait_postmaster_connection(base_sub_connstr);
533 
534 	for (dbnum = 0; dbnum < n_databases; dbnum++)
535 	{
536 		char *db = database_list[dbnum];
537 
538 		sub_connstr = get_connstr(base_sub_connstr, db);
539 		prov_connstr = get_connstr(base_prov_connstr, db);
540 
541 		subscriber_conn = connectdb(sub_connstr);
542 
543 		/* Create the extension. */
544 		print_msg(VERBOSITY_VERBOSE,
545 				  _("Creating pglogical extension for database %s...\n"), db);
546 		if (PQserverVersion(subscriber_conn) < 90500)
547 			install_extension(subscriber_conn, "pglogical_origin");
548 		install_extension(subscriber_conn, "pglogical");
549 
550 		/*
551 		 * Create the identifier which is setup with the position to which we
552 		 * already caught up using physical replication.
553 		 */
554 		print_msg(VERBOSITY_VERBOSE,
555 				  _("Creating replication origin for database %s...\n"), db);
556 		initialize_replication_origin(subscriber_conn, slot_names[dbnum], remote_lsn);
557 
558 		/*
559 		 * And finally add the node to the cluster.
560 		 */
561 		print_msg(VERBOSITY_NORMAL, _("Creating subscriber %s for database %s...\n"),
562 				  subscriber_name, db);
563 		print_msg(VERBOSITY_VERBOSE, _("Replication sets: %s\n"), replication_sets);
564 
565 		pglogical_subscribe(subscriber_conn, subscriber_name, sub_connstr,
566 							prov_connstr, replication_sets, apply_delay,
567 							force_text_transfer);
568 
569 		PQfinish(subscriber_conn);
570 		subscriber_conn = NULL;
571 	}
572 
573 	/* If user does not want the node to be running at the end, stop it. */
574 	if (stop)
575 	{
576 		print_msg(VERBOSITY_NORMAL, _("Stopping the subscriber node ...\n"));
577 		pg_ctl_ret = run_pg_ctl("stop");
578 		if (pg_ctl_ret != 0)
579 			die(_("Stopping postgres after successful subscribtion failed with %d."), pg_ctl_ret);
580 		wait_postmaster_shutdown();
581 	}
582 
583 	print_msg(VERBOSITY_NORMAL, _("All done\n"));
584 
585 	return 0;
586 }
587 
588 
589 /*
590  * Print help.
591  */
592 static void
usage(void)593 usage(void)
594 {
595 	printf(_("%s create new pglogical subscriber from basebackup of provider.\n\n"), progname);
596 	printf(_("Usage:\n"));
597 	printf(_("  %s [OPTION]...\n"), progname);
598 	printf(_("\nGeneral options:\n"));
599 	printf(_("  -D, --pgdata=DIRECTORY      data directory to be used for new node,\n"));
600 	printf(_("                              can be either empty/non-existing directory,\n"));
601 	printf(_("                              or directory populated using\n"));
602 	printf(_("                              pg_basebackup -X stream command\n"));
603 	printf(_("  --databases                 optional list of databases to replicate\n"));
604 	printf(_("  -n, --subscriber-name=NAME  name of the newly created subscriber\n"));
605 	printf(_("  --subscriber-dsn=CONNSTR    connection string to the newly created subscriber\n"));
606 	printf(_("  --provider-dsn=CONNSTR      connection string to the provider\n"));
607 	printf(_("  --replication-sets=SETS     comma separated list of replication set names\n"));
608 	printf(_("  --apply-delay=DELAY         apply delay in seconds (by default 0)\n"));
609 	printf(_("  --drop-slot-if-exists       drop replication slot of conflicting name\n"));
610 	printf(_("  -s, --stop                  stop the server once the initialization is done\n"));
611 	printf(_("  -v                          increase logging verbosity\n"));
612 	printf(_("  --extra-basebackup-args     additional arguments to pass to pg_basebackup.\n"));
613 	printf(_("                              Safe options: -T, -c, --xlogdir/--waldir\n"));
614 	printf(_("\nConfiguration files override:\n"));
615 	printf(_("  --hba-conf              path to the new pg_hba.conf\n"));
616 	printf(_("  --postgresql-conf       path to the new postgresql.conf\n"));
617 	printf(_("  --recovery-conf         path to the template recovery configuration\n"));
618 }
619 
620 /*
621  * Print error and exit.
622  */
623 static void
die(const char * fmt,...)624 die(const char *fmt,...)
625 {
626 	va_list argptr;
627 	va_start(argptr, fmt);
628 	vfprintf(stderr, fmt, argptr);
629 	va_end(argptr);
630 
631 	if (subscriber_conn)
632 		PQfinish(subscriber_conn);
633 	if (provider_conn)
634 		PQfinish(provider_conn);
635 
636 	if (get_pgpid())
637 	{
638 		if (!run_pg_ctl("stop -s"))
639 		{
640 			fprintf(stderr, _("WARNING: postgres seems to be running, but could not be stopped\n"));
641 		}
642 	}
643 
644 	exit(1);
645 }
646 
647 /*
648  * Print message to stdout and flush
649  */
650 static void
print_msg(VerbosityLevelEnum level,const char * fmt,...)651 print_msg(VerbosityLevelEnum level, const char *fmt,...)
652 {
653 	if (verbosity >= level)
654 	{
655 		va_list argptr;
656 		va_start(argptr, fmt);
657 		vfprintf(stdout, fmt, argptr);
658 		va_end(argptr);
659 		fflush(stdout);
660 	}
661 }
662 
663 
664 /*
665  * Start pg_ctl with given argument(s) - used to start/stop postgres
666  *
667  * Returns the exit code reported by pg_ctl. If pg_ctl exits due to a
668  * signal this call will die and not return.
669  */
670 static int
run_pg_ctl(const char * arg)671 run_pg_ctl(const char *arg)
672 {
673 	int			 ret;
674 	PQExpBuffer  cmd = createPQExpBuffer();
675 	char		*exec_path = find_other_exec_or_die(argv0, "pg_ctl");
676 
677 	appendPQExpBuffer(cmd, "%s %s -D \"%s\"", exec_path, arg, data_dir);
678 
679 	/* Run pg_ctl in silent mode unless we run in debug mode. */
680 	if (verbosity < VERBOSITY_DEBUG)
681 		appendPQExpBuffer(cmd, " -s");
682 
683 	print_msg(VERBOSITY_DEBUG, _("Running pg_ctl: %s.\n"), cmd->data);
684 	ret = system(cmd->data);
685 
686 	destroyPQExpBuffer(cmd);
687 
688 	if (WIFEXITED(ret))
689 		return WEXITSTATUS(ret);
690 	else if (WIFSIGNALED(ret))
691 		die(_("pg_ctl exited with signal %d"), WTERMSIG(ret));
692 	else
693 		die(_("pg_ctl exited for an unknown reason (system() returned %d)"), ret);
694 
695 	return -1;
696 }
697 
698 
699 /*
700  * Run pg_basebackup to create the copy of the origin node.
701  */
702 static void
run_basebackup(const char * provider_connstr,const char * data_dir,const char * extra_basebackup_args)703 run_basebackup(const char *provider_connstr, const char *data_dir,
704 	const char *extra_basebackup_args)
705 {
706 	int			 ret;
707 	PQExpBuffer  cmd = createPQExpBuffer();
708 	char		*exec_path = find_other_exec_or_die(argv0, "pg_basebackup");
709 
710 	appendPQExpBuffer(cmd, "%s -D \"%s\" -d \"%s\" -X s -P", exec_path, data_dir, provider_connstr);
711 
712 	/* Run pg_basebackup in verbose mode if we are running in verbose mode. */
713 	if (verbosity >= VERBOSITY_VERBOSE)
714 		appendPQExpBuffer(cmd, " -v");
715 
716 	if (extra_basebackup_args != NULL)
717 		appendPQExpBuffer(cmd, "%s", extra_basebackup_args);
718 
719 	print_msg(VERBOSITY_DEBUG, _("Running pg_basebackup: %s.\n"), cmd->data);
720 	ret = system(cmd->data);
721 
722 	destroyPQExpBuffer(cmd);
723 
724 	if (WIFEXITED(ret) && WEXITSTATUS(ret) == 0)
725 		return;
726 	if (WIFEXITED(ret))
727 		die(_("pg_basebackup failed with exit status %d, cannot continue.\n"), WEXITSTATUS(ret));
728 	else if (WIFSIGNALED(ret))
729 		die(_("pg_basebackup exited with signal %d, cannot continue"), WTERMSIG(ret));
730 	else
731 		die(_("pg_basebackup exited for an unknown reason (system() returned %d)"), ret);
732 }
733 
734 /*
735  * Init the datadir
736  *
737  * This function can either ensure provided datadir is a postgres datadir,
738  * or create it using pg_basebackup.
739  *
740  * In any case, new postresql.conf and pg_hba.conf will be copied to the
741  * datadir if they are provided.
742  */
743 static void
initialize_data_dir(char * data_dir,char * connstr,char * postgresql_conf,char * pg_hba_conf,char * extra_basebackup_args)744 initialize_data_dir(char *data_dir, char *connstr,
745 					char *postgresql_conf, char *pg_hba_conf,
746 					char *extra_basebackup_args)
747 {
748 	if (connstr)
749 	{
750 		print_msg(VERBOSITY_NORMAL,
751 				  _("Creating base backup of the remote node...\n"));
752 		run_basebackup(connstr, data_dir, extra_basebackup_args);
753 	}
754 
755 	if (postgresql_conf)
756 		CopyConfFile(postgresql_conf, "postgresql.conf", false);
757 	if (pg_hba_conf)
758 		CopyConfFile(pg_hba_conf, "pg_hba.conf", false);
759 }
760 
761 /*
762  * This function checks if provided datadir is clone of the remote node
763  * described by the remote info, or if it's emtpy directory that can be used
764  * as new datadir.
765  */
766 static bool
check_data_dir(char * data_dir,RemoteInfo * remoteinfo)767 check_data_dir(char *data_dir, RemoteInfo *remoteinfo)
768 {
769 	/* Run basebackup as needed. */
770 	switch (pg_check_dir(data_dir))
771 	{
772 		case 0:		/* Does not exist */
773 		case 1:		/* Exists, empty */
774 				return false;
775 		case 2:
776 		case 3:		/* Exists, not empty */
777 		case 4:
778 			{
779 				if (!is_pg_dir(data_dir))
780 					die(_("Directory \"%s\" exists but is not valid postgres data directory.\n"),
781 						data_dir);
782 				return true;
783 			}
784 		case -1:	/* Access problem */
785 			die(_("Could not access directory \"%s\": %s.\n"),
786 				data_dir, strerror(errno));
787 	}
788 
789 	/* Unreachable */
790 	die(_("Unexpected result from pg_check_dir() call"));
791 	return false;
792 }
793 
794 /*
795  * Initialize replication slots
796  */
797 static char *
initialize_replication_slot(PGconn * conn,char * dbname,char * provider_node_name,char * subscription_name,bool drop_slot_if_exists)798 initialize_replication_slot(PGconn *conn, char *dbname,
799 							char *provider_node_name, char *subscription_name,
800 							bool drop_slot_if_exists)
801 {
802 	PQExpBufferData		query;
803 	char			   *slot_name;
804 	PGresult		   *res;
805 
806 	/* Generate the slot name. */
807 	initPQExpBuffer(&query);
808 	printfPQExpBuffer(&query,
809 					  "SELECT pglogical.pglogical_gen_slot_name(%s, %s, %s)",
810 					  PQescapeLiteral(conn, dbname, strlen(dbname)),
811 					  PQescapeLiteral(conn, provider_node_name,
812 									  strlen(provider_node_name)),
813 					  PQescapeLiteral(conn, subscription_name,
814 									  strlen(subscription_name)));
815 
816 	res = PQexec(conn, query.data);
817 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
818 		die(_("Could generate slot name: %s"), PQerrorMessage(conn));
819 
820 	slot_name = pstrdup(PQgetvalue(res, 0, 0));
821 
822 	PQclear(res);
823 	resetPQExpBuffer(&query);
824 
825 	/* Check if the current slot exists. */
826 	printfPQExpBuffer(&query,
827 					  "SELECT 1 FROM pg_catalog.pg_replication_slots WHERE slot_name = %s",
828 					  PQescapeLiteral(conn, slot_name, strlen(slot_name)));
829 
830 	res = PQexec(conn, query.data);
831 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
832 		die(_("Could not fetch existing slot information: %s"), PQerrorMessage(conn));
833 
834 	/* Drop the existing slot when asked for it or error if it already exists. */
835 	if (PQntuples(res) > 0)
836 	{
837 		PQclear(res);
838 		resetPQExpBuffer(&query);
839 
840 		if (!drop_slot_if_exists)
841 			die(_("Slot %s already exists, drop it or use --drop-slot-if-exists to drop it automatically.\n"),
842 				slot_name);
843 
844 		print_msg(VERBOSITY_VERBOSE,
845 				  _("Droping existing slot %s ...\n"), slot_name);
846 
847 		printfPQExpBuffer(&query,
848 						  "SELECT pg_catalog.pg_drop_replication_slot(%s)",
849 						  PQescapeLiteral(conn, slot_name, strlen(slot_name)));
850 
851 		res = PQexec(conn, query.data);
852 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
853 			die(_("Could not drop existing slot %s: %s"), slot_name,
854 				PQerrorMessage(conn));
855 	}
856 
857 	PQclear(res);
858 	resetPQExpBuffer(&query);
859 
860 	/* And finally, create the slot. */
861 	appendPQExpBuffer(&query, "SELECT pg_create_logical_replication_slot(%s, '%s');",
862 					  PQescapeLiteral(conn, slot_name, strlen(slot_name)),
863 					  "pglogical_output");
864 
865 	res = PQexec(conn, query.data);
866 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
867 	{
868 		die(_("Could not create replication slot, status %s: %s\n"),
869 			 PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
870 	}
871 
872 	PQclear(res);
873 	termPQExpBuffer(&query);
874 
875 	return slot_name;
876 }
877 
878 /*
879  * Read replication info about remote connection
880  *
881  * TODO: unify with pglogical_remote_node_info in pglogical_rpc
882  */
883 static RemoteInfo *
get_remote_info(PGconn * conn)884 get_remote_info(PGconn* conn)
885 {
886 	RemoteInfo		    *ri = (RemoteInfo *)pg_malloc0(sizeof(RemoteInfo));
887 	PGresult	   *res;
888 
889 	if (!extension_exists(conn, "pglogical"))
890 		die(_("The remote node is not configured as a pglogical provider.\n"));
891 
892 	res = PQexec(conn, "SELECT node_id, node_name, sysid, dbname, replication_sets FROM pglogical.pglogical_node_info()");
893 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
894 		die(_("could not fetch remote node info: %s\n"), PQerrorMessage(conn));
895 
896 	/* No nodes found? */
897 	if (PQntuples(res) == 0)
898 		die(_("The remote database is not configured as a pglogical node.\n"));
899 
900 	if (PQntuples(res) > 1)
901 		die(_("The remote database has multiple nodes configured. That is not supported with current version of pglogical.\n"));
902 
903 #define atooid(x)  ((Oid) strtoul((x), NULL, 10))
904 
905 	ri->nodeid = atooid(PQgetvalue(res, 0, 0));
906 	ri->node_name = pstrdup(PQgetvalue(res, 0, 1));
907 	ri->sysid = pstrdup(PQgetvalue(res, 0, 2));
908 	ri->dbname = pstrdup(PQgetvalue(res, 0, 3));
909 	ri->replication_sets = pstrdup(PQgetvalue(res, 0, 4));
910 
911 	PQclear(res);
912 
913 	return ri;
914 }
915 
916 /*
917  * Check if extension exists.
918  */
919 static bool
extension_exists(PGconn * conn,const char * extname)920 extension_exists(PGconn *conn, const char *extname)
921 {
922 	PQExpBuffer		query = createPQExpBuffer();
923 	PGresult	   *res;
924 	bool			ret;
925 
926 	printfPQExpBuffer(query, "SELECT 1 FROM pg_catalog.pg_extension WHERE extname = %s;",
927 					  PQescapeLiteral(conn, extname, strlen(extname)));
928 	res = PQexec(conn, query->data);
929 
930 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
931 	{
932 		PQclear(res);
933 		die(_("Could not read extension info: %s\n"), PQerrorMessage(conn));
934 	}
935 
936 	ret = PQntuples(res) == 1;
937 
938 	PQclear(res);
939 	destroyPQExpBuffer(query);
940 
941 	return ret;
942 }
943 
944 /*
945  * Create extension.
946  */
947 static void
install_extension(PGconn * conn,const char * extname)948 install_extension(PGconn *conn, const char *extname)
949 {
950 	PQExpBuffer		query = createPQExpBuffer();
951 	PGresult	   *res;
952 
953 	printfPQExpBuffer(query, "CREATE EXTENSION IF NOT EXISTS %s;",
954 					  PQescapeIdentifier(conn, extname, strlen(extname)));
955 	res = PQexec(conn, query->data);
956 
957 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
958 	{
959 		PQclear(res);
960 		die(_("Could not install %s extension: %s\n"), extname, PQerrorMessage(conn));
961 	}
962 
963 	PQclear(res);
964 	destroyPQExpBuffer(query);
965 }
966 
967 /*
968  * Clean all the data that was copied from remote node but we don't
969  * want it here (currently shared security labels and replication identifiers).
970  */
971 static void
remove_unwanted_data(PGconn * conn)972 remove_unwanted_data(PGconn *conn)
973 {
974 	PGresult		   *res;
975 
976 	/*
977 	 * Remove replication identifiers (9.4 will get them removed by dropping
978 	 * the extension later as we emulate them there).
979 	 */
980 	if (PQserverVersion(conn) >= 90500)
981 	{
982 		res = PQexec(conn, "SELECT pg_replication_origin_drop(external_id) FROM pg_replication_origin_status;");
983 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
984 		{
985 			PQclear(res);
986 			die(_("Could not remove existing replication origins: %s\n"), PQerrorMessage(conn));
987 		}
988 		PQclear(res);
989 	}
990 
991 	res = PQexec(conn, "DROP EXTENSION pglogical CASCADE;");
992 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
993 	{
994 		die(_("Could not clean the pglogical extension, status %s: %s\n"),
995 			 PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
996 	}
997 	PQclear(res);
998 }
999 
1000 /*
1001  * Initialize new remote identifier to specific position.
1002  */
1003 static void
initialize_replication_origin(PGconn * conn,char * origin_name,char * remote_lsn)1004 initialize_replication_origin(PGconn *conn, char *origin_name, char *remote_lsn)
1005 {
1006 	PGresult   *res;
1007 	PQExpBuffer query = createPQExpBuffer();
1008 
1009 	if (PQserverVersion(conn) >= 90500)
1010 	{
1011 		printfPQExpBuffer(query, "SELECT pg_replication_origin_create(%s)",
1012 						  PQescapeLiteral(conn, origin_name, strlen(origin_name)));
1013 
1014 		res = PQexec(conn, query->data);
1015 
1016 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
1017 		{
1018 			die(_("Could not create replication origin \"%s\": status %s: %s\n"),
1019 				query->data,
1020 				PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
1021 		}
1022 		PQclear(res);
1023 
1024 		if (remote_lsn)
1025 		{
1026 			printfPQExpBuffer(query, "SELECT pg_replication_origin_advance(%s, '%s')",
1027 							  PQescapeLiteral(conn, origin_name, strlen(origin_name)),
1028 							  remote_lsn);
1029 
1030 			res = PQexec(conn, query->data);
1031 
1032 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
1033 			{
1034 				die(_("Could not advance replication origin \"%s\": status %s: %s\n"),
1035 					query->data,
1036 					PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
1037 			}
1038 			PQclear(res);
1039 		}
1040 	}
1041 	else
1042 	{
1043 		printfPQExpBuffer(query, "INSERT INTO pglogical_origin.replication_origin (roident, roname, roremote_lsn) SELECT COALESCE(MAX(roident::int), 0) + 1, %s, %s FROM pglogical_origin.replication_origin",
1044 						  PQescapeLiteral(conn, origin_name, strlen(origin_name)),
1045 						  remote_lsn ? PQescapeLiteral(conn, remote_lsn, strlen(remote_lsn)) : "0");
1046 
1047 		res = PQexec(conn, query->data);
1048 
1049 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
1050 		{
1051 			die(_("Could not create replication origin \"%s\": status %s: %s\n"),
1052 				query->data,
1053 				PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
1054 		}
1055 		PQclear(res);
1056 	}
1057 
1058 	destroyPQExpBuffer(query);
1059 }
1060 
1061 
1062 /*
1063  * Create remote restore point which will be used to get into synchronized
1064  * state through physical replay.
1065  */
1066 static char *
create_restore_point(PGconn * conn,char * restore_point_name)1067 create_restore_point(PGconn *conn, char *restore_point_name)
1068 {
1069 	PQExpBuffer  query = createPQExpBuffer();
1070 	PGresult	*res;
1071 	char		*remote_lsn = NULL;
1072 
1073 	printfPQExpBuffer(query, "SELECT pg_create_restore_point('%s')", restore_point_name);
1074 	res = PQexec(conn, query->data);
1075 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
1076 	{
1077 		die(_("Could not create restore point, status %s: %s\n"),
1078 			 PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
1079 	}
1080 	remote_lsn = pstrdup(PQgetvalue(res, 0, 0));
1081 
1082 	PQclear(res);
1083 	destroyPQExpBuffer(query);
1084 
1085 	return remote_lsn;
1086 }
1087 
1088 static void
pglogical_subscribe(PGconn * conn,char * subscriber_name,char * subscriber_dsn,char * provider_dsn,char * replication_sets,int apply_delay,bool force_text_transfer)1089 pglogical_subscribe(PGconn *conn, char *subscriber_name, char *subscriber_dsn,
1090 					char *provider_dsn, char *replication_sets,
1091 					int apply_delay, bool force_text_transfer)
1092 {
1093 	PQExpBufferData		query;
1094 	PQExpBufferData		repsets;
1095 	PGresult		   *res;
1096 
1097 	initPQExpBuffer(&query);
1098 	printfPQExpBuffer(&query,
1099 					  "SELECT pglogical.create_node(node_name := %s, dsn := %s);",
1100 					  PQescapeLiteral(conn, subscriber_name, strlen(subscriber_name)),
1101 					  PQescapeLiteral(conn, subscriber_dsn, strlen(subscriber_dsn)));
1102 
1103 	res = PQexec(conn, query.data);
1104 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
1105 	{
1106 		die(_("Could not create local node, status %s: %s\n"),
1107 			 PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
1108 	}
1109 	PQclear(res);
1110 
1111 	resetPQExpBuffer(&query);
1112 	initPQExpBuffer(&repsets);
1113 
1114 	printfPQExpBuffer(&repsets, "{%s}", replication_sets);
1115 	printfPQExpBuffer(&query,
1116 					  "SELECT pglogical.create_subscription("
1117 					  "subscription_name := %s, provider_dsn := %s, "
1118 					  "replication_sets := %s, "
1119 					  "apply_delay := '%d seconds'::interval, "
1120 					  "synchronize_structure := false, "
1121 					  "synchronize_data := false, "
1122 					  "force_text_transfer := '%s');",
1123 					  PQescapeLiteral(conn, subscriber_name, strlen(subscriber_name)),
1124 					  PQescapeLiteral(conn, provider_dsn, strlen(provider_dsn)),
1125 					  PQescapeLiteral(conn, repsets.data, repsets.len),
1126 					  apply_delay, (force_text_transfer ? "t" : "f"));
1127 
1128 	res = PQexec(conn, query.data);
1129 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
1130 	{
1131 		die(_("Could not create subscription, status %s: %s\n"),
1132 			 PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
1133 	}
1134 	PQclear(res);
1135 
1136 	/* TODO */
1137 	res = PQexec(conn, "UPDATE pglogical.local_sync_status SET sync_status = 'r'");
1138 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
1139 	{
1140 		die(_("Could not update subscription, status %s: %s\n"),
1141 			 PQresStatus(PQresultStatus(res)), PQresultErrorMessage(res));
1142 	}
1143 
1144 	PQclear(res);
1145 
1146 	termPQExpBuffer(&repsets);
1147 	termPQExpBuffer(&query);
1148 }
1149 
1150 
1151 /*
1152  * Validates input of the replication sets and returns normalized data.
1153  */
1154 static char *
validate_replication_set_input(char * replication_sets)1155 validate_replication_set_input(char *replication_sets)
1156 {
1157 	char	   *name;
1158 	PQExpBuffer	retbuf = createPQExpBuffer();
1159 	char	   *ret;
1160 	bool		first = true;
1161 
1162 	if (!replication_sets)
1163 		return NULL;
1164 
1165 	name = strtok(replication_sets, " ,");
1166 	while (name != NULL)
1167 	{
1168 		const char *cp;
1169 
1170 		if (strlen(name) == 0)
1171 			die(_("Replication set name \"%s\" is too short\n"), name);
1172 
1173 		if (strlen(name) > NAMEDATALEN)
1174 			die(_("Replication set name \"%s\" is too long\n"), name);
1175 
1176 		for (cp = name; *cp; cp++)
1177 		{
1178 			if (!((*cp >= 'a' && *cp <= 'z')
1179 				  || (*cp >= '0' && *cp <= '9')
1180 				  || (*cp == '_')
1181 				  || (*cp == '-')))
1182 			{
1183 				die(_("Replication set name \"%s\" contains invalid character\n"),
1184 					name);
1185 			}
1186 		}
1187 
1188 		if (first)
1189 			first = false;
1190 		else
1191 			appendPQExpBufferStr(retbuf, ", ");
1192 		appendPQExpBufferStr(retbuf, name);
1193 
1194 		name = strtok(NULL, " ,");
1195 	}
1196 
1197 	ret = pg_strdup(retbuf->data);
1198 	destroyPQExpBuffer(retbuf);
1199 
1200 	return ret;
1201 }
1202 
1203 static char *
get_connstr_dbname(char * connstr)1204 get_connstr_dbname(char *connstr)
1205 {
1206 	PQconninfoOption *conn_opts = NULL;
1207 	PQconninfoOption *conn_opt;
1208 	char	   *err_msg = NULL;
1209 	char	   *ret = NULL;
1210 
1211 	conn_opts = PQconninfoParse(connstr, &err_msg);
1212 	if (conn_opts == NULL)
1213 	{
1214 		die(_("Invalid connection string: %s\n"), err_msg);
1215 	}
1216 
1217 	for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
1218 	{
1219 		if (strcmp(conn_opt->keyword, "dbname") == 0)
1220 		{
1221 			ret = pstrdup(conn_opt->val);
1222 			break;
1223 		}
1224 	}
1225 
1226 	PQconninfoFree(conn_opts);
1227 
1228 	return ret;
1229 }
1230 
1231 
1232 /*
1233  * Build connection string from individual parameter.
1234  *
1235  * dbname can be specified in connstr parameter
1236  */
1237 static char *
get_connstr(char * connstr,char * dbname)1238 get_connstr(char *connstr, char *dbname)
1239 {
1240 	char		*ret;
1241 	int			argcount = 4;	/* dbname, host, user, port */
1242 	int			i;
1243 	const char **keywords;
1244 	const char **values;
1245 	PQconninfoOption *conn_opts = NULL;
1246 	PQconninfoOption *conn_opt;
1247 	char	   *err_msg = NULL;
1248 
1249 	/*
1250 	 * Merge the connection info inputs given in form of connection string
1251 	 * and options
1252 	 */
1253 	i = 0;
1254 	if (connstr &&
1255 		(strncmp(connstr, "postgresql://", 13) == 0 ||
1256 		 strncmp(connstr, "postgres://", 11) == 0 ||
1257 		 strchr(connstr, '=') != NULL))
1258 	{
1259 		conn_opts = PQconninfoParse(connstr, &err_msg);
1260 		if (conn_opts == NULL)
1261 		{
1262 			die(_("Invalid connection string: %s\n"), err_msg);
1263 		}
1264 
1265 		for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
1266 		{
1267 			if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
1268 				argcount++;
1269 		}
1270 
1271 		keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
1272 		values = pg_malloc0((argcount + 1) * sizeof(*values));
1273 
1274 		for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
1275 		{
1276 			/* If db* parameters were provided, we'll fill them later. */
1277 			if (dbname && strcmp(conn_opt->keyword, "dbname") == 0)
1278 				continue;
1279 
1280 			if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
1281 			{
1282 				keywords[i] = conn_opt->keyword;
1283 				values[i] = conn_opt->val;
1284 				i++;
1285 			}
1286 		}
1287 	}
1288 	else
1289 	{
1290 		keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
1291 		values = pg_malloc0((argcount + 1) * sizeof(*values));
1292 
1293 		/*
1294 		 * If connstr was provided but it's not in connection string format and
1295 		 * the dbname wasn't provided then connstr is actually dbname.
1296 		 */
1297 		if (connstr && !dbname)
1298 			dbname = connstr;
1299 	}
1300 
1301 	if (dbname)
1302 	{
1303 		keywords[i] = "dbname";
1304 		values[i] = dbname;
1305 		i++;
1306 	}
1307 
1308 	ret = PQconninfoParamsToConnstr(keywords, values);
1309 
1310 	/* Connection ok! */
1311 	pg_free(values);
1312 	pg_free(keywords);
1313 	if (conn_opts)
1314 		PQconninfoFree(conn_opts);
1315 
1316 	return ret;
1317 }
1318 
1319 
1320 /*
1321  * Reads the pg_control file of the existing data dir.
1322  */
1323 static char *
read_sysid(const char * data_dir)1324 read_sysid(const char *data_dir)
1325 {
1326 	ControlFileData ControlFile;
1327 	int			fd;
1328 	char		ControlFilePath[MAXPGPATH];
1329 	char	   *res = (char *) pg_malloc0(33);
1330 
1331 	snprintf(ControlFilePath, MAXPGPATH, "%s/global/pg_control", data_dir);
1332 
1333 	if ((fd = open(ControlFilePath, O_RDONLY | PG_BINARY, 0)) == -1)
1334 		die(_("%s: could not open file \"%s\" for reading: %s\n"),
1335 			progname, ControlFilePath, strerror(errno));
1336 
1337 	if (read(fd, &ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
1338 		die(_("%s: could not read file \"%s\": %s\n"),
1339 			progname, ControlFilePath, strerror(errno));
1340 
1341 	close(fd);
1342 
1343 	snprintf(res, 33, UINT64_FORMAT, ControlFile.system_identifier);
1344 	return res;
1345 }
1346 
1347 /*
1348  * Write contents of recovery.conf or postgresql.auto.conf
1349  */
1350 static void
WriteRecoveryConf(PQExpBuffer contents)1351 WriteRecoveryConf(PQExpBuffer contents)
1352 {
1353 	char		filename[MAXPGPATH];
1354 	FILE	   *cf;
1355 
1356 #if PG_VERSION_NUM >= 120000
1357 	sprintf(filename, "%s/postgresql.auto.conf", data_dir);
1358 
1359 	cf = fopen(filename, "a");
1360 #else
1361 	sprintf(filename, "%s/recovery.conf", data_dir);
1362 
1363 	cf = fopen(filename, "w");
1364 #endif
1365 	if (cf == NULL)
1366 	{
1367 		die(_("%s: could not create file \"%s\": %s\n"), progname, filename, strerror(errno));
1368 	}
1369 
1370 	if (fwrite(contents->data, contents->len, 1, cf) != 1)
1371 	{
1372 		die(_("%s: could not write to file \"%s\": %s\n"),
1373 				progname, filename, strerror(errno));
1374 	}
1375 
1376 	fclose(cf);
1377 
1378 #if PG_VERSION_NUM >= 120000
1379 	{
1380 		sprintf(filename, "%s/standby.signal", data_dir);
1381 		cf = fopen(filename, "w");
1382 		if (cf == NULL)
1383 		{
1384 			die(_("%s: could not create file \"%s\": %s\n"), progname, filename, strerror(errno));
1385 		}
1386 
1387 		fclose(cf);
1388 	}
1389 #endif
1390 }
1391 
1392 /*
1393  * Copy file to data
1394  */
1395 static void
CopyConfFile(char * fromfile,char * tofile,bool append)1396 CopyConfFile(char *fromfile, char *tofile, bool append)
1397 {
1398 	char		filename[MAXPGPATH];
1399 
1400 	sprintf(filename, "%s/%s", data_dir, tofile);
1401 
1402 	print_msg(VERBOSITY_DEBUG, _("Copying \"%s\" to \"%s\".\n"),
1403 			  fromfile, filename);
1404 	copy_file(fromfile, filename, append);
1405 }
1406 
1407 
1408 /*
1409  * Convert PQconninfoOption array into conninfo string
1410  */
1411 static char *
PQconninfoParamsToConnstr(const char * const * keywords,const char * const * values)1412 PQconninfoParamsToConnstr(const char *const * keywords, const char *const * values)
1413 {
1414 	PQExpBuffer	 retbuf = createPQExpBuffer();
1415 	char		*ret;
1416 	int			 i = 0;
1417 
1418 	for (i = 0; keywords[i] != NULL; i++)
1419 	{
1420 		if (i > 0)
1421 			appendPQExpBufferChar(retbuf, ' ');
1422 		appendPQExpBuffer(retbuf, "%s=", keywords[i]);
1423 		appendPQExpBufferConnstrValue(retbuf, values[i]);
1424 	}
1425 
1426 	ret = pg_strdup(retbuf->data);
1427 	destroyPQExpBuffer(retbuf);
1428 
1429 	return ret;
1430 }
1431 
1432 /*
1433  * Escape connection info value
1434  */
1435 static void
appendPQExpBufferConnstrValue(PQExpBuffer buf,const char * str)1436 appendPQExpBufferConnstrValue(PQExpBuffer buf, const char *str)
1437 {
1438 	const char *s;
1439 	bool		needquotes;
1440 
1441 	/*
1442 	 * If the string consists entirely of plain ASCII characters, no need to
1443 	 * quote it. This is quite conservative, but better safe than sorry.
1444 	 */
1445 	needquotes = false;
1446 	for (s = str; *s; s++)
1447 	{
1448 		if (!((*s >= 'a' && *s <= 'z') || (*s >= 'A' && *s <= 'Z') ||
1449 			  (*s >= '0' && *s <= '9') || *s == '_' || *s == '.'))
1450 		{
1451 			needquotes = true;
1452 			break;
1453 		}
1454 	}
1455 
1456 	if (needquotes)
1457 	{
1458 		appendPQExpBufferChar(buf, '\'');
1459 		while (*str)
1460 		{
1461 			/* ' and \ must be escaped by to \' and \\ */
1462 			if (*str == '\'' || *str == '\\')
1463 				appendPQExpBufferChar(buf, '\\');
1464 
1465 			appendPQExpBufferChar(buf, *str);
1466 			str++;
1467 		}
1468 		appendPQExpBufferChar(buf, '\'');
1469 	}
1470 	else
1471 		appendPQExpBufferStr(buf, str);
1472 }
1473 
1474 
1475 /*
1476  * Find the pgport and try a connection
1477  */
1478 static void
wait_postmaster_connection(const char * connstr)1479 wait_postmaster_connection(const char *connstr)
1480 {
1481 	PGPing		res;
1482 	long		pmpid = 0;
1483 
1484 	print_msg(VERBOSITY_VERBOSE, "Waiting for PostgreSQL to accept connections ...");
1485 
1486 	/* First wait for Postmaster to come up. */
1487 	for (;;)
1488 	{
1489 		if ((pmpid = get_pgpid()) != 0 &&
1490 			postmaster_is_alive((pid_t) pmpid))
1491 			break;
1492 
1493 		pg_usleep(1000000);		/* 1 sec */
1494 		print_msg(VERBOSITY_VERBOSE, ".");
1495 	}
1496 
1497 	/* Now wait for Postmaster to either accept connections or die. */
1498 	for (;;)
1499 	{
1500 		res = PQping(connstr);
1501 		if (res == PQPING_OK)
1502 			break;
1503 		else if (res == PQPING_NO_ATTEMPT)
1504 			break;
1505 
1506 		/*
1507 		 * Check if the process is still alive. This covers cases where the
1508 		 * postmaster successfully created the pidfile but then crashed without
1509 		 * removing it.
1510 		 */
1511 		if (!postmaster_is_alive((pid_t) pmpid))
1512 			break;
1513 
1514 		/* No response; wait */
1515 		pg_usleep(1000000);		/* 1 sec */
1516 		print_msg(VERBOSITY_VERBOSE, ".");
1517 	}
1518 
1519 	print_msg(VERBOSITY_VERBOSE, "\n");
1520 }
1521 
1522 
1523 /*
1524  * Wait for PostgreSQL to leave recovery/standby mode
1525  */
1526 static void
wait_primary_connection(const char * connstr)1527 wait_primary_connection(const char *connstr)
1528 {
1529 	bool		ispri = false;
1530 	PGconn		*conn = NULL;
1531 	PGresult	*res;
1532 
1533 	wait_postmaster_connection(connstr);
1534 
1535 	print_msg(VERBOSITY_VERBOSE, "Waiting for PostgreSQL to become primary...");
1536 
1537 	while (!ispri)
1538 	{
1539 		if (!conn || PQstatus(conn) != CONNECTION_OK)
1540 		{
1541 			if (conn)
1542 				PQfinish(conn);
1543 			wait_postmaster_connection(connstr);
1544 			conn = connectdb(connstr);
1545 		}
1546 
1547 		res = PQexec(conn, "SELECT pg_is_in_recovery()");
1548 		if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) == 1 && *PQgetvalue(res, 0, 0) == 'f')
1549 			ispri = true;
1550 		else
1551 		{
1552 			pg_usleep(1000000);		/* 1 sec */
1553 			print_msg(VERBOSITY_VERBOSE, ".");
1554 		}
1555 
1556 		PQclear(res);
1557 	}
1558 
1559 	PQfinish(conn);
1560 	print_msg(VERBOSITY_VERBOSE, "\n");
1561 }
1562 
1563 /*
1564  * Wait for postmaster to die
1565  */
1566 static void
wait_postmaster_shutdown(void)1567 wait_postmaster_shutdown(void)
1568 {
1569 	long pid;
1570 
1571 	print_msg(VERBOSITY_VERBOSE, "Waiting for PostgreSQL to shutdown ...");
1572 
1573 	for (;;)
1574 	{
1575 		if ((pid = get_pgpid()) != 0)
1576 		{
1577 			pg_usleep(1000000);		/* 1 sec */
1578 			print_msg(VERBOSITY_NORMAL, ".");
1579 		}
1580 		else
1581 			break;
1582 	}
1583 
1584 	print_msg(VERBOSITY_VERBOSE, "\n");
1585 }
1586 
/*
 * Report whether stat() succeeds on "path".
 */
static bool
file_exists(const char *path)
{
	struct stat st;

	return stat(path, &st) == 0;
}
1597 
1598 static bool
is_pg_dir(const char * path)1599 is_pg_dir(const char *path)
1600 {
1601 	struct stat statbuf;
1602 	char		version_file[MAXPGPATH];
1603 
1604 	if (stat(path, &statbuf) != 0)
1605 		return false;
1606 
1607 	snprintf(version_file, MAXPGPATH, "%s/PG_VERSION", data_dir);
1608 	if (stat(version_file, &statbuf) != 0 && errno == ENOENT)
1609 	{
1610 		return false;
1611 	}
1612 
1613 	return true;
1614 }
1615 
1616 /*
1617  * copy one file
1618  */
1619 static void
copy_file(char * fromfile,char * tofile,bool append)1620 copy_file(char *fromfile, char *tofile, bool append)
1621 {
1622 	char	   *buffer;
1623 	int			srcfd;
1624 	int			dstfd;
1625 	int			nbytes;
1626 	off_t		offset;
1627 
1628 #define COPY_BUF_SIZE (8 * BLCKSZ)
1629 
1630 	buffer = malloc(COPY_BUF_SIZE);
1631 
1632 	/*
1633 	 * Open the files
1634 	 */
1635 	srcfd = open(fromfile, O_RDONLY | PG_BINARY, 0);
1636 	if (srcfd < 0)
1637 		die(_("could not open file \"%s\""), fromfile);
1638 
1639 	dstfd = open(tofile, O_RDWR | O_CREAT | (append ? O_APPEND : O_TRUNC) | PG_BINARY,
1640 							  S_IRUSR | S_IWUSR);
1641 	if (dstfd < 0)
1642 		die(_("could not create file \"%s\""), tofile);
1643 
1644 	/*
1645 	 * Do the data copying.
1646 	 */
1647 	for (offset = 0;; offset += nbytes)
1648 	{
1649 		nbytes = read(srcfd, buffer, COPY_BUF_SIZE);
1650 		if (nbytes < 0)
1651 			die(_("could not read file \"%s\""), fromfile);
1652 		if (nbytes == 0)
1653 			break;
1654 		errno = 0;
1655 		if ((int) write(dstfd, buffer, nbytes) != nbytes)
1656 		{
1657 			/* if write didn't set errno, assume problem is no disk space */
1658 			if (errno == 0)
1659 				errno = ENOSPC;
1660 			die(_("could not write to file \"%s\""), tofile);
1661 		}
1662 	}
1663 
1664 	if (close(dstfd))
1665 		die(_("could not close file \"%s\""), tofile);
1666 
1667 	/* we don't care about errors here */
1668 	close(srcfd);
1669 
1670 	free(buffer);
1671 }
1672 
1673 
1674 static char *
find_other_exec_or_die(const char * argv0,const char * target)1675 find_other_exec_or_die(const char *argv0, const char *target)
1676 {
1677 	int			ret;
1678 	char	   *found_path;
1679 	uint32		bin_version;
1680 
1681 	found_path = pg_malloc(MAXPGPATH);
1682 
1683 	ret = find_other_exec_version(argv0, target, &bin_version, found_path);
1684 
1685 	if (ret < 0)
1686 	{
1687 		char		full_path[MAXPGPATH];
1688 
1689 		if (find_my_exec(argv0, full_path) < 0)
1690 			strlcpy(full_path, progname, sizeof(full_path));
1691 
1692 		if (ret == -1)
1693 			die(_("The program \"%s\" is needed by %s "
1694 						   "but was not found in the\n"
1695 						   "same directory as \"%s\".\n"
1696 						   "Check your installation.\n"),
1697 						 target, progname, full_path);
1698 		else
1699 			die(_("The program \"%s\" was found by \"%s\"\n"
1700 						   "but was not the same version as %s.\n"
1701 						   "Check your installation.\n"),
1702 						 target, full_path, progname);
1703 	}
1704 	else
1705 	{
1706 		char		full_path[MAXPGPATH];
1707 
1708 		if (find_my_exec(argv0, full_path) < 0)
1709 			strlcpy(full_path, progname, sizeof(full_path));
1710 
1711 		if (bin_version / 100 != PG_VERSION_NUM / 100)
1712 			die(_("The program \"%s\" was found by \"%s\"\n"
1713 						   "but was not the same version as %s.\n"
1714 						   "Check your installation.\n"),
1715 						 target, full_path, progname);
1716 
1717 	}
1718 
1719 	return found_path;
1720 }
1721 
/*
 * Report whether "pid" names a running process that could be the
 * postmaster.
 *
 * Our own PID and (except on Windows, which has no getppid()) the PID of
 * our parent never qualify.  Otherwise the answer is whether signal 0 can
 * be delivered to the process.  A failure due to EPERM therefore counts as
 * "not alive": such a process belongs to another user and, given the
 * permissions on $PGDATA, cannot be the postmaster we are after.
 */
static bool
postmaster_is_alive(pid_t pid)
{
	bool		is_self_or_parent = (pid == getpid());

#ifndef WIN32
	if (!is_self_or_parent)
		is_self_or_parent = (pid == getppid());
#endif

	if (is_self_or_parent)
		return false;

	return kill(pid, 0) == 0;
}
1745 
1746 static long
get_pgpid(void)1747 get_pgpid(void)
1748 {
1749 	FILE	   *pidf;
1750 	long		pid;
1751 
1752 	pidf = fopen(pid_file, "r");
1753 	if (pidf == NULL)
1754 	{
1755 		return 0;
1756 	}
1757 	if (fscanf(pidf, "%ld", &pid) != 1)
1758 	{
1759 		return 0;
1760 	}
1761 	fclose(pidf);
1762 	return pid;
1763 }
1764 
/*
 * Split a comma-separated list of database names.
 *
 * Returns a newly allocated array holding a copy of each name and stores
 * the number of entries actually filled in *n_databases.  Empty items
 * (as in "a,,b" or a trailing comma) are skipped by strtok(), so the count
 * can be smaller than the number of commas suggests.  "databases" itself
 * is left unmodified.
 */
static char **
get_database_list(char *databases, int *n_databases)
{
	char	   *c;
	char	   *copy;
	char	  **result;
	int			num = 1;

	/* Upper bound on the number of names: one more than the commas. */
	for (c = databases; *c; c++)
		if (*c == ',')
			num++;

	result = palloc(num * sizeof(char *));
	num = 0;

	/* clone the argument so we don't destroy it with strtok */
	copy = pstrdup(databases);
	c = strtok(copy, ",");
	while (c != NULL)
	{
		result[num] = pstrdup(c);
		num++;
		c = strtok(NULL, ",");
	}
	pfree(copy);

	/* Report only the slots that were filled. */
	*n_databases = num;

	return result;
}
1789 
1790 static char *
generate_restore_point_name(void)1791 generate_restore_point_name(void)
1792 {
1793 	char *rpn = malloc(NAMEDATALEN);
1794 	snprintf(rpn, NAMEDATALEN-1, "pglogical_create_subscriber_%lx", random());
1795 	return rpn;
1796 }
1797