1 /*-------------------------------------------------------------------------
2 *
3 * pg_backup_db.c
4 *
5 * Implements the basic DB functions used by the archiver.
6 *
7 * IDENTIFICATION
8 * src/bin/pg_dump/pg_backup_db.c
9 *
10 *-------------------------------------------------------------------------
11 */
12 #include "postgres_fe.h"
13
14 #include "dumputils.h"
15 #include "fe_utils/connect.h"
16 #include "fe_utils/string_utils.h"
17 #include "parallel.h"
18 #include "pg_backup_archiver.h"
19 #include "pg_backup_db.h"
20 #include "pg_backup_utils.h"
21
22 #include <unistd.h>
23 #include <ctype.h>
24 #ifdef HAVE_TERMIOS_H
25 #include <termios.h>
26 #endif
27
28
29 /* translator: this is a module name */
30 static const char *modulename = gettext_noop("archiver (db)");
31
32 static void _check_database_version(ArchiveHandle *AH);
33 static void notice_processor(void *arg, const char *message);
34
35 static void
_check_database_version(ArchiveHandle * AH)36 _check_database_version(ArchiveHandle *AH)
37 {
38 const char *remoteversion_str;
39 int remoteversion;
40 PGresult *res;
41
42 remoteversion_str = PQparameterStatus(AH->connection, "server_version");
43 remoteversion = PQserverVersion(AH->connection);
44 if (remoteversion == 0 || !remoteversion_str)
45 exit_horribly(modulename, "could not get server_version from libpq\n");
46
47 AH->public.remoteVersionStr = pg_strdup(remoteversion_str);
48 AH->public.remoteVersion = remoteversion;
49 if (!AH->archiveRemoteVersion)
50 AH->archiveRemoteVersion = AH->public.remoteVersionStr;
51
52 if (remoteversion != PG_VERSION_NUM
53 && (remoteversion < AH->public.minRemoteVersion ||
54 remoteversion > AH->public.maxRemoteVersion))
55 {
56 write_msg(NULL, "server version: %s; %s version: %s\n",
57 remoteversion_str, progname, PG_VERSION);
58 exit_horribly(NULL, "aborting because of server version mismatch\n");
59 }
60
61 /*
62 * When running against 9.0 or later, check if we are in recovery mode,
63 * which means we are on a hot standby.
64 */
65 if (remoteversion >= 90000)
66 {
67 res = ExecuteSqlQueryForSingleRow((Archive *) AH, "SELECT pg_catalog.pg_is_in_recovery()");
68
69 AH->public.isStandby = (strcmp(PQgetvalue(res, 0, 0), "t") == 0);
70 PQclear(res);
71 }
72 else
73 AH->public.isStandby = false;
74 }
75
76 /*
77 * Reconnect to the server. If dbname is not NULL, use that database,
78 * else the one associated with the archive handle.
79 */
80 void
ReconnectToServer(ArchiveHandle * AH,const char * dbname)81 ReconnectToServer(ArchiveHandle *AH, const char *dbname)
82 {
83 PGconn *oldConn = AH->connection;
84 RestoreOptions *ropt = AH->public.ropt;
85
86 /*
87 * Save the dbname, if given, in override_dbname so that it will also
88 * affect any later reconnection attempt.
89 */
90 if (dbname)
91 ropt->cparams.override_dbname = pg_strdup(dbname);
92
93 /*
94 * Note: we want to establish the new connection, and in particular update
95 * ArchiveHandle's connCancel, before closing old connection. Otherwise
96 * an ill-timed SIGINT could try to access a dead connection.
97 */
98 AH->connection = NULL; /* dodge error check in ConnectDatabase */
99
100 ConnectDatabase((Archive *) AH, &ropt->cparams, true);
101
102 PQfinish(oldConn);
103 }
104
105 /*
106 * Make, or remake, a database connection with the given parameters.
107 *
108 * The resulting connection handle is stored in AHX->connection.
109 *
110 * An interactive password prompt is automatically issued if required.
111 * We store the results of that in AHX->savedPassword.
112 * Note: it's not really all that sensible to use a single-entry password
113 * cache if the username keeps changing. In current usage, however, the
114 * username never does change, so one savedPassword is sufficient.
115 */
116 void
ConnectDatabase(Archive * AHX,const ConnParams * cparams,bool isReconnect)117 ConnectDatabase(Archive *AHX,
118 const ConnParams *cparams,
119 bool isReconnect)
120 {
121 ArchiveHandle *AH = (ArchiveHandle *) AHX;
122 trivalue prompt_password;
123 char *password;
124 char passbuf[100];
125 bool new_pass;
126
127 if (AH->connection)
128 exit_horribly(modulename, "already connected to a database\n");
129
130 /* Never prompt for a password during a reconnection */
131 prompt_password = isReconnect ? TRI_NO : cparams->promptPassword;
132
133 password = AH->savedPassword;
134
135 if (prompt_password == TRI_YES && password == NULL)
136 {
137 simple_prompt("Password: ", passbuf, sizeof(passbuf), false);
138 password = passbuf;
139 }
140
141 /*
142 * Start the connection. Loop until we have a password if requested by
143 * backend.
144 */
145 do
146 {
147 const char *keywords[8];
148 const char *values[8];
149 int i = 0;
150
151 /*
152 * If dbname is a connstring, its entries can override the other
153 * values obtained from cparams; but in turn, override_dbname can
154 * override the dbname component of it.
155 */
156 keywords[i] = "host";
157 values[i++] = cparams->pghost;
158 keywords[i] = "port";
159 values[i++] = cparams->pgport;
160 keywords[i] = "user";
161 values[i++] = cparams->username;
162 keywords[i] = "password";
163 values[i++] = password;
164 keywords[i] = "dbname";
165 values[i++] = cparams->dbname;
166 if (cparams->override_dbname)
167 {
168 keywords[i] = "dbname";
169 values[i++] = cparams->override_dbname;
170 }
171 keywords[i] = "fallback_application_name";
172 values[i++] = progname;
173 keywords[i] = NULL;
174 values[i++] = NULL;
175 Assert(i <= lengthof(keywords));
176
177 new_pass = false;
178 AH->connection = PQconnectdbParams(keywords, values, true);
179
180 if (!AH->connection)
181 exit_horribly(modulename, "failed to connect to database\n");
182
183 if (PQstatus(AH->connection) == CONNECTION_BAD &&
184 PQconnectionNeedsPassword(AH->connection) &&
185 password == NULL &&
186 prompt_password != TRI_NO)
187 {
188 PQfinish(AH->connection);
189 simple_prompt("Password: ", passbuf, sizeof(passbuf), false);
190 password = passbuf;
191 new_pass = true;
192 }
193 } while (new_pass);
194
195 /* check to see that the backend connection was successfully made */
196 if (PQstatus(AH->connection) == CONNECTION_BAD)
197 {
198 if (isReconnect)
199 exit_horribly(modulename, "reconnection to database \"%s\" failed: %s",
200 PQdb(AH->connection) ? PQdb(AH->connection) : "",
201 PQerrorMessage(AH->connection));
202 else
203 exit_horribly(modulename, "connection to database \"%s\" failed: %s",
204 PQdb(AH->connection) ? PQdb(AH->connection) : "",
205 PQerrorMessage(AH->connection));
206 }
207
208 /* Start strict; later phases may override this. */
209 PQclear(ExecuteSqlQueryForSingleRow((Archive *) AH,
210 ALWAYS_SECURE_SEARCH_PATH_SQL));
211
212 /*
213 * We want to remember connection's actual password, whether or not we got
214 * it by prompting. So we don't just store the password variable.
215 */
216 if (PQconnectionUsedPassword(AH->connection))
217 {
218 if (AH->savedPassword)
219 free(AH->savedPassword);
220 AH->savedPassword = pg_strdup(PQpass(AH->connection));
221 }
222
223 /* check for version mismatch */
224 _check_database_version(AH);
225
226 PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
227
228 /* arrange for SIGINT to issue a query cancel on this connection */
229 set_archive_cancel_info(AH, AH->connection);
230 }
231
232 /*
233 * Close the connection to the database and also cancel off the query if we
234 * have one running.
235 */
236 void
DisconnectDatabase(Archive * AHX)237 DisconnectDatabase(Archive *AHX)
238 {
239 ArchiveHandle *AH = (ArchiveHandle *) AHX;
240 char errbuf[1];
241
242 if (!AH->connection)
243 return;
244
245 if (AH->connCancel)
246 {
247 /*
248 * If we have an active query, send a cancel before closing, ignoring
249 * any errors. This is of no use for a normal exit, but might be
250 * helpful during exit_horribly().
251 */
252 if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE)
253 (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
254
255 /*
256 * Prevent signal handler from sending a cancel after this.
257 */
258 set_archive_cancel_info(AH, NULL);
259 }
260
261 PQfinish(AH->connection);
262 AH->connection = NULL;
263 }
264
265 PGconn *
GetConnection(Archive * AHX)266 GetConnection(Archive *AHX)
267 {
268 ArchiveHandle *AH = (ArchiveHandle *) AHX;
269
270 return AH->connection;
271 }
272
/*
 * notice_processor
 *
 * libpq notice-processor callback: pass the message text on to write_msg()
 * unchanged.  The callback argument is not used.
 */
static void
notice_processor(void *arg, const char *message)
{
	(void) arg;					/* unused */

	write_msg(NULL, "%s", message);
}
278
279 /* Like exit_horribly(), but with a complaint about a particular query. */
280 static void
die_on_query_failure(ArchiveHandle * AH,const char * modulename,const char * query)281 die_on_query_failure(ArchiveHandle *AH, const char *modulename, const char *query)
282 {
283 write_msg(modulename, "query failed: %s",
284 PQerrorMessage(AH->connection));
285 exit_horribly(modulename, "query was: %s\n", query);
286 }
287
288 void
ExecuteSqlStatement(Archive * AHX,const char * query)289 ExecuteSqlStatement(Archive *AHX, const char *query)
290 {
291 ArchiveHandle *AH = (ArchiveHandle *) AHX;
292 PGresult *res;
293
294 res = PQexec(AH->connection, query);
295 if (PQresultStatus(res) != PGRES_COMMAND_OK)
296 die_on_query_failure(AH, modulename, query);
297 PQclear(res);
298 }
299
300 PGresult *
ExecuteSqlQuery(Archive * AHX,const char * query,ExecStatusType status)301 ExecuteSqlQuery(Archive *AHX, const char *query, ExecStatusType status)
302 {
303 ArchiveHandle *AH = (ArchiveHandle *) AHX;
304 PGresult *res;
305
306 res = PQexec(AH->connection, query);
307 if (PQresultStatus(res) != status)
308 die_on_query_failure(AH, modulename, query);
309 return res;
310 }
311
312 /*
313 * Execute an SQL query and verify that we got exactly one row back.
314 */
315 PGresult *
ExecuteSqlQueryForSingleRow(Archive * fout,const char * query)316 ExecuteSqlQueryForSingleRow(Archive *fout, const char *query)
317 {
318 PGresult *res;
319 int ntups;
320
321 res = ExecuteSqlQuery(fout, query, PGRES_TUPLES_OK);
322
323 /* Expecting a single result only */
324 ntups = PQntuples(res);
325 if (ntups != 1)
326 exit_horribly(NULL,
327 ngettext("query returned %d row instead of one: %s\n",
328 "query returned %d rows instead of one: %s\n",
329 ntups),
330 ntups, query);
331
332 return res;
333 }
334
335 /*
336 * Convenience function to send a query.
337 * Monitors result to detect COPY statements
338 */
339 static void
ExecuteSqlCommand(ArchiveHandle * AH,const char * qry,const char * desc)340 ExecuteSqlCommand(ArchiveHandle *AH, const char *qry, const char *desc)
341 {
342 PGconn *conn = AH->connection;
343 PGresult *res;
344
345 #ifdef NOT_USED
346 fprintf(stderr, "Executing: '%s'\n\n", qry);
347 #endif
348 res = PQexec(conn, qry);
349
350 switch (PQresultStatus(res))
351 {
352 case PGRES_COMMAND_OK:
353 case PGRES_TUPLES_OK:
354 case PGRES_EMPTY_QUERY:
355 /* A-OK */
356 break;
357 case PGRES_COPY_IN:
358 /* Assume this is an expected result */
359 AH->pgCopyIn = true;
360 break;
361 default:
362 /* trouble */
363 warn_or_exit_horribly(AH, modulename, "%s: %s Command was: %s\n",
364 desc, PQerrorMessage(conn), qry);
365 break;
366 }
367
368 PQclear(res);
369 }
370
371
372 /*
373 * Process non-COPY table data (that is, INSERT commands).
374 *
375 * The commands have been run together as one long string for compressibility,
376 * and we are receiving them in bufferloads with arbitrary boundaries, so we
377 * have to locate command boundaries and save partial commands across calls.
378 * All state must be kept in AH->sqlparse, not in local variables of this
379 * routine. We assume that AH->sqlparse was filled with zeroes when created.
380 *
381 * We have to lex the data to the extent of identifying literals and quoted
382 * identifiers, so that we can recognize statement-terminating semicolons.
383 * We assume that INSERT data will not contain SQL comments, E'' literals,
384 * or dollar-quoted strings, so this is much simpler than a full SQL lexer.
385 *
386 * Note: when restoring from a pre-9.0 dump file, this code is also used to
387 * process BLOB COMMENTS data, which has the same problem of containing
388 * multiple SQL commands that might be split across bufferloads. Fortunately,
389 * that data won't contain anything complicated to lex either.
390 */
391 static void
ExecuteSimpleCommands(ArchiveHandle * AH,const char * buf,size_t bufLen)392 ExecuteSimpleCommands(ArchiveHandle *AH, const char *buf, size_t bufLen)
393 {
394 const char *qry = buf;
395 const char *eos = buf + bufLen;
396
397 /* initialize command buffer if first time through */
398 if (AH->sqlparse.curCmd == NULL)
399 AH->sqlparse.curCmd = createPQExpBuffer();
400
401 for (; qry < eos; qry++)
402 {
403 char ch = *qry;
404
405 /* For neatness, we skip any newlines between commands */
406 if (!(ch == '\n' && AH->sqlparse.curCmd->len == 0))
407 appendPQExpBufferChar(AH->sqlparse.curCmd, ch);
408
409 switch (AH->sqlparse.state)
410 {
411 case SQL_SCAN: /* Default state == 0, set in _allocAH */
412 if (ch == ';')
413 {
414 /*
415 * We've found the end of a statement. Send it and reset
416 * the buffer.
417 */
418 ExecuteSqlCommand(AH, AH->sqlparse.curCmd->data,
419 "could not execute query");
420 resetPQExpBuffer(AH->sqlparse.curCmd);
421 }
422 else if (ch == '\'')
423 {
424 AH->sqlparse.state = SQL_IN_SINGLE_QUOTE;
425 AH->sqlparse.backSlash = false;
426 }
427 else if (ch == '"')
428 {
429 AH->sqlparse.state = SQL_IN_DOUBLE_QUOTE;
430 }
431 break;
432
433 case SQL_IN_SINGLE_QUOTE:
434 /* We needn't handle '' specially */
435 if (ch == '\'' && !AH->sqlparse.backSlash)
436 AH->sqlparse.state = SQL_SCAN;
437 else if (ch == '\\' && !AH->public.std_strings)
438 AH->sqlparse.backSlash = !AH->sqlparse.backSlash;
439 else
440 AH->sqlparse.backSlash = false;
441 break;
442
443 case SQL_IN_DOUBLE_QUOTE:
444 /* We needn't handle "" specially */
445 if (ch == '"')
446 AH->sqlparse.state = SQL_SCAN;
447 break;
448 }
449 }
450 }
451
452
453 /*
454 * Implement ahwrite() for direct-to-DB restore
455 */
456 int
ExecuteSqlCommandBuf(Archive * AHX,const char * buf,size_t bufLen)457 ExecuteSqlCommandBuf(Archive *AHX, const char *buf, size_t bufLen)
458 {
459 ArchiveHandle *AH = (ArchiveHandle *) AHX;
460
461 if (AH->outputKind == OUTPUT_COPYDATA)
462 {
463 /*
464 * COPY data.
465 *
466 * We drop the data on the floor if libpq has failed to enter COPY
467 * mode; this allows us to behave reasonably when trying to continue
468 * after an error in a COPY command.
469 */
470 if (AH->pgCopyIn &&
471 PQputCopyData(AH->connection, buf, bufLen) <= 0)
472 exit_horribly(modulename, "error returned by PQputCopyData: %s",
473 PQerrorMessage(AH->connection));
474 }
475 else if (AH->outputKind == OUTPUT_OTHERDATA)
476 {
477 /*
478 * Table data expressed as INSERT commands; or, in old dump files,
479 * BLOB COMMENTS data (which is expressed as COMMENT ON commands).
480 */
481 ExecuteSimpleCommands(AH, buf, bufLen);
482 }
483 else
484 {
485 /*
486 * General SQL commands; we assume that commands will not be split
487 * across calls.
488 *
489 * In most cases the data passed to us will be a null-terminated
490 * string, but if it's not, we have to add a trailing null.
491 */
492 if (buf[bufLen] == '\0')
493 ExecuteSqlCommand(AH, buf, "could not execute query");
494 else
495 {
496 char *str = (char *) pg_malloc(bufLen + 1);
497
498 memcpy(str, buf, bufLen);
499 str[bufLen] = '\0';
500 ExecuteSqlCommand(AH, str, "could not execute query");
501 free(str);
502 }
503 }
504
505 return bufLen;
506 }
507
508 /*
509 * Terminate a COPY operation during direct-to-DB restore
510 */
511 void
EndDBCopyMode(Archive * AHX,const char * tocEntryTag)512 EndDBCopyMode(Archive *AHX, const char *tocEntryTag)
513 {
514 ArchiveHandle *AH = (ArchiveHandle *) AHX;
515
516 if (AH->pgCopyIn)
517 {
518 PGresult *res;
519
520 if (PQputCopyEnd(AH->connection, NULL) <= 0)
521 exit_horribly(modulename, "error returned by PQputCopyEnd: %s",
522 PQerrorMessage(AH->connection));
523
524 /* Check command status and return to normal libpq state */
525 res = PQgetResult(AH->connection);
526 if (PQresultStatus(res) != PGRES_COMMAND_OK)
527 warn_or_exit_horribly(AH, modulename, "COPY failed for table \"%s\": %s",
528 tocEntryTag, PQerrorMessage(AH->connection));
529 PQclear(res);
530
531 /* Do this to ensure we've pumped libpq back to idle state */
532 if (PQgetResult(AH->connection) != NULL)
533 write_msg(NULL, "WARNING: unexpected extra results during COPY of table \"%s\"\n",
534 tocEntryTag);
535
536 AH->pgCopyIn = false;
537 }
538 }
539
540 void
StartTransaction(Archive * AHX)541 StartTransaction(Archive *AHX)
542 {
543 ArchiveHandle *AH = (ArchiveHandle *) AHX;
544
545 ExecuteSqlCommand(AH, "BEGIN", "could not start database transaction");
546 }
547
548 void
CommitTransaction(Archive * AHX)549 CommitTransaction(Archive *AHX)
550 {
551 ArchiveHandle *AH = (ArchiveHandle *) AHX;
552
553 ExecuteSqlCommand(AH, "COMMIT", "could not commit database transaction");
554 }
555
556 void
DropBlobIfExists(ArchiveHandle * AH,Oid oid)557 DropBlobIfExists(ArchiveHandle *AH, Oid oid)
558 {
559 /*
560 * If we are not restoring to a direct database connection, we have to
561 * guess about how to detect whether the blob exists. Assume new-style.
562 */
563 if (AH->connection == NULL ||
564 PQserverVersion(AH->connection) >= 90000)
565 {
566 ahprintf(AH,
567 "SELECT pg_catalog.lo_unlink(oid) "
568 "FROM pg_catalog.pg_largeobject_metadata "
569 "WHERE oid = '%u';\n",
570 oid);
571 }
572 else
573 {
574 /* Restoring to pre-9.0 server, so do it the old way */
575 ahprintf(AH,
576 "SELECT CASE WHEN EXISTS("
577 "SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u'"
578 ") THEN pg_catalog.lo_unlink('%u') END;\n",
579 oid, oid);
580 }
581 }
582