1 /*-------------------------------------------------------------------------
2 *
3 * pg_backup_db.c
4 *
5 * Implements the basic DB functions used by the archiver.
6 *
7 * IDENTIFICATION
8 * src/bin/pg_dump/pg_backup_db.c
9 *
10 *-------------------------------------------------------------------------
11 */
12 #include "postgres_fe.h"
13
14 #include <unistd.h>
15 #include <ctype.h>
16 #ifdef HAVE_TERMIOS_H
17 #include <termios.h>
18 #endif
19
20 #include "common/connect.h"
21 #include "common/string.h"
22 #include "dumputils.h"
23 #include "fe_utils/string_utils.h"
24 #include "parallel.h"
25 #include "pg_backup_archiver.h"
26 #include "pg_backup_db.h"
27 #include "pg_backup_utils.h"
28
29 static void _check_database_version(ArchiveHandle *AH);
30 static void notice_processor(void *arg, const char *message);
31
32 static void
_check_database_version(ArchiveHandle * AH)33 _check_database_version(ArchiveHandle *AH)
34 {
35 const char *remoteversion_str;
36 int remoteversion;
37 PGresult *res;
38
39 remoteversion_str = PQparameterStatus(AH->connection, "server_version");
40 remoteversion = PQserverVersion(AH->connection);
41 if (remoteversion == 0 || !remoteversion_str)
42 fatal("could not get server_version from libpq");
43
44 AH->public.remoteVersionStr = pg_strdup(remoteversion_str);
45 AH->public.remoteVersion = remoteversion;
46 if (!AH->archiveRemoteVersion)
47 AH->archiveRemoteVersion = AH->public.remoteVersionStr;
48
49 if (remoteversion != PG_VERSION_NUM
50 && (remoteversion < AH->public.minRemoteVersion ||
51 remoteversion > AH->public.maxRemoteVersion))
52 {
53 pg_log_error("server version: %s; %s version: %s",
54 remoteversion_str, progname, PG_VERSION);
55 fatal("aborting because of server version mismatch");
56 }
57
58 /*
59 * When running against 9.0 or later, check if we are in recovery mode,
60 * which means we are on a hot standby.
61 */
62 if (remoteversion >= 90000)
63 {
64 res = ExecuteSqlQueryForSingleRow((Archive *) AH, "SELECT pg_catalog.pg_is_in_recovery()");
65
66 AH->public.isStandby = (strcmp(PQgetvalue(res, 0, 0), "t") == 0);
67 PQclear(res);
68 }
69 else
70 AH->public.isStandby = false;
71 }
72
73 /*
74 * Reconnect to the server. If dbname is not NULL, use that database,
75 * else the one associated with the archive handle.
76 */
77 void
ReconnectToServer(ArchiveHandle * AH,const char * dbname)78 ReconnectToServer(ArchiveHandle *AH, const char *dbname)
79 {
80 PGconn *oldConn = AH->connection;
81 RestoreOptions *ropt = AH->public.ropt;
82
83 /*
84 * Save the dbname, if given, in override_dbname so that it will also
85 * affect any later reconnection attempt.
86 */
87 if (dbname)
88 ropt->cparams.override_dbname = pg_strdup(dbname);
89
90 /*
91 * Note: we want to establish the new connection, and in particular update
92 * ArchiveHandle's connCancel, before closing old connection. Otherwise
93 * an ill-timed SIGINT could try to access a dead connection.
94 */
95 AH->connection = NULL; /* dodge error check in ConnectDatabase */
96
97 ConnectDatabase((Archive *) AH, &ropt->cparams, true);
98
99 PQfinish(oldConn);
100 }
101
102 /*
103 * Make, or remake, a database connection with the given parameters.
104 *
105 * The resulting connection handle is stored in AHX->connection.
106 *
107 * An interactive password prompt is automatically issued if required.
108 * We store the results of that in AHX->savedPassword.
109 * Note: it's not really all that sensible to use a single-entry password
110 * cache if the username keeps changing. In current usage, however, the
111 * username never does change, so one savedPassword is sufficient.
112 */
113 void
ConnectDatabase(Archive * AHX,const ConnParams * cparams,bool isReconnect)114 ConnectDatabase(Archive *AHX,
115 const ConnParams *cparams,
116 bool isReconnect)
117 {
118 ArchiveHandle *AH = (ArchiveHandle *) AHX;
119 trivalue prompt_password;
120 char *password;
121 bool new_pass;
122
123 if (AH->connection)
124 fatal("already connected to a database");
125
126 /* Never prompt for a password during a reconnection */
127 prompt_password = isReconnect ? TRI_NO : cparams->promptPassword;
128
129 password = AH->savedPassword;
130
131 if (prompt_password == TRI_YES && password == NULL)
132 password = simple_prompt("Password: ", false);
133
134 /*
135 * Start the connection. Loop until we have a password if requested by
136 * backend.
137 */
138 do
139 {
140 const char *keywords[8];
141 const char *values[8];
142 int i = 0;
143
144 /*
145 * If dbname is a connstring, its entries can override the other
146 * values obtained from cparams; but in turn, override_dbname can
147 * override the dbname component of it.
148 */
149 keywords[i] = "host";
150 values[i++] = cparams->pghost;
151 keywords[i] = "port";
152 values[i++] = cparams->pgport;
153 keywords[i] = "user";
154 values[i++] = cparams->username;
155 keywords[i] = "password";
156 values[i++] = password;
157 keywords[i] = "dbname";
158 values[i++] = cparams->dbname;
159 if (cparams->override_dbname)
160 {
161 keywords[i] = "dbname";
162 values[i++] = cparams->override_dbname;
163 }
164 keywords[i] = "fallback_application_name";
165 values[i++] = progname;
166 keywords[i] = NULL;
167 values[i++] = NULL;
168 Assert(i <= lengthof(keywords));
169
170 new_pass = false;
171 AH->connection = PQconnectdbParams(keywords, values, true);
172
173 if (!AH->connection)
174 fatal("could not connect to database");
175
176 if (PQstatus(AH->connection) == CONNECTION_BAD &&
177 PQconnectionNeedsPassword(AH->connection) &&
178 password == NULL &&
179 prompt_password != TRI_NO)
180 {
181 PQfinish(AH->connection);
182 password = simple_prompt("Password: ", false);
183 new_pass = true;
184 }
185 } while (new_pass);
186
187 /* check to see that the backend connection was successfully made */
188 if (PQstatus(AH->connection) == CONNECTION_BAD)
189 {
190 if (isReconnect)
191 fatal("reconnection failed: %s",
192 PQerrorMessage(AH->connection));
193 else
194 fatal("%s",
195 PQerrorMessage(AH->connection));
196 }
197
198 /* Start strict; later phases may override this. */
199 PQclear(ExecuteSqlQueryForSingleRow((Archive *) AH,
200 ALWAYS_SECURE_SEARCH_PATH_SQL));
201
202 if (password && password != AH->savedPassword)
203 free(password);
204
205 /*
206 * We want to remember connection's actual password, whether or not we got
207 * it by prompting. So we don't just store the password variable.
208 */
209 if (PQconnectionUsedPassword(AH->connection))
210 {
211 if (AH->savedPassword)
212 free(AH->savedPassword);
213 AH->savedPassword = pg_strdup(PQpass(AH->connection));
214 }
215
216 /* check for version mismatch */
217 _check_database_version(AH);
218
219 PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
220
221 /* arrange for SIGINT to issue a query cancel on this connection */
222 set_archive_cancel_info(AH, AH->connection);
223 }
224
225 /*
226 * Close the connection to the database and also cancel off the query if we
227 * have one running.
228 */
229 void
DisconnectDatabase(Archive * AHX)230 DisconnectDatabase(Archive *AHX)
231 {
232 ArchiveHandle *AH = (ArchiveHandle *) AHX;
233 char errbuf[1];
234
235 if (!AH->connection)
236 return;
237
238 if (AH->connCancel)
239 {
240 /*
241 * If we have an active query, send a cancel before closing, ignoring
242 * any errors. This is of no use for a normal exit, but might be
243 * helpful during fatal().
244 */
245 if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE)
246 (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
247
248 /*
249 * Prevent signal handler from sending a cancel after this.
250 */
251 set_archive_cancel_info(AH, NULL);
252 }
253
254 PQfinish(AH->connection);
255 AH->connection = NULL;
256 }
257
258 PGconn *
GetConnection(Archive * AHX)259 GetConnection(Archive *AHX)
260 {
261 ArchiveHandle *AH = (ArchiveHandle *) AHX;
262
263 return AH->connection;
264 }
265
266 static void
notice_processor(void * arg,const char * message)267 notice_processor(void *arg, const char *message)
268 {
269 pg_log_generic(PG_LOG_INFO, "%s", message);
270 }
271
272 /* Like fatal(), but with a complaint about a particular query. */
273 static void
die_on_query_failure(ArchiveHandle * AH,const char * query)274 die_on_query_failure(ArchiveHandle *AH, const char *query)
275 {
276 pg_log_error("query failed: %s",
277 PQerrorMessage(AH->connection));
278 fatal("query was: %s", query);
279 }
280
281 void
ExecuteSqlStatement(Archive * AHX,const char * query)282 ExecuteSqlStatement(Archive *AHX, const char *query)
283 {
284 ArchiveHandle *AH = (ArchiveHandle *) AHX;
285 PGresult *res;
286
287 res = PQexec(AH->connection, query);
288 if (PQresultStatus(res) != PGRES_COMMAND_OK)
289 die_on_query_failure(AH, query);
290 PQclear(res);
291 }
292
293 PGresult *
ExecuteSqlQuery(Archive * AHX,const char * query,ExecStatusType status)294 ExecuteSqlQuery(Archive *AHX, const char *query, ExecStatusType status)
295 {
296 ArchiveHandle *AH = (ArchiveHandle *) AHX;
297 PGresult *res;
298
299 res = PQexec(AH->connection, query);
300 if (PQresultStatus(res) != status)
301 die_on_query_failure(AH, query);
302 return res;
303 }
304
305 /*
306 * Execute an SQL query and verify that we got exactly one row back.
307 */
308 PGresult *
ExecuteSqlQueryForSingleRow(Archive * fout,const char * query)309 ExecuteSqlQueryForSingleRow(Archive *fout, const char *query)
310 {
311 PGresult *res;
312 int ntups;
313
314 res = ExecuteSqlQuery(fout, query, PGRES_TUPLES_OK);
315
316 /* Expecting a single result only */
317 ntups = PQntuples(res);
318 if (ntups != 1)
319 fatal(ngettext("query returned %d row instead of one: %s",
320 "query returned %d rows instead of one: %s",
321 ntups),
322 ntups, query);
323
324 return res;
325 }
326
327 /*
328 * Convenience function to send a query.
329 * Monitors result to detect COPY statements
330 */
331 static void
ExecuteSqlCommand(ArchiveHandle * AH,const char * qry,const char * desc)332 ExecuteSqlCommand(ArchiveHandle *AH, const char *qry, const char *desc)
333 {
334 PGconn *conn = AH->connection;
335 PGresult *res;
336
337 #ifdef NOT_USED
338 fprintf(stderr, "Executing: '%s'\n\n", qry);
339 #endif
340 res = PQexec(conn, qry);
341
342 switch (PQresultStatus(res))
343 {
344 case PGRES_COMMAND_OK:
345 case PGRES_TUPLES_OK:
346 case PGRES_EMPTY_QUERY:
347 /* A-OK */
348 break;
349 case PGRES_COPY_IN:
350 /* Assume this is an expected result */
351 AH->pgCopyIn = true;
352 break;
353 default:
354 /* trouble */
355 warn_or_exit_horribly(AH, "%s: %sCommand was: %s",
356 desc, PQerrorMessage(conn), qry);
357 break;
358 }
359
360 PQclear(res);
361 }
362
363
364 /*
365 * Process non-COPY table data (that is, INSERT commands).
366 *
367 * The commands have been run together as one long string for compressibility,
368 * and we are receiving them in bufferloads with arbitrary boundaries, so we
369 * have to locate command boundaries and save partial commands across calls.
370 * All state must be kept in AH->sqlparse, not in local variables of this
371 * routine. We assume that AH->sqlparse was filled with zeroes when created.
372 *
373 * We have to lex the data to the extent of identifying literals and quoted
374 * identifiers, so that we can recognize statement-terminating semicolons.
375 * We assume that INSERT data will not contain SQL comments, E'' literals,
376 * or dollar-quoted strings, so this is much simpler than a full SQL lexer.
377 *
378 * Note: when restoring from a pre-9.0 dump file, this code is also used to
379 * process BLOB COMMENTS data, which has the same problem of containing
380 * multiple SQL commands that might be split across bufferloads. Fortunately,
381 * that data won't contain anything complicated to lex either.
382 */
383 static void
ExecuteSimpleCommands(ArchiveHandle * AH,const char * buf,size_t bufLen)384 ExecuteSimpleCommands(ArchiveHandle *AH, const char *buf, size_t bufLen)
385 {
386 const char *qry = buf;
387 const char *eos = buf + bufLen;
388
389 /* initialize command buffer if first time through */
390 if (AH->sqlparse.curCmd == NULL)
391 AH->sqlparse.curCmd = createPQExpBuffer();
392
393 for (; qry < eos; qry++)
394 {
395 char ch = *qry;
396
397 /* For neatness, we skip any newlines between commands */
398 if (!(ch == '\n' && AH->sqlparse.curCmd->len == 0))
399 appendPQExpBufferChar(AH->sqlparse.curCmd, ch);
400
401 switch (AH->sqlparse.state)
402 {
403 case SQL_SCAN: /* Default state == 0, set in _allocAH */
404 if (ch == ';')
405 {
406 /*
407 * We've found the end of a statement. Send it and reset
408 * the buffer.
409 */
410 ExecuteSqlCommand(AH, AH->sqlparse.curCmd->data,
411 "could not execute query");
412 resetPQExpBuffer(AH->sqlparse.curCmd);
413 }
414 else if (ch == '\'')
415 {
416 AH->sqlparse.state = SQL_IN_SINGLE_QUOTE;
417 AH->sqlparse.backSlash = false;
418 }
419 else if (ch == '"')
420 {
421 AH->sqlparse.state = SQL_IN_DOUBLE_QUOTE;
422 }
423 break;
424
425 case SQL_IN_SINGLE_QUOTE:
426 /* We needn't handle '' specially */
427 if (ch == '\'' && !AH->sqlparse.backSlash)
428 AH->sqlparse.state = SQL_SCAN;
429 else if (ch == '\\' && !AH->public.std_strings)
430 AH->sqlparse.backSlash = !AH->sqlparse.backSlash;
431 else
432 AH->sqlparse.backSlash = false;
433 break;
434
435 case SQL_IN_DOUBLE_QUOTE:
436 /* We needn't handle "" specially */
437 if (ch == '"')
438 AH->sqlparse.state = SQL_SCAN;
439 break;
440 }
441 }
442 }
443
444
445 /*
446 * Implement ahwrite() for direct-to-DB restore
447 */
448 int
ExecuteSqlCommandBuf(Archive * AHX,const char * buf,size_t bufLen)449 ExecuteSqlCommandBuf(Archive *AHX, const char *buf, size_t bufLen)
450 {
451 ArchiveHandle *AH = (ArchiveHandle *) AHX;
452
453 if (AH->outputKind == OUTPUT_COPYDATA)
454 {
455 /*
456 * COPY data.
457 *
458 * We drop the data on the floor if libpq has failed to enter COPY
459 * mode; this allows us to behave reasonably when trying to continue
460 * after an error in a COPY command.
461 */
462 if (AH->pgCopyIn &&
463 PQputCopyData(AH->connection, buf, bufLen) <= 0)
464 fatal("error returned by PQputCopyData: %s",
465 PQerrorMessage(AH->connection));
466 }
467 else if (AH->outputKind == OUTPUT_OTHERDATA)
468 {
469 /*
470 * Table data expressed as INSERT commands; or, in old dump files,
471 * BLOB COMMENTS data (which is expressed as COMMENT ON commands).
472 */
473 ExecuteSimpleCommands(AH, buf, bufLen);
474 }
475 else
476 {
477 /*
478 * General SQL commands; we assume that commands will not be split
479 * across calls.
480 *
481 * In most cases the data passed to us will be a null-terminated
482 * string, but if it's not, we have to add a trailing null.
483 */
484 if (buf[bufLen] == '\0')
485 ExecuteSqlCommand(AH, buf, "could not execute query");
486 else
487 {
488 char *str = (char *) pg_malloc(bufLen + 1);
489
490 memcpy(str, buf, bufLen);
491 str[bufLen] = '\0';
492 ExecuteSqlCommand(AH, str, "could not execute query");
493 free(str);
494 }
495 }
496
497 return bufLen;
498 }
499
500 /*
501 * Terminate a COPY operation during direct-to-DB restore
502 */
503 void
EndDBCopyMode(Archive * AHX,const char * tocEntryTag)504 EndDBCopyMode(Archive *AHX, const char *tocEntryTag)
505 {
506 ArchiveHandle *AH = (ArchiveHandle *) AHX;
507
508 if (AH->pgCopyIn)
509 {
510 PGresult *res;
511
512 if (PQputCopyEnd(AH->connection, NULL) <= 0)
513 fatal("error returned by PQputCopyEnd: %s",
514 PQerrorMessage(AH->connection));
515
516 /* Check command status and return to normal libpq state */
517 res = PQgetResult(AH->connection);
518 if (PQresultStatus(res) != PGRES_COMMAND_OK)
519 warn_or_exit_horribly(AH, "COPY failed for table \"%s\": %s",
520 tocEntryTag, PQerrorMessage(AH->connection));
521 PQclear(res);
522
523 /* Do this to ensure we've pumped libpq back to idle state */
524 if (PQgetResult(AH->connection) != NULL)
525 pg_log_warning("unexpected extra results during COPY of table \"%s\"",
526 tocEntryTag);
527
528 AH->pgCopyIn = false;
529 }
530 }
531
532 void
StartTransaction(Archive * AHX)533 StartTransaction(Archive *AHX)
534 {
535 ArchiveHandle *AH = (ArchiveHandle *) AHX;
536
537 ExecuteSqlCommand(AH, "BEGIN", "could not start database transaction");
538 }
539
540 void
CommitTransaction(Archive * AHX)541 CommitTransaction(Archive *AHX)
542 {
543 ArchiveHandle *AH = (ArchiveHandle *) AHX;
544
545 ExecuteSqlCommand(AH, "COMMIT", "could not commit database transaction");
546 }
547
548 void
DropBlobIfExists(ArchiveHandle * AH,Oid oid)549 DropBlobIfExists(ArchiveHandle *AH, Oid oid)
550 {
551 /*
552 * If we are not restoring to a direct database connection, we have to
553 * guess about how to detect whether the blob exists. Assume new-style.
554 */
555 if (AH->connection == NULL ||
556 PQserverVersion(AH->connection) >= 90000)
557 {
558 ahprintf(AH,
559 "SELECT pg_catalog.lo_unlink(oid) "
560 "FROM pg_catalog.pg_largeobject_metadata "
561 "WHERE oid = '%u';\n",
562 oid);
563 }
564 else
565 {
566 /* Restoring to pre-9.0 server, so do it the old way */
567 ahprintf(AH,
568 "SELECT CASE WHEN EXISTS("
569 "SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u'"
570 ") THEN pg_catalog.lo_unlink('%u') END;\n",
571 oid, oid);
572 }
573 }
574