1 /*-------------------------------------------------------------------------
2 *
3 * pg_backup_archiver.c
4 *
5 * Private implementation of the archiver routines.
6 *
7 * See the headers to pg_restore for more details.
8 *
9 * Copyright (c) 2000, Philip Warner
10 * Rights are granted to use this software in any way so long
11 * as this notice is not removed.
12 *
13 * The author is not responsible for loss or damages that may
14 * result from its use.
15 *
16 *
17 * IDENTIFICATION
18 * src/bin/pg_dump/pg_backup_archiver.c
19 *
20 *-------------------------------------------------------------------------
21 */
#include "postgres_fe.h"

#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/wait.h>
#ifdef WIN32
#include <io.h>
#endif

#include "dumputils.h"
#include "fe_utils/string_utils.h"
#include "libpq/libpq-fs.h"
#include "parallel.h"
#include "pg_backup_archiver.h"
#include "pg_backup_db.h"
#include "pg_backup_utils.h"
40
/* Banner text that opens a plain-text database dump */
#define TEXT_DUMP_HEADER \
	"--\n" \
	"-- PostgreSQL database dump\n" \
	"--\n\n"

/* Banner text that opens a plain-text database cluster dump */
#define TEXT_DUMPALL_HEADER \
	"--\n" \
	"-- PostgreSQL database cluster dump\n" \
	"--\n\n"
43
/*
 * Snapshot of an archive's output target, as returned by SaveOutput() and
 * consumed by RestoreOutput().
 */
typedef struct _outputContext
{
	void	   *OF;				/* saved output file handle */
	int			gzOut;			/* saved "output is gzip'd" flag */
} OutputContext;
50
51 /*
52 * State for tracking TocEntrys that are ready to process during a parallel
53 * restore. (This used to be a list, and we still call it that, though now
54 * it's really an array so that we can apply qsort to it.)
55 *
56 * tes[] is sized large enough that we can't overrun it.
57 * The valid entries are indexed first_te .. last_te inclusive.
58 * We periodically sort the array to bring larger-by-dataLength entries to
59 * the front; "sorted" is true if the valid entries are known sorted.
60 */
61 typedef struct _parallelReadyList
62 {
63 TocEntry **tes; /* Ready-to-dump TocEntrys */
64 int first_te; /* index of first valid entry in tes[] */
65 int last_te; /* index of last valid entry in tes[] */
66 bool sorted; /* are valid entries currently sorted? */
67 } ParallelReadyList;
68
69
70 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
71 const int compression, bool dosync, ArchiveMode mode,
72 SetupWorkerPtrType setupWorkerPtr);
73 static void _getObjectDescription(PQExpBuffer buf, TocEntry *te,
74 ArchiveHandle *AH);
75 static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData);
76 static char *sanitize_line(const char *str, bool want_hyphen);
77 static void _doSetFixedOutputState(ArchiveHandle *AH);
78 static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
79 static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
80 static void _becomeUser(ArchiveHandle *AH, const char *user);
81 static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
82 static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
83 static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
84 static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
85 static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
86 static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
87 static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
88 static teReqs _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH);
89 static RestorePass _tocEntryRestorePass(TocEntry *te);
90 static bool _tocEntryIsACL(TocEntry *te);
91 static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
92 static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
93 static void buildTocEntryArrays(ArchiveHandle *AH);
94 static void _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
95 static int _discoverArchiveFormat(ArchiveHandle *AH);
96
97 static int RestoringToDB(ArchiveHandle *AH);
98 static void dump_lo_buf(ArchiveHandle *AH);
99 static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
100 static void SetOutput(ArchiveHandle *AH, const char *filename, int compression);
101 static OutputContext SaveOutput(ArchiveHandle *AH);
102 static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);
103
104 static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
105 static void restore_toc_entries_prefork(ArchiveHandle *AH,
106 TocEntry *pending_list);
107 static void restore_toc_entries_parallel(ArchiveHandle *AH,
108 ParallelState *pstate,
109 TocEntry *pending_list);
110 static void restore_toc_entries_postfork(ArchiveHandle *AH,
111 TocEntry *pending_list);
112 static void pending_list_header_init(TocEntry *l);
113 static void pending_list_append(TocEntry *l, TocEntry *te);
114 static void pending_list_remove(TocEntry *te);
115 static void ready_list_init(ParallelReadyList *ready_list, int tocCount);
116 static void ready_list_free(ParallelReadyList *ready_list);
117 static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te);
118 static void ready_list_remove(ParallelReadyList *ready_list, int i);
119 static void ready_list_sort(ParallelReadyList *ready_list);
120 static int TocEntrySizeCompare(const void *p1, const void *p2);
121 static void move_to_ready_list(TocEntry *pending_list,
122 ParallelReadyList *ready_list,
123 RestorePass pass);
124 static TocEntry *pop_next_work_item(ArchiveHandle *AH,
125 ParallelReadyList *ready_list,
126 ParallelState *pstate);
127 static void mark_dump_job_done(ArchiveHandle *AH,
128 TocEntry *te,
129 int status,
130 void *callback_data);
131 static void mark_restore_job_done(ArchiveHandle *AH,
132 TocEntry *te,
133 int status,
134 void *callback_data);
135 static void fix_dependencies(ArchiveHandle *AH);
136 static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
137 static void repoint_table_dependencies(ArchiveHandle *AH);
138 static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
139 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
140 ParallelReadyList *ready_list);
141 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
142 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
143
144 static void StrictNamesCheck(RestoreOptions *ropt);
145
146
147 /*
148 * Allocate a new DumpOptions block containing all default values.
149 */
150 DumpOptions *
NewDumpOptions(void)151 NewDumpOptions(void)
152 {
153 DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
154
155 InitDumpOptions(opts);
156 return opts;
157 }
158
159 /*
160 * Initialize a DumpOptions struct to all default values
161 */
162 void
InitDumpOptions(DumpOptions * opts)163 InitDumpOptions(DumpOptions *opts)
164 {
165 memset(opts, 0, sizeof(DumpOptions));
166 /* set any fields that shouldn't default to zeroes */
167 opts->include_everything = true;
168 opts->cparams.promptPassword = TRI_DEFAULT;
169 opts->dumpSections = DUMP_UNSECTIONED;
170 }
171
172 /*
173 * Create a freshly allocated DumpOptions with options equivalent to those
174 * found in the given RestoreOptions.
175 */
176 DumpOptions *
dumpOptionsFromRestoreOptions(RestoreOptions * ropt)177 dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
178 {
179 DumpOptions *dopt = NewDumpOptions();
180
181 /* this is the inverse of what's at the end of pg_dump.c's main() */
182 dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
183 dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
184 dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
185 dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
186 dopt->cparams.promptPassword = ropt->cparams.promptPassword;
187 dopt->outputClean = ropt->dropSchema;
188 dopt->dataOnly = ropt->dataOnly;
189 dopt->schemaOnly = ropt->schemaOnly;
190 dopt->if_exists = ropt->if_exists;
191 dopt->column_inserts = ropt->column_inserts;
192 dopt->dumpSections = ropt->dumpSections;
193 dopt->aclsSkip = ropt->aclsSkip;
194 dopt->outputSuperuser = ropt->superuser;
195 dopt->outputCreateDB = ropt->createDB;
196 dopt->outputNoOwner = ropt->noOwner;
197 dopt->outputNoTablespaces = ropt->noTablespace;
198 dopt->disable_triggers = ropt->disable_triggers;
199 dopt->use_setsessauth = ropt->use_setsessauth;
200 dopt->disable_dollar_quoting = ropt->disable_dollar_quoting;
201 dopt->dump_inserts = ropt->dump_inserts;
202 dopt->no_comments = ropt->no_comments;
203 dopt->no_publications = ropt->no_publications;
204 dopt->no_security_labels = ropt->no_security_labels;
205 dopt->no_subscriptions = ropt->no_subscriptions;
206 dopt->lockWaitTimeout = ropt->lockWaitTimeout;
207 dopt->include_everything = ropt->include_everything;
208 dopt->enable_row_security = ropt->enable_row_security;
209 dopt->sequence_data = ropt->sequence_data;
210
211 return dopt;
212 }
213
214
215 /*
216 * Wrapper functions.
217 *
218 * The objective is to make writing new formats and dumpers as simple
219 * as possible, if necessary at the expense of extra function calls etc.
220 *
221 */
222
223 /*
224 * The dump worker setup needs lots of knowledge of the internals of pg_dump,
225 * so It's defined in pg_dump.c and passed into OpenArchive. The restore worker
226 * setup doesn't need to know anything much, so it's defined here.
227 */
228 static void
setupRestoreWorker(Archive * AHX)229 setupRestoreWorker(Archive *AHX)
230 {
231 ArchiveHandle *AH = (ArchiveHandle *) AHX;
232
233 AH->ReopenPtr(AH);
234 }
235
236
237 /* Create a new archive */
238 /* Public */
239 Archive *
CreateArchive(const char * FileSpec,const ArchiveFormat fmt,const int compression,bool dosync,ArchiveMode mode,SetupWorkerPtrType setupDumpWorker)240 CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
241 const int compression, bool dosync, ArchiveMode mode,
242 SetupWorkerPtrType setupDumpWorker)
243
244 {
245 ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, dosync,
246 mode, setupDumpWorker);
247
248 return (Archive *) AH;
249 }
250
251 /* Open an existing archive */
252 /* Public */
253 Archive *
OpenArchive(const char * FileSpec,const ArchiveFormat fmt)254 OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
255 {
256 ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, true, archModeRead, setupRestoreWorker);
257
258 return (Archive *) AH;
259 }
260
261 /* Public */
262 void
CloseArchive(Archive * AHX)263 CloseArchive(Archive *AHX)
264 {
265 int res = 0;
266 ArchiveHandle *AH = (ArchiveHandle *) AHX;
267
268 AH->ClosePtr(AH);
269
270 /* Close the output */
271 if (AH->gzOut)
272 res = GZCLOSE(AH->OF);
273 else if (AH->OF != stdout)
274 res = fclose(AH->OF);
275
276 if (res != 0)
277 fatal("could not close output file: %m");
278 }
279
280 /* Public */
281 void
SetArchiveOptions(Archive * AH,DumpOptions * dopt,RestoreOptions * ropt)282 SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
283 {
284 /* Caller can omit dump options, in which case we synthesize them */
285 if (dopt == NULL && ropt != NULL)
286 dopt = dumpOptionsFromRestoreOptions(ropt);
287
288 /* Save options for later access */
289 AH->dopt = dopt;
290 AH->ropt = ropt;
291 }
292
293 /* Public */
294 void
ProcessArchiveRestoreOptions(Archive * AHX)295 ProcessArchiveRestoreOptions(Archive *AHX)
296 {
297 ArchiveHandle *AH = (ArchiveHandle *) AHX;
298 RestoreOptions *ropt = AH->public.ropt;
299 TocEntry *te;
300 teSection curSection;
301
302 /* Decide which TOC entries will be dumped/restored, and mark them */
303 curSection = SECTION_PRE_DATA;
304 for (te = AH->toc->next; te != AH->toc; te = te->next)
305 {
306 /*
307 * When writing an archive, we also take this opportunity to check
308 * that we have generated the entries in a sane order that respects
309 * the section divisions. When reading, don't complain, since buggy
310 * old versions of pg_dump might generate out-of-order archives.
311 */
312 if (AH->mode != archModeRead)
313 {
314 switch (te->section)
315 {
316 case SECTION_NONE:
317 /* ok to be anywhere */
318 break;
319 case SECTION_PRE_DATA:
320 if (curSection != SECTION_PRE_DATA)
321 pg_log_warning("archive items not in correct section order");
322 break;
323 case SECTION_DATA:
324 if (curSection == SECTION_POST_DATA)
325 pg_log_warning("archive items not in correct section order");
326 break;
327 case SECTION_POST_DATA:
328 /* ok no matter which section we were in */
329 break;
330 default:
331 fatal("unexpected section code %d",
332 (int) te->section);
333 break;
334 }
335 }
336
337 if (te->section != SECTION_NONE)
338 curSection = te->section;
339
340 te->reqs = _tocEntryRequired(te, curSection, AH);
341 }
342
343 /* Enforce strict names checking */
344 if (ropt->strict_names)
345 StrictNamesCheck(ropt);
346 }
347
348 /* Public */
349 void
RestoreArchive(Archive * AHX)350 RestoreArchive(Archive *AHX)
351 {
352 ArchiveHandle *AH = (ArchiveHandle *) AHX;
353 RestoreOptions *ropt = AH->public.ropt;
354 bool parallel_mode;
355 TocEntry *te;
356 OutputContext sav;
357
358 AH->stage = STAGE_INITIALIZING;
359
360 /*
361 * If we're going to do parallel restore, there are some restrictions.
362 */
363 parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
364 if (parallel_mode)
365 {
366 /* We haven't got round to making this work for all archive formats */
367 if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
368 fatal("parallel restore is not supported with this archive file format");
369
370 /* Doesn't work if the archive represents dependencies as OIDs */
371 if (AH->version < K_VERS_1_8)
372 fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");
373
374 /*
375 * It's also not gonna work if we can't reopen the input file, so
376 * let's try that immediately.
377 */
378 AH->ReopenPtr(AH);
379 }
380
381 /*
382 * Make sure we won't need (de)compression we haven't got
383 */
384 #ifndef HAVE_LIBZ
385 if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
386 {
387 for (te = AH->toc->next; te != AH->toc; te = te->next)
388 {
389 if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
390 fatal("cannot restore from compressed archive (compression not supported in this installation)");
391 }
392 }
393 #endif
394
395 /*
396 * Prepare index arrays, so we can assume we have them throughout restore.
397 * It's possible we already did this, though.
398 */
399 if (AH->tocsByDumpId == NULL)
400 buildTocEntryArrays(AH);
401
402 /*
403 * If we're using a DB connection, then connect it.
404 */
405 if (ropt->useDB)
406 {
407 pg_log_info("connecting to database for restore");
408 if (AH->version < K_VERS_1_3)
409 fatal("direct database connections are not supported in pre-1.3 archives");
410
411 /*
412 * We don't want to guess at whether the dump will successfully
413 * restore; allow the attempt regardless of the version of the restore
414 * target.
415 */
416 AHX->minRemoteVersion = 0;
417 AHX->maxRemoteVersion = 9999999;
418
419 ConnectDatabase(AHX, &ropt->cparams, false);
420
421 /*
422 * If we're talking to the DB directly, don't send comments since they
423 * obscure SQL when displaying errors
424 */
425 AH->noTocComments = 1;
426 }
427
428 /*
429 * Work out if we have an implied data-only restore. This can happen if
430 * the dump was data only or if the user has used a toc list to exclude
431 * all of the schema data. All we do is look for schema entries - if none
432 * are found then we set the dataOnly flag.
433 *
434 * We could scan for wanted TABLE entries, but that is not the same as
435 * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
436 */
437 if (!ropt->dataOnly)
438 {
439 int impliedDataOnly = 1;
440
441 for (te = AH->toc->next; te != AH->toc; te = te->next)
442 {
443 if ((te->reqs & REQ_SCHEMA) != 0)
444 { /* It's schema, and it's wanted */
445 impliedDataOnly = 0;
446 break;
447 }
448 }
449 if (impliedDataOnly)
450 {
451 ropt->dataOnly = impliedDataOnly;
452 pg_log_info("implied data-only restore");
453 }
454 }
455
456 /*
457 * Setup the output file if necessary.
458 */
459 sav = SaveOutput(AH);
460 if (ropt->filename || ropt->compression)
461 SetOutput(AH, ropt->filename, ropt->compression);
462
463 ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
464
465 if (AH->archiveRemoteVersion)
466 ahprintf(AH, "-- Dumped from database version %s\n",
467 AH->archiveRemoteVersion);
468 if (AH->archiveDumpVersion)
469 ahprintf(AH, "-- Dumped by pg_dump version %s\n",
470 AH->archiveDumpVersion);
471
472 ahprintf(AH, "\n");
473
474 if (AH->public.verbose)
475 dumpTimestamp(AH, "Started on", AH->createDate);
476
477 if (ropt->single_txn)
478 {
479 if (AH->connection)
480 StartTransaction(AHX);
481 else
482 ahprintf(AH, "BEGIN;\n\n");
483 }
484
485 /*
486 * Establish important parameter values right away.
487 */
488 _doSetFixedOutputState(AH);
489
490 AH->stage = STAGE_PROCESSING;
491
492 /*
493 * Drop the items at the start, in reverse order
494 */
495 if (ropt->dropSchema)
496 {
497 for (te = AH->toc->prev; te != AH->toc; te = te->prev)
498 {
499 AH->currentTE = te;
500
501 /*
502 * In createDB mode, issue a DROP *only* for the database as a
503 * whole. Issuing drops against anything else would be wrong,
504 * because at this point we're connected to the wrong database.
505 * (The DATABASE PROPERTIES entry, if any, should be treated like
506 * the DATABASE entry.)
507 */
508 if (ropt->createDB)
509 {
510 if (strcmp(te->desc, "DATABASE") != 0 &&
511 strcmp(te->desc, "DATABASE PROPERTIES") != 0)
512 continue;
513 }
514
515 /* Otherwise, drop anything that's selected and has a dropStmt */
516 if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
517 {
518 pg_log_info("dropping %s %s", te->desc, te->tag);
519 /* Select owner and schema as necessary */
520 _becomeOwner(AH, te);
521 _selectOutputSchema(AH, te->namespace);
522
523 /*
524 * Now emit the DROP command, if the object has one. Note we
525 * don't necessarily emit it verbatim; at this point we add an
526 * appropriate IF EXISTS clause, if the user requested it.
527 */
528 if (*te->dropStmt != '\0')
529 {
530 if (!ropt->if_exists)
531 {
532 /* No --if-exists? Then just use the original */
533 ahprintf(AH, "%s", te->dropStmt);
534 }
535 else
536 {
537 /*
538 * Inject an appropriate spelling of "if exists". For
539 * large objects, we have a separate routine that
540 * knows how to do it, without depending on
541 * te->dropStmt; use that. For other objects we need
542 * to parse the command.
543 */
544 if (strncmp(te->desc, "BLOB", 4) == 0)
545 {
546 DropBlobIfExists(AH, te->catalogId.oid);
547 }
548 else
549 {
550 char *dropStmt = pg_strdup(te->dropStmt);
551 char *dropStmtOrig = dropStmt;
552 PQExpBuffer ftStmt = createPQExpBuffer();
553
554 /*
555 * Need to inject IF EXISTS clause after ALTER
556 * TABLE part in ALTER TABLE .. DROP statement
557 */
558 if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
559 {
560 appendPQExpBufferStr(ftStmt,
561 "ALTER TABLE IF EXISTS");
562 dropStmt = dropStmt + 11;
563 }
564
565 /*
566 * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
567 * not support the IF EXISTS clause, and therefore
568 * we simply emit the original command for DEFAULT
569 * objects (modulo the adjustment made above).
570 *
571 * Likewise, don't mess with DATABASE PROPERTIES.
572 *
573 * If we used CREATE OR REPLACE VIEW as a means of
574 * quasi-dropping an ON SELECT rule, that should
575 * be emitted unchanged as well.
576 *
577 * For other object types, we need to extract the
578 * first part of the DROP which includes the
579 * object type. Most of the time this matches
580 * te->desc, so search for that; however for the
581 * different kinds of CONSTRAINTs, we know to
582 * search for hardcoded "DROP CONSTRAINT" instead.
583 */
584 if (strcmp(te->desc, "DEFAULT") == 0 ||
585 strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
586 strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
587 appendPQExpBufferStr(ftStmt, dropStmt);
588 else
589 {
590 char buffer[40];
591 char *mark;
592
593 if (strcmp(te->desc, "CONSTRAINT") == 0 ||
594 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
595 strcmp(te->desc, "FK CONSTRAINT") == 0)
596 strcpy(buffer, "DROP CONSTRAINT");
597 else
598 snprintf(buffer, sizeof(buffer), "DROP %s",
599 te->desc);
600
601 mark = strstr(dropStmt, buffer);
602
603 if (mark)
604 {
605 *mark = '\0';
606 appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
607 dropStmt, buffer,
608 mark + strlen(buffer));
609 }
610 else
611 {
612 /* complain and emit unmodified command */
613 pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
614 dropStmtOrig);
615 appendPQExpBufferStr(ftStmt, dropStmt);
616 }
617 }
618
619 ahprintf(AH, "%s", ftStmt->data);
620
621 destroyPQExpBuffer(ftStmt);
622 pg_free(dropStmtOrig);
623 }
624 }
625 }
626 }
627 }
628
629 /*
630 * _selectOutputSchema may have set currSchema to reflect the effect
631 * of a "SET search_path" command it emitted. However, by now we may
632 * have dropped that schema; or it might not have existed in the first
633 * place. In either case the effective value of search_path will not
634 * be what we think. Forcibly reset currSchema so that we will
635 * re-establish the search_path setting when needed (after creating
636 * the schema).
637 *
638 * If we treated users as pg_dump'able objects then we'd need to reset
639 * currUser here too.
640 */
641 if (AH->currSchema)
642 free(AH->currSchema);
643 AH->currSchema = NULL;
644 }
645
646 if (parallel_mode)
647 {
648 /*
649 * In parallel mode, turn control over to the parallel-restore logic.
650 */
651 ParallelState *pstate;
652 TocEntry pending_list;
653
654 /* The archive format module may need some setup for this */
655 if (AH->PrepParallelRestorePtr)
656 AH->PrepParallelRestorePtr(AH);
657
658 pending_list_header_init(&pending_list);
659
660 /* This runs PRE_DATA items and then disconnects from the database */
661 restore_toc_entries_prefork(AH, &pending_list);
662 Assert(AH->connection == NULL);
663
664 /* ParallelBackupStart() will actually fork the processes */
665 pstate = ParallelBackupStart(AH);
666 restore_toc_entries_parallel(AH, pstate, &pending_list);
667 ParallelBackupEnd(AH, pstate);
668
669 /* reconnect the master and see if we missed something */
670 restore_toc_entries_postfork(AH, &pending_list);
671 Assert(AH->connection != NULL);
672 }
673 else
674 {
675 /*
676 * In serial mode, process everything in three phases: normal items,
677 * then ACLs, then post-ACL items. We might be able to skip one or
678 * both extra phases in some cases, eg data-only restores.
679 */
680 bool haveACL = false;
681 bool havePostACL = false;
682
683 for (te = AH->toc->next; te != AH->toc; te = te->next)
684 {
685 if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
686 continue; /* ignore if not to be dumped at all */
687
688 switch (_tocEntryRestorePass(te))
689 {
690 case RESTORE_PASS_MAIN:
691 (void) restore_toc_entry(AH, te, false);
692 break;
693 case RESTORE_PASS_ACL:
694 haveACL = true;
695 break;
696 case RESTORE_PASS_POST_ACL:
697 havePostACL = true;
698 break;
699 }
700 }
701
702 if (haveACL)
703 {
704 for (te = AH->toc->next; te != AH->toc; te = te->next)
705 {
706 if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
707 _tocEntryRestorePass(te) == RESTORE_PASS_ACL)
708 (void) restore_toc_entry(AH, te, false);
709 }
710 }
711
712 if (havePostACL)
713 {
714 for (te = AH->toc->next; te != AH->toc; te = te->next)
715 {
716 if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
717 _tocEntryRestorePass(te) == RESTORE_PASS_POST_ACL)
718 (void) restore_toc_entry(AH, te, false);
719 }
720 }
721 }
722
723 if (ropt->single_txn)
724 {
725 if (AH->connection)
726 CommitTransaction(AHX);
727 else
728 ahprintf(AH, "COMMIT;\n\n");
729 }
730
731 if (AH->public.verbose)
732 dumpTimestamp(AH, "Completed on", time(NULL));
733
734 ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
735
736 /*
737 * Clean up & we're done.
738 */
739 AH->stage = STAGE_FINALIZING;
740
741 if (ropt->filename || ropt->compression)
742 RestoreOutput(AH, sav);
743
744 if (ropt->useDB)
745 DisconnectDatabase(&AH->public);
746 }
747
748 /*
749 * Restore a single TOC item. Used in both parallel and non-parallel restore;
750 * is_parallel is true if we are in a worker child process.
751 *
752 * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
753 * the parallel parent has to make the corresponding status update.
754 */
755 static int
restore_toc_entry(ArchiveHandle * AH,TocEntry * te,bool is_parallel)756 restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
757 {
758 RestoreOptions *ropt = AH->public.ropt;
759 int status = WORKER_OK;
760 teReqs reqs;
761 bool defnDumped;
762
763 AH->currentTE = te;
764
765 /* Dump any relevant dump warnings to stderr */
766 if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
767 {
768 if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
769 pg_log_warning("warning from original dump file: %s", te->defn);
770 else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
771 pg_log_warning("warning from original dump file: %s", te->copyStmt);
772 }
773
774 /* Work out what, if anything, we want from this entry */
775 reqs = te->reqs;
776
777 defnDumped = false;
778
779 /*
780 * If it has a schema component that we want, then process that
781 */
782 if ((reqs & REQ_SCHEMA) != 0)
783 {
784 /* Show namespace in log message if available */
785 if (te->namespace)
786 pg_log_info("creating %s \"%s.%s\"",
787 te->desc, te->namespace, te->tag);
788 else
789 pg_log_info("creating %s \"%s\"",
790 te->desc, te->tag);
791
792 _printTocEntry(AH, te, false);
793 defnDumped = true;
794
795 if (strcmp(te->desc, "TABLE") == 0)
796 {
797 if (AH->lastErrorTE == te)
798 {
799 /*
800 * We failed to create the table. If
801 * --no-data-for-failed-tables was given, mark the
802 * corresponding TABLE DATA to be ignored.
803 *
804 * In the parallel case this must be done in the parent, so we
805 * just set the return value.
806 */
807 if (ropt->noDataForFailedTables)
808 {
809 if (is_parallel)
810 status = WORKER_INHIBIT_DATA;
811 else
812 inhibit_data_for_failed_table(AH, te);
813 }
814 }
815 else
816 {
817 /*
818 * We created the table successfully. Mark the corresponding
819 * TABLE DATA for possible truncation.
820 *
821 * In the parallel case this must be done in the parent, so we
822 * just set the return value.
823 */
824 if (is_parallel)
825 status = WORKER_CREATE_DONE;
826 else
827 mark_create_done(AH, te);
828 }
829 }
830
831 /*
832 * If we created a DB, connect to it. Also, if we changed DB
833 * properties, reconnect to ensure that relevant GUC settings are
834 * applied to our session.
835 */
836 if (strcmp(te->desc, "DATABASE") == 0 ||
837 strcmp(te->desc, "DATABASE PROPERTIES") == 0)
838 {
839 pg_log_info("connecting to new database \"%s\"", te->tag);
840 _reconnectToDB(AH, te->tag);
841 }
842 }
843
844 /*
845 * If it has a data component that we want, then process that
846 */
847 if ((reqs & REQ_DATA) != 0)
848 {
849 /*
850 * hadDumper will be set if there is genuine data component for this
851 * node. Otherwise, we need to check the defn field for statements
852 * that need to be executed in data-only restores.
853 */
854 if (te->hadDumper)
855 {
856 /*
857 * If we can output the data, then restore it.
858 */
859 if (AH->PrintTocDataPtr != NULL)
860 {
861 _printTocEntry(AH, te, true);
862
863 if (strcmp(te->desc, "BLOBS") == 0 ||
864 strcmp(te->desc, "BLOB COMMENTS") == 0)
865 {
866 pg_log_info("processing %s", te->desc);
867
868 _selectOutputSchema(AH, "pg_catalog");
869
870 /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
871 if (strcmp(te->desc, "BLOB COMMENTS") == 0)
872 AH->outputKind = OUTPUT_OTHERDATA;
873
874 AH->PrintTocDataPtr(AH, te);
875
876 AH->outputKind = OUTPUT_SQLCMDS;
877 }
878 else
879 {
880 _disableTriggersIfNecessary(AH, te);
881
882 /* Select owner and schema as necessary */
883 _becomeOwner(AH, te);
884 _selectOutputSchema(AH, te->namespace);
885
886 pg_log_info("processing data for table \"%s.%s\"",
887 te->namespace, te->tag);
888
889 /*
890 * In parallel restore, if we created the table earlier in
891 * the run then we wrap the COPY in a transaction and
892 * precede it with a TRUNCATE. If archiving is not on
893 * this prevents WAL-logging the COPY. This obtains a
894 * speedup similar to that from using single_txn mode in
895 * non-parallel restores.
896 */
897 if (is_parallel && te->created)
898 {
899 /*
900 * Parallel restore is always talking directly to a
901 * server, so no need to see if we should issue BEGIN.
902 */
903 StartTransaction(&AH->public);
904
905 /*
906 * If the server version is >= 8.4, make sure we issue
907 * TRUNCATE with ONLY so that child tables are not
908 * wiped.
909 */
910 ahprintf(AH, "TRUNCATE TABLE %s%s;\n\n",
911 (PQserverVersion(AH->connection) >= 80400 ?
912 "ONLY " : ""),
913 fmtQualifiedId(te->namespace, te->tag));
914 }
915
916 /*
917 * If we have a copy statement, use it.
918 */
919 if (te->copyStmt && strlen(te->copyStmt) > 0)
920 {
921 ahprintf(AH, "%s", te->copyStmt);
922 AH->outputKind = OUTPUT_COPYDATA;
923 }
924 else
925 AH->outputKind = OUTPUT_OTHERDATA;
926
927 AH->PrintTocDataPtr(AH, te);
928
929 /*
930 * Terminate COPY if needed.
931 */
932 if (AH->outputKind == OUTPUT_COPYDATA &&
933 RestoringToDB(AH))
934 EndDBCopyMode(&AH->public, te->tag);
935 AH->outputKind = OUTPUT_SQLCMDS;
936
937 /* close out the transaction started above */
938 if (is_parallel && te->created)
939 CommitTransaction(&AH->public);
940
941 _enableTriggersIfNecessary(AH, te);
942 }
943 }
944 }
945 else if (!defnDumped)
946 {
947 /* If we haven't already dumped the defn part, do so now */
948 pg_log_info("executing %s %s", te->desc, te->tag);
949 _printTocEntry(AH, te, false);
950 }
951 }
952
953 if (AH->public.n_errors > 0 && status == WORKER_OK)
954 status = WORKER_IGNORED_ERRORS;
955
956 return status;
957 }
958
959 /*
960 * Allocate a new RestoreOptions block.
961 * This is mainly so we can initialize it, but also for future expansion,
962 */
963 RestoreOptions *
NewRestoreOptions(void)964 NewRestoreOptions(void)
965 {
966 RestoreOptions *opts;
967
968 opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
969
970 /* set any fields that shouldn't default to zeroes */
971 opts->format = archUnknown;
972 opts->cparams.promptPassword = TRI_DEFAULT;
973 opts->dumpSections = DUMP_UNSECTIONED;
974
975 return opts;
976 }
977
978 static void
_disableTriggersIfNecessary(ArchiveHandle * AH,TocEntry * te)979 _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
980 {
981 RestoreOptions *ropt = AH->public.ropt;
982
983 /* This hack is only needed in a data-only restore */
984 if (!ropt->dataOnly || !ropt->disable_triggers)
985 return;
986
987 pg_log_info("disabling triggers for %s", te->tag);
988
989 /*
990 * Become superuser if possible, since they are the only ones who can
991 * disable constraint triggers. If -S was not given, assume the initial
992 * user identity is a superuser. (XXX would it be better to become the
993 * table owner?)
994 */
995 _becomeUser(AH, ropt->superuser);
996
997 /*
998 * Disable them.
999 */
1000 ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
1001 fmtQualifiedId(te->namespace, te->tag));
1002 }
1003
1004 static void
_enableTriggersIfNecessary(ArchiveHandle * AH,TocEntry * te)1005 _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1006 {
1007 RestoreOptions *ropt = AH->public.ropt;
1008
1009 /* This hack is only needed in a data-only restore */
1010 if (!ropt->dataOnly || !ropt->disable_triggers)
1011 return;
1012
1013 pg_log_info("enabling triggers for %s", te->tag);
1014
1015 /*
1016 * Become superuser if possible, since they are the only ones who can
1017 * disable constraint triggers. If -S was not given, assume the initial
1018 * user identity is a superuser. (XXX would it be better to become the
1019 * table owner?)
1020 */
1021 _becomeUser(AH, ropt->superuser);
1022
1023 /*
1024 * Enable them.
1025 */
1026 ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1027 fmtQualifiedId(te->namespace, te->tag));
1028 }
1029
1030 /*
1031 * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1032 */
1033
1034 /* Public */
1035 void
WriteData(Archive * AHX,const void * data,size_t dLen)1036 WriteData(Archive *AHX, const void *data, size_t dLen)
1037 {
1038 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1039
1040 if (!AH->currToc)
1041 fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
1042
1043 AH->WriteDataPtr(AH, data, dLen);
1044 }
1045
1046 /*
1047 * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1048 * repository for all metadata. But the name has stuck.
1049 *
1050 * The new entry is added to the Archive's TOC list. Most callers can ignore
1051 * the result value because nothing else need be done, but a few want to
1052 * manipulate the TOC entry further.
1053 */
1054
1055 /* Public */
1056 TocEntry *
ArchiveEntry(Archive * AHX,CatalogId catalogId,DumpId dumpId,ArchiveOpts * opts)1057 ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
1058 ArchiveOpts *opts)
1059 {
1060 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1061 TocEntry *newToc;
1062
1063 newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
1064
1065 AH->tocCount++;
1066 if (dumpId > AH->maxDumpId)
1067 AH->maxDumpId = dumpId;
1068
1069 newToc->prev = AH->toc->prev;
1070 newToc->next = AH->toc;
1071 AH->toc->prev->next = newToc;
1072 AH->toc->prev = newToc;
1073
1074 newToc->catalogId = catalogId;
1075 newToc->dumpId = dumpId;
1076 newToc->section = opts->section;
1077
1078 newToc->tag = pg_strdup(opts->tag);
1079 newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
1080 newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
1081 newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
1082 newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
1083 newToc->desc = pg_strdup(opts->description);
1084 newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
1085 newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
1086 newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;
1087
1088 if (opts->nDeps > 0)
1089 {
1090 newToc->dependencies = (DumpId *) pg_malloc(opts->nDeps * sizeof(DumpId));
1091 memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
1092 newToc->nDeps = opts->nDeps;
1093 }
1094 else
1095 {
1096 newToc->dependencies = NULL;
1097 newToc->nDeps = 0;
1098 }
1099
1100 newToc->dataDumper = opts->dumpFn;
1101 newToc->dataDumperArg = opts->dumpArg;
1102 newToc->hadDumper = opts->dumpFn ? true : false;
1103
1104 newToc->formatData = NULL;
1105 newToc->dataLength = 0;
1106
1107 if (AH->ArchiveEntryPtr != NULL)
1108 AH->ArchiveEntryPtr(AH, newToc);
1109
1110 return newToc;
1111 }
1112
1113 /* Public */
1114 void
PrintTOCSummary(Archive * AHX)1115 PrintTOCSummary(Archive *AHX)
1116 {
1117 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1118 RestoreOptions *ropt = AH->public.ropt;
1119 TocEntry *te;
1120 teSection curSection;
1121 OutputContext sav;
1122 const char *fmtName;
1123 char stamp_str[64];
1124
1125 sav = SaveOutput(AH);
1126 if (ropt->filename)
1127 SetOutput(AH, ropt->filename, 0 /* no compression */ );
1128
1129 if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
1130 localtime(&AH->createDate)) == 0)
1131 strcpy(stamp_str, "[unknown]");
1132
1133 ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
1134 ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %d\n",
1135 sanitize_line(AH->archdbname, false),
1136 AH->tocCount, AH->compression);
1137
1138 switch (AH->format)
1139 {
1140 case archCustom:
1141 fmtName = "CUSTOM";
1142 break;
1143 case archDirectory:
1144 fmtName = "DIRECTORY";
1145 break;
1146 case archTar:
1147 fmtName = "TAR";
1148 break;
1149 default:
1150 fmtName = "UNKNOWN";
1151 }
1152
1153 ahprintf(AH, "; Dump Version: %d.%d-%d\n",
1154 ARCHIVE_MAJOR(AH->version), ARCHIVE_MINOR(AH->version), ARCHIVE_REV(AH->version));
1155 ahprintf(AH, "; Format: %s\n", fmtName);
1156 ahprintf(AH, "; Integer: %d bytes\n", (int) AH->intSize);
1157 ahprintf(AH, "; Offset: %d bytes\n", (int) AH->offSize);
1158 if (AH->archiveRemoteVersion)
1159 ahprintf(AH, "; Dumped from database version: %s\n",
1160 AH->archiveRemoteVersion);
1161 if (AH->archiveDumpVersion)
1162 ahprintf(AH, "; Dumped by pg_dump version: %s\n",
1163 AH->archiveDumpVersion);
1164
1165 ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
1166
1167 curSection = SECTION_PRE_DATA;
1168 for (te = AH->toc->next; te != AH->toc; te = te->next)
1169 {
1170 if (te->section != SECTION_NONE)
1171 curSection = te->section;
1172 if (ropt->verbose ||
1173 (_tocEntryRequired(te, curSection, AH) & (REQ_SCHEMA | REQ_DATA)) != 0)
1174 {
1175 char *sanitized_name;
1176 char *sanitized_schema;
1177 char *sanitized_owner;
1178
1179 /*
1180 */
1181 sanitized_name = sanitize_line(te->tag, false);
1182 sanitized_schema = sanitize_line(te->namespace, true);
1183 sanitized_owner = sanitize_line(te->owner, false);
1184
1185 ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
1186 te->catalogId.tableoid, te->catalogId.oid,
1187 te->desc, sanitized_schema, sanitized_name,
1188 sanitized_owner);
1189
1190 free(sanitized_name);
1191 free(sanitized_schema);
1192 free(sanitized_owner);
1193 }
1194 if (ropt->verbose && te->nDeps > 0)
1195 {
1196 int i;
1197
1198 ahprintf(AH, ";\tdepends on:");
1199 for (i = 0; i < te->nDeps; i++)
1200 ahprintf(AH, " %d", te->dependencies[i]);
1201 ahprintf(AH, "\n");
1202 }
1203 }
1204
1205 /* Enforce strict names checking */
1206 if (ropt->strict_names)
1207 StrictNamesCheck(ropt);
1208
1209 if (ropt->filename)
1210 RestoreOutput(AH, sav);
1211 }
1212
1213 /***********
1214 * BLOB Archival
1215 ***********/
1216
1217 /* Called by a dumper to signal start of a BLOB */
1218 int
StartBlob(Archive * AHX,Oid oid)1219 StartBlob(Archive *AHX, Oid oid)
1220 {
1221 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1222
1223 if (!AH->StartBlobPtr)
1224 fatal("large-object output not supported in chosen format");
1225
1226 AH->StartBlobPtr(AH, AH->currToc, oid);
1227
1228 return 1;
1229 }
1230
1231 /* Called by a dumper to signal end of a BLOB */
1232 int
EndBlob(Archive * AHX,Oid oid)1233 EndBlob(Archive *AHX, Oid oid)
1234 {
1235 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1236
1237 if (AH->EndBlobPtr)
1238 AH->EndBlobPtr(AH, AH->currToc, oid);
1239
1240 return 1;
1241 }
1242
1243 /**********
1244 * BLOB Restoration
1245 **********/
1246
1247 /*
1248 * Called by a format handler before any blobs are restored
1249 */
1250 void
StartRestoreBlobs(ArchiveHandle * AH)1251 StartRestoreBlobs(ArchiveHandle *AH)
1252 {
1253 RestoreOptions *ropt = AH->public.ropt;
1254
1255 if (!ropt->single_txn)
1256 {
1257 if (AH->connection)
1258 StartTransaction(&AH->public);
1259 else
1260 ahprintf(AH, "BEGIN;\n\n");
1261 }
1262
1263 AH->blobCount = 0;
1264 }
1265
1266 /*
1267 * Called by a format handler after all blobs are restored
1268 */
1269 void
EndRestoreBlobs(ArchiveHandle * AH)1270 EndRestoreBlobs(ArchiveHandle *AH)
1271 {
1272 RestoreOptions *ropt = AH->public.ropt;
1273
1274 if (!ropt->single_txn)
1275 {
1276 if (AH->connection)
1277 CommitTransaction(&AH->public);
1278 else
1279 ahprintf(AH, "COMMIT;\n\n");
1280 }
1281
1282 pg_log_info(ngettext("restored %d large object",
1283 "restored %d large objects",
1284 AH->blobCount),
1285 AH->blobCount);
1286 }
1287
1288
1289 /*
1290 * Called by a format handler to initiate restoration of a blob
1291 */
1292 void
StartRestoreBlob(ArchiveHandle * AH,Oid oid,bool drop)1293 StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
1294 {
1295 bool old_blob_style = (AH->version < K_VERS_1_12);
1296 Oid loOid;
1297
1298 AH->blobCount++;
1299
1300 /* Initialize the LO Buffer */
1301 AH->lo_buf_used = 0;
1302
1303 pg_log_info("restoring large object with OID %u", oid);
1304
1305 /* With an old archive we must do drop and create logic here */
1306 if (old_blob_style && drop)
1307 DropBlobIfExists(AH, oid);
1308
1309 if (AH->connection)
1310 {
1311 if (old_blob_style)
1312 {
1313 loOid = lo_create(AH->connection, oid);
1314 if (loOid == 0 || loOid != oid)
1315 fatal("could not create large object %u: %s",
1316 oid, PQerrorMessage(AH->connection));
1317 }
1318 AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
1319 if (AH->loFd == -1)
1320 fatal("could not open large object %u: %s",
1321 oid, PQerrorMessage(AH->connection));
1322 }
1323 else
1324 {
1325 if (old_blob_style)
1326 ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
1327 oid, INV_WRITE);
1328 else
1329 ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
1330 oid, INV_WRITE);
1331 }
1332
1333 AH->writingBlob = 1;
1334 }
1335
1336 void
EndRestoreBlob(ArchiveHandle * AH,Oid oid)1337 EndRestoreBlob(ArchiveHandle *AH, Oid oid)
1338 {
1339 if (AH->lo_buf_used > 0)
1340 {
1341 /* Write remaining bytes from the LO buffer */
1342 dump_lo_buf(AH);
1343 }
1344
1345 AH->writingBlob = 0;
1346
1347 if (AH->connection)
1348 {
1349 lo_close(AH->connection, AH->loFd);
1350 AH->loFd = -1;
1351 }
1352 else
1353 {
1354 ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1355 }
1356 }
1357
1358 /***********
1359 * Sorting and Reordering
1360 ***********/
1361
1362 void
SortTocFromFile(Archive * AHX)1363 SortTocFromFile(Archive *AHX)
1364 {
1365 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1366 RestoreOptions *ropt = AH->public.ropt;
1367 FILE *fh;
1368 char buf[100];
1369 bool incomplete_line;
1370
1371 /* Allocate space for the 'wanted' array, and init it */
1372 ropt->idWanted = (bool *) pg_malloc0(sizeof(bool) * AH->maxDumpId);
1373
1374 /* Setup the file */
1375 fh = fopen(ropt->tocFile, PG_BINARY_R);
1376 if (!fh)
1377 fatal("could not open TOC file \"%s\": %m", ropt->tocFile);
1378
1379 incomplete_line = false;
1380 while (fgets(buf, sizeof(buf), fh) != NULL)
1381 {
1382 bool prev_incomplete_line = incomplete_line;
1383 int buflen;
1384 char *cmnt;
1385 char *endptr;
1386 DumpId id;
1387 TocEntry *te;
1388
1389 /*
1390 * Some lines in the file might be longer than sizeof(buf). This is
1391 * no problem, since we only care about the leading numeric ID which
1392 * can be at most a few characters; but we have to skip continuation
1393 * bufferloads when processing a long line.
1394 */
1395 buflen = strlen(buf);
1396 if (buflen > 0 && buf[buflen - 1] == '\n')
1397 incomplete_line = false;
1398 else
1399 incomplete_line = true;
1400 if (prev_incomplete_line)
1401 continue;
1402
1403 /* Truncate line at comment, if any */
1404 cmnt = strchr(buf, ';');
1405 if (cmnt != NULL)
1406 cmnt[0] = '\0';
1407
1408 /* Ignore if all blank */
1409 if (strspn(buf, " \t\r\n") == strlen(buf))
1410 continue;
1411
1412 /* Get an ID, check it's valid and not already seen */
1413 id = strtol(buf, &endptr, 10);
1414 if (endptr == buf || id <= 0 || id > AH->maxDumpId ||
1415 ropt->idWanted[id - 1])
1416 {
1417 pg_log_warning("line ignored: %s", buf);
1418 continue;
1419 }
1420
1421 /* Find TOC entry */
1422 te = getTocEntryByDumpId(AH, id);
1423 if (!te)
1424 fatal("could not find entry for ID %d",
1425 id);
1426
1427 /* Mark it wanted */
1428 ropt->idWanted[id - 1] = true;
1429
1430 /*
1431 * Move each item to the end of the list as it is selected, so that
1432 * they are placed in the desired order. Any unwanted items will end
1433 * up at the front of the list, which may seem unintuitive but it's
1434 * what we need. In an ordinary serial restore that makes no
1435 * difference, but in a parallel restore we need to mark unrestored
1436 * items' dependencies as satisfied before we start examining
1437 * restorable items. Otherwise they could have surprising
1438 * side-effects on the order in which restorable items actually get
1439 * restored.
1440 */
1441 _moveBefore(AH, AH->toc, te);
1442 }
1443
1444 if (fclose(fh) != 0)
1445 fatal("could not close TOC file: %m");
1446 }
1447
/**********************
 * Convenience functions that look like standard IO functions
 * for writing data when in dump mode.
 **********************/
1452
1453 /* Public */
1454 void
archputs(const char * s,Archive * AH)1455 archputs(const char *s, Archive *AH)
1456 {
1457 WriteData(AH, s, strlen(s));
1458 }
1459
1460 /* Public */
1461 int
archprintf(Archive * AH,const char * fmt,...)1462 archprintf(Archive *AH, const char *fmt,...)
1463 {
1464 int save_errno = errno;
1465 char *p;
1466 size_t len = 128; /* initial assumption about buffer size */
1467 size_t cnt;
1468
1469 for (;;)
1470 {
1471 va_list args;
1472
1473 /* Allocate work buffer. */
1474 p = (char *) pg_malloc(len);
1475
1476 /* Try to format the data. */
1477 errno = save_errno;
1478 va_start(args, fmt);
1479 cnt = pvsnprintf(p, len, fmt, args);
1480 va_end(args);
1481
1482 if (cnt < len)
1483 break; /* success */
1484
1485 /* Release buffer and loop around to try again with larger len. */
1486 free(p);
1487 len = cnt;
1488 }
1489
1490 WriteData(AH, p, cnt);
1491 free(p);
1492 return (int) cnt;
1493 }
1494
1495
1496 /*******************************
1497 * Stuff below here should be 'private' to the archiver routines
1498 *******************************/
1499
1500 static void
SetOutput(ArchiveHandle * AH,const char * filename,int compression)1501 SetOutput(ArchiveHandle *AH, const char *filename, int compression)
1502 {
1503 int fn;
1504
1505 if (filename)
1506 {
1507 if (strcmp(filename, "-") == 0)
1508 fn = fileno(stdout);
1509 else
1510 fn = -1;
1511 }
1512 else if (AH->FH)
1513 fn = fileno(AH->FH);
1514 else if (AH->fSpec)
1515 {
1516 fn = -1;
1517 filename = AH->fSpec;
1518 }
1519 else
1520 fn = fileno(stdout);
1521
1522 /* If compression explicitly requested, use gzopen */
1523 #ifdef HAVE_LIBZ
1524 if (compression != 0)
1525 {
1526 char fmode[14];
1527
1528 /* Don't use PG_BINARY_x since this is zlib */
1529 sprintf(fmode, "wb%d", compression);
1530 if (fn >= 0)
1531 AH->OF = gzdopen(dup(fn), fmode);
1532 else
1533 AH->OF = gzopen(filename, fmode);
1534 AH->gzOut = 1;
1535 }
1536 else
1537 #endif
1538 { /* Use fopen */
1539 if (AH->mode == archModeAppend)
1540 {
1541 if (fn >= 0)
1542 AH->OF = fdopen(dup(fn), PG_BINARY_A);
1543 else
1544 AH->OF = fopen(filename, PG_BINARY_A);
1545 }
1546 else
1547 {
1548 if (fn >= 0)
1549 AH->OF = fdopen(dup(fn), PG_BINARY_W);
1550 else
1551 AH->OF = fopen(filename, PG_BINARY_W);
1552 }
1553 AH->gzOut = 0;
1554 }
1555
1556 if (!AH->OF)
1557 {
1558 if (filename)
1559 fatal("could not open output file \"%s\": %m", filename);
1560 else
1561 fatal("could not open output file: %m");
1562 }
1563 }
1564
1565 static OutputContext
SaveOutput(ArchiveHandle * AH)1566 SaveOutput(ArchiveHandle *AH)
1567 {
1568 OutputContext sav;
1569
1570 sav.OF = AH->OF;
1571 sav.gzOut = AH->gzOut;
1572
1573 return sav;
1574 }
1575
1576 static void
RestoreOutput(ArchiveHandle * AH,OutputContext savedContext)1577 RestoreOutput(ArchiveHandle *AH, OutputContext savedContext)
1578 {
1579 int res;
1580
1581 if (AH->gzOut)
1582 res = GZCLOSE(AH->OF);
1583 else
1584 res = fclose(AH->OF);
1585
1586 if (res != 0)
1587 fatal("could not close output file: %m");
1588
1589 AH->gzOut = savedContext.gzOut;
1590 AH->OF = savedContext.OF;
1591 }
1592
1593
1594
1595 /*
1596 * Print formatted text to the output file (usually stdout).
1597 */
1598 int
ahprintf(ArchiveHandle * AH,const char * fmt,...)1599 ahprintf(ArchiveHandle *AH, const char *fmt,...)
1600 {
1601 int save_errno = errno;
1602 char *p;
1603 size_t len = 128; /* initial assumption about buffer size */
1604 size_t cnt;
1605
1606 for (;;)
1607 {
1608 va_list args;
1609
1610 /* Allocate work buffer. */
1611 p = (char *) pg_malloc(len);
1612
1613 /* Try to format the data. */
1614 errno = save_errno;
1615 va_start(args, fmt);
1616 cnt = pvsnprintf(p, len, fmt, args);
1617 va_end(args);
1618
1619 if (cnt < len)
1620 break; /* success */
1621
1622 /* Release buffer and loop around to try again with larger len. */
1623 free(p);
1624 len = cnt;
1625 }
1626
1627 ahwrite(p, 1, cnt, AH);
1628 free(p);
1629 return (int) cnt;
1630 }
1631
1632 /*
1633 * Single place for logic which says 'We are restoring to a direct DB connection'.
1634 */
1635 static int
RestoringToDB(ArchiveHandle * AH)1636 RestoringToDB(ArchiveHandle *AH)
1637 {
1638 RestoreOptions *ropt = AH->public.ropt;
1639
1640 return (ropt && ropt->useDB && AH->connection);
1641 }
1642
1643 /*
1644 * Dump the current contents of the LO data buffer while writing a BLOB
1645 */
1646 static void
dump_lo_buf(ArchiveHandle * AH)1647 dump_lo_buf(ArchiveHandle *AH)
1648 {
1649 if (AH->connection)
1650 {
1651 size_t res;
1652
1653 res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1654 pg_log_debug(ngettext("wrote %lu byte of large object data (result = %lu)",
1655 "wrote %lu bytes of large object data (result = %lu)",
1656 AH->lo_buf_used),
1657 (unsigned long) AH->lo_buf_used, (unsigned long) res);
1658 if (res != AH->lo_buf_used)
1659 fatal("could not write to large object (result: %lu, expected: %lu)",
1660 (unsigned long) res, (unsigned long) AH->lo_buf_used);
1661 }
1662 else
1663 {
1664 PQExpBuffer buf = createPQExpBuffer();
1665
1666 appendByteaLiteralAHX(buf,
1667 (const unsigned char *) AH->lo_buf,
1668 AH->lo_buf_used,
1669 AH);
1670
1671 /* Hack: turn off writingBlob so ahwrite doesn't recurse to here */
1672 AH->writingBlob = 0;
1673 ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1674 AH->writingBlob = 1;
1675
1676 destroyPQExpBuffer(buf);
1677 }
1678 AH->lo_buf_used = 0;
1679 }
1680
1681
1682 /*
1683 * Write buffer to the output file (usually stdout). This is used for
1684 * outputting 'restore' scripts etc. It is even possible for an archive
1685 * format to create a custom output routine to 'fake' a restore if it
1686 * wants to generate a script (see TAR output).
1687 */
1688 void
ahwrite(const void * ptr,size_t size,size_t nmemb,ArchiveHandle * AH)1689 ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1690 {
1691 int bytes_written = 0;
1692
1693 if (AH->writingBlob)
1694 {
1695 size_t remaining = size * nmemb;
1696
1697 while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1698 {
1699 size_t avail = AH->lo_buf_size - AH->lo_buf_used;
1700
1701 memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1702 ptr = (const void *) ((const char *) ptr + avail);
1703 remaining -= avail;
1704 AH->lo_buf_used += avail;
1705 dump_lo_buf(AH);
1706 }
1707
1708 memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1709 AH->lo_buf_used += remaining;
1710
1711 bytes_written = size * nmemb;
1712 }
1713 else if (AH->gzOut)
1714 bytes_written = GZWRITE(ptr, size, nmemb, AH->OF);
1715 else if (AH->CustomOutPtr)
1716 bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
1717
1718 else
1719 {
1720 /*
1721 * If we're doing a restore, and it's direct to DB, and we're
1722 * connected then send it to the DB.
1723 */
1724 if (RestoringToDB(AH))
1725 bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
1726 else
1727 bytes_written = fwrite(ptr, size, nmemb, AH->OF) * size;
1728 }
1729
1730 if (bytes_written != size * nmemb)
1731 WRITE_ERROR_EXIT;
1732 }
1733
1734 /* on some error, we may decide to go on... */
1735 void
warn_or_exit_horribly(ArchiveHandle * AH,const char * fmt,...)1736 warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...)
1737 {
1738 va_list ap;
1739
1740 switch (AH->stage)
1741 {
1742
1743 case STAGE_NONE:
1744 /* Do nothing special */
1745 break;
1746
1747 case STAGE_INITIALIZING:
1748 if (AH->stage != AH->lastErrorStage)
1749 pg_log_generic(PG_LOG_INFO, "while INITIALIZING:");
1750 break;
1751
1752 case STAGE_PROCESSING:
1753 if (AH->stage != AH->lastErrorStage)
1754 pg_log_generic(PG_LOG_INFO, "while PROCESSING TOC:");
1755 break;
1756
1757 case STAGE_FINALIZING:
1758 if (AH->stage != AH->lastErrorStage)
1759 pg_log_generic(PG_LOG_INFO, "while FINALIZING:");
1760 break;
1761 }
1762 if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1763 {
1764 pg_log_generic(PG_LOG_INFO, "from TOC entry %d; %u %u %s %s %s",
1765 AH->currentTE->dumpId,
1766 AH->currentTE->catalogId.tableoid,
1767 AH->currentTE->catalogId.oid,
1768 AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
1769 AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
1770 AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
1771 }
1772 AH->lastErrorStage = AH->stage;
1773 AH->lastErrorTE = AH->currentTE;
1774
1775 va_start(ap, fmt);
1776 pg_log_generic_v(PG_LOG_ERROR, fmt, ap);
1777 va_end(ap);
1778
1779 if (AH->public.exit_on_error)
1780 exit_nicely(1);
1781 else
1782 AH->public.n_errors++;
1783 }
1784
1785 #ifdef NOT_USED
1786
1787 static void
_moveAfter(ArchiveHandle * AH,TocEntry * pos,TocEntry * te)1788 _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1789 {
1790 /* Unlink te from list */
1791 te->prev->next = te->next;
1792 te->next->prev = te->prev;
1793
1794 /* and insert it after "pos" */
1795 te->prev = pos;
1796 te->next = pos->next;
1797 pos->next->prev = te;
1798 pos->next = te;
1799 }
1800 #endif
1801
1802 static void
_moveBefore(ArchiveHandle * AH,TocEntry * pos,TocEntry * te)1803 _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1804 {
1805 /* Unlink te from list */
1806 te->prev->next = te->next;
1807 te->next->prev = te->prev;
1808
1809 /* and insert it before "pos" */
1810 te->prev = pos->prev;
1811 te->next = pos;
1812 pos->prev->next = te;
1813 pos->prev = te;
1814 }
1815
1816 /*
1817 * Build index arrays for the TOC list
1818 *
1819 * This should be invoked only after we have created or read in all the TOC
1820 * items.
1821 *
1822 * The arrays are indexed by dump ID (so entry zero is unused). Note that the
1823 * array entries run only up to maxDumpId. We might see dependency dump IDs
1824 * beyond that (if the dump was partial); so always check the array bound
1825 * before trying to touch an array entry.
1826 */
1827 static void
buildTocEntryArrays(ArchiveHandle * AH)1828 buildTocEntryArrays(ArchiveHandle *AH)
1829 {
1830 DumpId maxDumpId = AH->maxDumpId;
1831 TocEntry *te;
1832
1833 AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
1834 AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));
1835
1836 for (te = AH->toc->next; te != AH->toc; te = te->next)
1837 {
1838 /* this check is purely paranoia, maxDumpId should be correct */
1839 if (te->dumpId <= 0 || te->dumpId > maxDumpId)
1840 fatal("bad dumpId");
1841
1842 /* tocsByDumpId indexes all TOCs by their dump ID */
1843 AH->tocsByDumpId[te->dumpId] = te;
1844
1845 /*
1846 * tableDataId provides the TABLE DATA item's dump ID for each TABLE
1847 * TOC entry that has a DATA item. We compute this by reversing the
1848 * TABLE DATA item's dependency, knowing that a TABLE DATA item has
1849 * just one dependency and it is the TABLE item.
1850 */
1851 if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
1852 {
1853 DumpId tableId = te->dependencies[0];
1854
1855 /*
1856 * The TABLE item might not have been in the archive, if this was
1857 * a data-only dump; but its dump ID should be less than its data
1858 * item's dump ID, so there should be a place for it in the array.
1859 */
1860 if (tableId <= 0 || tableId > maxDumpId)
1861 fatal("bad table dumpId for TABLE DATA item");
1862
1863 AH->tableDataId[tableId] = te->dumpId;
1864 }
1865 }
1866 }
1867
1868 TocEntry *
getTocEntryByDumpId(ArchiveHandle * AH,DumpId id)1869 getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
1870 {
1871 /* build index arrays if we didn't already */
1872 if (AH->tocsByDumpId == NULL)
1873 buildTocEntryArrays(AH);
1874
1875 if (id > 0 && id <= AH->maxDumpId)
1876 return AH->tocsByDumpId[id];
1877
1878 return NULL;
1879 }
1880
1881 teReqs
TocIDRequired(ArchiveHandle * AH,DumpId id)1882 TocIDRequired(ArchiveHandle *AH, DumpId id)
1883 {
1884 TocEntry *te = getTocEntryByDumpId(AH, id);
1885
1886 if (!te)
1887 return 0;
1888
1889 return te->reqs;
1890 }
1891
1892 size_t
WriteOffset(ArchiveHandle * AH,pgoff_t o,int wasSet)1893 WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
1894 {
1895 int off;
1896
1897 /* Save the flag */
1898 AH->WriteBytePtr(AH, wasSet);
1899
1900 /* Write out pgoff_t smallest byte first, prevents endian mismatch */
1901 for (off = 0; off < sizeof(pgoff_t); off++)
1902 {
1903 AH->WriteBytePtr(AH, o & 0xFF);
1904 o >>= 8;
1905 }
1906 return sizeof(pgoff_t) + 1;
1907 }
1908
1909 int
ReadOffset(ArchiveHandle * AH,pgoff_t * o)1910 ReadOffset(ArchiveHandle *AH, pgoff_t * o)
1911 {
1912 int i;
1913 int off;
1914 int offsetFlg;
1915
1916 /* Initialize to zero */
1917 *o = 0;
1918
1919 /* Check for old version */
1920 if (AH->version < K_VERS_1_7)
1921 {
1922 /* Prior versions wrote offsets using WriteInt */
1923 i = ReadInt(AH);
1924 /* -1 means not set */
1925 if (i < 0)
1926 return K_OFFSET_POS_NOT_SET;
1927 else if (i == 0)
1928 return K_OFFSET_NO_DATA;
1929
1930 /* Cast to pgoff_t because it was written as an int. */
1931 *o = (pgoff_t) i;
1932 return K_OFFSET_POS_SET;
1933 }
1934
1935 /*
1936 * Read the flag indicating the state of the data pointer. Check if valid
1937 * and die if not.
1938 *
1939 * This used to be handled by a negative or zero pointer, now we use an
1940 * extra byte specifically for the state.
1941 */
1942 offsetFlg = AH->ReadBytePtr(AH) & 0xFF;
1943
1944 switch (offsetFlg)
1945 {
1946 case K_OFFSET_POS_NOT_SET:
1947 case K_OFFSET_NO_DATA:
1948 case K_OFFSET_POS_SET:
1949
1950 break;
1951
1952 default:
1953 fatal("unexpected data offset flag %d", offsetFlg);
1954 }
1955
1956 /*
1957 * Read the bytes
1958 */
1959 for (off = 0; off < AH->offSize; off++)
1960 {
1961 if (off < sizeof(pgoff_t))
1962 *o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
1963 else
1964 {
1965 if (AH->ReadBytePtr(AH) != 0)
1966 fatal("file offset in dump file is too large");
1967 }
1968 }
1969
1970 return offsetFlg;
1971 }
1972
1973 size_t
WriteInt(ArchiveHandle * AH,int i)1974 WriteInt(ArchiveHandle *AH, int i)
1975 {
1976 int b;
1977
1978 /*
1979 * This is a bit yucky, but I don't want to make the binary format very
1980 * dependent on representation, and not knowing much about it, I write out
1981 * a sign byte. If you change this, don't forget to change the file
1982 * version #, and modify ReadInt to read the new format AS WELL AS the old
1983 * formats.
1984 */
1985
1986 /* SIGN byte */
1987 if (i < 0)
1988 {
1989 AH->WriteBytePtr(AH, 1);
1990 i = -i;
1991 }
1992 else
1993 AH->WriteBytePtr(AH, 0);
1994
1995 for (b = 0; b < AH->intSize; b++)
1996 {
1997 AH->WriteBytePtr(AH, i & 0xFF);
1998 i >>= 8;
1999 }
2000
2001 return AH->intSize + 1;
2002 }
2003
2004 int
ReadInt(ArchiveHandle * AH)2005 ReadInt(ArchiveHandle *AH)
2006 {
2007 int res = 0;
2008 int bv,
2009 b;
2010 int sign = 0; /* Default positive */
2011 int bitShift = 0;
2012
2013 if (AH->version > K_VERS_1_0)
2014 /* Read a sign byte */
2015 sign = AH->ReadBytePtr(AH);
2016
2017 for (b = 0; b < AH->intSize; b++)
2018 {
2019 bv = AH->ReadBytePtr(AH) & 0xFF;
2020 if (bv != 0)
2021 res = res + (bv << bitShift);
2022 bitShift += 8;
2023 }
2024
2025 if (sign)
2026 res = -res;
2027
2028 return res;
2029 }
2030
2031 size_t
WriteStr(ArchiveHandle * AH,const char * c)2032 WriteStr(ArchiveHandle *AH, const char *c)
2033 {
2034 size_t res;
2035
2036 if (c)
2037 {
2038 int len = strlen(c);
2039
2040 res = WriteInt(AH, len);
2041 AH->WriteBufPtr(AH, c, len);
2042 res += len;
2043 }
2044 else
2045 res = WriteInt(AH, -1);
2046
2047 return res;
2048 }
2049
2050 char *
ReadStr(ArchiveHandle * AH)2051 ReadStr(ArchiveHandle *AH)
2052 {
2053 char *buf;
2054 int l;
2055
2056 l = ReadInt(AH);
2057 if (l < 0)
2058 buf = NULL;
2059 else
2060 {
2061 buf = (char *) pg_malloc(l + 1);
2062 AH->ReadBufPtr(AH, (void *) buf, l);
2063
2064 buf[l] = '\0';
2065 }
2066
2067 return buf;
2068 }
2069
2070 static int
_discoverArchiveFormat(ArchiveHandle * AH)2071 _discoverArchiveFormat(ArchiveHandle *AH)
2072 {
2073 FILE *fh;
2074 char sig[6]; /* More than enough */
2075 size_t cnt;
2076 int wantClose = 0;
2077
2078 pg_log_debug("attempting to ascertain archive format");
2079
2080 if (AH->lookahead)
2081 free(AH->lookahead);
2082
2083 AH->readHeader = 0;
2084 AH->lookaheadSize = 512;
2085 AH->lookahead = pg_malloc0(512);
2086 AH->lookaheadLen = 0;
2087 AH->lookaheadPos = 0;
2088
2089 if (AH->fSpec)
2090 {
2091 struct stat st;
2092
2093 wantClose = 1;
2094
2095 /*
2096 * Check if the specified archive is a directory. If so, check if
2097 * there's a "toc.dat" (or "toc.dat.gz") file in it.
2098 */
2099 if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
2100 {
2101 char buf[MAXPGPATH];
2102
2103 if (snprintf(buf, MAXPGPATH, "%s/toc.dat", AH->fSpec) >= MAXPGPATH)
2104 fatal("directory name too long: \"%s\"",
2105 AH->fSpec);
2106 if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
2107 {
2108 AH->format = archDirectory;
2109 return AH->format;
2110 }
2111
2112 #ifdef HAVE_LIBZ
2113 if (snprintf(buf, MAXPGPATH, "%s/toc.dat.gz", AH->fSpec) >= MAXPGPATH)
2114 fatal("directory name too long: \"%s\"",
2115 AH->fSpec);
2116 if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
2117 {
2118 AH->format = archDirectory;
2119 return AH->format;
2120 }
2121 #endif
2122 fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
2123 AH->fSpec);
2124 fh = NULL; /* keep compiler quiet */
2125 }
2126 else
2127 {
2128 fh = fopen(AH->fSpec, PG_BINARY_R);
2129 if (!fh)
2130 fatal("could not open input file \"%s\": %m", AH->fSpec);
2131 }
2132 }
2133 else
2134 {
2135 fh = stdin;
2136 if (!fh)
2137 fatal("could not open input file: %m");
2138 }
2139
2140 if ((cnt = fread(sig, 1, 5, fh)) != 5)
2141 {
2142 if (ferror(fh))
2143 fatal("could not read input file: %m");
2144 else
2145 fatal("input file is too short (read %lu, expected 5)",
2146 (unsigned long) cnt);
2147 }
2148
2149 /* Save it, just in case we need it later */
2150 memcpy(&AH->lookahead[0], sig, 5);
2151 AH->lookaheadLen = 5;
2152
2153 if (strncmp(sig, "PGDMP", 5) == 0)
2154 {
2155 /* It's custom format, stop here */
2156 AH->format = archCustom;
2157 AH->readHeader = 1;
2158 }
2159 else
2160 {
2161 /*
2162 * *Maybe* we have a tar archive format file or a text dump ... So,
2163 * read first 512 byte header...
2164 */
2165 cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
2166 /* read failure is checked below */
2167 AH->lookaheadLen += cnt;
2168
2169 if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
2170 (strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
2171 strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
2172 {
2173 /*
2174 * looks like it's probably a text format dump. so suggest they
2175 * try psql
2176 */
2177 fatal("input file appears to be a text format dump. Please use psql.");
2178 }
2179
2180 if (AH->lookaheadLen != 512)
2181 {
2182 if (feof(fh))
2183 fatal("input file does not appear to be a valid archive (too short?)");
2184 else
2185 READ_ERROR_EXIT(fh);
2186 }
2187
2188 if (!isValidTarHeader(AH->lookahead))
2189 fatal("input file does not appear to be a valid archive");
2190
2191 AH->format = archTar;
2192 }
2193
2194 /* Close the file if we opened it */
2195 if (wantClose)
2196 {
2197 if (fclose(fh) != 0)
2198 fatal("could not close input file: %m");
2199 /* Forget lookahead, since we'll re-read header after re-opening */
2200 AH->readHeader = 0;
2201 AH->lookaheadLen = 0;
2202 }
2203
2204 return AH->format;
2205 }
2206
2207
2208 /*
2209 * Allocate an archive handle
2210 */
2211 static ArchiveHandle *
_allocAH(const char * FileSpec,const ArchiveFormat fmt,const int compression,bool dosync,ArchiveMode mode,SetupWorkerPtrType setupWorkerPtr)2212 _allocAH(const char *FileSpec, const ArchiveFormat fmt,
2213 const int compression, bool dosync, ArchiveMode mode,
2214 SetupWorkerPtrType setupWorkerPtr)
2215 {
2216 ArchiveHandle *AH;
2217
2218 pg_log_debug("allocating AH for %s, format %d", FileSpec, fmt);
2219
2220 AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
2221
2222 AH->version = K_VERS_SELF;
2223
2224 /* initialize for backwards compatible string processing */
2225 AH->public.encoding = 0; /* PG_SQL_ASCII */
2226 AH->public.std_strings = false;
2227
2228 /* sql error handling */
2229 AH->public.exit_on_error = true;
2230 AH->public.n_errors = 0;
2231
2232 AH->archiveDumpVersion = PG_VERSION;
2233
2234 AH->createDate = time(NULL);
2235
2236 AH->intSize = sizeof(int);
2237 AH->offSize = sizeof(pgoff_t);
2238 if (FileSpec)
2239 {
2240 AH->fSpec = pg_strdup(FileSpec);
2241
2242 /*
2243 * Not used; maybe later....
2244 *
2245 * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2246 * i--) if (AH->workDir[i-1] == '/')
2247 */
2248 }
2249 else
2250 AH->fSpec = NULL;
2251
2252 AH->currUser = NULL; /* unknown */
2253 AH->currSchema = NULL; /* ditto */
2254 AH->currTablespace = NULL; /* ditto */
2255 AH->currTableAm = NULL; /* ditto */
2256
2257 AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2258
2259 AH->toc->next = AH->toc;
2260 AH->toc->prev = AH->toc;
2261
2262 AH->mode = mode;
2263 AH->compression = compression;
2264 AH->dosync = dosync;
2265
2266 memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
2267
2268 /* Open stdout with no compression for AH output handle */
2269 AH->gzOut = 0;
2270 AH->OF = stdout;
2271
2272 /*
2273 * On Windows, we need to use binary mode to read/write non-text files,
2274 * which include all archive formats as well as compressed plain text.
2275 * Force stdin/stdout into binary mode if that is what we are using.
2276 */
2277 #ifdef WIN32
2278 if ((fmt != archNull || compression != 0) &&
2279 (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2280 {
2281 if (mode == archModeWrite)
2282 _setmode(fileno(stdout), O_BINARY);
2283 else
2284 _setmode(fileno(stdin), O_BINARY);
2285 }
2286 #endif
2287
2288 AH->SetupWorkerPtr = setupWorkerPtr;
2289
2290 if (fmt == archUnknown)
2291 AH->format = _discoverArchiveFormat(AH);
2292 else
2293 AH->format = fmt;
2294
2295 switch (AH->format)
2296 {
2297 case archCustom:
2298 InitArchiveFmt_Custom(AH);
2299 break;
2300
2301 case archNull:
2302 InitArchiveFmt_Null(AH);
2303 break;
2304
2305 case archDirectory:
2306 InitArchiveFmt_Directory(AH);
2307 break;
2308
2309 case archTar:
2310 InitArchiveFmt_Tar(AH);
2311 break;
2312
2313 default:
2314 fatal("unrecognized file format \"%d\"", fmt);
2315 }
2316
2317 return AH;
2318 }
2319
2320 /*
2321 * Write out all data (tables & blobs)
2322 */
2323 void
WriteDataChunks(ArchiveHandle * AH,ParallelState * pstate)2324 WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
2325 {
2326 TocEntry *te;
2327
2328 if (pstate && pstate->numWorkers > 1)
2329 {
2330 /*
2331 * In parallel mode, this code runs in the master process. We
2332 * construct an array of candidate TEs, then sort it into decreasing
2333 * size order, then dispatch each TE to a data-transfer worker. By
2334 * dumping larger tables first, we avoid getting into a situation
2335 * where we're down to one job and it's big, losing parallelism.
2336 */
2337 TocEntry **tes;
2338 int ntes;
2339
2340 tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
2341 ntes = 0;
2342 for (te = AH->toc->next; te != AH->toc; te = te->next)
2343 {
2344 /* Consider only TEs with dataDumper functions ... */
2345 if (!te->dataDumper)
2346 continue;
2347 /* ... and ignore ones not enabled for dump */
2348 if ((te->reqs & REQ_DATA) == 0)
2349 continue;
2350
2351 tes[ntes++] = te;
2352 }
2353
2354 if (ntes > 1)
2355 qsort((void *) tes, ntes, sizeof(TocEntry *),
2356 TocEntrySizeCompare);
2357
2358 for (int i = 0; i < ntes; i++)
2359 DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
2360 mark_dump_job_done, NULL);
2361
2362 pg_free(tes);
2363
2364 /* Now wait for workers to finish. */
2365 WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
2366 }
2367 else
2368 {
2369 /* Non-parallel mode: just dump all candidate TEs sequentially. */
2370 for (te = AH->toc->next; te != AH->toc; te = te->next)
2371 {
2372 /* Must have same filter conditions as above */
2373 if (!te->dataDumper)
2374 continue;
2375 if ((te->reqs & REQ_DATA) == 0)
2376 continue;
2377
2378 WriteDataChunksForTocEntry(AH, te);
2379 }
2380 }
2381 }
2382
2383
2384 /*
2385 * Callback function that's invoked in the master process after a step has
2386 * been parallel dumped.
2387 *
2388 * We don't need to do anything except check for worker failure.
2389 */
2390 static void
mark_dump_job_done(ArchiveHandle * AH,TocEntry * te,int status,void * callback_data)2391 mark_dump_job_done(ArchiveHandle *AH,
2392 TocEntry *te,
2393 int status,
2394 void *callback_data)
2395 {
2396 pg_log_info("finished item %d %s %s",
2397 te->dumpId, te->desc, te->tag);
2398
2399 if (status != 0)
2400 fatal("worker process failed: exit code %d",
2401 status);
2402 }
2403
2404
2405 void
WriteDataChunksForTocEntry(ArchiveHandle * AH,TocEntry * te)2406 WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
2407 {
2408 StartDataPtrType startPtr;
2409 EndDataPtrType endPtr;
2410
2411 AH->currToc = te;
2412
2413 if (strcmp(te->desc, "BLOBS") == 0)
2414 {
2415 startPtr = AH->StartBlobsPtr;
2416 endPtr = AH->EndBlobsPtr;
2417 }
2418 else
2419 {
2420 startPtr = AH->StartDataPtr;
2421 endPtr = AH->EndDataPtr;
2422 }
2423
2424 if (startPtr != NULL)
2425 (*startPtr) (AH, te);
2426
2427 /*
2428 * The user-provided DataDumper routine needs to call AH->WriteData
2429 */
2430 te->dataDumper((Archive *) AH, te->dataDumperArg);
2431
2432 if (endPtr != NULL)
2433 (*endPtr) (AH, te);
2434
2435 AH->currToc = NULL;
2436 }
2437
2438 void
WriteToc(ArchiveHandle * AH)2439 WriteToc(ArchiveHandle *AH)
2440 {
2441 TocEntry *te;
2442 char workbuf[32];
2443 int tocCount;
2444 int i;
2445
2446 /* count entries that will actually be dumped */
2447 tocCount = 0;
2448 for (te = AH->toc->next; te != AH->toc; te = te->next)
2449 {
2450 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) != 0)
2451 tocCount++;
2452 }
2453
2454 /* printf("%d TOC Entries to save\n", tocCount); */
2455
2456 WriteInt(AH, tocCount);
2457
2458 for (te = AH->toc->next; te != AH->toc; te = te->next)
2459 {
2460 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) == 0)
2461 continue;
2462
2463 WriteInt(AH, te->dumpId);
2464 WriteInt(AH, te->dataDumper ? 1 : 0);
2465
2466 /* OID is recorded as a string for historical reasons */
2467 sprintf(workbuf, "%u", te->catalogId.tableoid);
2468 WriteStr(AH, workbuf);
2469 sprintf(workbuf, "%u", te->catalogId.oid);
2470 WriteStr(AH, workbuf);
2471
2472 WriteStr(AH, te->tag);
2473 WriteStr(AH, te->desc);
2474 WriteInt(AH, te->section);
2475 WriteStr(AH, te->defn);
2476 WriteStr(AH, te->dropStmt);
2477 WriteStr(AH, te->copyStmt);
2478 WriteStr(AH, te->namespace);
2479 WriteStr(AH, te->tablespace);
2480 WriteStr(AH, te->tableam);
2481 WriteStr(AH, te->owner);
2482 WriteStr(AH, "false");
2483
2484 /* Dump list of dependencies */
2485 for (i = 0; i < te->nDeps; i++)
2486 {
2487 sprintf(workbuf, "%d", te->dependencies[i]);
2488 WriteStr(AH, workbuf);
2489 }
2490 WriteStr(AH, NULL); /* Terminate List */
2491
2492 if (AH->WriteExtraTocPtr)
2493 AH->WriteExtraTocPtr(AH, te);
2494 }
2495 }
2496
2497 void
ReadToc(ArchiveHandle * AH)2498 ReadToc(ArchiveHandle *AH)
2499 {
2500 int i;
2501 char *tmp;
2502 DumpId *deps;
2503 int depIdx;
2504 int depSize;
2505 TocEntry *te;
2506
2507 AH->tocCount = ReadInt(AH);
2508 AH->maxDumpId = 0;
2509
2510 for (i = 0; i < AH->tocCount; i++)
2511 {
2512 te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2513 te->dumpId = ReadInt(AH);
2514
2515 if (te->dumpId > AH->maxDumpId)
2516 AH->maxDumpId = te->dumpId;
2517
2518 /* Sanity check */
2519 if (te->dumpId <= 0)
2520 fatal("entry ID %d out of range -- perhaps a corrupt TOC",
2521 te->dumpId);
2522
2523 te->hadDumper = ReadInt(AH);
2524
2525 if (AH->version >= K_VERS_1_8)
2526 {
2527 tmp = ReadStr(AH);
2528 sscanf(tmp, "%u", &te->catalogId.tableoid);
2529 free(tmp);
2530 }
2531 else
2532 te->catalogId.tableoid = InvalidOid;
2533 tmp = ReadStr(AH);
2534 sscanf(tmp, "%u", &te->catalogId.oid);
2535 free(tmp);
2536
2537 te->tag = ReadStr(AH);
2538 te->desc = ReadStr(AH);
2539
2540 if (AH->version >= K_VERS_1_11)
2541 {
2542 te->section = ReadInt(AH);
2543 }
2544 else
2545 {
2546 /*
2547 * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2548 * the entries into sections. This list need not cover entry
2549 * types added later than 8.4.
2550 */
2551 if (strcmp(te->desc, "COMMENT") == 0 ||
2552 strcmp(te->desc, "ACL") == 0 ||
2553 strcmp(te->desc, "ACL LANGUAGE") == 0)
2554 te->section = SECTION_NONE;
2555 else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2556 strcmp(te->desc, "BLOBS") == 0 ||
2557 strcmp(te->desc, "BLOB COMMENTS") == 0)
2558 te->section = SECTION_DATA;
2559 else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2560 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2561 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2562 strcmp(te->desc, "INDEX") == 0 ||
2563 strcmp(te->desc, "RULE") == 0 ||
2564 strcmp(te->desc, "TRIGGER") == 0)
2565 te->section = SECTION_POST_DATA;
2566 else
2567 te->section = SECTION_PRE_DATA;
2568 }
2569
2570 te->defn = ReadStr(AH);
2571 te->dropStmt = ReadStr(AH);
2572
2573 if (AH->version >= K_VERS_1_3)
2574 te->copyStmt = ReadStr(AH);
2575
2576 if (AH->version >= K_VERS_1_6)
2577 te->namespace = ReadStr(AH);
2578
2579 if (AH->version >= K_VERS_1_10)
2580 te->tablespace = ReadStr(AH);
2581
2582 if (AH->version >= K_VERS_1_14)
2583 te->tableam = ReadStr(AH);
2584
2585 te->owner = ReadStr(AH);
2586 if (AH->version < K_VERS_1_9 || strcmp(ReadStr(AH), "true") == 0)
2587 pg_log_warning("restoring tables WITH OIDS is not supported anymore");
2588
2589 /* Read TOC entry dependencies */
2590 if (AH->version >= K_VERS_1_5)
2591 {
2592 depSize = 100;
2593 deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
2594 depIdx = 0;
2595 for (;;)
2596 {
2597 tmp = ReadStr(AH);
2598 if (!tmp)
2599 break; /* end of list */
2600 if (depIdx >= depSize)
2601 {
2602 depSize *= 2;
2603 deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
2604 }
2605 sscanf(tmp, "%d", &deps[depIdx]);
2606 free(tmp);
2607 depIdx++;
2608 }
2609
2610 if (depIdx > 0) /* We have a non-null entry */
2611 {
2612 deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
2613 te->dependencies = deps;
2614 te->nDeps = depIdx;
2615 }
2616 else
2617 {
2618 free(deps);
2619 te->dependencies = NULL;
2620 te->nDeps = 0;
2621 }
2622 }
2623 else
2624 {
2625 te->dependencies = NULL;
2626 te->nDeps = 0;
2627 }
2628 te->dataLength = 0;
2629
2630 if (AH->ReadExtraTocPtr)
2631 AH->ReadExtraTocPtr(AH, te);
2632
2633 pg_log_debug("read TOC entry %d (ID %d) for %s %s",
2634 i, te->dumpId, te->desc, te->tag);
2635
2636 /* link completed entry into TOC circular list */
2637 te->prev = AH->toc->prev;
2638 AH->toc->prev->next = te;
2639 AH->toc->prev = te;
2640 te->next = AH->toc;
2641
2642 /* special processing immediately upon read for some items */
2643 if (strcmp(te->desc, "ENCODING") == 0)
2644 processEncodingEntry(AH, te);
2645 else if (strcmp(te->desc, "STDSTRINGS") == 0)
2646 processStdStringsEntry(AH, te);
2647 else if (strcmp(te->desc, "SEARCHPATH") == 0)
2648 processSearchPathEntry(AH, te);
2649 }
2650 }
2651
2652 static void
processEncodingEntry(ArchiveHandle * AH,TocEntry * te)2653 processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2654 {
2655 /* te->defn should have the form SET client_encoding = 'foo'; */
2656 char *defn = pg_strdup(te->defn);
2657 char *ptr1;
2658 char *ptr2 = NULL;
2659 int encoding;
2660
2661 ptr1 = strchr(defn, '\'');
2662 if (ptr1)
2663 ptr2 = strchr(++ptr1, '\'');
2664 if (ptr2)
2665 {
2666 *ptr2 = '\0';
2667 encoding = pg_char_to_encoding(ptr1);
2668 if (encoding < 0)
2669 fatal("unrecognized encoding \"%s\"",
2670 ptr1);
2671 AH->public.encoding = encoding;
2672 }
2673 else
2674 fatal("invalid ENCODING item: %s",
2675 te->defn);
2676
2677 free(defn);
2678 }
2679
2680 static void
processStdStringsEntry(ArchiveHandle * AH,TocEntry * te)2681 processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2682 {
2683 /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2684 char *ptr1;
2685
2686 ptr1 = strchr(te->defn, '\'');
2687 if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2688 AH->public.std_strings = true;
2689 else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2690 AH->public.std_strings = false;
2691 else
2692 fatal("invalid STDSTRINGS item: %s",
2693 te->defn);
2694 }
2695
2696 static void
processSearchPathEntry(ArchiveHandle * AH,TocEntry * te)2697 processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
2698 {
2699 /*
2700 * te->defn should contain a command to set search_path. We just copy it
2701 * verbatim for use later.
2702 */
2703 AH->public.searchpath = pg_strdup(te->defn);
2704 }
2705
2706 static void
StrictNamesCheck(RestoreOptions * ropt)2707 StrictNamesCheck(RestoreOptions *ropt)
2708 {
2709 const char *missing_name;
2710
2711 Assert(ropt->strict_names);
2712
2713 if (ropt->schemaNames.head != NULL)
2714 {
2715 missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2716 if (missing_name != NULL)
2717 fatal("schema \"%s\" not found", missing_name);
2718 }
2719
2720 if (ropt->tableNames.head != NULL)
2721 {
2722 missing_name = simple_string_list_not_touched(&ropt->tableNames);
2723 if (missing_name != NULL)
2724 fatal("table \"%s\" not found", missing_name);
2725 }
2726
2727 if (ropt->indexNames.head != NULL)
2728 {
2729 missing_name = simple_string_list_not_touched(&ropt->indexNames);
2730 if (missing_name != NULL)
2731 fatal("index \"%s\" not found", missing_name);
2732 }
2733
2734 if (ropt->functionNames.head != NULL)
2735 {
2736 missing_name = simple_string_list_not_touched(&ropt->functionNames);
2737 if (missing_name != NULL)
2738 fatal("function \"%s\" not found", missing_name);
2739 }
2740
2741 if (ropt->triggerNames.head != NULL)
2742 {
2743 missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2744 if (missing_name != NULL)
2745 fatal("trigger \"%s\" not found", missing_name);
2746 }
2747 }
2748
2749 /*
2750 * Determine whether we want to restore this TOC entry.
2751 *
2752 * Returns 0 if entry should be skipped, or some combination of the
2753 * REQ_SCHEMA and REQ_DATA bits if we want to restore schema and/or data
2754 * portions of this TOC entry, or REQ_SPECIAL if it's a special entry.
2755 */
2756 static teReqs
_tocEntryRequired(TocEntry * te,teSection curSection,ArchiveHandle * AH)2757 _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
2758 {
2759 teReqs res = REQ_SCHEMA | REQ_DATA;
2760 RestoreOptions *ropt = AH->public.ropt;
2761
2762 /* These items are treated specially */
2763 if (strcmp(te->desc, "ENCODING") == 0 ||
2764 strcmp(te->desc, "STDSTRINGS") == 0 ||
2765 strcmp(te->desc, "SEARCHPATH") == 0)
2766 return REQ_SPECIAL;
2767
2768 /*
2769 * DATABASE and DATABASE PROPERTIES also have a special rule: they are
2770 * restored in createDB mode, and not restored otherwise, independently of
2771 * all else.
2772 */
2773 if (strcmp(te->desc, "DATABASE") == 0 ||
2774 strcmp(te->desc, "DATABASE PROPERTIES") == 0)
2775 {
2776 if (ropt->createDB)
2777 return REQ_SCHEMA;
2778 else
2779 return 0;
2780 }
2781
2782 /*
2783 * Process exclusions that affect certain classes of TOC entries.
2784 */
2785
2786 /* If it's an ACL, maybe ignore it */
2787 if (ropt->aclsSkip && _tocEntryIsACL(te))
2788 return 0;
2789
2790 /* If it's a comment, maybe ignore it */
2791 if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
2792 return 0;
2793
2794 /*
2795 * If it's a publication or a table part of a publication, maybe ignore
2796 * it.
2797 */
2798 if (ropt->no_publications &&
2799 (strcmp(te->desc, "PUBLICATION") == 0 ||
2800 strcmp(te->desc, "PUBLICATION TABLE") == 0))
2801 return 0;
2802
2803 /* If it's a security label, maybe ignore it */
2804 if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
2805 return 0;
2806
2807 /* If it's a subscription, maybe ignore it */
2808 if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
2809 return 0;
2810
2811 /* Ignore it if section is not to be dumped/restored */
2812 switch (curSection)
2813 {
2814 case SECTION_PRE_DATA:
2815 if (!(ropt->dumpSections & DUMP_PRE_DATA))
2816 return 0;
2817 break;
2818 case SECTION_DATA:
2819 if (!(ropt->dumpSections & DUMP_DATA))
2820 return 0;
2821 break;
2822 case SECTION_POST_DATA:
2823 if (!(ropt->dumpSections & DUMP_POST_DATA))
2824 return 0;
2825 break;
2826 default:
2827 /* shouldn't get here, really, but ignore it */
2828 return 0;
2829 }
2830
2831 /* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
2832 if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
2833 return 0;
2834
2835 /*
2836 * Check options for selective dump/restore.
2837 */
2838 if (strcmp(te->desc, "ACL") == 0 ||
2839 strcmp(te->desc, "COMMENT") == 0 ||
2840 strcmp(te->desc, "SECURITY LABEL") == 0)
2841 {
2842 /* Database properties react to createDB, not selectivity options. */
2843 if (strncmp(te->tag, "DATABASE ", 9) == 0)
2844 {
2845 if (!ropt->createDB)
2846 return 0;
2847 }
2848 else if (ropt->schemaNames.head != NULL ||
2849 ropt->schemaExcludeNames.head != NULL ||
2850 ropt->selTypes)
2851 {
2852 /*
2853 * In a selective dump/restore, we want to restore these dependent
2854 * TOC entry types only if their parent object is being restored.
2855 * Without selectivity options, we let through everything in the
2856 * archive. Note there may be such entries with no parent, eg
2857 * non-default ACLs for built-in objects.
2858 *
2859 * This code depends on the parent having been marked already,
2860 * which should be the case; if it isn't, perhaps due to
2861 * SortTocFromFile rearrangement, skipping the dependent entry
2862 * seems prudent anyway.
2863 *
2864 * Ideally we'd handle, eg, table CHECK constraints this way too.
2865 * But it's hard to tell which of their dependencies is the one to
2866 * consult.
2867 */
2868 if (te->nDeps != 1 ||
2869 TocIDRequired(AH, te->dependencies[0]) == 0)
2870 return 0;
2871 }
2872 }
2873 else
2874 {
2875 /* Apply selective-restore rules for standalone TOC entries. */
2876 if (ropt->schemaNames.head != NULL)
2877 {
2878 /* If no namespace is specified, it means all. */
2879 if (!te->namespace)
2880 return 0;
2881 if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
2882 return 0;
2883 }
2884
2885 if (ropt->schemaExcludeNames.head != NULL &&
2886 te->namespace &&
2887 simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
2888 return 0;
2889
2890 if (ropt->selTypes)
2891 {
2892 if (strcmp(te->desc, "TABLE") == 0 ||
2893 strcmp(te->desc, "TABLE DATA") == 0 ||
2894 strcmp(te->desc, "VIEW") == 0 ||
2895 strcmp(te->desc, "FOREIGN TABLE") == 0 ||
2896 strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
2897 strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
2898 strcmp(te->desc, "SEQUENCE") == 0 ||
2899 strcmp(te->desc, "SEQUENCE SET") == 0)
2900 {
2901 if (!ropt->selTable)
2902 return 0;
2903 if (ropt->tableNames.head != NULL &&
2904 !simple_string_list_member(&ropt->tableNames, te->tag))
2905 return 0;
2906 }
2907 else if (strcmp(te->desc, "INDEX") == 0)
2908 {
2909 if (!ropt->selIndex)
2910 return 0;
2911 if (ropt->indexNames.head != NULL &&
2912 !simple_string_list_member(&ropt->indexNames, te->tag))
2913 return 0;
2914 }
2915 else if (strcmp(te->desc, "FUNCTION") == 0 ||
2916 strcmp(te->desc, "AGGREGATE") == 0 ||
2917 strcmp(te->desc, "PROCEDURE") == 0)
2918 {
2919 if (!ropt->selFunction)
2920 return 0;
2921 if (ropt->functionNames.head != NULL &&
2922 !simple_string_list_member(&ropt->functionNames, te->tag))
2923 return 0;
2924 }
2925 else if (strcmp(te->desc, "TRIGGER") == 0)
2926 {
2927 if (!ropt->selTrigger)
2928 return 0;
2929 if (ropt->triggerNames.head != NULL &&
2930 !simple_string_list_member(&ropt->triggerNames, te->tag))
2931 return 0;
2932 }
2933 else
2934 return 0;
2935 }
2936 }
2937
2938 /*
2939 * Determine whether the TOC entry contains schema and/or data components,
2940 * and mask off inapplicable REQ bits. If it had a dataDumper, assume
2941 * it's both schema and data. Otherwise it's probably schema-only, but
2942 * there are exceptions.
2943 */
2944 if (!te->hadDumper)
2945 {
2946 /*
2947 * Special Case: If 'SEQUENCE SET' or anything to do with BLOBs, then
2948 * it is considered a data entry. We don't need to check for the
2949 * BLOBS entry or old-style BLOB COMMENTS, because they will have
2950 * hadDumper = true ... but we do need to check new-style BLOB ACLs,
2951 * comments, etc.
2952 */
2953 if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
2954 strcmp(te->desc, "BLOB") == 0 ||
2955 (strcmp(te->desc, "ACL") == 0 &&
2956 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
2957 (strcmp(te->desc, "COMMENT") == 0 &&
2958 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
2959 (strcmp(te->desc, "SECURITY LABEL") == 0 &&
2960 strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
2961 res = res & REQ_DATA;
2962 else
2963 res = res & ~REQ_DATA;
2964 }
2965
2966 /* If there's no definition command, there's no schema component */
2967 if (!te->defn || !te->defn[0])
2968 res = res & ~REQ_SCHEMA;
2969
2970 /*
2971 * Special case: <Init> type with <Max OID> tag; this is obsolete and we
2972 * always ignore it.
2973 */
2974 if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
2975 return 0;
2976
2977 /* Mask it if we only want schema */
2978 if (ropt->schemaOnly)
2979 {
2980 /*
2981 * The sequence_data option overrides schemaOnly for SEQUENCE SET.
2982 *
2983 * In binary-upgrade mode, even with schemaOnly set, we do not mask
2984 * out large objects. (Only large object definitions, comments and
2985 * other metadata should be generated in binary-upgrade mode, not the
2986 * actual data, but that need not concern us here.)
2987 */
2988 if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
2989 !(ropt->binary_upgrade &&
2990 (strcmp(te->desc, "BLOB") == 0 ||
2991 (strcmp(te->desc, "ACL") == 0 &&
2992 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
2993 (strcmp(te->desc, "COMMENT") == 0 &&
2994 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
2995 (strcmp(te->desc, "SECURITY LABEL") == 0 &&
2996 strncmp(te->tag, "LARGE OBJECT ", 13) == 0))))
2997 res = res & REQ_SCHEMA;
2998 }
2999
3000 /* Mask it if we only want data */
3001 if (ropt->dataOnly)
3002 res = res & REQ_DATA;
3003
3004 return res;
3005 }
3006
3007 /*
3008 * Identify which pass we should restore this TOC entry in.
3009 *
3010 * See notes with the RestorePass typedef in pg_backup_archiver.h.
3011 */
3012 static RestorePass
_tocEntryRestorePass(TocEntry * te)3013 _tocEntryRestorePass(TocEntry *te)
3014 {
3015 /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3016 if (strcmp(te->desc, "ACL") == 0 ||
3017 strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3018 strcmp(te->desc, "DEFAULT ACL") == 0)
3019 return RESTORE_PASS_ACL;
3020 if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3021 strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
3022 return RESTORE_PASS_POST_ACL;
3023
3024 /*
3025 * Comments need to be emitted in the same pass as their parent objects.
3026 * ACLs haven't got comments, and neither do matview data objects, but
3027 * event triggers do. (Fortunately, event triggers haven't got ACLs, or
3028 * we'd need yet another weird special case.)
3029 */
3030 if (strcmp(te->desc, "COMMENT") == 0 &&
3031 strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
3032 return RESTORE_PASS_POST_ACL;
3033
3034 /* All else can be handled in the main pass. */
3035 return RESTORE_PASS_MAIN;
3036 }
3037
3038 /*
3039 * Identify TOC entries that are ACLs.
3040 *
3041 * Note: it seems worth duplicating some code here to avoid a hard-wired
3042 * assumption that these are exactly the same entries that we restore during
3043 * the RESTORE_PASS_ACL phase.
3044 */
3045 static bool
_tocEntryIsACL(TocEntry * te)3046 _tocEntryIsACL(TocEntry *te)
3047 {
3048 /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3049 if (strcmp(te->desc, "ACL") == 0 ||
3050 strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3051 strcmp(te->desc, "DEFAULT ACL") == 0)
3052 return true;
3053 return false;
3054 }
3055
3056 /*
3057 * Issue SET commands for parameters that we want to have set the same way
3058 * at all times during execution of a restore script.
3059 */
3060 static void
_doSetFixedOutputState(ArchiveHandle * AH)3061 _doSetFixedOutputState(ArchiveHandle *AH)
3062 {
3063 RestoreOptions *ropt = AH->public.ropt;
3064
3065 /*
3066 * Disable timeouts to allow for slow commands, idle parallel workers, etc
3067 */
3068 ahprintf(AH, "SET statement_timeout = 0;\n");
3069 ahprintf(AH, "SET lock_timeout = 0;\n");
3070 ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
3071
3072 /* Select the correct character set encoding */
3073 ahprintf(AH, "SET client_encoding = '%s';\n",
3074 pg_encoding_to_char(AH->public.encoding));
3075
3076 /* Select the correct string literal syntax */
3077 ahprintf(AH, "SET standard_conforming_strings = %s;\n",
3078 AH->public.std_strings ? "on" : "off");
3079
3080 /* Select the role to be used during restore */
3081 if (ropt && ropt->use_role)
3082 ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
3083
3084 /* Select the dump-time search_path */
3085 if (AH->public.searchpath)
3086 ahprintf(AH, "%s", AH->public.searchpath);
3087
3088 /* Make sure function checking is disabled */
3089 ahprintf(AH, "SET check_function_bodies = false;\n");
3090
3091 /* Ensure that all valid XML data will be accepted */
3092 ahprintf(AH, "SET xmloption = content;\n");
3093
3094 /* Avoid annoying notices etc */
3095 ahprintf(AH, "SET client_min_messages = warning;\n");
3096 if (!AH->public.std_strings)
3097 ahprintf(AH, "SET escape_string_warning = off;\n");
3098
3099 /* Adjust row-security state */
3100 if (ropt && ropt->enable_row_security)
3101 ahprintf(AH, "SET row_security = on;\n");
3102 else
3103 ahprintf(AH, "SET row_security = off;\n");
3104
3105 ahprintf(AH, "\n");
3106 }
3107
3108 /*
3109 * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
3110 * for updating state if appropriate. If user is NULL or an empty string,
3111 * the specification DEFAULT will be used.
3112 */
3113 static void
_doSetSessionAuth(ArchiveHandle * AH,const char * user)3114 _doSetSessionAuth(ArchiveHandle *AH, const char *user)
3115 {
3116 PQExpBuffer cmd = createPQExpBuffer();
3117
3118 appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3119
3120 /*
3121 * SQL requires a string literal here. Might as well be correct.
3122 */
3123 if (user && *user)
3124 appendStringLiteralAHX(cmd, user, AH);
3125 else
3126 appendPQExpBufferStr(cmd, "DEFAULT");
3127 appendPQExpBufferChar(cmd, ';');
3128
3129 if (RestoringToDB(AH))
3130 {
3131 PGresult *res;
3132
3133 res = PQexec(AH->connection, cmd->data);
3134
3135 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3136 /* NOT warn_or_exit_horribly... use -O instead to skip this. */
3137 fatal("could not set session user to \"%s\": %s",
3138 user, PQerrorMessage(AH->connection));
3139
3140 PQclear(res);
3141 }
3142 else
3143 ahprintf(AH, "%s\n\n", cmd->data);
3144
3145 destroyPQExpBuffer(cmd);
3146 }
3147
3148
3149 /*
3150 * Issue the commands to connect to the specified database.
3151 *
3152 * If we're currently restoring right into a database, this will
3153 * actually establish a connection. Otherwise it puts a \connect into
3154 * the script output.
3155 */
3156 static void
_reconnectToDB(ArchiveHandle * AH,const char * dbname)3157 _reconnectToDB(ArchiveHandle *AH, const char *dbname)
3158 {
3159 if (RestoringToDB(AH))
3160 ReconnectToServer(AH, dbname);
3161 else
3162 {
3163 PQExpBufferData connectbuf;
3164
3165 initPQExpBuffer(&connectbuf);
3166 appendPsqlMetaConnect(&connectbuf, dbname);
3167 ahprintf(AH, "%s\n", connectbuf.data);
3168 termPQExpBuffer(&connectbuf);
3169 }
3170
3171 /*
3172 * NOTE: currUser keeps track of what the imaginary session user in our
3173 * script is. It's now effectively reset to the original userID.
3174 */
3175 if (AH->currUser)
3176 free(AH->currUser);
3177 AH->currUser = NULL;
3178
3179 /* don't assume we still know the output schema, tablespace, etc either */
3180 if (AH->currSchema)
3181 free(AH->currSchema);
3182 AH->currSchema = NULL;
3183 if (AH->currTablespace)
3184 free(AH->currTablespace);
3185 AH->currTablespace = NULL;
3186
3187 /* re-establish fixed state */
3188 _doSetFixedOutputState(AH);
3189 }
3190
3191 /*
3192 * Become the specified user, and update state to avoid redundant commands
3193 *
3194 * NULL or empty argument is taken to mean restoring the session default
3195 */
3196 static void
_becomeUser(ArchiveHandle * AH,const char * user)3197 _becomeUser(ArchiveHandle *AH, const char *user)
3198 {
3199 if (!user)
3200 user = ""; /* avoid null pointers */
3201
3202 if (AH->currUser && strcmp(AH->currUser, user) == 0)
3203 return; /* no need to do anything */
3204
3205 _doSetSessionAuth(AH, user);
3206
3207 /*
3208 * NOTE: currUser keeps track of what the imaginary session user in our
3209 * script is
3210 */
3211 if (AH->currUser)
3212 free(AH->currUser);
3213 AH->currUser = pg_strdup(user);
3214 }
3215
3216 /*
3217 * Become the owner of the given TOC entry object. If
3218 * changes in ownership are not allowed, this doesn't do anything.
3219 */
3220 static void
_becomeOwner(ArchiveHandle * AH,TocEntry * te)3221 _becomeOwner(ArchiveHandle *AH, TocEntry *te)
3222 {
3223 RestoreOptions *ropt = AH->public.ropt;
3224
3225 if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3226 return;
3227
3228 _becomeUser(AH, te->owner);
3229 }
3230
3231
3232 /*
3233 * Issue the commands to select the specified schema as the current schema
3234 * in the target database.
3235 */
3236 static void
_selectOutputSchema(ArchiveHandle * AH,const char * schemaName)3237 _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
3238 {
3239 PQExpBuffer qry;
3240
3241 /*
3242 * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3243 * that search_path rather than switching to entry-specific paths.
3244 * Otherwise, it's an old archive that will not restore correctly unless
3245 * we set the search_path as it's expecting.
3246 */
3247 if (AH->public.searchpath)
3248 return;
3249
3250 if (!schemaName || *schemaName == '\0' ||
3251 (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3252 return; /* no need to do anything */
3253
3254 qry = createPQExpBuffer();
3255
3256 appendPQExpBuffer(qry, "SET search_path = %s",
3257 fmtId(schemaName));
3258 if (strcmp(schemaName, "pg_catalog") != 0)
3259 appendPQExpBufferStr(qry, ", pg_catalog");
3260
3261 if (RestoringToDB(AH))
3262 {
3263 PGresult *res;
3264
3265 res = PQexec(AH->connection, qry->data);
3266
3267 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3268 warn_or_exit_horribly(AH,
3269 "could not set search_path to \"%s\": %s",
3270 schemaName, PQerrorMessage(AH->connection));
3271
3272 PQclear(res);
3273 }
3274 else
3275 ahprintf(AH, "%s;\n\n", qry->data);
3276
3277 if (AH->currSchema)
3278 free(AH->currSchema);
3279 AH->currSchema = pg_strdup(schemaName);
3280
3281 destroyPQExpBuffer(qry);
3282 }
3283
3284 /*
3285 * Issue the commands to select the specified tablespace as the current one
3286 * in the target database.
3287 */
3288 static void
_selectTablespace(ArchiveHandle * AH,const char * tablespace)3289 _selectTablespace(ArchiveHandle *AH, const char *tablespace)
3290 {
3291 RestoreOptions *ropt = AH->public.ropt;
3292 PQExpBuffer qry;
3293 const char *want,
3294 *have;
3295
3296 /* do nothing in --no-tablespaces mode */
3297 if (ropt->noTablespace)
3298 return;
3299
3300 have = AH->currTablespace;
3301 want = tablespace;
3302
3303 /* no need to do anything for non-tablespace object */
3304 if (!want)
3305 return;
3306
3307 if (have && strcmp(want, have) == 0)
3308 return; /* no need to do anything */
3309
3310 qry = createPQExpBuffer();
3311
3312 if (strcmp(want, "") == 0)
3313 {
3314 /* We want the tablespace to be the database's default */
3315 appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3316 }
3317 else
3318 {
3319 /* We want an explicit tablespace */
3320 appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3321 }
3322
3323 if (RestoringToDB(AH))
3324 {
3325 PGresult *res;
3326
3327 res = PQexec(AH->connection, qry->data);
3328
3329 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3330 warn_or_exit_horribly(AH,
3331 "could not set default_tablespace to %s: %s",
3332 fmtId(want), PQerrorMessage(AH->connection));
3333
3334 PQclear(res);
3335 }
3336 else
3337 ahprintf(AH, "%s;\n\n", qry->data);
3338
3339 if (AH->currTablespace)
3340 free(AH->currTablespace);
3341 AH->currTablespace = pg_strdup(want);
3342
3343 destroyPQExpBuffer(qry);
3344 }
3345
3346 /*
3347 * Set the proper default_table_access_method value for the table.
3348 */
3349 static void
_selectTableAccessMethod(ArchiveHandle * AH,const char * tableam)3350 _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
3351 {
3352 PQExpBuffer cmd;
3353 const char *want,
3354 *have;
3355
3356 have = AH->currTableAm;
3357 want = tableam;
3358
3359 if (!want)
3360 return;
3361
3362 if (have && strcmp(want, have) == 0)
3363 return;
3364
3365 cmd = createPQExpBuffer();
3366 appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
3367
3368 if (RestoringToDB(AH))
3369 {
3370 PGresult *res;
3371
3372 res = PQexec(AH->connection, cmd->data);
3373
3374 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3375 warn_or_exit_horribly(AH,
3376 "could not set default_table_access_method: %s",
3377 PQerrorMessage(AH->connection));
3378
3379 PQclear(res);
3380 }
3381 else
3382 ahprintf(AH, "%s\n\n", cmd->data);
3383
3384 destroyPQExpBuffer(cmd);
3385
3386 AH->currTableAm = pg_strdup(want);
3387 }
3388
3389 /*
3390 * Extract an object description for a TOC entry, and append it to buf.
3391 *
3392 * This is used for ALTER ... OWNER TO.
3393 */
3394 static void
_getObjectDescription(PQExpBuffer buf,TocEntry * te,ArchiveHandle * AH)3395 _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
3396 {
3397 const char *type = te->desc;
3398
3399 /* Use ALTER TABLE for views and sequences */
3400 if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0 ||
3401 strcmp(type, "MATERIALIZED VIEW") == 0)
3402 type = "TABLE";
3403
3404 /* objects that don't require special decoration */
3405 if (strcmp(type, "COLLATION") == 0 ||
3406 strcmp(type, "CONVERSION") == 0 ||
3407 strcmp(type, "DOMAIN") == 0 ||
3408 strcmp(type, "TABLE") == 0 ||
3409 strcmp(type, "TYPE") == 0 ||
3410 strcmp(type, "FOREIGN TABLE") == 0 ||
3411 strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3412 strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3413 strcmp(type, "STATISTICS") == 0 ||
3414 /* non-schema-specified objects */
3415 strcmp(type, "DATABASE") == 0 ||
3416 strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3417 strcmp(type, "SCHEMA") == 0 ||
3418 strcmp(type, "EVENT TRIGGER") == 0 ||
3419 strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3420 strcmp(type, "SERVER") == 0 ||
3421 strcmp(type, "PUBLICATION") == 0 ||
3422 strcmp(type, "SUBSCRIPTION") == 0 ||
3423 strcmp(type, "USER MAPPING") == 0)
3424 {
3425 appendPQExpBuffer(buf, "%s ", type);
3426 if (te->namespace && *te->namespace)
3427 appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3428 appendPQExpBufferStr(buf, fmtId(te->tag));
3429 return;
3430 }
3431
3432 /* BLOBs just have a name, but it's numeric so must not use fmtId */
3433 if (strcmp(type, "BLOB") == 0)
3434 {
3435 appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3436 return;
3437 }
3438
3439 /*
3440 * These object types require additional decoration. Fortunately, the
3441 * information needed is exactly what's in the DROP command.
3442 */
3443 if (strcmp(type, "AGGREGATE") == 0 ||
3444 strcmp(type, "FUNCTION") == 0 ||
3445 strcmp(type, "OPERATOR") == 0 ||
3446 strcmp(type, "OPERATOR CLASS") == 0 ||
3447 strcmp(type, "OPERATOR FAMILY") == 0 ||
3448 strcmp(type, "PROCEDURE") == 0)
3449 {
3450 /* Chop "DROP " off the front and make a modifiable copy */
3451 char *first = pg_strdup(te->dropStmt + 5);
3452 char *last;
3453
3454 /* point to last character in string */
3455 last = first + strlen(first) - 1;
3456
3457 /* Strip off any ';' or '\n' at the end */
3458 while (last >= first && (*last == '\n' || *last == ';'))
3459 last--;
3460 *(last + 1) = '\0';
3461
3462 appendPQExpBufferStr(buf, first);
3463
3464 free(first);
3465 return;
3466 }
3467
3468 pg_log_warning("don't know how to set owner for object type \"%s\"",
3469 type);
3470 }
3471
3472 /*
3473 * Emit the SQL commands to create the object represented by a TOC entry
3474 *
3475 * This now also includes issuing an ALTER OWNER command to restore the
3476 * object's ownership, if wanted. But note that the object's permissions
3477 * will remain at default, until the matching ACL TOC entry is restored.
3478 */
3479 static void
_printTocEntry(ArchiveHandle * AH,TocEntry * te,bool isData)3480 _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
3481 {
3482 RestoreOptions *ropt = AH->public.ropt;
3483
3484 /* Select owner, schema, tablespace and default AM as necessary */
3485 _becomeOwner(AH, te);
3486 _selectOutputSchema(AH, te->namespace);
3487 _selectTablespace(AH, te->tablespace);
3488 _selectTableAccessMethod(AH, te->tableam);
3489
3490 /* Emit header comment for item */
3491 if (!AH->noTocComments)
3492 {
3493 const char *pfx;
3494 char *sanitized_name;
3495 char *sanitized_schema;
3496 char *sanitized_owner;
3497
3498 if (isData)
3499 pfx = "Data for ";
3500 else
3501 pfx = "";
3502
3503 ahprintf(AH, "--\n");
3504 if (AH->public.verbose)
3505 {
3506 ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
3507 te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
3508 if (te->nDeps > 0)
3509 {
3510 int i;
3511
3512 ahprintf(AH, "-- Dependencies:");
3513 for (i = 0; i < te->nDeps; i++)
3514 ahprintf(AH, " %d", te->dependencies[i]);
3515 ahprintf(AH, "\n");
3516 }
3517 }
3518
3519 sanitized_name = sanitize_line(te->tag, false);
3520 sanitized_schema = sanitize_line(te->namespace, true);
3521 sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);
3522
3523 ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3524 pfx, sanitized_name, te->desc, sanitized_schema,
3525 sanitized_owner);
3526
3527 free(sanitized_name);
3528 free(sanitized_schema);
3529 free(sanitized_owner);
3530
3531 if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
3532 {
3533 char *sanitized_tablespace;
3534
3535 sanitized_tablespace = sanitize_line(te->tablespace, false);
3536 ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
3537 free(sanitized_tablespace);
3538 }
3539 ahprintf(AH, "\n");
3540
3541 if (AH->PrintExtraTocPtr != NULL)
3542 AH->PrintExtraTocPtr(AH, te);
3543 ahprintf(AH, "--\n\n");
3544 }
3545
3546 /*
3547 * Actually print the definition.
3548 *
3549 * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
3550 * versions put into CREATE SCHEMA. We have to do this when --no-owner
3551 * mode is selected. This is ugly, but I see no other good way ...
3552 */
3553 if (ropt->noOwner && strcmp(te->desc, "SCHEMA") == 0)
3554 {
3555 ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
3556 }
3557 else
3558 {
3559 if (te->defn && strlen(te->defn) > 0)
3560 ahprintf(AH, "%s\n\n", te->defn);
3561 }
3562
3563 /*
3564 * If we aren't using SET SESSION AUTH to determine ownership, we must
3565 * instead issue an ALTER OWNER command. We assume that anything without
3566 * a DROP command is not a separately ownable object. All the categories
3567 * with DROP commands must appear in one list or the other.
3568 */
3569 if (!ropt->noOwner && !ropt->use_setsessauth &&
3570 te->owner && strlen(te->owner) > 0 &&
3571 te->dropStmt && strlen(te->dropStmt) > 0)
3572 {
3573 if (strcmp(te->desc, "AGGREGATE") == 0 ||
3574 strcmp(te->desc, "BLOB") == 0 ||
3575 strcmp(te->desc, "COLLATION") == 0 ||
3576 strcmp(te->desc, "CONVERSION") == 0 ||
3577 strcmp(te->desc, "DATABASE") == 0 ||
3578 strcmp(te->desc, "DOMAIN") == 0 ||
3579 strcmp(te->desc, "FUNCTION") == 0 ||
3580 strcmp(te->desc, "OPERATOR") == 0 ||
3581 strcmp(te->desc, "OPERATOR CLASS") == 0 ||
3582 strcmp(te->desc, "OPERATOR FAMILY") == 0 ||
3583 strcmp(te->desc, "PROCEDURE") == 0 ||
3584 strcmp(te->desc, "PROCEDURAL LANGUAGE") == 0 ||
3585 strcmp(te->desc, "SCHEMA") == 0 ||
3586 strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3587 strcmp(te->desc, "TABLE") == 0 ||
3588 strcmp(te->desc, "TYPE") == 0 ||
3589 strcmp(te->desc, "VIEW") == 0 ||
3590 strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3591 strcmp(te->desc, "SEQUENCE") == 0 ||
3592 strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3593 strcmp(te->desc, "TEXT SEARCH DICTIONARY") == 0 ||
3594 strcmp(te->desc, "TEXT SEARCH CONFIGURATION") == 0 ||
3595 strcmp(te->desc, "FOREIGN DATA WRAPPER") == 0 ||
3596 strcmp(te->desc, "SERVER") == 0 ||
3597 strcmp(te->desc, "STATISTICS") == 0 ||
3598 strcmp(te->desc, "PUBLICATION") == 0 ||
3599 strcmp(te->desc, "SUBSCRIPTION") == 0)
3600 {
3601 PQExpBuffer temp = createPQExpBuffer();
3602
3603 appendPQExpBufferStr(temp, "ALTER ");
3604 _getObjectDescription(temp, te, AH);
3605 appendPQExpBuffer(temp, " OWNER TO %s;", fmtId(te->owner));
3606 ahprintf(AH, "%s\n\n", temp->data);
3607 destroyPQExpBuffer(temp);
3608 }
3609 else if (strcmp(te->desc, "CAST") == 0 ||
3610 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
3611 strcmp(te->desc, "CONSTRAINT") == 0 ||
3612 strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
3613 strcmp(te->desc, "DEFAULT") == 0 ||
3614 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
3615 strcmp(te->desc, "INDEX") == 0 ||
3616 strcmp(te->desc, "RULE") == 0 ||
3617 strcmp(te->desc, "TRIGGER") == 0 ||
3618 strcmp(te->desc, "ROW SECURITY") == 0 ||
3619 strcmp(te->desc, "POLICY") == 0 ||
3620 strcmp(te->desc, "USER MAPPING") == 0)
3621 {
3622 /* these object types don't have separate owners */
3623 }
3624 else
3625 {
3626 pg_log_warning("don't know how to set owner for object type \"%s\"",
3627 te->desc);
3628 }
3629 }
3630
3631 /*
3632 * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
3633 * commands, so we can no longer assume we know the current auth setting.
3634 */
3635 if (_tocEntryIsACL(te))
3636 {
3637 if (AH->currUser)
3638 free(AH->currUser);
3639 AH->currUser = NULL;
3640 }
3641 }
3642
/*
 * Produce a copy of str that is safe to place on a single line of an SQL
 * comment or TOC listing: every newline and carriage return is replaced by
 * a space.  Keeping each logical output line on one physical line prevents
 * corruption of the dump (which could, in the worst case, present an SQL
 * injection vulnerability if a dump containing maliciously crafted object
 * names were loaded incautiously).
 *
 * The result is always a freshly allocated string that the caller frees.
 * For a NULL input the result is "-" if want_hyphen is true, else "".
 *
 * Names are not quoted here, so the fields are not automatically parseable.
 */
static char *
sanitize_line(const char *str, bool want_hyphen)
{
	char	   *copy;
	size_t		pos;

	if (str == NULL)
		return pg_strdup(want_hyphen ? "-" : "");

	copy = pg_strdup(str);

	for (pos = 0; copy[pos] != '\0'; pos++)
	{
		switch (copy[pos])
		{
			case '\n':
			case '\r':
				copy[pos] = ' ';
				break;
			default:
				break;
		}
	}

	return copy;
}
3678
3679 /*
3680 * Write the file header for a custom-format archive
3681 */
3682 void
WriteHead(ArchiveHandle * AH)3683 WriteHead(ArchiveHandle *AH)
3684 {
3685 struct tm crtm;
3686
3687 AH->WriteBufPtr(AH, "PGDMP", 5); /* Magic code */
3688 AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
3689 AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
3690 AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
3691 AH->WriteBytePtr(AH, AH->intSize);
3692 AH->WriteBytePtr(AH, AH->offSize);
3693 AH->WriteBytePtr(AH, AH->format);
3694 WriteInt(AH, AH->compression);
3695 crtm = *localtime(&AH->createDate);
3696 WriteInt(AH, crtm.tm_sec);
3697 WriteInt(AH, crtm.tm_min);
3698 WriteInt(AH, crtm.tm_hour);
3699 WriteInt(AH, crtm.tm_mday);
3700 WriteInt(AH, crtm.tm_mon);
3701 WriteInt(AH, crtm.tm_year);
3702 WriteInt(AH, crtm.tm_isdst);
3703 WriteStr(AH, PQdb(AH->connection));
3704 WriteStr(AH, AH->public.remoteVersionStr);
3705 WriteStr(AH, PG_VERSION);
3706 }
3707
3708 void
ReadHead(ArchiveHandle * AH)3709 ReadHead(ArchiveHandle *AH)
3710 {
3711 char vmaj,
3712 vmin,
3713 vrev;
3714 int fmt;
3715
3716 /*
3717 * If we haven't already read the header, do so.
3718 *
3719 * NB: this code must agree with _discoverArchiveFormat(). Maybe find a
3720 * way to unify the cases?
3721 */
3722 if (!AH->readHeader)
3723 {
3724 char tmpMag[7];
3725
3726 AH->ReadBufPtr(AH, tmpMag, 5);
3727
3728 if (strncmp(tmpMag, "PGDMP", 5) != 0)
3729 fatal("did not find magic string in file header");
3730 }
3731
3732 vmaj = AH->ReadBytePtr(AH);
3733 vmin = AH->ReadBytePtr(AH);
3734
3735 if (vmaj > 1 || (vmaj == 1 && vmin > 0)) /* Version > 1.0 */
3736 vrev = AH->ReadBytePtr(AH);
3737 else
3738 vrev = 0;
3739
3740 AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);
3741
3742 if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
3743 fatal("unsupported version (%d.%d) in file header",
3744 vmaj, vmin);
3745
3746 AH->intSize = AH->ReadBytePtr(AH);
3747 if (AH->intSize > 32)
3748 fatal("sanity check on integer size (%lu) failed",
3749 (unsigned long) AH->intSize);
3750
3751 if (AH->intSize > sizeof(int))
3752 pg_log_warning("archive was made on a machine with larger integers, some operations might fail");
3753
3754 if (AH->version >= K_VERS_1_7)
3755 AH->offSize = AH->ReadBytePtr(AH);
3756 else
3757 AH->offSize = AH->intSize;
3758
3759 fmt = AH->ReadBytePtr(AH);
3760
3761 if (AH->format != fmt)
3762 fatal("expected format (%d) differs from format found in file (%d)",
3763 AH->format, fmt);
3764
3765 if (AH->version >= K_VERS_1_2)
3766 {
3767 if (AH->version < K_VERS_1_4)
3768 AH->compression = AH->ReadBytePtr(AH);
3769 else
3770 AH->compression = ReadInt(AH);
3771 }
3772 else
3773 AH->compression = Z_DEFAULT_COMPRESSION;
3774
3775 #ifndef HAVE_LIBZ
3776 if (AH->compression != 0)
3777 pg_log_warning("archive is compressed, but this installation does not support compression -- no data will be available");
3778 #endif
3779
3780 if (AH->version >= K_VERS_1_4)
3781 {
3782 struct tm crtm;
3783
3784 crtm.tm_sec = ReadInt(AH);
3785 crtm.tm_min = ReadInt(AH);
3786 crtm.tm_hour = ReadInt(AH);
3787 crtm.tm_mday = ReadInt(AH);
3788 crtm.tm_mon = ReadInt(AH);
3789 crtm.tm_year = ReadInt(AH);
3790 crtm.tm_isdst = ReadInt(AH);
3791
3792 /*
3793 * Newer versions of glibc have mktime() report failure if tm_isdst is
3794 * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
3795 * TZ=UTC. This is problematic when restoring an archive under a
3796 * different timezone setting. If we get a failure, try again with
3797 * tm_isdst set to -1 ("don't know").
3798 *
3799 * XXX with or without this hack, we reconstruct createDate
3800 * incorrectly when the prevailing timezone is different from
3801 * pg_dump's. Next time we bump the archive version, we should flush
3802 * this representation and store a plain seconds-since-the-Epoch
3803 * timestamp instead.
3804 */
3805 AH->createDate = mktime(&crtm);
3806 if (AH->createDate == (time_t) -1)
3807 {
3808 crtm.tm_isdst = -1;
3809 AH->createDate = mktime(&crtm);
3810 if (AH->createDate == (time_t) -1)
3811 pg_log_warning("invalid creation date in header");
3812 }
3813 }
3814
3815 if (AH->version >= K_VERS_1_4)
3816 {
3817 AH->archdbname = ReadStr(AH);
3818 }
3819
3820 if (AH->version >= K_VERS_1_10)
3821 {
3822 AH->archiveRemoteVersion = ReadStr(AH);
3823 AH->archiveDumpVersion = ReadStr(AH);
3824 }
3825 }
3826
3827
3828 /*
3829 * checkSeek
3830 * check to see if ftell/fseek can be performed.
3831 */
3832 bool
checkSeek(FILE * fp)3833 checkSeek(FILE *fp)
3834 {
3835 pgoff_t tpos;
3836
3837 /* Check that ftello works on this file */
3838 tpos = ftello(fp);
3839 if (tpos < 0)
3840 return false;
3841
3842 /*
3843 * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
3844 * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
3845 * successful no-op even on files that are otherwise unseekable.
3846 */
3847 if (fseeko(fp, tpos, SEEK_SET) != 0)
3848 return false;
3849
3850 return true;
3851 }
3852
3853
3854 /*
3855 * dumpTimestamp
3856 */
3857 static void
dumpTimestamp(ArchiveHandle * AH,const char * msg,time_t tim)3858 dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
3859 {
3860 char buf[64];
3861
3862 if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
3863 ahprintf(AH, "-- %s %s\n\n", msg, buf);
3864 }
3865
3866 /*
3867 * Main engine for parallel restore.
3868 *
3869 * Parallel restore is done in three phases. In this first phase,
3870 * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
3871 * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all
3872 * PRE_DATA items other than ACLs.) Entries we can't process now are
3873 * added to the pending_list for later phases to deal with.
3874 */
3875 static void
restore_toc_entries_prefork(ArchiveHandle * AH,TocEntry * pending_list)3876 restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
3877 {
3878 bool skipped_some;
3879 TocEntry *next_work_item;
3880
3881 pg_log_debug("entering restore_toc_entries_prefork");
3882
3883 /* Adjust dependency information */
3884 fix_dependencies(AH);
3885
3886 /*
3887 * Do all the early stuff in a single connection in the parent. There's no
3888 * great point in running it in parallel, in fact it will actually run
3889 * faster in a single connection because we avoid all the connection and
3890 * setup overhead. Also, pre-9.2 pg_dump versions were not very good
3891 * about showing all the dependencies of SECTION_PRE_DATA items, so we do
3892 * not risk trying to process them out-of-order.
3893 *
3894 * Stuff that we can't do immediately gets added to the pending_list.
3895 * Note: we don't yet filter out entries that aren't going to be restored.
3896 * They might participate in dependency chains connecting entries that
3897 * should be restored, so we treat them as live until we actually process
3898 * them.
3899 *
3900 * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
3901 * before DATA items, and all DATA items before POST_DATA items. That is
3902 * not certain to be true in older archives, though, and in any case use
3903 * of a list file would destroy that ordering (cf. SortTocFromFile). So
3904 * this loop cannot assume that it holds.
3905 */
3906 AH->restorePass = RESTORE_PASS_MAIN;
3907 skipped_some = false;
3908 for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
3909 {
3910 bool do_now = true;
3911
3912 if (next_work_item->section != SECTION_PRE_DATA)
3913 {
3914 /* DATA and POST_DATA items are just ignored for now */
3915 if (next_work_item->section == SECTION_DATA ||
3916 next_work_item->section == SECTION_POST_DATA)
3917 {
3918 do_now = false;
3919 skipped_some = true;
3920 }
3921 else
3922 {
3923 /*
3924 * SECTION_NONE items, such as comments, can be processed now
3925 * if we are still in the PRE_DATA part of the archive. Once
3926 * we've skipped any items, we have to consider whether the
3927 * comment's dependencies are satisfied, so skip it for now.
3928 */
3929 if (skipped_some)
3930 do_now = false;
3931 }
3932 }
3933
3934 /*
3935 * Also skip items that need to be forced into later passes. We need
3936 * not set skipped_some in this case, since by assumption no main-pass
3937 * items could depend on these.
3938 */
3939 if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
3940 do_now = false;
3941
3942 if (do_now)
3943 {
3944 /* OK, restore the item and update its dependencies */
3945 pg_log_info("processing item %d %s %s",
3946 next_work_item->dumpId,
3947 next_work_item->desc, next_work_item->tag);
3948
3949 (void) restore_toc_entry(AH, next_work_item, false);
3950
3951 /* Reduce dependencies, but don't move anything to ready_list */
3952 reduce_dependencies(AH, next_work_item, NULL);
3953 }
3954 else
3955 {
3956 /* Nope, so add it to pending_list */
3957 pending_list_append(pending_list, next_work_item);
3958 }
3959 }
3960
3961 /*
3962 * Now close parent connection in prep for parallel steps. We do this
3963 * mainly to ensure that we don't exceed the specified number of parallel
3964 * connections.
3965 */
3966 DisconnectDatabase(&AH->public);
3967
3968 /* blow away any transient state from the old connection */
3969 if (AH->currUser)
3970 free(AH->currUser);
3971 AH->currUser = NULL;
3972 if (AH->currSchema)
3973 free(AH->currSchema);
3974 AH->currSchema = NULL;
3975 if (AH->currTablespace)
3976 free(AH->currTablespace);
3977 AH->currTablespace = NULL;
3978 if (AH->currTableAm)
3979 free(AH->currTableAm);
3980 AH->currTableAm = NULL;
3981 }
3982
3983 /*
3984 * Main engine for parallel restore.
3985 *
3986 * Parallel restore is done in three phases. In this second phase,
3987 * we process entries by dispatching them to parallel worker children
3988 * (processes on Unix, threads on Windows), each of which connects
3989 * separately to the database. Inter-entry dependencies are respected,
3990 * and so is the RestorePass multi-pass structure. When we can no longer
3991 * make any entries ready to process, we exit. Normally, there will be
3992 * nothing left to do; but if there is, the third phase will mop up.
3993 */
3994 static void
restore_toc_entries_parallel(ArchiveHandle * AH,ParallelState * pstate,TocEntry * pending_list)3995 restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
3996 TocEntry *pending_list)
3997 {
3998 ParallelReadyList ready_list;
3999 TocEntry *next_work_item;
4000
4001 pg_log_debug("entering restore_toc_entries_parallel");
4002
4003 /* Set up ready_list with enough room for all known TocEntrys */
4004 ready_list_init(&ready_list, AH->tocCount);
4005
4006 /*
4007 * The pending_list contains all items that we need to restore. Move all
4008 * items that are available to process immediately into the ready_list.
4009 * After this setup, the pending list is everything that needs to be done
4010 * but is blocked by one or more dependencies, while the ready list
4011 * contains items that have no remaining dependencies and are OK to
4012 * process in the current restore pass.
4013 */
4014 AH->restorePass = RESTORE_PASS_MAIN;
4015 move_to_ready_list(pending_list, &ready_list, AH->restorePass);
4016
4017 /*
4018 * main parent loop
4019 *
4020 * Keep going until there is no worker still running AND there is no work
4021 * left to be done. Note invariant: at top of loop, there should always
4022 * be at least one worker available to dispatch a job to.
4023 */
4024 pg_log_info("entering main parallel loop");
4025
4026 for (;;)
4027 {
4028 /* Look for an item ready to be dispatched to a worker */
4029 next_work_item = pop_next_work_item(AH, &ready_list, pstate);
4030 if (next_work_item != NULL)
4031 {
4032 /* If not to be restored, don't waste time launching a worker */
4033 if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
4034 {
4035 pg_log_info("skipping item %d %s %s",
4036 next_work_item->dumpId,
4037 next_work_item->desc, next_work_item->tag);
4038 /* Update its dependencies as though we'd completed it */
4039 reduce_dependencies(AH, next_work_item, &ready_list);
4040 /* Loop around to see if anything else can be dispatched */
4041 continue;
4042 }
4043
4044 pg_log_info("launching item %d %s %s",
4045 next_work_item->dumpId,
4046 next_work_item->desc, next_work_item->tag);
4047
4048 /* Dispatch to some worker */
4049 DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
4050 mark_restore_job_done, &ready_list);
4051 }
4052 else if (IsEveryWorkerIdle(pstate))
4053 {
4054 /*
4055 * Nothing is ready and no worker is running, so we're done with
4056 * the current pass or maybe with the whole process.
4057 */
4058 if (AH->restorePass == RESTORE_PASS_LAST)
4059 break; /* No more parallel processing is possible */
4060
4061 /* Advance to next restore pass */
4062 AH->restorePass++;
4063 /* That probably allows some stuff to be made ready */
4064 move_to_ready_list(pending_list, &ready_list, AH->restorePass);
4065 /* Loop around to see if anything's now ready */
4066 continue;
4067 }
4068 else
4069 {
4070 /*
4071 * We have nothing ready, but at least one child is working, so
4072 * wait for some subjob to finish.
4073 */
4074 }
4075
4076 /*
4077 * Before dispatching another job, check to see if anything has
4078 * finished. We should check every time through the loop so as to
4079 * reduce dependencies as soon as possible. If we were unable to
4080 * dispatch any job this time through, wait until some worker finishes
4081 * (and, hopefully, unblocks some pending item). If we did dispatch
4082 * something, continue as soon as there's at least one idle worker.
4083 * Note that in either case, there's guaranteed to be at least one
4084 * idle worker when we return to the top of the loop. This ensures we
4085 * won't block inside DispatchJobForTocEntry, which would be
4086 * undesirable: we'd rather postpone dispatching until we see what's
4087 * been unblocked by finished jobs.
4088 */
4089 WaitForWorkers(AH, pstate,
4090 next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
4091 }
4092
4093 /* There should now be nothing in ready_list. */
4094 Assert(ready_list.first_te > ready_list.last_te);
4095
4096 ready_list_free(&ready_list);
4097
4098 pg_log_info("finished main parallel loop");
4099 }
4100
4101 /*
4102 * Main engine for parallel restore.
4103 *
4104 * Parallel restore is done in three phases. In this third phase,
4105 * we mop up any remaining TOC entries by processing them serially.
4106 * This phase normally should have nothing to do, but if we've somehow
4107 * gotten stuck due to circular dependencies or some such, this provides
4108 * at least some chance of completing the restore successfully.
4109 */
4110 static void
restore_toc_entries_postfork(ArchiveHandle * AH,TocEntry * pending_list)4111 restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
4112 {
4113 RestoreOptions *ropt = AH->public.ropt;
4114 TocEntry *te;
4115
4116 pg_log_debug("entering restore_toc_entries_postfork");
4117
4118 /*
4119 * Now reconnect the single parent connection.
4120 */
4121 ConnectDatabase((Archive *) AH, &ropt->cparams, true);
4122
4123 /* re-establish fixed state */
4124 _doSetFixedOutputState(AH);
4125
4126 /*
4127 * Make sure there is no work left due to, say, circular dependencies, or
4128 * some other pathological condition. If so, do it in the single parent
4129 * connection. We don't sweat about RestorePass ordering; it's likely we
4130 * already violated that.
4131 */
4132 for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
4133 {
4134 pg_log_info("processing missed item %d %s %s",
4135 te->dumpId, te->desc, te->tag);
4136 (void) restore_toc_entry(AH, te, false);
4137 }
4138 }
4139
4140 /*
4141 * Check if te1 has an exclusive lock requirement for an item that te2 also
4142 * requires, whether or not te2's requirement is for an exclusive lock.
4143 */
4144 static bool
has_lock_conflicts(TocEntry * te1,TocEntry * te2)4145 has_lock_conflicts(TocEntry *te1, TocEntry *te2)
4146 {
4147 int j,
4148 k;
4149
4150 for (j = 0; j < te1->nLockDeps; j++)
4151 {
4152 for (k = 0; k < te2->nDeps; k++)
4153 {
4154 if (te1->lockDeps[j] == te2->dependencies[k])
4155 return true;
4156 }
4157 }
4158 return false;
4159 }
4160
4161
4162 /*
4163 * Initialize the header of the pending-items list.
4164 *
4165 * This is a circular list with a dummy TocEntry as header, just like the
4166 * main TOC list; but we use separate list links so that an entry can be in
4167 * the main TOC list as well as in the pending list.
4168 */
4169 static void
pending_list_header_init(TocEntry * l)4170 pending_list_header_init(TocEntry *l)
4171 {
4172 l->pending_prev = l->pending_next = l;
4173 }
4174
4175 /* Append te to the end of the pending-list headed by l */
4176 static void
pending_list_append(TocEntry * l,TocEntry * te)4177 pending_list_append(TocEntry *l, TocEntry *te)
4178 {
4179 te->pending_prev = l->pending_prev;
4180 l->pending_prev->pending_next = te;
4181 l->pending_prev = te;
4182 te->pending_next = l;
4183 }
4184
4185 /* Remove te from the pending-list */
4186 static void
pending_list_remove(TocEntry * te)4187 pending_list_remove(TocEntry *te)
4188 {
4189 te->pending_prev->pending_next = te->pending_next;
4190 te->pending_next->pending_prev = te->pending_prev;
4191 te->pending_prev = NULL;
4192 te->pending_next = NULL;
4193 }
4194
4195
4196 /*
4197 * Initialize the ready_list with enough room for up to tocCount entries.
4198 */
4199 static void
ready_list_init(ParallelReadyList * ready_list,int tocCount)4200 ready_list_init(ParallelReadyList *ready_list, int tocCount)
4201 {
4202 ready_list->tes = (TocEntry **)
4203 pg_malloc(tocCount * sizeof(TocEntry *));
4204 ready_list->first_te = 0;
4205 ready_list->last_te = -1;
4206 ready_list->sorted = false;
4207 }
4208
4209 /*
4210 * Free storage for a ready_list.
4211 */
4212 static void
ready_list_free(ParallelReadyList * ready_list)4213 ready_list_free(ParallelReadyList *ready_list)
4214 {
4215 pg_free(ready_list->tes);
4216 }
4217
4218 /* Add te to the ready_list */
4219 static void
ready_list_insert(ParallelReadyList * ready_list,TocEntry * te)4220 ready_list_insert(ParallelReadyList *ready_list, TocEntry *te)
4221 {
4222 ready_list->tes[++ready_list->last_te] = te;
4223 /* List is (probably) not sorted anymore. */
4224 ready_list->sorted = false;
4225 }
4226
4227 /* Remove the i'th entry in the ready_list */
4228 static void
ready_list_remove(ParallelReadyList * ready_list,int i)4229 ready_list_remove(ParallelReadyList *ready_list, int i)
4230 {
4231 int f = ready_list->first_te;
4232
4233 Assert(i >= f && i <= ready_list->last_te);
4234
4235 /*
4236 * In the typical case where the item to be removed is the first ready
4237 * entry, we need only increment first_te to remove it. Otherwise, move
4238 * the entries before it to compact the list. (This preserves sortedness,
4239 * if any.) We could alternatively move the entries after i, but there
4240 * are typically many more of those.
4241 */
4242 if (i > f)
4243 {
4244 TocEntry **first_te_ptr = &ready_list->tes[f];
4245
4246 memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *));
4247 }
4248 ready_list->first_te++;
4249 }
4250
4251 /* Sort the ready_list into the desired order */
4252 static void
ready_list_sort(ParallelReadyList * ready_list)4253 ready_list_sort(ParallelReadyList *ready_list)
4254 {
4255 if (!ready_list->sorted)
4256 {
4257 int n = ready_list->last_te - ready_list->first_te + 1;
4258
4259 if (n > 1)
4260 qsort(ready_list->tes + ready_list->first_te, n,
4261 sizeof(TocEntry *),
4262 TocEntrySizeCompare);
4263 ready_list->sorted = true;
4264 }
4265 }
4266
4267 /* qsort comparator for sorting TocEntries by dataLength */
4268 static int
TocEntrySizeCompare(const void * p1,const void * p2)4269 TocEntrySizeCompare(const void *p1, const void *p2)
4270 {
4271 const TocEntry *te1 = *(const TocEntry *const *) p1;
4272 const TocEntry *te2 = *(const TocEntry *const *) p2;
4273
4274 /* Sort by decreasing dataLength */
4275 if (te1->dataLength > te2->dataLength)
4276 return -1;
4277 if (te1->dataLength < te2->dataLength)
4278 return 1;
4279
4280 /* For equal dataLengths, sort by dumpId, just to be stable */
4281 if (te1->dumpId < te2->dumpId)
4282 return -1;
4283 if (te1->dumpId > te2->dumpId)
4284 return 1;
4285
4286 return 0;
4287 }
4288
4289
4290 /*
4291 * Move all immediately-ready items from pending_list to ready_list.
4292 *
4293 * Items are considered ready if they have no remaining dependencies and
4294 * they belong in the current restore pass. (See also reduce_dependencies,
4295 * which applies the same logic one-at-a-time.)
4296 */
4297 static void
move_to_ready_list(TocEntry * pending_list,ParallelReadyList * ready_list,RestorePass pass)4298 move_to_ready_list(TocEntry *pending_list,
4299 ParallelReadyList *ready_list,
4300 RestorePass pass)
4301 {
4302 TocEntry *te;
4303 TocEntry *next_te;
4304
4305 for (te = pending_list->pending_next; te != pending_list; te = next_te)
4306 {
4307 /* must save list link before possibly removing te from list */
4308 next_te = te->pending_next;
4309
4310 if (te->depCount == 0 &&
4311 _tocEntryRestorePass(te) == pass)
4312 {
4313 /* Remove it from pending_list ... */
4314 pending_list_remove(te);
4315 /* ... and add to ready_list */
4316 ready_list_insert(ready_list, te);
4317 }
4318 }
4319 }
4320
4321 /*
4322 * Find the next work item (if any) that is capable of being run now,
4323 * and remove it from the ready_list.
4324 *
4325 * Returns the item, or NULL if nothing is runnable.
4326 *
4327 * To qualify, the item must have no remaining dependencies
4328 * and no requirements for locks that are incompatible with
4329 * items currently running. Items in the ready_list are known to have
4330 * no remaining dependencies, but we have to check for lock conflicts.
4331 */
4332 static TocEntry *
pop_next_work_item(ArchiveHandle * AH,ParallelReadyList * ready_list,ParallelState * pstate)4333 pop_next_work_item(ArchiveHandle *AH, ParallelReadyList *ready_list,
4334 ParallelState *pstate)
4335 {
4336 /*
4337 * Sort the ready_list so that we'll tackle larger jobs first.
4338 */
4339 ready_list_sort(ready_list);
4340
4341 /*
4342 * Search the ready_list until we find a suitable item.
4343 */
4344 for (int i = ready_list->first_te; i <= ready_list->last_te; i++)
4345 {
4346 TocEntry *te = ready_list->tes[i];
4347 bool conflicts = false;
4348
4349 /*
4350 * Check to see if the item would need exclusive lock on something
4351 * that a currently running item also needs lock on, or vice versa. If
4352 * so, we don't want to schedule them together.
4353 */
4354 for (int k = 0; k < pstate->numWorkers; k++)
4355 {
4356 TocEntry *running_te = pstate->te[k];
4357
4358 if (running_te == NULL)
4359 continue;
4360 if (has_lock_conflicts(te, running_te) ||
4361 has_lock_conflicts(running_te, te))
4362 {
4363 conflicts = true;
4364 break;
4365 }
4366 }
4367
4368 if (conflicts)
4369 continue;
4370
4371 /* passed all tests, so this item can run */
4372 ready_list_remove(ready_list, i);
4373 return te;
4374 }
4375
4376 pg_log_debug("no item ready");
4377 return NULL;
4378 }
4379
4380
4381 /*
4382 * Restore a single TOC item in parallel with others
4383 *
4384 * this is run in the worker, i.e. in a thread (Windows) or a separate process
4385 * (everything else). A worker process executes several such work items during
4386 * a parallel backup or restore. Once we terminate here and report back that
4387 * our work is finished, the master process will assign us a new work item.
4388 */
4389 int
parallel_restore(ArchiveHandle * AH,TocEntry * te)4390 parallel_restore(ArchiveHandle *AH, TocEntry *te)
4391 {
4392 int status;
4393
4394 Assert(AH->connection != NULL);
4395
4396 /* Count only errors associated with this TOC entry */
4397 AH->public.n_errors = 0;
4398
4399 /* Restore the TOC item */
4400 status = restore_toc_entry(AH, te, true);
4401
4402 return status;
4403 }
4404
4405
4406 /*
4407 * Callback function that's invoked in the master process after a step has
4408 * been parallel restored.
4409 *
4410 * Update status and reduce the dependency count of any dependent items.
4411 */
4412 static void
mark_restore_job_done(ArchiveHandle * AH,TocEntry * te,int status,void * callback_data)4413 mark_restore_job_done(ArchiveHandle *AH,
4414 TocEntry *te,
4415 int status,
4416 void *callback_data)
4417 {
4418 ParallelReadyList *ready_list = (ParallelReadyList *) callback_data;
4419
4420 pg_log_info("finished item %d %s %s",
4421 te->dumpId, te->desc, te->tag);
4422
4423 if (status == WORKER_CREATE_DONE)
4424 mark_create_done(AH, te);
4425 else if (status == WORKER_INHIBIT_DATA)
4426 {
4427 inhibit_data_for_failed_table(AH, te);
4428 AH->public.n_errors++;
4429 }
4430 else if (status == WORKER_IGNORED_ERRORS)
4431 AH->public.n_errors++;
4432 else if (status != 0)
4433 fatal("worker process failed: exit code %d",
4434 status);
4435
4436 reduce_dependencies(AH, te, ready_list);
4437 }
4438
4439
4440 /*
4441 * Process the dependency information into a form useful for parallel restore.
4442 *
4443 * This function takes care of fixing up some missing or badly designed
4444 * dependencies, and then prepares subsidiary data structures that will be
4445 * used in the main parallel-restore logic, including:
4446 * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4447 * 2. We set up depCount fields that are the number of as-yet-unprocessed
4448 * dependencies for each TOC entry.
4449 *
4450 * We also identify locking dependencies so that we can avoid trying to
4451 * schedule conflicting items at the same time.
4452 */
4453 static void
fix_dependencies(ArchiveHandle * AH)4454 fix_dependencies(ArchiveHandle *AH)
4455 {
4456 TocEntry *te;
4457 int i;
4458
4459 /*
4460 * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4461 * items are marked as not being in any parallel-processing list.
4462 */
4463 for (te = AH->toc->next; te != AH->toc; te = te->next)
4464 {
4465 te->depCount = te->nDeps;
4466 te->revDeps = NULL;
4467 te->nRevDeps = 0;
4468 te->pending_prev = NULL;
4469 te->pending_next = NULL;
4470 }
4471
4472 /*
4473 * POST_DATA items that are shown as depending on a table need to be
4474 * re-pointed to depend on that table's data, instead. This ensures they
4475 * won't get scheduled until the data has been loaded.
4476 */
4477 repoint_table_dependencies(AH);
4478
4479 /*
4480 * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4481 * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
4482 * one BLOB COMMENTS in such files.)
4483 */
4484 if (AH->version < K_VERS_1_11)
4485 {
4486 for (te = AH->toc->next; te != AH->toc; te = te->next)
4487 {
4488 if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4489 {
4490 TocEntry *te2;
4491
4492 for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4493 {
4494 if (strcmp(te2->desc, "BLOBS") == 0)
4495 {
4496 te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
4497 te->dependencies[0] = te2->dumpId;
4498 te->nDeps++;
4499 te->depCount++;
4500 break;
4501 }
4502 }
4503 break;
4504 }
4505 }
4506 }
4507
4508 /*
4509 * At this point we start to build the revDeps reverse-dependency arrays,
4510 * so all changes of dependencies must be complete.
4511 */
4512
4513 /*
4514 * Count the incoming dependencies for each item. Also, it is possible
4515 * that the dependencies list items that are not in the archive at all
4516 * (that should not happen in 9.2 and later, but is highly likely in older
4517 * archives). Subtract such items from the depCounts.
4518 */
4519 for (te = AH->toc->next; te != AH->toc; te = te->next)
4520 {
4521 for (i = 0; i < te->nDeps; i++)
4522 {
4523 DumpId depid = te->dependencies[i];
4524
4525 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4526 AH->tocsByDumpId[depid]->nRevDeps++;
4527 else
4528 te->depCount--;
4529 }
4530 }
4531
4532 /*
4533 * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4534 * it as a counter below.
4535 */
4536 for (te = AH->toc->next; te != AH->toc; te = te->next)
4537 {
4538 if (te->nRevDeps > 0)
4539 te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
4540 te->nRevDeps = 0;
4541 }
4542
4543 /*
4544 * Build the revDeps[] arrays of incoming-dependency dumpIds. This had
4545 * better agree with the loops above.
4546 */
4547 for (te = AH->toc->next; te != AH->toc; te = te->next)
4548 {
4549 for (i = 0; i < te->nDeps; i++)
4550 {
4551 DumpId depid = te->dependencies[i];
4552
4553 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4554 {
4555 TocEntry *otherte = AH->tocsByDumpId[depid];
4556
4557 otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4558 }
4559 }
4560 }
4561
4562 /*
4563 * Lastly, work out the locking dependencies.
4564 */
4565 for (te = AH->toc->next; te != AH->toc; te = te->next)
4566 {
4567 te->lockDeps = NULL;
4568 te->nLockDeps = 0;
4569 identify_locking_dependencies(AH, te);
4570 }
4571 }
4572
4573 /*
4574 * Change dependencies on table items to depend on table data items instead,
4575 * but only in POST_DATA items.
4576 *
4577 * Also, for any item having such dependency(s), set its dataLength to the
4578 * largest dataLength of the table data items it depends on. This ensures
4579 * that parallel restore will prioritize larger jobs (index builds, FK
4580 * constraint checks, etc) over smaller ones, avoiding situations where we
4581 * end a restore with only one active job working on a large table.
4582 */
4583 static void
repoint_table_dependencies(ArchiveHandle * AH)4584 repoint_table_dependencies(ArchiveHandle *AH)
4585 {
4586 TocEntry *te;
4587 int i;
4588 DumpId olddep;
4589
4590 for (te = AH->toc->next; te != AH->toc; te = te->next)
4591 {
4592 if (te->section != SECTION_POST_DATA)
4593 continue;
4594 for (i = 0; i < te->nDeps; i++)
4595 {
4596 olddep = te->dependencies[i];
4597 if (olddep <= AH->maxDumpId &&
4598 AH->tableDataId[olddep] != 0)
4599 {
4600 DumpId tabledataid = AH->tableDataId[olddep];
4601 TocEntry *tabledatate = AH->tocsByDumpId[tabledataid];
4602
4603 te->dependencies[i] = tabledataid;
4604 te->dataLength = Max(te->dataLength, tabledatate->dataLength);
4605 pg_log_debug("transferring dependency %d -> %d to %d",
4606 te->dumpId, olddep, tabledataid);
4607 }
4608 }
4609 }
4610 }
4611
4612 /*
4613 * Identify which objects we'll need exclusive lock on in order to restore
4614 * the given TOC entry (*other* than the one identified by the TOC entry
4615 * itself). Record their dump IDs in the entry's lockDeps[] array.
4616 */
4617 static void
identify_locking_dependencies(ArchiveHandle * AH,TocEntry * te)4618 identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4619 {
4620 DumpId *lockids;
4621 int nlockids;
4622 int i;
4623
4624 /*
4625 * We only care about this for POST_DATA items. PRE_DATA items are not
4626 * run in parallel, and DATA items are all independent by assumption.
4627 */
4628 if (te->section != SECTION_POST_DATA)
4629 return;
4630
4631 /* Quick exit if no dependencies at all */
4632 if (te->nDeps == 0)
4633 return;
4634
4635 /*
4636 * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
4637 * and hence require exclusive lock. However, we know that CREATE INDEX
4638 * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
4639 * category too ... but today is not that day.)
4640 */
4641 if (strcmp(te->desc, "INDEX") == 0)
4642 return;
4643
4644 /*
4645 * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
4646 * item listed among its dependencies. Originally all of these would have
4647 * been TABLE items, but repoint_table_dependencies would have repointed
4648 * them to the TABLE DATA items if those are present (which they might not
4649 * be, eg in a schema-only dump). Note that all of the entries we are
4650 * processing here are POST_DATA; otherwise there might be a significant
4651 * difference between a dependency on a table and a dependency on its
4652 * data, so that closer analysis would be needed here.
4653 */
4654 lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
4655 nlockids = 0;
4656 for (i = 0; i < te->nDeps; i++)
4657 {
4658 DumpId depid = te->dependencies[i];
4659
4660 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
4661 ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
4662 strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
4663 lockids[nlockids++] = depid;
4664 }
4665
4666 if (nlockids == 0)
4667 {
4668 free(lockids);
4669 return;
4670 }
4671
4672 te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
4673 te->nLockDeps = nlockids;
4674 }
4675
4676 /*
4677 * Remove the specified TOC entry from the depCounts of items that depend on
4678 * it, thereby possibly making them ready-to-run. Any pending item that
4679 * becomes ready should be moved to the ready_list, if that's provided.
4680 */
4681 static void
reduce_dependencies(ArchiveHandle * AH,TocEntry * te,ParallelReadyList * ready_list)4682 reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
4683 ParallelReadyList *ready_list)
4684 {
4685 int i;
4686
4687 pg_log_debug("reducing dependencies for %d", te->dumpId);
4688
4689 for (i = 0; i < te->nRevDeps; i++)
4690 {
4691 TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
4692
4693 Assert(otherte->depCount > 0);
4694 otherte->depCount--;
4695
4696 /*
4697 * It's ready if it has no remaining dependencies, and it belongs in
4698 * the current restore pass, and it is currently a member of the
4699 * pending list (that check is needed to prevent double restore in
4700 * some cases where a list-file forces out-of-order restoring).
4701 * However, if ready_list == NULL then caller doesn't want any list
4702 * memberships changed.
4703 */
4704 if (otherte->depCount == 0 &&
4705 _tocEntryRestorePass(otherte) == AH->restorePass &&
4706 otherte->pending_prev != NULL &&
4707 ready_list != NULL)
4708 {
4709 /* Remove it from pending list ... */
4710 pending_list_remove(otherte);
4711 /* ... and add to ready_list */
4712 ready_list_insert(ready_list, otherte);
4713 }
4714 }
4715 }
4716
4717 /*
4718 * Set the created flag on the DATA member corresponding to the given
4719 * TABLE member
4720 */
4721 static void
mark_create_done(ArchiveHandle * AH,TocEntry * te)4722 mark_create_done(ArchiveHandle *AH, TocEntry *te)
4723 {
4724 if (AH->tableDataId[te->dumpId] != 0)
4725 {
4726 TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4727
4728 ted->created = true;
4729 }
4730 }
4731
4732 /*
4733 * Mark the DATA member corresponding to the given TABLE member
4734 * as not wanted
4735 */
4736 static void
inhibit_data_for_failed_table(ArchiveHandle * AH,TocEntry * te)4737 inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
4738 {
4739 pg_log_info("table \"%s\" could not be created, will not restore its data",
4740 te->tag);
4741
4742 if (AH->tableDataId[te->dumpId] != 0)
4743 {
4744 TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4745
4746 ted->reqs = 0;
4747 }
4748 }
4749
4750 /*
4751 * Clone and de-clone routines used in parallel restoration.
4752 *
4753 * Enough of the structure is cloned to ensure that there is no
4754 * conflict between different threads each with their own clone.
4755 */
4756 ArchiveHandle *
CloneArchive(ArchiveHandle * AH)4757 CloneArchive(ArchiveHandle *AH)
4758 {
4759 ArchiveHandle *clone;
4760
4761 /* Make a "flat" copy */
4762 clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
4763 memcpy(clone, AH, sizeof(ArchiveHandle));
4764
4765 /* Handle format-independent fields */
4766 memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
4767
4768 /* The clone will have its own connection, so disregard connection state */
4769 clone->connection = NULL;
4770 clone->connCancel = NULL;
4771 clone->currUser = NULL;
4772 clone->currSchema = NULL;
4773 clone->currTablespace = NULL;
4774
4775 /* savedPassword must be local in case we change it while connecting */
4776 if (clone->savedPassword)
4777 clone->savedPassword = pg_strdup(clone->savedPassword);
4778
4779 /* clone has its own error count, too */
4780 clone->public.n_errors = 0;
4781
4782 /*
4783 * Connect our new clone object to the database, using the same connection
4784 * parameters used for the original connection.
4785 */
4786 ConnectDatabase((Archive *) clone, &clone->public.ropt->cparams, true);
4787
4788 /* re-establish fixed state */
4789 if (AH->mode == archModeRead)
4790 _doSetFixedOutputState(clone);
4791 /* in write case, setupDumpWorker will fix up connection state */
4792
4793 /* Let the format-specific code have a chance too */
4794 clone->ClonePtr(clone);
4795
4796 Assert(clone->connection != NULL);
4797 return clone;
4798 }
4799
4800 /*
4801 * Release clone-local storage.
4802 *
4803 * Note: we assume any clone-local connection was already closed.
4804 */
4805 void
DeCloneArchive(ArchiveHandle * AH)4806 DeCloneArchive(ArchiveHandle *AH)
4807 {
4808 /* Should not have an open database connection */
4809 Assert(AH->connection == NULL);
4810
4811 /* Clear format-specific state */
4812 AH->DeClonePtr(AH);
4813
4814 /* Clear state allocated by CloneArchive */
4815 if (AH->sqlparse.curCmd)
4816 destroyPQExpBuffer(AH->sqlparse.curCmd);
4817
4818 /* Clear any connection-local state */
4819 if (AH->currUser)
4820 free(AH->currUser);
4821 if (AH->currSchema)
4822 free(AH->currSchema);
4823 if (AH->currTablespace)
4824 free(AH->currTablespace);
4825 if (AH->currTableAm)
4826 free(AH->currTableAm);
4827 if (AH->savedPassword)
4828 free(AH->savedPassword);
4829
4830 free(AH);
4831 }
4832