1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_db.c
4  *
5  *	Implements the basic DB functions used by the archiver.
6  *
7  * IDENTIFICATION
8  *	  src/bin/pg_dump/pg_backup_db.c
9  *
10  *-------------------------------------------------------------------------
11  */
12 #include "postgres_fe.h"
13 
14 #include "dumputils.h"
15 #include "fe_utils/connect.h"
16 #include "fe_utils/string_utils.h"
17 #include "parallel.h"
18 #include "pg_backup_archiver.h"
19 #include "pg_backup_db.h"
20 #include "pg_backup_utils.h"
21 
22 #include <unistd.h>
23 #include <ctype.h>
24 #ifdef HAVE_TERMIOS_H
25 #include <termios.h>
26 #endif
27 
28 
29 #define DB_MAX_ERR_STMT 128
30 
31 /* translator: this is a module name */
32 static const char *modulename = gettext_noop("archiver (db)");
33 
34 static void _check_database_version(ArchiveHandle *AH);
35 static void notice_processor(void *arg, const char *message);
36 
37 static void
_check_database_version(ArchiveHandle * AH)38 _check_database_version(ArchiveHandle *AH)
39 {
40 	const char *remoteversion_str;
41 	int			remoteversion;
42 	PGresult   *res;
43 
44 	remoteversion_str = PQparameterStatus(AH->connection, "server_version");
45 	remoteversion = PQserverVersion(AH->connection);
46 	if (remoteversion == 0 || !remoteversion_str)
47 		exit_horribly(modulename, "could not get server_version from libpq\n");
48 
49 	AH->public.remoteVersionStr = pg_strdup(remoteversion_str);
50 	AH->public.remoteVersion = remoteversion;
51 	if (!AH->archiveRemoteVersion)
52 		AH->archiveRemoteVersion = AH->public.remoteVersionStr;
53 
54 	if (remoteversion != PG_VERSION_NUM
55 		&& (remoteversion < AH->public.minRemoteVersion ||
56 			remoteversion > AH->public.maxRemoteVersion))
57 	{
58 		write_msg(NULL, "server version: %s; %s version: %s\n",
59 				  remoteversion_str, progname, PG_VERSION);
60 		exit_horribly(NULL, "aborting because of server version mismatch\n");
61 	}
62 
63 	/*
64 	 * When running against 9.0 or later, check if we are in recovery mode,
65 	 * which means we are on a hot standby.
66 	 */
67 	if (remoteversion >= 90000)
68 	{
69 		res = ExecuteSqlQueryForSingleRow((Archive *) AH, "SELECT pg_catalog.pg_is_in_recovery()");
70 
71 		AH->public.isStandby = (strcmp(PQgetvalue(res, 0, 0), "t") == 0);
72 		PQclear(res);
73 	}
74 	else
75 		AH->public.isStandby = false;
76 }
77 
78 /*
79  * Reconnect to the server.  If dbname is not NULL, use that database,
80  * else the one associated with the archive handle.
81  */
82 void
ReconnectToServer(ArchiveHandle * AH,const char * dbname)83 ReconnectToServer(ArchiveHandle *AH, const char *dbname)
84 {
85 	PGconn	   *oldConn = AH->connection;
86 	RestoreOptions *ropt = AH->public.ropt;
87 
88 	/*
89 	 * Save the dbname, if given, in override_dbname so that it will also
90 	 * affect any later reconnection attempt.
91 	 */
92 	if (dbname)
93 		ropt->cparams.override_dbname = pg_strdup(dbname);
94 
95 	/*
96 	 * Note: we want to establish the new connection, and in particular update
97 	 * ArchiveHandle's connCancel, before closing old connection.  Otherwise
98 	 * an ill-timed SIGINT could try to access a dead connection.
99 	 */
100 	AH->connection = NULL;		/* dodge error check in ConnectDatabase */
101 
102 	ConnectDatabase((Archive *) AH, &ropt->cparams, true);
103 
104 	PQfinish(oldConn);
105 }
106 
107 /*
108  * Make, or remake, a database connection with the given parameters.
109  *
110  * The resulting connection handle is stored in AHX->connection.
111  *
112  * An interactive password prompt is automatically issued if required.
113  * We store the results of that in AHX->savedPassword.
114  * Note: it's not really all that sensible to use a single-entry password
115  * cache if the username keeps changing.  In current usage, however, the
116  * username never does change, so one savedPassword is sufficient.
117  */
118 void
ConnectDatabase(Archive * AHX,const ConnParams * cparams,bool isReconnect)119 ConnectDatabase(Archive *AHX,
120 				const ConnParams *cparams,
121 				bool isReconnect)
122 {
123 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
124 	trivalue	prompt_password;
125 	char	   *password;
126 	bool		new_pass;
127 
128 	if (AH->connection)
129 		exit_horribly(modulename, "already connected to a database\n");
130 
131 	/* Never prompt for a password during a reconnection */
132 	prompt_password = isReconnect ? TRI_NO : cparams->promptPassword;
133 
134 	password = AH->savedPassword ? pg_strdup(AH->savedPassword) : NULL;
135 
136 	if (prompt_password == TRI_YES && password == NULL)
137 	{
138 		password = simple_prompt("Password: ", 100, false);
139 		if (password == NULL)
140 			exit_horribly(modulename, "out of memory\n");
141 	}
142 
143 	/*
144 	 * Start the connection.  Loop until we have a password if requested by
145 	 * backend.
146 	 */
147 	do
148 	{
149 		const char *keywords[8];
150 		const char *values[8];
151 		int			i = 0;
152 
153 		/*
154 		 * If dbname is a connstring, its entries can override the other
155 		 * values obtained from cparams; but in turn, override_dbname can
156 		 * override the dbname component of it.
157 		 */
158 		keywords[i] = "host";
159 		values[i++] = cparams->pghost;
160 		keywords[i] = "port";
161 		values[i++] = cparams->pgport;
162 		keywords[i] = "user";
163 		values[i++] = cparams->username;
164 		keywords[i] = "password";
165 		values[i++] = password;
166 		keywords[i] = "dbname";
167 		values[i++] = cparams->dbname;
168 		if (cparams->override_dbname)
169 		{
170 			keywords[i] = "dbname";
171 			values[i++] = cparams->override_dbname;
172 		}
173 		keywords[i] = "fallback_application_name";
174 		values[i++] = progname;
175 		keywords[i] = NULL;
176 		values[i++] = NULL;
177 		Assert(i <= lengthof(keywords));
178 
179 		new_pass = false;
180 		AH->connection = PQconnectdbParams(keywords, values, true);
181 
182 		if (!AH->connection)
183 			exit_horribly(modulename, "failed to connect to database\n");
184 
185 		if (PQstatus(AH->connection) == CONNECTION_BAD &&
186 			PQconnectionNeedsPassword(AH->connection) &&
187 			password == NULL &&
188 			prompt_password != TRI_NO)
189 		{
190 			PQfinish(AH->connection);
191 			password = simple_prompt("Password: ", 100, false);
192 			if (password == NULL)
193 				exit_horribly(modulename, "out of memory\n");
194 			new_pass = true;
195 		}
196 	} while (new_pass);
197 
198 	/* check to see that the backend connection was successfully made */
199 	if (PQstatus(AH->connection) == CONNECTION_BAD)
200 	{
201 		if (isReconnect)
202 			exit_horribly(modulename, "reconnection to database \"%s\" failed: %s",
203 						  PQdb(AH->connection) ? PQdb(AH->connection) : "",
204 						  PQerrorMessage(AH->connection));
205 		else
206 			exit_horribly(modulename, "connection to database \"%s\" failed: %s",
207 						  PQdb(AH->connection) ? PQdb(AH->connection) : "",
208 						  PQerrorMessage(AH->connection));
209 	}
210 
211 	/* Start strict; later phases may override this. */
212 	if (PQserverVersion(AH->connection) >= 70300)
213 		PQclear(ExecuteSqlQueryForSingleRow((Archive *) AH,
214 											ALWAYS_SECURE_SEARCH_PATH_SQL));
215 
216 	/*
217 	 * We want to remember connection's actual password, whether or not we got
218 	 * it by prompting.  So we don't just store the password variable.
219 	 */
220 	if (PQconnectionUsedPassword(AH->connection))
221 	{
222 		if (AH->savedPassword)
223 			free(AH->savedPassword);
224 		AH->savedPassword = pg_strdup(PQpass(AH->connection));
225 	}
226 	if (password)
227 		free(password);
228 
229 	/* check for version mismatch */
230 	_check_database_version(AH);
231 
232 	PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
233 
234 	/* arrange for SIGINT to issue a query cancel on this connection */
235 	set_archive_cancel_info(AH, AH->connection);
236 }
237 
238 /*
239  * Close the connection to the database and also cancel off the query if we
240  * have one running.
241  */
242 void
DisconnectDatabase(Archive * AHX)243 DisconnectDatabase(Archive *AHX)
244 {
245 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
246 	char		errbuf[1];
247 
248 	if (!AH->connection)
249 		return;
250 
251 	if (AH->connCancel)
252 	{
253 		/*
254 		 * If we have an active query, send a cancel before closing, ignoring
255 		 * any errors.  This is of no use for a normal exit, but might be
256 		 * helpful during exit_horribly().
257 		 */
258 		if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE)
259 			(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
260 
261 		/*
262 		 * Prevent signal handler from sending a cancel after this.
263 		 */
264 		set_archive_cancel_info(AH, NULL);
265 	}
266 
267 	PQfinish(AH->connection);
268 	AH->connection = NULL;
269 }
270 
271 PGconn *
GetConnection(Archive * AHX)272 GetConnection(Archive *AHX)
273 {
274 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
275 
276 	return AH->connection;
277 }
278 
279 static void
notice_processor(void * arg,const char * message)280 notice_processor(void *arg, const char *message)
281 {
282 	write_msg(NULL, "%s", message);
283 }
284 
285 /* Like exit_horribly(), but with a complaint about a particular query. */
286 static void
die_on_query_failure(ArchiveHandle * AH,const char * modulename,const char * query)287 die_on_query_failure(ArchiveHandle *AH, const char *modulename, const char *query)
288 {
289 	write_msg(modulename, "query failed: %s",
290 			  PQerrorMessage(AH->connection));
291 	exit_horribly(modulename, "query was: %s\n", query);
292 }
293 
294 void
ExecuteSqlStatement(Archive * AHX,const char * query)295 ExecuteSqlStatement(Archive *AHX, const char *query)
296 {
297 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
298 	PGresult   *res;
299 
300 	res = PQexec(AH->connection, query);
301 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
302 		die_on_query_failure(AH, modulename, query);
303 	PQclear(res);
304 }
305 
306 PGresult *
ExecuteSqlQuery(Archive * AHX,const char * query,ExecStatusType status)307 ExecuteSqlQuery(Archive *AHX, const char *query, ExecStatusType status)
308 {
309 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
310 	PGresult   *res;
311 
312 	res = PQexec(AH->connection, query);
313 	if (PQresultStatus(res) != status)
314 		die_on_query_failure(AH, modulename, query);
315 	return res;
316 }
317 
318 /*
319  * Execute an SQL query and verify that we got exactly one row back.
320  */
321 PGresult *
ExecuteSqlQueryForSingleRow(Archive * fout,char * query)322 ExecuteSqlQueryForSingleRow(Archive *fout, char *query)
323 {
324 	PGresult   *res;
325 	int			ntups;
326 
327 	res = ExecuteSqlQuery(fout, query, PGRES_TUPLES_OK);
328 
329 	/* Expecting a single result only */
330 	ntups = PQntuples(res);
331 	if (ntups != 1)
332 		exit_horribly(NULL,
333 					  ngettext("query returned %d row instead of one: %s\n",
334 							   "query returned %d rows instead of one: %s\n",
335 							   ntups),
336 					  ntups, query);
337 
338 	return res;
339 }
340 
341 /*
342  * Convenience function to send a query.
343  * Monitors result to detect COPY statements
344  */
345 static void
ExecuteSqlCommand(ArchiveHandle * AH,const char * qry,const char * desc)346 ExecuteSqlCommand(ArchiveHandle *AH, const char *qry, const char *desc)
347 {
348 	PGconn	   *conn = AH->connection;
349 	PGresult   *res;
350 	char		errStmt[DB_MAX_ERR_STMT];
351 
352 #ifdef NOT_USED
353 	fprintf(stderr, "Executing: '%s'\n\n", qry);
354 #endif
355 	res = PQexec(conn, qry);
356 
357 	switch (PQresultStatus(res))
358 	{
359 		case PGRES_COMMAND_OK:
360 		case PGRES_TUPLES_OK:
361 		case PGRES_EMPTY_QUERY:
362 			/* A-OK */
363 			break;
364 		case PGRES_COPY_IN:
365 			/* Assume this is an expected result */
366 			AH->pgCopyIn = true;
367 			break;
368 		default:
369 			/* trouble */
370 			strncpy(errStmt, qry, DB_MAX_ERR_STMT);		/* strncpy required here */
371 			if (errStmt[DB_MAX_ERR_STMT - 1] != '\0')
372 			{
373 				errStmt[DB_MAX_ERR_STMT - 4] = '.';
374 				errStmt[DB_MAX_ERR_STMT - 3] = '.';
375 				errStmt[DB_MAX_ERR_STMT - 2] = '.';
376 				errStmt[DB_MAX_ERR_STMT - 1] = '\0';
377 			}
378 			warn_or_exit_horribly(AH, modulename, "%s: %s    Command was: %s\n",
379 								  desc, PQerrorMessage(conn), errStmt);
380 			break;
381 	}
382 
383 	PQclear(res);
384 }
385 
386 
387 /*
388  * Process non-COPY table data (that is, INSERT commands).
389  *
390  * The commands have been run together as one long string for compressibility,
391  * and we are receiving them in bufferloads with arbitrary boundaries, so we
392  * have to locate command boundaries and save partial commands across calls.
393  * All state must be kept in AH->sqlparse, not in local variables of this
394  * routine.  We assume that AH->sqlparse was filled with zeroes when created.
395  *
396  * We have to lex the data to the extent of identifying literals and quoted
397  * identifiers, so that we can recognize statement-terminating semicolons.
398  * We assume that INSERT data will not contain SQL comments, E'' literals,
399  * or dollar-quoted strings, so this is much simpler than a full SQL lexer.
400  *
401  * Note: when restoring from a pre-9.0 dump file, this code is also used to
402  * process BLOB COMMENTS data, which has the same problem of containing
403  * multiple SQL commands that might be split across bufferloads.  Fortunately,
404  * that data won't contain anything complicated to lex either.
405  */
406 static void
ExecuteSimpleCommands(ArchiveHandle * AH,const char * buf,size_t bufLen)407 ExecuteSimpleCommands(ArchiveHandle *AH, const char *buf, size_t bufLen)
408 {
409 	const char *qry = buf;
410 	const char *eos = buf + bufLen;
411 
412 	/* initialize command buffer if first time through */
413 	if (AH->sqlparse.curCmd == NULL)
414 		AH->sqlparse.curCmd = createPQExpBuffer();
415 
416 	for (; qry < eos; qry++)
417 	{
418 		char		ch = *qry;
419 
420 		/* For neatness, we skip any newlines between commands */
421 		if (!(ch == '\n' && AH->sqlparse.curCmd->len == 0))
422 			appendPQExpBufferChar(AH->sqlparse.curCmd, ch);
423 
424 		switch (AH->sqlparse.state)
425 		{
426 			case SQL_SCAN:		/* Default state == 0, set in _allocAH */
427 				if (ch == ';')
428 				{
429 					/*
430 					 * We've found the end of a statement. Send it and reset
431 					 * the buffer.
432 					 */
433 					ExecuteSqlCommand(AH, AH->sqlparse.curCmd->data,
434 									  "could not execute query");
435 					resetPQExpBuffer(AH->sqlparse.curCmd);
436 				}
437 				else if (ch == '\'')
438 				{
439 					AH->sqlparse.state = SQL_IN_SINGLE_QUOTE;
440 					AH->sqlparse.backSlash = false;
441 				}
442 				else if (ch == '"')
443 				{
444 					AH->sqlparse.state = SQL_IN_DOUBLE_QUOTE;
445 				}
446 				break;
447 
448 			case SQL_IN_SINGLE_QUOTE:
449 				/* We needn't handle '' specially */
450 				if (ch == '\'' && !AH->sqlparse.backSlash)
451 					AH->sqlparse.state = SQL_SCAN;
452 				else if (ch == '\\' && !AH->public.std_strings)
453 					AH->sqlparse.backSlash = !AH->sqlparse.backSlash;
454 				else
455 					AH->sqlparse.backSlash = false;
456 				break;
457 
458 			case SQL_IN_DOUBLE_QUOTE:
459 				/* We needn't handle "" specially */
460 				if (ch == '"')
461 					AH->sqlparse.state = SQL_SCAN;
462 				break;
463 		}
464 	}
465 }
466 
467 
468 /*
469  * Implement ahwrite() for direct-to-DB restore
470  */
471 int
ExecuteSqlCommandBuf(Archive * AHX,const char * buf,size_t bufLen)472 ExecuteSqlCommandBuf(Archive *AHX, const char *buf, size_t bufLen)
473 {
474 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
475 
476 	if (AH->outputKind == OUTPUT_COPYDATA)
477 	{
478 		/*
479 		 * COPY data.
480 		 *
481 		 * We drop the data on the floor if libpq has failed to enter COPY
482 		 * mode; this allows us to behave reasonably when trying to continue
483 		 * after an error in a COPY command.
484 		 */
485 		if (AH->pgCopyIn &&
486 			PQputCopyData(AH->connection, buf, bufLen) <= 0)
487 			exit_horribly(modulename, "error returned by PQputCopyData: %s",
488 						  PQerrorMessage(AH->connection));
489 	}
490 	else if (AH->outputKind == OUTPUT_OTHERDATA)
491 	{
492 		/*
493 		 * Table data expressed as INSERT commands; or, in old dump files,
494 		 * BLOB COMMENTS data (which is expressed as COMMENT ON commands).
495 		 */
496 		ExecuteSimpleCommands(AH, buf, bufLen);
497 	}
498 	else
499 	{
500 		/*
501 		 * General SQL commands; we assume that commands will not be split
502 		 * across calls.
503 		 *
504 		 * In most cases the data passed to us will be a null-terminated
505 		 * string, but if it's not, we have to add a trailing null.
506 		 */
507 		if (buf[bufLen] == '\0')
508 			ExecuteSqlCommand(AH, buf, "could not execute query");
509 		else
510 		{
511 			char	   *str = (char *) pg_malloc(bufLen + 1);
512 
513 			memcpy(str, buf, bufLen);
514 			str[bufLen] = '\0';
515 			ExecuteSqlCommand(AH, str, "could not execute query");
516 			free(str);
517 		}
518 	}
519 
520 	return bufLen;
521 }
522 
523 /*
524  * Terminate a COPY operation during direct-to-DB restore
525  */
526 void
EndDBCopyMode(Archive * AHX,const char * tocEntryTag)527 EndDBCopyMode(Archive *AHX, const char *tocEntryTag)
528 {
529 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
530 
531 	if (AH->pgCopyIn)
532 	{
533 		PGresult   *res;
534 
535 		if (PQputCopyEnd(AH->connection, NULL) <= 0)
536 			exit_horribly(modulename, "error returned by PQputCopyEnd: %s",
537 						  PQerrorMessage(AH->connection));
538 
539 		/* Check command status and return to normal libpq state */
540 		res = PQgetResult(AH->connection);
541 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
542 			warn_or_exit_horribly(AH, modulename, "COPY failed for table \"%s\": %s",
543 								tocEntryTag, PQerrorMessage(AH->connection));
544 		PQclear(res);
545 
546 		/* Do this to ensure we've pumped libpq back to idle state */
547 		if (PQgetResult(AH->connection) != NULL)
548 			write_msg(NULL, "WARNING: unexpected extra results during COPY of table \"%s\"\n",
549 					  tocEntryTag);
550 
551 		AH->pgCopyIn = false;
552 	}
553 }
554 
555 void
StartTransaction(Archive * AHX)556 StartTransaction(Archive *AHX)
557 {
558 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
559 
560 	ExecuteSqlCommand(AH, "BEGIN", "could not start database transaction");
561 }
562 
563 void
CommitTransaction(Archive * AHX)564 CommitTransaction(Archive *AHX)
565 {
566 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
567 
568 	ExecuteSqlCommand(AH, "COMMIT", "could not commit database transaction");
569 }
570 
571 void
DropBlobIfExists(ArchiveHandle * AH,Oid oid)572 DropBlobIfExists(ArchiveHandle *AH, Oid oid)
573 {
574 	/*
575 	 * If we are not restoring to a direct database connection, we have to
576 	 * guess about how to detect whether the blob exists.  Assume new-style.
577 	 */
578 	if (AH->connection == NULL ||
579 		PQserverVersion(AH->connection) >= 90000)
580 	{
581 		ahprintf(AH,
582 				 "SELECT pg_catalog.lo_unlink(oid) "
583 				 "FROM pg_catalog.pg_largeobject_metadata "
584 				 "WHERE oid = '%u';\n",
585 				 oid);
586 	}
587 	else
588 	{
589 		/* Restoring to pre-9.0 server, so do it the old way */
590 		ahprintf(AH,
591 				 "SELECT CASE WHEN EXISTS("
592 				 "SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u'"
593 				 ") THEN pg_catalog.lo_unlink('%u') END;\n",
594 				 oid, oid);
595 	}
596 }
597