1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_archiver.c
4  *
5  *	Private implementation of the archiver routines.
6  *
7  *	See the headers to pg_restore for more details.
8  *
9  * Copyright (c) 2000, Philip Warner
10  *	Rights are granted to use this software in any way so long
11  *	as this notice is not removed.
12  *
13  *	The author is not responsible for loss or damages that may
14  *	result from its use.
15  *
16  *
17  * IDENTIFICATION
18  *		src/bin/pg_dump/pg_backup_archiver.c
19  *
20  *-------------------------------------------------------------------------
21  */
22 #include "postgres_fe.h"
23 
24 #include <ctype.h>
25 #include <fcntl.h>
26 #include <unistd.h>
27 #include <sys/stat.h>
28 #include <sys/wait.h>
29 #ifdef WIN32
30 #include <io.h>
31 #endif
32 
33 #include "parallel.h"
34 #include "pg_backup_archiver.h"
35 #include "pg_backup_db.h"
36 #include "pg_backup_utils.h"
37 #include "dumputils.h"
38 #include "fe_utils/string_utils.h"
39 
40 #include "libpq/libpq-fs.h"
41 
/*
 * Banner comments found at the top of plain-text SQL output.
 * TEXT_DUMP_HEADER is the same text RestoreArchive() writes before anything
 * else; TEXT_DUMPALL_HEADER is the "database cluster dump" variant.
 */
#define TEXT_DUMP_HEADER \
	"--\n-- PostgreSQL database dump\n--\n\n"
#define TEXT_DUMPALL_HEADER \
	"--\n-- PostgreSQL database cluster dump\n--\n\n"
44 
/*
 * Snapshot of an archive's output target.  SaveOutput() hands one of these
 * back and RestoreOutput() accepts it, so the target can be switched
 * temporarily and reinstated afterwards.
 */
typedef struct _outputContext
{
	void	   *OF;				/* saved output handle */
	int			gzOut;			/* saved flag; presumably nonzero when OF is
								 * a gzip stream -- TODO confirm */
} OutputContext;
51 
52 /* translator: this is a module name */
53 static const char *modulename = gettext_noop("archiver");
54 
55 
56 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
57 		 const int compression, bool dosync, ArchiveMode mode,
58 		 SetupWorkerPtrType setupWorkerPtr);
59 static void _getObjectDescription(PQExpBuffer buf, TocEntry *te,
60 					  ArchiveHandle *AH);
61 static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData);
62 static char *replace_line_endings(const char *str);
63 static void _doSetFixedOutputState(ArchiveHandle *AH);
64 static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
65 static void _doSetWithOids(ArchiveHandle *AH, const bool withOids);
66 static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
67 static void _becomeUser(ArchiveHandle *AH, const char *user);
68 static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
69 static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
70 static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
71 static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
72 static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
73 static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
74 static teReqs _tocEntryRequired(TocEntry *te, teSection curSection, RestoreOptions *ropt);
75 static RestorePass _tocEntryRestorePass(TocEntry *te);
76 static bool _tocEntryIsACL(TocEntry *te);
77 static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
78 static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
79 static void buildTocEntryArrays(ArchiveHandle *AH);
80 static void _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
81 static int	_discoverArchiveFormat(ArchiveHandle *AH);
82 
83 static int	RestoringToDB(ArchiveHandle *AH);
84 static void dump_lo_buf(ArchiveHandle *AH);
85 static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
86 static void SetOutput(ArchiveHandle *AH, const char *filename, int compression);
87 static OutputContext SaveOutput(ArchiveHandle *AH);
88 static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);
89 
90 static int	restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
91 static void restore_toc_entries_prefork(ArchiveHandle *AH,
92 							TocEntry *pending_list);
93 static void restore_toc_entries_parallel(ArchiveHandle *AH,
94 							 ParallelState *pstate,
95 							 TocEntry *pending_list);
96 static void restore_toc_entries_postfork(ArchiveHandle *AH,
97 							 TocEntry *pending_list);
98 static void par_list_header_init(TocEntry *l);
99 static void par_list_append(TocEntry *l, TocEntry *te);
100 static void par_list_remove(TocEntry *te);
101 static void move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list,
102 				   RestorePass pass);
103 static TocEntry *get_next_work_item(ArchiveHandle *AH,
104 				   TocEntry *ready_list,
105 				   ParallelState *pstate);
106 static void mark_dump_job_done(ArchiveHandle *AH,
107 				   TocEntry *te,
108 				   int status,
109 				   void *callback_data);
110 static void mark_restore_job_done(ArchiveHandle *AH,
111 					  TocEntry *te,
112 					  int status,
113 					  void *callback_data);
114 static void fix_dependencies(ArchiveHandle *AH);
115 static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
116 static void repoint_table_dependencies(ArchiveHandle *AH);
117 static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
118 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
119 					TocEntry *ready_list);
120 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
121 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
122 
123 static void StrictNamesCheck(RestoreOptions *ropt);
124 
125 
126 /*
127  * Allocate a new DumpOptions block containing all default values.
128  */
129 DumpOptions *
NewDumpOptions(void)130 NewDumpOptions(void)
131 {
132 	DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
133 
134 	InitDumpOptions(opts);
135 	return opts;
136 }
137 
138 /*
139  * Initialize a DumpOptions struct to all default values
140  */
141 void
InitDumpOptions(DumpOptions * opts)142 InitDumpOptions(DumpOptions *opts)
143 {
144 	memset(opts, 0, sizeof(DumpOptions));
145 	/* set any fields that shouldn't default to zeroes */
146 	opts->include_everything = true;
147 	opts->cparams.promptPassword = TRI_DEFAULT;
148 	opts->dumpSections = DUMP_UNSECTIONED;
149 }
150 
151 /*
152  * Create a freshly allocated DumpOptions with options equivalent to those
153  * found in the given RestoreOptions.
154  */
155 DumpOptions *
dumpOptionsFromRestoreOptions(RestoreOptions * ropt)156 dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
157 {
158 	DumpOptions *dopt = NewDumpOptions();
159 
160 	/* this is the inverse of what's at the end of pg_dump.c's main() */
161 	dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
162 	dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
163 	dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
164 	dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
165 	dopt->cparams.promptPassword = ropt->cparams.promptPassword;
166 	dopt->outputClean = ropt->dropSchema;
167 	dopt->dataOnly = ropt->dataOnly;
168 	dopt->schemaOnly = ropt->schemaOnly;
169 	dopt->if_exists = ropt->if_exists;
170 	dopt->column_inserts = ropt->column_inserts;
171 	dopt->dumpSections = ropt->dumpSections;
172 	dopt->aclsSkip = ropt->aclsSkip;
173 	dopt->outputSuperuser = ropt->superuser;
174 	dopt->outputCreateDB = ropt->createDB;
175 	dopt->outputNoOwner = ropt->noOwner;
176 	dopt->outputNoTablespaces = ropt->noTablespace;
177 	dopt->disable_triggers = ropt->disable_triggers;
178 	dopt->use_setsessauth = ropt->use_setsessauth;
179 
180 	dopt->disable_dollar_quoting = ropt->disable_dollar_quoting;
181 	dopt->dump_inserts = ropt->dump_inserts;
182 	dopt->no_publications = ropt->no_publications;
183 	dopt->no_security_labels = ropt->no_security_labels;
184 	dopt->no_subscriptions = ropt->no_subscriptions;
185 	dopt->lockWaitTimeout = ropt->lockWaitTimeout;
186 	dopt->include_everything = ropt->include_everything;
187 	dopt->enable_row_security = ropt->enable_row_security;
188 	dopt->sequence_data = ropt->sequence_data;
189 
190 	return dopt;
191 }
192 
193 
194 /*
195  *	Wrapper functions.
196  *
 *	The objective is to make writing new formats and dumpers as simple
198  *	as possible, if necessary at the expense of extra function calls etc.
199  *
200  */
201 
202 /*
203  * The dump worker setup needs lots of knowledge of the internals of pg_dump,
204  * so It's defined in pg_dump.c and passed into OpenArchive. The restore worker
205  * setup doesn't need to know anything much, so it's defined here.
206  */
207 static void
setupRestoreWorker(Archive * AHX)208 setupRestoreWorker(Archive *AHX)
209 {
210 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
211 
212 	(AH->ReopenPtr) (AH);
213 }
214 
215 
216 /* Create a new archive */
217 /* Public */
218 Archive *
CreateArchive(const char * FileSpec,const ArchiveFormat fmt,const int compression,bool dosync,ArchiveMode mode,SetupWorkerPtrType setupDumpWorker)219 CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
220 			  const int compression, bool dosync, ArchiveMode mode,
221 			  SetupWorkerPtrType setupDumpWorker)
222 
223 {
224 	ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, dosync,
225 								 mode, setupDumpWorker);
226 
227 	return (Archive *) AH;
228 }
229 
230 /* Open an existing archive */
231 /* Public */
232 Archive *
OpenArchive(const char * FileSpec,const ArchiveFormat fmt)233 OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
234 {
235 	ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, true, archModeRead, setupRestoreWorker);
236 
237 	return (Archive *) AH;
238 }
239 
240 /* Public */
241 void
CloseArchive(Archive * AHX)242 CloseArchive(Archive *AHX)
243 {
244 	int			res = 0;
245 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
246 
247 	(*AH->ClosePtr) (AH);
248 
249 	/* Close the output */
250 	if (AH->gzOut)
251 		res = GZCLOSE(AH->OF);
252 	else if (AH->OF != stdout)
253 		res = fclose(AH->OF);
254 
255 	if (res != 0)
256 		exit_horribly(modulename, "could not close output file: %s\n",
257 					  strerror(errno));
258 }
259 
260 /* Public */
261 void
SetArchiveOptions(Archive * AH,DumpOptions * dopt,RestoreOptions * ropt)262 SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
263 {
264 	/* Caller can omit dump options, in which case we synthesize them */
265 	if (dopt == NULL && ropt != NULL)
266 		dopt = dumpOptionsFromRestoreOptions(ropt);
267 
268 	/* Save options for later access */
269 	AH->dopt = dopt;
270 	AH->ropt = ropt;
271 }
272 
273 /* Public */
274 void
ProcessArchiveRestoreOptions(Archive * AHX)275 ProcessArchiveRestoreOptions(Archive *AHX)
276 {
277 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
278 	RestoreOptions *ropt = AH->public.ropt;
279 	TocEntry   *te;
280 	teSection	curSection;
281 
282 	/* Decide which TOC entries will be dumped/restored, and mark them */
283 	curSection = SECTION_PRE_DATA;
284 	for (te = AH->toc->next; te != AH->toc; te = te->next)
285 	{
286 		/*
287 		 * When writing an archive, we also take this opportunity to check
288 		 * that we have generated the entries in a sane order that respects
289 		 * the section divisions.  When reading, don't complain, since buggy
290 		 * old versions of pg_dump might generate out-of-order archives.
291 		 */
292 		if (AH->mode != archModeRead)
293 		{
294 			switch (te->section)
295 			{
296 				case SECTION_NONE:
297 					/* ok to be anywhere */
298 					break;
299 				case SECTION_PRE_DATA:
300 					if (curSection != SECTION_PRE_DATA)
301 						write_msg(modulename,
302 								  "WARNING: archive items not in correct section order\n");
303 					break;
304 				case SECTION_DATA:
305 					if (curSection == SECTION_POST_DATA)
306 						write_msg(modulename,
307 								  "WARNING: archive items not in correct section order\n");
308 					break;
309 				case SECTION_POST_DATA:
310 					/* ok no matter which section we were in */
311 					break;
312 				default:
313 					exit_horribly(modulename, "unexpected section code %d\n",
314 								  (int) te->section);
315 					break;
316 			}
317 		}
318 
319 		if (te->section != SECTION_NONE)
320 			curSection = te->section;
321 
322 		te->reqs = _tocEntryRequired(te, curSection, ropt);
323 	}
324 
325 	/* Enforce strict names checking */
326 	if (ropt->strict_names)
327 		StrictNamesCheck(ropt);
328 }
329 
330 /* Public */
331 void
RestoreArchive(Archive * AHX)332 RestoreArchive(Archive *AHX)
333 {
334 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
335 	RestoreOptions *ropt = AH->public.ropt;
336 	bool		parallel_mode;
337 	TocEntry   *te;
338 	OutputContext sav;
339 
340 	AH->stage = STAGE_INITIALIZING;
341 
342 	/*
343 	 * Check for nonsensical option combinations.
344 	 *
345 	 * -C is not compatible with -1, because we can't create a database inside
346 	 * a transaction block.
347 	 */
348 	if (ropt->createDB && ropt->single_txn)
349 		exit_horribly(modulename, "-C and -1 are incompatible options\n");
350 
351 	/*
352 	 * If we're going to do parallel restore, there are some restrictions.
353 	 */
354 	parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
355 	if (parallel_mode)
356 	{
357 		/* We haven't got round to making this work for all archive formats */
358 		if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
359 			exit_horribly(modulename, "parallel restore is not supported with this archive file format\n");
360 
361 		/* Doesn't work if the archive represents dependencies as OIDs */
362 		if (AH->version < K_VERS_1_8)
363 			exit_horribly(modulename, "parallel restore is not supported with archives made by pre-8.0 pg_dump\n");
364 
365 		/*
366 		 * It's also not gonna work if we can't reopen the input file, so
367 		 * let's try that immediately.
368 		 */
369 		(AH->ReopenPtr) (AH);
370 	}
371 
372 	/*
373 	 * Make sure we won't need (de)compression we haven't got
374 	 */
375 #ifndef HAVE_LIBZ
376 	if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
377 	{
378 		for (te = AH->toc->next; te != AH->toc; te = te->next)
379 		{
380 			if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
381 				exit_horribly(modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
382 		}
383 	}
384 #endif
385 
386 	/*
387 	 * Prepare index arrays, so we can assume we have them throughout restore.
388 	 * It's possible we already did this, though.
389 	 */
390 	if (AH->tocsByDumpId == NULL)
391 		buildTocEntryArrays(AH);
392 
393 	/*
394 	 * If we're using a DB connection, then connect it.
395 	 */
396 	if (ropt->useDB)
397 	{
398 		ahlog(AH, 1, "connecting to database for restore\n");
399 		if (AH->version < K_VERS_1_3)
400 			exit_horribly(modulename, "direct database connections are not supported in pre-1.3 archives\n");
401 
402 		/*
403 		 * We don't want to guess at whether the dump will successfully
404 		 * restore; allow the attempt regardless of the version of the restore
405 		 * target.
406 		 */
407 		AHX->minRemoteVersion = 0;
408 		AHX->maxRemoteVersion = 9999999;
409 
410 		ConnectDatabase(AHX, &ropt->cparams, false);
411 
412 		/*
413 		 * If we're talking to the DB directly, don't send comments since they
414 		 * obscure SQL when displaying errors
415 		 */
416 		AH->noTocComments = 1;
417 	}
418 
419 	/*
420 	 * Work out if we have an implied data-only restore. This can happen if
421 	 * the dump was data only or if the user has used a toc list to exclude
422 	 * all of the schema data. All we do is look for schema entries - if none
423 	 * are found then we set the dataOnly flag.
424 	 *
425 	 * We could scan for wanted TABLE entries, but that is not the same as
426 	 * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
427 	 */
428 	if (!ropt->dataOnly)
429 	{
430 		int			impliedDataOnly = 1;
431 
432 		for (te = AH->toc->next; te != AH->toc; te = te->next)
433 		{
434 			if ((te->reqs & REQ_SCHEMA) != 0)
435 			{					/* It's schema, and it's wanted */
436 				impliedDataOnly = 0;
437 				break;
438 			}
439 		}
440 		if (impliedDataOnly)
441 		{
442 			ropt->dataOnly = impliedDataOnly;
443 			ahlog(AH, 1, "implied data-only restore\n");
444 		}
445 	}
446 
447 	/*
448 	 * Setup the output file if necessary.
449 	 */
450 	sav = SaveOutput(AH);
451 	if (ropt->filename || ropt->compression)
452 		SetOutput(AH, ropt->filename, ropt->compression);
453 
454 	ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
455 
456 	if (AH->archiveRemoteVersion)
457 		ahprintf(AH, "-- Dumped from database version %s\n",
458 				 AH->archiveRemoteVersion);
459 	if (AH->archiveDumpVersion)
460 		ahprintf(AH, "-- Dumped by pg_dump version %s\n",
461 				 AH->archiveDumpVersion);
462 
463 	ahprintf(AH, "\n");
464 
465 	if (AH->public.verbose)
466 		dumpTimestamp(AH, "Started on", AH->createDate);
467 
468 	if (ropt->single_txn)
469 	{
470 		if (AH->connection)
471 			StartTransaction(AHX);
472 		else
473 			ahprintf(AH, "BEGIN;\n\n");
474 	}
475 
476 	/*
477 	 * Establish important parameter values right away.
478 	 */
479 	_doSetFixedOutputState(AH);
480 
481 	AH->stage = STAGE_PROCESSING;
482 
483 	/*
484 	 * Drop the items at the start, in reverse order
485 	 */
486 	if (ropt->dropSchema)
487 	{
488 		for (te = AH->toc->prev; te != AH->toc; te = te->prev)
489 		{
490 			AH->currentTE = te;
491 
492 			/*
493 			 * In createDB mode, issue a DROP *only* for the database as a
494 			 * whole.  Issuing drops against anything else would be wrong,
495 			 * because at this point we're connected to the wrong database.
496 			 * Conversely, if we're not in createDB mode, we'd better not
497 			 * issue a DROP against the database at all.
498 			 */
499 			if (ropt->createDB)
500 			{
501 				if (strcmp(te->desc, "DATABASE") != 0)
502 					continue;
503 			}
504 			else
505 			{
506 				if (strcmp(te->desc, "DATABASE") == 0)
507 					continue;
508 			}
509 
510 			/* Otherwise, drop anything that's selected and has a dropStmt */
511 			if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
512 			{
513 				ahlog(AH, 1, "dropping %s %s\n", te->desc, te->tag);
514 				/* Select owner and schema as necessary */
515 				_becomeOwner(AH, te);
516 				_selectOutputSchema(AH, te->namespace);
517 
518 				/*
519 				 * Now emit the DROP command, if the object has one.  Note we
520 				 * don't necessarily emit it verbatim; at this point we add an
521 				 * appropriate IF EXISTS clause, if the user requested it.
522 				 */
523 				if (*te->dropStmt != '\0')
524 				{
525 					if (!ropt->if_exists)
526 					{
527 						/* No --if-exists?	Then just use the original */
528 						ahprintf(AH, "%s", te->dropStmt);
529 					}
530 					else
531 					{
532 						/*
533 						 * Inject an appropriate spelling of "if exists".  For
534 						 * large objects, we have a separate routine that
535 						 * knows how to do it, without depending on
536 						 * te->dropStmt; use that.  For other objects we need
537 						 * to parse the command.
538 						 */
539 						if (strncmp(te->desc, "BLOB", 4) == 0)
540 						{
541 							DropBlobIfExists(AH, te->catalogId.oid);
542 						}
543 						else
544 						{
545 							char	   *dropStmt = pg_strdup(te->dropStmt);
546 							char	   *dropStmtOrig = dropStmt;
547 							PQExpBuffer ftStmt = createPQExpBuffer();
548 
549 							/*
550 							 * Need to inject IF EXISTS clause after ALTER
551 							 * TABLE part in ALTER TABLE .. DROP statement
552 							 */
553 							if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
554 							{
555 								appendPQExpBuffer(ftStmt,
556 												  "ALTER TABLE IF EXISTS");
557 								dropStmt = dropStmt + 11;
558 							}
559 
560 							/*
561 							 * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
562 							 * not support the IF EXISTS clause, and therefore
563 							 * we simply emit the original command for DEFAULT
564 							 * objects (modulo the adjustment made above).
565 							 *
566 							 * If we used CREATE OR REPLACE VIEW as a means of
567 							 * quasi-dropping an ON SELECT rule, that should
568 							 * be emitted unchanged as well.
569 							 *
570 							 * For other object types, we need to extract the
571 							 * first part of the DROP which includes the
572 							 * object type.  Most of the time this matches
573 							 * te->desc, so search for that; however for the
574 							 * different kinds of CONSTRAINTs, we know to
575 							 * search for hardcoded "DROP CONSTRAINT" instead.
576 							 */
577 							if (strcmp(te->desc, "DEFAULT") == 0 ||
578 								strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
579 								appendPQExpBufferStr(ftStmt, dropStmt);
580 							else
581 							{
582 								char		buffer[40];
583 								char	   *mark;
584 
585 								if (strcmp(te->desc, "CONSTRAINT") == 0 ||
586 									strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
587 									strcmp(te->desc, "FK CONSTRAINT") == 0)
588 									strcpy(buffer, "DROP CONSTRAINT");
589 								else
590 									snprintf(buffer, sizeof(buffer), "DROP %s",
591 											 te->desc);
592 
593 								mark = strstr(dropStmt, buffer);
594 
595 								if (mark)
596 								{
597 									*mark = '\0';
598 									appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
599 													  dropStmt, buffer,
600 													  mark + strlen(buffer));
601 								}
602 								else
603 								{
604 									/* complain and emit unmodified command */
605 									write_msg(modulename,
606 											  "WARNING: could not find where to insert IF EXISTS in statement \"%s\"\n",
607 											  dropStmtOrig);
608 									appendPQExpBufferStr(ftStmt, dropStmt);
609 								}
610 							}
611 
612 							ahprintf(AH, "%s", ftStmt->data);
613 
614 							destroyPQExpBuffer(ftStmt);
615 							pg_free(dropStmtOrig);
616 						}
617 					}
618 				}
619 			}
620 		}
621 
622 		/*
623 		 * _selectOutputSchema may have set currSchema to reflect the effect
624 		 * of a "SET search_path" command it emitted.  However, by now we may
625 		 * have dropped that schema; or it might not have existed in the first
626 		 * place.  In either case the effective value of search_path will not
627 		 * be what we think.  Forcibly reset currSchema so that we will
628 		 * re-establish the search_path setting when needed (after creating
629 		 * the schema).
630 		 *
631 		 * If we treated users as pg_dump'able objects then we'd need to reset
632 		 * currUser here too.
633 		 */
634 		if (AH->currSchema)
635 			free(AH->currSchema);
636 		AH->currSchema = NULL;
637 	}
638 
639 	if (parallel_mode)
640 	{
641 		/*
642 		 * In parallel mode, turn control over to the parallel-restore logic.
643 		 */
644 		ParallelState *pstate;
645 		TocEntry	pending_list;
646 
647 		par_list_header_init(&pending_list);
648 
649 		/* This runs PRE_DATA items and then disconnects from the database */
650 		restore_toc_entries_prefork(AH, &pending_list);
651 		Assert(AH->connection == NULL);
652 
653 		/* ParallelBackupStart() will actually fork the processes */
654 		pstate = ParallelBackupStart(AH);
655 		restore_toc_entries_parallel(AH, pstate, &pending_list);
656 		ParallelBackupEnd(AH, pstate);
657 
658 		/* reconnect the master and see if we missed something */
659 		restore_toc_entries_postfork(AH, &pending_list);
660 		Assert(AH->connection != NULL);
661 	}
662 	else
663 	{
664 		/*
665 		 * In serial mode, process everything in three phases: normal items,
666 		 * then ACLs, then post-ACL items.  We might be able to skip one or
667 		 * both extra phases in some cases, eg data-only restores.
668 		 */
669 		bool		haveACL = false;
670 		bool		havePostACL = false;
671 
672 		for (te = AH->toc->next; te != AH->toc; te = te->next)
673 		{
674 			if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
675 				continue;		/* ignore if not to be dumped at all */
676 
677 			switch (_tocEntryRestorePass(te))
678 			{
679 				case RESTORE_PASS_MAIN:
680 					(void) restore_toc_entry(AH, te, false);
681 					break;
682 				case RESTORE_PASS_ACL:
683 					haveACL = true;
684 					break;
685 				case RESTORE_PASS_POST_ACL:
686 					havePostACL = true;
687 					break;
688 			}
689 		}
690 
691 		if (haveACL)
692 		{
693 			for (te = AH->toc->next; te != AH->toc; te = te->next)
694 			{
695 				if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
696 					_tocEntryRestorePass(te) == RESTORE_PASS_ACL)
697 					(void) restore_toc_entry(AH, te, false);
698 			}
699 		}
700 
701 		if (havePostACL)
702 		{
703 			for (te = AH->toc->next; te != AH->toc; te = te->next)
704 			{
705 				if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
706 					_tocEntryRestorePass(te) == RESTORE_PASS_POST_ACL)
707 					(void) restore_toc_entry(AH, te, false);
708 			}
709 		}
710 	}
711 
712 	if (ropt->single_txn)
713 	{
714 		if (AH->connection)
715 			CommitTransaction(AHX);
716 		else
717 			ahprintf(AH, "COMMIT;\n\n");
718 	}
719 
720 	if (AH->public.verbose)
721 		dumpTimestamp(AH, "Completed on", time(NULL));
722 
723 	ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
724 
725 	/*
726 	 * Clean up & we're done.
727 	 */
728 	AH->stage = STAGE_FINALIZING;
729 
730 	if (ropt->filename || ropt->compression)
731 		RestoreOutput(AH, sav);
732 
733 	if (ropt->useDB)
734 		DisconnectDatabase(&AH->public);
735 }
736 
737 /*
738  * Restore a single TOC item.  Used in both parallel and non-parallel restore;
739  * is_parallel is true if we are in a worker child process.
740  *
741  * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
742  * the parallel parent has to make the corresponding status update.
743  */
744 static int
restore_toc_entry(ArchiveHandle * AH,TocEntry * te,bool is_parallel)745 restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
746 {
747 	RestoreOptions *ropt = AH->public.ropt;
748 	int			status = WORKER_OK;
749 	teReqs		reqs;
750 	bool		defnDumped;
751 
752 	AH->currentTE = te;
753 
754 	/* Work out what, if anything, we want from this entry */
755 	reqs = te->reqs;
756 
757 	/*
758 	 * Ignore DATABASE entry unless we should create it.  We must check this
759 	 * here, not in _tocEntryRequired, because the createDB option should not
760 	 * affect emitting a DATABASE entry to an archive file.
761 	 */
762 	if (!ropt->createDB && strcmp(te->desc, "DATABASE") == 0)
763 		reqs = 0;
764 
765 	/* Dump any relevant dump warnings to stderr */
766 	if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
767 	{
768 		if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
769 			write_msg(modulename, "warning from original dump file: %s\n", te->defn);
770 		else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
771 			write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
772 	}
773 
774 	defnDumped = false;
775 
776 	/*
777 	 * If it has a schema component that we want, then process that
778 	 */
779 	if ((reqs & REQ_SCHEMA) != 0)
780 	{
781 		/* Show namespace in log message if available */
782 		if (te->namespace)
783 			ahlog(AH, 1, "creating %s \"%s.%s\"\n",
784 				  te->desc, te->namespace, te->tag);
785 		else
786 			ahlog(AH, 1, "creating %s \"%s\"\n", te->desc, te->tag);
787 
788 		_printTocEntry(AH, te, false);
789 		defnDumped = true;
790 
791 		if (strcmp(te->desc, "TABLE") == 0)
792 		{
793 			if (AH->lastErrorTE == te)
794 			{
795 				/*
796 				 * We failed to create the table. If
797 				 * --no-data-for-failed-tables was given, mark the
798 				 * corresponding TABLE DATA to be ignored.
799 				 *
800 				 * In the parallel case this must be done in the parent, so we
801 				 * just set the return value.
802 				 */
803 				if (ropt->noDataForFailedTables)
804 				{
805 					if (is_parallel)
806 						status = WORKER_INHIBIT_DATA;
807 					else
808 						inhibit_data_for_failed_table(AH, te);
809 				}
810 			}
811 			else
812 			{
813 				/*
814 				 * We created the table successfully.  Mark the corresponding
815 				 * TABLE DATA for possible truncation.
816 				 *
817 				 * In the parallel case this must be done in the parent, so we
818 				 * just set the return value.
819 				 */
820 				if (is_parallel)
821 					status = WORKER_CREATE_DONE;
822 				else
823 					mark_create_done(AH, te);
824 			}
825 		}
826 
827 		/* If we created a DB, connect to it... */
828 		if (strcmp(te->desc, "DATABASE") == 0)
829 		{
830 			ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
831 			_reconnectToDB(AH, te->tag);
832 		}
833 	}
834 
835 	/*
836 	 * If it has a data component that we want, then process that
837 	 */
838 	if ((reqs & REQ_DATA) != 0)
839 	{
840 		/*
841 		 * hadDumper will be set if there is genuine data component for this
842 		 * node. Otherwise, we need to check the defn field for statements
843 		 * that need to be executed in data-only restores.
844 		 */
845 		if (te->hadDumper)
846 		{
847 			/*
848 			 * If we can output the data, then restore it.
849 			 */
850 			if (AH->PrintTocDataPtr != NULL)
851 			{
852 				_printTocEntry(AH, te, true);
853 
854 				if (strcmp(te->desc, "BLOBS") == 0 ||
855 					strcmp(te->desc, "BLOB COMMENTS") == 0)
856 				{
857 					ahlog(AH, 1, "processing %s\n", te->desc);
858 
859 					_selectOutputSchema(AH, "pg_catalog");
860 
861 					/* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
862 					if (strcmp(te->desc, "BLOB COMMENTS") == 0)
863 						AH->outputKind = OUTPUT_OTHERDATA;
864 
865 					(*AH->PrintTocDataPtr) (AH, te);
866 
867 					AH->outputKind = OUTPUT_SQLCMDS;
868 				}
869 				else
870 				{
871 					_disableTriggersIfNecessary(AH, te);
872 
873 					/* Select owner and schema as necessary */
874 					_becomeOwner(AH, te);
875 					_selectOutputSchema(AH, te->namespace);
876 
877 					ahlog(AH, 1, "processing data for table \"%s.%s\"\n",
878 						  te->namespace, te->tag);
879 
880 					/*
881 					 * In parallel restore, if we created the table earlier in
882 					 * the run then we wrap the COPY in a transaction and
883 					 * precede it with a TRUNCATE.  If archiving is not on
884 					 * this prevents WAL-logging the COPY.  This obtains a
885 					 * speedup similar to that from using single_txn mode in
886 					 * non-parallel restores.
887 					 */
888 					if (is_parallel && te->created)
889 					{
890 						/*
891 						 * Parallel restore is always talking directly to a
892 						 * server, so no need to see if we should issue BEGIN.
893 						 */
894 						StartTransaction(&AH->public);
895 
896 						/*
897 						 * If the server version is >= 8.4, make sure we issue
898 						 * TRUNCATE with ONLY so that child tables are not
899 						 * wiped.
900 						 */
901 						ahprintf(AH, "TRUNCATE TABLE %s%s;\n\n",
902 								 (PQserverVersion(AH->connection) >= 80400 ?
903 								  "ONLY " : ""),
904 								 fmtQualifiedId(PQserverVersion(AH->connection),
905 												te->namespace,
906 												te->tag));
907 					}
908 
909 					/*
910 					 * If we have a copy statement, use it.
911 					 */
912 					if (te->copyStmt && strlen(te->copyStmt) > 0)
913 					{
914 						ahprintf(AH, "%s", te->copyStmt);
915 						AH->outputKind = OUTPUT_COPYDATA;
916 					}
917 					else
918 						AH->outputKind = OUTPUT_OTHERDATA;
919 
920 					(*AH->PrintTocDataPtr) (AH, te);
921 
922 					/*
923 					 * Terminate COPY if needed.
924 					 */
925 					if (AH->outputKind == OUTPUT_COPYDATA &&
926 						RestoringToDB(AH))
927 						EndDBCopyMode(&AH->public, te->tag);
928 					AH->outputKind = OUTPUT_SQLCMDS;
929 
930 					/* close out the transaction started above */
931 					if (is_parallel && te->created)
932 						CommitTransaction(&AH->public);
933 
934 					_enableTriggersIfNecessary(AH, te);
935 				}
936 			}
937 		}
938 		else if (!defnDumped)
939 		{
940 			/* If we haven't already dumped the defn part, do so now */
941 			ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
942 			_printTocEntry(AH, te, false);
943 		}
944 	}
945 
946 	if (AH->public.n_errors > 0 && status == WORKER_OK)
947 		status = WORKER_IGNORED_ERRORS;
948 
949 	return status;
950 }
951 
952 /*
953  * Allocate a new RestoreOptions block.
954  * This is mainly so we can initialize it, but also for future expansion,
955  */
956 RestoreOptions *
NewRestoreOptions(void)957 NewRestoreOptions(void)
958 {
959 	RestoreOptions *opts;
960 
961 	opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
962 
963 	/* set any fields that shouldn't default to zeroes */
964 	opts->format = archUnknown;
965 	opts->cparams.promptPassword = TRI_DEFAULT;
966 	opts->dumpSections = DUMP_UNSECTIONED;
967 
968 	return opts;
969 }
970 
971 static void
_disableTriggersIfNecessary(ArchiveHandle * AH,TocEntry * te)972 _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
973 {
974 	RestoreOptions *ropt = AH->public.ropt;
975 
976 	/* This hack is only needed in a data-only restore */
977 	if (!ropt->dataOnly || !ropt->disable_triggers)
978 		return;
979 
980 	ahlog(AH, 1, "disabling triggers for %s\n", te->tag);
981 
982 	/*
983 	 * Become superuser if possible, since they are the only ones who can
984 	 * disable constraint triggers.  If -S was not given, assume the initial
985 	 * user identity is a superuser.  (XXX would it be better to become the
986 	 * table owner?)
987 	 */
988 	_becomeUser(AH, ropt->superuser);
989 
990 	/*
991 	 * Disable them.  Assume that the table name should be schema-qualified
992 	 * (we can't look at PQserverVersion, since we might not have any
993 	 * connection; and anyway we don't promise our output will load pre-7.3).
994 	 */
995 	ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
996 			 fmtQualifiedId(70300,
997 							te->namespace,
998 							te->tag));
999 }
1000 
1001 static void
_enableTriggersIfNecessary(ArchiveHandle * AH,TocEntry * te)1002 _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1003 {
1004 	RestoreOptions *ropt = AH->public.ropt;
1005 
1006 	/* This hack is only needed in a data-only restore */
1007 	if (!ropt->dataOnly || !ropt->disable_triggers)
1008 		return;
1009 
1010 	ahlog(AH, 1, "enabling triggers for %s\n", te->tag);
1011 
1012 	/*
1013 	 * Become superuser if possible, since they are the only ones who can
1014 	 * disable constraint triggers.  If -S was not given, assume the initial
1015 	 * user identity is a superuser.  (XXX would it be better to become the
1016 	 * table owner?)
1017 	 */
1018 	_becomeUser(AH, ropt->superuser);
1019 
1020 	/*
1021 	 * Enable them.  As above, force schema qualification.
1022 	 */
1023 	ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1024 			 fmtQualifiedId(70300,
1025 							te->namespace,
1026 							te->tag));
1027 }
1028 
1029 /*
1030  * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1031  */
1032 
1033 /* Public */
1034 void
WriteData(Archive * AHX,const void * data,size_t dLen)1035 WriteData(Archive *AHX, const void *data, size_t dLen)
1036 {
1037 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1038 
1039 	if (!AH->currToc)
1040 		exit_horribly(modulename, "internal error -- WriteData cannot be called outside the context of a DataDumper routine\n");
1041 
1042 	(*AH->WriteDataPtr) (AH, data, dLen);
1043 
1044 	return;
1045 }
1046 
1047 /*
1048  * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1049  * repository for all metadata. But the name has stuck.
1050  */
1051 
1052 /* Public */
1053 void
ArchiveEntry(Archive * AHX,CatalogId catalogId,DumpId dumpId,const char * tag,const char * namespace,const char * tablespace,const char * owner,bool withOids,const char * desc,teSection section,const char * defn,const char * dropStmt,const char * copyStmt,const DumpId * deps,int nDeps,DataDumperPtr dumpFn,void * dumpArg)1054 ArchiveEntry(Archive *AHX,
1055 			 CatalogId catalogId, DumpId dumpId,
1056 			 const char *tag,
1057 			 const char *namespace,
1058 			 const char *tablespace,
1059 			 const char *owner, bool withOids,
1060 			 const char *desc, teSection section,
1061 			 const char *defn,
1062 			 const char *dropStmt, const char *copyStmt,
1063 			 const DumpId *deps, int nDeps,
1064 			 DataDumperPtr dumpFn, void *dumpArg)
1065 {
1066 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1067 	TocEntry   *newToc;
1068 
1069 	newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
1070 
1071 	AH->tocCount++;
1072 	if (dumpId > AH->maxDumpId)
1073 		AH->maxDumpId = dumpId;
1074 
1075 	newToc->prev = AH->toc->prev;
1076 	newToc->next = AH->toc;
1077 	AH->toc->prev->next = newToc;
1078 	AH->toc->prev = newToc;
1079 
1080 	newToc->catalogId = catalogId;
1081 	newToc->dumpId = dumpId;
1082 	newToc->section = section;
1083 
1084 	newToc->tag = pg_strdup(tag);
1085 	newToc->namespace = namespace ? pg_strdup(namespace) : NULL;
1086 	newToc->tablespace = tablespace ? pg_strdup(tablespace) : NULL;
1087 	newToc->owner = pg_strdup(owner);
1088 	newToc->withOids = withOids;
1089 	newToc->desc = pg_strdup(desc);
1090 	newToc->defn = pg_strdup(defn);
1091 	newToc->dropStmt = pg_strdup(dropStmt);
1092 	newToc->copyStmt = copyStmt ? pg_strdup(copyStmt) : NULL;
1093 
1094 	if (nDeps > 0)
1095 	{
1096 		newToc->dependencies = (DumpId *) pg_malloc(nDeps * sizeof(DumpId));
1097 		memcpy(newToc->dependencies, deps, nDeps * sizeof(DumpId));
1098 		newToc->nDeps = nDeps;
1099 	}
1100 	else
1101 	{
1102 		newToc->dependencies = NULL;
1103 		newToc->nDeps = 0;
1104 	}
1105 
1106 	newToc->dataDumper = dumpFn;
1107 	newToc->dataDumperArg = dumpArg;
1108 	newToc->hadDumper = dumpFn ? true : false;
1109 
1110 	newToc->formatData = NULL;
1111 
1112 	if (AH->ArchiveEntryPtr != NULL)
1113 		(*AH->ArchiveEntryPtr) (AH, newToc);
1114 }
1115 
1116 /* Public */
1117 void
PrintTOCSummary(Archive * AHX)1118 PrintTOCSummary(Archive *AHX)
1119 {
1120 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1121 	RestoreOptions *ropt = AH->public.ropt;
1122 	TocEntry   *te;
1123 	teSection	curSection;
1124 	OutputContext sav;
1125 	const char *fmtName;
1126 	char		stamp_str[64];
1127 
1128 	sav = SaveOutput(AH);
1129 	if (ropt->filename)
1130 		SetOutput(AH, ropt->filename, 0 /* no compression */ );
1131 
1132 	if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
1133 				 localtime(&AH->createDate)) == 0)
1134 		strcpy(stamp_str, "[unknown]");
1135 
1136 	ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
1137 	ahprintf(AH, ";     dbname: %s\n;     TOC Entries: %d\n;     Compression: %d\n",
1138 			 replace_line_endings(AH->archdbname),
1139 			 AH->tocCount, AH->compression);
1140 
1141 	switch (AH->format)
1142 	{
1143 		case archCustom:
1144 			fmtName = "CUSTOM";
1145 			break;
1146 		case archDirectory:
1147 			fmtName = "DIRECTORY";
1148 			break;
1149 		case archTar:
1150 			fmtName = "TAR";
1151 			break;
1152 		default:
1153 			fmtName = "UNKNOWN";
1154 	}
1155 
1156 	ahprintf(AH, ";     Dump Version: %d.%d-%d\n",
1157 			 ARCHIVE_MAJOR(AH->version), ARCHIVE_MINOR(AH->version), ARCHIVE_REV(AH->version));
1158 	ahprintf(AH, ";     Format: %s\n", fmtName);
1159 	ahprintf(AH, ";     Integer: %d bytes\n", (int) AH->intSize);
1160 	ahprintf(AH, ";     Offset: %d bytes\n", (int) AH->offSize);
1161 	if (AH->archiveRemoteVersion)
1162 		ahprintf(AH, ";     Dumped from database version: %s\n",
1163 				 AH->archiveRemoteVersion);
1164 	if (AH->archiveDumpVersion)
1165 		ahprintf(AH, ";     Dumped by pg_dump version: %s\n",
1166 				 AH->archiveDumpVersion);
1167 
1168 	ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
1169 
1170 	curSection = SECTION_PRE_DATA;
1171 	for (te = AH->toc->next; te != AH->toc; te = te->next)
1172 	{
1173 		if (te->section != SECTION_NONE)
1174 			curSection = te->section;
1175 		if (ropt->verbose ||
1176 			(_tocEntryRequired(te, curSection, ropt) & (REQ_SCHEMA | REQ_DATA)) != 0)
1177 		{
1178 			char	   *sanitized_name;
1179 			char	   *sanitized_schema;
1180 			char	   *sanitized_owner;
1181 
1182 			/*
1183 			 * As in _printTocEntry(), sanitize strings that might contain
1184 			 * newlines, to ensure that each logical output line is in fact
1185 			 * one physical output line.  This prevents confusion when the
1186 			 * file is read by "pg_restore -L".  Note that we currently don't
1187 			 * bother to quote names, meaning that the name fields aren't
1188 			 * automatically parseable.  "pg_restore -L" doesn't care because
1189 			 * it only examines the dumpId field, but someday we might want to
1190 			 * try harder.
1191 			 */
1192 			sanitized_name = replace_line_endings(te->tag);
1193 			if (te->namespace)
1194 				sanitized_schema = replace_line_endings(te->namespace);
1195 			else
1196 				sanitized_schema = pg_strdup("-");
1197 			sanitized_owner = replace_line_endings(te->owner);
1198 
1199 			ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
1200 					 te->catalogId.tableoid, te->catalogId.oid,
1201 					 te->desc, sanitized_schema, sanitized_name,
1202 					 sanitized_owner);
1203 
1204 			free(sanitized_name);
1205 			free(sanitized_schema);
1206 			free(sanitized_owner);
1207 		}
1208 		if (ropt->verbose && te->nDeps > 0)
1209 		{
1210 			int			i;
1211 
1212 			ahprintf(AH, ";\tdepends on:");
1213 			for (i = 0; i < te->nDeps; i++)
1214 				ahprintf(AH, " %d", te->dependencies[i]);
1215 			ahprintf(AH, "\n");
1216 		}
1217 	}
1218 
1219 	/* Enforce strict names checking */
1220 	if (ropt->strict_names)
1221 		StrictNamesCheck(ropt);
1222 
1223 	if (ropt->filename)
1224 		RestoreOutput(AH, sav);
1225 }
1226 
1227 /***********
1228  * BLOB Archival
1229  ***********/
1230 
1231 /* Called by a dumper to signal start of a BLOB */
1232 int
StartBlob(Archive * AHX,Oid oid)1233 StartBlob(Archive *AHX, Oid oid)
1234 {
1235 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1236 
1237 	if (!AH->StartBlobPtr)
1238 		exit_horribly(modulename, "large-object output not supported in chosen format\n");
1239 
1240 	(*AH->StartBlobPtr) (AH, AH->currToc, oid);
1241 
1242 	return 1;
1243 }
1244 
1245 /* Called by a dumper to signal end of a BLOB */
1246 int
EndBlob(Archive * AHX,Oid oid)1247 EndBlob(Archive *AHX, Oid oid)
1248 {
1249 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1250 
1251 	if (AH->EndBlobPtr)
1252 		(*AH->EndBlobPtr) (AH, AH->currToc, oid);
1253 
1254 	return 1;
1255 }
1256 
1257 /**********
1258  * BLOB Restoration
1259  **********/
1260 
1261 /*
1262  * Called by a format handler before any blobs are restored
1263  */
1264 void
StartRestoreBlobs(ArchiveHandle * AH)1265 StartRestoreBlobs(ArchiveHandle *AH)
1266 {
1267 	RestoreOptions *ropt = AH->public.ropt;
1268 
1269 	if (!ropt->single_txn)
1270 	{
1271 		if (AH->connection)
1272 			StartTransaction(&AH->public);
1273 		else
1274 			ahprintf(AH, "BEGIN;\n\n");
1275 	}
1276 
1277 	AH->blobCount = 0;
1278 }
1279 
1280 /*
1281  * Called by a format handler after all blobs are restored
1282  */
1283 void
EndRestoreBlobs(ArchiveHandle * AH)1284 EndRestoreBlobs(ArchiveHandle *AH)
1285 {
1286 	RestoreOptions *ropt = AH->public.ropt;
1287 
1288 	if (!ropt->single_txn)
1289 	{
1290 		if (AH->connection)
1291 			CommitTransaction(&AH->public);
1292 		else
1293 			ahprintf(AH, "COMMIT;\n\n");
1294 	}
1295 
1296 	ahlog(AH, 1, ngettext("restored %d large object\n",
1297 						  "restored %d large objects\n",
1298 						  AH->blobCount),
1299 		  AH->blobCount);
1300 }
1301 
1302 
1303 /*
1304  * Called by a format handler to initiate restoration of a blob
1305  */
1306 void
StartRestoreBlob(ArchiveHandle * AH,Oid oid,bool drop)1307 StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
1308 {
1309 	bool		old_blob_style = (AH->version < K_VERS_1_12);
1310 	Oid			loOid;
1311 
1312 	AH->blobCount++;
1313 
1314 	/* Initialize the LO Buffer */
1315 	AH->lo_buf_used = 0;
1316 
1317 	ahlog(AH, 1, "restoring large object with OID %u\n", oid);
1318 
1319 	/* With an old archive we must do drop and create logic here */
1320 	if (old_blob_style && drop)
1321 		DropBlobIfExists(AH, oid);
1322 
1323 	if (AH->connection)
1324 	{
1325 		if (old_blob_style)
1326 		{
1327 			loOid = lo_create(AH->connection, oid);
1328 			if (loOid == 0 || loOid != oid)
1329 				exit_horribly(modulename, "could not create large object %u: %s",
1330 							  oid, PQerrorMessage(AH->connection));
1331 		}
1332 		AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
1333 		if (AH->loFd == -1)
1334 			exit_horribly(modulename, "could not open large object %u: %s",
1335 						  oid, PQerrorMessage(AH->connection));
1336 	}
1337 	else
1338 	{
1339 		if (old_blob_style)
1340 			ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
1341 					 oid, INV_WRITE);
1342 		else
1343 			ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
1344 					 oid, INV_WRITE);
1345 	}
1346 
1347 	AH->writingBlob = 1;
1348 }
1349 
1350 void
EndRestoreBlob(ArchiveHandle * AH,Oid oid)1351 EndRestoreBlob(ArchiveHandle *AH, Oid oid)
1352 {
1353 	if (AH->lo_buf_used > 0)
1354 	{
1355 		/* Write remaining bytes from the LO buffer */
1356 		dump_lo_buf(AH);
1357 	}
1358 
1359 	AH->writingBlob = 0;
1360 
1361 	if (AH->connection)
1362 	{
1363 		lo_close(AH->connection, AH->loFd);
1364 		AH->loFd = -1;
1365 	}
1366 	else
1367 	{
1368 		ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1369 	}
1370 }
1371 
1372 /***********
1373  * Sorting and Reordering
1374  ***********/
1375 
1376 void
SortTocFromFile(Archive * AHX)1377 SortTocFromFile(Archive *AHX)
1378 {
1379 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1380 	RestoreOptions *ropt = AH->public.ropt;
1381 	FILE	   *fh;
1382 	char		buf[100];
1383 	bool		incomplete_line;
1384 
1385 	/* Allocate space for the 'wanted' array, and init it */
1386 	ropt->idWanted = (bool *) pg_malloc(sizeof(bool) * AH->maxDumpId);
1387 	memset(ropt->idWanted, 0, sizeof(bool) * AH->maxDumpId);
1388 
1389 	/* Setup the file */
1390 	fh = fopen(ropt->tocFile, PG_BINARY_R);
1391 	if (!fh)
1392 		exit_horribly(modulename, "could not open TOC file \"%s\": %s\n",
1393 					  ropt->tocFile, strerror(errno));
1394 
1395 	incomplete_line = false;
1396 	while (fgets(buf, sizeof(buf), fh) != NULL)
1397 	{
1398 		bool		prev_incomplete_line = incomplete_line;
1399 		int			buflen;
1400 		char	   *cmnt;
1401 		char	   *endptr;
1402 		DumpId		id;
1403 		TocEntry   *te;
1404 
1405 		/*
1406 		 * Some lines in the file might be longer than sizeof(buf).  This is
1407 		 * no problem, since we only care about the leading numeric ID which
1408 		 * can be at most a few characters; but we have to skip continuation
1409 		 * bufferloads when processing a long line.
1410 		 */
1411 		buflen = strlen(buf);
1412 		if (buflen > 0 && buf[buflen - 1] == '\n')
1413 			incomplete_line = false;
1414 		else
1415 			incomplete_line = true;
1416 		if (prev_incomplete_line)
1417 			continue;
1418 
1419 		/* Truncate line at comment, if any */
1420 		cmnt = strchr(buf, ';');
1421 		if (cmnt != NULL)
1422 			cmnt[0] = '\0';
1423 
1424 		/* Ignore if all blank */
1425 		if (strspn(buf, " \t\r\n") == strlen(buf))
1426 			continue;
1427 
1428 		/* Get an ID, check it's valid and not already seen */
1429 		id = strtol(buf, &endptr, 10);
1430 		if (endptr == buf || id <= 0 || id > AH->maxDumpId ||
1431 			ropt->idWanted[id - 1])
1432 		{
1433 			write_msg(modulename, "WARNING: line ignored: %s\n", buf);
1434 			continue;
1435 		}
1436 
1437 		/* Find TOC entry */
1438 		te = getTocEntryByDumpId(AH, id);
1439 		if (!te)
1440 			exit_horribly(modulename, "could not find entry for ID %d\n",
1441 						  id);
1442 
1443 		/* Mark it wanted */
1444 		ropt->idWanted[id - 1] = true;
1445 
1446 		/*
1447 		 * Move each item to the end of the list as it is selected, so that
1448 		 * they are placed in the desired order.  Any unwanted items will end
1449 		 * up at the front of the list, which may seem unintuitive but it's
1450 		 * what we need.  In an ordinary serial restore that makes no
1451 		 * difference, but in a parallel restore we need to mark unrestored
1452 		 * items' dependencies as satisfied before we start examining
1453 		 * restorable items.  Otherwise they could have surprising
1454 		 * side-effects on the order in which restorable items actually get
1455 		 * restored.
1456 		 */
1457 		_moveBefore(AH, AH->toc, te);
1458 	}
1459 
1460 	if (fclose(fh) != 0)
1461 		exit_horribly(modulename, "could not close TOC file: %s\n",
1462 					  strerror(errno));
1463 }
1464 
1465 /**********************
1466  * 'Convenience functions that look like standard IO functions
1467  * for writing data when in dump mode.
1468  **********************/
1469 
1470 /* Public */
1471 void
archputs(const char * s,Archive * AH)1472 archputs(const char *s, Archive *AH)
1473 {
1474 	WriteData(AH, s, strlen(s));
1475 	return;
1476 }
1477 
1478 /* Public */
1479 int
archprintf(Archive * AH,const char * fmt,...)1480 archprintf(Archive *AH, const char *fmt,...)
1481 {
1482 	char	   *p;
1483 	size_t		len = 128;		/* initial assumption about buffer size */
1484 	size_t		cnt;
1485 
1486 	for (;;)
1487 	{
1488 		va_list		args;
1489 
1490 		/* Allocate work buffer. */
1491 		p = (char *) pg_malloc(len);
1492 
1493 		/* Try to format the data. */
1494 		va_start(args, fmt);
1495 		cnt = pvsnprintf(p, len, fmt, args);
1496 		va_end(args);
1497 
1498 		if (cnt < len)
1499 			break;				/* success */
1500 
1501 		/* Release buffer and loop around to try again with larger len. */
1502 		free(p);
1503 		len = cnt;
1504 	}
1505 
1506 	WriteData(AH, p, cnt);
1507 	free(p);
1508 	return (int) cnt;
1509 }
1510 
1511 
1512 /*******************************
1513  * Stuff below here should be 'private' to the archiver routines
1514  *******************************/
1515 
1516 static void
SetOutput(ArchiveHandle * AH,const char * filename,int compression)1517 SetOutput(ArchiveHandle *AH, const char *filename, int compression)
1518 {
1519 	int			fn;
1520 
1521 	if (filename)
1522 	{
1523 		if (strcmp(filename, "-") == 0)
1524 			fn = fileno(stdout);
1525 		else
1526 			fn = -1;
1527 	}
1528 	else if (AH->FH)
1529 		fn = fileno(AH->FH);
1530 	else if (AH->fSpec)
1531 	{
1532 		fn = -1;
1533 		filename = AH->fSpec;
1534 	}
1535 	else
1536 		fn = fileno(stdout);
1537 
1538 	/* If compression explicitly requested, use gzopen */
1539 #ifdef HAVE_LIBZ
1540 	if (compression != 0)
1541 	{
1542 		char		fmode[10];
1543 
1544 		/* Don't use PG_BINARY_x since this is zlib */
1545 		sprintf(fmode, "wb%d", compression);
1546 		if (fn >= 0)
1547 			AH->OF = gzdopen(dup(fn), fmode);
1548 		else
1549 			AH->OF = gzopen(filename, fmode);
1550 		AH->gzOut = 1;
1551 	}
1552 	else
1553 #endif
1554 	{							/* Use fopen */
1555 		if (AH->mode == archModeAppend)
1556 		{
1557 			if (fn >= 0)
1558 				AH->OF = fdopen(dup(fn), PG_BINARY_A);
1559 			else
1560 				AH->OF = fopen(filename, PG_BINARY_A);
1561 		}
1562 		else
1563 		{
1564 			if (fn >= 0)
1565 				AH->OF = fdopen(dup(fn), PG_BINARY_W);
1566 			else
1567 				AH->OF = fopen(filename, PG_BINARY_W);
1568 		}
1569 		AH->gzOut = 0;
1570 	}
1571 
1572 	if (!AH->OF)
1573 	{
1574 		if (filename)
1575 			exit_horribly(modulename, "could not open output file \"%s\": %s\n",
1576 						  filename, strerror(errno));
1577 		else
1578 			exit_horribly(modulename, "could not open output file: %s\n",
1579 						  strerror(errno));
1580 	}
1581 }
1582 
1583 static OutputContext
SaveOutput(ArchiveHandle * AH)1584 SaveOutput(ArchiveHandle *AH)
1585 {
1586 	OutputContext sav;
1587 
1588 	sav.OF = AH->OF;
1589 	sav.gzOut = AH->gzOut;
1590 
1591 	return sav;
1592 }
1593 
1594 static void
RestoreOutput(ArchiveHandle * AH,OutputContext savedContext)1595 RestoreOutput(ArchiveHandle *AH, OutputContext savedContext)
1596 {
1597 	int			res;
1598 
1599 	if (AH->gzOut)
1600 		res = GZCLOSE(AH->OF);
1601 	else
1602 		res = fclose(AH->OF);
1603 
1604 	if (res != 0)
1605 		exit_horribly(modulename, "could not close output file: %s\n",
1606 					  strerror(errno));
1607 
1608 	AH->gzOut = savedContext.gzOut;
1609 	AH->OF = savedContext.OF;
1610 }
1611 
1612 
1613 
1614 /*
1615  *	Print formatted text to the output file (usually stdout).
1616  */
1617 int
ahprintf(ArchiveHandle * AH,const char * fmt,...)1618 ahprintf(ArchiveHandle *AH, const char *fmt,...)
1619 {
1620 	char	   *p;
1621 	size_t		len = 128;		/* initial assumption about buffer size */
1622 	size_t		cnt;
1623 
1624 	for (;;)
1625 	{
1626 		va_list		args;
1627 
1628 		/* Allocate work buffer. */
1629 		p = (char *) pg_malloc(len);
1630 
1631 		/* Try to format the data. */
1632 		va_start(args, fmt);
1633 		cnt = pvsnprintf(p, len, fmt, args);
1634 		va_end(args);
1635 
1636 		if (cnt < len)
1637 			break;				/* success */
1638 
1639 		/* Release buffer and loop around to try again with larger len. */
1640 		free(p);
1641 		len = cnt;
1642 	}
1643 
1644 	ahwrite(p, 1, cnt, AH);
1645 	free(p);
1646 	return (int) cnt;
1647 }
1648 
1649 void
ahlog(ArchiveHandle * AH,int level,const char * fmt,...)1650 ahlog(ArchiveHandle *AH, int level, const char *fmt,...)
1651 {
1652 	va_list		ap;
1653 
1654 	if (AH->debugLevel < level && (!AH->public.verbose || level > 1))
1655 		return;
1656 
1657 	va_start(ap, fmt);
1658 	vwrite_msg(NULL, fmt, ap);
1659 	va_end(ap);
1660 }
1661 
1662 /*
1663  * Single place for logic which says 'We are restoring to a direct DB connection'.
1664  */
1665 static int
RestoringToDB(ArchiveHandle * AH)1666 RestoringToDB(ArchiveHandle *AH)
1667 {
1668 	RestoreOptions *ropt = AH->public.ropt;
1669 
1670 	return (ropt && ropt->useDB && AH->connection);
1671 }
1672 
1673 /*
1674  * Dump the current contents of the LO data buffer while writing a BLOB
1675  */
1676 static void
dump_lo_buf(ArchiveHandle * AH)1677 dump_lo_buf(ArchiveHandle *AH)
1678 {
1679 	if (AH->connection)
1680 	{
1681 		size_t		res;
1682 
1683 		res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1684 		ahlog(AH, 5, ngettext("wrote %lu byte of large object data (result = %lu)\n",
1685 							  "wrote %lu bytes of large object data (result = %lu)\n",
1686 							  AH->lo_buf_used),
1687 			  (unsigned long) AH->lo_buf_used, (unsigned long) res);
1688 		if (res != AH->lo_buf_used)
1689 			exit_horribly(modulename,
1690 						  "could not write to large object (result: %lu, expected: %lu)\n",
1691 						  (unsigned long) res, (unsigned long) AH->lo_buf_used);
1692 	}
1693 	else
1694 	{
1695 		PQExpBuffer buf = createPQExpBuffer();
1696 
1697 		appendByteaLiteralAHX(buf,
1698 							  (const unsigned char *) AH->lo_buf,
1699 							  AH->lo_buf_used,
1700 							  AH);
1701 
1702 		/* Hack: turn off writingBlob so ahwrite doesn't recurse to here */
1703 		AH->writingBlob = 0;
1704 		ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1705 		AH->writingBlob = 1;
1706 
1707 		destroyPQExpBuffer(buf);
1708 	}
1709 	AH->lo_buf_used = 0;
1710 }
1711 
1712 
1713 /*
1714  *	Write buffer to the output file (usually stdout). This is used for
1715  *	outputting 'restore' scripts etc. It is even possible for an archive
1716  *	format to create a custom output routine to 'fake' a restore if it
1717  *	wants to generate a script (see TAR output).
1718  */
1719 void
ahwrite(const void * ptr,size_t size,size_t nmemb,ArchiveHandle * AH)1720 ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1721 {
1722 	int			bytes_written = 0;
1723 
1724 	if (AH->writingBlob)
1725 	{
1726 		size_t		remaining = size * nmemb;
1727 
1728 		while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1729 		{
1730 			size_t		avail = AH->lo_buf_size - AH->lo_buf_used;
1731 
1732 			memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1733 			ptr = (const void *) ((const char *) ptr + avail);
1734 			remaining -= avail;
1735 			AH->lo_buf_used += avail;
1736 			dump_lo_buf(AH);
1737 		}
1738 
1739 		memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1740 		AH->lo_buf_used += remaining;
1741 
1742 		bytes_written = size * nmemb;
1743 	}
1744 	else if (AH->gzOut)
1745 		bytes_written = GZWRITE(ptr, size, nmemb, AH->OF);
1746 	else if (AH->CustomOutPtr)
1747 		bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
1748 
1749 	else
1750 	{
1751 		/*
1752 		 * If we're doing a restore, and it's direct to DB, and we're
1753 		 * connected then send it to the DB.
1754 		 */
1755 		if (RestoringToDB(AH))
1756 			bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
1757 		else
1758 			bytes_written = fwrite(ptr, size, nmemb, AH->OF) * size;
1759 	}
1760 
1761 	if (bytes_written != size * nmemb)
1762 		WRITE_ERROR_EXIT;
1763 
1764 	return;
1765 }
1766 
1767 /* on some error, we may decide to go on... */
1768 void
warn_or_exit_horribly(ArchiveHandle * AH,const char * modulename,const char * fmt,...)1769 warn_or_exit_horribly(ArchiveHandle *AH,
1770 					  const char *modulename, const char *fmt,...)
1771 {
1772 	va_list		ap;
1773 
1774 	switch (AH->stage)
1775 	{
1776 
1777 		case STAGE_NONE:
1778 			/* Do nothing special */
1779 			break;
1780 
1781 		case STAGE_INITIALIZING:
1782 			if (AH->stage != AH->lastErrorStage)
1783 				write_msg(modulename, "Error while INITIALIZING:\n");
1784 			break;
1785 
1786 		case STAGE_PROCESSING:
1787 			if (AH->stage != AH->lastErrorStage)
1788 				write_msg(modulename, "Error while PROCESSING TOC:\n");
1789 			break;
1790 
1791 		case STAGE_FINALIZING:
1792 			if (AH->stage != AH->lastErrorStage)
1793 				write_msg(modulename, "Error while FINALIZING:\n");
1794 			break;
1795 	}
1796 	if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1797 	{
1798 		write_msg(modulename, "Error from TOC entry %d; %u %u %s %s %s\n",
1799 				  AH->currentTE->dumpId,
1800 				  AH->currentTE->catalogId.tableoid,
1801 				  AH->currentTE->catalogId.oid,
1802 				  AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
1803 				  AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
1804 				  AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
1805 	}
1806 	AH->lastErrorStage = AH->stage;
1807 	AH->lastErrorTE = AH->currentTE;
1808 
1809 	va_start(ap, fmt);
1810 	vwrite_msg(modulename, fmt, ap);
1811 	va_end(ap);
1812 
1813 	if (AH->public.exit_on_error)
1814 		exit_nicely(1);
1815 	else
1816 		AH->public.n_errors++;
1817 }
1818 
1819 #ifdef NOT_USED
1820 
1821 static void
_moveAfter(ArchiveHandle * AH,TocEntry * pos,TocEntry * te)1822 _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1823 {
1824 	/* Unlink te from list */
1825 	te->prev->next = te->next;
1826 	te->next->prev = te->prev;
1827 
1828 	/* and insert it after "pos" */
1829 	te->prev = pos;
1830 	te->next = pos->next;
1831 	pos->next->prev = te;
1832 	pos->next = te;
1833 }
1834 #endif
1835 
1836 static void
_moveBefore(ArchiveHandle * AH,TocEntry * pos,TocEntry * te)1837 _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1838 {
1839 	/* Unlink te from list */
1840 	te->prev->next = te->next;
1841 	te->next->prev = te->prev;
1842 
1843 	/* and insert it before "pos" */
1844 	te->prev = pos->prev;
1845 	te->next = pos;
1846 	pos->prev->next = te;
1847 	pos->prev = te;
1848 }
1849 
1850 /*
1851  * Build index arrays for the TOC list
1852  *
1853  * This should be invoked only after we have created or read in all the TOC
1854  * items.
1855  *
1856  * The arrays are indexed by dump ID (so entry zero is unused).  Note that the
1857  * array entries run only up to maxDumpId.  We might see dependency dump IDs
1858  * beyond that (if the dump was partial); so always check the array bound
1859  * before trying to touch an array entry.
1860  */
1861 static void
buildTocEntryArrays(ArchiveHandle * AH)1862 buildTocEntryArrays(ArchiveHandle *AH)
1863 {
1864 	DumpId		maxDumpId = AH->maxDumpId;
1865 	TocEntry   *te;
1866 
1867 	AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
1868 	AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));
1869 
1870 	for (te = AH->toc->next; te != AH->toc; te = te->next)
1871 	{
1872 		/* this check is purely paranoia, maxDumpId should be correct */
1873 		if (te->dumpId <= 0 || te->dumpId > maxDumpId)
1874 			exit_horribly(modulename, "bad dumpId\n");
1875 
1876 		/* tocsByDumpId indexes all TOCs by their dump ID */
1877 		AH->tocsByDumpId[te->dumpId] = te;
1878 
1879 		/*
1880 		 * tableDataId provides the TABLE DATA item's dump ID for each TABLE
1881 		 * TOC entry that has a DATA item.  We compute this by reversing the
1882 		 * TABLE DATA item's dependency, knowing that a TABLE DATA item has
1883 		 * just one dependency and it is the TABLE item.
1884 		 */
1885 		if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
1886 		{
1887 			DumpId		tableId = te->dependencies[0];
1888 
1889 			/*
1890 			 * The TABLE item might not have been in the archive, if this was
1891 			 * a data-only dump; but its dump ID should be less than its data
1892 			 * item's dump ID, so there should be a place for it in the array.
1893 			 */
1894 			if (tableId <= 0 || tableId > maxDumpId)
1895 				exit_horribly(modulename, "bad table dumpId for TABLE DATA item\n");
1896 
1897 			AH->tableDataId[tableId] = te->dumpId;
1898 		}
1899 	}
1900 }
1901 
1902 TocEntry *
getTocEntryByDumpId(ArchiveHandle * AH,DumpId id)1903 getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
1904 {
1905 	/* build index arrays if we didn't already */
1906 	if (AH->tocsByDumpId == NULL)
1907 		buildTocEntryArrays(AH);
1908 
1909 	if (id > 0 && id <= AH->maxDumpId)
1910 		return AH->tocsByDumpId[id];
1911 
1912 	return NULL;
1913 }
1914 
1915 teReqs
TocIDRequired(ArchiveHandle * AH,DumpId id)1916 TocIDRequired(ArchiveHandle *AH, DumpId id)
1917 {
1918 	TocEntry   *te = getTocEntryByDumpId(AH, id);
1919 
1920 	if (!te)
1921 		return 0;
1922 
1923 	return te->reqs;
1924 }
1925 
1926 size_t
WriteOffset(ArchiveHandle * AH,pgoff_t o,int wasSet)1927 WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
1928 {
1929 	int			off;
1930 
1931 	/* Save the flag */
1932 	(*AH->WriteBytePtr) (AH, wasSet);
1933 
1934 	/* Write out pgoff_t smallest byte first, prevents endian mismatch */
1935 	for (off = 0; off < sizeof(pgoff_t); off++)
1936 	{
1937 		(*AH->WriteBytePtr) (AH, o & 0xFF);
1938 		o >>= 8;
1939 	}
1940 	return sizeof(pgoff_t) + 1;
1941 }
1942 
1943 int
ReadOffset(ArchiveHandle * AH,pgoff_t * o)1944 ReadOffset(ArchiveHandle *AH, pgoff_t * o)
1945 {
1946 	int			i;
1947 	int			off;
1948 	int			offsetFlg;
1949 
1950 	/* Initialize to zero */
1951 	*o = 0;
1952 
1953 	/* Check for old version */
1954 	if (AH->version < K_VERS_1_7)
1955 	{
1956 		/* Prior versions wrote offsets using WriteInt */
1957 		i = ReadInt(AH);
1958 		/* -1 means not set */
1959 		if (i < 0)
1960 			return K_OFFSET_POS_NOT_SET;
1961 		else if (i == 0)
1962 			return K_OFFSET_NO_DATA;
1963 
1964 		/* Cast to pgoff_t because it was written as an int. */
1965 		*o = (pgoff_t) i;
1966 		return K_OFFSET_POS_SET;
1967 	}
1968 
1969 	/*
1970 	 * Read the flag indicating the state of the data pointer. Check if valid
1971 	 * and die if not.
1972 	 *
1973 	 * This used to be handled by a negative or zero pointer, now we use an
1974 	 * extra byte specifically for the state.
1975 	 */
1976 	offsetFlg = (*AH->ReadBytePtr) (AH) & 0xFF;
1977 
1978 	switch (offsetFlg)
1979 	{
1980 		case K_OFFSET_POS_NOT_SET:
1981 		case K_OFFSET_NO_DATA:
1982 		case K_OFFSET_POS_SET:
1983 
1984 			break;
1985 
1986 		default:
1987 			exit_horribly(modulename, "unexpected data offset flag %d\n", offsetFlg);
1988 	}
1989 
1990 	/*
1991 	 * Read the bytes
1992 	 */
1993 	for (off = 0; off < AH->offSize; off++)
1994 	{
1995 		if (off < sizeof(pgoff_t))
1996 			*o |= ((pgoff_t) ((*AH->ReadBytePtr) (AH))) << (off * 8);
1997 		else
1998 		{
1999 			if ((*AH->ReadBytePtr) (AH) != 0)
2000 				exit_horribly(modulename, "file offset in dump file is too large\n");
2001 		}
2002 	}
2003 
2004 	return offsetFlg;
2005 }
2006 
2007 size_t
WriteInt(ArchiveHandle * AH,int i)2008 WriteInt(ArchiveHandle *AH, int i)
2009 {
2010 	int			b;
2011 
2012 	/*
2013 	 * This is a bit yucky, but I don't want to make the binary format very
2014 	 * dependent on representation, and not knowing much about it, I write out
2015 	 * a sign byte. If you change this, don't forget to change the file
2016 	 * version #, and modify readInt to read the new format AS WELL AS the old
2017 	 * formats.
2018 	 */
2019 
2020 	/* SIGN byte */
2021 	if (i < 0)
2022 	{
2023 		(*AH->WriteBytePtr) (AH, 1);
2024 		i = -i;
2025 	}
2026 	else
2027 		(*AH->WriteBytePtr) (AH, 0);
2028 
2029 	for (b = 0; b < AH->intSize; b++)
2030 	{
2031 		(*AH->WriteBytePtr) (AH, i & 0xFF);
2032 		i >>= 8;
2033 	}
2034 
2035 	return AH->intSize + 1;
2036 }
2037 
2038 int
ReadInt(ArchiveHandle * AH)2039 ReadInt(ArchiveHandle *AH)
2040 {
2041 	int			res = 0;
2042 	int			bv,
2043 				b;
2044 	int			sign = 0;		/* Default positive */
2045 	int			bitShift = 0;
2046 
2047 	if (AH->version > K_VERS_1_0)
2048 		/* Read a sign byte */
2049 		sign = (*AH->ReadBytePtr) (AH);
2050 
2051 	for (b = 0; b < AH->intSize; b++)
2052 	{
2053 		bv = (*AH->ReadBytePtr) (AH) & 0xFF;
2054 		if (bv != 0)
2055 			res = res + (bv << bitShift);
2056 		bitShift += 8;
2057 	}
2058 
2059 	if (sign)
2060 		res = -res;
2061 
2062 	return res;
2063 }
2064 
2065 size_t
WriteStr(ArchiveHandle * AH,const char * c)2066 WriteStr(ArchiveHandle *AH, const char *c)
2067 {
2068 	size_t		res;
2069 
2070 	if (c)
2071 	{
2072 		int			len = strlen(c);
2073 
2074 		res = WriteInt(AH, len);
2075 		(*AH->WriteBufPtr) (AH, c, len);
2076 		res += len;
2077 	}
2078 	else
2079 		res = WriteInt(AH, -1);
2080 
2081 	return res;
2082 }
2083 
2084 char *
ReadStr(ArchiveHandle * AH)2085 ReadStr(ArchiveHandle *AH)
2086 {
2087 	char	   *buf;
2088 	int			l;
2089 
2090 	l = ReadInt(AH);
2091 	if (l < 0)
2092 		buf = NULL;
2093 	else
2094 	{
2095 		buf = (char *) pg_malloc(l + 1);
2096 		(*AH->ReadBufPtr) (AH, (void *) buf, l);
2097 
2098 		buf[l] = '\0';
2099 	}
2100 
2101 	return buf;
2102 }
2103 
2104 static int
_discoverArchiveFormat(ArchiveHandle * AH)2105 _discoverArchiveFormat(ArchiveHandle *AH)
2106 {
2107 	FILE	   *fh;
2108 	char		sig[6];			/* More than enough */
2109 	size_t		cnt;
2110 	int			wantClose = 0;
2111 
2112 #if 0
2113 	write_msg(modulename, "attempting to ascertain archive format\n");
2114 #endif
2115 
2116 	if (AH->lookahead)
2117 		free(AH->lookahead);
2118 
2119 	AH->readHeader = 0;
2120 	AH->lookaheadSize = 512;
2121 	AH->lookahead = pg_malloc0(512);
2122 	AH->lookaheadLen = 0;
2123 	AH->lookaheadPos = 0;
2124 
2125 	if (AH->fSpec)
2126 	{
2127 		struct stat st;
2128 
2129 		wantClose = 1;
2130 
2131 		/*
2132 		 * Check if the specified archive is a directory. If so, check if
2133 		 * there's a "toc.dat" (or "toc.dat.gz") file in it.
2134 		 */
2135 		if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
2136 		{
2137 			char		buf[MAXPGPATH];
2138 
2139 			if (snprintf(buf, MAXPGPATH, "%s/toc.dat", AH->fSpec) >= MAXPGPATH)
2140 				exit_horribly(modulename, "directory name too long: \"%s\"\n",
2141 							  AH->fSpec);
2142 			if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
2143 			{
2144 				AH->format = archDirectory;
2145 				return AH->format;
2146 			}
2147 
2148 #ifdef HAVE_LIBZ
2149 			if (snprintf(buf, MAXPGPATH, "%s/toc.dat.gz", AH->fSpec) >= MAXPGPATH)
2150 				exit_horribly(modulename, "directory name too long: \"%s\"\n",
2151 							  AH->fSpec);
2152 			if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
2153 			{
2154 				AH->format = archDirectory;
2155 				return AH->format;
2156 			}
2157 #endif
2158 			exit_horribly(modulename, "directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)\n",
2159 						  AH->fSpec);
2160 			fh = NULL;			/* keep compiler quiet */
2161 		}
2162 		else
2163 		{
2164 			fh = fopen(AH->fSpec, PG_BINARY_R);
2165 			if (!fh)
2166 				exit_horribly(modulename, "could not open input file \"%s\": %s\n",
2167 							  AH->fSpec, strerror(errno));
2168 		}
2169 	}
2170 	else
2171 	{
2172 		fh = stdin;
2173 		if (!fh)
2174 			exit_horribly(modulename, "could not open input file: %s\n",
2175 						  strerror(errno));
2176 	}
2177 
2178 	if ((cnt = fread(sig, 1, 5, fh)) != 5)
2179 	{
2180 		if (ferror(fh))
2181 			exit_horribly(modulename, "could not read input file: %s\n", strerror(errno));
2182 		else
2183 			exit_horribly(modulename, "input file is too short (read %lu, expected 5)\n",
2184 						  (unsigned long) cnt);
2185 	}
2186 
2187 	/* Save it, just in case we need it later */
2188 	memcpy(&AH->lookahead[0], sig, 5);
2189 	AH->lookaheadLen = 5;
2190 
2191 	if (strncmp(sig, "PGDMP", 5) == 0)
2192 	{
2193 		/* It's custom format, stop here */
2194 		AH->format = archCustom;
2195 		AH->readHeader = 1;
2196 	}
2197 	else
2198 	{
2199 		/*
2200 		 * *Maybe* we have a tar archive format file or a text dump ... So,
2201 		 * read first 512 byte header...
2202 		 */
2203 		cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
2204 		/* read failure is checked below */
2205 		AH->lookaheadLen += cnt;
2206 
2207 		if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
2208 			(strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
2209 			 strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
2210 		{
2211 			/*
2212 			 * looks like it's probably a text format dump. so suggest they
2213 			 * try psql
2214 			 */
2215 			exit_horribly(modulename, "input file appears to be a text format dump. Please use psql.\n");
2216 		}
2217 
2218 		if (AH->lookaheadLen != 512)
2219 		{
2220 			if (feof(fh))
2221 				exit_horribly(modulename, "input file does not appear to be a valid archive (too short?)\n");
2222 			else
2223 				READ_ERROR_EXIT(fh);
2224 		}
2225 
2226 		if (!isValidTarHeader(AH->lookahead))
2227 			exit_horribly(modulename, "input file does not appear to be a valid archive\n");
2228 
2229 		AH->format = archTar;
2230 	}
2231 
2232 	/* Close the file if we opened it */
2233 	if (wantClose)
2234 	{
2235 		if (fclose(fh) != 0)
2236 			exit_horribly(modulename, "could not close input file: %s\n",
2237 						  strerror(errno));
2238 		/* Forget lookahead, since we'll re-read header after re-opening */
2239 		AH->readHeader = 0;
2240 		AH->lookaheadLen = 0;
2241 	}
2242 
2243 	return AH->format;
2244 }
2245 
2246 
2247 /*
2248  * Allocate an archive handle
2249  */
2250 static ArchiveHandle *
_allocAH(const char * FileSpec,const ArchiveFormat fmt,const int compression,bool dosync,ArchiveMode mode,SetupWorkerPtrType setupWorkerPtr)2251 _allocAH(const char *FileSpec, const ArchiveFormat fmt,
2252 		 const int compression, bool dosync, ArchiveMode mode,
2253 		 SetupWorkerPtrType setupWorkerPtr)
2254 {
2255 	ArchiveHandle *AH;
2256 
2257 #if 0
2258 	write_msg(modulename, "allocating AH for %s, format %d\n", FileSpec, fmt);
2259 #endif
2260 
2261 	AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
2262 
2263 	/* AH->debugLevel = 100; */
2264 
2265 	AH->version = K_VERS_SELF;
2266 
2267 	/* initialize for backwards compatible string processing */
2268 	AH->public.encoding = 0;	/* PG_SQL_ASCII */
2269 	AH->public.std_strings = false;
2270 
2271 	/* sql error handling */
2272 	AH->public.exit_on_error = true;
2273 	AH->public.n_errors = 0;
2274 
2275 	AH->archiveDumpVersion = PG_VERSION;
2276 
2277 	AH->createDate = time(NULL);
2278 
2279 	AH->intSize = sizeof(int);
2280 	AH->offSize = sizeof(pgoff_t);
2281 	if (FileSpec)
2282 	{
2283 		AH->fSpec = pg_strdup(FileSpec);
2284 
2285 		/*
2286 		 * Not used; maybe later....
2287 		 *
2288 		 * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2289 		 * i--) if (AH->workDir[i-1] == '/')
2290 		 */
2291 	}
2292 	else
2293 		AH->fSpec = NULL;
2294 
2295 	AH->currUser = NULL;		/* unknown */
2296 	AH->currSchema = NULL;		/* ditto */
2297 	AH->currTablespace = NULL;	/* ditto */
2298 	AH->currWithOids = -1;		/* force SET */
2299 
2300 	AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2301 
2302 	AH->toc->next = AH->toc;
2303 	AH->toc->prev = AH->toc;
2304 
2305 	AH->mode = mode;
2306 	AH->compression = compression;
2307 	AH->dosync = dosync;
2308 
2309 	memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
2310 
2311 	/* Open stdout with no compression for AH output handle */
2312 	AH->gzOut = 0;
2313 	AH->OF = stdout;
2314 
2315 	/*
2316 	 * On Windows, we need to use binary mode to read/write non-text files,
2317 	 * which include all archive formats as well as compressed plain text.
2318 	 * Force stdin/stdout into binary mode if that is what we are using.
2319 	 */
2320 #ifdef WIN32
2321 	if ((fmt != archNull || compression != 0) &&
2322 		(AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2323 	{
2324 		if (mode == archModeWrite)
2325 			_setmode(fileno(stdout), O_BINARY);
2326 		else
2327 			_setmode(fileno(stdin), O_BINARY);
2328 	}
2329 #endif
2330 
2331 	AH->SetupWorkerPtr = setupWorkerPtr;
2332 
2333 	if (fmt == archUnknown)
2334 		AH->format = _discoverArchiveFormat(AH);
2335 	else
2336 		AH->format = fmt;
2337 
2338 	switch (AH->format)
2339 	{
2340 		case archCustom:
2341 			InitArchiveFmt_Custom(AH);
2342 			break;
2343 
2344 		case archNull:
2345 			InitArchiveFmt_Null(AH);
2346 			break;
2347 
2348 		case archDirectory:
2349 			InitArchiveFmt_Directory(AH);
2350 			break;
2351 
2352 		case archTar:
2353 			InitArchiveFmt_Tar(AH);
2354 			break;
2355 
2356 		default:
2357 			exit_horribly(modulename, "unrecognized file format \"%d\"\n", fmt);
2358 	}
2359 
2360 	return AH;
2361 }
2362 
2363 /*
2364  * Write out all data (tables & blobs)
2365  */
2366 void
WriteDataChunks(ArchiveHandle * AH,ParallelState * pstate)2367 WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
2368 {
2369 	TocEntry   *te;
2370 
2371 	for (te = AH->toc->next; te != AH->toc; te = te->next)
2372 	{
2373 		if (!te->dataDumper)
2374 			continue;
2375 
2376 		if ((te->reqs & REQ_DATA) == 0)
2377 			continue;
2378 
2379 		if (pstate && pstate->numWorkers > 1)
2380 		{
2381 			/*
2382 			 * If we are in a parallel backup, then we are always the master
2383 			 * process.  Dispatch each data-transfer job to a worker.
2384 			 */
2385 			DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP,
2386 								   mark_dump_job_done, NULL);
2387 		}
2388 		else
2389 			WriteDataChunksForTocEntry(AH, te);
2390 	}
2391 
2392 	/*
2393 	 * If parallel, wait for workers to finish.
2394 	 */
2395 	if (pstate && pstate->numWorkers > 1)
2396 		WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
2397 }
2398 
2399 
2400 /*
2401  * Callback function that's invoked in the master process after a step has
2402  * been parallel dumped.
2403  *
2404  * We don't need to do anything except check for worker failure.
2405  */
2406 static void
mark_dump_job_done(ArchiveHandle * AH,TocEntry * te,int status,void * callback_data)2407 mark_dump_job_done(ArchiveHandle *AH,
2408 				   TocEntry *te,
2409 				   int status,
2410 				   void *callback_data)
2411 {
2412 	ahlog(AH, 1, "finished item %d %s %s\n",
2413 		  te->dumpId, te->desc, te->tag);
2414 
2415 	if (status != 0)
2416 		exit_horribly(modulename, "worker process failed: exit code %d\n",
2417 					  status);
2418 }
2419 
2420 
2421 void
WriteDataChunksForTocEntry(ArchiveHandle * AH,TocEntry * te)2422 WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
2423 {
2424 	StartDataPtrType startPtr;
2425 	EndDataPtrType endPtr;
2426 
2427 	AH->currToc = te;
2428 
2429 	if (strcmp(te->desc, "BLOBS") == 0)
2430 	{
2431 		startPtr = AH->StartBlobsPtr;
2432 		endPtr = AH->EndBlobsPtr;
2433 	}
2434 	else
2435 	{
2436 		startPtr = AH->StartDataPtr;
2437 		endPtr = AH->EndDataPtr;
2438 	}
2439 
2440 	if (startPtr != NULL)
2441 		(*startPtr) (AH, te);
2442 
2443 	/*
2444 	 * The user-provided DataDumper routine needs to call AH->WriteData
2445 	 */
2446 	(*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
2447 
2448 	if (endPtr != NULL)
2449 		(*endPtr) (AH, te);
2450 
2451 	AH->currToc = NULL;
2452 }
2453 
2454 void
WriteToc(ArchiveHandle * AH)2455 WriteToc(ArchiveHandle *AH)
2456 {
2457 	TocEntry   *te;
2458 	char		workbuf[32];
2459 	int			tocCount;
2460 	int			i;
2461 
2462 	/* count entries that will actually be dumped */
2463 	tocCount = 0;
2464 	for (te = AH->toc->next; te != AH->toc; te = te->next)
2465 	{
2466 		if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) != 0)
2467 			tocCount++;
2468 	}
2469 
2470 	/* printf("%d TOC Entries to save\n", tocCount); */
2471 
2472 	WriteInt(AH, tocCount);
2473 
2474 	for (te = AH->toc->next; te != AH->toc; te = te->next)
2475 	{
2476 		if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) == 0)
2477 			continue;
2478 
2479 		WriteInt(AH, te->dumpId);
2480 		WriteInt(AH, te->dataDumper ? 1 : 0);
2481 
2482 		/* OID is recorded as a string for historical reasons */
2483 		sprintf(workbuf, "%u", te->catalogId.tableoid);
2484 		WriteStr(AH, workbuf);
2485 		sprintf(workbuf, "%u", te->catalogId.oid);
2486 		WriteStr(AH, workbuf);
2487 
2488 		WriteStr(AH, te->tag);
2489 		WriteStr(AH, te->desc);
2490 		WriteInt(AH, te->section);
2491 		WriteStr(AH, te->defn);
2492 		WriteStr(AH, te->dropStmt);
2493 		WriteStr(AH, te->copyStmt);
2494 		WriteStr(AH, te->namespace);
2495 		WriteStr(AH, te->tablespace);
2496 		WriteStr(AH, te->owner);
2497 		WriteStr(AH, te->withOids ? "true" : "false");
2498 
2499 		/* Dump list of dependencies */
2500 		for (i = 0; i < te->nDeps; i++)
2501 		{
2502 			sprintf(workbuf, "%d", te->dependencies[i]);
2503 			WriteStr(AH, workbuf);
2504 		}
2505 		WriteStr(AH, NULL);		/* Terminate List */
2506 
2507 		if (AH->WriteExtraTocPtr)
2508 			(*AH->WriteExtraTocPtr) (AH, te);
2509 	}
2510 }
2511 
2512 void
ReadToc(ArchiveHandle * AH)2513 ReadToc(ArchiveHandle *AH)
2514 {
2515 	int			i;
2516 	char	   *tmp;
2517 	DumpId	   *deps;
2518 	int			depIdx;
2519 	int			depSize;
2520 	TocEntry   *te;
2521 
2522 	AH->tocCount = ReadInt(AH);
2523 	AH->maxDumpId = 0;
2524 
2525 	for (i = 0; i < AH->tocCount; i++)
2526 	{
2527 		te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2528 		te->dumpId = ReadInt(AH);
2529 
2530 		if (te->dumpId > AH->maxDumpId)
2531 			AH->maxDumpId = te->dumpId;
2532 
2533 		/* Sanity check */
2534 		if (te->dumpId <= 0)
2535 			exit_horribly(modulename,
2536 						  "entry ID %d out of range -- perhaps a corrupt TOC\n",
2537 						  te->dumpId);
2538 
2539 		te->hadDumper = ReadInt(AH);
2540 
2541 		if (AH->version >= K_VERS_1_8)
2542 		{
2543 			tmp = ReadStr(AH);
2544 			sscanf(tmp, "%u", &te->catalogId.tableoid);
2545 			free(tmp);
2546 		}
2547 		else
2548 			te->catalogId.tableoid = InvalidOid;
2549 		tmp = ReadStr(AH);
2550 		sscanf(tmp, "%u", &te->catalogId.oid);
2551 		free(tmp);
2552 
2553 		te->tag = ReadStr(AH);
2554 		te->desc = ReadStr(AH);
2555 
2556 		if (AH->version >= K_VERS_1_11)
2557 		{
2558 			te->section = ReadInt(AH);
2559 		}
2560 		else
2561 		{
2562 			/*
2563 			 * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2564 			 * the entries into sections.  This list need not cover entry
2565 			 * types added later than 8.4.
2566 			 */
2567 			if (strcmp(te->desc, "COMMENT") == 0 ||
2568 				strcmp(te->desc, "ACL") == 0 ||
2569 				strcmp(te->desc, "ACL LANGUAGE") == 0)
2570 				te->section = SECTION_NONE;
2571 			else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2572 					 strcmp(te->desc, "BLOBS") == 0 ||
2573 					 strcmp(te->desc, "BLOB COMMENTS") == 0)
2574 				te->section = SECTION_DATA;
2575 			else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2576 					 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2577 					 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2578 					 strcmp(te->desc, "INDEX") == 0 ||
2579 					 strcmp(te->desc, "RULE") == 0 ||
2580 					 strcmp(te->desc, "TRIGGER") == 0)
2581 				te->section = SECTION_POST_DATA;
2582 			else
2583 				te->section = SECTION_PRE_DATA;
2584 		}
2585 
2586 		te->defn = ReadStr(AH);
2587 		te->dropStmt = ReadStr(AH);
2588 
2589 		if (AH->version >= K_VERS_1_3)
2590 			te->copyStmt = ReadStr(AH);
2591 
2592 		if (AH->version >= K_VERS_1_6)
2593 			te->namespace = ReadStr(AH);
2594 
2595 		if (AH->version >= K_VERS_1_10)
2596 			te->tablespace = ReadStr(AH);
2597 
2598 		te->owner = ReadStr(AH);
2599 		if (AH->version >= K_VERS_1_9)
2600 		{
2601 			if (strcmp(ReadStr(AH), "true") == 0)
2602 				te->withOids = true;
2603 			else
2604 				te->withOids = false;
2605 		}
2606 		else
2607 			te->withOids = true;
2608 
2609 		/* Read TOC entry dependencies */
2610 		if (AH->version >= K_VERS_1_5)
2611 		{
2612 			depSize = 100;
2613 			deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
2614 			depIdx = 0;
2615 			for (;;)
2616 			{
2617 				tmp = ReadStr(AH);
2618 				if (!tmp)
2619 					break;		/* end of list */
2620 				if (depIdx >= depSize)
2621 				{
2622 					depSize *= 2;
2623 					deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
2624 				}
2625 				sscanf(tmp, "%d", &deps[depIdx]);
2626 				free(tmp);
2627 				depIdx++;
2628 			}
2629 
2630 			if (depIdx > 0)		/* We have a non-null entry */
2631 			{
2632 				deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
2633 				te->dependencies = deps;
2634 				te->nDeps = depIdx;
2635 			}
2636 			else
2637 			{
2638 				free(deps);
2639 				te->dependencies = NULL;
2640 				te->nDeps = 0;
2641 			}
2642 		}
2643 		else
2644 		{
2645 			te->dependencies = NULL;
2646 			te->nDeps = 0;
2647 		}
2648 
2649 		if (AH->ReadExtraTocPtr)
2650 			(*AH->ReadExtraTocPtr) (AH, te);
2651 
2652 		ahlog(AH, 3, "read TOC entry %d (ID %d) for %s %s\n",
2653 			  i, te->dumpId, te->desc, te->tag);
2654 
2655 		/* link completed entry into TOC circular list */
2656 		te->prev = AH->toc->prev;
2657 		AH->toc->prev->next = te;
2658 		AH->toc->prev = te;
2659 		te->next = AH->toc;
2660 
2661 		/* special processing immediately upon read for some items */
2662 		if (strcmp(te->desc, "ENCODING") == 0)
2663 			processEncodingEntry(AH, te);
2664 		else if (strcmp(te->desc, "STDSTRINGS") == 0)
2665 			processStdStringsEntry(AH, te);
2666 		else if (strcmp(te->desc, "SEARCHPATH") == 0)
2667 			processSearchPathEntry(AH, te);
2668 	}
2669 }
2670 
2671 static void
processEncodingEntry(ArchiveHandle * AH,TocEntry * te)2672 processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2673 {
2674 	/* te->defn should have the form SET client_encoding = 'foo'; */
2675 	char	   *defn = pg_strdup(te->defn);
2676 	char	   *ptr1;
2677 	char	   *ptr2 = NULL;
2678 	int			encoding;
2679 
2680 	ptr1 = strchr(defn, '\'');
2681 	if (ptr1)
2682 		ptr2 = strchr(++ptr1, '\'');
2683 	if (ptr2)
2684 	{
2685 		*ptr2 = '\0';
2686 		encoding = pg_char_to_encoding(ptr1);
2687 		if (encoding < 0)
2688 			exit_horribly(modulename, "unrecognized encoding \"%s\"\n",
2689 						  ptr1);
2690 		AH->public.encoding = encoding;
2691 	}
2692 	else
2693 		exit_horribly(modulename, "invalid ENCODING item: %s\n",
2694 					  te->defn);
2695 
2696 	free(defn);
2697 }
2698 
2699 static void
processStdStringsEntry(ArchiveHandle * AH,TocEntry * te)2700 processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2701 {
2702 	/* te->defn should have the form SET standard_conforming_strings = 'x'; */
2703 	char	   *ptr1;
2704 
2705 	ptr1 = strchr(te->defn, '\'');
2706 	if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2707 		AH->public.std_strings = true;
2708 	else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2709 		AH->public.std_strings = false;
2710 	else
2711 		exit_horribly(modulename, "invalid STDSTRINGS item: %s\n",
2712 					  te->defn);
2713 }
2714 
2715 static void
processSearchPathEntry(ArchiveHandle * AH,TocEntry * te)2716 processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
2717 {
2718 	/*
2719 	 * te->defn should contain a command to set search_path.  We just copy it
2720 	 * verbatim for use later.
2721 	 */
2722 	AH->public.searchpath = pg_strdup(te->defn);
2723 }
2724 
2725 static void
StrictNamesCheck(RestoreOptions * ropt)2726 StrictNamesCheck(RestoreOptions *ropt)
2727 {
2728 	const char *missing_name;
2729 
2730 	Assert(ropt->strict_names);
2731 
2732 	if (ropt->schemaNames.head != NULL)
2733 	{
2734 		missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2735 		if (missing_name != NULL)
2736 			exit_horribly(modulename, "schema \"%s\" not found\n", missing_name);
2737 	}
2738 
2739 	if (ropt->tableNames.head != NULL)
2740 	{
2741 		missing_name = simple_string_list_not_touched(&ropt->tableNames);
2742 		if (missing_name != NULL)
2743 			exit_horribly(modulename, "table \"%s\" not found\n", missing_name);
2744 	}
2745 
2746 	if (ropt->indexNames.head != NULL)
2747 	{
2748 		missing_name = simple_string_list_not_touched(&ropt->indexNames);
2749 		if (missing_name != NULL)
2750 			exit_horribly(modulename, "index \"%s\" not found\n", missing_name);
2751 	}
2752 
2753 	if (ropt->functionNames.head != NULL)
2754 	{
2755 		missing_name = simple_string_list_not_touched(&ropt->functionNames);
2756 		if (missing_name != NULL)
2757 			exit_horribly(modulename, "function \"%s\" not found\n", missing_name);
2758 	}
2759 
2760 	if (ropt->triggerNames.head != NULL)
2761 	{
2762 		missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2763 		if (missing_name != NULL)
2764 			exit_horribly(modulename, "trigger \"%s\" not found\n", missing_name);
2765 	}
2766 }
2767 
/*
 * Decide which parts of a TOC entry are wanted under the given options.
 *
 * Returns REQ_SPECIAL for ENCODING, STDSTRINGS and SEARCHPATH entries, 0 if
 * the entry is to be skipped entirely, and otherwise a mask built from
 * REQ_SCHEMA and REQ_DATA (which can still end up empty after masking).
 *
 * te			the entry being classified
 * curSection	the section the entry is considered to belong to
 * ropt			the dump/restore options to filter against
 *
 * NOTE(review): simple_string_list_member presumably marks the matched name
 * as touched, which StrictNamesCheck later inspects through
 * simple_string_list_not_touched -- confirm before reordering any of the
 * filters below.
 */
static teReqs
_tocEntryRequired(TocEntry *te, teSection curSection, RestoreOptions *ropt)
{
	/* start out wanting both parts, then narrow down */
	teReqs		res = REQ_SCHEMA | REQ_DATA;

	/* These items are treated specially */
	if (strcmp(te->desc, "ENCODING") == 0 ||
		strcmp(te->desc, "STDSTRINGS") == 0 ||
		strcmp(te->desc, "SEARCHPATH") == 0)
		return REQ_SPECIAL;

	/* If it's an ACL, maybe ignore it */
	if (ropt->aclsSkip && _tocEntryIsACL(te))
		return 0;

	/*
	 * If it's a publication or a table part of a publication, maybe ignore
	 * it.
	 */
	if (ropt->no_publications &&
		(strcmp(te->desc, "PUBLICATION") == 0 ||
		 strcmp(te->desc, "PUBLICATION TABLE") == 0))
		return 0;

	/* If it's security labels, maybe ignore it */
	if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
		return 0;

	/* If it's a subscription, maybe ignore it */
	if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
		return 0;

	/* Ignore it if section is not to be dumped/restored */
	switch (curSection)
	{
		case SECTION_PRE_DATA:
			if (!(ropt->dumpSections & DUMP_PRE_DATA))
				return 0;
			break;
		case SECTION_DATA:
			if (!(ropt->dumpSections & DUMP_DATA))
				return 0;
			break;
		case SECTION_POST_DATA:
			if (!(ropt->dumpSections & DUMP_POST_DATA))
				return 0;
			break;
		default:
			/* shouldn't get here, really, but ignore it */
			return 0;
	}

	/* Check options for selective dump/restore */
	if (ropt->schemaNames.head != NULL)
	{
		/*
		 * A schema filter is active: an entry that carries no namespace
		 * cannot match it, so it is skipped.
		 */
		if (!te->namespace)
			return 0;
		if (!(simple_string_list_member(&ropt->schemaNames, te->namespace)))
			return 0;
	}

	/* Skip entries whose namespace is on the exclusion list */
	if (ropt->schemaExcludeNames.head != NULL &&
		te->namespace &&
		simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
		return 0;

	/*
	 * With object-type selection active, only the entry types listed below
	 * can pass, and each only if its type is selected and, when a name list
	 * was given for that type, its tag is on the list.
	 */
	if (ropt->selTypes)
	{
		if (strcmp(te->desc, "TABLE") == 0 ||
			strcmp(te->desc, "TABLE DATA") == 0 ||
			strcmp(te->desc, "VIEW") == 0 ||
			strcmp(te->desc, "FOREIGN TABLE") == 0 ||
			strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
			strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
			strcmp(te->desc, "SEQUENCE") == 0 ||
			strcmp(te->desc, "SEQUENCE SET") == 0)
		{
			if (!ropt->selTable)
				return 0;
			if (ropt->tableNames.head != NULL && (!(simple_string_list_member(&ropt->tableNames, te->tag))))
				return 0;
		}
		else if (strcmp(te->desc, "INDEX") == 0)
		{
			if (!ropt->selIndex)
				return 0;
			if (ropt->indexNames.head != NULL && (!(simple_string_list_member(&ropt->indexNames, te->tag))))
				return 0;
		}
		else if (strcmp(te->desc, "FUNCTION") == 0)
		{
			if (!ropt->selFunction)
				return 0;
			if (ropt->functionNames.head != NULL && (!(simple_string_list_member(&ropt->functionNames, te->tag))))
				return 0;
		}
		else if (strcmp(te->desc, "TRIGGER") == 0)
		{
			if (!ropt->selTrigger)
				return 0;
			if (ropt->triggerNames.head != NULL && (!(simple_string_list_member(&ropt->triggerNames, te->tag))))
				return 0;
		}
		else
			return 0;
	}

	/*
	 * Check if we had a dataDumper. Indicates if the entry is schema or data
	 */
	if (!te->hadDumper)
	{
		/*
		 * Special Case: If 'SEQUENCE SET' or anything to do with BLOBs, then
		 * it is considered a data entry.  We don't need to check for the
		 * BLOBS entry or old-style BLOB COMMENTS, because they will have
		 * hadDumper = true ... but we do need to check new-style BLOB
		 * comments.
		 */
		if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
			strcmp(te->desc, "BLOB") == 0 ||
			(strcmp(te->desc, "ACL") == 0 &&
			 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
			(strcmp(te->desc, "COMMENT") == 0 &&
			 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
			(strcmp(te->desc, "SECURITY LABEL") == 0 &&
			 strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
			res = res & REQ_DATA;	/* keep only the data bit */
		else
			res = res & ~REQ_DATA;	/* no dumper: drop the data bit */
	}

	/*
	 * Special case: <Init> type with <Max OID> tag; this is obsolete and we
	 * always ignore it.
	 */
	if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
		return 0;

	/* Mask it if we only want schema */
	if (ropt->schemaOnly)
	{
		/*
		 * The sequence_data option overrides schema-only for SEQUENCE SET.
		 *
		 * In binary-upgrade mode, even with schema-only set, we do not mask
		 * out large objects.  Only large object definitions, comments and
		 * other information should be generated in binary-upgrade mode (not
		 * the actual data).
		 */
		if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
			!(ropt->binary_upgrade &&
			  (strcmp(te->desc, "BLOB") == 0 ||
			   (strcmp(te->desc, "ACL") == 0 &&
				strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
			   (strcmp(te->desc, "COMMENT") == 0 &&
				strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
			   (strcmp(te->desc, "SECURITY LABEL") == 0 &&
				strncmp(te->tag, "LARGE OBJECT ", 13) == 0))))
			res = res & REQ_SCHEMA;
	}

	/* Mask it if we only want data */
	if (ropt->dataOnly)
		res = res & REQ_DATA;

	/* Mask it if we don't have a schema contribution */
	if (!te->defn || strlen(te->defn) == 0)
		res = res & ~REQ_SCHEMA;

	/*
	 * Finally, if there's a per-ID filter, limit based on that as well.
	 * The idWanted array is indexed by dumpId - 1.
	 */
	if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
		return 0;

	return res;
}
2945 
2946 /*
2947  * Identify which pass we should restore this TOC entry in.
2948  *
2949  * See notes with the RestorePass typedef in pg_backup_archiver.h.
2950  */
2951 static RestorePass
_tocEntryRestorePass(TocEntry * te)2952 _tocEntryRestorePass(TocEntry *te)
2953 {
2954 	/* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
2955 	if (strcmp(te->desc, "ACL") == 0 ||
2956 		strcmp(te->desc, "ACL LANGUAGE") == 0 ||
2957 		strcmp(te->desc, "DEFAULT ACL") == 0)
2958 		return RESTORE_PASS_ACL;
2959 	if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
2960 		strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
2961 		return RESTORE_PASS_POST_ACL;
2962 
2963 	/*
2964 	 * Comments need to be emitted in the same pass as their parent objects.
2965 	 * ACLs haven't got comments, and neither do matview data objects, but
2966 	 * event triggers do.  (Fortunately, event triggers haven't got ACLs, or
2967 	 * we'd need yet another weird special case.)
2968 	 */
2969 	if (strcmp(te->desc, "COMMENT") == 0 &&
2970 		strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
2971 		return RESTORE_PASS_POST_ACL;
2972 
2973 	/* All else can be handled in the main pass. */
2974 	return RESTORE_PASS_MAIN;
2975 }
2976 
2977 /*
2978  * Identify TOC entries that are ACLs.
2979  *
2980  * Note: it seems worth duplicating some code here to avoid a hard-wired
2981  * assumption that these are exactly the same entries that we restore during
2982  * the RESTORE_PASS_ACL phase.
2983  */
2984 static bool
_tocEntryIsACL(TocEntry * te)2985 _tocEntryIsACL(TocEntry *te)
2986 {
2987 	/* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
2988 	if (strcmp(te->desc, "ACL") == 0 ||
2989 		strcmp(te->desc, "ACL LANGUAGE") == 0 ||
2990 		strcmp(te->desc, "DEFAULT ACL") == 0)
2991 		return true;
2992 	return false;
2993 }
2994 
2995 /*
2996  * Issue SET commands for parameters that we want to have set the same way
2997  * at all times during execution of a restore script.
2998  */
2999 static void
_doSetFixedOutputState(ArchiveHandle * AH)3000 _doSetFixedOutputState(ArchiveHandle *AH)
3001 {
3002 	RestoreOptions *ropt = AH->public.ropt;
3003 
3004 	/*
3005 	 * Disable timeouts to allow for slow commands, idle parallel workers, etc
3006 	 */
3007 	ahprintf(AH, "SET statement_timeout = 0;\n");
3008 	ahprintf(AH, "SET lock_timeout = 0;\n");
3009 	ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
3010 
3011 	/* Select the correct character set encoding */
3012 	ahprintf(AH, "SET client_encoding = '%s';\n",
3013 			 pg_encoding_to_char(AH->public.encoding));
3014 
3015 	/* Select the correct string literal syntax */
3016 	ahprintf(AH, "SET standard_conforming_strings = %s;\n",
3017 			 AH->public.std_strings ? "on" : "off");
3018 
3019 	/* Select the role to be used during restore */
3020 	if (ropt && ropt->use_role)
3021 		ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
3022 
3023 	/* Select the dump-time search_path */
3024 	if (AH->public.searchpath)
3025 		ahprintf(AH, "%s", AH->public.searchpath);
3026 
3027 	/* Make sure function checking is disabled */
3028 	ahprintf(AH, "SET check_function_bodies = false;\n");
3029 
3030 	/* Ensure that all valid XML data will be accepted */
3031 	ahprintf(AH, "SET xmloption = content;\n");
3032 
3033 	/* Avoid annoying notices etc */
3034 	ahprintf(AH, "SET client_min_messages = warning;\n");
3035 	if (!AH->public.std_strings)
3036 		ahprintf(AH, "SET escape_string_warning = off;\n");
3037 
3038 	/* Adjust row-security state */
3039 	if (ropt && ropt->enable_row_security)
3040 		ahprintf(AH, "SET row_security = on;\n");
3041 	else
3042 		ahprintf(AH, "SET row_security = off;\n");
3043 
3044 	ahprintf(AH, "\n");
3045 }
3046 
3047 /*
3048  * Issue a SET SESSION AUTHORIZATION command.  Caller is responsible
3049  * for updating state if appropriate.  If user is NULL or an empty string,
3050  * the specification DEFAULT will be used.
3051  */
3052 static void
_doSetSessionAuth(ArchiveHandle * AH,const char * user)3053 _doSetSessionAuth(ArchiveHandle *AH, const char *user)
3054 {
3055 	PQExpBuffer cmd = createPQExpBuffer();
3056 
3057 	appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3058 
3059 	/*
3060 	 * SQL requires a string literal here.  Might as well be correct.
3061 	 */
3062 	if (user && *user)
3063 		appendStringLiteralAHX(cmd, user, AH);
3064 	else
3065 		appendPQExpBufferStr(cmd, "DEFAULT");
3066 	appendPQExpBufferChar(cmd, ';');
3067 
3068 	if (RestoringToDB(AH))
3069 	{
3070 		PGresult   *res;
3071 
3072 		res = PQexec(AH->connection, cmd->data);
3073 
3074 		if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3075 			/* NOT warn_or_exit_horribly... use -O instead to skip this. */
3076 			exit_horribly(modulename, "could not set session user to \"%s\": %s",
3077 						  user, PQerrorMessage(AH->connection));
3078 
3079 		PQclear(res);
3080 	}
3081 	else
3082 		ahprintf(AH, "%s\n\n", cmd->data);
3083 
3084 	destroyPQExpBuffer(cmd);
3085 }
3086 
3087 
3088 /*
3089  * Issue a SET default_with_oids command.  Caller is responsible
3090  * for updating state if appropriate.
3091  */
3092 static void
_doSetWithOids(ArchiveHandle * AH,const bool withOids)3093 _doSetWithOids(ArchiveHandle *AH, const bool withOids)
3094 {
3095 	PQExpBuffer cmd = createPQExpBuffer();
3096 
3097 	appendPQExpBuffer(cmd, "SET default_with_oids = %s;", withOids ?
3098 					  "true" : "false");
3099 
3100 	if (RestoringToDB(AH))
3101 	{
3102 		PGresult   *res;
3103 
3104 		res = PQexec(AH->connection, cmd->data);
3105 
3106 		if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3107 			warn_or_exit_horribly(AH, modulename,
3108 								  "could not set default_with_oids: %s",
3109 								  PQerrorMessage(AH->connection));
3110 
3111 		PQclear(res);
3112 	}
3113 	else
3114 		ahprintf(AH, "%s\n\n", cmd->data);
3115 
3116 	destroyPQExpBuffer(cmd);
3117 }
3118 
3119 
3120 /*
3121  * Issue the commands to connect to the specified database.
3122  *
3123  * If we're currently restoring right into a database, this will
3124  * actually establish a connection. Otherwise it puts a \connect into
3125  * the script output.
3126  */
3127 static void
_reconnectToDB(ArchiveHandle * AH,const char * dbname)3128 _reconnectToDB(ArchiveHandle *AH, const char *dbname)
3129 {
3130 	if (RestoringToDB(AH))
3131 		ReconnectToServer(AH, dbname);
3132 	else
3133 	{
3134 		PQExpBufferData connectbuf;
3135 
3136 		initPQExpBuffer(&connectbuf);
3137 		appendPsqlMetaConnect(&connectbuf, dbname);
3138 		ahprintf(AH, "%s\n", connectbuf.data);
3139 		termPQExpBuffer(&connectbuf);
3140 	}
3141 
3142 	/*
3143 	 * NOTE: currUser keeps track of what the imaginary session user in our
3144 	 * script is.  It's now effectively reset to the original userID.
3145 	 */
3146 	if (AH->currUser)
3147 		free(AH->currUser);
3148 	AH->currUser = NULL;
3149 
3150 	/* don't assume we still know the output schema, tablespace, etc either */
3151 	if (AH->currSchema)
3152 		free(AH->currSchema);
3153 	AH->currSchema = NULL;
3154 	if (AH->currTablespace)
3155 		free(AH->currTablespace);
3156 	AH->currTablespace = NULL;
3157 	AH->currWithOids = -1;
3158 
3159 	/* re-establish fixed state */
3160 	_doSetFixedOutputState(AH);
3161 }
3162 
3163 /*
3164  * Become the specified user, and update state to avoid redundant commands
3165  *
3166  * NULL or empty argument is taken to mean restoring the session default
3167  */
3168 static void
_becomeUser(ArchiveHandle * AH,const char * user)3169 _becomeUser(ArchiveHandle *AH, const char *user)
3170 {
3171 	if (!user)
3172 		user = "";				/* avoid null pointers */
3173 
3174 	if (AH->currUser && strcmp(AH->currUser, user) == 0)
3175 		return;					/* no need to do anything */
3176 
3177 	_doSetSessionAuth(AH, user);
3178 
3179 	/*
3180 	 * NOTE: currUser keeps track of what the imaginary session user in our
3181 	 * script is
3182 	 */
3183 	if (AH->currUser)
3184 		free(AH->currUser);
3185 	AH->currUser = pg_strdup(user);
3186 }
3187 
3188 /*
3189  * Become the owner of the given TOC entry object.  If
3190  * changes in ownership are not allowed, this doesn't do anything.
3191  */
3192 static void
_becomeOwner(ArchiveHandle * AH,TocEntry * te)3193 _becomeOwner(ArchiveHandle *AH, TocEntry *te)
3194 {
3195 	RestoreOptions *ropt = AH->public.ropt;
3196 
3197 	if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3198 		return;
3199 
3200 	_becomeUser(AH, te->owner);
3201 }
3202 
3203 
3204 /*
3205  * Set the proper default_with_oids value for the table.
3206  */
3207 static void
_setWithOids(ArchiveHandle * AH,TocEntry * te)3208 _setWithOids(ArchiveHandle *AH, TocEntry *te)
3209 {
3210 	if (AH->currWithOids != te->withOids)
3211 	{
3212 		_doSetWithOids(AH, te->withOids);
3213 		AH->currWithOids = te->withOids;
3214 	}
3215 }
3216 
3217 
3218 /*
3219  * Issue the commands to select the specified schema as the current schema
3220  * in the target database.
3221  */
3222 static void
_selectOutputSchema(ArchiveHandle * AH,const char * schemaName)3223 _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
3224 {
3225 	PQExpBuffer qry;
3226 
3227 	/*
3228 	 * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3229 	 * that search_path rather than switching to entry-specific paths.
3230 	 * Otherwise, it's an old archive that will not restore correctly unless
3231 	 * we set the search_path as it's expecting.
3232 	 */
3233 	if (AH->public.searchpath)
3234 		return;
3235 
3236 	if (!schemaName || *schemaName == '\0' ||
3237 		(AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3238 		return;					/* no need to do anything */
3239 
3240 	qry = createPQExpBuffer();
3241 
3242 	appendPQExpBuffer(qry, "SET search_path = %s",
3243 					  fmtId(schemaName));
3244 	if (strcmp(schemaName, "pg_catalog") != 0)
3245 		appendPQExpBufferStr(qry, ", pg_catalog");
3246 
3247 	if (RestoringToDB(AH))
3248 	{
3249 		PGresult   *res;
3250 
3251 		res = PQexec(AH->connection, qry->data);
3252 
3253 		if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3254 			warn_or_exit_horribly(AH, modulename,
3255 								  "could not set search_path to \"%s\": %s",
3256 								  schemaName, PQerrorMessage(AH->connection));
3257 
3258 		PQclear(res);
3259 	}
3260 	else
3261 		ahprintf(AH, "%s;\n\n", qry->data);
3262 
3263 	if (AH->currSchema)
3264 		free(AH->currSchema);
3265 	AH->currSchema = pg_strdup(schemaName);
3266 
3267 	destroyPQExpBuffer(qry);
3268 }
3269 
3270 /*
3271  * Issue the commands to select the specified tablespace as the current one
3272  * in the target database.
3273  */
3274 static void
_selectTablespace(ArchiveHandle * AH,const char * tablespace)3275 _selectTablespace(ArchiveHandle *AH, const char *tablespace)
3276 {
3277 	RestoreOptions *ropt = AH->public.ropt;
3278 	PQExpBuffer qry;
3279 	const char *want,
3280 			   *have;
3281 
3282 	/* do nothing in --no-tablespaces mode */
3283 	if (ropt->noTablespace)
3284 		return;
3285 
3286 	have = AH->currTablespace;
3287 	want = tablespace;
3288 
3289 	/* no need to do anything for non-tablespace object */
3290 	if (!want)
3291 		return;
3292 
3293 	if (have && strcmp(want, have) == 0)
3294 		return;					/* no need to do anything */
3295 
3296 	qry = createPQExpBuffer();
3297 
3298 	if (strcmp(want, "") == 0)
3299 	{
3300 		/* We want the tablespace to be the database's default */
3301 		appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3302 	}
3303 	else
3304 	{
3305 		/* We want an explicit tablespace */
3306 		appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3307 	}
3308 
3309 	if (RestoringToDB(AH))
3310 	{
3311 		PGresult   *res;
3312 
3313 		res = PQexec(AH->connection, qry->data);
3314 
3315 		if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3316 			warn_or_exit_horribly(AH, modulename,
3317 								  "could not set default_tablespace to %s: %s",
3318 								  fmtId(want), PQerrorMessage(AH->connection));
3319 
3320 		PQclear(res);
3321 	}
3322 	else
3323 		ahprintf(AH, "%s;\n\n", qry->data);
3324 
3325 	if (AH->currTablespace)
3326 		free(AH->currTablespace);
3327 	AH->currTablespace = pg_strdup(want);
3328 
3329 	destroyPQExpBuffer(qry);
3330 }
3331 
3332 /*
3333  * Extract an object description for a TOC entry, and append it to buf.
3334  *
3335  * This is used for ALTER ... OWNER TO.
3336  */
3337 static void
_getObjectDescription(PQExpBuffer buf,TocEntry * te,ArchiveHandle * AH)3338 _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
3339 {
3340 	const char *type = te->desc;
3341 
3342 	/* Use ALTER TABLE for views and sequences */
3343 	if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0 ||
3344 		strcmp(type, "MATERIALIZED VIEW") == 0)
3345 		type = "TABLE";
3346 
3347 	/* objects that don't require special decoration */
3348 	if (strcmp(type, "COLLATION") == 0 ||
3349 		strcmp(type, "CONVERSION") == 0 ||
3350 		strcmp(type, "DOMAIN") == 0 ||
3351 		strcmp(type, "TABLE") == 0 ||
3352 		strcmp(type, "TYPE") == 0 ||
3353 		strcmp(type, "FOREIGN TABLE") == 0 ||
3354 		strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3355 		strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3356 		strcmp(type, "STATISTICS") == 0 ||
3357 	/* non-schema-specified objects */
3358 		strcmp(type, "DATABASE") == 0 ||
3359 		strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3360 		strcmp(type, "SCHEMA") == 0 ||
3361 		strcmp(type, "EVENT TRIGGER") == 0 ||
3362 		strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3363 		strcmp(type, "SERVER") == 0 ||
3364 		strcmp(type, "PUBLICATION") == 0 ||
3365 		strcmp(type, "SUBSCRIPTION") == 0 ||
3366 		strcmp(type, "USER MAPPING") == 0)
3367 	{
3368 		appendPQExpBuffer(buf, "%s ", type);
3369 		if (te->namespace && *te->namespace)
3370 			appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3371 		appendPQExpBufferStr(buf, fmtId(te->tag));
3372 		return;
3373 	}
3374 
3375 	/* BLOBs just have a name, but it's numeric so must not use fmtId */
3376 	if (strcmp(type, "BLOB") == 0)
3377 	{
3378 		appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3379 		return;
3380 	}
3381 
3382 	/*
3383 	 * These object types require additional decoration.  Fortunately, the
3384 	 * information needed is exactly what's in the DROP command.
3385 	 */
3386 	if (strcmp(type, "AGGREGATE") == 0 ||
3387 		strcmp(type, "FUNCTION") == 0 ||
3388 		strcmp(type, "OPERATOR") == 0 ||
3389 		strcmp(type, "OPERATOR CLASS") == 0 ||
3390 		strcmp(type, "OPERATOR FAMILY") == 0)
3391 	{
3392 		/* Chop "DROP " off the front and make a modifiable copy */
3393 		char	   *first = pg_strdup(te->dropStmt + 5);
3394 		char	   *last;
3395 
3396 		/* point to last character in string */
3397 		last = first + strlen(first) - 1;
3398 
3399 		/* Strip off any ';' or '\n' at the end */
3400 		while (last >= first && (*last == '\n' || *last == ';'))
3401 			last--;
3402 		*(last + 1) = '\0';
3403 
3404 		appendPQExpBufferStr(buf, first);
3405 
3406 		free(first);
3407 		return;
3408 	}
3409 
3410 	write_msg(modulename, "WARNING: don't know how to set owner for object type \"%s\"\n",
3411 			  type);
3412 }
3413 
/*
 * Emit the SQL commands to create the object represented by a TOC entry
 *
 * This now also includes issuing an ALTER OWNER command to restore the
 * object's ownership, if wanted.  But note that the object's permissions
 * will remain at default, until the matching ACL TOC entry is restored.
 *
 * AH is the archive whose output receives the commands and te is the TOC
 * entry to print.  isData only changes the header comment, which then gets
 * the prefix "Data for ".  Restore options are taken from AH->public.ropt,
 * which is dereferenced here without a NULL check.
 */
static void
_printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
{
	RestoreOptions *ropt = AH->public.ropt;

	/*
	 * Avoid dumping the public schema, as it will already be created ...
	 * unless we are using --clean mode (and *not* --create mode), in which
	 * case we've previously issued a DROP for it so we'd better recreate it.
	 *
	 * Likewise for its comment, if any.  (We could try issuing the COMMENT
	 * command anyway; but it'd fail if the restore is done as non-super-user,
	 * so let's not.)
	 *
	 * XXX it looks pretty ugly to hard-wire the public schema like this, but
	 * it sits in a sort of no-mans-land between being a system object and a
	 * user object, so it really is special in a way.
	 */
	if (!(ropt->dropSchema && !ropt->createDB))
	{
		if (strcmp(te->desc, "SCHEMA") == 0 &&
			strcmp(te->tag, "public") == 0)
			return;
		if (strcmp(te->desc, "COMMENT") == 0 &&
			strcmp(te->tag, "SCHEMA public") == 0)
			return;
	}

	/* Select owner, schema, and tablespace as necessary */
	_becomeOwner(AH, te);
	_selectOutputSchema(AH, te->namespace);
	_selectTablespace(AH, te->tablespace);

	/* Set up OID mode too */
	if (strcmp(te->desc, "TABLE") == 0)
		_setWithOids(AH, te);

	/* Emit header comment for item */
	if (!AH->noTocComments)
	{
		const char *pfx;
		char	   *sanitized_name;
		char	   *sanitized_schema;
		char	   *sanitized_owner;

		if (isData)
			pfx = "Data for ";
		else
			pfx = "";

		ahprintf(AH, "--\n");
		/* Verbose mode adds the entry's IDs and its dependency list */
		if (AH->public.verbose)
		{
			ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
					 te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
			if (te->nDeps > 0)
			{
				int			i;

				ahprintf(AH, "-- Dependencies:");
				for (i = 0; i < te->nDeps; i++)
					ahprintf(AH, " %d", te->dependencies[i]);
				ahprintf(AH, "\n");
			}
		}

		/*
		 * Zap any line endings embedded in user-supplied fields, to prevent
		 * corruption of the dump (which could, in the worst case, present an
		 * SQL injection vulnerability if someone were to incautiously load a
		 * dump containing objects with maliciously crafted names).
		 */
		sanitized_name = replace_line_endings(te->tag);
		if (te->namespace)
			sanitized_schema = replace_line_endings(te->namespace);
		else
			sanitized_schema = pg_strdup("-");
		if (!ropt->noOwner)
			sanitized_owner = replace_line_endings(te->owner);
		else
			sanitized_owner = pg_strdup("-");

		ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
				 pfx, sanitized_name, te->desc, sanitized_schema,
				 sanitized_owner);

		/* each sanitized string is a separately allocated copy */
		free(sanitized_name);
		free(sanitized_schema);
		free(sanitized_owner);

		/* the tablespace is appended to the same header line, if shown */
		if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
		{
			char	   *sanitized_tablespace;

			sanitized_tablespace = replace_line_endings(te->tablespace);
			ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
			free(sanitized_tablespace);
		}
		ahprintf(AH, "\n");

		/* let the archive format add its own lines, if it has a hook */
		if (AH->PrintExtraTocPtr != NULL)
			(*AH->PrintExtraTocPtr) (AH, te);
		ahprintf(AH, "--\n\n");
	}

	/*
	 * Actually print the definition.
	 *
	 * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
	 * versions put into CREATE SCHEMA.  We have to do this when --no-owner
	 * mode is selected.  This is ugly, but I see no other good way ...
	 */
	if (ropt->noOwner && strcmp(te->desc, "SCHEMA") == 0)
	{
		ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
	}
	else
	{
		if (strlen(te->defn) > 0)
			ahprintf(AH, "%s\n\n", te->defn);
	}

	/*
	 * If we aren't using SET SESSION AUTH to determine ownership, we must
	 * instead issue an ALTER OWNER command.  We assume that anything without
	 * a DROP command is not a separately ownable object.  All the categories
	 * with DROP commands must appear in one list or the other.
	 */
	if (!ropt->noOwner && !ropt->use_setsessauth &&
		strlen(te->owner) > 0 && strlen(te->dropStmt) > 0)
	{
		/* first list: object types that get an ALTER ... OWNER TO command */
		if (strcmp(te->desc, "AGGREGATE") == 0 ||
			strcmp(te->desc, "BLOB") == 0 ||
			strcmp(te->desc, "COLLATION") == 0 ||
			strcmp(te->desc, "CONVERSION") == 0 ||
			strcmp(te->desc, "DATABASE") == 0 ||
			strcmp(te->desc, "DOMAIN") == 0 ||
			strcmp(te->desc, "FUNCTION") == 0 ||
			strcmp(te->desc, "OPERATOR") == 0 ||
			strcmp(te->desc, "OPERATOR CLASS") == 0 ||
			strcmp(te->desc, "OPERATOR FAMILY") == 0 ||
			strcmp(te->desc, "PROCEDURAL LANGUAGE") == 0 ||
			strcmp(te->desc, "SCHEMA") == 0 ||
			strcmp(te->desc, "EVENT TRIGGER") == 0 ||
			strcmp(te->desc, "TABLE") == 0 ||
			strcmp(te->desc, "TYPE") == 0 ||
			strcmp(te->desc, "VIEW") == 0 ||
			strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
			strcmp(te->desc, "SEQUENCE") == 0 ||
			strcmp(te->desc, "FOREIGN TABLE") == 0 ||
			strcmp(te->desc, "TEXT SEARCH DICTIONARY") == 0 ||
			strcmp(te->desc, "TEXT SEARCH CONFIGURATION") == 0 ||
			strcmp(te->desc, "FOREIGN DATA WRAPPER") == 0 ||
			strcmp(te->desc, "SERVER") == 0 ||
			strcmp(te->desc, "STATISTICS") == 0 ||
			strcmp(te->desc, "PUBLICATION") == 0 ||
			strcmp(te->desc, "SUBSCRIPTION") == 0)
		{
			PQExpBuffer temp = createPQExpBuffer();

			appendPQExpBufferStr(temp, "ALTER ");
			_getObjectDescription(temp, te, AH);
			appendPQExpBuffer(temp, " OWNER TO %s;", fmtId(te->owner));
			ahprintf(AH, "%s\n\n", temp->data);
			destroyPQExpBuffer(temp);
		}
		/* second list: types that have a DROP command but no owner to set */
		else if (strcmp(te->desc, "CAST") == 0 ||
				 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
				 strcmp(te->desc, "CONSTRAINT") == 0 ||
				 strcmp(te->desc, "DEFAULT") == 0 ||
				 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
				 strcmp(te->desc, "INDEX") == 0 ||
				 strcmp(te->desc, "RULE") == 0 ||
				 strcmp(te->desc, "TRIGGER") == 0 ||
				 strcmp(te->desc, "ROW SECURITY") == 0 ||
				 strcmp(te->desc, "POLICY") == 0 ||
				 strcmp(te->desc, "USER MAPPING") == 0)
		{
			/* these object types don't have separate owners */
		}
		else
		{
			write_msg(modulename, "WARNING: don't know how to set owner for object type \"%s\"\n",
					  te->desc);
		}
	}

	/*
	 * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
	 * commands, so we can no longer assume we know the current auth setting.
	 */
	if (_tocEntryIsACL(te))
	{
		if (AH->currUser)
			free(AH->currUser);
		AH->currUser = NULL;
	}
}
3619 
/*
 * Return a copy of str in which every newline and carriage return has been
 * turned into a space, making it safe to embed in an SQL comment or a TOC
 * listing.  The copy is newly allocated and is the caller's to free.
 */
static char *
replace_line_endings(const char *str)
{
	char	   *copy = pg_strdup(str);
	size_t		i;

	for (i = 0; copy[i] != '\0'; i++)
	{
		switch (copy[i])
		{
			case '\n':
			case '\r':
				copy[i] = ' ';
				break;
			default:
				break;
		}
	}

	return copy;
}
3641 
3642 /*
3643  * Write the file header for a custom-format archive
3644  */
3645 void
WriteHead(ArchiveHandle * AH)3646 WriteHead(ArchiveHandle *AH)
3647 {
3648 	struct tm	crtm;
3649 
3650 	(*AH->WriteBufPtr) (AH, "PGDMP", 5);	/* Magic code */
3651 	(*AH->WriteBytePtr) (AH, ARCHIVE_MAJOR(AH->version));
3652 	(*AH->WriteBytePtr) (AH, ARCHIVE_MINOR(AH->version));
3653 	(*AH->WriteBytePtr) (AH, ARCHIVE_REV(AH->version));
3654 	(*AH->WriteBytePtr) (AH, AH->intSize);
3655 	(*AH->WriteBytePtr) (AH, AH->offSize);
3656 	(*AH->WriteBytePtr) (AH, AH->format);
3657 	WriteInt(AH, AH->compression);
3658 	crtm = *localtime(&AH->createDate);
3659 	WriteInt(AH, crtm.tm_sec);
3660 	WriteInt(AH, crtm.tm_min);
3661 	WriteInt(AH, crtm.tm_hour);
3662 	WriteInt(AH, crtm.tm_mday);
3663 	WriteInt(AH, crtm.tm_mon);
3664 	WriteInt(AH, crtm.tm_year);
3665 	WriteInt(AH, crtm.tm_isdst);
3666 	WriteStr(AH, PQdb(AH->connection));
3667 	WriteStr(AH, AH->public.remoteVersionStr);
3668 	WriteStr(AH, PG_VERSION);
3669 }
3670 
/*
 * Read the archive file header into AH; this is the counterpart of
 * WriteHead.
 *
 * The magic string is read and checked only when AH->readHeader is not
 * already set.  The remaining fields are then consumed in order, with the
 * archive version deciding which of them are present: version bytes,
 * integer size, offset size, format, compression, creation date, database
 * name, and the remote/dump version strings.
 *
 * An unsupported version, an implausible integer size, or a format that
 * differs from AH->format is reported through exit_horribly; the other
 * problems detected here only produce warnings.
 */
void
ReadHead(ArchiveHandle *AH)
{
	char		vmaj,
				vmin,
				vrev;
	int			fmt;

	/*
	 * If we haven't already read the header, do so.
	 *
	 * NB: this code must agree with _discoverArchiveFormat().  Maybe find a
	 * way to unify the cases?
	 */
	if (!AH->readHeader)
	{
		char		tmpMag[7];

		(*AH->ReadBufPtr) (AH, tmpMag, 5);

		if (strncmp(tmpMag, "PGDMP", 5) != 0)
			exit_horribly(modulename, "did not find magic string in file header\n");
	}

	vmaj = (*AH->ReadBytePtr) (AH);
	vmin = (*AH->ReadBytePtr) (AH);

	/* a revision byte is only read for versions later than 1.0 */
	if (vmaj > 1 || (vmaj == 1 && vmin > 0))	/* Version > 1.0 */
		vrev = (*AH->ReadBytePtr) (AH);
	else
		vrev = 0;

	AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);

	if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
		exit_horribly(modulename, "unsupported version (%d.%d) in file header\n",
					  vmaj, vmin);

	AH->intSize = (*AH->ReadBytePtr) (AH);
	if (AH->intSize > 32)
		exit_horribly(modulename, "sanity check on integer size (%lu) failed\n",
					  (unsigned long) AH->intSize);

	if (AH->intSize > sizeof(int))
		write_msg(modulename, "WARNING: archive was made on a machine with larger integers, some operations might fail\n");

	/* before 1.7 no separate offset size is read; intSize is used instead */
	if (AH->version >= K_VERS_1_7)
		AH->offSize = (*AH->ReadBytePtr) (AH);
	else
		AH->offSize = AH->intSize;

	fmt = (*AH->ReadBytePtr) (AH);

	if (AH->format != fmt)
		exit_horribly(modulename, "expected format (%d) differs from format found in file (%d)\n",
					  AH->format, fmt);

	/*
	 * Compression is read as a single byte in 1.2 and 1.3 and as an integer
	 * from 1.4 on; for older archives Z_DEFAULT_COMPRESSION is assumed.
	 */
	if (AH->version >= K_VERS_1_2)
	{
		if (AH->version < K_VERS_1_4)
			AH->compression = (*AH->ReadBytePtr) (AH);
		else
			AH->compression = ReadInt(AH);
	}
	else
		AH->compression = Z_DEFAULT_COMPRESSION;

#ifndef HAVE_LIBZ
	if (AH->compression != 0)
		write_msg(modulename, "WARNING: archive is compressed, but this installation does not support compression -- no data will be available\n");
#endif

	if (AH->version >= K_VERS_1_4)
	{
		struct tm	crtm;

		/* same field order as WriteHead writes them */
		crtm.tm_sec = ReadInt(AH);
		crtm.tm_min = ReadInt(AH);
		crtm.tm_hour = ReadInt(AH);
		crtm.tm_mday = ReadInt(AH);
		crtm.tm_mon = ReadInt(AH);
		crtm.tm_year = ReadInt(AH);
		crtm.tm_isdst = ReadInt(AH);

		/*
		 * Newer versions of glibc have mktime() report failure if tm_isdst is
		 * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
		 * TZ=UTC.  This is problematic when restoring an archive under a
		 * different timezone setting.  If we get a failure, try again with
		 * tm_isdst set to -1 ("don't know").
		 *
		 * XXX with or without this hack, we reconstruct createDate
		 * incorrectly when the prevailing timezone is different from
		 * pg_dump's.  Next time we bump the archive version, we should flush
		 * this representation and store a plain seconds-since-the-Epoch
		 * timestamp instead.
		 */
		AH->createDate = mktime(&crtm);
		if (AH->createDate == (time_t) -1)
		{
			crtm.tm_isdst = -1;
			AH->createDate = mktime(&crtm);
			if (AH->createDate == (time_t) -1)
				write_msg(modulename,
						  "WARNING: invalid creation date in header\n");
		}
	}

	if (AH->version >= K_VERS_1_4)
	{
		AH->archdbname = ReadStr(AH);
	}

	/* the two version strings are only read from 1.10 archives onward */
	if (AH->version >= K_VERS_1_10)
	{
		AH->archiveRemoteVersion = ReadStr(AH);
		AH->archiveDumpVersion = ReadStr(AH);
	}
}
3790 
3791 
3792 /*
3793  * checkSeek
3794  *	  check to see if ftell/fseek can be performed.
3795  */
3796 bool
checkSeek(FILE * fp)3797 checkSeek(FILE *fp)
3798 {
3799 	pgoff_t		tpos;
3800 
3801 	/*
3802 	 * If pgoff_t is wider than long, we must have "real" fseeko and not an
3803 	 * emulation using fseek.  Otherwise report no seek capability.
3804 	 */
3805 #ifndef HAVE_FSEEKO
3806 	if (sizeof(pgoff_t) > sizeof(long))
3807 		return false;
3808 #endif
3809 
3810 	/* Check that ftello works on this file */
3811 	tpos = ftello(fp);
3812 	if (tpos < 0)
3813 		return false;
3814 
3815 	/*
3816 	 * Check that fseeko(SEEK_SET) works, too.  NB: we used to try to test
3817 	 * this with fseeko(fp, 0, SEEK_CUR).  But some platforms treat that as a
3818 	 * successful no-op even on files that are otherwise unseekable.
3819 	 */
3820 	if (fseeko(fp, tpos, SEEK_SET) != 0)
3821 		return false;
3822 
3823 	return true;
3824 }
3825 
3826 
3827 /*
3828  * dumpTimestamp
3829  */
3830 static void
dumpTimestamp(ArchiveHandle * AH,const char * msg,time_t tim)3831 dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
3832 {
3833 	char		buf[64];
3834 
3835 	if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
3836 		ahprintf(AH, "-- %s %s\n\n", msg, buf);
3837 }
3838 
3839 /*
3840  * Main engine for parallel restore.
3841  *
3842  * Parallel restore is done in three phases.  In this first phase,
3843  * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
3844  * processed in the RESTORE_PASS_MAIN pass.  (In practice, that's all
3845  * PRE_DATA items other than ACLs.)  Entries we can't process now are
3846  * added to the pending_list for later phases to deal with.
3847  */
3848 static void
restore_toc_entries_prefork(ArchiveHandle * AH,TocEntry * pending_list)3849 restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
3850 {
3851 	bool		skipped_some;
3852 	TocEntry   *next_work_item;
3853 
3854 	ahlog(AH, 2, "entering restore_toc_entries_prefork\n");
3855 
3856 	/* Adjust dependency information */
3857 	fix_dependencies(AH);
3858 
3859 	/*
3860 	 * Do all the early stuff in a single connection in the parent. There's no
3861 	 * great point in running it in parallel, in fact it will actually run
3862 	 * faster in a single connection because we avoid all the connection and
3863 	 * setup overhead.  Also, pre-9.2 pg_dump versions were not very good
3864 	 * about showing all the dependencies of SECTION_PRE_DATA items, so we do
3865 	 * not risk trying to process them out-of-order.
3866 	 *
3867 	 * Stuff that we can't do immediately gets added to the pending_list.
3868 	 * Note: we don't yet filter out entries that aren't going to be restored.
3869 	 * They might participate in dependency chains connecting entries that
3870 	 * should be restored, so we treat them as live until we actually process
3871 	 * them.
3872 	 *
3873 	 * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
3874 	 * before DATA items, and all DATA items before POST_DATA items.  That is
3875 	 * not certain to be true in older archives, though, and in any case use
3876 	 * of a list file would destroy that ordering (cf. SortTocFromFile).  So
3877 	 * this loop cannot assume that it holds.
3878 	 */
3879 	AH->restorePass = RESTORE_PASS_MAIN;
3880 	skipped_some = false;
3881 	for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
3882 	{
3883 		bool		do_now = true;
3884 
3885 		if (next_work_item->section != SECTION_PRE_DATA)
3886 		{
3887 			/* DATA and POST_DATA items are just ignored for now */
3888 			if (next_work_item->section == SECTION_DATA ||
3889 				next_work_item->section == SECTION_POST_DATA)
3890 			{
3891 				do_now = false;
3892 				skipped_some = true;
3893 			}
3894 			else
3895 			{
3896 				/*
3897 				 * SECTION_NONE items, such as comments, can be processed now
3898 				 * if we are still in the PRE_DATA part of the archive.  Once
3899 				 * we've skipped any items, we have to consider whether the
3900 				 * comment's dependencies are satisfied, so skip it for now.
3901 				 */
3902 				if (skipped_some)
3903 					do_now = false;
3904 			}
3905 		}
3906 
3907 		/*
3908 		 * Also skip items that need to be forced into later passes.  We need
3909 		 * not set skipped_some in this case, since by assumption no main-pass
3910 		 * items could depend on these.
3911 		 */
3912 		if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
3913 			do_now = false;
3914 
3915 		if (do_now)
3916 		{
3917 			/* OK, restore the item and update its dependencies */
3918 			ahlog(AH, 1, "processing item %d %s %s\n",
3919 				  next_work_item->dumpId,
3920 				  next_work_item->desc, next_work_item->tag);
3921 
3922 			(void) restore_toc_entry(AH, next_work_item, false);
3923 
3924 			/* Reduce dependencies, but don't move anything to ready_list */
3925 			reduce_dependencies(AH, next_work_item, NULL);
3926 		}
3927 		else
3928 		{
3929 			/* Nope, so add it to pending_list */
3930 			par_list_append(pending_list, next_work_item);
3931 		}
3932 	}
3933 
3934 	/*
3935 	 * Now close parent connection in prep for parallel steps.  We do this
3936 	 * mainly to ensure that we don't exceed the specified number of parallel
3937 	 * connections.
3938 	 */
3939 	DisconnectDatabase(&AH->public);
3940 
3941 	/* blow away any transient state from the old connection */
3942 	if (AH->currUser)
3943 		free(AH->currUser);
3944 	AH->currUser = NULL;
3945 	if (AH->currSchema)
3946 		free(AH->currSchema);
3947 	AH->currSchema = NULL;
3948 	if (AH->currTablespace)
3949 		free(AH->currTablespace);
3950 	AH->currTablespace = NULL;
3951 	AH->currWithOids = -1;
3952 }
3953 
/*
 * Main engine for parallel restore.
 *
 * Parallel restore is done in three phases.  In this second phase,
 * we process entries by dispatching them to parallel worker children
 * (processes on Unix, threads on Windows), each of which connects
 * separately to the database.  Inter-entry dependencies are respected,
 * and so is the RestorePass multi-pass structure.  When we can no longer
 * make any entries ready to process, we exit.  Normally, there will be
 * nothing left to do; but if there is, the third phase will mop up.
 *
 * AH is the archive being restored and pstate is the parallel-worker state
 * passed on to the dispatch and wait routines.  pending_list holds the
 * entries still to be restored (presumably the list filled by
 * restore_toc_entries_prefork); entries are moved from it into a ready_list
 * that is local to this function.
 */
static void
restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
							 TocEntry *pending_list)
{
	TocEntry	ready_list;
	TocEntry   *next_work_item;

	ahlog(AH, 2, "entering restore_toc_entries_parallel\n");

	/*
	 * The pending_list contains all items that we need to restore.  Move all
	 * items that are available to process immediately into the ready_list.
	 * After this setup, the pending list is everything that needs to be done
	 * but is blocked by one or more dependencies, while the ready list
	 * contains items that have no remaining dependencies and are OK to
	 * process in the current restore pass.
	 */
	par_list_header_init(&ready_list);
	AH->restorePass = RESTORE_PASS_MAIN;
	move_to_ready_list(pending_list, &ready_list, AH->restorePass);

	/*
	 * main parent loop
	 *
	 * Keep going until there is no worker still running AND there is no work
	 * left to be done.  Note invariant: at top of loop, there should always
	 * be at least one worker available to dispatch a job to.
	 */
	ahlog(AH, 1, "entering main parallel loop\n");

	for (;;)
	{
		/* Look for an item ready to be dispatched to a worker */
		next_work_item = get_next_work_item(AH, &ready_list, pstate);
		if (next_work_item != NULL)
		{
			/* If not to be restored, don't waste time launching a worker */
			if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
			{
				ahlog(AH, 1, "skipping item %d %s %s\n",
					  next_work_item->dumpId,
					  next_work_item->desc, next_work_item->tag);
				/* Drop it from ready_list, and update its dependencies */
				par_list_remove(next_work_item);
				reduce_dependencies(AH, next_work_item, &ready_list);
				/* Loop around to see if anything else can be dispatched */
				continue;
			}

			ahlog(AH, 1, "launching item %d %s %s\n",
				  next_work_item->dumpId,
				  next_work_item->desc, next_work_item->tag);

			/* Remove it from ready_list, and dispatch to some worker */
			par_list_remove(next_work_item);

			DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
								   mark_restore_job_done, &ready_list);
		}
		else if (IsEveryWorkerIdle(pstate))
		{
			/*
			 * Nothing is ready and no worker is running, so we're done with
			 * the current pass or maybe with the whole process.
			 */
			if (AH->restorePass == RESTORE_PASS_LAST)
				break;			/* No more parallel processing is possible */

			/* Advance to next restore pass */
			AH->restorePass++;
			/* That probably allows some stuff to be made ready */
			move_to_ready_list(pending_list, &ready_list, AH->restorePass);
			/* Loop around to see if anything's now ready */
			continue;
		}
		else
		{
			/*
			 * We have nothing ready, but at least one child is working, so
			 * wait for some subjob to finish.
			 */
		}

		/*
		 * Before dispatching another job, check to see if anything has
		 * finished.  We should check every time through the loop so as to
		 * reduce dependencies as soon as possible.  If we were unable to
		 * dispatch any job this time through, wait until some worker finishes
		 * (and, hopefully, unblocks some pending item).  If we did dispatch
		 * something, continue as soon as there's at least one idle worker.
		 * Note that in either case, there's guaranteed to be at least one
		 * idle worker when we return to the top of the loop.  This ensures we
		 * won't block inside DispatchJobForTocEntry, which would be
		 * undesirable: we'd rather postpone dispatching until we see what's
		 * been unblocked by finished jobs.
		 */
		/*
		 * next_work_item is non-NULL here only when a job was just
		 * dispatched: the "skipping" path above loops around with continue,
		 * and the other two branches are taken only when it is NULL.
		 */
		WaitForWorkers(AH, pstate,
					   next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
	}

	/* There should now be nothing in ready_list. */
	Assert(ready_list.par_next == &ready_list);

	ahlog(AH, 1, "finished main parallel loop\n");
}
4070 
4071 /*
4072  * Main engine for parallel restore.
4073  *
4074  * Parallel restore is done in three phases.  In this third phase,
4075  * we mop up any remaining TOC entries by processing them serially.
4076  * This phase normally should have nothing to do, but if we've somehow
4077  * gotten stuck due to circular dependencies or some such, this provides
4078  * at least some chance of completing the restore successfully.
4079  */
4080 static void
restore_toc_entries_postfork(ArchiveHandle * AH,TocEntry * pending_list)4081 restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
4082 {
4083 	RestoreOptions *ropt = AH->public.ropt;
4084 	TocEntry   *te;
4085 
4086 	ahlog(AH, 2, "entering restore_toc_entries_postfork\n");
4087 
4088 	/*
4089 	 * Now reconnect the single parent connection.
4090 	 */
4091 	ConnectDatabase((Archive *) AH, &ropt->cparams, true);
4092 
4093 	/* re-establish fixed state */
4094 	_doSetFixedOutputState(AH);
4095 
4096 	/*
4097 	 * Make sure there is no work left due to, say, circular dependencies, or
4098 	 * some other pathological condition.  If so, do it in the single parent
4099 	 * connection.  We don't sweat about RestorePass ordering; it's likely we
4100 	 * already violated that.
4101 	 */
4102 	for (te = pending_list->par_next; te != pending_list; te = te->par_next)
4103 	{
4104 		ahlog(AH, 1, "processing missed item %d %s %s\n",
4105 			  te->dumpId, te->desc, te->tag);
4106 		(void) restore_toc_entry(AH, te, false);
4107 	}
4108 }
4109 
4110 /*
4111  * Check if te1 has an exclusive lock requirement for an item that te2 also
4112  * requires, whether or not te2's requirement is for an exclusive lock.
4113  */
4114 static bool
has_lock_conflicts(TocEntry * te1,TocEntry * te2)4115 has_lock_conflicts(TocEntry *te1, TocEntry *te2)
4116 {
4117 	int			j,
4118 				k;
4119 
4120 	for (j = 0; j < te1->nLockDeps; j++)
4121 	{
4122 		for (k = 0; k < te2->nDeps; k++)
4123 		{
4124 			if (te1->lockDeps[j] == te2->dependencies[k])
4125 				return true;
4126 		}
4127 	}
4128 	return false;
4129 }
4130 
4131 
4132 /*
4133  * Initialize the header of a parallel-processing list.
4134  *
4135  * These are circular lists with a dummy TocEntry as header, just like the
4136  * main TOC list; but we use separate list links so that an entry can be in
4137  * the main TOC list as well as in a parallel-processing list.
4138  */
4139 static void
par_list_header_init(TocEntry * l)4140 par_list_header_init(TocEntry *l)
4141 {
4142 	l->par_prev = l->par_next = l;
4143 }
4144 
4145 /* Append te to the end of the parallel-processing list headed by l */
4146 static void
par_list_append(TocEntry * l,TocEntry * te)4147 par_list_append(TocEntry *l, TocEntry *te)
4148 {
4149 	te->par_prev = l->par_prev;
4150 	l->par_prev->par_next = te;
4151 	l->par_prev = te;
4152 	te->par_next = l;
4153 }
4154 
4155 /* Remove te from whatever parallel-processing list it's in */
4156 static void
par_list_remove(TocEntry * te)4157 par_list_remove(TocEntry *te)
4158 {
4159 	te->par_prev->par_next = te->par_next;
4160 	te->par_next->par_prev = te->par_prev;
4161 	te->par_prev = NULL;
4162 	te->par_next = NULL;
4163 }
4164 
4165 
4166 /*
4167  * Move all immediately-ready items from pending_list to ready_list.
4168  *
4169  * Items are considered ready if they have no remaining dependencies and
4170  * they belong in the current restore pass.  (See also reduce_dependencies,
4171  * which applies the same logic one-at-a-time.)
4172  */
4173 static void
move_to_ready_list(TocEntry * pending_list,TocEntry * ready_list,RestorePass pass)4174 move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list,
4175 				   RestorePass pass)
4176 {
4177 	TocEntry   *te;
4178 	TocEntry   *next_te;
4179 
4180 	for (te = pending_list->par_next; te != pending_list; te = next_te)
4181 	{
4182 		/* must save list link before possibly moving te to other list */
4183 		next_te = te->par_next;
4184 
4185 		if (te->depCount == 0 &&
4186 			_tocEntryRestorePass(te) == pass)
4187 		{
4188 			/* Remove it from pending_list ... */
4189 			par_list_remove(te);
4190 			/* ... and add to ready_list */
4191 			par_list_append(ready_list, te);
4192 		}
4193 	}
4194 }
4195 
4196 /*
4197  * Find the next work item (if any) that is capable of being run now.
4198  *
4199  * To qualify, the item must have no remaining dependencies
4200  * and no requirements for locks that are incompatible with
4201  * items currently running.  Items in the ready_list are known to have
4202  * no remaining dependencies, but we have to check for lock conflicts.
4203  *
4204  * Note that the returned item has *not* been removed from ready_list.
4205  * The caller must do that after successfully dispatching the item.
4206  *
4207  * pref_non_data is for an alternative selection algorithm that gives
4208  * preference to non-data items if there is already a data load running.
4209  * It is currently disabled.
4210  */
4211 static TocEntry *
get_next_work_item(ArchiveHandle * AH,TocEntry * ready_list,ParallelState * pstate)4212 get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
4213 				   ParallelState *pstate)
4214 {
4215 	bool		pref_non_data = false;	/* or get from AH->ropt */
4216 	TocEntry   *data_te = NULL;
4217 	TocEntry   *te;
4218 	int			i,
4219 				k;
4220 
4221 	/*
4222 	 * Bogus heuristics for pref_non_data
4223 	 */
4224 	if (pref_non_data)
4225 	{
4226 		int			count = 0;
4227 
4228 		for (k = 0; k < pstate->numWorkers; k++)
4229 		{
4230 			TocEntry   *running_te = pstate->te[k];
4231 
4232 			if (running_te != NULL &&
4233 				running_te->section == SECTION_DATA)
4234 				count++;
4235 		}
4236 		if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
4237 			pref_non_data = false;
4238 	}
4239 
4240 	/*
4241 	 * Search the ready_list until we find a suitable item.
4242 	 */
4243 	for (te = ready_list->par_next; te != ready_list; te = te->par_next)
4244 	{
4245 		bool		conflicts = false;
4246 
4247 		/*
4248 		 * Check to see if the item would need exclusive lock on something
4249 		 * that a currently running item also needs lock on, or vice versa. If
4250 		 * so, we don't want to schedule them together.
4251 		 */
4252 		for (i = 0; i < pstate->numWorkers; i++)
4253 		{
4254 			TocEntry   *running_te = pstate->te[i];
4255 
4256 			if (running_te == NULL)
4257 				continue;
4258 			if (has_lock_conflicts(te, running_te) ||
4259 				has_lock_conflicts(running_te, te))
4260 			{
4261 				conflicts = true;
4262 				break;
4263 			}
4264 		}
4265 
4266 		if (conflicts)
4267 			continue;
4268 
4269 		if (pref_non_data && te->section == SECTION_DATA)
4270 		{
4271 			if (data_te == NULL)
4272 				data_te = te;
4273 			continue;
4274 		}
4275 
4276 		/* passed all tests, so this item can run */
4277 		return te;
4278 	}
4279 
4280 	if (data_te != NULL)
4281 		return data_te;
4282 
4283 	ahlog(AH, 2, "no item ready\n");
4284 	return NULL;
4285 }
4286 
4287 
4288 /*
4289  * Restore a single TOC item in parallel with others
4290  *
4291  * this is run in the worker, i.e. in a thread (Windows) or a separate process
4292  * (everything else). A worker process executes several such work items during
4293  * a parallel backup or restore. Once we terminate here and report back that
4294  * our work is finished, the master process will assign us a new work item.
4295  */
4296 int
parallel_restore(ArchiveHandle * AH,TocEntry * te)4297 parallel_restore(ArchiveHandle *AH, TocEntry *te)
4298 {
4299 	int			status;
4300 
4301 	Assert(AH->connection != NULL);
4302 
4303 	/* Count only errors associated with this TOC entry */
4304 	AH->public.n_errors = 0;
4305 
4306 	/* Restore the TOC item */
4307 	status = restore_toc_entry(AH, te, true);
4308 
4309 	return status;
4310 }
4311 
4312 
4313 /*
4314  * Callback function that's invoked in the master process after a step has
4315  * been parallel restored.
4316  *
4317  * Update status and reduce the dependency count of any dependent items.
4318  */
4319 static void
mark_restore_job_done(ArchiveHandle * AH,TocEntry * te,int status,void * callback_data)4320 mark_restore_job_done(ArchiveHandle *AH,
4321 					  TocEntry *te,
4322 					  int status,
4323 					  void *callback_data)
4324 {
4325 	TocEntry   *ready_list = (TocEntry *) callback_data;
4326 
4327 	ahlog(AH, 1, "finished item %d %s %s\n",
4328 		  te->dumpId, te->desc, te->tag);
4329 
4330 	if (status == WORKER_CREATE_DONE)
4331 		mark_create_done(AH, te);
4332 	else if (status == WORKER_INHIBIT_DATA)
4333 	{
4334 		inhibit_data_for_failed_table(AH, te);
4335 		AH->public.n_errors++;
4336 	}
4337 	else if (status == WORKER_IGNORED_ERRORS)
4338 		AH->public.n_errors++;
4339 	else if (status != 0)
4340 		exit_horribly(modulename, "worker process failed: exit code %d\n",
4341 					  status);
4342 
4343 	reduce_dependencies(AH, te, ready_list);
4344 }
4345 
4346 
4347 /*
4348  * Process the dependency information into a form useful for parallel restore.
4349  *
4350  * This function takes care of fixing up some missing or badly designed
4351  * dependencies, and then prepares subsidiary data structures that will be
4352  * used in the main parallel-restore logic, including:
4353  * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4354  * 2. We set up depCount fields that are the number of as-yet-unprocessed
4355  * dependencies for each TOC entry.
4356  *
4357  * We also identify locking dependencies so that we can avoid trying to
4358  * schedule conflicting items at the same time.
4359  */
4360 static void
fix_dependencies(ArchiveHandle * AH)4361 fix_dependencies(ArchiveHandle *AH)
4362 {
4363 	TocEntry   *te;
4364 	int			i;
4365 
4366 	/*
4367 	 * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4368 	 * items are marked as not being in any parallel-processing list.
4369 	 */
4370 	for (te = AH->toc->next; te != AH->toc; te = te->next)
4371 	{
4372 		te->depCount = te->nDeps;
4373 		te->revDeps = NULL;
4374 		te->nRevDeps = 0;
4375 		te->par_prev = NULL;
4376 		te->par_next = NULL;
4377 	}
4378 
4379 	/*
4380 	 * POST_DATA items that are shown as depending on a table need to be
4381 	 * re-pointed to depend on that table's data, instead.  This ensures they
4382 	 * won't get scheduled until the data has been loaded.
4383 	 */
4384 	repoint_table_dependencies(AH);
4385 
4386 	/*
4387 	 * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4388 	 * COMMENTS to BLOBS.  Cope.  (We assume there's only one BLOBS and only
4389 	 * one BLOB COMMENTS in such files.)
4390 	 */
4391 	if (AH->version < K_VERS_1_11)
4392 	{
4393 		for (te = AH->toc->next; te != AH->toc; te = te->next)
4394 		{
4395 			if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4396 			{
4397 				TocEntry   *te2;
4398 
4399 				for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4400 				{
4401 					if (strcmp(te2->desc, "BLOBS") == 0)
4402 					{
4403 						te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
4404 						te->dependencies[0] = te2->dumpId;
4405 						te->nDeps++;
4406 						te->depCount++;
4407 						break;
4408 					}
4409 				}
4410 				break;
4411 			}
4412 		}
4413 	}
4414 
4415 	/*
4416 	 * At this point we start to build the revDeps reverse-dependency arrays,
4417 	 * so all changes of dependencies must be complete.
4418 	 */
4419 
4420 	/*
4421 	 * Count the incoming dependencies for each item.  Also, it is possible
4422 	 * that the dependencies list items that are not in the archive at all
4423 	 * (that should not happen in 9.2 and later, but is highly likely in older
4424 	 * archives).  Subtract such items from the depCounts.
4425 	 */
4426 	for (te = AH->toc->next; te != AH->toc; te = te->next)
4427 	{
4428 		for (i = 0; i < te->nDeps; i++)
4429 		{
4430 			DumpId		depid = te->dependencies[i];
4431 
4432 			if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4433 				AH->tocsByDumpId[depid]->nRevDeps++;
4434 			else
4435 				te->depCount--;
4436 		}
4437 	}
4438 
4439 	/*
4440 	 * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4441 	 * it as a counter below.
4442 	 */
4443 	for (te = AH->toc->next; te != AH->toc; te = te->next)
4444 	{
4445 		if (te->nRevDeps > 0)
4446 			te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
4447 		te->nRevDeps = 0;
4448 	}
4449 
4450 	/*
4451 	 * Build the revDeps[] arrays of incoming-dependency dumpIds.  This had
4452 	 * better agree with the loops above.
4453 	 */
4454 	for (te = AH->toc->next; te != AH->toc; te = te->next)
4455 	{
4456 		for (i = 0; i < te->nDeps; i++)
4457 		{
4458 			DumpId		depid = te->dependencies[i];
4459 
4460 			if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4461 			{
4462 				TocEntry   *otherte = AH->tocsByDumpId[depid];
4463 
4464 				otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4465 			}
4466 		}
4467 	}
4468 
4469 	/*
4470 	 * Lastly, work out the locking dependencies.
4471 	 */
4472 	for (te = AH->toc->next; te != AH->toc; te = te->next)
4473 	{
4474 		te->lockDeps = NULL;
4475 		te->nLockDeps = 0;
4476 		identify_locking_dependencies(AH, te);
4477 	}
4478 }
4479 
4480 /*
4481  * Change dependencies on table items to depend on table data items instead,
4482  * but only in POST_DATA items.
4483  */
4484 static void
repoint_table_dependencies(ArchiveHandle * AH)4485 repoint_table_dependencies(ArchiveHandle *AH)
4486 {
4487 	TocEntry   *te;
4488 	int			i;
4489 	DumpId		olddep;
4490 
4491 	for (te = AH->toc->next; te != AH->toc; te = te->next)
4492 	{
4493 		if (te->section != SECTION_POST_DATA)
4494 			continue;
4495 		for (i = 0; i < te->nDeps; i++)
4496 		{
4497 			olddep = te->dependencies[i];
4498 			if (olddep <= AH->maxDumpId &&
4499 				AH->tableDataId[olddep] != 0)
4500 			{
4501 				te->dependencies[i] = AH->tableDataId[olddep];
4502 				ahlog(AH, 2, "transferring dependency %d -> %d to %d\n",
4503 					  te->dumpId, olddep, AH->tableDataId[olddep]);
4504 			}
4505 		}
4506 	}
4507 }
4508 
4509 /*
4510  * Identify which objects we'll need exclusive lock on in order to restore
4511  * the given TOC entry (*other* than the one identified by the TOC entry
4512  * itself).  Record their dump IDs in the entry's lockDeps[] array.
4513  */
4514 static void
identify_locking_dependencies(ArchiveHandle * AH,TocEntry * te)4515 identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4516 {
4517 	DumpId	   *lockids;
4518 	int			nlockids;
4519 	int			i;
4520 
4521 	/*
4522 	 * We only care about this for POST_DATA items.  PRE_DATA items are not
4523 	 * run in parallel, and DATA items are all independent by assumption.
4524 	 */
4525 	if (te->section != SECTION_POST_DATA)
4526 		return;
4527 
4528 	/* Quick exit if no dependencies at all */
4529 	if (te->nDeps == 0)
4530 		return;
4531 
4532 	/*
4533 	 * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
4534 	 * and hence require exclusive lock.  However, we know that CREATE INDEX
4535 	 * does not.  (Maybe someday index-creating CONSTRAINTs will fall in that
4536 	 * category too ... but today is not that day.)
4537 	 */
4538 	if (strcmp(te->desc, "INDEX") == 0)
4539 		return;
4540 
4541 	/*
4542 	 * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
4543 	 * item listed among its dependencies.  Originally all of these would have
4544 	 * been TABLE items, but repoint_table_dependencies would have repointed
4545 	 * them to the TABLE DATA items if those are present (which they might not
4546 	 * be, eg in a schema-only dump).  Note that all of the entries we are
4547 	 * processing here are POST_DATA; otherwise there might be a significant
4548 	 * difference between a dependency on a table and a dependency on its
4549 	 * data, so that closer analysis would be needed here.
4550 	 */
4551 	lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
4552 	nlockids = 0;
4553 	for (i = 0; i < te->nDeps; i++)
4554 	{
4555 		DumpId		depid = te->dependencies[i];
4556 
4557 		if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
4558 			((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
4559 			 strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
4560 			lockids[nlockids++] = depid;
4561 	}
4562 
4563 	if (nlockids == 0)
4564 	{
4565 		free(lockids);
4566 		return;
4567 	}
4568 
4569 	te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
4570 	te->nLockDeps = nlockids;
4571 }
4572 
4573 /*
4574  * Remove the specified TOC entry from the depCounts of items that depend on
4575  * it, thereby possibly making them ready-to-run.  Any pending item that
4576  * becomes ready should be moved to the ready_list, if that's provided.
4577  */
4578 static void
reduce_dependencies(ArchiveHandle * AH,TocEntry * te,TocEntry * ready_list)4579 reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
4580 {
4581 	int			i;
4582 
4583 	ahlog(AH, 2, "reducing dependencies for %d\n", te->dumpId);
4584 
4585 	for (i = 0; i < te->nRevDeps; i++)
4586 	{
4587 		TocEntry   *otherte = AH->tocsByDumpId[te->revDeps[i]];
4588 
4589 		Assert(otherte->depCount > 0);
4590 		otherte->depCount--;
4591 
4592 		/*
4593 		 * It's ready if it has no remaining dependencies, and it belongs in
4594 		 * the current restore pass, and it is currently a member of the
4595 		 * pending list (that check is needed to prevent double restore in
4596 		 * some cases where a list-file forces out-of-order restoring).
4597 		 * However, if ready_list == NULL then caller doesn't want any list
4598 		 * memberships changed.
4599 		 */
4600 		if (otherte->depCount == 0 &&
4601 			_tocEntryRestorePass(otherte) == AH->restorePass &&
4602 			otherte->par_prev != NULL &&
4603 			ready_list != NULL)
4604 		{
4605 			/* Remove it from pending list ... */
4606 			par_list_remove(otherte);
4607 			/* ... and add to ready_list */
4608 			par_list_append(ready_list, otherte);
4609 		}
4610 	}
4611 }
4612 
4613 /*
4614  * Set the created flag on the DATA member corresponding to the given
4615  * TABLE member
4616  */
4617 static void
mark_create_done(ArchiveHandle * AH,TocEntry * te)4618 mark_create_done(ArchiveHandle *AH, TocEntry *te)
4619 {
4620 	if (AH->tableDataId[te->dumpId] != 0)
4621 	{
4622 		TocEntry   *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4623 
4624 		ted->created = true;
4625 	}
4626 }
4627 
4628 /*
4629  * Mark the DATA member corresponding to the given TABLE member
4630  * as not wanted
4631  */
4632 static void
inhibit_data_for_failed_table(ArchiveHandle * AH,TocEntry * te)4633 inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
4634 {
4635 	ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
4636 		  te->tag);
4637 
4638 	if (AH->tableDataId[te->dumpId] != 0)
4639 	{
4640 		TocEntry   *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4641 
4642 		ted->reqs = 0;
4643 	}
4644 }
4645 
4646 /*
4647  * Clone and de-clone routines used in parallel restoration.
4648  *
4649  * Enough of the structure is cloned to ensure that there is no
4650  * conflict between different threads each with their own clone.
4651  */
4652 ArchiveHandle *
CloneArchive(ArchiveHandle * AH)4653 CloneArchive(ArchiveHandle *AH)
4654 {
4655 	ArchiveHandle *clone;
4656 
4657 	/* Make a "flat" copy */
4658 	clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
4659 	memcpy(clone, AH, sizeof(ArchiveHandle));
4660 
4661 	/* Handle format-independent fields */
4662 	memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
4663 
4664 	/* The clone will have its own connection, so disregard connection state */
4665 	clone->connection = NULL;
4666 	clone->connCancel = NULL;
4667 	clone->currUser = NULL;
4668 	clone->currSchema = NULL;
4669 	clone->currTablespace = NULL;
4670 	clone->currWithOids = -1;
4671 
4672 	/* savedPassword must be local in case we change it while connecting */
4673 	if (clone->savedPassword)
4674 		clone->savedPassword = pg_strdup(clone->savedPassword);
4675 
4676 	/* clone has its own error count, too */
4677 	clone->public.n_errors = 0;
4678 
4679 	/*
4680 	 * Connect our new clone object to the database, using the same connection
4681 	 * parameters used for the original connection.
4682 	 */
4683 	ConnectDatabase((Archive *) clone, &clone->public.ropt->cparams, true);
4684 
4685 	/* re-establish fixed state */
4686 	if (AH->mode == archModeRead)
4687 		_doSetFixedOutputState(clone);
4688 	/* in write case, setupDumpWorker will fix up connection state */
4689 
4690 	/* Let the format-specific code have a chance too */
4691 	(clone->ClonePtr) (clone);
4692 
4693 	Assert(clone->connection != NULL);
4694 	return clone;
4695 }
4696 
4697 /*
4698  * Release clone-local storage.
4699  *
4700  * Note: we assume any clone-local connection was already closed.
4701  */
4702 void
DeCloneArchive(ArchiveHandle * AH)4703 DeCloneArchive(ArchiveHandle *AH)
4704 {
4705 	/* Should not have an open database connection */
4706 	Assert(AH->connection == NULL);
4707 
4708 	/* Clear format-specific state */
4709 	(AH->DeClonePtr) (AH);
4710 
4711 	/* Clear state allocated by CloneArchive */
4712 	if (AH->sqlparse.curCmd)
4713 		destroyPQExpBuffer(AH->sqlparse.curCmd);
4714 
4715 	/* Clear any connection-local state */
4716 	if (AH->currUser)
4717 		free(AH->currUser);
4718 	if (AH->currSchema)
4719 		free(AH->currSchema);
4720 	if (AH->currTablespace)
4721 		free(AH->currTablespace);
4722 	if (AH->savedPassword)
4723 		free(AH->savedPassword);
4724 
4725 	free(AH);
4726 }
4727