1 /*-------------------------------------------------------------------------
2  *
3  * streamutil.c - utility functions for pg_basebackup, pg_receivewal and
4  * 					pg_recvlogical
5  *
6  * Author: Magnus Hagander <magnus@hagander.net>
7  *
8  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  *		  src/bin/pg_basebackup/streamutil.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres_fe.h"
16 
17 #include <sys/time.h>
18 #include <unistd.h>
19 
20 /* for ntohl/htonl */
21 #include <netinet/in.h>
22 #include <arpa/inet.h>
23 
24 /* local includes */
25 #include "receivelog.h"
26 #include "streamutil.h"
27 
28 #include "pqexpbuffer.h"
29 #include "common/fe_memutils.h"
30 #include "datatype/timestamp.h"
31 #include "fe_utils/connect.h"
32 
/*
 * SQLSTATE compared against the server's error report in
 * CreateReplicationSlot() to recognize "slot already exists".
 */
#define ERRCODE_DUPLICATE_OBJECT  "42710"

/* Program name; used as message prefix and as fallback_application_name */
const char *progname;
/* Connection string; parsed by GetConnection() when set */
char	   *connection_string = NULL;
/* Optional overrides passed as "host", "user" and "port" when non-NULL */
char	   *dbhost = NULL;
char	   *dbuser = NULL;
char	   *dbport = NULL;
/*
 * Database to connect to.  When NULL, GetConnection() connects with
 * dbname=replication and replication=true; otherwise it uses this name
 * with replication=database.
 */
char	   *dbname = NULL;
int			dbgetpassword = 0;	/* 0=auto, -1=never, 1=always */
/* Password cache: once prompted for, the password is reused by later calls */
static bool have_password = false;
static char password[100];
/*
 * Connection handle.  Nothing in this file assigns it (the functions below
 * take their own "conn" parameter) -- presumably it is set by the programs
 * using these utilities; verify against callers.
 */
PGconn	   *conn = NULL;
45 
46 /*
47  * Connect to the server. Returns a valid PGconn pointer if connected,
48  * or NULL on non-permanent error. On permanent error, the function will
49  * call exit(1) directly.
50  */
51 PGconn *
GetConnection(void)52 GetConnection(void)
53 {
54 	PGconn	   *tmpconn;
55 	int			argcount = 7;	/* dbname, replication, fallback_app_name,
56 								 * host, user, port, password */
57 	int			i;
58 	const char **keywords;
59 	const char **values;
60 	const char *tmpparam;
61 	bool		need_password;
62 	PQconninfoOption *conn_opts = NULL;
63 	PQconninfoOption *conn_opt;
64 	char	   *err_msg = NULL;
65 
66 	/* pg_recvlogical uses dbname only; others use connection_string only. */
67 	Assert(dbname == NULL || connection_string == NULL);
68 
69 	/*
70 	 * Merge the connection info inputs given in form of connection string,
71 	 * options and default values (dbname=replication, replication=true, etc.)
72 	 * Explicitly discard any dbname value in the connection string;
73 	 * otherwise, PQconnectdbParams() would interpret that value as being
74 	 * itself a connection string.
75 	 */
76 	i = 0;
77 	if (connection_string)
78 	{
79 		conn_opts = PQconninfoParse(connection_string, &err_msg);
80 		if (conn_opts == NULL)
81 		{
82 			fprintf(stderr, "%s: %s", progname, err_msg);
83 			exit(1);
84 		}
85 
86 		for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
87 		{
88 			if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
89 				strcmp(conn_opt->keyword, "dbname") != 0)
90 				argcount++;
91 		}
92 
93 		keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
94 		values = pg_malloc0((argcount + 1) * sizeof(*values));
95 
96 		for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
97 		{
98 			if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
99 				strcmp(conn_opt->keyword, "dbname") != 0)
100 			{
101 				keywords[i] = conn_opt->keyword;
102 				values[i] = conn_opt->val;
103 				i++;
104 			}
105 		}
106 	}
107 	else
108 	{
109 		keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
110 		values = pg_malloc0((argcount + 1) * sizeof(*values));
111 	}
112 
113 	keywords[i] = "dbname";
114 	values[i] = dbname == NULL ? "replication" : dbname;
115 	i++;
116 	keywords[i] = "replication";
117 	values[i] = dbname == NULL ? "true" : "database";
118 	i++;
119 	keywords[i] = "fallback_application_name";
120 	values[i] = progname;
121 	i++;
122 
123 	if (dbhost)
124 	{
125 		keywords[i] = "host";
126 		values[i] = dbhost;
127 		i++;
128 	}
129 	if (dbuser)
130 	{
131 		keywords[i] = "user";
132 		values[i] = dbuser;
133 		i++;
134 	}
135 	if (dbport)
136 	{
137 		keywords[i] = "port";
138 		values[i] = dbport;
139 		i++;
140 	}
141 
142 	/* If -W was given, force prompt for password, but only the first time */
143 	need_password = (dbgetpassword == 1 && !have_password);
144 
145 	do
146 	{
147 		/* Get a new password if appropriate */
148 		if (need_password)
149 		{
150 			simple_prompt("Password: ", password, sizeof(password), false);
151 			have_password = true;
152 			need_password = false;
153 		}
154 
155 		/* Use (or reuse, on a subsequent connection) password if we have it */
156 		if (have_password)
157 		{
158 			keywords[i] = "password";
159 			values[i] = password;
160 		}
161 		else
162 		{
163 			keywords[i] = NULL;
164 			values[i] = NULL;
165 		}
166 
167 		tmpconn = PQconnectdbParams(keywords, values, true);
168 
169 		/*
170 		 * If there is too little memory even to allocate the PGconn object
171 		 * and PQconnectdbParams returns NULL, we call exit(1) directly.
172 		 */
173 		if (!tmpconn)
174 		{
175 			fprintf(stderr, _("%s: could not connect to server\n"),
176 					progname);
177 			exit(1);
178 		}
179 
180 		/* If we need a password and -w wasn't given, loop back and get one */
181 		if (PQstatus(tmpconn) == CONNECTION_BAD &&
182 			PQconnectionNeedsPassword(tmpconn) &&
183 			dbgetpassword != -1)
184 		{
185 			PQfinish(tmpconn);
186 			need_password = true;
187 		}
188 	}
189 	while (need_password);
190 
191 	if (PQstatus(tmpconn) != CONNECTION_OK)
192 	{
193 		fprintf(stderr, _("%s: %s"), progname, PQerrorMessage(tmpconn));
194 		PQfinish(tmpconn);
195 		free(values);
196 		free(keywords);
197 		if (conn_opts)
198 			PQconninfoFree(conn_opts);
199 		return NULL;
200 	}
201 
202 	/* Connection ok! */
203 	free(values);
204 	free(keywords);
205 	if (conn_opts)
206 		PQconninfoFree(conn_opts);
207 
208 	/*
209 	 * Set always-secure search path, so malicious users can't get control.
210 	 * The capacity to run normal SQL queries was added in PostgreSQL
211 	 * 10, so the search path cannot be changed (by us or attackers) on
212 	 * earlier versions.
213 	 */
214 	if (dbname != NULL && PQserverVersion(tmpconn) >= 100000)
215 	{
216 		PGresult   *res;
217 
218 		res = PQexec(tmpconn, ALWAYS_SECURE_SEARCH_PATH_SQL);
219 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
220 		{
221 			fprintf(stderr, _("%s: could not clear search_path: %s"),
222 					progname, PQerrorMessage(tmpconn));
223 			PQclear(res);
224 			PQfinish(tmpconn);
225 			exit(1);
226 		}
227 		PQclear(res);
228 	}
229 
230 	/*
231 	 * Ensure we have the same value of integer_datetimes (now always "on") as
232 	 * the server we are connecting to.
233 	 */
234 	tmpparam = PQparameterStatus(tmpconn, "integer_datetimes");
235 	if (!tmpparam)
236 	{
237 		fprintf(stderr,
238 				_("%s: could not determine server setting for integer_datetimes\n"),
239 				progname);
240 		PQfinish(tmpconn);
241 		exit(1);
242 	}
243 
244 	if (strcmp(tmpparam, "on") != 0)
245 	{
246 		fprintf(stderr,
247 				_("%s: integer_datetimes compile flag does not match server\n"),
248 				progname);
249 		PQfinish(tmpconn);
250 		exit(1);
251 	}
252 
253 	return tmpconn;
254 }
255 
256 /*
257  * Run IDENTIFY_SYSTEM through a given connection and give back to caller
258  * some result information if requested:
259  * - System identifier
260  * - Current timeline ID
261  * - Start LSN position
262  * - Database name (NULL in servers prior to 9.4)
263  */
264 bool
RunIdentifySystem(PGconn * conn,char ** sysid,TimeLineID * starttli,XLogRecPtr * startpos,char ** db_name)265 RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
266 				  XLogRecPtr *startpos, char **db_name)
267 {
268 	PGresult   *res;
269 	uint32		hi,
270 				lo;
271 
272 	/* Check connection existence */
273 	Assert(conn != NULL);
274 
275 	res = PQexec(conn, "IDENTIFY_SYSTEM");
276 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
277 	{
278 		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
279 				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
280 
281 		PQclear(res);
282 		return false;
283 	}
284 	if (PQntuples(res) != 1 || PQnfields(res) < 3)
285 	{
286 		fprintf(stderr,
287 				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
288 				progname, PQntuples(res), PQnfields(res), 1, 3);
289 
290 		PQclear(res);
291 		return false;
292 	}
293 
294 	/* Get system identifier */
295 	if (sysid != NULL)
296 		*sysid = pg_strdup(PQgetvalue(res, 0, 0));
297 
298 	/* Get timeline ID to start streaming from */
299 	if (starttli != NULL)
300 		*starttli = atoi(PQgetvalue(res, 0, 1));
301 
302 	/* Get LSN start position if necessary */
303 	if (startpos != NULL)
304 	{
305 		if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
306 		{
307 			fprintf(stderr,
308 					_("%s: could not parse write-ahead log location \"%s\"\n"),
309 					progname, PQgetvalue(res, 0, 2));
310 
311 			PQclear(res);
312 			return false;
313 		}
314 		*startpos = ((uint64) hi) << 32 | lo;
315 	}
316 
317 	/* Get database name, only available in 9.4 and newer versions */
318 	if (db_name != NULL)
319 	{
320 		*db_name = NULL;
321 		if (PQserverVersion(conn) >= 90400)
322 		{
323 			if (PQnfields(res) < 4)
324 			{
325 				fprintf(stderr,
326 						_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
327 						progname, PQntuples(res), PQnfields(res), 1, 4);
328 
329 				PQclear(res);
330 				return false;
331 			}
332 			if (!PQgetisnull(res, 0, 3))
333 				*db_name = pg_strdup(PQgetvalue(res, 0, 3));
334 		}
335 	}
336 
337 	PQclear(res);
338 	return true;
339 }
340 
341 /*
342  * Create a replication slot for the given connection. This function
343  * returns true in case of success.
344  */
345 bool
CreateReplicationSlot(PGconn * conn,const char * slot_name,const char * plugin,bool is_physical,bool slot_exists_ok)346 CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
347 					  bool is_physical, bool slot_exists_ok)
348 {
349 	PQExpBuffer query;
350 	PGresult   *res;
351 
352 	query = createPQExpBuffer();
353 
354 	Assert((is_physical && plugin == NULL) ||
355 		   (!is_physical && plugin != NULL));
356 	Assert(slot_name != NULL);
357 
358 	/* Build query */
359 	if (is_physical)
360 		appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
361 						  slot_name);
362 	else
363 	{
364 		appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
365 						  slot_name, plugin);
366 		if (PQserverVersion(conn) >= 100000)
367 			/* pg_recvlogical doesn't use an exported snapshot, so suppress */
368 			appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT");
369 	}
370 
371 	res = PQexec(conn, query->data);
372 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
373 	{
374 		const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
375 
376 		if (slot_exists_ok &&
377 			sqlstate &&
378 			strcmp(sqlstate, ERRCODE_DUPLICATE_OBJECT) == 0)
379 		{
380 			destroyPQExpBuffer(query);
381 			PQclear(res);
382 			return true;
383 		}
384 		else
385 		{
386 			fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
387 					progname, query->data, PQerrorMessage(conn));
388 
389 			destroyPQExpBuffer(query);
390 			PQclear(res);
391 			return false;
392 		}
393 	}
394 
395 	if (PQntuples(res) != 1 || PQnfields(res) != 4)
396 	{
397 		fprintf(stderr,
398 				_("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
399 				progname, slot_name,
400 				PQntuples(res), PQnfields(res), 1, 4);
401 
402 		destroyPQExpBuffer(query);
403 		PQclear(res);
404 		return false;
405 	}
406 
407 	destroyPQExpBuffer(query);
408 	PQclear(res);
409 	return true;
410 }
411 
412 /*
413  * Drop a replication slot for the given connection. This function
414  * returns true in case of success.
415  */
416 bool
DropReplicationSlot(PGconn * conn,const char * slot_name)417 DropReplicationSlot(PGconn *conn, const char *slot_name)
418 {
419 	PQExpBuffer query;
420 	PGresult   *res;
421 
422 	Assert(slot_name != NULL);
423 
424 	query = createPQExpBuffer();
425 
426 	/* Build query */
427 	appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"",
428 					  slot_name);
429 	res = PQexec(conn, query->data);
430 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
431 	{
432 		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
433 				progname, query->data, PQerrorMessage(conn));
434 
435 		destroyPQExpBuffer(query);
436 		PQclear(res);
437 		return false;
438 	}
439 
440 	if (PQntuples(res) != 0 || PQnfields(res) != 0)
441 	{
442 		fprintf(stderr,
443 				_("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
444 				progname, slot_name,
445 				PQntuples(res), PQnfields(res), 0, 0);
446 
447 		destroyPQExpBuffer(query);
448 		PQclear(res);
449 		return false;
450 	}
451 
452 	destroyPQExpBuffer(query);
453 	PQclear(res);
454 	return true;
455 }
456 
457 
458 /*
459  * Frontend version of GetCurrentTimestamp(), since we are not linked with
460  * backend code.
461  */
462 TimestampTz
feGetCurrentTimestamp(void)463 feGetCurrentTimestamp(void)
464 {
465 	TimestampTz result;
466 	struct timeval tp;
467 
468 	gettimeofday(&tp, NULL);
469 
470 	result = (TimestampTz) tp.tv_sec -
471 		((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
472 	result = (result * USECS_PER_SEC) + tp.tv_usec;
473 
474 	return result;
475 }
476 
477 /*
478  * Frontend version of TimestampDifference(), since we are not linked with
479  * backend code.
480  */
481 void
feTimestampDifference(TimestampTz start_time,TimestampTz stop_time,long * secs,int * microsecs)482 feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
483 					  long *secs, int *microsecs)
484 {
485 	TimestampTz diff = stop_time - start_time;
486 
487 	if (diff <= 0)
488 	{
489 		*secs = 0;
490 		*microsecs = 0;
491 	}
492 	else
493 	{
494 		*secs = (long) (diff / USECS_PER_SEC);
495 		*microsecs = (int) (diff % USECS_PER_SEC);
496 	}
497 }
498 
499 /*
500  * Frontend version of TimestampDifferenceExceeds(), since we are not
501  * linked with backend code.
502  */
503 bool
feTimestampDifferenceExceeds(TimestampTz start_time,TimestampTz stop_time,int msec)504 feTimestampDifferenceExceeds(TimestampTz start_time,
505 							 TimestampTz stop_time,
506 							 int msec)
507 {
508 	TimestampTz diff = stop_time - start_time;
509 
510 	return (diff >= msec * INT64CONST(1000));
511 }
512 
513 /*
514  * Converts an int64 to network byte order.
515  */
516 void
fe_sendint64(int64 i,char * buf)517 fe_sendint64(int64 i, char *buf)
518 {
519 	uint32		n32;
520 
521 	/* High order half first, since we're doing MSB-first */
522 	n32 = (uint32) (i >> 32);
523 	n32 = htonl(n32);
524 	memcpy(&buf[0], &n32, 4);
525 
526 	/* Now the low order half */
527 	n32 = (uint32) i;
528 	n32 = htonl(n32);
529 	memcpy(&buf[4], &n32, 4);
530 }
531 
532 /*
533  * Converts an int64 from network byte order to native format.
534  */
535 int64
fe_recvint64(char * buf)536 fe_recvint64(char *buf)
537 {
538 	int64		result;
539 	uint32		h32;
540 	uint32		l32;
541 
542 	memcpy(&h32, buf, 4);
543 	memcpy(&l32, buf + 4, 4);
544 	h32 = ntohl(h32);
545 	l32 = ntohl(l32);
546 
547 	result = h32;
548 	result <<= 32;
549 	result |= l32;
550 
551 	return result;
552 }
553