1 /*-------------------------------------------------------------------------
2 *
3 * pg_backup_archiver.c
4 *
5 * Private implementation of the archiver routines.
6 *
7 * See the headers to pg_restore for more details.
8 *
9 * Copyright (c) 2000, Philip Warner
10 * Rights are granted to use this software in any way so long
11 * as this notice is not removed.
12 *
13 * The author is not responsible for loss or damages that may
14 * result from its use.
15 *
16 *
17 * IDENTIFICATION
18 * src/bin/pg_dump/pg_backup_archiver.c
19 *
20 *-------------------------------------------------------------------------
21 */
22 #include "postgres_fe.h"
23
24 #include "parallel.h"
25 #include "pg_backup_archiver.h"
26 #include "pg_backup_db.h"
27 #include "pg_backup_utils.h"
28 #include "dumputils.h"
29 #include "fe_utils/string_utils.h"
30
31 #include <ctype.h>
32 #include <fcntl.h>
33 #include <unistd.h>
34 #include <sys/stat.h>
35 #include <sys/types.h>
36 #include <sys/wait.h>
37
38 #ifdef WIN32
39 #include <io.h>
40 #endif
41
42 #include "libpq/libpq-fs.h"
43
/* Banner text for a single-database dump and for a whole-cluster dump */
#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
#define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"

/*
 * Snapshot of an archive's output target, as handed back by SaveOutput()
 * and consumed by RestoreOutput().
 */
typedef struct _outputContext
{
	void	   *OF;				/* saved output handle */
	int			gzOut;			/* saved "output is gzip'd" flag */
} OutputContext;
53
54 /* translator: this is a module name */
55 static const char *modulename = gettext_noop("archiver");
56
57
58 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
59 const int compression, ArchiveMode mode, SetupWorkerPtr setupWorkerPtr);
60 static void _getObjectDescription(PQExpBuffer buf, TocEntry *te,
61 ArchiveHandle *AH);
62 static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData);
63 static char *replace_line_endings(const char *str);
64 static void _doSetFixedOutputState(ArchiveHandle *AH);
65 static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
66 static void _doSetWithOids(ArchiveHandle *AH, const bool withOids);
67 static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
68 static void _becomeUser(ArchiveHandle *AH, const char *user);
69 static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
70 static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
71 static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
72 static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
73 static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
74 static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
75 static teReqs _tocEntryRequired(TocEntry *te, teSection curSection, RestoreOptions *ropt);
76 static RestorePass _tocEntryRestorePass(TocEntry *te);
77 static bool _tocEntryIsACL(TocEntry *te);
78 static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
79 static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
80 static void buildTocEntryArrays(ArchiveHandle *AH);
81 static void _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
82 static int _discoverArchiveFormat(ArchiveHandle *AH);
83
84 static int RestoringToDB(ArchiveHandle *AH);
85 static void dump_lo_buf(ArchiveHandle *AH);
86 static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
87 static void SetOutput(ArchiveHandle *AH, const char *filename, int compression);
88 static OutputContext SaveOutput(ArchiveHandle *AH);
89 static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);
90
91 static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
92 static void restore_toc_entries_prefork(ArchiveHandle *AH,
93 TocEntry *pending_list);
94 static void restore_toc_entries_parallel(ArchiveHandle *AH,
95 ParallelState *pstate,
96 TocEntry *pending_list);
97 static void restore_toc_entries_postfork(ArchiveHandle *AH,
98 TocEntry *pending_list);
99 static void par_list_header_init(TocEntry *l);
100 static void par_list_append(TocEntry *l, TocEntry *te);
101 static void par_list_remove(TocEntry *te);
102 static void move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list,
103 RestorePass pass);
104 static TocEntry *get_next_work_item(ArchiveHandle *AH,
105 TocEntry *ready_list,
106 ParallelState *pstate);
107 static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
108 int worker, int status,
109 ParallelState *pstate);
110 static void fix_dependencies(ArchiveHandle *AH);
111 static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
112 static void repoint_table_dependencies(ArchiveHandle *AH);
113 static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
114 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
115 TocEntry *ready_list);
116 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
117 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
118
119 static void StrictNamesCheck(RestoreOptions *ropt);
120
121
122 /*
123 * Allocate a new DumpOptions block containing all default values.
124 */
125 DumpOptions *
NewDumpOptions(void)126 NewDumpOptions(void)
127 {
128 DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
129
130 InitDumpOptions(opts);
131 return opts;
132 }
133
134 /*
135 * Initialize a DumpOptions struct to all default values
136 */
137 void
InitDumpOptions(DumpOptions * opts)138 InitDumpOptions(DumpOptions *opts)
139 {
140 memset(opts, 0, sizeof(DumpOptions));
141 /* set any fields that shouldn't default to zeroes */
142 opts->include_everything = true;
143 opts->cparams.promptPassword = TRI_DEFAULT;
144 opts->dumpSections = DUMP_UNSECTIONED;
145 }
146
147 /*
148 * Create a freshly allocated DumpOptions with options equivalent to those
149 * found in the given RestoreOptions.
150 */
151 DumpOptions *
dumpOptionsFromRestoreOptions(RestoreOptions * ropt)152 dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
153 {
154 DumpOptions *dopt = NewDumpOptions();
155
156 /* this is the inverse of what's at the end of pg_dump.c's main() */
157 dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
158 dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
159 dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
160 dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
161 dopt->cparams.promptPassword = ropt->cparams.promptPassword;
162 dopt->outputClean = ropt->dropSchema;
163 dopt->dataOnly = ropt->dataOnly;
164 dopt->schemaOnly = ropt->schemaOnly;
165 dopt->if_exists = ropt->if_exists;
166 dopt->column_inserts = ropt->column_inserts;
167 dopt->dumpSections = ropt->dumpSections;
168 dopt->aclsSkip = ropt->aclsSkip;
169 dopt->outputSuperuser = ropt->superuser;
170 dopt->outputCreateDB = ropt->createDB;
171 dopt->outputNoOwner = ropt->noOwner;
172 dopt->outputNoTablespaces = ropt->noTablespace;
173 dopt->disable_triggers = ropt->disable_triggers;
174 dopt->use_setsessauth = ropt->use_setsessauth;
175
176 dopt->disable_dollar_quoting = ropt->disable_dollar_quoting;
177 dopt->dump_inserts = ropt->dump_inserts;
178 dopt->no_security_labels = ropt->no_security_labels;
179 dopt->lockWaitTimeout = ropt->lockWaitTimeout;
180 dopt->include_everything = ropt->include_everything;
181 dopt->enable_row_security = ropt->enable_row_security;
182
183 return dopt;
184 }
185
186
187 /*
188 * Wrapper functions.
189 *
 * The objective is to make writing new formats and dumpers as simple
191 * as possible, if necessary at the expense of extra function calls etc.
192 *
193 */
194
195 /*
196 * The dump worker setup needs lots of knowledge of the internals of pg_dump,
197 * so It's defined in pg_dump.c and passed into OpenArchive. The restore worker
198 * setup doesn't need to know anything much, so it's defined here.
199 */
200 static void
setupRestoreWorker(Archive * AHX)201 setupRestoreWorker(Archive *AHX)
202 {
203 ArchiveHandle *AH = (ArchiveHandle *) AHX;
204
205 (AH->ReopenPtr) (AH);
206 }
207
208
209 /* Create a new archive */
210 /* Public */
211 Archive *
CreateArchive(const char * FileSpec,const ArchiveFormat fmt,const int compression,ArchiveMode mode,SetupWorkerPtr setupDumpWorker)212 CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
213 const int compression, ArchiveMode mode, SetupWorkerPtr setupDumpWorker)
214
215 {
216 ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, mode, setupDumpWorker);
217
218 return (Archive *) AH;
219 }
220
221 /* Open an existing archive */
222 /* Public */
223 Archive *
OpenArchive(const char * FileSpec,const ArchiveFormat fmt)224 OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
225 {
226 ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, archModeRead, setupRestoreWorker);
227
228 return (Archive *) AH;
229 }
230
231 /* Public */
232 void
CloseArchive(Archive * AHX)233 CloseArchive(Archive *AHX)
234 {
235 int res = 0;
236 ArchiveHandle *AH = (ArchiveHandle *) AHX;
237
238 (*AH->ClosePtr) (AH);
239
240 /* Close the output */
241 if (AH->gzOut)
242 res = GZCLOSE(AH->OF);
243 else if (AH->OF != stdout)
244 res = fclose(AH->OF);
245
246 if (res != 0)
247 exit_horribly(modulename, "could not close output file: %s\n",
248 strerror(errno));
249 }
250
251 /* Public */
252 void
SetArchiveOptions(Archive * AH,DumpOptions * dopt,RestoreOptions * ropt)253 SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
254 {
255 /* Caller can omit dump options, in which case we synthesize them */
256 if (dopt == NULL && ropt != NULL)
257 dopt = dumpOptionsFromRestoreOptions(ropt);
258
259 /* Save options for later access */
260 AH->dopt = dopt;
261 AH->ropt = ropt;
262 }
263
264 /* Public */
265 void
ProcessArchiveRestoreOptions(Archive * AHX)266 ProcessArchiveRestoreOptions(Archive *AHX)
267 {
268 ArchiveHandle *AH = (ArchiveHandle *) AHX;
269 RestoreOptions *ropt = AH->public.ropt;
270 TocEntry *te;
271 teSection curSection;
272
273 /* Decide which TOC entries will be dumped/restored, and mark them */
274 curSection = SECTION_PRE_DATA;
275 for (te = AH->toc->next; te != AH->toc; te = te->next)
276 {
277 /*
278 * When writing an archive, we also take this opportunity to check
279 * that we have generated the entries in a sane order that respects
280 * the section divisions. When reading, don't complain, since buggy
281 * old versions of pg_dump might generate out-of-order archives.
282 */
283 if (AH->mode != archModeRead)
284 {
285 switch (te->section)
286 {
287 case SECTION_NONE:
288 /* ok to be anywhere */
289 break;
290 case SECTION_PRE_DATA:
291 if (curSection != SECTION_PRE_DATA)
292 write_msg(modulename,
293 "WARNING: archive items not in correct section order\n");
294 break;
295 case SECTION_DATA:
296 if (curSection == SECTION_POST_DATA)
297 write_msg(modulename,
298 "WARNING: archive items not in correct section order\n");
299 break;
300 case SECTION_POST_DATA:
301 /* ok no matter which section we were in */
302 break;
303 default:
304 exit_horribly(modulename, "unexpected section code %d\n",
305 (int) te->section);
306 break;
307 }
308 }
309
310 if (te->section != SECTION_NONE)
311 curSection = te->section;
312
313 te->reqs = _tocEntryRequired(te, curSection, ropt);
314 }
315
316 /* Enforce strict names checking */
317 if (ropt->strict_names)
318 StrictNamesCheck(ropt);
319 }
320
321 /* Public */
322 void
RestoreArchive(Archive * AHX)323 RestoreArchive(Archive *AHX)
324 {
325 ArchiveHandle *AH = (ArchiveHandle *) AHX;
326 RestoreOptions *ropt = AH->public.ropt;
327 bool parallel_mode;
328 TocEntry *te;
329 OutputContext sav;
330
331 AH->stage = STAGE_INITIALIZING;
332
333 /*
334 * Check for nonsensical option combinations.
335 *
336 * -C is not compatible with -1, because we can't create a database inside
337 * a transaction block.
338 */
339 if (ropt->createDB && ropt->single_txn)
340 exit_horribly(modulename, "-C and -1 are incompatible options\n");
341
342 /*
343 * If we're going to do parallel restore, there are some restrictions.
344 */
345 parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
346 if (parallel_mode)
347 {
348 /* We haven't got round to making this work for all archive formats */
349 if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
350 exit_horribly(modulename, "parallel restore is not supported with this archive file format\n");
351
352 /* Doesn't work if the archive represents dependencies as OIDs */
353 if (AH->version < K_VERS_1_8)
354 exit_horribly(modulename, "parallel restore is not supported with archives made by pre-8.0 pg_dump\n");
355
356 /*
357 * It's also not gonna work if we can't reopen the input file, so
358 * let's try that immediately.
359 */
360 (AH->ReopenPtr) (AH);
361 }
362
363 /*
364 * Make sure we won't need (de)compression we haven't got
365 */
366 #ifndef HAVE_LIBZ
367 if (AH->compression != 0 && AH->PrintTocDataPtr !=NULL)
368 {
369 for (te = AH->toc->next; te != AH->toc; te = te->next)
370 {
371 if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
372 exit_horribly(modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
373 }
374 }
375 #endif
376
377 /*
378 * Prepare index arrays, so we can assume we have them throughout restore.
379 * It's possible we already did this, though.
380 */
381 if (AH->tocsByDumpId == NULL)
382 buildTocEntryArrays(AH);
383
384 /*
385 * If we're using a DB connection, then connect it.
386 */
387 if (ropt->useDB)
388 {
389 ahlog(AH, 1, "connecting to database for restore\n");
390 if (AH->version < K_VERS_1_3)
391 exit_horribly(modulename, "direct database connections are not supported in pre-1.3 archives\n");
392
393 /*
394 * We don't want to guess at whether the dump will successfully
395 * restore; allow the attempt regardless of the version of the restore
396 * target.
397 */
398 AHX->minRemoteVersion = 0;
399 AHX->maxRemoteVersion = 999999;
400
401 ConnectDatabase(AHX, &ropt->cparams, false);
402
403 /*
404 * If we're talking to the DB directly, don't send comments since they
405 * obscure SQL when displaying errors
406 */
407 AH->noTocComments = 1;
408 }
409
410 /*
411 * Work out if we have an implied data-only restore. This can happen if
412 * the dump was data only or if the user has used a toc list to exclude
413 * all of the schema data. All we do is look for schema entries - if none
414 * are found then we set the dataOnly flag.
415 *
416 * We could scan for wanted TABLE entries, but that is not the same as
417 * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
418 */
419 if (!ropt->dataOnly)
420 {
421 int impliedDataOnly = 1;
422
423 for (te = AH->toc->next; te != AH->toc; te = te->next)
424 {
425 if ((te->reqs & REQ_SCHEMA) != 0)
426 { /* It's schema, and it's wanted */
427 impliedDataOnly = 0;
428 break;
429 }
430 }
431 if (impliedDataOnly)
432 {
433 ropt->dataOnly = impliedDataOnly;
434 ahlog(AH, 1, "implied data-only restore\n");
435 }
436 }
437
438 /*
439 * Setup the output file if necessary.
440 */
441 sav = SaveOutput(AH);
442 if (ropt->filename || ropt->compression)
443 SetOutput(AH, ropt->filename, ropt->compression);
444
445 ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
446
447 if (AH->archiveRemoteVersion)
448 ahprintf(AH, "-- Dumped from database version %s\n",
449 AH->archiveRemoteVersion);
450 if (AH->archiveDumpVersion)
451 ahprintf(AH, "-- Dumped by pg_dump version %s\n",
452 AH->archiveDumpVersion);
453
454 ahprintf(AH, "\n");
455
456 if (AH->public.verbose)
457 dumpTimestamp(AH, "Started on", AH->createDate);
458
459 if (ropt->single_txn)
460 {
461 if (AH->connection)
462 StartTransaction(AHX);
463 else
464 ahprintf(AH, "BEGIN;\n\n");
465 }
466
467 /*
468 * Establish important parameter values right away.
469 */
470 _doSetFixedOutputState(AH);
471
472 AH->stage = STAGE_PROCESSING;
473
474 /*
475 * Drop the items at the start, in reverse order
476 */
477 if (ropt->dropSchema)
478 {
479 for (te = AH->toc->prev; te != AH->toc; te = te->prev)
480 {
481 AH->currentTE = te;
482
483 /*
484 * In createDB mode, issue a DROP *only* for the database as a
485 * whole. Issuing drops against anything else would be wrong,
486 * because at this point we're connected to the wrong database.
487 * Conversely, if we're not in createDB mode, we'd better not
488 * issue a DROP against the database at all.
489 */
490 if (ropt->createDB)
491 {
492 if (strcmp(te->desc, "DATABASE") != 0)
493 continue;
494 }
495 else
496 {
497 if (strcmp(te->desc, "DATABASE") == 0)
498 continue;
499 }
500
501 /* Otherwise, drop anything that's selected and has a dropStmt */
502 if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
503 {
504 ahlog(AH, 1, "dropping %s %s\n", te->desc, te->tag);
505 /* Select owner and schema as necessary */
506 _becomeOwner(AH, te);
507 _selectOutputSchema(AH, te->namespace);
508
509 /*
510 * Now emit the DROP command, if the object has one. Note we
511 * don't necessarily emit it verbatim; at this point we add an
512 * appropriate IF EXISTS clause, if the user requested it.
513 */
514 if (*te->dropStmt != '\0')
515 {
516 if (!ropt->if_exists)
517 {
518 /* No --if-exists? Then just use the original */
519 ahprintf(AH, "%s", te->dropStmt);
520 }
521 else
522 {
523 /*
524 * Inject an appropriate spelling of "if exists". For
525 * large objects, we have a separate routine that
526 * knows how to do it, without depending on
527 * te->dropStmt; use that. For other objects we need
528 * to parse the command.
529 */
530 if (strncmp(te->desc, "BLOB", 4) == 0)
531 {
532 DropBlobIfExists(AH, te->catalogId.oid);
533 }
534 else
535 {
536 char *dropStmt = pg_strdup(te->dropStmt);
537 char *dropStmtOrig = dropStmt;
538 PQExpBuffer ftStmt = createPQExpBuffer();
539
540 /*
541 * Need to inject IF EXISTS clause after ALTER
542 * TABLE part in ALTER TABLE .. DROP statement
543 */
544 if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
545 {
546 appendPQExpBuffer(ftStmt,
547 "ALTER TABLE IF EXISTS");
548 dropStmt = dropStmt + 11;
549 }
550
551 /*
552 * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
553 * not support the IF EXISTS clause, and therefore
554 * we simply emit the original command for DEFAULT
555 * objects (modulo the adjustment made above).
556 *
557 * If we used CREATE OR REPLACE VIEW as a means of
558 * quasi-dropping an ON SELECT rule, that should
559 * be emitted unchanged as well.
560 *
561 * For other object types, we need to extract the
562 * first part of the DROP which includes the
563 * object type. Most of the time this matches
564 * te->desc, so search for that; however for the
565 * different kinds of CONSTRAINTs, we know to
566 * search for hardcoded "DROP CONSTRAINT" instead.
567 */
568 if (strcmp(te->desc, "DEFAULT") == 0 ||
569 strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
570 appendPQExpBufferStr(ftStmt, dropStmt);
571 else
572 {
573 char buffer[40];
574 char *mark;
575
576 if (strcmp(te->desc, "CONSTRAINT") == 0 ||
577 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
578 strcmp(te->desc, "FK CONSTRAINT") == 0)
579 strcpy(buffer, "DROP CONSTRAINT");
580 else
581 snprintf(buffer, sizeof(buffer), "DROP %s",
582 te->desc);
583
584 mark = strstr(dropStmt, buffer);
585
586 if (mark)
587 {
588 *mark = '\0';
589 appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
590 dropStmt, buffer,
591 mark + strlen(buffer));
592 }
593 else
594 {
595 /* complain and emit unmodified command */
596 write_msg(modulename,
597 "WARNING: could not find where to insert IF EXISTS in statement \"%s\"\n",
598 dropStmtOrig);
599 appendPQExpBufferStr(ftStmt, dropStmt);
600 }
601 }
602
603 ahprintf(AH, "%s", ftStmt->data);
604
605 destroyPQExpBuffer(ftStmt);
606 pg_free(dropStmtOrig);
607 }
608 }
609 }
610 }
611 }
612
613 /*
614 * _selectOutputSchema may have set currSchema to reflect the effect
615 * of a "SET search_path" command it emitted. However, by now we may
616 * have dropped that schema; or it might not have existed in the first
617 * place. In either case the effective value of search_path will not
618 * be what we think. Forcibly reset currSchema so that we will
619 * re-establish the search_path setting when needed (after creating
620 * the schema).
621 *
622 * If we treated users as pg_dump'able objects then we'd need to reset
623 * currUser here too.
624 */
625 if (AH->currSchema)
626 free(AH->currSchema);
627 AH->currSchema = NULL;
628 }
629
630 if (parallel_mode)
631 {
632 /*
633 * In parallel mode, turn control over to the parallel-restore logic.
634 */
635 ParallelState *pstate;
636 TocEntry pending_list;
637
638 par_list_header_init(&pending_list);
639
640 /* This runs PRE_DATA items and then disconnects from the database */
641 restore_toc_entries_prefork(AH, &pending_list);
642 Assert(AH->connection == NULL);
643
644 /* ParallelBackupStart() will actually fork the processes */
645 pstate = ParallelBackupStart(AH);
646 restore_toc_entries_parallel(AH, pstate, &pending_list);
647 ParallelBackupEnd(AH, pstate);
648
649 /* reconnect the master and see if we missed something */
650 restore_toc_entries_postfork(AH, &pending_list);
651 Assert(AH->connection != NULL);
652 }
653 else
654 {
655 /*
656 * In serial mode, process everything in three phases: normal items,
657 * then ACLs, then post-ACL items. We might be able to skip one or
658 * both extra phases in some cases, eg data-only restores.
659 */
660 bool haveACL = false;
661 bool havePostACL = false;
662
663 for (te = AH->toc->next; te != AH->toc; te = te->next)
664 {
665 if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
666 continue; /* ignore if not to be dumped at all */
667
668 switch (_tocEntryRestorePass(te))
669 {
670 case RESTORE_PASS_MAIN:
671 (void) restore_toc_entry(AH, te, false);
672 break;
673 case RESTORE_PASS_ACL:
674 haveACL = true;
675 break;
676 case RESTORE_PASS_POST_ACL:
677 havePostACL = true;
678 break;
679 }
680 }
681
682 if (haveACL)
683 {
684 for (te = AH->toc->next; te != AH->toc; te = te->next)
685 {
686 if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
687 _tocEntryRestorePass(te) == RESTORE_PASS_ACL)
688 (void) restore_toc_entry(AH, te, false);
689 }
690 }
691
692 if (havePostACL)
693 {
694 for (te = AH->toc->next; te != AH->toc; te = te->next)
695 {
696 if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
697 _tocEntryRestorePass(te) == RESTORE_PASS_POST_ACL)
698 (void) restore_toc_entry(AH, te, false);
699 }
700 }
701 }
702
703 if (ropt->single_txn)
704 {
705 if (AH->connection)
706 CommitTransaction(AHX);
707 else
708 ahprintf(AH, "COMMIT;\n\n");
709 }
710
711 if (AH->public.verbose)
712 dumpTimestamp(AH, "Completed on", time(NULL));
713
714 ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
715
716 /*
717 * Clean up & we're done.
718 */
719 AH->stage = STAGE_FINALIZING;
720
721 if (ropt->filename || ropt->compression)
722 RestoreOutput(AH, sav);
723
724 if (ropt->useDB)
725 DisconnectDatabase(&AH->public);
726 }
727
728 /*
729 * Restore a single TOC item. Used in both parallel and non-parallel restore;
730 * is_parallel is true if we are in a worker child process.
731 *
732 * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
733 * the parallel parent has to make the corresponding status update.
734 */
735 static int
restore_toc_entry(ArchiveHandle * AH,TocEntry * te,bool is_parallel)736 restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
737 {
738 RestoreOptions *ropt = AH->public.ropt;
739 int status = WORKER_OK;
740 teReqs reqs;
741 bool defnDumped;
742
743 AH->currentTE = te;
744
745 /* Work out what, if anything, we want from this entry */
746 reqs = te->reqs;
747
748 /*
749 * Ignore DATABASE entry unless we should create it. We must check this
750 * here, not in _tocEntryRequired, because the createDB option should not
751 * affect emitting a DATABASE entry to an archive file.
752 */
753 if (!ropt->createDB && strcmp(te->desc, "DATABASE") == 0)
754 reqs = 0;
755
756 /* Dump any relevant dump warnings to stderr */
757 if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
758 {
759 if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
760 write_msg(modulename, "warning from original dump file: %s\n", te->defn);
761 else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
762 write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
763 }
764
765 defnDumped = false;
766
767 /*
768 * If it has a schema component that we want, then process that
769 */
770 if ((reqs & REQ_SCHEMA) != 0)
771 {
772 /* Show namespace in log message if available */
773 if (te->namespace)
774 ahlog(AH, 1, "creating %s \"%s.%s\"\n",
775 te->desc, te->namespace, te->tag);
776 else
777 ahlog(AH, 1, "creating %s \"%s\"\n", te->desc, te->tag);
778
779 _printTocEntry(AH, te, false);
780 defnDumped = true;
781
782 if (strcmp(te->desc, "TABLE") == 0)
783 {
784 if (AH->lastErrorTE == te)
785 {
786 /*
787 * We failed to create the table. If
788 * --no-data-for-failed-tables was given, mark the
789 * corresponding TABLE DATA to be ignored.
790 *
791 * In the parallel case this must be done in the parent, so we
792 * just set the return value.
793 */
794 if (ropt->noDataForFailedTables)
795 {
796 if (is_parallel)
797 status = WORKER_INHIBIT_DATA;
798 else
799 inhibit_data_for_failed_table(AH, te);
800 }
801 }
802 else
803 {
804 /*
805 * We created the table successfully. Mark the corresponding
806 * TABLE DATA for possible truncation.
807 *
808 * In the parallel case this must be done in the parent, so we
809 * just set the return value.
810 */
811 if (is_parallel)
812 status = WORKER_CREATE_DONE;
813 else
814 mark_create_done(AH, te);
815 }
816 }
817
818 /* If we created a DB, connect to it... */
819 if (strcmp(te->desc, "DATABASE") == 0)
820 {
821 ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
822 _reconnectToDB(AH, te->tag);
823 }
824 }
825
826 /*
827 * If it has a data component that we want, then process that
828 */
829 if ((reqs & REQ_DATA) != 0)
830 {
831 /*
832 * hadDumper will be set if there is genuine data component for this
833 * node. Otherwise, we need to check the defn field for statements
834 * that need to be executed in data-only restores.
835 */
836 if (te->hadDumper)
837 {
838 /*
839 * If we can output the data, then restore it.
840 */
841 if (AH->PrintTocDataPtr !=NULL)
842 {
843 _printTocEntry(AH, te, true);
844
845 if (strcmp(te->desc, "BLOBS") == 0 ||
846 strcmp(te->desc, "BLOB COMMENTS") == 0)
847 {
848 ahlog(AH, 1, "processing %s\n", te->desc);
849
850 _selectOutputSchema(AH, "pg_catalog");
851
852 /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
853 if (strcmp(te->desc, "BLOB COMMENTS") == 0)
854 AH->outputKind = OUTPUT_OTHERDATA;
855
856 (*AH->PrintTocDataPtr) (AH, te);
857
858 AH->outputKind = OUTPUT_SQLCMDS;
859 }
860 else
861 {
862 _disableTriggersIfNecessary(AH, te);
863
864 /* Select owner and schema as necessary */
865 _becomeOwner(AH, te);
866 _selectOutputSchema(AH, te->namespace);
867
868 ahlog(AH, 1, "processing data for table \"%s.%s\"\n",
869 te->namespace, te->tag);
870
871 /*
872 * In parallel restore, if we created the table earlier in
873 * the run then we wrap the COPY in a transaction and
874 * precede it with a TRUNCATE. If archiving is not on
875 * this prevents WAL-logging the COPY. This obtains a
876 * speedup similar to that from using single_txn mode in
877 * non-parallel restores.
878 */
879 if (is_parallel && te->created)
880 {
881 /*
882 * Parallel restore is always talking directly to a
883 * server, so no need to see if we should issue BEGIN.
884 */
885 StartTransaction(&AH->public);
886
887 /*
888 * If the server version is >= 8.4, make sure we issue
889 * TRUNCATE with ONLY so that child tables are not
890 * wiped.
891 */
892 ahprintf(AH, "TRUNCATE TABLE %s%s;\n\n",
893 (PQserverVersion(AH->connection) >= 80400 ?
894 "ONLY " : ""),
895 fmtQualifiedId(PQserverVersion(AH->connection),
896 te->namespace,
897 te->tag));
898 }
899
900 /*
901 * If we have a copy statement, use it.
902 */
903 if (te->copyStmt && strlen(te->copyStmt) > 0)
904 {
905 ahprintf(AH, "%s", te->copyStmt);
906 AH->outputKind = OUTPUT_COPYDATA;
907 }
908 else
909 AH->outputKind = OUTPUT_OTHERDATA;
910
911 (*AH->PrintTocDataPtr) (AH, te);
912
913 /*
914 * Terminate COPY if needed.
915 */
916 if (AH->outputKind == OUTPUT_COPYDATA &&
917 RestoringToDB(AH))
918 EndDBCopyMode(&AH->public, te->tag);
919 AH->outputKind = OUTPUT_SQLCMDS;
920
921 /* close out the transaction started above */
922 if (is_parallel && te->created)
923 CommitTransaction(&AH->public);
924
925 _enableTriggersIfNecessary(AH, te);
926 }
927 }
928 }
929 else if (!defnDumped)
930 {
931 /* If we haven't already dumped the defn part, do so now */
932 ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
933 _printTocEntry(AH, te, false);
934 }
935 }
936
937 if (AH->public.n_errors > 0 && status == WORKER_OK)
938 status = WORKER_IGNORED_ERRORS;
939
940 return status;
941 }
942
943 /*
944 * Allocate a new RestoreOptions block.
945 * This is mainly so we can initialize it, but also for future expansion,
946 */
947 RestoreOptions *
NewRestoreOptions(void)948 NewRestoreOptions(void)
949 {
950 RestoreOptions *opts;
951
952 opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
953
954 /* set any fields that shouldn't default to zeroes */
955 opts->format = archUnknown;
956 opts->cparams.promptPassword = TRI_DEFAULT;
957 opts->dumpSections = DUMP_UNSECTIONED;
958
959 return opts;
960 }
961
962 static void
_disableTriggersIfNecessary(ArchiveHandle * AH,TocEntry * te)963 _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
964 {
965 RestoreOptions *ropt = AH->public.ropt;
966
967 /* This hack is only needed in a data-only restore */
968 if (!ropt->dataOnly || !ropt->disable_triggers)
969 return;
970
971 ahlog(AH, 1, "disabling triggers for %s\n", te->tag);
972
973 /*
974 * Become superuser if possible, since they are the only ones who can
975 * disable constraint triggers. If -S was not given, assume the initial
976 * user identity is a superuser. (XXX would it be better to become the
977 * table owner?)
978 */
979 _becomeUser(AH, ropt->superuser);
980
981 /*
982 * Disable them. Assume that the table name should be schema-qualified
983 * (we can't look at PQserverVersion, since we might not have any
984 * connection; and anyway we don't promise our output will load pre-7.3).
985 */
986 ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
987 fmtQualifiedId(70300,
988 te->namespace,
989 te->tag));
990 }
991
992 static void
_enableTriggersIfNecessary(ArchiveHandle * AH,TocEntry * te)993 _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
994 {
995 RestoreOptions *ropt = AH->public.ropt;
996
997 /* This hack is only needed in a data-only restore */
998 if (!ropt->dataOnly || !ropt->disable_triggers)
999 return;
1000
1001 ahlog(AH, 1, "enabling triggers for %s\n", te->tag);
1002
1003 /*
1004 * Become superuser if possible, since they are the only ones who can
1005 * disable constraint triggers. If -S was not given, assume the initial
1006 * user identity is a superuser. (XXX would it be better to become the
1007 * table owner?)
1008 */
1009 _becomeUser(AH, ropt->superuser);
1010
1011 /*
1012 * Enable them. As above, force schema qualification.
1013 */
1014 ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1015 fmtQualifiedId(70300,
1016 te->namespace,
1017 te->tag));
1018 }
1019
1020 /*
1021 * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1022 */
1023
1024 /* Public */
1025 void
WriteData(Archive * AHX,const void * data,size_t dLen)1026 WriteData(Archive *AHX, const void *data, size_t dLen)
1027 {
1028 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1029
1030 if (!AH->currToc)
1031 exit_horribly(modulename, "internal error -- WriteData cannot be called outside the context of a DataDumper routine\n");
1032
1033 (*AH->WriteDataPtr) (AH, data, dLen);
1034
1035 return;
1036 }
1037
1038 /*
1039 * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1040 * repository for all metadata. But the name has stuck.
1041 */
1042
1043 /* Public */
1044 void
ArchiveEntry(Archive * AHX,CatalogId catalogId,DumpId dumpId,const char * tag,const char * namespace,const char * tablespace,const char * owner,bool withOids,const char * desc,teSection section,const char * defn,const char * dropStmt,const char * copyStmt,const DumpId * deps,int nDeps,DataDumperPtr dumpFn,void * dumpArg)1045 ArchiveEntry(Archive *AHX,
1046 CatalogId catalogId, DumpId dumpId,
1047 const char *tag,
1048 const char *namespace,
1049 const char *tablespace,
1050 const char *owner, bool withOids,
1051 const char *desc, teSection section,
1052 const char *defn,
1053 const char *dropStmt, const char *copyStmt,
1054 const DumpId *deps, int nDeps,
1055 DataDumperPtr dumpFn, void *dumpArg)
1056 {
1057 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1058 TocEntry *newToc;
1059
1060 newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
1061
1062 AH->tocCount++;
1063 if (dumpId > AH->maxDumpId)
1064 AH->maxDumpId = dumpId;
1065
1066 newToc->prev = AH->toc->prev;
1067 newToc->next = AH->toc;
1068 AH->toc->prev->next = newToc;
1069 AH->toc->prev = newToc;
1070
1071 newToc->catalogId = catalogId;
1072 newToc->dumpId = dumpId;
1073 newToc->section = section;
1074
1075 newToc->tag = pg_strdup(tag);
1076 newToc->namespace = namespace ? pg_strdup(namespace) : NULL;
1077 newToc->tablespace = tablespace ? pg_strdup(tablespace) : NULL;
1078 newToc->owner = pg_strdup(owner);
1079 newToc->withOids = withOids;
1080 newToc->desc = pg_strdup(desc);
1081 newToc->defn = pg_strdup(defn);
1082 newToc->dropStmt = pg_strdup(dropStmt);
1083 newToc->copyStmt = copyStmt ? pg_strdup(copyStmt) : NULL;
1084
1085 if (nDeps > 0)
1086 {
1087 newToc->dependencies = (DumpId *) pg_malloc(nDeps * sizeof(DumpId));
1088 memcpy(newToc->dependencies, deps, nDeps * sizeof(DumpId));
1089 newToc->nDeps = nDeps;
1090 }
1091 else
1092 {
1093 newToc->dependencies = NULL;
1094 newToc->nDeps = 0;
1095 }
1096
1097 newToc->dataDumper = dumpFn;
1098 newToc->dataDumperArg = dumpArg;
1099 newToc->hadDumper = dumpFn ? true : false;
1100
1101 newToc->formatData = NULL;
1102
1103 if (AH->ArchiveEntryPtr !=NULL)
1104 (*AH->ArchiveEntryPtr) (AH, newToc);
1105 }
1106
1107 /* Public */
1108 void
PrintTOCSummary(Archive * AHX)1109 PrintTOCSummary(Archive *AHX)
1110 {
1111 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1112 RestoreOptions *ropt = AH->public.ropt;
1113 TocEntry *te;
1114 teSection curSection;
1115 OutputContext sav;
1116 const char *fmtName;
1117 char stamp_str[64];
1118
1119 sav = SaveOutput(AH);
1120 if (ropt->filename)
1121 SetOutput(AH, ropt->filename, 0 /* no compression */ );
1122
1123 if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
1124 localtime(&AH->createDate)) == 0)
1125 strcpy(stamp_str, "[unknown]");
1126
1127 ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
1128 ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %d\n",
1129 replace_line_endings(AH->archdbname),
1130 AH->tocCount, AH->compression);
1131
1132 switch (AH->format)
1133 {
1134 case archCustom:
1135 fmtName = "CUSTOM";
1136 break;
1137 case archDirectory:
1138 fmtName = "DIRECTORY";
1139 break;
1140 case archTar:
1141 fmtName = "TAR";
1142 break;
1143 default:
1144 fmtName = "UNKNOWN";
1145 }
1146
1147 ahprintf(AH, "; Dump Version: %d.%d-%d\n", AH->vmaj, AH->vmin, AH->vrev);
1148 ahprintf(AH, "; Format: %s\n", fmtName);
1149 ahprintf(AH, "; Integer: %d bytes\n", (int) AH->intSize);
1150 ahprintf(AH, "; Offset: %d bytes\n", (int) AH->offSize);
1151 if (AH->archiveRemoteVersion)
1152 ahprintf(AH, "; Dumped from database version: %s\n",
1153 AH->archiveRemoteVersion);
1154 if (AH->archiveDumpVersion)
1155 ahprintf(AH, "; Dumped by pg_dump version: %s\n",
1156 AH->archiveDumpVersion);
1157
1158 ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
1159
1160 curSection = SECTION_PRE_DATA;
1161 for (te = AH->toc->next; te != AH->toc; te = te->next)
1162 {
1163 if (te->section != SECTION_NONE)
1164 curSection = te->section;
1165 if (ropt->verbose ||
1166 (_tocEntryRequired(te, curSection, ropt) & (REQ_SCHEMA | REQ_DATA)) != 0)
1167 {
1168 char *sanitized_name;
1169 char *sanitized_schema;
1170 char *sanitized_owner;
1171
1172 /*
1173 * As in _printTocEntry(), sanitize strings that might contain
1174 * newlines, to ensure that each logical output line is in fact
1175 * one physical output line. This prevents confusion when the
1176 * file is read by "pg_restore -L". Note that we currently don't
1177 * bother to quote names, meaning that the name fields aren't
1178 * automatically parseable. "pg_restore -L" doesn't care because
1179 * it only examines the dumpId field, but someday we might want to
1180 * try harder.
1181 */
1182 sanitized_name = replace_line_endings(te->tag);
1183 if (te->namespace)
1184 sanitized_schema = replace_line_endings(te->namespace);
1185 else
1186 sanitized_schema = pg_strdup("-");
1187 sanitized_owner = replace_line_endings(te->owner);
1188
1189 ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
1190 te->catalogId.tableoid, te->catalogId.oid,
1191 te->desc, sanitized_schema, sanitized_name,
1192 sanitized_owner);
1193
1194 free(sanitized_name);
1195 free(sanitized_schema);
1196 free(sanitized_owner);
1197 }
1198 if (ropt->verbose && te->nDeps > 0)
1199 {
1200 int i;
1201
1202 ahprintf(AH, ";\tdepends on:");
1203 for (i = 0; i < te->nDeps; i++)
1204 ahprintf(AH, " %d", te->dependencies[i]);
1205 ahprintf(AH, "\n");
1206 }
1207 }
1208
1209 /* Enforce strict names checking */
1210 if (ropt->strict_names)
1211 StrictNamesCheck(ropt);
1212
1213 if (ropt->filename)
1214 RestoreOutput(AH, sav);
1215 }
1216
1217 /***********
1218 * BLOB Archival
1219 ***********/
1220
1221 /* Called by a dumper to signal start of a BLOB */
1222 int
StartBlob(Archive * AHX,Oid oid)1223 StartBlob(Archive *AHX, Oid oid)
1224 {
1225 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1226
1227 if (!AH->StartBlobPtr)
1228 exit_horribly(modulename, "large-object output not supported in chosen format\n");
1229
1230 (*AH->StartBlobPtr) (AH, AH->currToc, oid);
1231
1232 return 1;
1233 }
1234
1235 /* Called by a dumper to signal end of a BLOB */
1236 int
EndBlob(Archive * AHX,Oid oid)1237 EndBlob(Archive *AHX, Oid oid)
1238 {
1239 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1240
1241 if (AH->EndBlobPtr)
1242 (*AH->EndBlobPtr) (AH, AH->currToc, oid);
1243
1244 return 1;
1245 }
1246
1247 /**********
1248 * BLOB Restoration
1249 **********/
1250
1251 /*
1252 * Called by a format handler before any blobs are restored
1253 */
1254 void
StartRestoreBlobs(ArchiveHandle * AH)1255 StartRestoreBlobs(ArchiveHandle *AH)
1256 {
1257 RestoreOptions *ropt = AH->public.ropt;
1258
1259 if (!ropt->single_txn)
1260 {
1261 if (AH->connection)
1262 StartTransaction(&AH->public);
1263 else
1264 ahprintf(AH, "BEGIN;\n\n");
1265 }
1266
1267 AH->blobCount = 0;
1268 }
1269
1270 /*
1271 * Called by a format handler after all blobs are restored
1272 */
1273 void
EndRestoreBlobs(ArchiveHandle * AH)1274 EndRestoreBlobs(ArchiveHandle *AH)
1275 {
1276 RestoreOptions *ropt = AH->public.ropt;
1277
1278 if (!ropt->single_txn)
1279 {
1280 if (AH->connection)
1281 CommitTransaction(&AH->public);
1282 else
1283 ahprintf(AH, "COMMIT;\n\n");
1284 }
1285
1286 ahlog(AH, 1, ngettext("restored %d large object\n",
1287 "restored %d large objects\n",
1288 AH->blobCount),
1289 AH->blobCount);
1290 }
1291
1292
1293 /*
1294 * Called by a format handler to initiate restoration of a blob
1295 */
1296 void
StartRestoreBlob(ArchiveHandle * AH,Oid oid,bool drop)1297 StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
1298 {
1299 bool old_blob_style = (AH->version < K_VERS_1_12);
1300 Oid loOid;
1301
1302 AH->blobCount++;
1303
1304 /* Initialize the LO Buffer */
1305 AH->lo_buf_used = 0;
1306
1307 ahlog(AH, 1, "restoring large object with OID %u\n", oid);
1308
1309 /* With an old archive we must do drop and create logic here */
1310 if (old_blob_style && drop)
1311 DropBlobIfExists(AH, oid);
1312
1313 if (AH->connection)
1314 {
1315 if (old_blob_style)
1316 {
1317 loOid = lo_create(AH->connection, oid);
1318 if (loOid == 0 || loOid != oid)
1319 exit_horribly(modulename, "could not create large object %u: %s",
1320 oid, PQerrorMessage(AH->connection));
1321 }
1322 AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
1323 if (AH->loFd == -1)
1324 exit_horribly(modulename, "could not open large object %u: %s",
1325 oid, PQerrorMessage(AH->connection));
1326 }
1327 else
1328 {
1329 if (old_blob_style)
1330 ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
1331 oid, INV_WRITE);
1332 else
1333 ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
1334 oid, INV_WRITE);
1335 }
1336
1337 AH->writingBlob = 1;
1338 }
1339
1340 void
EndRestoreBlob(ArchiveHandle * AH,Oid oid)1341 EndRestoreBlob(ArchiveHandle *AH, Oid oid)
1342 {
1343 if (AH->lo_buf_used > 0)
1344 {
1345 /* Write remaining bytes from the LO buffer */
1346 dump_lo_buf(AH);
1347 }
1348
1349 AH->writingBlob = 0;
1350
1351 if (AH->connection)
1352 {
1353 lo_close(AH->connection, AH->loFd);
1354 AH->loFd = -1;
1355 }
1356 else
1357 {
1358 ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1359 }
1360 }
1361
1362 /***********
1363 * Sorting and Reordering
1364 ***********/
1365
1366 void
SortTocFromFile(Archive * AHX)1367 SortTocFromFile(Archive *AHX)
1368 {
1369 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1370 RestoreOptions *ropt = AH->public.ropt;
1371 FILE *fh;
1372 char buf[100];
1373 bool incomplete_line;
1374
1375 /* Allocate space for the 'wanted' array, and init it */
1376 ropt->idWanted = (bool *) pg_malloc(sizeof(bool) * AH->maxDumpId);
1377 memset(ropt->idWanted, 0, sizeof(bool) * AH->maxDumpId);
1378
1379 /* Setup the file */
1380 fh = fopen(ropt->tocFile, PG_BINARY_R);
1381 if (!fh)
1382 exit_horribly(modulename, "could not open TOC file \"%s\": %s\n",
1383 ropt->tocFile, strerror(errno));
1384
1385 incomplete_line = false;
1386 while (fgets(buf, sizeof(buf), fh) != NULL)
1387 {
1388 bool prev_incomplete_line = incomplete_line;
1389 int buflen;
1390 char *cmnt;
1391 char *endptr;
1392 DumpId id;
1393 TocEntry *te;
1394
1395 /*
1396 * Some lines in the file might be longer than sizeof(buf). This is
1397 * no problem, since we only care about the leading numeric ID which
1398 * can be at most a few characters; but we have to skip continuation
1399 * bufferloads when processing a long line.
1400 */
1401 buflen = strlen(buf);
1402 if (buflen > 0 && buf[buflen - 1] == '\n')
1403 incomplete_line = false;
1404 else
1405 incomplete_line = true;
1406 if (prev_incomplete_line)
1407 continue;
1408
1409 /* Truncate line at comment, if any */
1410 cmnt = strchr(buf, ';');
1411 if (cmnt != NULL)
1412 cmnt[0] = '\0';
1413
1414 /* Ignore if all blank */
1415 if (strspn(buf, " \t\r\n") == strlen(buf))
1416 continue;
1417
1418 /* Get an ID, check it's valid and not already seen */
1419 id = strtol(buf, &endptr, 10);
1420 if (endptr == buf || id <= 0 || id > AH->maxDumpId ||
1421 ropt->idWanted[id - 1])
1422 {
1423 write_msg(modulename, "WARNING: line ignored: %s\n", buf);
1424 continue;
1425 }
1426
1427 /* Find TOC entry */
1428 te = getTocEntryByDumpId(AH, id);
1429 if (!te)
1430 exit_horribly(modulename, "could not find entry for ID %d\n",
1431 id);
1432
1433 /* Mark it wanted */
1434 ropt->idWanted[id - 1] = true;
1435
1436 /*
1437 * Move each item to the end of the list as it is selected, so that
1438 * they are placed in the desired order. Any unwanted items will end
1439 * up at the front of the list, which may seem unintuitive but it's
1440 * what we need. In an ordinary serial restore that makes no
1441 * difference, but in a parallel restore we need to mark unrestored
1442 * items' dependencies as satisfied before we start examining
1443 * restorable items. Otherwise they could have surprising
1444 * side-effects on the order in which restorable items actually get
1445 * restored.
1446 */
1447 _moveBefore(AH, AH->toc, te);
1448 }
1449
1450 if (fclose(fh) != 0)
1451 exit_horribly(modulename, "could not close TOC file: %s\n",
1452 strerror(errno));
1453 }
1454
/**********************
 * Convenience functions that look like standard IO functions
 * for writing data when in dump mode.
 **********************/
1459
1460 /* Public */
1461 void
archputs(const char * s,Archive * AH)1462 archputs(const char *s, Archive *AH)
1463 {
1464 WriteData(AH, s, strlen(s));
1465 return;
1466 }
1467
1468 /* Public */
1469 int
archprintf(Archive * AH,const char * fmt,...)1470 archprintf(Archive *AH, const char *fmt,...)
1471 {
1472 char *p;
1473 size_t len = 128; /* initial assumption about buffer size */
1474 size_t cnt;
1475
1476 for (;;)
1477 {
1478 va_list args;
1479
1480 /* Allocate work buffer. */
1481 p = (char *) pg_malloc(len);
1482
1483 /* Try to format the data. */
1484 va_start(args, fmt);
1485 cnt = pvsnprintf(p, len, fmt, args);
1486 va_end(args);
1487
1488 if (cnt < len)
1489 break; /* success */
1490
1491 /* Release buffer and loop around to try again with larger len. */
1492 free(p);
1493 len = cnt;
1494 }
1495
1496 WriteData(AH, p, cnt);
1497 free(p);
1498 return (int) cnt;
1499 }
1500
1501
1502 /*******************************
1503 * Stuff below here should be 'private' to the archiver routines
1504 *******************************/
1505
1506 static void
SetOutput(ArchiveHandle * AH,const char * filename,int compression)1507 SetOutput(ArchiveHandle *AH, const char *filename, int compression)
1508 {
1509 int fn;
1510
1511 if (filename)
1512 {
1513 if (strcmp(filename, "-") == 0)
1514 fn = fileno(stdout);
1515 else
1516 fn = -1;
1517 }
1518 else if (AH->FH)
1519 fn = fileno(AH->FH);
1520 else if (AH->fSpec)
1521 {
1522 fn = -1;
1523 filename = AH->fSpec;
1524 }
1525 else
1526 fn = fileno(stdout);
1527
1528 /* If compression explicitly requested, use gzopen */
1529 #ifdef HAVE_LIBZ
1530 if (compression != 0)
1531 {
1532 char fmode[10];
1533
1534 /* Don't use PG_BINARY_x since this is zlib */
1535 sprintf(fmode, "wb%d", compression);
1536 if (fn >= 0)
1537 AH->OF = gzdopen(dup(fn), fmode);
1538 else
1539 AH->OF = gzopen(filename, fmode);
1540 AH->gzOut = 1;
1541 }
1542 else
1543 #endif
1544 { /* Use fopen */
1545 if (AH->mode == archModeAppend)
1546 {
1547 if (fn >= 0)
1548 AH->OF = fdopen(dup(fn), PG_BINARY_A);
1549 else
1550 AH->OF = fopen(filename, PG_BINARY_A);
1551 }
1552 else
1553 {
1554 if (fn >= 0)
1555 AH->OF = fdopen(dup(fn), PG_BINARY_W);
1556 else
1557 AH->OF = fopen(filename, PG_BINARY_W);
1558 }
1559 AH->gzOut = 0;
1560 }
1561
1562 if (!AH->OF)
1563 {
1564 if (filename)
1565 exit_horribly(modulename, "could not open output file \"%s\": %s\n",
1566 filename, strerror(errno));
1567 else
1568 exit_horribly(modulename, "could not open output file: %s\n",
1569 strerror(errno));
1570 }
1571 }
1572
1573 static OutputContext
SaveOutput(ArchiveHandle * AH)1574 SaveOutput(ArchiveHandle *AH)
1575 {
1576 OutputContext sav;
1577
1578 sav.OF = AH->OF;
1579 sav.gzOut = AH->gzOut;
1580
1581 return sav;
1582 }
1583
1584 static void
RestoreOutput(ArchiveHandle * AH,OutputContext savedContext)1585 RestoreOutput(ArchiveHandle *AH, OutputContext savedContext)
1586 {
1587 int res;
1588
1589 if (AH->gzOut)
1590 res = GZCLOSE(AH->OF);
1591 else
1592 res = fclose(AH->OF);
1593
1594 if (res != 0)
1595 exit_horribly(modulename, "could not close output file: %s\n",
1596 strerror(errno));
1597
1598 AH->gzOut = savedContext.gzOut;
1599 AH->OF = savedContext.OF;
1600 }
1601
1602
1603
1604 /*
1605 * Print formatted text to the output file (usually stdout).
1606 */
1607 int
ahprintf(ArchiveHandle * AH,const char * fmt,...)1608 ahprintf(ArchiveHandle *AH, const char *fmt,...)
1609 {
1610 char *p;
1611 size_t len = 128; /* initial assumption about buffer size */
1612 size_t cnt;
1613
1614 for (;;)
1615 {
1616 va_list args;
1617
1618 /* Allocate work buffer. */
1619 p = (char *) pg_malloc(len);
1620
1621 /* Try to format the data. */
1622 va_start(args, fmt);
1623 cnt = pvsnprintf(p, len, fmt, args);
1624 va_end(args);
1625
1626 if (cnt < len)
1627 break; /* success */
1628
1629 /* Release buffer and loop around to try again with larger len. */
1630 free(p);
1631 len = cnt;
1632 }
1633
1634 ahwrite(p, 1, cnt, AH);
1635 free(p);
1636 return (int) cnt;
1637 }
1638
1639 void
ahlog(ArchiveHandle * AH,int level,const char * fmt,...)1640 ahlog(ArchiveHandle *AH, int level, const char *fmt,...)
1641 {
1642 va_list ap;
1643
1644 if (AH->debugLevel < level && (!AH->public.verbose || level > 1))
1645 return;
1646
1647 va_start(ap, fmt);
1648 vwrite_msg(NULL, fmt, ap);
1649 va_end(ap);
1650 }
1651
1652 /*
1653 * Single place for logic which says 'We are restoring to a direct DB connection'.
1654 */
1655 static int
RestoringToDB(ArchiveHandle * AH)1656 RestoringToDB(ArchiveHandle *AH)
1657 {
1658 RestoreOptions *ropt = AH->public.ropt;
1659
1660 return (ropt && ropt->useDB && AH->connection);
1661 }
1662
1663 /*
1664 * Dump the current contents of the LO data buffer while writing a BLOB
1665 */
1666 static void
dump_lo_buf(ArchiveHandle * AH)1667 dump_lo_buf(ArchiveHandle *AH)
1668 {
1669 if (AH->connection)
1670 {
1671 size_t res;
1672
1673 res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1674 ahlog(AH, 5, ngettext("wrote %lu byte of large object data (result = %lu)\n",
1675 "wrote %lu bytes of large object data (result = %lu)\n",
1676 AH->lo_buf_used),
1677 (unsigned long) AH->lo_buf_used, (unsigned long) res);
1678 if (res != AH->lo_buf_used)
1679 exit_horribly(modulename,
1680 "could not write to large object (result: %lu, expected: %lu)\n",
1681 (unsigned long) res, (unsigned long) AH->lo_buf_used);
1682 }
1683 else
1684 {
1685 PQExpBuffer buf = createPQExpBuffer();
1686
1687 appendByteaLiteralAHX(buf,
1688 (const unsigned char *) AH->lo_buf,
1689 AH->lo_buf_used,
1690 AH);
1691
1692 /* Hack: turn off writingBlob so ahwrite doesn't recurse to here */
1693 AH->writingBlob = 0;
1694 ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1695 AH->writingBlob = 1;
1696
1697 destroyPQExpBuffer(buf);
1698 }
1699 AH->lo_buf_used = 0;
1700 }
1701
1702
/*
 *	Write buffer to the output file (usually stdout). This is used for
 *	outputting 'restore' scripts etc. It is even possible for an archive
 *	format to create a custom output routine to 'fake' a restore if it
 *	wants to generate a script (see TAR output).
 *
 *	The size * nmemb bytes at ptr go to the first destination that applies:
 *	  1. the large-object buffer, while AH->writingBlob is set;
 *	  2. the gz output stream, when AH->gzOut is set;
 *	  3. the format's CustomOutPtr routine, when one is installed;
 *	  4. the database connection, when RestoringToDB() says so;
 *	  5. AH->OF via fwrite().
 *	If the amount reported written differs from size * nmemb,
 *	WRITE_ERROR_EXIT is invoked.
 */
void
ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
{
	int			bytes_written = 0;

	if (AH->writingBlob)
	{
		size_t		remaining = size * nmemb;

		/*
		 * While the new data does not fit in what is left of the LO buffer,
		 * top the buffer up, flush it with dump_lo_buf() (which resets
		 * lo_buf_used), and carry on with the rest of the input.
		 */
		while (AH->lo_buf_used + remaining > AH->lo_buf_size)
		{
			size_t		avail = AH->lo_buf_size - AH->lo_buf_used;

			memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
			ptr = (const void *) ((const char *) ptr + avail);
			remaining -= avail;
			AH->lo_buf_used += avail;
			dump_lo_buf(AH);
		}

		/* Whatever is left now fits; just append it to the buffer */
		memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
		AH->lo_buf_used += remaining;

		bytes_written = size * nmemb;
	}
	else if (AH->gzOut)
		bytes_written = GZWRITE(ptr, size, nmemb, AH->OF);
	else if (AH->CustomOutPtr)
		bytes_written = AH->CustomOutPtr (AH, ptr, size * nmemb);

	else
	{
		/*
		 * If we're doing a restore, and it's direct to DB, and we're
		 * connected then send it to the DB.
		 */
		if (RestoringToDB(AH))
			bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
		else
			bytes_written = fwrite(ptr, size, nmemb, AH->OF) * size;
	}

	/* Any shortfall, whichever path was taken, is treated as a write error */
	if (bytes_written != size * nmemb)
		WRITE_ERROR_EXIT;

	return;
}
1756
1757 /* on some error, we may decide to go on... */
1758 void
warn_or_exit_horribly(ArchiveHandle * AH,const char * modulename,const char * fmt,...)1759 warn_or_exit_horribly(ArchiveHandle *AH,
1760 const char *modulename, const char *fmt,...)
1761 {
1762 va_list ap;
1763
1764 switch (AH->stage)
1765 {
1766
1767 case STAGE_NONE:
1768 /* Do nothing special */
1769 break;
1770
1771 case STAGE_INITIALIZING:
1772 if (AH->stage != AH->lastErrorStage)
1773 write_msg(modulename, "Error while INITIALIZING:\n");
1774 break;
1775
1776 case STAGE_PROCESSING:
1777 if (AH->stage != AH->lastErrorStage)
1778 write_msg(modulename, "Error while PROCESSING TOC:\n");
1779 break;
1780
1781 case STAGE_FINALIZING:
1782 if (AH->stage != AH->lastErrorStage)
1783 write_msg(modulename, "Error while FINALIZING:\n");
1784 break;
1785 }
1786 if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1787 {
1788 write_msg(modulename, "Error from TOC entry %d; %u %u %s %s %s\n",
1789 AH->currentTE->dumpId,
1790 AH->currentTE->catalogId.tableoid,
1791 AH->currentTE->catalogId.oid,
1792 AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
1793 AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
1794 AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
1795 }
1796 AH->lastErrorStage = AH->stage;
1797 AH->lastErrorTE = AH->currentTE;
1798
1799 va_start(ap, fmt);
1800 vwrite_msg(modulename, fmt, ap);
1801 va_end(ap);
1802
1803 if (AH->public.exit_on_error)
1804 exit_nicely(1);
1805 else
1806 AH->public.n_errors++;
1807 }
1808
#ifdef NOT_USED

/*
 * Unlink "te" from the doubly-linked TOC list and re-insert it immediately
 * after "pos".
 */
static void
_moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
{
	TocEntry   *old_prev = te->prev;
	TocEntry   *old_next = te->next;
	TocEntry   *successor;

	/* Unlink te from list */
	old_prev->next = old_next;
	old_next->prev = old_prev;

	/* and insert it after "pos"; read pos->next only after unlinking */
	successor = pos->next;
	te->prev = pos;
	te->next = successor;
	successor->prev = te;
	pos->next = te;
}
#endif
1825
1826 static void
_moveBefore(ArchiveHandle * AH,TocEntry * pos,TocEntry * te)1827 _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1828 {
1829 /* Unlink te from list */
1830 te->prev->next = te->next;
1831 te->next->prev = te->prev;
1832
1833 /* and insert it before "pos" */
1834 te->prev = pos->prev;
1835 te->next = pos;
1836 pos->prev->next = te;
1837 pos->prev = te;
1838 }
1839
1840 /*
1841 * Build index arrays for the TOC list
1842 *
1843 * This should be invoked only after we have created or read in all the TOC
1844 * items.
1845 *
1846 * The arrays are indexed by dump ID (so entry zero is unused). Note that the
1847 * array entries run only up to maxDumpId. We might see dependency dump IDs
1848 * beyond that (if the dump was partial); so always check the array bound
1849 * before trying to touch an array entry.
1850 */
1851 static void
buildTocEntryArrays(ArchiveHandle * AH)1852 buildTocEntryArrays(ArchiveHandle *AH)
1853 {
1854 DumpId maxDumpId = AH->maxDumpId;
1855 TocEntry *te;
1856
1857 AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
1858 AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));
1859
1860 for (te = AH->toc->next; te != AH->toc; te = te->next)
1861 {
1862 /* this check is purely paranoia, maxDumpId should be correct */
1863 if (te->dumpId <= 0 || te->dumpId > maxDumpId)
1864 exit_horribly(modulename, "bad dumpId\n");
1865
1866 /* tocsByDumpId indexes all TOCs by their dump ID */
1867 AH->tocsByDumpId[te->dumpId] = te;
1868
1869 /*
1870 * tableDataId provides the TABLE DATA item's dump ID for each TABLE
1871 * TOC entry that has a DATA item. We compute this by reversing the
1872 * TABLE DATA item's dependency, knowing that a TABLE DATA item has
1873 * just one dependency and it is the TABLE item.
1874 */
1875 if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
1876 {
1877 DumpId tableId = te->dependencies[0];
1878
1879 /*
1880 * The TABLE item might not have been in the archive, if this was
1881 * a data-only dump; but its dump ID should be less than its data
1882 * item's dump ID, so there should be a place for it in the array.
1883 */
1884 if (tableId <= 0 || tableId > maxDumpId)
1885 exit_horribly(modulename, "bad table dumpId for TABLE DATA item\n");
1886
1887 AH->tableDataId[tableId] = te->dumpId;
1888 }
1889 }
1890 }
1891
1892 TocEntry *
getTocEntryByDumpId(ArchiveHandle * AH,DumpId id)1893 getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
1894 {
1895 /* build index arrays if we didn't already */
1896 if (AH->tocsByDumpId == NULL)
1897 buildTocEntryArrays(AH);
1898
1899 if (id > 0 && id <= AH->maxDumpId)
1900 return AH->tocsByDumpId[id];
1901
1902 return NULL;
1903 }
1904
1905 teReqs
TocIDRequired(ArchiveHandle * AH,DumpId id)1906 TocIDRequired(ArchiveHandle *AH, DumpId id)
1907 {
1908 TocEntry *te = getTocEntryByDumpId(AH, id);
1909
1910 if (!te)
1911 return 0;
1912
1913 return te->reqs;
1914 }
1915
1916 size_t
WriteOffset(ArchiveHandle * AH,pgoff_t o,int wasSet)1917 WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
1918 {
1919 int off;
1920
1921 /* Save the flag */
1922 (*AH->WriteBytePtr) (AH, wasSet);
1923
1924 /* Write out pgoff_t smallest byte first, prevents endian mismatch */
1925 for (off = 0; off < sizeof(pgoff_t); off++)
1926 {
1927 (*AH->WriteBytePtr) (AH, o & 0xFF);
1928 o >>= 8;
1929 }
1930 return sizeof(pgoff_t) + 1;
1931 }
1932
1933 int
ReadOffset(ArchiveHandle * AH,pgoff_t * o)1934 ReadOffset(ArchiveHandle *AH, pgoff_t * o)
1935 {
1936 int i;
1937 int off;
1938 int offsetFlg;
1939
1940 /* Initialize to zero */
1941 *o = 0;
1942
1943 /* Check for old version */
1944 if (AH->version < K_VERS_1_7)
1945 {
1946 /* Prior versions wrote offsets using WriteInt */
1947 i = ReadInt(AH);
1948 /* -1 means not set */
1949 if (i < 0)
1950 return K_OFFSET_POS_NOT_SET;
1951 else if (i == 0)
1952 return K_OFFSET_NO_DATA;
1953
1954 /* Cast to pgoff_t because it was written as an int. */
1955 *o = (pgoff_t) i;
1956 return K_OFFSET_POS_SET;
1957 }
1958
1959 /*
1960 * Read the flag indicating the state of the data pointer. Check if valid
1961 * and die if not.
1962 *
1963 * This used to be handled by a negative or zero pointer, now we use an
1964 * extra byte specifically for the state.
1965 */
1966 offsetFlg = (*AH->ReadBytePtr) (AH) & 0xFF;
1967
1968 switch (offsetFlg)
1969 {
1970 case K_OFFSET_POS_NOT_SET:
1971 case K_OFFSET_NO_DATA:
1972 case K_OFFSET_POS_SET:
1973
1974 break;
1975
1976 default:
1977 exit_horribly(modulename, "unexpected data offset flag %d\n", offsetFlg);
1978 }
1979
1980 /*
1981 * Read the bytes
1982 */
1983 for (off = 0; off < AH->offSize; off++)
1984 {
1985 if (off < sizeof(pgoff_t))
1986 *o |= ((pgoff_t) ((*AH->ReadBytePtr) (AH))) << (off * 8);
1987 else
1988 {
1989 if ((*AH->ReadBytePtr) (AH) != 0)
1990 exit_horribly(modulename, "file offset in dump file is too large\n");
1991 }
1992 }
1993
1994 return offsetFlg;
1995 }
1996
1997 size_t
WriteInt(ArchiveHandle * AH,int i)1998 WriteInt(ArchiveHandle *AH, int i)
1999 {
2000 int b;
2001
2002 /*
2003 * This is a bit yucky, but I don't want to make the binary format very
2004 * dependent on representation, and not knowing much about it, I write out
2005 * a sign byte. If you change this, don't forget to change the file
2006 * version #, and modify readInt to read the new format AS WELL AS the old
2007 * formats.
2008 */
2009
2010 /* SIGN byte */
2011 if (i < 0)
2012 {
2013 (*AH->WriteBytePtr) (AH, 1);
2014 i = -i;
2015 }
2016 else
2017 (*AH->WriteBytePtr) (AH, 0);
2018
2019 for (b = 0; b < AH->intSize; b++)
2020 {
2021 (*AH->WriteBytePtr) (AH, i & 0xFF);
2022 i >>= 8;
2023 }
2024
2025 return AH->intSize + 1;
2026 }
2027
2028 int
ReadInt(ArchiveHandle * AH)2029 ReadInt(ArchiveHandle *AH)
2030 {
2031 int res = 0;
2032 int bv,
2033 b;
2034 int sign = 0; /* Default positive */
2035 int bitShift = 0;
2036
2037 if (AH->version > K_VERS_1_0)
2038 /* Read a sign byte */
2039 sign = (*AH->ReadBytePtr) (AH);
2040
2041 for (b = 0; b < AH->intSize; b++)
2042 {
2043 bv = (*AH->ReadBytePtr) (AH) & 0xFF;
2044 if (bv != 0)
2045 res = res + (bv << bitShift);
2046 bitShift += 8;
2047 }
2048
2049 if (sign)
2050 res = -res;
2051
2052 return res;
2053 }
2054
2055 size_t
WriteStr(ArchiveHandle * AH,const char * c)2056 WriteStr(ArchiveHandle *AH, const char *c)
2057 {
2058 size_t res;
2059
2060 if (c)
2061 {
2062 int len = strlen(c);
2063
2064 res = WriteInt(AH, len);
2065 (*AH->WriteBufPtr) (AH, c, len);
2066 res += len;
2067 }
2068 else
2069 res = WriteInt(AH, -1);
2070
2071 return res;
2072 }
2073
2074 char *
ReadStr(ArchiveHandle * AH)2075 ReadStr(ArchiveHandle *AH)
2076 {
2077 char *buf;
2078 int l;
2079
2080 l = ReadInt(AH);
2081 if (l < 0)
2082 buf = NULL;
2083 else
2084 {
2085 buf = (char *) pg_malloc(l + 1);
2086 (*AH->ReadBufPtr) (AH, (void *) buf, l);
2087
2088 buf[l] = '\0';
2089 }
2090
2091 return buf;
2092 }
2093
/*
 * Work out the format of the archive named by AH->fSpec (or arriving on
 * stdin when there is no file spec), store it in AH->format and return it.
 *
 *	- A directory containing "toc.dat" (or, when built with zlib,
 *	  "toc.dat.gz") is a directory-format archive.
 *	- Otherwise the first five bytes are read; "PGDMP" means custom format.
 *	- Otherwise the rest of a 512-byte block is read into the lookahead
 *	  buffer.  Input that starts like a plain-text dump is rejected with a
 *	  hint to use psql; anything that is not a valid tar header is rejected;
 *	  what remains is tar format.
 *
 * The bytes read are kept in AH->lookahead.  If the file was opened here it
 * is closed again and the lookahead is discarded.
 */
static int
_discoverArchiveFormat(ArchiveHandle *AH)
{
	FILE	   *fh;
	char		sig[6];			/* More than enough */
	size_t		cnt;
	int			wantClose = 0;

#if 0
	write_msg(modulename, "attempting to ascertain archive format\n");
#endif

	/* Start from a fresh, zeroed 512-byte lookahead buffer */
	if (AH->lookahead)
		free(AH->lookahead);

	AH->readHeader = 0;
	AH->lookaheadSize = 512;
	AH->lookahead = pg_malloc0(512);
	AH->lookaheadLen = 0;
	AH->lookaheadPos = 0;

	if (AH->fSpec)
	{
		struct stat st;

		wantClose = 1;

		/*
		 * Check if the specified archive is a directory. If so, check if
		 * there's a "toc.dat" (or "toc.dat.gz") file in it.
		 */
		if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
		{
			char		buf[MAXPGPATH];

			if (snprintf(buf, MAXPGPATH, "%s/toc.dat", AH->fSpec) >= MAXPGPATH)
				exit_horribly(modulename, "directory name too long: \"%s\"\n",
							  AH->fSpec);
			if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
			{
				AH->format = archDirectory;
				return AH->format;
			}

#ifdef HAVE_LIBZ
			if (snprintf(buf, MAXPGPATH, "%s/toc.dat.gz", AH->fSpec) >= MAXPGPATH)
				exit_horribly(modulename, "directory name too long: \"%s\"\n",
							  AH->fSpec);
			if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
			{
				AH->format = archDirectory;
				return AH->format;
			}
#endif
			exit_horribly(modulename, "directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)\n",
						  AH->fSpec);
			fh = NULL;			/* keep compiler quiet */
		}
		else
		{
			fh = fopen(AH->fSpec, PG_BINARY_R);
			if (!fh)
				exit_horribly(modulename, "could not open input file \"%s\": %s\n",
							  AH->fSpec, strerror(errno));
		}
	}
	else
	{
		/* No file spec: read from stdin, which we must not close later */
		fh = stdin;
		if (!fh)
			exit_horribly(modulename, "could not open input file: %s\n",
						  strerror(errno));
	}

	/* Read the five-byte signature that identifies a custom-format archive */
	if ((cnt = fread(sig, 1, 5, fh)) != 5)
	{
		if (ferror(fh))
			exit_horribly(modulename, "could not read input file: %s\n", strerror(errno));
		else
			exit_horribly(modulename, "input file is too short (read %lu, expected 5)\n",
						  (unsigned long) cnt);
	}

	/* Save it, just in case we need it later */
	memcpy(&AH->lookahead[0], sig, 5);
	AH->lookaheadLen = 5;

	if (strncmp(sig, "PGDMP", 5) == 0)
	{
		/* It's custom format, stop here */
		AH->format = archCustom;
		AH->readHeader = 1;
	}
	else
	{
		/*
		 * *Maybe* we have a tar archive format file or a text dump ... So,
		 * read first 512 byte header...
		 */
		cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
		/* read failure is checked below */
		AH->lookaheadLen += cnt;

		if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
			(strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
			 strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
		{
			/*
			 * looks like it's probably a text format dump. so suggest they
			 * try psql
			 */
			exit_horribly(modulename, "input file appears to be a text format dump. Please use psql.\n");
		}

		/* Anything short of a full 512-byte block cannot be a tar header */
		if (AH->lookaheadLen != 512)
		{
			if (feof(fh))
				exit_horribly(modulename, "input file does not appear to be a valid archive (too short?)\n");
			else
				READ_ERROR_EXIT(fh);
		}

		if (!isValidTarHeader(AH->lookahead))
			exit_horribly(modulename, "input file does not appear to be a valid archive\n");

		AH->format = archTar;
	}

	/* Close the file if we opened it */
	if (wantClose)
	{
		if (fclose(fh) != 0)
			exit_horribly(modulename, "could not close input file: %s\n",
						  strerror(errno));
		/* Forget lookahead, since we'll re-read header after re-opening */
		AH->readHeader = 0;
		AH->lookaheadLen = 0;
	}

	return AH->format;
}
2235
2236
2237 /*
2238 * Allocate an archive handle
2239 */
2240 static ArchiveHandle *
_allocAH(const char * FileSpec,const ArchiveFormat fmt,const int compression,ArchiveMode mode,SetupWorkerPtr setupWorkerPtr)2241 _allocAH(const char *FileSpec, const ArchiveFormat fmt,
2242 const int compression, ArchiveMode mode, SetupWorkerPtr setupWorkerPtr)
2243 {
2244 ArchiveHandle *AH;
2245
2246 #if 0
2247 write_msg(modulename, "allocating AH for %s, format %d\n", FileSpec, fmt);
2248 #endif
2249
2250 AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
2251
2252 /* AH->debugLevel = 100; */
2253
2254 AH->vmaj = K_VERS_MAJOR;
2255 AH->vmin = K_VERS_MINOR;
2256 AH->vrev = K_VERS_REV;
2257
2258 /* Make a convenient integer <maj><min><rev>00 */
2259 AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
2260
2261 /* initialize for backwards compatible string processing */
2262 AH->public.encoding = 0; /* PG_SQL_ASCII */
2263 AH->public.std_strings = false;
2264
2265 /* sql error handling */
2266 AH->public.exit_on_error = true;
2267 AH->public.n_errors = 0;
2268
2269 AH->archiveDumpVersion = PG_VERSION;
2270
2271 AH->createDate = time(NULL);
2272
2273 AH->intSize = sizeof(int);
2274 AH->offSize = sizeof(pgoff_t);
2275 if (FileSpec)
2276 {
2277 AH->fSpec = pg_strdup(FileSpec);
2278
2279 /*
2280 * Not used; maybe later....
2281 *
2282 * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2283 * i--) if (AH->workDir[i-1] == '/')
2284 */
2285 }
2286 else
2287 AH->fSpec = NULL;
2288
2289 AH->currUser = NULL; /* unknown */
2290 AH->currSchema = NULL; /* ditto */
2291 AH->currTablespace = NULL; /* ditto */
2292 AH->currWithOids = -1; /* force SET */
2293
2294 AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2295
2296 AH->toc->next = AH->toc;
2297 AH->toc->prev = AH->toc;
2298
2299 AH->mode = mode;
2300 AH->compression = compression;
2301
2302 memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
2303
2304 /* Open stdout with no compression for AH output handle */
2305 AH->gzOut = 0;
2306 AH->OF = stdout;
2307
2308 /*
2309 * On Windows, we need to use binary mode to read/write non-text files,
2310 * which include all archive formats as well as compressed plain text.
2311 * Force stdin/stdout into binary mode if that is what we are using.
2312 */
2313 #ifdef WIN32
2314 if ((fmt != archNull || compression != 0) &&
2315 (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2316 {
2317 if (mode == archModeWrite)
2318 _setmode(fileno(stdout), O_BINARY);
2319 else
2320 _setmode(fileno(stdin), O_BINARY);
2321 }
2322 #endif
2323
2324 AH->SetupWorkerPtr = setupWorkerPtr;
2325
2326 if (fmt == archUnknown)
2327 AH->format = _discoverArchiveFormat(AH);
2328 else
2329 AH->format = fmt;
2330
2331 switch (AH->format)
2332 {
2333 case archCustom:
2334 InitArchiveFmt_Custom(AH);
2335 break;
2336
2337 case archNull:
2338 InitArchiveFmt_Null(AH);
2339 break;
2340
2341 case archDirectory:
2342 InitArchiveFmt_Directory(AH);
2343 break;
2344
2345 case archTar:
2346 InitArchiveFmt_Tar(AH);
2347 break;
2348
2349 default:
2350 exit_horribly(modulename, "unrecognized file format \"%d\"\n", fmt);
2351 }
2352
2353 return AH;
2354 }
2355
2356 /*
2357 * Write out all data (tables & blobs)
2358 */
2359 void
WriteDataChunks(ArchiveHandle * AH,ParallelState * pstate)2360 WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
2361 {
2362 TocEntry *te;
2363
2364 for (te = AH->toc->next; te != AH->toc; te = te->next)
2365 {
2366 if (!te->dataDumper)
2367 continue;
2368
2369 if ((te->reqs & REQ_DATA) == 0)
2370 continue;
2371
2372 if (pstate && pstate->numWorkers > 1)
2373 {
2374 /*
2375 * If we are in a parallel backup, then we are always the master
2376 * process. Dispatch each data-transfer job to a worker.
2377 */
2378 EnsureIdleWorker(AH, pstate);
2379 DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP);
2380 }
2381 else
2382 WriteDataChunksForTocEntry(AH, te);
2383 }
2384
2385 /*
2386 * If parallel, wait for workers to finish.
2387 */
2388 EnsureWorkersFinished(AH, pstate);
2389 }
2390
2391 void
WriteDataChunksForTocEntry(ArchiveHandle * AH,TocEntry * te)2392 WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
2393 {
2394 StartDataPtr startPtr;
2395 EndDataPtr endPtr;
2396
2397 AH->currToc = te;
2398
2399 if (strcmp(te->desc, "BLOBS") == 0)
2400 {
2401 startPtr = AH->StartBlobsPtr;
2402 endPtr = AH->EndBlobsPtr;
2403 }
2404 else
2405 {
2406 startPtr = AH->StartDataPtr;
2407 endPtr = AH->EndDataPtr;
2408 }
2409
2410 if (startPtr != NULL)
2411 (*startPtr) (AH, te);
2412
2413 /*
2414 * The user-provided DataDumper routine needs to call AH->WriteData
2415 */
2416 (*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
2417
2418 if (endPtr != NULL)
2419 (*endPtr) (AH, te);
2420
2421 AH->currToc = NULL;
2422 }
2423
2424 void
WriteToc(ArchiveHandle * AH)2425 WriteToc(ArchiveHandle *AH)
2426 {
2427 TocEntry *te;
2428 char workbuf[32];
2429 int tocCount;
2430 int i;
2431
2432 /* count entries that will actually be dumped */
2433 tocCount = 0;
2434 for (te = AH->toc->next; te != AH->toc; te = te->next)
2435 {
2436 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) != 0)
2437 tocCount++;
2438 }
2439
2440 /* printf("%d TOC Entries to save\n", tocCount); */
2441
2442 WriteInt(AH, tocCount);
2443
2444 for (te = AH->toc->next; te != AH->toc; te = te->next)
2445 {
2446 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) == 0)
2447 continue;
2448
2449 WriteInt(AH, te->dumpId);
2450 WriteInt(AH, te->dataDumper ? 1 : 0);
2451
2452 /* OID is recorded as a string for historical reasons */
2453 sprintf(workbuf, "%u", te->catalogId.tableoid);
2454 WriteStr(AH, workbuf);
2455 sprintf(workbuf, "%u", te->catalogId.oid);
2456 WriteStr(AH, workbuf);
2457
2458 WriteStr(AH, te->tag);
2459 WriteStr(AH, te->desc);
2460 WriteInt(AH, te->section);
2461 WriteStr(AH, te->defn);
2462 WriteStr(AH, te->dropStmt);
2463 WriteStr(AH, te->copyStmt);
2464 WriteStr(AH, te->namespace);
2465 WriteStr(AH, te->tablespace);
2466 WriteStr(AH, te->owner);
2467 WriteStr(AH, te->withOids ? "true" : "false");
2468
2469 /* Dump list of dependencies */
2470 for (i = 0; i < te->nDeps; i++)
2471 {
2472 sprintf(workbuf, "%d", te->dependencies[i]);
2473 WriteStr(AH, workbuf);
2474 }
2475 WriteStr(AH, NULL); /* Terminate List */
2476
2477 if (AH->WriteExtraTocPtr)
2478 (*AH->WriteExtraTocPtr) (AH, te);
2479 }
2480 }
2481
2482 void
ReadToc(ArchiveHandle * AH)2483 ReadToc(ArchiveHandle *AH)
2484 {
2485 int i;
2486 char *tmp;
2487 DumpId *deps;
2488 int depIdx;
2489 int depSize;
2490 TocEntry *te;
2491
2492 AH->tocCount = ReadInt(AH);
2493 AH->maxDumpId = 0;
2494
2495 for (i = 0; i < AH->tocCount; i++)
2496 {
2497 te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2498 te->dumpId = ReadInt(AH);
2499
2500 if (te->dumpId > AH->maxDumpId)
2501 AH->maxDumpId = te->dumpId;
2502
2503 /* Sanity check */
2504 if (te->dumpId <= 0)
2505 exit_horribly(modulename,
2506 "entry ID %d out of range -- perhaps a corrupt TOC\n",
2507 te->dumpId);
2508
2509 te->hadDumper = ReadInt(AH);
2510
2511 if (AH->version >= K_VERS_1_8)
2512 {
2513 tmp = ReadStr(AH);
2514 sscanf(tmp, "%u", &te->catalogId.tableoid);
2515 free(tmp);
2516 }
2517 else
2518 te->catalogId.tableoid = InvalidOid;
2519 tmp = ReadStr(AH);
2520 sscanf(tmp, "%u", &te->catalogId.oid);
2521 free(tmp);
2522
2523 te->tag = ReadStr(AH);
2524 te->desc = ReadStr(AH);
2525
2526 if (AH->version >= K_VERS_1_11)
2527 {
2528 te->section = ReadInt(AH);
2529 }
2530 else
2531 {
2532 /*
2533 * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2534 * the entries into sections. This list need not cover entry
2535 * types added later than 8.4.
2536 */
2537 if (strcmp(te->desc, "COMMENT") == 0 ||
2538 strcmp(te->desc, "ACL") == 0 ||
2539 strcmp(te->desc, "ACL LANGUAGE") == 0)
2540 te->section = SECTION_NONE;
2541 else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2542 strcmp(te->desc, "BLOBS") == 0 ||
2543 strcmp(te->desc, "BLOB COMMENTS") == 0)
2544 te->section = SECTION_DATA;
2545 else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2546 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2547 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2548 strcmp(te->desc, "INDEX") == 0 ||
2549 strcmp(te->desc, "RULE") == 0 ||
2550 strcmp(te->desc, "TRIGGER") == 0)
2551 te->section = SECTION_POST_DATA;
2552 else
2553 te->section = SECTION_PRE_DATA;
2554 }
2555
2556 te->defn = ReadStr(AH);
2557 te->dropStmt = ReadStr(AH);
2558
2559 if (AH->version >= K_VERS_1_3)
2560 te->copyStmt = ReadStr(AH);
2561
2562 if (AH->version >= K_VERS_1_6)
2563 te->namespace = ReadStr(AH);
2564
2565 if (AH->version >= K_VERS_1_10)
2566 te->tablespace = ReadStr(AH);
2567
2568 te->owner = ReadStr(AH);
2569 if (AH->version >= K_VERS_1_9)
2570 {
2571 if (strcmp(ReadStr(AH), "true") == 0)
2572 te->withOids = true;
2573 else
2574 te->withOids = false;
2575 }
2576 else
2577 te->withOids = true;
2578
2579 /* Read TOC entry dependencies */
2580 if (AH->version >= K_VERS_1_5)
2581 {
2582 depSize = 100;
2583 deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
2584 depIdx = 0;
2585 for (;;)
2586 {
2587 tmp = ReadStr(AH);
2588 if (!tmp)
2589 break; /* end of list */
2590 if (depIdx >= depSize)
2591 {
2592 depSize *= 2;
2593 deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
2594 }
2595 sscanf(tmp, "%d", &deps[depIdx]);
2596 free(tmp);
2597 depIdx++;
2598 }
2599
2600 if (depIdx > 0) /* We have a non-null entry */
2601 {
2602 deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
2603 te->dependencies = deps;
2604 te->nDeps = depIdx;
2605 }
2606 else
2607 {
2608 free(deps);
2609 te->dependencies = NULL;
2610 te->nDeps = 0;
2611 }
2612 }
2613 else
2614 {
2615 te->dependencies = NULL;
2616 te->nDeps = 0;
2617 }
2618
2619 if (AH->ReadExtraTocPtr)
2620 (*AH->ReadExtraTocPtr) (AH, te);
2621
2622 ahlog(AH, 3, "read TOC entry %d (ID %d) for %s %s\n",
2623 i, te->dumpId, te->desc, te->tag);
2624
2625 /* link completed entry into TOC circular list */
2626 te->prev = AH->toc->prev;
2627 AH->toc->prev->next = te;
2628 AH->toc->prev = te;
2629 te->next = AH->toc;
2630
2631 /* special processing immediately upon read for some items */
2632 if (strcmp(te->desc, "ENCODING") == 0)
2633 processEncodingEntry(AH, te);
2634 else if (strcmp(te->desc, "STDSTRINGS") == 0)
2635 processStdStringsEntry(AH, te);
2636 else if (strcmp(te->desc, "SEARCHPATH") == 0)
2637 processSearchPathEntry(AH, te);
2638 }
2639 }
2640
2641 static void
processEncodingEntry(ArchiveHandle * AH,TocEntry * te)2642 processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2643 {
2644 /* te->defn should have the form SET client_encoding = 'foo'; */
2645 char *defn = pg_strdup(te->defn);
2646 char *ptr1;
2647 char *ptr2 = NULL;
2648 int encoding;
2649
2650 ptr1 = strchr(defn, '\'');
2651 if (ptr1)
2652 ptr2 = strchr(++ptr1, '\'');
2653 if (ptr2)
2654 {
2655 *ptr2 = '\0';
2656 encoding = pg_char_to_encoding(ptr1);
2657 if (encoding < 0)
2658 exit_horribly(modulename, "unrecognized encoding \"%s\"\n",
2659 ptr1);
2660 AH->public.encoding = encoding;
2661 }
2662 else
2663 exit_horribly(modulename, "invalid ENCODING item: %s\n",
2664 te->defn);
2665
2666 free(defn);
2667 }
2668
2669 static void
processStdStringsEntry(ArchiveHandle * AH,TocEntry * te)2670 processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2671 {
2672 /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2673 char *ptr1;
2674
2675 ptr1 = strchr(te->defn, '\'');
2676 if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2677 AH->public.std_strings = true;
2678 else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2679 AH->public.std_strings = false;
2680 else
2681 exit_horribly(modulename, "invalid STDSTRINGS item: %s\n",
2682 te->defn);
2683 }
2684
2685 static void
processSearchPathEntry(ArchiveHandle * AH,TocEntry * te)2686 processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
2687 {
2688 /*
2689 * te->defn should contain a command to set search_path. We just copy it
2690 * verbatim for use later.
2691 */
2692 AH->public.searchpath = pg_strdup(te->defn);
2693 }
2694
2695 static void
StrictNamesCheck(RestoreOptions * ropt)2696 StrictNamesCheck(RestoreOptions *ropt)
2697 {
2698 const char *missing_name;
2699
2700 Assert(ropt->strict_names);
2701
2702 if (ropt->schemaNames.head != NULL)
2703 {
2704 missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2705 if (missing_name != NULL)
2706 exit_horribly(modulename, "schema \"%s\" not found\n", missing_name);
2707 }
2708
2709 if (ropt->tableNames.head != NULL)
2710 {
2711 missing_name = simple_string_list_not_touched(&ropt->tableNames);
2712 if (missing_name != NULL)
2713 exit_horribly(modulename, "table \"%s\" not found\n", missing_name);
2714 }
2715
2716 if (ropt->indexNames.head != NULL)
2717 {
2718 missing_name = simple_string_list_not_touched(&ropt->indexNames);
2719 if (missing_name != NULL)
2720 exit_horribly(modulename, "index \"%s\" not found\n", missing_name);
2721 }
2722
2723 if (ropt->functionNames.head != NULL)
2724 {
2725 missing_name = simple_string_list_not_touched(&ropt->functionNames);
2726 if (missing_name != NULL)
2727 exit_horribly(modulename, "function \"%s\" not found\n", missing_name);
2728 }
2729
2730 if (ropt->triggerNames.head != NULL)
2731 {
2732 missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2733 if (missing_name != NULL)
2734 exit_horribly(modulename, "trigger \"%s\" not found\n", missing_name);
2735 }
2736 }
2737
2738 static teReqs
_tocEntryRequired(TocEntry * te,teSection curSection,RestoreOptions * ropt)2739 _tocEntryRequired(TocEntry *te, teSection curSection, RestoreOptions *ropt)
2740 {
2741 teReqs res = REQ_SCHEMA | REQ_DATA;
2742
2743 /* These items are treated specially */
2744 if (strcmp(te->desc, "ENCODING") == 0 ||
2745 strcmp(te->desc, "STDSTRINGS") == 0 ||
2746 strcmp(te->desc, "SEARCHPATH") == 0)
2747 return REQ_SPECIAL;
2748
2749 /* If it's an ACL, maybe ignore it */
2750 if (ropt->aclsSkip && _tocEntryIsACL(te))
2751 return 0;
2752
2753 /* If it's security labels, maybe ignore it */
2754 if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
2755 return 0;
2756
2757 /* Ignore it if section is not to be dumped/restored */
2758 switch (curSection)
2759 {
2760 case SECTION_PRE_DATA:
2761 if (!(ropt->dumpSections & DUMP_PRE_DATA))
2762 return 0;
2763 break;
2764 case SECTION_DATA:
2765 if (!(ropt->dumpSections & DUMP_DATA))
2766 return 0;
2767 break;
2768 case SECTION_POST_DATA:
2769 if (!(ropt->dumpSections & DUMP_POST_DATA))
2770 return 0;
2771 break;
2772 default:
2773 /* shouldn't get here, really, but ignore it */
2774 return 0;
2775 }
2776
2777 /* Check options for selective dump/restore */
2778 if (ropt->schemaNames.head != NULL)
2779 {
2780 /* If no namespace is specified, it means all. */
2781 if (!te->namespace)
2782 return 0;
2783 if (!(simple_string_list_member(&ropt->schemaNames, te->namespace)))
2784 return 0;
2785 }
2786
2787 if (ropt->selTypes)
2788 {
2789 if (strcmp(te->desc, "TABLE") == 0 ||
2790 strcmp(te->desc, "TABLE DATA") == 0 ||
2791 strcmp(te->desc, "VIEW") == 0 ||
2792 strcmp(te->desc, "FOREIGN TABLE") == 0 ||
2793 strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
2794 strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
2795 strcmp(te->desc, "SEQUENCE") == 0 ||
2796 strcmp(te->desc, "SEQUENCE SET") == 0)
2797 {
2798 if (!ropt->selTable)
2799 return 0;
2800 if (ropt->tableNames.head != NULL && (!(simple_string_list_member(&ropt->tableNames, te->tag))))
2801 return 0;
2802 }
2803 else if (strcmp(te->desc, "INDEX") == 0)
2804 {
2805 if (!ropt->selIndex)
2806 return 0;
2807 if (ropt->indexNames.head != NULL && (!(simple_string_list_member(&ropt->indexNames, te->tag))))
2808 return 0;
2809 }
2810 else if (strcmp(te->desc, "FUNCTION") == 0)
2811 {
2812 if (!ropt->selFunction)
2813 return 0;
2814 if (ropt->functionNames.head != NULL && (!(simple_string_list_member(&ropt->functionNames, te->tag))))
2815 return 0;
2816 }
2817 else if (strcmp(te->desc, "TRIGGER") == 0)
2818 {
2819 if (!ropt->selTrigger)
2820 return 0;
2821 if (ropt->triggerNames.head != NULL && (!(simple_string_list_member(&ropt->triggerNames, te->tag))))
2822 return 0;
2823 }
2824 else
2825 return 0;
2826 }
2827
2828 /*
2829 * Check if we had a dataDumper. Indicates if the entry is schema or data
2830 */
2831 if (!te->hadDumper)
2832 {
2833 /*
2834 * Special Case: If 'SEQUENCE SET' or anything to do with BLOBs, then
2835 * it is considered a data entry. We don't need to check for the
2836 * BLOBS entry or old-style BLOB COMMENTS, because they will have
2837 * hadDumper = true ... but we do need to check new-style BLOB
2838 * comments.
2839 */
2840 if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
2841 strcmp(te->desc, "BLOB") == 0 ||
2842 (strcmp(te->desc, "ACL") == 0 &&
2843 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
2844 (strcmp(te->desc, "COMMENT") == 0 &&
2845 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
2846 (strcmp(te->desc, "SECURITY LABEL") == 0 &&
2847 strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
2848 res = res & REQ_DATA;
2849 else
2850 res = res & ~REQ_DATA;
2851 }
2852
2853 /*
2854 * Special case: <Init> type with <Max OID> tag; this is obsolete and we
2855 * always ignore it.
2856 */
2857 if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
2858 return 0;
2859
2860 /* Mask it if we only want schema */
2861 if (ropt->schemaOnly)
2862 {
2863 /*
2864 * In binary-upgrade mode, even with schema-only set, we do not mask
2865 * out large objects. Only large object definitions, comments and
2866 * other information should be generated in binary-upgrade mode (not
2867 * the actual data).
2868 */
2869 if (!(ropt->binary_upgrade &&
2870 (strcmp(te->desc, "BLOB") == 0 ||
2871 (strcmp(te->desc, "ACL") == 0 &&
2872 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
2873 (strcmp(te->desc, "COMMENT") == 0 &&
2874 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
2875 (strcmp(te->desc, "SECURITY LABEL") == 0 &&
2876 strncmp(te->tag, "LARGE OBJECT ", 13) == 0))))
2877 res = res & REQ_SCHEMA;
2878 }
2879
2880 /* Mask it if we only want data */
2881 if (ropt->dataOnly)
2882 res = res & REQ_DATA;
2883
2884 /* Mask it if we don't have a schema contribution */
2885 if (!te->defn || strlen(te->defn) == 0)
2886 res = res & ~REQ_SCHEMA;
2887
2888 /* Finally, if there's a per-ID filter, limit based on that as well */
2889 if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
2890 return 0;
2891
2892 return res;
2893 }
2894
2895 /*
2896 * Identify which pass we should restore this TOC entry in.
2897 *
2898 * See notes with the RestorePass typedef in pg_backup_archiver.h.
2899 */
2900 static RestorePass
_tocEntryRestorePass(TocEntry * te)2901 _tocEntryRestorePass(TocEntry *te)
2902 {
2903 /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
2904 if (strcmp(te->desc, "ACL") == 0 ||
2905 strcmp(te->desc, "ACL LANGUAGE") == 0 ||
2906 strcmp(te->desc, "DEFAULT ACL") == 0)
2907 return RESTORE_PASS_ACL;
2908 if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
2909 strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
2910 return RESTORE_PASS_POST_ACL;
2911
2912 /*
2913 * Comments need to be emitted in the same pass as their parent objects.
2914 * ACLs haven't got comments, and neither do matview data objects, but
2915 * event triggers do. (Fortunately, event triggers haven't got ACLs, or
2916 * we'd need yet another weird special case.)
2917 */
2918 if (strcmp(te->desc, "COMMENT") == 0 &&
2919 strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
2920 return RESTORE_PASS_POST_ACL;
2921
2922 /* All else can be handled in the main pass. */
2923 return RESTORE_PASS_MAIN;
2924 }
2925
2926 /*
2927 * Identify TOC entries that are ACLs.
2928 *
2929 * Note: it seems worth duplicating some code here to avoid a hard-wired
2930 * assumption that these are exactly the same entries that we restore during
2931 * the RESTORE_PASS_ACL phase.
2932 */
2933 static bool
_tocEntryIsACL(TocEntry * te)2934 _tocEntryIsACL(TocEntry *te)
2935 {
2936 /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
2937 if (strcmp(te->desc, "ACL") == 0 ||
2938 strcmp(te->desc, "ACL LANGUAGE") == 0 ||
2939 strcmp(te->desc, "DEFAULT ACL") == 0)
2940 return true;
2941 return false;
2942 }
2943
2944 /*
2945 * Issue SET commands for parameters that we want to have set the same way
2946 * at all times during execution of a restore script.
2947 */
2948 static void
_doSetFixedOutputState(ArchiveHandle * AH)2949 _doSetFixedOutputState(ArchiveHandle *AH)
2950 {
2951 RestoreOptions *ropt = AH->public.ropt;
2952
2953 /*
2954 * Disable timeouts to allow for slow commands, idle parallel workers, etc
2955 */
2956 ahprintf(AH, "SET statement_timeout = 0;\n");
2957 ahprintf(AH, "SET lock_timeout = 0;\n");
2958 ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
2959
2960 /* Select the correct character set encoding */
2961 ahprintf(AH, "SET client_encoding = '%s';\n",
2962 pg_encoding_to_char(AH->public.encoding));
2963
2964 /* Select the correct string literal syntax */
2965 ahprintf(AH, "SET standard_conforming_strings = %s;\n",
2966 AH->public.std_strings ? "on" : "off");
2967
2968 /* Select the role to be used during restore */
2969 if (ropt && ropt->use_role)
2970 ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
2971
2972 /* Select the dump-time search_path */
2973 if (AH->public.searchpath)
2974 ahprintf(AH, "%s", AH->public.searchpath);
2975
2976 /* Make sure function checking is disabled */
2977 ahprintf(AH, "SET check_function_bodies = false;\n");
2978
2979 /* Ensure that all valid XML data will be accepted */
2980 ahprintf(AH, "SET xmloption = content;\n");
2981
2982 /* Avoid annoying notices etc */
2983 ahprintf(AH, "SET client_min_messages = warning;\n");
2984 if (!AH->public.std_strings)
2985 ahprintf(AH, "SET escape_string_warning = off;\n");
2986
2987 /* Adjust row-security state */
2988 if (ropt && ropt->enable_row_security)
2989 ahprintf(AH, "SET row_security = on;\n");
2990 else
2991 ahprintf(AH, "SET row_security = off;\n");
2992
2993 ahprintf(AH, "\n");
2994 }
2995
2996 /*
2997 * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
2998 * for updating state if appropriate. If user is NULL or an empty string,
2999 * the specification DEFAULT will be used.
3000 */
3001 static void
_doSetSessionAuth(ArchiveHandle * AH,const char * user)3002 _doSetSessionAuth(ArchiveHandle *AH, const char *user)
3003 {
3004 PQExpBuffer cmd = createPQExpBuffer();
3005
3006 appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3007
3008 /*
3009 * SQL requires a string literal here. Might as well be correct.
3010 */
3011 if (user && *user)
3012 appendStringLiteralAHX(cmd, user, AH);
3013 else
3014 appendPQExpBufferStr(cmd, "DEFAULT");
3015 appendPQExpBufferChar(cmd, ';');
3016
3017 if (RestoringToDB(AH))
3018 {
3019 PGresult *res;
3020
3021 res = PQexec(AH->connection, cmd->data);
3022
3023 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3024 /* NOT warn_or_exit_horribly... use -O instead to skip this. */
3025 exit_horribly(modulename, "could not set session user to \"%s\": %s",
3026 user, PQerrorMessage(AH->connection));
3027
3028 PQclear(res);
3029 }
3030 else
3031 ahprintf(AH, "%s\n\n", cmd->data);
3032
3033 destroyPQExpBuffer(cmd);
3034 }
3035
3036
3037 /*
3038 * Issue a SET default_with_oids command. Caller is responsible
3039 * for updating state if appropriate.
3040 */
3041 static void
_doSetWithOids(ArchiveHandle * AH,const bool withOids)3042 _doSetWithOids(ArchiveHandle *AH, const bool withOids)
3043 {
3044 PQExpBuffer cmd = createPQExpBuffer();
3045
3046 appendPQExpBuffer(cmd, "SET default_with_oids = %s;", withOids ?
3047 "true" : "false");
3048
3049 if (RestoringToDB(AH))
3050 {
3051 PGresult *res;
3052
3053 res = PQexec(AH->connection, cmd->data);
3054
3055 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3056 warn_or_exit_horribly(AH, modulename,
3057 "could not set default_with_oids: %s",
3058 PQerrorMessage(AH->connection));
3059
3060 PQclear(res);
3061 }
3062 else
3063 ahprintf(AH, "%s\n\n", cmd->data);
3064
3065 destroyPQExpBuffer(cmd);
3066 }
3067
3068
3069 /*
3070 * Issue the commands to connect to the specified database.
3071 *
3072 * If we're currently restoring right into a database, this will
3073 * actually establish a connection. Otherwise it puts a \connect into
3074 * the script output.
3075 */
3076 static void
_reconnectToDB(ArchiveHandle * AH,const char * dbname)3077 _reconnectToDB(ArchiveHandle *AH, const char *dbname)
3078 {
3079 if (RestoringToDB(AH))
3080 ReconnectToServer(AH, dbname);
3081 else
3082 {
3083 PQExpBufferData connectbuf;
3084
3085 initPQExpBuffer(&connectbuf);
3086 appendPsqlMetaConnect(&connectbuf, dbname);
3087 ahprintf(AH, "%s\n", connectbuf.data);
3088 termPQExpBuffer(&connectbuf);
3089 }
3090
3091 /*
3092 * NOTE: currUser keeps track of what the imaginary session user in our
3093 * script is. It's now effectively reset to the original userID.
3094 */
3095 if (AH->currUser)
3096 free(AH->currUser);
3097 AH->currUser = NULL;
3098
3099 /* don't assume we still know the output schema, tablespace, etc either */
3100 if (AH->currSchema)
3101 free(AH->currSchema);
3102 AH->currSchema = NULL;
3103 if (AH->currTablespace)
3104 free(AH->currTablespace);
3105 AH->currTablespace = NULL;
3106 AH->currWithOids = -1;
3107
3108 /* re-establish fixed state */
3109 _doSetFixedOutputState(AH);
3110 }
3111
3112 /*
3113 * Become the specified user, and update state to avoid redundant commands
3114 *
3115 * NULL or empty argument is taken to mean restoring the session default
3116 */
3117 static void
_becomeUser(ArchiveHandle * AH,const char * user)3118 _becomeUser(ArchiveHandle *AH, const char *user)
3119 {
3120 if (!user)
3121 user = ""; /* avoid null pointers */
3122
3123 if (AH->currUser && strcmp(AH->currUser, user) == 0)
3124 return; /* no need to do anything */
3125
3126 _doSetSessionAuth(AH, user);
3127
3128 /*
3129 * NOTE: currUser keeps track of what the imaginary session user in our
3130 * script is
3131 */
3132 if (AH->currUser)
3133 free(AH->currUser);
3134 AH->currUser = pg_strdup(user);
3135 }
3136
3137 /*
3138 * Become the owner of the given TOC entry object. If
3139 * changes in ownership are not allowed, this doesn't do anything.
3140 */
3141 static void
_becomeOwner(ArchiveHandle * AH,TocEntry * te)3142 _becomeOwner(ArchiveHandle *AH, TocEntry *te)
3143 {
3144 RestoreOptions *ropt = AH->public.ropt;
3145
3146 if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3147 return;
3148
3149 _becomeUser(AH, te->owner);
3150 }
3151
3152
3153 /*
3154 * Set the proper default_with_oids value for the table.
3155 */
3156 static void
_setWithOids(ArchiveHandle * AH,TocEntry * te)3157 _setWithOids(ArchiveHandle *AH, TocEntry *te)
3158 {
3159 if (AH->currWithOids != te->withOids)
3160 {
3161 _doSetWithOids(AH, te->withOids);
3162 AH->currWithOids = te->withOids;
3163 }
3164 }
3165
3166
3167 /*
3168 * Issue the commands to select the specified schema as the current schema
3169 * in the target database.
3170 */
3171 static void
_selectOutputSchema(ArchiveHandle * AH,const char * schemaName)3172 _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
3173 {
3174 PQExpBuffer qry;
3175
3176 /*
3177 * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3178 * that search_path rather than switching to entry-specific paths.
3179 * Otherwise, it's an old archive that will not restore correctly unless
3180 * we set the search_path as it's expecting.
3181 */
3182 if (AH->public.searchpath)
3183 return;
3184
3185 if (!schemaName || *schemaName == '\0' ||
3186 (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3187 return; /* no need to do anything */
3188
3189 qry = createPQExpBuffer();
3190
3191 appendPQExpBuffer(qry, "SET search_path = %s",
3192 fmtId(schemaName));
3193 if (strcmp(schemaName, "pg_catalog") != 0)
3194 appendPQExpBufferStr(qry, ", pg_catalog");
3195
3196 if (RestoringToDB(AH))
3197 {
3198 PGresult *res;
3199
3200 res = PQexec(AH->connection, qry->data);
3201
3202 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3203 warn_or_exit_horribly(AH, modulename,
3204 "could not set search_path to \"%s\": %s",
3205 schemaName, PQerrorMessage(AH->connection));
3206
3207 PQclear(res);
3208 }
3209 else
3210 ahprintf(AH, "%s;\n\n", qry->data);
3211
3212 if (AH->currSchema)
3213 free(AH->currSchema);
3214 AH->currSchema = pg_strdup(schemaName);
3215
3216 destroyPQExpBuffer(qry);
3217 }
3218
3219 /*
3220 * Issue the commands to select the specified tablespace as the current one
3221 * in the target database.
3222 */
3223 static void
_selectTablespace(ArchiveHandle * AH,const char * tablespace)3224 _selectTablespace(ArchiveHandle *AH, const char *tablespace)
3225 {
3226 RestoreOptions *ropt = AH->public.ropt;
3227 PQExpBuffer qry;
3228 const char *want,
3229 *have;
3230
3231 /* do nothing in --no-tablespaces mode */
3232 if (ropt->noTablespace)
3233 return;
3234
3235 have = AH->currTablespace;
3236 want = tablespace;
3237
3238 /* no need to do anything for non-tablespace object */
3239 if (!want)
3240 return;
3241
3242 if (have && strcmp(want, have) == 0)
3243 return; /* no need to do anything */
3244
3245 qry = createPQExpBuffer();
3246
3247 if (strcmp(want, "") == 0)
3248 {
3249 /* We want the tablespace to be the database's default */
3250 appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3251 }
3252 else
3253 {
3254 /* We want an explicit tablespace */
3255 appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3256 }
3257
3258 if (RestoringToDB(AH))
3259 {
3260 PGresult *res;
3261
3262 res = PQexec(AH->connection, qry->data);
3263
3264 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3265 warn_or_exit_horribly(AH, modulename,
3266 "could not set default_tablespace to %s: %s",
3267 fmtId(want), PQerrorMessage(AH->connection));
3268
3269 PQclear(res);
3270 }
3271 else
3272 ahprintf(AH, "%s;\n\n", qry->data);
3273
3274 if (AH->currTablespace)
3275 free(AH->currTablespace);
3276 AH->currTablespace = pg_strdup(want);
3277
3278 destroyPQExpBuffer(qry);
3279 }
3280
3281 /*
3282 * Extract an object description for a TOC entry, and append it to buf.
3283 *
3284 * This is used for ALTER ... OWNER TO.
3285 */
3286 static void
_getObjectDescription(PQExpBuffer buf,TocEntry * te,ArchiveHandle * AH)3287 _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
3288 {
3289 const char *type = te->desc;
3290
3291 /* Use ALTER TABLE for views and sequences */
3292 if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0 ||
3293 strcmp(type, "MATERIALIZED VIEW") == 0)
3294 type = "TABLE";
3295
3296 /* objects that don't require special decoration */
3297 if (strcmp(type, "COLLATION") == 0 ||
3298 strcmp(type, "CONVERSION") == 0 ||
3299 strcmp(type, "DOMAIN") == 0 ||
3300 strcmp(type, "TABLE") == 0 ||
3301 strcmp(type, "TYPE") == 0 ||
3302 strcmp(type, "FOREIGN TABLE") == 0 ||
3303 strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3304 strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3305 /* non-schema-specified objects */
3306 strcmp(type, "DATABASE") == 0 ||
3307 strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3308 strcmp(type, "SCHEMA") == 0 ||
3309 strcmp(type, "EVENT TRIGGER") == 0 ||
3310 strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3311 strcmp(type, "SERVER") == 0 ||
3312 strcmp(type, "USER MAPPING") == 0)
3313 {
3314 appendPQExpBuffer(buf, "%s ", type);
3315 if (te->namespace && *te->namespace)
3316 appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3317 appendPQExpBufferStr(buf, fmtId(te->tag));
3318 return;
3319 }
3320
3321 /* BLOBs just have a name, but it's numeric so must not use fmtId */
3322 if (strcmp(type, "BLOB") == 0)
3323 {
3324 appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3325 return;
3326 }
3327
3328 /*
3329 * These object types require additional decoration. Fortunately, the
3330 * information needed is exactly what's in the DROP command.
3331 */
3332 if (strcmp(type, "AGGREGATE") == 0 ||
3333 strcmp(type, "FUNCTION") == 0 ||
3334 strcmp(type, "OPERATOR") == 0 ||
3335 strcmp(type, "OPERATOR CLASS") == 0 ||
3336 strcmp(type, "OPERATOR FAMILY") == 0)
3337 {
3338 /* Chop "DROP " off the front and make a modifiable copy */
3339 char *first = pg_strdup(te->dropStmt + 5);
3340 char *last;
3341
3342 /* point to last character in string */
3343 last = first + strlen(first) - 1;
3344
3345 /* Strip off any ';' or '\n' at the end */
3346 while (last >= first && (*last == '\n' || *last == ';'))
3347 last--;
3348 *(last + 1) = '\0';
3349
3350 appendPQExpBufferStr(buf, first);
3351
3352 free(first);
3353 return;
3354 }
3355
3356 write_msg(modulename, "WARNING: don't know how to set owner for object type \"%s\"\n",
3357 type);
3358 }
3359
3360 /*
3361 * Emit the SQL commands to create the object represented by a TOC entry
3362 *
3363 * This now also includes issuing an ALTER OWNER command to restore the
3364 * object's ownership, if wanted. But note that the object's permissions
3365 * will remain at default, until the matching ACL TOC entry is restored.
3366 */
3367 static void
_printTocEntry(ArchiveHandle * AH,TocEntry * te,bool isData)3368 _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
3369 {
3370 RestoreOptions *ropt = AH->public.ropt;
3371
3372 /*
3373 * Avoid dumping the public schema, as it will already be created ...
3374 * unless we are using --clean mode (and *not* --create mode), in which
3375 * case we've previously issued a DROP for it so we'd better recreate it.
3376 *
3377 * Likewise for its comment, if any. (We could try issuing the COMMENT
3378 * command anyway; but it'd fail if the restore is done as non-super-user,
3379 * so let's not.)
3380 *
3381 * XXX it looks pretty ugly to hard-wire the public schema like this, but
3382 * it sits in a sort of no-mans-land between being a system object and a
3383 * user object, so it really is special in a way.
3384 */
3385 if (!(ropt->dropSchema && !ropt->createDB))
3386 {
3387 if (strcmp(te->desc, "SCHEMA") == 0 &&
3388 strcmp(te->tag, "public") == 0)
3389 return;
3390 if (strcmp(te->desc, "COMMENT") == 0 &&
3391 strcmp(te->tag, "SCHEMA public") == 0)
3392 return;
3393 }
3394
3395 /* Select owner, schema, and tablespace as necessary */
3396 _becomeOwner(AH, te);
3397 _selectOutputSchema(AH, te->namespace);
3398 _selectTablespace(AH, te->tablespace);
3399
3400 /* Set up OID mode too */
3401 if (strcmp(te->desc, "TABLE") == 0)
3402 _setWithOids(AH, te);
3403
3404 /* Emit header comment for item */
3405 if (!AH->noTocComments)
3406 {
3407 const char *pfx;
3408 char *sanitized_name;
3409 char *sanitized_schema;
3410 char *sanitized_owner;
3411
3412 if (isData)
3413 pfx = "Data for ";
3414 else
3415 pfx = "";
3416
3417 ahprintf(AH, "--\n");
3418 if (AH->public.verbose)
3419 {
3420 ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
3421 te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
3422 if (te->nDeps > 0)
3423 {
3424 int i;
3425
3426 ahprintf(AH, "-- Dependencies:");
3427 for (i = 0; i < te->nDeps; i++)
3428 ahprintf(AH, " %d", te->dependencies[i]);
3429 ahprintf(AH, "\n");
3430 }
3431 }
3432
3433 /*
3434 * Zap any line endings embedded in user-supplied fields, to prevent
3435 * corruption of the dump (which could, in the worst case, present an
3436 * SQL injection vulnerability if someone were to incautiously load a
3437 * dump containing objects with maliciously crafted names).
3438 */
3439 sanitized_name = replace_line_endings(te->tag);
3440 if (te->namespace)
3441 sanitized_schema = replace_line_endings(te->namespace);
3442 else
3443 sanitized_schema = pg_strdup("-");
3444 if (!ropt->noOwner)
3445 sanitized_owner = replace_line_endings(te->owner);
3446 else
3447 sanitized_owner = pg_strdup("-");
3448
3449 ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3450 pfx, sanitized_name, te->desc, sanitized_schema,
3451 sanitized_owner);
3452
3453 free(sanitized_name);
3454 free(sanitized_schema);
3455 free(sanitized_owner);
3456
3457 if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
3458 {
3459 char *sanitized_tablespace;
3460
3461 sanitized_tablespace = replace_line_endings(te->tablespace);
3462 ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
3463 free(sanitized_tablespace);
3464 }
3465 ahprintf(AH, "\n");
3466
3467 if (AH->PrintExtraTocPtr !=NULL)
3468 (*AH->PrintExtraTocPtr) (AH, te);
3469 ahprintf(AH, "--\n\n");
3470 }
3471
3472 /*
3473 * Actually print the definition.
3474 *
3475 * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
3476 * versions put into CREATE SCHEMA. We have to do this when --no-owner
3477 * mode is selected. This is ugly, but I see no other good way ...
3478 */
3479 if (ropt->noOwner && strcmp(te->desc, "SCHEMA") == 0)
3480 {
3481 ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
3482 }
3483 else
3484 {
3485 if (strlen(te->defn) > 0)
3486 ahprintf(AH, "%s\n\n", te->defn);
3487 }
3488
3489 /*
3490 * If we aren't using SET SESSION AUTH to determine ownership, we must
3491 * instead issue an ALTER OWNER command. We assume that anything without
3492 * a DROP command is not a separately ownable object. All the categories
3493 * with DROP commands must appear in one list or the other.
3494 */
3495 if (!ropt->noOwner && !ropt->use_setsessauth &&
3496 strlen(te->owner) > 0 && strlen(te->dropStmt) > 0)
3497 {
3498 if (strcmp(te->desc, "AGGREGATE") == 0 ||
3499 strcmp(te->desc, "BLOB") == 0 ||
3500 strcmp(te->desc, "COLLATION") == 0 ||
3501 strcmp(te->desc, "CONVERSION") == 0 ||
3502 strcmp(te->desc, "DATABASE") == 0 ||
3503 strcmp(te->desc, "DOMAIN") == 0 ||
3504 strcmp(te->desc, "FUNCTION") == 0 ||
3505 strcmp(te->desc, "OPERATOR") == 0 ||
3506 strcmp(te->desc, "OPERATOR CLASS") == 0 ||
3507 strcmp(te->desc, "OPERATOR FAMILY") == 0 ||
3508 strcmp(te->desc, "PROCEDURAL LANGUAGE") == 0 ||
3509 strcmp(te->desc, "SCHEMA") == 0 ||
3510 strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3511 strcmp(te->desc, "TABLE") == 0 ||
3512 strcmp(te->desc, "TYPE") == 0 ||
3513 strcmp(te->desc, "VIEW") == 0 ||
3514 strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3515 strcmp(te->desc, "SEQUENCE") == 0 ||
3516 strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3517 strcmp(te->desc, "TEXT SEARCH DICTIONARY") == 0 ||
3518 strcmp(te->desc, "TEXT SEARCH CONFIGURATION") == 0 ||
3519 strcmp(te->desc, "FOREIGN DATA WRAPPER") == 0 ||
3520 strcmp(te->desc, "SERVER") == 0)
3521 {
3522 PQExpBuffer temp = createPQExpBuffer();
3523
3524 appendPQExpBufferStr(temp, "ALTER ");
3525 _getObjectDescription(temp, te, AH);
3526 appendPQExpBuffer(temp, " OWNER TO %s;", fmtId(te->owner));
3527 ahprintf(AH, "%s\n\n", temp->data);
3528 destroyPQExpBuffer(temp);
3529 }
3530 else if (strcmp(te->desc, "CAST") == 0 ||
3531 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
3532 strcmp(te->desc, "CONSTRAINT") == 0 ||
3533 strcmp(te->desc, "DEFAULT") == 0 ||
3534 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
3535 strcmp(te->desc, "INDEX") == 0 ||
3536 strcmp(te->desc, "RULE") == 0 ||
3537 strcmp(te->desc, "TRIGGER") == 0 ||
3538 strcmp(te->desc, "ROW SECURITY") == 0 ||
3539 strcmp(te->desc, "POLICY") == 0 ||
3540 strcmp(te->desc, "USER MAPPING") == 0)
3541 {
3542 /* these object types don't have separate owners */
3543 }
3544 else
3545 {
3546 write_msg(modulename, "WARNING: don't know how to set owner for object type \"%s\"\n",
3547 te->desc);
3548 }
3549 }
3550
3551 /*
3552 * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
3553 * commands, so we can no longer assume we know the current auth setting.
3554 */
3555 if (_tocEntryIsACL(te))
3556 {
3557 if (AH->currUser)
3558 free(AH->currUser);
3559 AH->currUser = NULL;
3560 }
3561 }
3562
/*
 * Make a string safe for inclusion in an SQL comment or TOC listing:
 * every carriage return or newline is replaced by a space.
 *
 * Returns a freshly malloc'd copy; the input is left untouched and the
 * caller is responsible for freeing the result.
 */
static char *
replace_line_endings(const char *str)
{
	char	   *copy = pg_strdup(str);
	char	   *hit = copy;

	/* jump from one line-ending character to the next, blanking each */
	while ((hit = strpbrk(hit, "\n\r")) != NULL)
		*hit++ = ' ';

	return copy;
}
3584
3585 /*
3586 * Write the file header for a custom-format archive
3587 */
3588 void
WriteHead(ArchiveHandle * AH)3589 WriteHead(ArchiveHandle *AH)
3590 {
3591 struct tm crtm;
3592
3593 (*AH->WriteBufPtr) (AH, "PGDMP", 5); /* Magic code */
3594 (*AH->WriteBytePtr) (AH, AH->vmaj);
3595 (*AH->WriteBytePtr) (AH, AH->vmin);
3596 (*AH->WriteBytePtr) (AH, AH->vrev);
3597 (*AH->WriteBytePtr) (AH, AH->intSize);
3598 (*AH->WriteBytePtr) (AH, AH->offSize);
3599 (*AH->WriteBytePtr) (AH, AH->format);
3600 WriteInt(AH, AH->compression);
3601 crtm = *localtime(&AH->createDate);
3602 WriteInt(AH, crtm.tm_sec);
3603 WriteInt(AH, crtm.tm_min);
3604 WriteInt(AH, crtm.tm_hour);
3605 WriteInt(AH, crtm.tm_mday);
3606 WriteInt(AH, crtm.tm_mon);
3607 WriteInt(AH, crtm.tm_year);
3608 WriteInt(AH, crtm.tm_isdst);
3609 WriteStr(AH, PQdb(AH->connection));
3610 WriteStr(AH, AH->public.remoteVersionStr);
3611 WriteStr(AH, PG_VERSION);
3612 }
3613
3614 void
ReadHead(ArchiveHandle * AH)3615 ReadHead(ArchiveHandle *AH)
3616 {
3617 int fmt;
3618
3619 /*
3620 * If we haven't already read the header, do so.
3621 *
3622 * NB: this code must agree with _discoverArchiveFormat(). Maybe find a
3623 * way to unify the cases?
3624 */
3625 if (!AH->readHeader)
3626 {
3627 char tmpMag[7];
3628
3629 (*AH->ReadBufPtr) (AH, tmpMag, 5);
3630
3631 if (strncmp(tmpMag, "PGDMP", 5) != 0)
3632 exit_horribly(modulename, "did not find magic string in file header\n");
3633 }
3634
3635 AH->vmaj = (*AH->ReadBytePtr) (AH);
3636 AH->vmin = (*AH->ReadBytePtr) (AH);
3637
3638 if (AH->vmaj > 1 || ((AH->vmaj == 1) && (AH->vmin > 0))) /* Version > 1.0 */
3639 AH->vrev = (*AH->ReadBytePtr) (AH);
3640 else
3641 AH->vrev = 0;
3642
3643 AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
3644
3645 if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
3646 exit_horribly(modulename, "unsupported version (%d.%d) in file header\n",
3647 AH->vmaj, AH->vmin);
3648
3649 AH->intSize = (*AH->ReadBytePtr) (AH);
3650 if (AH->intSize > 32)
3651 exit_horribly(modulename, "sanity check on integer size (%lu) failed\n",
3652 (unsigned long) AH->intSize);
3653
3654 if (AH->intSize > sizeof(int))
3655 write_msg(modulename, "WARNING: archive was made on a machine with larger integers, some operations might fail\n");
3656
3657 if (AH->version >= K_VERS_1_7)
3658 AH->offSize = (*AH->ReadBytePtr) (AH);
3659 else
3660 AH->offSize = AH->intSize;
3661
3662 fmt = (*AH->ReadBytePtr) (AH);
3663
3664 if (AH->format != fmt)
3665 exit_horribly(modulename, "expected format (%d) differs from format found in file (%d)\n",
3666 AH->format, fmt);
3667
3668 if (AH->version >= K_VERS_1_2)
3669 {
3670 if (AH->version < K_VERS_1_4)
3671 AH->compression = (*AH->ReadBytePtr) (AH);
3672 else
3673 AH->compression = ReadInt(AH);
3674 }
3675 else
3676 AH->compression = Z_DEFAULT_COMPRESSION;
3677
3678 #ifndef HAVE_LIBZ
3679 if (AH->compression != 0)
3680 write_msg(modulename, "WARNING: archive is compressed, but this installation does not support compression -- no data will be available\n");
3681 #endif
3682
3683 if (AH->version >= K_VERS_1_4)
3684 {
3685 struct tm crtm;
3686
3687 crtm.tm_sec = ReadInt(AH);
3688 crtm.tm_min = ReadInt(AH);
3689 crtm.tm_hour = ReadInt(AH);
3690 crtm.tm_mday = ReadInt(AH);
3691 crtm.tm_mon = ReadInt(AH);
3692 crtm.tm_year = ReadInt(AH);
3693 crtm.tm_isdst = ReadInt(AH);
3694
3695 /*
3696 * Newer versions of glibc have mktime() report failure if tm_isdst is
3697 * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
3698 * TZ=UTC. This is problematic when restoring an archive under a
3699 * different timezone setting. If we get a failure, try again with
3700 * tm_isdst set to -1 ("don't know").
3701 *
3702 * XXX with or without this hack, we reconstruct createDate
3703 * incorrectly when the prevailing timezone is different from
3704 * pg_dump's. Next time we bump the archive version, we should flush
3705 * this representation and store a plain seconds-since-the-Epoch
3706 * timestamp instead.
3707 */
3708 AH->createDate = mktime(&crtm);
3709 if (AH->createDate == (time_t) -1)
3710 {
3711 crtm.tm_isdst = -1;
3712 AH->createDate = mktime(&crtm);
3713 if (AH->createDate == (time_t) -1)
3714 write_msg(modulename,
3715 "WARNING: invalid creation date in header\n");
3716 }
3717 }
3718
3719 if (AH->version >= K_VERS_1_4)
3720 {
3721 AH->archdbname = ReadStr(AH);
3722 }
3723
3724 if (AH->version >= K_VERS_1_10)
3725 {
3726 AH->archiveRemoteVersion = ReadStr(AH);
3727 AH->archiveDumpVersion = ReadStr(AH);
3728 }
3729 }
3730
3731
3732 /*
3733 * checkSeek
3734 * check to see if ftell/fseek can be performed.
3735 */
3736 bool
checkSeek(FILE * fp)3737 checkSeek(FILE *fp)
3738 {
3739 pgoff_t tpos;
3740
3741 /*
3742 * If pgoff_t is wider than long, we must have "real" fseeko and not an
3743 * emulation using fseek. Otherwise report no seek capability.
3744 */
3745 #ifndef HAVE_FSEEKO
3746 if (sizeof(pgoff_t) > sizeof(long))
3747 return false;
3748 #endif
3749
3750 /* Check that ftello works on this file */
3751 tpos = ftello(fp);
3752 if (tpos < 0)
3753 return false;
3754
3755 /*
3756 * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
3757 * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
3758 * successful no-op even on files that are otherwise unseekable.
3759 */
3760 if (fseeko(fp, tpos, SEEK_SET) != 0)
3761 return false;
3762
3763 return true;
3764 }
3765
3766
3767 /*
3768 * dumpTimestamp
3769 */
3770 static void
dumpTimestamp(ArchiveHandle * AH,const char * msg,time_t tim)3771 dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
3772 {
3773 char buf[64];
3774
3775 if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
3776 ahprintf(AH, "-- %s %s\n\n", msg, buf);
3777 }
3778
3779 /*
3780 * Main engine for parallel restore.
3781 *
3782 * Parallel restore is done in three phases. In this first phase,
3783 * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
3784 * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all
3785 * PRE_DATA items other than ACLs.) Entries we can't process now are
3786 * added to the pending_list for later phases to deal with.
3787 */
3788 static void
restore_toc_entries_prefork(ArchiveHandle * AH,TocEntry * pending_list)3789 restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
3790 {
3791 bool skipped_some;
3792 TocEntry *next_work_item;
3793
3794 ahlog(AH, 2, "entering restore_toc_entries_prefork\n");
3795
3796 /* Adjust dependency information */
3797 fix_dependencies(AH);
3798
3799 /*
3800 * Do all the early stuff in a single connection in the parent. There's no
3801 * great point in running it in parallel, in fact it will actually run
3802 * faster in a single connection because we avoid all the connection and
3803 * setup overhead. Also, pre-9.2 pg_dump versions were not very good
3804 * about showing all the dependencies of SECTION_PRE_DATA items, so we do
3805 * not risk trying to process them out-of-order.
3806 *
3807 * Stuff that we can't do immediately gets added to the pending_list.
3808 * Note: we don't yet filter out entries that aren't going to be restored.
3809 * They might participate in dependency chains connecting entries that
3810 * should be restored, so we treat them as live until we actually process
3811 * them.
3812 *
3813 * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
3814 * before DATA items, and all DATA items before POST_DATA items. That is
3815 * not certain to be true in older archives, though, and in any case use
3816 * of a list file would destroy that ordering (cf. SortTocFromFile). So
3817 * this loop cannot assume that it holds.
3818 */
3819 AH->restorePass = RESTORE_PASS_MAIN;
3820 skipped_some = false;
3821 for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
3822 {
3823 bool do_now = true;
3824
3825 if (next_work_item->section != SECTION_PRE_DATA)
3826 {
3827 /* DATA and POST_DATA items are just ignored for now */
3828 if (next_work_item->section == SECTION_DATA ||
3829 next_work_item->section == SECTION_POST_DATA)
3830 {
3831 do_now = false;
3832 skipped_some = true;
3833 }
3834 else
3835 {
3836 /*
3837 * SECTION_NONE items, such as comments, can be processed now
3838 * if we are still in the PRE_DATA part of the archive. Once
3839 * we've skipped any items, we have to consider whether the
3840 * comment's dependencies are satisfied, so skip it for now.
3841 */
3842 if (skipped_some)
3843 do_now = false;
3844 }
3845 }
3846
3847 /*
3848 * Also skip items that need to be forced into later passes. We need
3849 * not set skipped_some in this case, since by assumption no main-pass
3850 * items could depend on these.
3851 */
3852 if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
3853 do_now = false;
3854
3855 if (do_now)
3856 {
3857 /* OK, restore the item and update its dependencies */
3858 ahlog(AH, 1, "processing item %d %s %s\n",
3859 next_work_item->dumpId,
3860 next_work_item->desc, next_work_item->tag);
3861
3862 (void) restore_toc_entry(AH, next_work_item, false);
3863
3864 /* Reduce dependencies, but don't move anything to ready_list */
3865 reduce_dependencies(AH, next_work_item, NULL);
3866 }
3867 else
3868 {
3869 /* Nope, so add it to pending_list */
3870 par_list_append(pending_list, next_work_item);
3871 }
3872 }
3873
3874 /*
3875 * Now close parent connection in prep for parallel steps. We do this
3876 * mainly to ensure that we don't exceed the specified number of parallel
3877 * connections.
3878 */
3879 DisconnectDatabase(&AH->public);
3880
3881 /* blow away any transient state from the old connection */
3882 if (AH->currUser)
3883 free(AH->currUser);
3884 AH->currUser = NULL;
3885 if (AH->currSchema)
3886 free(AH->currSchema);
3887 AH->currSchema = NULL;
3888 if (AH->currTablespace)
3889 free(AH->currTablespace);
3890 AH->currTablespace = NULL;
3891 AH->currWithOids = -1;
3892 }
3893
3894 /*
3895 * Main engine for parallel restore.
3896 *
3897 * Parallel restore is done in three phases. In this second phase,
3898 * we process entries by dispatching them to parallel worker children
3899 * (processes on Unix, threads on Windows), each of which connects
3900 * separately to the database. Inter-entry dependencies are respected,
3901 * and so is the RestorePass multi-pass structure. When we can no longer
3902 * make any entries ready to process, we exit. Normally, there will be
3903 * nothing left to do; but if there is, the third phase will mop up.
3904 */
3905 static void
restore_toc_entries_parallel(ArchiveHandle * AH,ParallelState * pstate,TocEntry * pending_list)3906 restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
3907 TocEntry *pending_list)
3908 {
3909 TocEntry ready_list;
3910 TocEntry *next_work_item;
3911
3912 ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
3913
3914 /*
3915 * The pending_list contains all items that we need to restore. Move all
3916 * items that are available to process immediately into the ready_list.
3917 * After this setup, the pending list is everything that needs to be done
3918 * but is blocked by one or more dependencies, while the ready list
3919 * contains items that have no remaining dependencies and are OK to
3920 * process in the current restore pass.
3921 */
3922 par_list_header_init(&ready_list);
3923 AH->restorePass = RESTORE_PASS_MAIN;
3924 move_to_ready_list(pending_list, &ready_list, AH->restorePass);
3925
3926 /*
3927 * main parent loop
3928 *
3929 * Keep going until there is no worker still running AND there is no work
3930 * left to be done. Note invariant: at top of loop, there should always
3931 * be at least one worker available to dispatch a job to.
3932 */
3933 ahlog(AH, 1, "entering main parallel loop\n");
3934
3935 for (;;)
3936 {
3937 /* Look for an item ready to be dispatched to a worker */
3938 next_work_item = get_next_work_item(AH, &ready_list, pstate);
3939 if (next_work_item != NULL)
3940 {
3941 /* If not to be restored, don't waste time launching a worker */
3942 if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
3943 {
3944 ahlog(AH, 1, "skipping item %d %s %s\n",
3945 next_work_item->dumpId,
3946 next_work_item->desc, next_work_item->tag);
3947 /* Drop it from ready_list, and update its dependencies */
3948 par_list_remove(next_work_item);
3949 reduce_dependencies(AH, next_work_item, &ready_list);
3950 /* Loop around to see if anything else can be dispatched */
3951 continue;
3952 }
3953
3954 ahlog(AH, 1, "launching item %d %s %s\n",
3955 next_work_item->dumpId,
3956 next_work_item->desc, next_work_item->tag);
3957
3958 /* Remove it from ready_list, and dispatch to some worker */
3959 par_list_remove(next_work_item);
3960
3961 DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE);
3962 }
3963 else if (IsEveryWorkerIdle(pstate))
3964 {
3965 /*
3966 * Nothing is ready and no worker is running, so we're done with
3967 * the current pass or maybe with the whole process.
3968 */
3969 if (AH->restorePass == RESTORE_PASS_LAST)
3970 break; /* No more parallel processing is possible */
3971
3972 /* Advance to next restore pass */
3973 AH->restorePass++;
3974 /* That probably allows some stuff to be made ready */
3975 move_to_ready_list(pending_list, &ready_list, AH->restorePass);
3976 /* Loop around to see if anything's now ready */
3977 continue;
3978 }
3979 else
3980 {
3981 /*
3982 * We have nothing ready, but at least one child is working, so
3983 * wait for some subjob to finish.
3984 */
3985 }
3986
3987 for (;;)
3988 {
3989 int nTerm = 0;
3990 int ret_child;
3991 int work_status;
3992
3993 /*
3994 * In order to reduce dependencies as soon as possible and
3995 * especially to reap the status of workers who are working on
3996 * items that pending items depend on, we do a non-blocking check
3997 * for ended workers first.
3998 *
3999 * However, if we do not have any other work items currently that
4000 * workers can work on, we do not busy-loop here but instead
4001 * really wait for at least one worker to terminate. Hence we call
4002 * ListenToWorkers(..., ..., do_wait = true) in this case.
4003 */
4004 ListenToWorkers(AH, pstate, !next_work_item);
4005
4006 while ((ret_child = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
4007 {
4008 nTerm++;
4009 mark_work_done(AH, &ready_list, ret_child, work_status, pstate);
4010 }
4011
4012 /*
4013 * We need to make sure that we have an idle worker before
4014 * re-running the loop. If nTerm > 0 we already have that (quick
4015 * check).
4016 */
4017 if (nTerm > 0)
4018 break;
4019
4020 /* if nobody terminated, explicitly check for an idle worker */
4021 if (GetIdleWorker(pstate) != NO_SLOT)
4022 break;
4023
4024 /*
4025 * If we have no idle worker, read the result of one or more
4026 * workers and loop the loop to call ReapWorkerStatus() on them.
4027 */
4028 ListenToWorkers(AH, pstate, true);
4029 }
4030 }
4031
4032 /* There should now be nothing in ready_list. */
4033 Assert(ready_list.par_next == &ready_list);
4034
4035 ahlog(AH, 1, "finished main parallel loop\n");
4036 }
4037
4038 /*
4039 * Main engine for parallel restore.
4040 *
4041 * Parallel restore is done in three phases. In this third phase,
4042 * we mop up any remaining TOC entries by processing them serially.
4043 * This phase normally should have nothing to do, but if we've somehow
4044 * gotten stuck due to circular dependencies or some such, this provides
4045 * at least some chance of completing the restore successfully.
4046 */
4047 static void
restore_toc_entries_postfork(ArchiveHandle * AH,TocEntry * pending_list)4048 restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
4049 {
4050 RestoreOptions *ropt = AH->public.ropt;
4051 TocEntry *te;
4052
4053 ahlog(AH, 2, "entering restore_toc_entries_postfork\n");
4054
4055 /*
4056 * Now reconnect the single parent connection.
4057 */
4058 ConnectDatabase((Archive *) AH, &ropt->cparams, true);
4059
4060 /* re-establish fixed state */
4061 _doSetFixedOutputState(AH);
4062
4063 /*
4064 * Make sure there is no work left due to, say, circular dependencies, or
4065 * some other pathological condition. If so, do it in the single parent
4066 * connection. We don't sweat about RestorePass ordering; it's likely we
4067 * already violated that.
4068 */
4069 for (te = pending_list->par_next; te != pending_list; te = te->par_next)
4070 {
4071 ahlog(AH, 1, "processing missed item %d %s %s\n",
4072 te->dumpId, te->desc, te->tag);
4073 (void) restore_toc_entry(AH, te, false);
4074 }
4075 }
4076
4077 /*
4078 * Check if te1 has an exclusive lock requirement for an item that te2 also
4079 * requires, whether or not te2's requirement is for an exclusive lock.
4080 */
4081 static bool
has_lock_conflicts(TocEntry * te1,TocEntry * te2)4082 has_lock_conflicts(TocEntry *te1, TocEntry *te2)
4083 {
4084 int j,
4085 k;
4086
4087 for (j = 0; j < te1->nLockDeps; j++)
4088 {
4089 for (k = 0; k < te2->nDeps; k++)
4090 {
4091 if (te1->lockDeps[j] == te2->dependencies[k])
4092 return true;
4093 }
4094 }
4095 return false;
4096 }
4097
4098
4099 /*
4100 * Initialize the header of a parallel-processing list.
4101 *
4102 * These are circular lists with a dummy TocEntry as header, just like the
4103 * main TOC list; but we use separate list links so that an entry can be in
4104 * the main TOC list as well as in a parallel-processing list.
4105 */
4106 static void
par_list_header_init(TocEntry * l)4107 par_list_header_init(TocEntry *l)
4108 {
4109 l->par_prev = l->par_next = l;
4110 }
4111
4112 /* Append te to the end of the parallel-processing list headed by l */
4113 static void
par_list_append(TocEntry * l,TocEntry * te)4114 par_list_append(TocEntry *l, TocEntry *te)
4115 {
4116 te->par_prev = l->par_prev;
4117 l->par_prev->par_next = te;
4118 l->par_prev = te;
4119 te->par_next = l;
4120 }
4121
4122 /* Remove te from whatever parallel-processing list it's in */
4123 static void
par_list_remove(TocEntry * te)4124 par_list_remove(TocEntry *te)
4125 {
4126 te->par_prev->par_next = te->par_next;
4127 te->par_next->par_prev = te->par_prev;
4128 te->par_prev = NULL;
4129 te->par_next = NULL;
4130 }
4131
4132
4133 /*
4134 * Move all immediately-ready items from pending_list to ready_list.
4135 *
4136 * Items are considered ready if they have no remaining dependencies and
4137 * they belong in the current restore pass. (See also reduce_dependencies,
4138 * which applies the same logic one-at-a-time.)
4139 */
4140 static void
move_to_ready_list(TocEntry * pending_list,TocEntry * ready_list,RestorePass pass)4141 move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list,
4142 RestorePass pass)
4143 {
4144 TocEntry *te;
4145 TocEntry *next_te;
4146
4147 for (te = pending_list->par_next; te != pending_list; te = next_te)
4148 {
4149 /* must save list link before possibly moving te to other list */
4150 next_te = te->par_next;
4151
4152 if (te->depCount == 0 &&
4153 _tocEntryRestorePass(te) == pass)
4154 {
4155 /* Remove it from pending_list ... */
4156 par_list_remove(te);
4157 /* ... and add to ready_list */
4158 par_list_append(ready_list, te);
4159 }
4160 }
4161 }
4162
4163 /*
4164 * Find the next work item (if any) that is capable of being run now.
4165 *
4166 * To qualify, the item must have no remaining dependencies
4167 * and no requirements for locks that are incompatible with
4168 * items currently running. Items in the ready_list are known to have
4169 * no remaining dependencies, but we have to check for lock conflicts.
4170 *
4171 * Note that the returned item has *not* been removed from ready_list.
4172 * The caller must do that after successfully dispatching the item.
4173 *
4174 * pref_non_data is for an alternative selection algorithm that gives
4175 * preference to non-data items if there is already a data load running.
4176 * It is currently disabled.
4177 */
4178 static TocEntry *
get_next_work_item(ArchiveHandle * AH,TocEntry * ready_list,ParallelState * pstate)4179 get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
4180 ParallelState *pstate)
4181 {
4182 bool pref_non_data = false; /* or get from AH->ropt */
4183 TocEntry *data_te = NULL;
4184 TocEntry *te;
4185 int i,
4186 k;
4187
4188 /*
4189 * Bogus heuristics for pref_non_data
4190 */
4191 if (pref_non_data)
4192 {
4193 int count = 0;
4194
4195 for (k = 0; k < pstate->numWorkers; k++)
4196 if (pstate->parallelSlot[k].args->te != NULL &&
4197 pstate->parallelSlot[k].args->te->section == SECTION_DATA)
4198 count++;
4199 if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
4200 pref_non_data = false;
4201 }
4202
4203 /*
4204 * Search the ready_list until we find a suitable item.
4205 */
4206 for (te = ready_list->par_next; te != ready_list; te = te->par_next)
4207 {
4208 bool conflicts = false;
4209
4210 /*
4211 * Check to see if the item would need exclusive lock on something
4212 * that a currently running item also needs lock on, or vice versa. If
4213 * so, we don't want to schedule them together.
4214 */
4215 for (i = 0; i < pstate->numWorkers && !conflicts; i++)
4216 {
4217 TocEntry *running_te;
4218
4219 if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING)
4220 continue;
4221 running_te = pstate->parallelSlot[i].args->te;
4222
4223 if (has_lock_conflicts(te, running_te) ||
4224 has_lock_conflicts(running_te, te))
4225 {
4226 conflicts = true;
4227 break;
4228 }
4229 }
4230
4231 if (conflicts)
4232 continue;
4233
4234 if (pref_non_data && te->section == SECTION_DATA)
4235 {
4236 if (data_te == NULL)
4237 data_te = te;
4238 continue;
4239 }
4240
4241 /* passed all tests, so this item can run */
4242 return te;
4243 }
4244
4245 if (data_te != NULL)
4246 return data_te;
4247
4248 ahlog(AH, 2, "no item ready\n");
4249 return NULL;
4250 }
4251
4252
4253 /*
4254 * Restore a single TOC item in parallel with others
4255 *
4256 * this is run in the worker, i.e. in a thread (Windows) or a separate process
4257 * (everything else). A worker process executes several such work items during
4258 * a parallel backup or restore. Once we terminate here and report back that
4259 * our work is finished, the master process will assign us a new work item.
4260 */
4261 int
parallel_restore(ParallelArgs * args)4262 parallel_restore(ParallelArgs *args)
4263 {
4264 ArchiveHandle *AH = args->AH;
4265 TocEntry *te = args->te;
4266 int status;
4267
4268 Assert(AH->connection != NULL);
4269
4270 /* Count only errors associated with this TOC entry */
4271 AH->public.n_errors = 0;
4272
4273 /* Restore the TOC item */
4274 status = restore_toc_entry(AH, te, true);
4275
4276 return status;
4277 }
4278
4279
4280 /*
4281 * Housekeeping to be done after a step has been parallel restored.
4282 *
4283 * Clear the appropriate slot, free all the extra memory we allocated,
4284 * update status, and reduce the dependency count of any dependent items.
4285 */
4286 static void
mark_work_done(ArchiveHandle * AH,TocEntry * ready_list,int worker,int status,ParallelState * pstate)4287 mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
4288 int worker, int status,
4289 ParallelState *pstate)
4290 {
4291 TocEntry *te = NULL;
4292
4293 te = pstate->parallelSlot[worker].args->te;
4294
4295 if (te == NULL)
4296 exit_horribly(modulename, "could not find slot of finished worker\n");
4297
4298 ahlog(AH, 1, "finished item %d %s %s\n",
4299 te->dumpId, te->desc, te->tag);
4300
4301 if (status == WORKER_CREATE_DONE)
4302 mark_create_done(AH, te);
4303 else if (status == WORKER_INHIBIT_DATA)
4304 {
4305 inhibit_data_for_failed_table(AH, te);
4306 AH->public.n_errors++;
4307 }
4308 else if (status == WORKER_IGNORED_ERRORS)
4309 AH->public.n_errors++;
4310 else if (status != 0)
4311 exit_horribly(modulename, "worker process failed: exit code %d\n",
4312 status);
4313
4314 reduce_dependencies(AH, te, ready_list);
4315 }
4316
4317
4318 /*
4319 * Process the dependency information into a form useful for parallel restore.
4320 *
4321 * This function takes care of fixing up some missing or badly designed
4322 * dependencies, and then prepares subsidiary data structures that will be
4323 * used in the main parallel-restore logic, including:
4324 * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4325 * 2. We set up depCount fields that are the number of as-yet-unprocessed
4326 * dependencies for each TOC entry.
4327 *
4328 * We also identify locking dependencies so that we can avoid trying to
4329 * schedule conflicting items at the same time.
4330 */
4331 static void
fix_dependencies(ArchiveHandle * AH)4332 fix_dependencies(ArchiveHandle *AH)
4333 {
4334 TocEntry *te;
4335 int i;
4336
4337 /*
4338 * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4339 * items are marked as not being in any parallel-processing list.
4340 */
4341 for (te = AH->toc->next; te != AH->toc; te = te->next)
4342 {
4343 te->depCount = te->nDeps;
4344 te->revDeps = NULL;
4345 te->nRevDeps = 0;
4346 te->par_prev = NULL;
4347 te->par_next = NULL;
4348 }
4349
4350 /*
4351 * POST_DATA items that are shown as depending on a table need to be
4352 * re-pointed to depend on that table's data, instead. This ensures they
4353 * won't get scheduled until the data has been loaded.
4354 */
4355 repoint_table_dependencies(AH);
4356
4357 /*
4358 * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4359 * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
4360 * one BLOB COMMENTS in such files.)
4361 */
4362 if (AH->version < K_VERS_1_11)
4363 {
4364 for (te = AH->toc->next; te != AH->toc; te = te->next)
4365 {
4366 if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4367 {
4368 TocEntry *te2;
4369
4370 for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4371 {
4372 if (strcmp(te2->desc, "BLOBS") == 0)
4373 {
4374 te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
4375 te->dependencies[0] = te2->dumpId;
4376 te->nDeps++;
4377 te->depCount++;
4378 break;
4379 }
4380 }
4381 break;
4382 }
4383 }
4384 }
4385
4386 /*
4387 * At this point we start to build the revDeps reverse-dependency arrays,
4388 * so all changes of dependencies must be complete.
4389 */
4390
4391 /*
4392 * Count the incoming dependencies for each item. Also, it is possible
4393 * that the dependencies list items that are not in the archive at all
4394 * (that should not happen in 9.2 and later, but is highly likely in older
4395 * archives). Subtract such items from the depCounts.
4396 */
4397 for (te = AH->toc->next; te != AH->toc; te = te->next)
4398 {
4399 for (i = 0; i < te->nDeps; i++)
4400 {
4401 DumpId depid = te->dependencies[i];
4402
4403 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4404 AH->tocsByDumpId[depid]->nRevDeps++;
4405 else
4406 te->depCount--;
4407 }
4408 }
4409
4410 /*
4411 * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4412 * it as a counter below.
4413 */
4414 for (te = AH->toc->next; te != AH->toc; te = te->next)
4415 {
4416 if (te->nRevDeps > 0)
4417 te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
4418 te->nRevDeps = 0;
4419 }
4420
4421 /*
4422 * Build the revDeps[] arrays of incoming-dependency dumpIds. This had
4423 * better agree with the loops above.
4424 */
4425 for (te = AH->toc->next; te != AH->toc; te = te->next)
4426 {
4427 for (i = 0; i < te->nDeps; i++)
4428 {
4429 DumpId depid = te->dependencies[i];
4430
4431 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4432 {
4433 TocEntry *otherte = AH->tocsByDumpId[depid];
4434
4435 otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4436 }
4437 }
4438 }
4439
4440 /*
4441 * Lastly, work out the locking dependencies.
4442 */
4443 for (te = AH->toc->next; te != AH->toc; te = te->next)
4444 {
4445 te->lockDeps = NULL;
4446 te->nLockDeps = 0;
4447 identify_locking_dependencies(AH, te);
4448 }
4449 }
4450
4451 /*
4452 * Change dependencies on table items to depend on table data items instead,
4453 * but only in POST_DATA items.
4454 */
4455 static void
repoint_table_dependencies(ArchiveHandle * AH)4456 repoint_table_dependencies(ArchiveHandle *AH)
4457 {
4458 TocEntry *te;
4459 int i;
4460 DumpId olddep;
4461
4462 for (te = AH->toc->next; te != AH->toc; te = te->next)
4463 {
4464 if (te->section != SECTION_POST_DATA)
4465 continue;
4466 for (i = 0; i < te->nDeps; i++)
4467 {
4468 olddep = te->dependencies[i];
4469 if (olddep <= AH->maxDumpId &&
4470 AH->tableDataId[olddep] != 0)
4471 {
4472 te->dependencies[i] = AH->tableDataId[olddep];
4473 ahlog(AH, 2, "transferring dependency %d -> %d to %d\n",
4474 te->dumpId, olddep, AH->tableDataId[olddep]);
4475 }
4476 }
4477 }
4478 }
4479
4480 /*
4481 * Identify which objects we'll need exclusive lock on in order to restore
4482 * the given TOC entry (*other* than the one identified by the TOC entry
4483 * itself). Record their dump IDs in the entry's lockDeps[] array.
4484 */
4485 static void
identify_locking_dependencies(ArchiveHandle * AH,TocEntry * te)4486 identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4487 {
4488 DumpId *lockids;
4489 int nlockids;
4490 int i;
4491
4492 /*
4493 * We only care about this for POST_DATA items. PRE_DATA items are not
4494 * run in parallel, and DATA items are all independent by assumption.
4495 */
4496 if (te->section != SECTION_POST_DATA)
4497 return;
4498
4499 /* Quick exit if no dependencies at all */
4500 if (te->nDeps == 0)
4501 return;
4502
4503 /*
4504 * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
4505 * and hence require exclusive lock. However, we know that CREATE INDEX
4506 * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
4507 * category too ... but today is not that day.)
4508 */
4509 if (strcmp(te->desc, "INDEX") == 0)
4510 return;
4511
4512 /*
4513 * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
4514 * item listed among its dependencies. Originally all of these would have
4515 * been TABLE items, but repoint_table_dependencies would have repointed
4516 * them to the TABLE DATA items if those are present (which they might not
4517 * be, eg in a schema-only dump). Note that all of the entries we are
4518 * processing here are POST_DATA; otherwise there might be a significant
4519 * difference between a dependency on a table and a dependency on its
4520 * data, so that closer analysis would be needed here.
4521 */
4522 lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
4523 nlockids = 0;
4524 for (i = 0; i < te->nDeps; i++)
4525 {
4526 DumpId depid = te->dependencies[i];
4527
4528 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
4529 ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
4530 strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
4531 lockids[nlockids++] = depid;
4532 }
4533
4534 if (nlockids == 0)
4535 {
4536 free(lockids);
4537 return;
4538 }
4539
4540 te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
4541 te->nLockDeps = nlockids;
4542 }
4543
4544 /*
4545 * Remove the specified TOC entry from the depCounts of items that depend on
4546 * it, thereby possibly making them ready-to-run. Any pending item that
4547 * becomes ready should be moved to the ready_list, if that's provided.
4548 */
4549 static void
reduce_dependencies(ArchiveHandle * AH,TocEntry * te,TocEntry * ready_list)4550 reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
4551 {
4552 int i;
4553
4554 ahlog(AH, 2, "reducing dependencies for %d\n", te->dumpId);
4555
4556 for (i = 0; i < te->nRevDeps; i++)
4557 {
4558 TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
4559
4560 Assert(otherte->depCount > 0);
4561 otherte->depCount--;
4562
4563 /*
4564 * It's ready if it has no remaining dependencies, and it belongs in
4565 * the current restore pass, and it is currently a member of the
4566 * pending list (that check is needed to prevent double restore in
4567 * some cases where a list-file forces out-of-order restoring).
4568 * However, if ready_list == NULL then caller doesn't want any list
4569 * memberships changed.
4570 */
4571 if (otherte->depCount == 0 &&
4572 _tocEntryRestorePass(otherte) == AH->restorePass &&
4573 otherte->par_prev != NULL &&
4574 ready_list != NULL)
4575 {
4576 /* Remove it from pending list ... */
4577 par_list_remove(otherte);
4578 /* ... and add to ready_list */
4579 par_list_append(ready_list, otherte);
4580 }
4581 }
4582 }
4583
4584 /*
4585 * Set the created flag on the DATA member corresponding to the given
4586 * TABLE member
4587 */
4588 static void
mark_create_done(ArchiveHandle * AH,TocEntry * te)4589 mark_create_done(ArchiveHandle *AH, TocEntry *te)
4590 {
4591 if (AH->tableDataId[te->dumpId] != 0)
4592 {
4593 TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4594
4595 ted->created = true;
4596 }
4597 }
4598
4599 /*
4600 * Mark the DATA member corresponding to the given TABLE member
4601 * as not wanted
4602 */
4603 static void
inhibit_data_for_failed_table(ArchiveHandle * AH,TocEntry * te)4604 inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
4605 {
4606 ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
4607 te->tag);
4608
4609 if (AH->tableDataId[te->dumpId] != 0)
4610 {
4611 TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4612
4613 ted->reqs = 0;
4614 }
4615 }
4616
4617 /*
4618 * Clone and de-clone routines used in parallel restoration.
4619 *
4620 * Enough of the structure is cloned to ensure that there is no
4621 * conflict between different threads each with their own clone.
4622 */
4623 ArchiveHandle *
CloneArchive(ArchiveHandle * AH)4624 CloneArchive(ArchiveHandle *AH)
4625 {
4626 ArchiveHandle *clone;
4627
4628 /* Make a "flat" copy */
4629 clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
4630 memcpy(clone, AH, sizeof(ArchiveHandle));
4631
4632 /* Handle format-independent fields */
4633 memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
4634
4635 /* The clone will have its own connection, so disregard connection state */
4636 clone->connection = NULL;
4637 clone->connCancel = NULL;
4638 clone->currUser = NULL;
4639 clone->currSchema = NULL;
4640 clone->currTablespace = NULL;
4641 clone->currWithOids = -1;
4642
4643 /* savedPassword must be local in case we change it while connecting */
4644 if (clone->savedPassword)
4645 clone->savedPassword = pg_strdup(clone->savedPassword);
4646
4647 /* clone has its own error count, too */
4648 clone->public.n_errors = 0;
4649
4650 /*
4651 * Connect our new clone object to the database, using the same connection
4652 * parameters used for the original connection.
4653 */
4654 ConnectDatabase((Archive *) clone, &clone->public.ropt->cparams, true);
4655
4656 /* re-establish fixed state */
4657 if (AH->mode == archModeRead)
4658 _doSetFixedOutputState(clone);
4659 /* in write case, setupDumpWorker will fix up connection state */
4660
4661 /* Let the format-specific code have a chance too */
4662 (clone->ClonePtr) (clone);
4663
4664 Assert(clone->connection != NULL);
4665 return clone;
4666 }
4667
4668 /*
4669 * Release clone-local storage.
4670 *
4671 * Note: we assume any clone-local connection was already closed.
4672 */
4673 void
DeCloneArchive(ArchiveHandle * AH)4674 DeCloneArchive(ArchiveHandle *AH)
4675 {
4676 /* Should not have an open database connection */
4677 Assert(AH->connection == NULL);
4678
4679 /* Clear format-specific state */
4680 (AH->DeClonePtr) (AH);
4681
4682 /* Clear state allocated by CloneArchive */
4683 if (AH->sqlparse.curCmd)
4684 destroyPQExpBuffer(AH->sqlparse.curCmd);
4685
4686 /* Clear any connection-local state */
4687 if (AH->currUser)
4688 free(AH->currUser);
4689 if (AH->currSchema)
4690 free(AH->currSchema);
4691 if (AH->currTablespace)
4692 free(AH->currTablespace);
4693 if (AH->savedPassword)
4694 free(AH->savedPassword);
4695
4696 free(AH);
4697 }
4698