1 /*-------------------------------------------------------------------------
2 *
3 * pg_backup_db.c
4 *
5 * Implements the basic DB functions used by the archiver.
6 *
7 * IDENTIFICATION
8 * src/bin/pg_dump/pg_backup_db.c
9 *
10 *-------------------------------------------------------------------------
11 */
12 #include "postgres_fe.h"
13
14 #include "dumputils.h"
15 #include "fe_utils/connect.h"
16 #include "fe_utils/string_utils.h"
17 #include "parallel.h"
18 #include "pg_backup_archiver.h"
19 #include "pg_backup_db.h"
20 #include "pg_backup_utils.h"
21
22 #include <unistd.h>
23 #include <ctype.h>
24 #ifdef HAVE_TERMIOS_H
25 #include <termios.h>
26 #endif
27
28
29 #define DB_MAX_ERR_STMT 128
30
31 /* translator: this is a module name */
32 static const char *modulename = gettext_noop("archiver (db)");
33
34 static void _check_database_version(ArchiveHandle *AH);
35 static void notice_processor(void *arg, const char *message);
36
37 static void
_check_database_version(ArchiveHandle * AH)38 _check_database_version(ArchiveHandle *AH)
39 {
40 const char *remoteversion_str;
41 int remoteversion;
42 PGresult *res;
43
44 remoteversion_str = PQparameterStatus(AH->connection, "server_version");
45 remoteversion = PQserverVersion(AH->connection);
46 if (remoteversion == 0 || !remoteversion_str)
47 exit_horribly(modulename, "could not get server_version from libpq\n");
48
49 AH->public.remoteVersionStr = pg_strdup(remoteversion_str);
50 AH->public.remoteVersion = remoteversion;
51 if (!AH->archiveRemoteVersion)
52 AH->archiveRemoteVersion = AH->public.remoteVersionStr;
53
54 if (remoteversion != PG_VERSION_NUM
55 && (remoteversion < AH->public.minRemoteVersion ||
56 remoteversion > AH->public.maxRemoteVersion))
57 {
58 write_msg(NULL, "server version: %s; %s version: %s\n",
59 remoteversion_str, progname, PG_VERSION);
60 exit_horribly(NULL, "aborting because of server version mismatch\n");
61 }
62
63 /*
64 * When running against 9.0 or later, check if we are in recovery mode,
65 * which means we are on a hot standby.
66 */
67 if (remoteversion >= 90000)
68 {
69 res = ExecuteSqlQueryForSingleRow((Archive *) AH, "SELECT pg_catalog.pg_is_in_recovery()");
70
71 AH->public.isStandby = (strcmp(PQgetvalue(res, 0, 0), "t") == 0);
72 PQclear(res);
73 }
74 else
75 AH->public.isStandby = false;
76 }
77
78 /*
79 * Reconnect to the server. If dbname is not NULL, use that database,
80 * else the one associated with the archive handle.
81 */
82 void
ReconnectToServer(ArchiveHandle * AH,const char * dbname)83 ReconnectToServer(ArchiveHandle *AH, const char *dbname)
84 {
85 PGconn *oldConn = AH->connection;
86 RestoreOptions *ropt = AH->public.ropt;
87
88 /*
89 * Save the dbname, if given, in override_dbname so that it will also
90 * affect any later reconnection attempt.
91 */
92 if (dbname)
93 ropt->cparams.override_dbname = pg_strdup(dbname);
94
95 /*
96 * Note: we want to establish the new connection, and in particular update
97 * ArchiveHandle's connCancel, before closing old connection. Otherwise
98 * an ill-timed SIGINT could try to access a dead connection.
99 */
100 AH->connection = NULL; /* dodge error check in ConnectDatabase */
101
102 ConnectDatabase((Archive *) AH, &ropt->cparams, true);
103
104 PQfinish(oldConn);
105 }
106
107 /*
108 * Make, or remake, a database connection with the given parameters.
109 *
110 * The resulting connection handle is stored in AHX->connection.
111 *
112 * An interactive password prompt is automatically issued if required.
113 * We store the results of that in AHX->savedPassword.
114 * Note: it's not really all that sensible to use a single-entry password
115 * cache if the username keeps changing. In current usage, however, the
116 * username never does change, so one savedPassword is sufficient.
117 */
118 void
ConnectDatabase(Archive * AHX,const ConnParams * cparams,bool isReconnect)119 ConnectDatabase(Archive *AHX,
120 const ConnParams *cparams,
121 bool isReconnect)
122 {
123 ArchiveHandle *AH = (ArchiveHandle *) AHX;
124 trivalue prompt_password;
125 char *password;
126 bool new_pass;
127
128 if (AH->connection)
129 exit_horribly(modulename, "already connected to a database\n");
130
131 /* Never prompt for a password during a reconnection */
132 prompt_password = isReconnect ? TRI_NO : cparams->promptPassword;
133
134 password = AH->savedPassword ? pg_strdup(AH->savedPassword) : NULL;
135
136 if (prompt_password == TRI_YES && password == NULL)
137 {
138 password = simple_prompt("Password: ", 100, false);
139 if (password == NULL)
140 exit_horribly(modulename, "out of memory\n");
141 }
142
143 /*
144 * Start the connection. Loop until we have a password if requested by
145 * backend.
146 */
147 do
148 {
149 const char *keywords[8];
150 const char *values[8];
151 int i = 0;
152
153 /*
154 * If dbname is a connstring, its entries can override the other
155 * values obtained from cparams; but in turn, override_dbname can
156 * override the dbname component of it.
157 */
158 keywords[i] = "host";
159 values[i++] = cparams->pghost;
160 keywords[i] = "port";
161 values[i++] = cparams->pgport;
162 keywords[i] = "user";
163 values[i++] = cparams->username;
164 keywords[i] = "password";
165 values[i++] = password;
166 keywords[i] = "dbname";
167 values[i++] = cparams->dbname;
168 if (cparams->override_dbname)
169 {
170 keywords[i] = "dbname";
171 values[i++] = cparams->override_dbname;
172 }
173 keywords[i] = "fallback_application_name";
174 values[i++] = progname;
175 keywords[i] = NULL;
176 values[i++] = NULL;
177 Assert(i <= lengthof(keywords));
178
179 new_pass = false;
180 AH->connection = PQconnectdbParams(keywords, values, true);
181
182 if (!AH->connection)
183 exit_horribly(modulename, "failed to connect to database\n");
184
185 if (PQstatus(AH->connection) == CONNECTION_BAD &&
186 PQconnectionNeedsPassword(AH->connection) &&
187 password == NULL &&
188 prompt_password != TRI_NO)
189 {
190 PQfinish(AH->connection);
191 password = simple_prompt("Password: ", 100, false);
192 if (password == NULL)
193 exit_horribly(modulename, "out of memory\n");
194 new_pass = true;
195 }
196 } while (new_pass);
197
198 /* check to see that the backend connection was successfully made */
199 if (PQstatus(AH->connection) == CONNECTION_BAD)
200 {
201 if (isReconnect)
202 exit_horribly(modulename, "reconnection to database \"%s\" failed: %s",
203 PQdb(AH->connection) ? PQdb(AH->connection) : "",
204 PQerrorMessage(AH->connection));
205 else
206 exit_horribly(modulename, "connection to database \"%s\" failed: %s",
207 PQdb(AH->connection) ? PQdb(AH->connection) : "",
208 PQerrorMessage(AH->connection));
209 }
210
211 /* Start strict; later phases may override this. */
212 if (PQserverVersion(AH->connection) >= 70300)
213 PQclear(ExecuteSqlQueryForSingleRow((Archive *) AH,
214 ALWAYS_SECURE_SEARCH_PATH_SQL));
215
216 /*
217 * We want to remember connection's actual password, whether or not we got
218 * it by prompting. So we don't just store the password variable.
219 */
220 if (PQconnectionUsedPassword(AH->connection))
221 {
222 if (AH->savedPassword)
223 free(AH->savedPassword);
224 AH->savedPassword = pg_strdup(PQpass(AH->connection));
225 }
226 if (password)
227 free(password);
228
229 /* check for version mismatch */
230 _check_database_version(AH);
231
232 PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
233
234 /* arrange for SIGINT to issue a query cancel on this connection */
235 set_archive_cancel_info(AH, AH->connection);
236 }
237
238 /*
239 * Close the connection to the database and also cancel off the query if we
240 * have one running.
241 */
242 void
DisconnectDatabase(Archive * AHX)243 DisconnectDatabase(Archive *AHX)
244 {
245 ArchiveHandle *AH = (ArchiveHandle *) AHX;
246 char errbuf[1];
247
248 if (!AH->connection)
249 return;
250
251 if (AH->connCancel)
252 {
253 /*
254 * If we have an active query, send a cancel before closing, ignoring
255 * any errors. This is of no use for a normal exit, but might be
256 * helpful during exit_horribly().
257 */
258 if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE)
259 (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
260
261 /*
262 * Prevent signal handler from sending a cancel after this.
263 */
264 set_archive_cancel_info(AH, NULL);
265 }
266
267 PQfinish(AH->connection);
268 AH->connection = NULL;
269 }
270
271 PGconn *
GetConnection(Archive * AHX)272 GetConnection(Archive *AHX)
273 {
274 ArchiveHandle *AH = (ArchiveHandle *) AHX;
275
276 return AH->connection;
277 }
278
/*
 * libpq notice processor: forward the server's notice text to write_msg().
 * The callback argument is not used.
 */
static void
notice_processor(void *arg, const char *message)
{
	(void) arg;

	write_msg(NULL, "%s", message);
}
284
285 /* Like exit_horribly(), but with a complaint about a particular query. */
286 static void
die_on_query_failure(ArchiveHandle * AH,const char * modulename,const char * query)287 die_on_query_failure(ArchiveHandle *AH, const char *modulename, const char *query)
288 {
289 write_msg(modulename, "query failed: %s",
290 PQerrorMessage(AH->connection));
291 exit_horribly(modulename, "query was: %s\n", query);
292 }
293
294 void
ExecuteSqlStatement(Archive * AHX,const char * query)295 ExecuteSqlStatement(Archive *AHX, const char *query)
296 {
297 ArchiveHandle *AH = (ArchiveHandle *) AHX;
298 PGresult *res;
299
300 res = PQexec(AH->connection, query);
301 if (PQresultStatus(res) != PGRES_COMMAND_OK)
302 die_on_query_failure(AH, modulename, query);
303 PQclear(res);
304 }
305
306 PGresult *
ExecuteSqlQuery(Archive * AHX,const char * query,ExecStatusType status)307 ExecuteSqlQuery(Archive *AHX, const char *query, ExecStatusType status)
308 {
309 ArchiveHandle *AH = (ArchiveHandle *) AHX;
310 PGresult *res;
311
312 res = PQexec(AH->connection, query);
313 if (PQresultStatus(res) != status)
314 die_on_query_failure(AH, modulename, query);
315 return res;
316 }
317
318 /*
319 * Execute an SQL query and verify that we got exactly one row back.
320 */
321 PGresult *
ExecuteSqlQueryForSingleRow(Archive * fout,char * query)322 ExecuteSqlQueryForSingleRow(Archive *fout, char *query)
323 {
324 PGresult *res;
325 int ntups;
326
327 res = ExecuteSqlQuery(fout, query, PGRES_TUPLES_OK);
328
329 /* Expecting a single result only */
330 ntups = PQntuples(res);
331 if (ntups != 1)
332 exit_horribly(NULL,
333 ngettext("query returned %d row instead of one: %s\n",
334 "query returned %d rows instead of one: %s\n",
335 ntups),
336 ntups, query);
337
338 return res;
339 }
340
341 /*
342 * Convenience function to send a query.
343 * Monitors result to detect COPY statements
344 */
345 static void
ExecuteSqlCommand(ArchiveHandle * AH,const char * qry,const char * desc)346 ExecuteSqlCommand(ArchiveHandle *AH, const char *qry, const char *desc)
347 {
348 PGconn *conn = AH->connection;
349 PGresult *res;
350 char errStmt[DB_MAX_ERR_STMT];
351
352 #ifdef NOT_USED
353 fprintf(stderr, "Executing: '%s'\n\n", qry);
354 #endif
355 res = PQexec(conn, qry);
356
357 switch (PQresultStatus(res))
358 {
359 case PGRES_COMMAND_OK:
360 case PGRES_TUPLES_OK:
361 case PGRES_EMPTY_QUERY:
362 /* A-OK */
363 break;
364 case PGRES_COPY_IN:
365 /* Assume this is an expected result */
366 AH->pgCopyIn = true;
367 break;
368 default:
369 /* trouble */
370 strncpy(errStmt, qry, DB_MAX_ERR_STMT); /* strncpy required here */
371 if (errStmt[DB_MAX_ERR_STMT - 1] != '\0')
372 {
373 errStmt[DB_MAX_ERR_STMT - 4] = '.';
374 errStmt[DB_MAX_ERR_STMT - 3] = '.';
375 errStmt[DB_MAX_ERR_STMT - 2] = '.';
376 errStmt[DB_MAX_ERR_STMT - 1] = '\0';
377 }
378 warn_or_exit_horribly(AH, modulename, "%s: %s Command was: %s\n",
379 desc, PQerrorMessage(conn), errStmt);
380 break;
381 }
382
383 PQclear(res);
384 }
385
386
387 /*
388 * Process non-COPY table data (that is, INSERT commands).
389 *
390 * The commands have been run together as one long string for compressibility,
391 * and we are receiving them in bufferloads with arbitrary boundaries, so we
392 * have to locate command boundaries and save partial commands across calls.
393 * All state must be kept in AH->sqlparse, not in local variables of this
394 * routine. We assume that AH->sqlparse was filled with zeroes when created.
395 *
396 * We have to lex the data to the extent of identifying literals and quoted
397 * identifiers, so that we can recognize statement-terminating semicolons.
398 * We assume that INSERT data will not contain SQL comments, E'' literals,
399 * or dollar-quoted strings, so this is much simpler than a full SQL lexer.
400 *
401 * Note: when restoring from a pre-9.0 dump file, this code is also used to
402 * process BLOB COMMENTS data, which has the same problem of containing
403 * multiple SQL commands that might be split across bufferloads. Fortunately,
404 * that data won't contain anything complicated to lex either.
405 */
406 static void
ExecuteSimpleCommands(ArchiveHandle * AH,const char * buf,size_t bufLen)407 ExecuteSimpleCommands(ArchiveHandle *AH, const char *buf, size_t bufLen)
408 {
409 const char *qry = buf;
410 const char *eos = buf + bufLen;
411
412 /* initialize command buffer if first time through */
413 if (AH->sqlparse.curCmd == NULL)
414 AH->sqlparse.curCmd = createPQExpBuffer();
415
416 for (; qry < eos; qry++)
417 {
418 char ch = *qry;
419
420 /* For neatness, we skip any newlines between commands */
421 if (!(ch == '\n' && AH->sqlparse.curCmd->len == 0))
422 appendPQExpBufferChar(AH->sqlparse.curCmd, ch);
423
424 switch (AH->sqlparse.state)
425 {
426 case SQL_SCAN: /* Default state == 0, set in _allocAH */
427 if (ch == ';')
428 {
429 /*
430 * We've found the end of a statement. Send it and reset
431 * the buffer.
432 */
433 ExecuteSqlCommand(AH, AH->sqlparse.curCmd->data,
434 "could not execute query");
435 resetPQExpBuffer(AH->sqlparse.curCmd);
436 }
437 else if (ch == '\'')
438 {
439 AH->sqlparse.state = SQL_IN_SINGLE_QUOTE;
440 AH->sqlparse.backSlash = false;
441 }
442 else if (ch == '"')
443 {
444 AH->sqlparse.state = SQL_IN_DOUBLE_QUOTE;
445 }
446 break;
447
448 case SQL_IN_SINGLE_QUOTE:
449 /* We needn't handle '' specially */
450 if (ch == '\'' && !AH->sqlparse.backSlash)
451 AH->sqlparse.state = SQL_SCAN;
452 else if (ch == '\\' && !AH->public.std_strings)
453 AH->sqlparse.backSlash = !AH->sqlparse.backSlash;
454 else
455 AH->sqlparse.backSlash = false;
456 break;
457
458 case SQL_IN_DOUBLE_QUOTE:
459 /* We needn't handle "" specially */
460 if (ch == '"')
461 AH->sqlparse.state = SQL_SCAN;
462 break;
463 }
464 }
465 }
466
467
468 /*
469 * Implement ahwrite() for direct-to-DB restore
470 */
471 int
ExecuteSqlCommandBuf(Archive * AHX,const char * buf,size_t bufLen)472 ExecuteSqlCommandBuf(Archive *AHX, const char *buf, size_t bufLen)
473 {
474 ArchiveHandle *AH = (ArchiveHandle *) AHX;
475
476 if (AH->outputKind == OUTPUT_COPYDATA)
477 {
478 /*
479 * COPY data.
480 *
481 * We drop the data on the floor if libpq has failed to enter COPY
482 * mode; this allows us to behave reasonably when trying to continue
483 * after an error in a COPY command.
484 */
485 if (AH->pgCopyIn &&
486 PQputCopyData(AH->connection, buf, bufLen) <= 0)
487 exit_horribly(modulename, "error returned by PQputCopyData: %s",
488 PQerrorMessage(AH->connection));
489 }
490 else if (AH->outputKind == OUTPUT_OTHERDATA)
491 {
492 /*
493 * Table data expressed as INSERT commands; or, in old dump files,
494 * BLOB COMMENTS data (which is expressed as COMMENT ON commands).
495 */
496 ExecuteSimpleCommands(AH, buf, bufLen);
497 }
498 else
499 {
500 /*
501 * General SQL commands; we assume that commands will not be split
502 * across calls.
503 *
504 * In most cases the data passed to us will be a null-terminated
505 * string, but if it's not, we have to add a trailing null.
506 */
507 if (buf[bufLen] == '\0')
508 ExecuteSqlCommand(AH, buf, "could not execute query");
509 else
510 {
511 char *str = (char *) pg_malloc(bufLen + 1);
512
513 memcpy(str, buf, bufLen);
514 str[bufLen] = '\0';
515 ExecuteSqlCommand(AH, str, "could not execute query");
516 free(str);
517 }
518 }
519
520 return bufLen;
521 }
522
523 /*
524 * Terminate a COPY operation during direct-to-DB restore
525 */
526 void
EndDBCopyMode(Archive * AHX,const char * tocEntryTag)527 EndDBCopyMode(Archive *AHX, const char *tocEntryTag)
528 {
529 ArchiveHandle *AH = (ArchiveHandle *) AHX;
530
531 if (AH->pgCopyIn)
532 {
533 PGresult *res;
534
535 if (PQputCopyEnd(AH->connection, NULL) <= 0)
536 exit_horribly(modulename, "error returned by PQputCopyEnd: %s",
537 PQerrorMessage(AH->connection));
538
539 /* Check command status and return to normal libpq state */
540 res = PQgetResult(AH->connection);
541 if (PQresultStatus(res) != PGRES_COMMAND_OK)
542 warn_or_exit_horribly(AH, modulename, "COPY failed for table \"%s\": %s",
543 tocEntryTag, PQerrorMessage(AH->connection));
544 PQclear(res);
545
546 /* Do this to ensure we've pumped libpq back to idle state */
547 if (PQgetResult(AH->connection) != NULL)
548 write_msg(NULL, "WARNING: unexpected extra results during COPY of table \"%s\"\n",
549 tocEntryTag);
550
551 AH->pgCopyIn = false;
552 }
553 }
554
555 void
StartTransaction(Archive * AHX)556 StartTransaction(Archive *AHX)
557 {
558 ArchiveHandle *AH = (ArchiveHandle *) AHX;
559
560 ExecuteSqlCommand(AH, "BEGIN", "could not start database transaction");
561 }
562
563 void
CommitTransaction(Archive * AHX)564 CommitTransaction(Archive *AHX)
565 {
566 ArchiveHandle *AH = (ArchiveHandle *) AHX;
567
568 ExecuteSqlCommand(AH, "COMMIT", "could not commit database transaction");
569 }
570
571 void
DropBlobIfExists(ArchiveHandle * AH,Oid oid)572 DropBlobIfExists(ArchiveHandle *AH, Oid oid)
573 {
574 /*
575 * If we are not restoring to a direct database connection, we have to
576 * guess about how to detect whether the blob exists. Assume new-style.
577 */
578 if (AH->connection == NULL ||
579 PQserverVersion(AH->connection) >= 90000)
580 {
581 ahprintf(AH,
582 "SELECT pg_catalog.lo_unlink(oid) "
583 "FROM pg_catalog.pg_largeobject_metadata "
584 "WHERE oid = '%u';\n",
585 oid);
586 }
587 else
588 {
589 /* Restoring to pre-9.0 server, so do it the old way */
590 ahprintf(AH,
591 "SELECT CASE WHEN EXISTS("
592 "SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u'"
593 ") THEN pg_catalog.lo_unlink('%u') END;\n",
594 oid, oid);
595 }
596 }
597