1 /*-------------------------------------------------------------------------
2  *
3  * streamutil.c - utility functions for pg_basebackup, pg_receivewal and
4  *					pg_recvlogical
5  *
6  * Author: Magnus Hagander <magnus@hagander.net>
7  *
8  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  *		  src/bin/pg_basebackup/streamutil.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres_fe.h"
16 
17 #include <sys/time.h>
18 #include <unistd.h>
19 
20 #include "access/xlog_internal.h"
21 #include "common/connect.h"
22 #include "common/fe_memutils.h"
23 #include "common/file_perm.h"
24 #include "common/logging.h"
25 #include "common/string.h"
26 #include "datatype/timestamp.h"
27 #include "port/pg_bswap.h"
28 #include "pqexpbuffer.h"
29 #include "receivelog.h"
30 #include "streamutil.h"
31 
/*
 * SQLSTATE string compared against the server's error report in
 * CreateReplicationSlot() to recognize "slot already exists".
 */
#define ERRCODE_DUPLICATE_OBJECT  "42710"

/* WAL segment size in bytes; filled in by RetrieveWalSegSize() */
uint32		WalSegSz;

static bool RetrieveDataDirCreatePerm(PGconn *conn);

/* SHOW command for replication connection was introduced in version 10 */
#define MINIMUM_VERSION_FOR_SHOW_CMD 100000

/*
 * Group access is supported from version 11.
 */
#define MINIMUM_VERSION_FOR_GROUP_ACCESS 110000

/*
 * Connection settings consumed by GetConnection().  Nothing in this file
 * assigns them (apart from the cached password); presumably the calling
 * program fills them in from its command line -- confirm against callers.
 */
const char *progname;			/* sent as fallback_application_name */
char	   *connection_string = NULL;	/* parsed with PQconninfoParse() */
char	   *dbhost = NULL;		/* if set, passed as "host" */
char	   *dbuser = NULL;		/* if set, passed as "user" */
char	   *dbport = NULL;		/* if set, passed as "port" */
char	   *dbname = NULL;		/* if set, selects replication=database */
int			dbgetpassword = 0;	/* 0=auto, -1=never, 1=always */
/* Password obtained by prompting; kept so later connections can reuse it */
static char *password = NULL;
/* Not referenced by the functions in this file, which take their own conn */
PGconn	   *conn = NULL;
55 
56 /*
57  * Connect to the server. Returns a valid PGconn pointer if connected,
58  * or NULL on non-permanent error. On permanent error, the function will
59  * call exit(1) directly.
60  */
61 PGconn *
GetConnection(void)62 GetConnection(void)
63 {
64 	PGconn	   *tmpconn;
65 	int			argcount = 7;	/* dbname, replication, fallback_app_name,
66 								 * host, user, port, password */
67 	int			i;
68 	const char **keywords;
69 	const char **values;
70 	const char *tmpparam;
71 	bool		need_password;
72 	PQconninfoOption *conn_opts = NULL;
73 	PQconninfoOption *conn_opt;
74 	char	   *err_msg = NULL;
75 
76 	/* pg_recvlogical uses dbname only; others use connection_string only. */
77 	Assert(dbname == NULL || connection_string == NULL);
78 
79 	/*
80 	 * Merge the connection info inputs given in form of connection string,
81 	 * options and default values (dbname=replication, replication=true, etc.)
82 	 * Explicitly discard any dbname value in the connection string;
83 	 * otherwise, PQconnectdbParams() would interpret that value as being
84 	 * itself a connection string.
85 	 */
86 	i = 0;
87 	if (connection_string)
88 	{
89 		conn_opts = PQconninfoParse(connection_string, &err_msg);
90 		if (conn_opts == NULL)
91 		{
92 			pg_log_error("%s", err_msg);
93 			exit(1);
94 		}
95 
96 		for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
97 		{
98 			if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
99 				strcmp(conn_opt->keyword, "dbname") != 0)
100 				argcount++;
101 		}
102 
103 		keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
104 		values = pg_malloc0((argcount + 1) * sizeof(*values));
105 
106 		for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
107 		{
108 			if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
109 				strcmp(conn_opt->keyword, "dbname") != 0)
110 			{
111 				keywords[i] = conn_opt->keyword;
112 				values[i] = conn_opt->val;
113 				i++;
114 			}
115 		}
116 	}
117 	else
118 	{
119 		keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
120 		values = pg_malloc0((argcount + 1) * sizeof(*values));
121 	}
122 
123 	keywords[i] = "dbname";
124 	values[i] = dbname == NULL ? "replication" : dbname;
125 	i++;
126 	keywords[i] = "replication";
127 	values[i] = dbname == NULL ? "true" : "database";
128 	i++;
129 	keywords[i] = "fallback_application_name";
130 	values[i] = progname;
131 	i++;
132 
133 	if (dbhost)
134 	{
135 		keywords[i] = "host";
136 		values[i] = dbhost;
137 		i++;
138 	}
139 	if (dbuser)
140 	{
141 		keywords[i] = "user";
142 		values[i] = dbuser;
143 		i++;
144 	}
145 	if (dbport)
146 	{
147 		keywords[i] = "port";
148 		values[i] = dbport;
149 		i++;
150 	}
151 
152 	/* If -W was given, force prompt for password, but only the first time */
153 	need_password = (dbgetpassword == 1 && !password);
154 
155 	do
156 	{
157 		/* Get a new password if appropriate */
158 		if (need_password)
159 		{
160 			if (password)
161 				free(password);
162 			password = simple_prompt("Password: ", false);
163 			need_password = false;
164 		}
165 
166 		/* Use (or reuse, on a subsequent connection) password if we have it */
167 		if (password)
168 		{
169 			keywords[i] = "password";
170 			values[i] = password;
171 		}
172 		else
173 		{
174 			keywords[i] = NULL;
175 			values[i] = NULL;
176 		}
177 
178 		tmpconn = PQconnectdbParams(keywords, values, true);
179 
180 		/*
181 		 * If there is too little memory even to allocate the PGconn object
182 		 * and PQconnectdbParams returns NULL, we call exit(1) directly.
183 		 */
184 		if (!tmpconn)
185 		{
186 			pg_log_error("could not connect to server");
187 			exit(1);
188 		}
189 
190 		/* If we need a password and -w wasn't given, loop back and get one */
191 		if (PQstatus(tmpconn) == CONNECTION_BAD &&
192 			PQconnectionNeedsPassword(tmpconn) &&
193 			dbgetpassword != -1)
194 		{
195 			PQfinish(tmpconn);
196 			need_password = true;
197 		}
198 	}
199 	while (need_password);
200 
201 	if (PQstatus(tmpconn) != CONNECTION_OK)
202 	{
203 		pg_log_error("%s", PQerrorMessage(tmpconn));
204 		PQfinish(tmpconn);
205 		free(values);
206 		free(keywords);
207 		if (conn_opts)
208 			PQconninfoFree(conn_opts);
209 		return NULL;
210 	}
211 
212 	/* Connection ok! */
213 	free(values);
214 	free(keywords);
215 	if (conn_opts)
216 		PQconninfoFree(conn_opts);
217 
218 	/*
219 	 * Set always-secure search path, so malicious users can't get control.
220 	 * The capacity to run normal SQL queries was added in PostgreSQL 10, so
221 	 * the search path cannot be changed (by us or attackers) on earlier
222 	 * versions.
223 	 */
224 	if (dbname != NULL && PQserverVersion(tmpconn) >= 100000)
225 	{
226 		PGresult   *res;
227 
228 		res = PQexec(tmpconn, ALWAYS_SECURE_SEARCH_PATH_SQL);
229 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
230 		{
231 			pg_log_error("could not clear search_path: %s",
232 						 PQerrorMessage(tmpconn));
233 			PQclear(res);
234 			PQfinish(tmpconn);
235 			exit(1);
236 		}
237 		PQclear(res);
238 	}
239 
240 	/*
241 	 * Ensure we have the same value of integer_datetimes (now always "on") as
242 	 * the server we are connecting to.
243 	 */
244 	tmpparam = PQparameterStatus(tmpconn, "integer_datetimes");
245 	if (!tmpparam)
246 	{
247 		pg_log_error("could not determine server setting for integer_datetimes");
248 		PQfinish(tmpconn);
249 		exit(1);
250 	}
251 
252 	if (strcmp(tmpparam, "on") != 0)
253 	{
254 		pg_log_error("integer_datetimes compile flag does not match server");
255 		PQfinish(tmpconn);
256 		exit(1);
257 	}
258 
259 	/*
260 	 * Retrieve the source data directory mode and use it to construct a umask
261 	 * for creating directories and files.
262 	 */
263 	if (!RetrieveDataDirCreatePerm(tmpconn))
264 	{
265 		PQfinish(tmpconn);
266 		exit(1);
267 	}
268 
269 	return tmpconn;
270 }
271 
272 /*
273  * From version 10, explicitly set wal segment size using SHOW wal_segment_size
274  * since ControlFile is not accessible here.
275  */
276 bool
RetrieveWalSegSize(PGconn * conn)277 RetrieveWalSegSize(PGconn *conn)
278 {
279 	PGresult   *res;
280 	char		xlog_unit[3];
281 	int			xlog_val,
282 				multiplier = 1;
283 
284 	/* check connection existence */
285 	Assert(conn != NULL);
286 
287 	/* for previous versions set the default xlog seg size */
288 	if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_SHOW_CMD)
289 	{
290 		WalSegSz = DEFAULT_XLOG_SEG_SIZE;
291 		return true;
292 	}
293 
294 	res = PQexec(conn, "SHOW wal_segment_size");
295 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
296 	{
297 		pg_log_error("could not send replication command \"%s\": %s",
298 					 "SHOW wal_segment_size", PQerrorMessage(conn));
299 
300 		PQclear(res);
301 		return false;
302 	}
303 	if (PQntuples(res) != 1 || PQnfields(res) < 1)
304 	{
305 		pg_log_error("could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields",
306 					 PQntuples(res), PQnfields(res), 1, 1);
307 
308 		PQclear(res);
309 		return false;
310 	}
311 
312 	/* fetch xlog value and unit from the result */
313 	if (sscanf(PQgetvalue(res, 0, 0), "%d%2s", &xlog_val, xlog_unit) != 2)
314 	{
315 		pg_log_error("WAL segment size could not be parsed");
316 		PQclear(res);
317 		return false;
318 	}
319 
320 	PQclear(res);
321 
322 	/* set the multiplier based on unit to convert xlog_val to bytes */
323 	if (strcmp(xlog_unit, "MB") == 0)
324 		multiplier = 1024 * 1024;
325 	else if (strcmp(xlog_unit, "GB") == 0)
326 		multiplier = 1024 * 1024 * 1024;
327 
328 	/* convert and set WalSegSz */
329 	WalSegSz = xlog_val * multiplier;
330 
331 	if (!IsValidWalSegSize(WalSegSz))
332 	{
333 		pg_log_error(ngettext("WAL segment size must be a power of two between 1 MB and 1 GB, but the remote server reported a value of %d byte",
334 							  "WAL segment size must be a power of two between 1 MB and 1 GB, but the remote server reported a value of %d bytes",
335 							  WalSegSz),
336 					 WalSegSz);
337 		return false;
338 	}
339 
340 	return true;
341 }
342 
343 /*
344  * RetrieveDataDirCreatePerm
345  *
346  * This function is used to determine the privileges on the server's PG data
347  * directory and, based on that, set what the permissions will be for
348  * directories and files we create.
349  *
350  * PG11 added support for (optionally) group read/execute rights to be set on
351  * the data directory.  Prior to PG11, only the owner was allowed to have rights
352  * on the data directory.
353  */
354 static bool
RetrieveDataDirCreatePerm(PGconn * conn)355 RetrieveDataDirCreatePerm(PGconn *conn)
356 {
357 	PGresult   *res;
358 	int			data_directory_mode;
359 
360 	/* check connection existence */
361 	Assert(conn != NULL);
362 
363 	/* for previous versions leave the default group access */
364 	if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_GROUP_ACCESS)
365 		return true;
366 
367 	res = PQexec(conn, "SHOW data_directory_mode");
368 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
369 	{
370 		pg_log_error("could not send replication command \"%s\": %s",
371 					 "SHOW data_directory_mode", PQerrorMessage(conn));
372 
373 		PQclear(res);
374 		return false;
375 	}
376 	if (PQntuples(res) != 1 || PQnfields(res) < 1)
377 	{
378 		pg_log_error("could not fetch group access flag: got %d rows and %d fields, expected %d rows and %d or more fields",
379 					 PQntuples(res), PQnfields(res), 1, 1);
380 
381 		PQclear(res);
382 		return false;
383 	}
384 
385 	if (sscanf(PQgetvalue(res, 0, 0), "%o", &data_directory_mode) != 1)
386 	{
387 		pg_log_error("group access flag could not be parsed: %s",
388 					 PQgetvalue(res, 0, 0));
389 
390 		PQclear(res);
391 		return false;
392 	}
393 
394 	SetDataDirectoryCreatePerm(data_directory_mode);
395 
396 	PQclear(res);
397 	return true;
398 }
399 
400 /*
401  * Run IDENTIFY_SYSTEM through a given connection and give back to caller
402  * some result information if requested:
403  * - System identifier
404  * - Current timeline ID
405  * - Start LSN position
406  * - Database name (NULL in servers prior to 9.4)
407  */
408 bool
RunIdentifySystem(PGconn * conn,char ** sysid,TimeLineID * starttli,XLogRecPtr * startpos,char ** db_name)409 RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
410 				  XLogRecPtr *startpos, char **db_name)
411 {
412 	PGresult   *res;
413 	uint32		hi,
414 				lo;
415 
416 	/* Check connection existence */
417 	Assert(conn != NULL);
418 
419 	res = PQexec(conn, "IDENTIFY_SYSTEM");
420 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
421 	{
422 		pg_log_error("could not send replication command \"%s\": %s",
423 					 "IDENTIFY_SYSTEM", PQerrorMessage(conn));
424 
425 		PQclear(res);
426 		return false;
427 	}
428 	if (PQntuples(res) != 1 || PQnfields(res) < 3)
429 	{
430 		pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
431 					 PQntuples(res), PQnfields(res), 1, 3);
432 
433 		PQclear(res);
434 		return false;
435 	}
436 
437 	/* Get system identifier */
438 	if (sysid != NULL)
439 		*sysid = pg_strdup(PQgetvalue(res, 0, 0));
440 
441 	/* Get timeline ID to start streaming from */
442 	if (starttli != NULL)
443 		*starttli = atoi(PQgetvalue(res, 0, 1));
444 
445 	/* Get LSN start position if necessary */
446 	if (startpos != NULL)
447 	{
448 		if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
449 		{
450 			pg_log_error("could not parse write-ahead log location \"%s\"",
451 						 PQgetvalue(res, 0, 2));
452 
453 			PQclear(res);
454 			return false;
455 		}
456 		*startpos = ((uint64) hi) << 32 | lo;
457 	}
458 
459 	/* Get database name, only available in 9.4 and newer versions */
460 	if (db_name != NULL)
461 	{
462 		*db_name = NULL;
463 		if (PQserverVersion(conn) >= 90400)
464 		{
465 			if (PQnfields(res) < 4)
466 			{
467 				pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
468 							 PQntuples(res), PQnfields(res), 1, 4);
469 
470 				PQclear(res);
471 				return false;
472 			}
473 			if (!PQgetisnull(res, 0, 3))
474 				*db_name = pg_strdup(PQgetvalue(res, 0, 3));
475 		}
476 	}
477 
478 	PQclear(res);
479 	return true;
480 }
481 
482 /*
483  * Create a replication slot for the given connection. This function
484  * returns true in case of success.
485  */
486 bool
CreateReplicationSlot(PGconn * conn,const char * slot_name,const char * plugin,bool is_temporary,bool is_physical,bool reserve_wal,bool slot_exists_ok)487 CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
488 					  bool is_temporary, bool is_physical, bool reserve_wal,
489 					  bool slot_exists_ok)
490 {
491 	PQExpBuffer query;
492 	PGresult   *res;
493 
494 	query = createPQExpBuffer();
495 
496 	Assert((is_physical && plugin == NULL) ||
497 		   (!is_physical && plugin != NULL));
498 	Assert(slot_name != NULL);
499 
500 	/* Build query */
501 	appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
502 	if (is_temporary)
503 		appendPQExpBufferStr(query, " TEMPORARY");
504 	if (is_physical)
505 	{
506 		appendPQExpBufferStr(query, " PHYSICAL");
507 		if (reserve_wal)
508 			appendPQExpBufferStr(query, " RESERVE_WAL");
509 	}
510 	else
511 	{
512 		appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
513 		if (PQserverVersion(conn) >= 100000)
514 			/* pg_recvlogical doesn't use an exported snapshot, so suppress */
515 			appendPQExpBufferStr(query, " NOEXPORT_SNAPSHOT");
516 	}
517 
518 	res = PQexec(conn, query->data);
519 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
520 	{
521 		const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
522 
523 		if (slot_exists_ok &&
524 			sqlstate &&
525 			strcmp(sqlstate, ERRCODE_DUPLICATE_OBJECT) == 0)
526 		{
527 			destroyPQExpBuffer(query);
528 			PQclear(res);
529 			return true;
530 		}
531 		else
532 		{
533 			pg_log_error("could not send replication command \"%s\": %s",
534 						 query->data, PQerrorMessage(conn));
535 
536 			destroyPQExpBuffer(query);
537 			PQclear(res);
538 			return false;
539 		}
540 	}
541 
542 	if (PQntuples(res) != 1 || PQnfields(res) != 4)
543 	{
544 		pg_log_error("could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
545 					 slot_name,
546 					 PQntuples(res), PQnfields(res), 1, 4);
547 
548 		destroyPQExpBuffer(query);
549 		PQclear(res);
550 		return false;
551 	}
552 
553 	destroyPQExpBuffer(query);
554 	PQclear(res);
555 	return true;
556 }
557 
558 /*
559  * Drop a replication slot for the given connection. This function
560  * returns true in case of success.
561  */
562 bool
DropReplicationSlot(PGconn * conn,const char * slot_name)563 DropReplicationSlot(PGconn *conn, const char *slot_name)
564 {
565 	PQExpBuffer query;
566 	PGresult   *res;
567 
568 	Assert(slot_name != NULL);
569 
570 	query = createPQExpBuffer();
571 
572 	/* Build query */
573 	appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"",
574 					  slot_name);
575 	res = PQexec(conn, query->data);
576 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
577 	{
578 		pg_log_error("could not send replication command \"%s\": %s",
579 					 query->data, PQerrorMessage(conn));
580 
581 		destroyPQExpBuffer(query);
582 		PQclear(res);
583 		return false;
584 	}
585 
586 	if (PQntuples(res) != 0 || PQnfields(res) != 0)
587 	{
588 		pg_log_error("could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
589 					 slot_name,
590 					 PQntuples(res), PQnfields(res), 0, 0);
591 
592 		destroyPQExpBuffer(query);
593 		PQclear(res);
594 		return false;
595 	}
596 
597 	destroyPQExpBuffer(query);
598 	PQclear(res);
599 	return true;
600 }
601 
602 
603 /*
604  * Frontend version of GetCurrentTimestamp(), since we are not linked with
605  * backend code.
606  */
607 TimestampTz
feGetCurrentTimestamp(void)608 feGetCurrentTimestamp(void)
609 {
610 	TimestampTz result;
611 	struct timeval tp;
612 
613 	gettimeofday(&tp, NULL);
614 
615 	result = (TimestampTz) tp.tv_sec -
616 		((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
617 	result = (result * USECS_PER_SEC) + tp.tv_usec;
618 
619 	return result;
620 }
621 
622 /*
623  * Frontend version of TimestampDifference(), since we are not linked with
624  * backend code.
625  */
626 void
feTimestampDifference(TimestampTz start_time,TimestampTz stop_time,long * secs,int * microsecs)627 feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
628 					  long *secs, int *microsecs)
629 {
630 	TimestampTz diff = stop_time - start_time;
631 
632 	if (diff <= 0)
633 	{
634 		*secs = 0;
635 		*microsecs = 0;
636 	}
637 	else
638 	{
639 		*secs = (long) (diff / USECS_PER_SEC);
640 		*microsecs = (int) (diff % USECS_PER_SEC);
641 	}
642 }
643 
644 /*
645  * Frontend version of TimestampDifferenceExceeds(), since we are not
646  * linked with backend code.
647  */
648 bool
feTimestampDifferenceExceeds(TimestampTz start_time,TimestampTz stop_time,int msec)649 feTimestampDifferenceExceeds(TimestampTz start_time,
650 							 TimestampTz stop_time,
651 							 int msec)
652 {
653 	TimestampTz diff = stop_time - start_time;
654 
655 	return (diff >= msec * INT64CONST(1000));
656 }
657 
658 /*
659  * Converts an int64 to network byte order.
660  */
661 void
fe_sendint64(int64 i,char * buf)662 fe_sendint64(int64 i, char *buf)
663 {
664 	uint64		n64 = pg_hton64(i);
665 
666 	memcpy(buf, &n64, sizeof(n64));
667 }
668 
669 /*
670  * Converts an int64 from network byte order to native format.
671  */
672 int64
fe_recvint64(char * buf)673 fe_recvint64(char *buf)
674 {
675 	uint64		n64;
676 
677 	memcpy(&n64, buf, sizeof(n64));
678 
679 	return pg_ntoh64(n64);
680 }
681