1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_db.c
4  *
5  *	Implements the basic DB functions used by the archiver.
6  *
7  * IDENTIFICATION
8  *	  src/bin/pg_dump/pg_backup_db.c
9  *
10  *-------------------------------------------------------------------------
11  */
12 #include "postgres_fe.h"
13 
14 #include <unistd.h>
15 #include <ctype.h>
16 #ifdef HAVE_TERMIOS_H
17 #include <termios.h>
18 #endif
19 
20 #include "common/connect.h"
21 #include "common/string.h"
22 #include "dumputils.h"
23 #include "fe_utils/string_utils.h"
24 #include "parallel.h"
25 #include "pg_backup_archiver.h"
26 #include "pg_backup_db.h"
27 #include "pg_backup_utils.h"
28 
29 static void _check_database_version(ArchiveHandle *AH);
30 static void notice_processor(void *arg, const char *message);
31 
32 static void
_check_database_version(ArchiveHandle * AH)33 _check_database_version(ArchiveHandle *AH)
34 {
35 	const char *remoteversion_str;
36 	int			remoteversion;
37 	PGresult   *res;
38 
39 	remoteversion_str = PQparameterStatus(AH->connection, "server_version");
40 	remoteversion = PQserverVersion(AH->connection);
41 	if (remoteversion == 0 || !remoteversion_str)
42 		fatal("could not get server_version from libpq");
43 
44 	AH->public.remoteVersionStr = pg_strdup(remoteversion_str);
45 	AH->public.remoteVersion = remoteversion;
46 	if (!AH->archiveRemoteVersion)
47 		AH->archiveRemoteVersion = AH->public.remoteVersionStr;
48 
49 	if (remoteversion != PG_VERSION_NUM
50 		&& (remoteversion < AH->public.minRemoteVersion ||
51 			remoteversion > AH->public.maxRemoteVersion))
52 	{
53 		pg_log_error("server version: %s; %s version: %s",
54 					 remoteversion_str, progname, PG_VERSION);
55 		fatal("aborting because of server version mismatch");
56 	}
57 
58 	/*
59 	 * When running against 9.0 or later, check if we are in recovery mode,
60 	 * which means we are on a hot standby.
61 	 */
62 	if (remoteversion >= 90000)
63 	{
64 		res = ExecuteSqlQueryForSingleRow((Archive *) AH, "SELECT pg_catalog.pg_is_in_recovery()");
65 
66 		AH->public.isStandby = (strcmp(PQgetvalue(res, 0, 0), "t") == 0);
67 		PQclear(res);
68 	}
69 	else
70 		AH->public.isStandby = false;
71 }
72 
73 /*
74  * Reconnect to the server.  If dbname is not NULL, use that database,
75  * else the one associated with the archive handle.
76  */
77 void
ReconnectToServer(ArchiveHandle * AH,const char * dbname)78 ReconnectToServer(ArchiveHandle *AH, const char *dbname)
79 {
80 	PGconn	   *oldConn = AH->connection;
81 	RestoreOptions *ropt = AH->public.ropt;
82 
83 	/*
84 	 * Save the dbname, if given, in override_dbname so that it will also
85 	 * affect any later reconnection attempt.
86 	 */
87 	if (dbname)
88 		ropt->cparams.override_dbname = pg_strdup(dbname);
89 
90 	/*
91 	 * Note: we want to establish the new connection, and in particular update
92 	 * ArchiveHandle's connCancel, before closing old connection.  Otherwise
93 	 * an ill-timed SIGINT could try to access a dead connection.
94 	 */
95 	AH->connection = NULL;		/* dodge error check in ConnectDatabase */
96 
97 	ConnectDatabase((Archive *) AH, &ropt->cparams, true);
98 
99 	PQfinish(oldConn);
100 }
101 
102 /*
103  * Make, or remake, a database connection with the given parameters.
104  *
105  * The resulting connection handle is stored in AHX->connection.
106  *
107  * An interactive password prompt is automatically issued if required.
108  * We store the results of that in AHX->savedPassword.
109  * Note: it's not really all that sensible to use a single-entry password
110  * cache if the username keeps changing.  In current usage, however, the
111  * username never does change, so one savedPassword is sufficient.
112  */
113 void
ConnectDatabase(Archive * AHX,const ConnParams * cparams,bool isReconnect)114 ConnectDatabase(Archive *AHX,
115 				const ConnParams *cparams,
116 				bool isReconnect)
117 {
118 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
119 	trivalue	prompt_password;
120 	char	   *password;
121 	bool		new_pass;
122 
123 	if (AH->connection)
124 		fatal("already connected to a database");
125 
126 	/* Never prompt for a password during a reconnection */
127 	prompt_password = isReconnect ? TRI_NO : cparams->promptPassword;
128 
129 	password = AH->savedPassword;
130 
131 	if (prompt_password == TRI_YES && password == NULL)
132 		password = simple_prompt("Password: ", false);
133 
134 	/*
135 	 * Start the connection.  Loop until we have a password if requested by
136 	 * backend.
137 	 */
138 	do
139 	{
140 		const char *keywords[8];
141 		const char *values[8];
142 		int			i = 0;
143 
144 		/*
145 		 * If dbname is a connstring, its entries can override the other
146 		 * values obtained from cparams; but in turn, override_dbname can
147 		 * override the dbname component of it.
148 		 */
149 		keywords[i] = "host";
150 		values[i++] = cparams->pghost;
151 		keywords[i] = "port";
152 		values[i++] = cparams->pgport;
153 		keywords[i] = "user";
154 		values[i++] = cparams->username;
155 		keywords[i] = "password";
156 		values[i++] = password;
157 		keywords[i] = "dbname";
158 		values[i++] = cparams->dbname;
159 		if (cparams->override_dbname)
160 		{
161 			keywords[i] = "dbname";
162 			values[i++] = cparams->override_dbname;
163 		}
164 		keywords[i] = "fallback_application_name";
165 		values[i++] = progname;
166 		keywords[i] = NULL;
167 		values[i++] = NULL;
168 		Assert(i <= lengthof(keywords));
169 
170 		new_pass = false;
171 		AH->connection = PQconnectdbParams(keywords, values, true);
172 
173 		if (!AH->connection)
174 			fatal("could not connect to database");
175 
176 		if (PQstatus(AH->connection) == CONNECTION_BAD &&
177 			PQconnectionNeedsPassword(AH->connection) &&
178 			password == NULL &&
179 			prompt_password != TRI_NO)
180 		{
181 			PQfinish(AH->connection);
182 			password = simple_prompt("Password: ", false);
183 			new_pass = true;
184 		}
185 	} while (new_pass);
186 
187 	/* check to see that the backend connection was successfully made */
188 	if (PQstatus(AH->connection) == CONNECTION_BAD)
189 	{
190 		if (isReconnect)
191 			fatal("reconnection failed: %s",
192 				  PQerrorMessage(AH->connection));
193 		else
194 			fatal("%s",
195 				  PQerrorMessage(AH->connection));
196 	}
197 
198 	/* Start strict; later phases may override this. */
199 	PQclear(ExecuteSqlQueryForSingleRow((Archive *) AH,
200 										ALWAYS_SECURE_SEARCH_PATH_SQL));
201 
202 	if (password && password != AH->savedPassword)
203 		free(password);
204 
205 	/*
206 	 * We want to remember connection's actual password, whether or not we got
207 	 * it by prompting.  So we don't just store the password variable.
208 	 */
209 	if (PQconnectionUsedPassword(AH->connection))
210 	{
211 		if (AH->savedPassword)
212 			free(AH->savedPassword);
213 		AH->savedPassword = pg_strdup(PQpass(AH->connection));
214 	}
215 
216 	/* check for version mismatch */
217 	_check_database_version(AH);
218 
219 	PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
220 
221 	/* arrange for SIGINT to issue a query cancel on this connection */
222 	set_archive_cancel_info(AH, AH->connection);
223 }
224 
225 /*
226  * Close the connection to the database and also cancel off the query if we
227  * have one running.
228  */
229 void
DisconnectDatabase(Archive * AHX)230 DisconnectDatabase(Archive *AHX)
231 {
232 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
233 	char		errbuf[1];
234 
235 	if (!AH->connection)
236 		return;
237 
238 	if (AH->connCancel)
239 	{
240 		/*
241 		 * If we have an active query, send a cancel before closing, ignoring
242 		 * any errors.  This is of no use for a normal exit, but might be
243 		 * helpful during fatal().
244 		 */
245 		if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE)
246 			(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
247 
248 		/*
249 		 * Prevent signal handler from sending a cancel after this.
250 		 */
251 		set_archive_cancel_info(AH, NULL);
252 	}
253 
254 	PQfinish(AH->connection);
255 	AH->connection = NULL;
256 }
257 
258 PGconn *
GetConnection(Archive * AHX)259 GetConnection(Archive *AHX)
260 {
261 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
262 
263 	return AH->connection;
264 }
265 
266 static void
notice_processor(void * arg,const char * message)267 notice_processor(void *arg, const char *message)
268 {
269 	pg_log_generic(PG_LOG_INFO, "%s", message);
270 }
271 
272 /* Like fatal(), but with a complaint about a particular query. */
273 static void
die_on_query_failure(ArchiveHandle * AH,const char * query)274 die_on_query_failure(ArchiveHandle *AH, const char *query)
275 {
276 	pg_log_error("query failed: %s",
277 				 PQerrorMessage(AH->connection));
278 	fatal("query was: %s", query);
279 }
280 
281 void
ExecuteSqlStatement(Archive * AHX,const char * query)282 ExecuteSqlStatement(Archive *AHX, const char *query)
283 {
284 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
285 	PGresult   *res;
286 
287 	res = PQexec(AH->connection, query);
288 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
289 		die_on_query_failure(AH, query);
290 	PQclear(res);
291 }
292 
293 PGresult *
ExecuteSqlQuery(Archive * AHX,const char * query,ExecStatusType status)294 ExecuteSqlQuery(Archive *AHX, const char *query, ExecStatusType status)
295 {
296 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
297 	PGresult   *res;
298 
299 	res = PQexec(AH->connection, query);
300 	if (PQresultStatus(res) != status)
301 		die_on_query_failure(AH, query);
302 	return res;
303 }
304 
305 /*
306  * Execute an SQL query and verify that we got exactly one row back.
307  */
308 PGresult *
ExecuteSqlQueryForSingleRow(Archive * fout,const char * query)309 ExecuteSqlQueryForSingleRow(Archive *fout, const char *query)
310 {
311 	PGresult   *res;
312 	int			ntups;
313 
314 	res = ExecuteSqlQuery(fout, query, PGRES_TUPLES_OK);
315 
316 	/* Expecting a single result only */
317 	ntups = PQntuples(res);
318 	if (ntups != 1)
319 		fatal(ngettext("query returned %d row instead of one: %s",
320 					   "query returned %d rows instead of one: %s",
321 					   ntups),
322 			  ntups, query);
323 
324 	return res;
325 }
326 
327 /*
328  * Convenience function to send a query.
329  * Monitors result to detect COPY statements
330  */
331 static void
ExecuteSqlCommand(ArchiveHandle * AH,const char * qry,const char * desc)332 ExecuteSqlCommand(ArchiveHandle *AH, const char *qry, const char *desc)
333 {
334 	PGconn	   *conn = AH->connection;
335 	PGresult   *res;
336 
337 #ifdef NOT_USED
338 	fprintf(stderr, "Executing: '%s'\n\n", qry);
339 #endif
340 	res = PQexec(conn, qry);
341 
342 	switch (PQresultStatus(res))
343 	{
344 		case PGRES_COMMAND_OK:
345 		case PGRES_TUPLES_OK:
346 		case PGRES_EMPTY_QUERY:
347 			/* A-OK */
348 			break;
349 		case PGRES_COPY_IN:
350 			/* Assume this is an expected result */
351 			AH->pgCopyIn = true;
352 			break;
353 		default:
354 			/* trouble */
355 			warn_or_exit_horribly(AH, "%s: %sCommand was: %s",
356 								  desc, PQerrorMessage(conn), qry);
357 			break;
358 	}
359 
360 	PQclear(res);
361 }
362 
363 
364 /*
365  * Process non-COPY table data (that is, INSERT commands).
366  *
367  * The commands have been run together as one long string for compressibility,
368  * and we are receiving them in bufferloads with arbitrary boundaries, so we
369  * have to locate command boundaries and save partial commands across calls.
370  * All state must be kept in AH->sqlparse, not in local variables of this
371  * routine.  We assume that AH->sqlparse was filled with zeroes when created.
372  *
373  * We have to lex the data to the extent of identifying literals and quoted
374  * identifiers, so that we can recognize statement-terminating semicolons.
375  * We assume that INSERT data will not contain SQL comments, E'' literals,
376  * or dollar-quoted strings, so this is much simpler than a full SQL lexer.
377  *
378  * Note: when restoring from a pre-9.0 dump file, this code is also used to
379  * process BLOB COMMENTS data, which has the same problem of containing
380  * multiple SQL commands that might be split across bufferloads.  Fortunately,
381  * that data won't contain anything complicated to lex either.
382  */
383 static void
ExecuteSimpleCommands(ArchiveHandle * AH,const char * buf,size_t bufLen)384 ExecuteSimpleCommands(ArchiveHandle *AH, const char *buf, size_t bufLen)
385 {
386 	const char *qry = buf;
387 	const char *eos = buf + bufLen;
388 
389 	/* initialize command buffer if first time through */
390 	if (AH->sqlparse.curCmd == NULL)
391 		AH->sqlparse.curCmd = createPQExpBuffer();
392 
393 	for (; qry < eos; qry++)
394 	{
395 		char		ch = *qry;
396 
397 		/* For neatness, we skip any newlines between commands */
398 		if (!(ch == '\n' && AH->sqlparse.curCmd->len == 0))
399 			appendPQExpBufferChar(AH->sqlparse.curCmd, ch);
400 
401 		switch (AH->sqlparse.state)
402 		{
403 			case SQL_SCAN:		/* Default state == 0, set in _allocAH */
404 				if (ch == ';')
405 				{
406 					/*
407 					 * We've found the end of a statement. Send it and reset
408 					 * the buffer.
409 					 */
410 					ExecuteSqlCommand(AH, AH->sqlparse.curCmd->data,
411 									  "could not execute query");
412 					resetPQExpBuffer(AH->sqlparse.curCmd);
413 				}
414 				else if (ch == '\'')
415 				{
416 					AH->sqlparse.state = SQL_IN_SINGLE_QUOTE;
417 					AH->sqlparse.backSlash = false;
418 				}
419 				else if (ch == '"')
420 				{
421 					AH->sqlparse.state = SQL_IN_DOUBLE_QUOTE;
422 				}
423 				break;
424 
425 			case SQL_IN_SINGLE_QUOTE:
426 				/* We needn't handle '' specially */
427 				if (ch == '\'' && !AH->sqlparse.backSlash)
428 					AH->sqlparse.state = SQL_SCAN;
429 				else if (ch == '\\' && !AH->public.std_strings)
430 					AH->sqlparse.backSlash = !AH->sqlparse.backSlash;
431 				else
432 					AH->sqlparse.backSlash = false;
433 				break;
434 
435 			case SQL_IN_DOUBLE_QUOTE:
436 				/* We needn't handle "" specially */
437 				if (ch == '"')
438 					AH->sqlparse.state = SQL_SCAN;
439 				break;
440 		}
441 	}
442 }
443 
444 
445 /*
446  * Implement ahwrite() for direct-to-DB restore
447  */
448 int
ExecuteSqlCommandBuf(Archive * AHX,const char * buf,size_t bufLen)449 ExecuteSqlCommandBuf(Archive *AHX, const char *buf, size_t bufLen)
450 {
451 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
452 
453 	if (AH->outputKind == OUTPUT_COPYDATA)
454 	{
455 		/*
456 		 * COPY data.
457 		 *
458 		 * We drop the data on the floor if libpq has failed to enter COPY
459 		 * mode; this allows us to behave reasonably when trying to continue
460 		 * after an error in a COPY command.
461 		 */
462 		if (AH->pgCopyIn &&
463 			PQputCopyData(AH->connection, buf, bufLen) <= 0)
464 			fatal("error returned by PQputCopyData: %s",
465 				  PQerrorMessage(AH->connection));
466 	}
467 	else if (AH->outputKind == OUTPUT_OTHERDATA)
468 	{
469 		/*
470 		 * Table data expressed as INSERT commands; or, in old dump files,
471 		 * BLOB COMMENTS data (which is expressed as COMMENT ON commands).
472 		 */
473 		ExecuteSimpleCommands(AH, buf, bufLen);
474 	}
475 	else
476 	{
477 		/*
478 		 * General SQL commands; we assume that commands will not be split
479 		 * across calls.
480 		 *
481 		 * In most cases the data passed to us will be a null-terminated
482 		 * string, but if it's not, we have to add a trailing null.
483 		 */
484 		if (buf[bufLen] == '\0')
485 			ExecuteSqlCommand(AH, buf, "could not execute query");
486 		else
487 		{
488 			char	   *str = (char *) pg_malloc(bufLen + 1);
489 
490 			memcpy(str, buf, bufLen);
491 			str[bufLen] = '\0';
492 			ExecuteSqlCommand(AH, str, "could not execute query");
493 			free(str);
494 		}
495 	}
496 
497 	return bufLen;
498 }
499 
500 /*
501  * Terminate a COPY operation during direct-to-DB restore
502  */
503 void
EndDBCopyMode(Archive * AHX,const char * tocEntryTag)504 EndDBCopyMode(Archive *AHX, const char *tocEntryTag)
505 {
506 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
507 
508 	if (AH->pgCopyIn)
509 	{
510 		PGresult   *res;
511 
512 		if (PQputCopyEnd(AH->connection, NULL) <= 0)
513 			fatal("error returned by PQputCopyEnd: %s",
514 				  PQerrorMessage(AH->connection));
515 
516 		/* Check command status and return to normal libpq state */
517 		res = PQgetResult(AH->connection);
518 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
519 			warn_or_exit_horribly(AH, "COPY failed for table \"%s\": %s",
520 								  tocEntryTag, PQerrorMessage(AH->connection));
521 		PQclear(res);
522 
523 		/* Do this to ensure we've pumped libpq back to idle state */
524 		if (PQgetResult(AH->connection) != NULL)
525 			pg_log_warning("unexpected extra results during COPY of table \"%s\"",
526 						   tocEntryTag);
527 
528 		AH->pgCopyIn = false;
529 	}
530 }
531 
532 void
StartTransaction(Archive * AHX)533 StartTransaction(Archive *AHX)
534 {
535 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
536 
537 	ExecuteSqlCommand(AH, "BEGIN", "could not start database transaction");
538 }
539 
540 void
CommitTransaction(Archive * AHX)541 CommitTransaction(Archive *AHX)
542 {
543 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
544 
545 	ExecuteSqlCommand(AH, "COMMIT", "could not commit database transaction");
546 }
547 
548 void
DropBlobIfExists(ArchiveHandle * AH,Oid oid)549 DropBlobIfExists(ArchiveHandle *AH, Oid oid)
550 {
551 	/*
552 	 * If we are not restoring to a direct database connection, we have to
553 	 * guess about how to detect whether the blob exists.  Assume new-style.
554 	 */
555 	if (AH->connection == NULL ||
556 		PQserverVersion(AH->connection) >= 90000)
557 	{
558 		ahprintf(AH,
559 				 "SELECT pg_catalog.lo_unlink(oid) "
560 				 "FROM pg_catalog.pg_largeobject_metadata "
561 				 "WHERE oid = '%u';\n",
562 				 oid);
563 	}
564 	else
565 	{
566 		/* Restoring to pre-9.0 server, so do it the old way */
567 		ahprintf(AH,
568 				 "SELECT CASE WHEN EXISTS("
569 				 "SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u'"
570 				 ") THEN pg_catalog.lo_unlink('%u') END;\n",
571 				 oid, oid);
572 	}
573 }
574