1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_archiver.c
4  *
5  *	Private implementation of the archiver routines.
6  *
7  *	See the headers to pg_restore for more details.
8  *
9  * Copyright (c) 2000, Philip Warner
10  *	Rights are granted to use this software in any way so long
11  *	as this notice is not removed.
12  *
13  *	The author is not responsible for loss or damages that may
14  *	result from its use.
15  *
16  *
17  * IDENTIFICATION
18  *		src/bin/pg_dump/pg_backup_archiver.c
19  *
20  *-------------------------------------------------------------------------
21  */
22 #include "postgres_fe.h"
23 
24 #include "parallel.h"
25 #include "pg_backup_archiver.h"
26 #include "pg_backup_db.h"
27 #include "pg_backup_utils.h"
28 #include "dumputils.h"
29 #include "fe_utils/string_utils.h"
30 
31 #include <ctype.h>
32 #include <fcntl.h>
33 #include <unistd.h>
34 #include <sys/stat.h>
35 #include <sys/types.h>
36 #include <sys/wait.h>
37 
38 #ifdef WIN32
39 #include <io.h>
40 #endif
41 
42 #include "libpq/libpq-fs.h"
43 
/*
 * Banner text identifying plain-text output.  TEXT_DUMP_HEADER is the same
 * text RestoreArchive() prints at the top of its script output;
 * TEXT_DUMPALL_HEADER is the "database cluster dump" counterpart.
 * NOTE(review): the uses of these macros lie outside this excerpt -- confirm.
 */
#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
#define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"

/*
 * State needed to save/restore an archive's output target.  SaveOutput()
 * returns one of these and RestoreOutput() reinstates it; RestoreArchive()
 * brackets a temporary SetOutput() redirection with that pair.
 */
typedef struct _outputContext
{
	void	   *OF;				/* output stream handle; presumably a saved
								 * copy of AH->OF (FILE * or gzFile) --
								 * confirm in SaveOutput() */
	int			gzOut;			/* nonzero if OF is a gzip stream; presumably
								 * a saved copy of AH->gzOut */
} OutputContext;

/*
 * Module name passed to write_msg()/exit_horribly() so messages raised in
 * this file are tagged with their origin.
 */
/* translator: this is a module name */
static const char *modulename = gettext_noop("archiver");
56 
57 
/*
 * Forward declarations of file-local routines.
 */

/* Archive handle setup and per-TOC-entry output/session-state helpers */
static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
	 const int compression, ArchiveMode mode, SetupWorkerPtr setupWorkerPtr);
static void _getObjectDescription(PQExpBuffer buf, TocEntry *te,
					  ArchiveHandle *AH);
static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData);
static char *replace_line_endings(const char *str);
static void _doSetFixedOutputState(ArchiveHandle *AH);
static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
static void _doSetWithOids(ArchiveHandle *AH, const bool withOids);
static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
static void _becomeUser(ArchiveHandle *AH, const char *user);
static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
/* TOC entry classification (which parts are wanted, which restore pass) */
static teReqs _tocEntryRequired(TocEntry *te, teSection curSection, RestoreOptions *ropt);
static RestorePass _tocEntryRestorePass(TocEntry *te);
static bool _tocEntryIsACL(TocEntry *te);
static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
static void buildTocEntryArrays(ArchiveHandle *AH);
static void _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
static int	_discoverArchiveFormat(ArchiveHandle *AH);

/* Output-target management */
static int	RestoringToDB(ArchiveHandle *AH);
static void dump_lo_buf(ArchiveHandle *AH);
static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
static void SetOutput(ArchiveHandle *AH, const char *filename, int compression);
static OutputContext SaveOutput(ArchiveHandle *AH);
static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);

/*
 * Restore drivers: restore_toc_entry() handles a single entry; the
 * restore_toc_entries_* routines and the list/dependency helpers below
 * implement the parallel-restore path used by RestoreArchive().
 */
static int	restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
static void restore_toc_entries_prefork(ArchiveHandle *AH,
							TocEntry *pending_list);
static void restore_toc_entries_parallel(ArchiveHandle *AH,
							 ParallelState *pstate,
							 TocEntry *pending_list);
static void restore_toc_entries_postfork(ArchiveHandle *AH,
							 TocEntry *pending_list);
static void par_list_header_init(TocEntry *l);
static void par_list_append(TocEntry *l, TocEntry *te);
static void par_list_remove(TocEntry *te);
static void move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list,
				   RestorePass pass);
static TocEntry *get_next_work_item(ArchiveHandle *AH,
				   TocEntry *ready_list,
				   ParallelState *pstate);
static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
			   int worker, int status,
			   ParallelState *pstate);
static void fix_dependencies(ArchiveHandle *AH);
static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
static void repoint_table_dependencies(ArchiveHandle *AH);
static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
					TocEntry *ready_list);
static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);

/* Enforcement of the strict_names restore option */
static void StrictNamesCheck(RestoreOptions *ropt);
120 
121 
122 /*
123  * Allocate a new DumpOptions block containing all default values.
124  */
125 DumpOptions *
NewDumpOptions(void)126 NewDumpOptions(void)
127 {
128 	DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
129 
130 	InitDumpOptions(opts);
131 	return opts;
132 }
133 
134 /*
135  * Initialize a DumpOptions struct to all default values
136  */
137 void
InitDumpOptions(DumpOptions * opts)138 InitDumpOptions(DumpOptions *opts)
139 {
140 	memset(opts, 0, sizeof(DumpOptions));
141 	/* set any fields that shouldn't default to zeroes */
142 	opts->include_everything = true;
143 	opts->cparams.promptPassword = TRI_DEFAULT;
144 	opts->dumpSections = DUMP_UNSECTIONED;
145 }
146 
147 /*
148  * Create a freshly allocated DumpOptions with options equivalent to those
149  * found in the given RestoreOptions.
150  */
151 DumpOptions *
dumpOptionsFromRestoreOptions(RestoreOptions * ropt)152 dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
153 {
154 	DumpOptions *dopt = NewDumpOptions();
155 
156 	/* this is the inverse of what's at the end of pg_dump.c's main() */
157 	dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
158 	dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
159 	dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
160 	dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
161 	dopt->cparams.promptPassword = ropt->cparams.promptPassword;
162 	dopt->outputClean = ropt->dropSchema;
163 	dopt->dataOnly = ropt->dataOnly;
164 	dopt->schemaOnly = ropt->schemaOnly;
165 	dopt->if_exists = ropt->if_exists;
166 	dopt->column_inserts = ropt->column_inserts;
167 	dopt->dumpSections = ropt->dumpSections;
168 	dopt->aclsSkip = ropt->aclsSkip;
169 	dopt->outputSuperuser = ropt->superuser;
170 	dopt->outputCreateDB = ropt->createDB;
171 	dopt->outputNoOwner = ropt->noOwner;
172 	dopt->outputNoTablespaces = ropt->noTablespace;
173 	dopt->disable_triggers = ropt->disable_triggers;
174 	dopt->use_setsessauth = ropt->use_setsessauth;
175 
176 	dopt->disable_dollar_quoting = ropt->disable_dollar_quoting;
177 	dopt->dump_inserts = ropt->dump_inserts;
178 	dopt->no_security_labels = ropt->no_security_labels;
179 	dopt->lockWaitTimeout = ropt->lockWaitTimeout;
180 	dopt->include_everything = ropt->include_everything;
181 	dopt->enable_row_security = ropt->enable_row_security;
182 
183 	return dopt;
184 }
185 
186 
187 /*
188  *	Wrapper functions.
189  *
 *	The objective is to make writing new formats and dumpers as simple
191  *	as possible, if necessary at the expense of extra function calls etc.
192  *
193  */
194 
195 /*
196  * The dump worker setup needs lots of knowledge of the internals of pg_dump,
197  * so It's defined in pg_dump.c and passed into OpenArchive. The restore worker
198  * setup doesn't need to know anything much, so it's defined here.
199  */
200 static void
setupRestoreWorker(Archive * AHX)201 setupRestoreWorker(Archive *AHX)
202 {
203 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
204 
205 	(AH->ReopenPtr) (AH);
206 }
207 
208 
209 /* Create a new archive */
210 /* Public */
211 Archive *
CreateArchive(const char * FileSpec,const ArchiveFormat fmt,const int compression,ArchiveMode mode,SetupWorkerPtr setupDumpWorker)212 CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
213 	 const int compression, ArchiveMode mode, SetupWorkerPtr setupDumpWorker)
214 
215 {
216 	ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, mode, setupDumpWorker);
217 
218 	return (Archive *) AH;
219 }
220 
221 /* Open an existing archive */
222 /* Public */
223 Archive *
OpenArchive(const char * FileSpec,const ArchiveFormat fmt)224 OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
225 {
226 	ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, archModeRead, setupRestoreWorker);
227 
228 	return (Archive *) AH;
229 }
230 
231 /* Public */
232 void
CloseArchive(Archive * AHX)233 CloseArchive(Archive *AHX)
234 {
235 	int			res = 0;
236 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
237 
238 	(*AH->ClosePtr) (AH);
239 
240 	/* Close the output */
241 	if (AH->gzOut)
242 		res = GZCLOSE(AH->OF);
243 	else if (AH->OF != stdout)
244 		res = fclose(AH->OF);
245 
246 	if (res != 0)
247 		exit_horribly(modulename, "could not close output file: %s\n",
248 					  strerror(errno));
249 }
250 
251 /* Public */
252 void
SetArchiveOptions(Archive * AH,DumpOptions * dopt,RestoreOptions * ropt)253 SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
254 {
255 	/* Caller can omit dump options, in which case we synthesize them */
256 	if (dopt == NULL && ropt != NULL)
257 		dopt = dumpOptionsFromRestoreOptions(ropt);
258 
259 	/* Save options for later access */
260 	AH->dopt = dopt;
261 	AH->ropt = ropt;
262 }
263 
264 /* Public */
265 void
ProcessArchiveRestoreOptions(Archive * AHX)266 ProcessArchiveRestoreOptions(Archive *AHX)
267 {
268 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
269 	RestoreOptions *ropt = AH->public.ropt;
270 	TocEntry   *te;
271 	teSection	curSection;
272 
273 	/* Decide which TOC entries will be dumped/restored, and mark them */
274 	curSection = SECTION_PRE_DATA;
275 	for (te = AH->toc->next; te != AH->toc; te = te->next)
276 	{
277 		/*
278 		 * When writing an archive, we also take this opportunity to check
279 		 * that we have generated the entries in a sane order that respects
280 		 * the section divisions.  When reading, don't complain, since buggy
281 		 * old versions of pg_dump might generate out-of-order archives.
282 		 */
283 		if (AH->mode != archModeRead)
284 		{
285 			switch (te->section)
286 			{
287 				case SECTION_NONE:
288 					/* ok to be anywhere */
289 					break;
290 				case SECTION_PRE_DATA:
291 					if (curSection != SECTION_PRE_DATA)
292 						write_msg(modulename,
293 								  "WARNING: archive items not in correct section order\n");
294 					break;
295 				case SECTION_DATA:
296 					if (curSection == SECTION_POST_DATA)
297 						write_msg(modulename,
298 								  "WARNING: archive items not in correct section order\n");
299 					break;
300 				case SECTION_POST_DATA:
301 					/* ok no matter which section we were in */
302 					break;
303 				default:
304 					exit_horribly(modulename, "unexpected section code %d\n",
305 								  (int) te->section);
306 					break;
307 			}
308 		}
309 
310 		if (te->section != SECTION_NONE)
311 			curSection = te->section;
312 
313 		te->reqs = _tocEntryRequired(te, curSection, ropt);
314 	}
315 
316 	/* Enforce strict names checking */
317 	if (ropt->strict_names)
318 		StrictNamesCheck(ropt);
319 }
320 
321 /* Public */
322 void
RestoreArchive(Archive * AHX)323 RestoreArchive(Archive *AHX)
324 {
325 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
326 	RestoreOptions *ropt = AH->public.ropt;
327 	bool		parallel_mode;
328 	TocEntry   *te;
329 	OutputContext sav;
330 
331 	AH->stage = STAGE_INITIALIZING;
332 
333 	/*
334 	 * Check for nonsensical option combinations.
335 	 *
336 	 * -C is not compatible with -1, because we can't create a database inside
337 	 * a transaction block.
338 	 */
339 	if (ropt->createDB && ropt->single_txn)
340 		exit_horribly(modulename, "-C and -1 are incompatible options\n");
341 
342 	/*
343 	 * If we're going to do parallel restore, there are some restrictions.
344 	 */
345 	parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
346 	if (parallel_mode)
347 	{
348 		/* We haven't got round to making this work for all archive formats */
349 		if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
350 			exit_horribly(modulename, "parallel restore is not supported with this archive file format\n");
351 
352 		/* Doesn't work if the archive represents dependencies as OIDs */
353 		if (AH->version < K_VERS_1_8)
354 			exit_horribly(modulename, "parallel restore is not supported with archives made by pre-8.0 pg_dump\n");
355 
356 		/*
357 		 * It's also not gonna work if we can't reopen the input file, so
358 		 * let's try that immediately.
359 		 */
360 		(AH->ReopenPtr) (AH);
361 	}
362 
363 	/*
364 	 * Make sure we won't need (de)compression we haven't got
365 	 */
366 #ifndef HAVE_LIBZ
367 	if (AH->compression != 0 && AH->PrintTocDataPtr !=NULL)
368 	{
369 		for (te = AH->toc->next; te != AH->toc; te = te->next)
370 		{
371 			if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
372 				exit_horribly(modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
373 		}
374 	}
375 #endif
376 
377 	/*
378 	 * Prepare index arrays, so we can assume we have them throughout restore.
379 	 * It's possible we already did this, though.
380 	 */
381 	if (AH->tocsByDumpId == NULL)
382 		buildTocEntryArrays(AH);
383 
384 	/*
385 	 * If we're using a DB connection, then connect it.
386 	 */
387 	if (ropt->useDB)
388 	{
389 		ahlog(AH, 1, "connecting to database for restore\n");
390 		if (AH->version < K_VERS_1_3)
391 			exit_horribly(modulename, "direct database connections are not supported in pre-1.3 archives\n");
392 
393 		/*
394 		 * We don't want to guess at whether the dump will successfully
395 		 * restore; allow the attempt regardless of the version of the restore
396 		 * target.
397 		 */
398 		AHX->minRemoteVersion = 0;
399 		AHX->maxRemoteVersion = 999999;
400 
401 		ConnectDatabase(AHX, &ropt->cparams, false);
402 
403 		/*
404 		 * If we're talking to the DB directly, don't send comments since they
405 		 * obscure SQL when displaying errors
406 		 */
407 		AH->noTocComments = 1;
408 	}
409 
410 	/*
411 	 * Work out if we have an implied data-only restore. This can happen if
412 	 * the dump was data only or if the user has used a toc list to exclude
413 	 * all of the schema data. All we do is look for schema entries - if none
414 	 * are found then we set the dataOnly flag.
415 	 *
416 	 * We could scan for wanted TABLE entries, but that is not the same as
417 	 * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
418 	 */
419 	if (!ropt->dataOnly)
420 	{
421 		int			impliedDataOnly = 1;
422 
423 		for (te = AH->toc->next; te != AH->toc; te = te->next)
424 		{
425 			if ((te->reqs & REQ_SCHEMA) != 0)
426 			{					/* It's schema, and it's wanted */
427 				impliedDataOnly = 0;
428 				break;
429 			}
430 		}
431 		if (impliedDataOnly)
432 		{
433 			ropt->dataOnly = impliedDataOnly;
434 			ahlog(AH, 1, "implied data-only restore\n");
435 		}
436 	}
437 
438 	/*
439 	 * Setup the output file if necessary.
440 	 */
441 	sav = SaveOutput(AH);
442 	if (ropt->filename || ropt->compression)
443 		SetOutput(AH, ropt->filename, ropt->compression);
444 
445 	ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
446 
447 	if (AH->archiveRemoteVersion)
448 		ahprintf(AH, "-- Dumped from database version %s\n",
449 				 AH->archiveRemoteVersion);
450 	if (AH->archiveDumpVersion)
451 		ahprintf(AH, "-- Dumped by pg_dump version %s\n",
452 				 AH->archiveDumpVersion);
453 
454 	ahprintf(AH, "\n");
455 
456 	if (AH->public.verbose)
457 		dumpTimestamp(AH, "Started on", AH->createDate);
458 
459 	if (ropt->single_txn)
460 	{
461 		if (AH->connection)
462 			StartTransaction(AHX);
463 		else
464 			ahprintf(AH, "BEGIN;\n\n");
465 	}
466 
467 	/*
468 	 * Establish important parameter values right away.
469 	 */
470 	_doSetFixedOutputState(AH);
471 
472 	AH->stage = STAGE_PROCESSING;
473 
474 	/*
475 	 * Drop the items at the start, in reverse order
476 	 */
477 	if (ropt->dropSchema)
478 	{
479 		for (te = AH->toc->prev; te != AH->toc; te = te->prev)
480 		{
481 			AH->currentTE = te;
482 
483 			/*
484 			 * In createDB mode, issue a DROP *only* for the database as a
485 			 * whole.  Issuing drops against anything else would be wrong,
486 			 * because at this point we're connected to the wrong database.
487 			 * Conversely, if we're not in createDB mode, we'd better not
488 			 * issue a DROP against the database at all.
489 			 */
490 			if (ropt->createDB)
491 			{
492 				if (strcmp(te->desc, "DATABASE") != 0)
493 					continue;
494 			}
495 			else
496 			{
497 				if (strcmp(te->desc, "DATABASE") == 0)
498 					continue;
499 			}
500 
501 			/* Otherwise, drop anything that's selected and has a dropStmt */
502 			if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
503 			{
504 				ahlog(AH, 1, "dropping %s %s\n", te->desc, te->tag);
505 				/* Select owner and schema as necessary */
506 				_becomeOwner(AH, te);
507 				_selectOutputSchema(AH, te->namespace);
508 
509 				/*
510 				 * Now emit the DROP command, if the object has one.  Note we
511 				 * don't necessarily emit it verbatim; at this point we add an
512 				 * appropriate IF EXISTS clause, if the user requested it.
513 				 */
514 				if (*te->dropStmt != '\0')
515 				{
516 					if (!ropt->if_exists)
517 					{
518 						/* No --if-exists?	Then just use the original */
519 						ahprintf(AH, "%s", te->dropStmt);
520 					}
521 					else
522 					{
523 						/*
524 						 * Inject an appropriate spelling of "if exists".  For
525 						 * large objects, we have a separate routine that
526 						 * knows how to do it, without depending on
527 						 * te->dropStmt; use that.  For other objects we need
528 						 * to parse the command.
529 						 */
530 						if (strncmp(te->desc, "BLOB", 4) == 0)
531 						{
532 							DropBlobIfExists(AH, te->catalogId.oid);
533 						}
534 						else
535 						{
536 							char	   *dropStmt = pg_strdup(te->dropStmt);
537 							char	   *dropStmtOrig = dropStmt;
538 							PQExpBuffer ftStmt = createPQExpBuffer();
539 
540 							/*
541 							 * Need to inject IF EXISTS clause after ALTER
542 							 * TABLE part in ALTER TABLE .. DROP statement
543 							 */
544 							if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
545 							{
546 								appendPQExpBuffer(ftStmt,
547 												  "ALTER TABLE IF EXISTS");
548 								dropStmt = dropStmt + 11;
549 							}
550 
551 							/*
552 							 * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
553 							 * not support the IF EXISTS clause, and therefore
554 							 * we simply emit the original command for DEFAULT
555 							 * objects (modulo the adjustment made above).
556 							 *
557 							 * If we used CREATE OR REPLACE VIEW as a means of
558 							 * quasi-dropping an ON SELECT rule, that should
559 							 * be emitted unchanged as well.
560 							 *
561 							 * For other object types, we need to extract the
562 							 * first part of the DROP which includes the
563 							 * object type.  Most of the time this matches
564 							 * te->desc, so search for that; however for the
565 							 * different kinds of CONSTRAINTs, we know to
566 							 * search for hardcoded "DROP CONSTRAINT" instead.
567 							 */
568 							if (strcmp(te->desc, "DEFAULT") == 0 ||
569 								strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
570 								appendPQExpBufferStr(ftStmt, dropStmt);
571 							else
572 							{
573 								char		buffer[40];
574 								char	   *mark;
575 
576 								if (strcmp(te->desc, "CONSTRAINT") == 0 ||
577 								 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
578 									strcmp(te->desc, "FK CONSTRAINT") == 0)
579 									strcpy(buffer, "DROP CONSTRAINT");
580 								else
581 									snprintf(buffer, sizeof(buffer), "DROP %s",
582 											 te->desc);
583 
584 								mark = strstr(dropStmt, buffer);
585 
586 								if (mark)
587 								{
588 									*mark = '\0';
589 									appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
590 													  dropStmt, buffer,
591 													  mark + strlen(buffer));
592 								}
593 								else
594 								{
595 									/* complain and emit unmodified command */
596 									write_msg(modulename,
597 											  "WARNING: could not find where to insert IF EXISTS in statement \"%s\"\n",
598 											  dropStmtOrig);
599 									appendPQExpBufferStr(ftStmt, dropStmt);
600 								}
601 							}
602 
603 							ahprintf(AH, "%s", ftStmt->data);
604 
605 							destroyPQExpBuffer(ftStmt);
606 							pg_free(dropStmtOrig);
607 						}
608 					}
609 				}
610 			}
611 		}
612 
613 		/*
614 		 * _selectOutputSchema may have set currSchema to reflect the effect
615 		 * of a "SET search_path" command it emitted.  However, by now we may
616 		 * have dropped that schema; or it might not have existed in the first
617 		 * place.  In either case the effective value of search_path will not
618 		 * be what we think.  Forcibly reset currSchema so that we will
619 		 * re-establish the search_path setting when needed (after creating
620 		 * the schema).
621 		 *
622 		 * If we treated users as pg_dump'able objects then we'd need to reset
623 		 * currUser here too.
624 		 */
625 		if (AH->currSchema)
626 			free(AH->currSchema);
627 		AH->currSchema = NULL;
628 	}
629 
630 	if (parallel_mode)
631 	{
632 		/*
633 		 * In parallel mode, turn control over to the parallel-restore logic.
634 		 */
635 		ParallelState *pstate;
636 		TocEntry	pending_list;
637 
638 		par_list_header_init(&pending_list);
639 
640 		/* This runs PRE_DATA items and then disconnects from the database */
641 		restore_toc_entries_prefork(AH, &pending_list);
642 		Assert(AH->connection == NULL);
643 
644 		/* ParallelBackupStart() will actually fork the processes */
645 		pstate = ParallelBackupStart(AH);
646 		restore_toc_entries_parallel(AH, pstate, &pending_list);
647 		ParallelBackupEnd(AH, pstate);
648 
649 		/* reconnect the master and see if we missed something */
650 		restore_toc_entries_postfork(AH, &pending_list);
651 		Assert(AH->connection != NULL);
652 	}
653 	else
654 	{
655 		/*
656 		 * In serial mode, process everything in three phases: normal items,
657 		 * then ACLs, then post-ACL items.  We might be able to skip one or
658 		 * both extra phases in some cases, eg data-only restores.
659 		 */
660 		bool		haveACL = false;
661 		bool		havePostACL = false;
662 
663 		for (te = AH->toc->next; te != AH->toc; te = te->next)
664 		{
665 			if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
666 				continue;		/* ignore if not to be dumped at all */
667 
668 			switch (_tocEntryRestorePass(te))
669 			{
670 				case RESTORE_PASS_MAIN:
671 					(void) restore_toc_entry(AH, te, false);
672 					break;
673 				case RESTORE_PASS_ACL:
674 					haveACL = true;
675 					break;
676 				case RESTORE_PASS_POST_ACL:
677 					havePostACL = true;
678 					break;
679 			}
680 		}
681 
682 		if (haveACL)
683 		{
684 			for (te = AH->toc->next; te != AH->toc; te = te->next)
685 			{
686 				if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
687 					_tocEntryRestorePass(te) == RESTORE_PASS_ACL)
688 					(void) restore_toc_entry(AH, te, false);
689 			}
690 		}
691 
692 		if (havePostACL)
693 		{
694 			for (te = AH->toc->next; te != AH->toc; te = te->next)
695 			{
696 				if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
697 					_tocEntryRestorePass(te) == RESTORE_PASS_POST_ACL)
698 					(void) restore_toc_entry(AH, te, false);
699 			}
700 		}
701 	}
702 
703 	if (ropt->single_txn)
704 	{
705 		if (AH->connection)
706 			CommitTransaction(AHX);
707 		else
708 			ahprintf(AH, "COMMIT;\n\n");
709 	}
710 
711 	if (AH->public.verbose)
712 		dumpTimestamp(AH, "Completed on", time(NULL));
713 
714 	ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
715 
716 	/*
717 	 * Clean up & we're done.
718 	 */
719 	AH->stage = STAGE_FINALIZING;
720 
721 	if (ropt->filename || ropt->compression)
722 		RestoreOutput(AH, sav);
723 
724 	if (ropt->useDB)
725 		DisconnectDatabase(&AH->public);
726 }
727 
728 /*
729  * Restore a single TOC item.  Used in both parallel and non-parallel restore;
730  * is_parallel is true if we are in a worker child process.
731  *
732  * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
733  * the parallel parent has to make the corresponding status update.
734  */
735 static int
restore_toc_entry(ArchiveHandle * AH,TocEntry * te,bool is_parallel)736 restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
737 {
738 	RestoreOptions *ropt = AH->public.ropt;
739 	int			status = WORKER_OK;
740 	teReqs		reqs;
741 	bool		defnDumped;
742 
743 	AH->currentTE = te;
744 
745 	/* Work out what, if anything, we want from this entry */
746 	reqs = te->reqs;
747 
748 	/*
749 	 * Ignore DATABASE entry unless we should create it.  We must check this
750 	 * here, not in _tocEntryRequired, because the createDB option should not
751 	 * affect emitting a DATABASE entry to an archive file.
752 	 */
753 	if (!ropt->createDB && strcmp(te->desc, "DATABASE") == 0)
754 		reqs = 0;
755 
756 	/* Dump any relevant dump warnings to stderr */
757 	if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
758 	{
759 		if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
760 			write_msg(modulename, "warning from original dump file: %s\n", te->defn);
761 		else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
762 			write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
763 	}
764 
765 	defnDumped = false;
766 
767 	/*
768 	 * If it has a schema component that we want, then process that
769 	 */
770 	if ((reqs & REQ_SCHEMA) != 0)
771 	{
772 		/* Show namespace in log message if available */
773 		if (te->namespace)
774 			ahlog(AH, 1, "creating %s \"%s.%s\"\n",
775 				  te->desc, te->namespace, te->tag);
776 		else
777 			ahlog(AH, 1, "creating %s \"%s\"\n", te->desc, te->tag);
778 
779 		_printTocEntry(AH, te, false);
780 		defnDumped = true;
781 
782 		if (strcmp(te->desc, "TABLE") == 0)
783 		{
784 			if (AH->lastErrorTE == te)
785 			{
786 				/*
787 				 * We failed to create the table. If
788 				 * --no-data-for-failed-tables was given, mark the
789 				 * corresponding TABLE DATA to be ignored.
790 				 *
791 				 * In the parallel case this must be done in the parent, so we
792 				 * just set the return value.
793 				 */
794 				if (ropt->noDataForFailedTables)
795 				{
796 					if (is_parallel)
797 						status = WORKER_INHIBIT_DATA;
798 					else
799 						inhibit_data_for_failed_table(AH, te);
800 				}
801 			}
802 			else
803 			{
804 				/*
805 				 * We created the table successfully.  Mark the corresponding
806 				 * TABLE DATA for possible truncation.
807 				 *
808 				 * In the parallel case this must be done in the parent, so we
809 				 * just set the return value.
810 				 */
811 				if (is_parallel)
812 					status = WORKER_CREATE_DONE;
813 				else
814 					mark_create_done(AH, te);
815 			}
816 		}
817 
818 		/* If we created a DB, connect to it... */
819 		if (strcmp(te->desc, "DATABASE") == 0)
820 		{
821 			ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
822 			_reconnectToDB(AH, te->tag);
823 		}
824 	}
825 
826 	/*
827 	 * If it has a data component that we want, then process that
828 	 */
829 	if ((reqs & REQ_DATA) != 0)
830 	{
831 		/*
832 		 * hadDumper will be set if there is genuine data component for this
833 		 * node. Otherwise, we need to check the defn field for statements
834 		 * that need to be executed in data-only restores.
835 		 */
836 		if (te->hadDumper)
837 		{
838 			/*
839 			 * If we can output the data, then restore it.
840 			 */
841 			if (AH->PrintTocDataPtr !=NULL)
842 			{
843 				_printTocEntry(AH, te, true);
844 
845 				if (strcmp(te->desc, "BLOBS") == 0 ||
846 					strcmp(te->desc, "BLOB COMMENTS") == 0)
847 				{
848 					ahlog(AH, 1, "processing %s\n", te->desc);
849 
850 					_selectOutputSchema(AH, "pg_catalog");
851 
852 					/* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
853 					if (strcmp(te->desc, "BLOB COMMENTS") == 0)
854 						AH->outputKind = OUTPUT_OTHERDATA;
855 
856 					(*AH->PrintTocDataPtr) (AH, te);
857 
858 					AH->outputKind = OUTPUT_SQLCMDS;
859 				}
860 				else
861 				{
862 					_disableTriggersIfNecessary(AH, te);
863 
864 					/* Select owner and schema as necessary */
865 					_becomeOwner(AH, te);
866 					_selectOutputSchema(AH, te->namespace);
867 
868 					ahlog(AH, 1, "processing data for table \"%s.%s\"\n",
869 						  te->namespace, te->tag);
870 
871 					/*
872 					 * In parallel restore, if we created the table earlier in
873 					 * the run then we wrap the COPY in a transaction and
874 					 * precede it with a TRUNCATE.  If archiving is not on
875 					 * this prevents WAL-logging the COPY.  This obtains a
876 					 * speedup similar to that from using single_txn mode in
877 					 * non-parallel restores.
878 					 */
879 					if (is_parallel && te->created)
880 					{
881 						/*
882 						 * Parallel restore is always talking directly to a
883 						 * server, so no need to see if we should issue BEGIN.
884 						 */
885 						StartTransaction(&AH->public);
886 
887 						/*
888 						 * If the server version is >= 8.4, make sure we issue
889 						 * TRUNCATE with ONLY so that child tables are not
890 						 * wiped.
891 						 */
892 						ahprintf(AH, "TRUNCATE TABLE %s%s;\n\n",
893 								 (PQserverVersion(AH->connection) >= 80400 ?
894 								  "ONLY " : ""),
895 							  fmtQualifiedId(PQserverVersion(AH->connection),
896 											 te->namespace,
897 											 te->tag));
898 					}
899 
900 					/*
901 					 * If we have a copy statement, use it.
902 					 */
903 					if (te->copyStmt && strlen(te->copyStmt) > 0)
904 					{
905 						ahprintf(AH, "%s", te->copyStmt);
906 						AH->outputKind = OUTPUT_COPYDATA;
907 					}
908 					else
909 						AH->outputKind = OUTPUT_OTHERDATA;
910 
911 					(*AH->PrintTocDataPtr) (AH, te);
912 
913 					/*
914 					 * Terminate COPY if needed.
915 					 */
916 					if (AH->outputKind == OUTPUT_COPYDATA &&
917 						RestoringToDB(AH))
918 						EndDBCopyMode(&AH->public, te->tag);
919 					AH->outputKind = OUTPUT_SQLCMDS;
920 
921 					/* close out the transaction started above */
922 					if (is_parallel && te->created)
923 						CommitTransaction(&AH->public);
924 
925 					_enableTriggersIfNecessary(AH, te);
926 				}
927 			}
928 		}
929 		else if (!defnDumped)
930 		{
931 			/* If we haven't already dumped the defn part, do so now */
932 			ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
933 			_printTocEntry(AH, te, false);
934 		}
935 	}
936 
937 	if (AH->public.n_errors > 0 && status == WORKER_OK)
938 		status = WORKER_IGNORED_ERRORS;
939 
940 	return status;
941 }
942 
943 /*
944  * Allocate a new RestoreOptions block.
945  * This is mainly so we can initialize it, but also for future expansion,
946  */
947 RestoreOptions *
NewRestoreOptions(void)948 NewRestoreOptions(void)
949 {
950 	RestoreOptions *opts;
951 
952 	opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
953 
954 	/* set any fields that shouldn't default to zeroes */
955 	opts->format = archUnknown;
956 	opts->cparams.promptPassword = TRI_DEFAULT;
957 	opts->dumpSections = DUMP_UNSECTIONED;
958 
959 	return opts;
960 }
961 
962 static void
_disableTriggersIfNecessary(ArchiveHandle * AH,TocEntry * te)963 _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
964 {
965 	RestoreOptions *ropt = AH->public.ropt;
966 
967 	/* This hack is only needed in a data-only restore */
968 	if (!ropt->dataOnly || !ropt->disable_triggers)
969 		return;
970 
971 	ahlog(AH, 1, "disabling triggers for %s\n", te->tag);
972 
973 	/*
974 	 * Become superuser if possible, since they are the only ones who can
975 	 * disable constraint triggers.  If -S was not given, assume the initial
976 	 * user identity is a superuser.  (XXX would it be better to become the
977 	 * table owner?)
978 	 */
979 	_becomeUser(AH, ropt->superuser);
980 
981 	/*
982 	 * Disable them.  Assume that the table name should be schema-qualified
983 	 * (we can't look at PQserverVersion, since we might not have any
984 	 * connection; and anyway we don't promise our output will load pre-7.3).
985 	 */
986 	ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
987 			 fmtQualifiedId(70300,
988 							te->namespace,
989 							te->tag));
990 }
991 
992 static void
_enableTriggersIfNecessary(ArchiveHandle * AH,TocEntry * te)993 _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
994 {
995 	RestoreOptions *ropt = AH->public.ropt;
996 
997 	/* This hack is only needed in a data-only restore */
998 	if (!ropt->dataOnly || !ropt->disable_triggers)
999 		return;
1000 
1001 	ahlog(AH, 1, "enabling triggers for %s\n", te->tag);
1002 
1003 	/*
1004 	 * Become superuser if possible, since they are the only ones who can
1005 	 * disable constraint triggers.  If -S was not given, assume the initial
1006 	 * user identity is a superuser.  (XXX would it be better to become the
1007 	 * table owner?)
1008 	 */
1009 	_becomeUser(AH, ropt->superuser);
1010 
1011 	/*
1012 	 * Enable them.  As above, force schema qualification.
1013 	 */
1014 	ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1015 			 fmtQualifiedId(70300,
1016 							te->namespace,
1017 							te->tag));
1018 }
1019 
1020 /*
1021  * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1022  */
1023 
1024 /* Public */
1025 void
WriteData(Archive * AHX,const void * data,size_t dLen)1026 WriteData(Archive *AHX, const void *data, size_t dLen)
1027 {
1028 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1029 
1030 	if (!AH->currToc)
1031 		exit_horribly(modulename, "internal error -- WriteData cannot be called outside the context of a DataDumper routine\n");
1032 
1033 	(*AH->WriteDataPtr) (AH, data, dLen);
1034 
1035 	return;
1036 }
1037 
1038 /*
1039  * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1040  * repository for all metadata. But the name has stuck.
1041  */
1042 
1043 /* Public */
1044 void
ArchiveEntry(Archive * AHX,CatalogId catalogId,DumpId dumpId,const char * tag,const char * namespace,const char * tablespace,const char * owner,bool withOids,const char * desc,teSection section,const char * defn,const char * dropStmt,const char * copyStmt,const DumpId * deps,int nDeps,DataDumperPtr dumpFn,void * dumpArg)1045 ArchiveEntry(Archive *AHX,
1046 			 CatalogId catalogId, DumpId dumpId,
1047 			 const char *tag,
1048 			 const char *namespace,
1049 			 const char *tablespace,
1050 			 const char *owner, bool withOids,
1051 			 const char *desc, teSection section,
1052 			 const char *defn,
1053 			 const char *dropStmt, const char *copyStmt,
1054 			 const DumpId *deps, int nDeps,
1055 			 DataDumperPtr dumpFn, void *dumpArg)
1056 {
1057 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1058 	TocEntry   *newToc;
1059 
1060 	newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
1061 
1062 	AH->tocCount++;
1063 	if (dumpId > AH->maxDumpId)
1064 		AH->maxDumpId = dumpId;
1065 
1066 	newToc->prev = AH->toc->prev;
1067 	newToc->next = AH->toc;
1068 	AH->toc->prev->next = newToc;
1069 	AH->toc->prev = newToc;
1070 
1071 	newToc->catalogId = catalogId;
1072 	newToc->dumpId = dumpId;
1073 	newToc->section = section;
1074 
1075 	newToc->tag = pg_strdup(tag);
1076 	newToc->namespace = namespace ? pg_strdup(namespace) : NULL;
1077 	newToc->tablespace = tablespace ? pg_strdup(tablespace) : NULL;
1078 	newToc->owner = pg_strdup(owner);
1079 	newToc->withOids = withOids;
1080 	newToc->desc = pg_strdup(desc);
1081 	newToc->defn = pg_strdup(defn);
1082 	newToc->dropStmt = pg_strdup(dropStmt);
1083 	newToc->copyStmt = copyStmt ? pg_strdup(copyStmt) : NULL;
1084 
1085 	if (nDeps > 0)
1086 	{
1087 		newToc->dependencies = (DumpId *) pg_malloc(nDeps * sizeof(DumpId));
1088 		memcpy(newToc->dependencies, deps, nDeps * sizeof(DumpId));
1089 		newToc->nDeps = nDeps;
1090 	}
1091 	else
1092 	{
1093 		newToc->dependencies = NULL;
1094 		newToc->nDeps = 0;
1095 	}
1096 
1097 	newToc->dataDumper = dumpFn;
1098 	newToc->dataDumperArg = dumpArg;
1099 	newToc->hadDumper = dumpFn ? true : false;
1100 
1101 	newToc->formatData = NULL;
1102 
1103 	if (AH->ArchiveEntryPtr !=NULL)
1104 		(*AH->ArchiveEntryPtr) (AH, newToc);
1105 }
1106 
1107 /* Public */
1108 void
PrintTOCSummary(Archive * AHX)1109 PrintTOCSummary(Archive *AHX)
1110 {
1111 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1112 	RestoreOptions *ropt = AH->public.ropt;
1113 	TocEntry   *te;
1114 	teSection	curSection;
1115 	OutputContext sav;
1116 	const char *fmtName;
1117 	char		stamp_str[64];
1118 
1119 	sav = SaveOutput(AH);
1120 	if (ropt->filename)
1121 		SetOutput(AH, ropt->filename, 0 /* no compression */ );
1122 
1123 	if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
1124 				 localtime(&AH->createDate)) == 0)
1125 		strcpy(stamp_str, "[unknown]");
1126 
1127 	ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
1128 	ahprintf(AH, ";     dbname: %s\n;     TOC Entries: %d\n;     Compression: %d\n",
1129 			 replace_line_endings(AH->archdbname),
1130 			 AH->tocCount, AH->compression);
1131 
1132 	switch (AH->format)
1133 	{
1134 		case archCustom:
1135 			fmtName = "CUSTOM";
1136 			break;
1137 		case archDirectory:
1138 			fmtName = "DIRECTORY";
1139 			break;
1140 		case archTar:
1141 			fmtName = "TAR";
1142 			break;
1143 		default:
1144 			fmtName = "UNKNOWN";
1145 	}
1146 
1147 	ahprintf(AH, ";     Dump Version: %d.%d-%d\n", AH->vmaj, AH->vmin, AH->vrev);
1148 	ahprintf(AH, ";     Format: %s\n", fmtName);
1149 	ahprintf(AH, ";     Integer: %d bytes\n", (int) AH->intSize);
1150 	ahprintf(AH, ";     Offset: %d bytes\n", (int) AH->offSize);
1151 	if (AH->archiveRemoteVersion)
1152 		ahprintf(AH, ";     Dumped from database version: %s\n",
1153 				 AH->archiveRemoteVersion);
1154 	if (AH->archiveDumpVersion)
1155 		ahprintf(AH, ";     Dumped by pg_dump version: %s\n",
1156 				 AH->archiveDumpVersion);
1157 
1158 	ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
1159 
1160 	curSection = SECTION_PRE_DATA;
1161 	for (te = AH->toc->next; te != AH->toc; te = te->next)
1162 	{
1163 		if (te->section != SECTION_NONE)
1164 			curSection = te->section;
1165 		if (ropt->verbose ||
1166 			(_tocEntryRequired(te, curSection, ropt) & (REQ_SCHEMA | REQ_DATA)) != 0)
1167 		{
1168 			char	   *sanitized_name;
1169 			char	   *sanitized_schema;
1170 			char	   *sanitized_owner;
1171 
1172 			/*
1173 			 * As in _printTocEntry(), sanitize strings that might contain
1174 			 * newlines, to ensure that each logical output line is in fact
1175 			 * one physical output line.  This prevents confusion when the
1176 			 * file is read by "pg_restore -L".  Note that we currently don't
1177 			 * bother to quote names, meaning that the name fields aren't
1178 			 * automatically parseable.  "pg_restore -L" doesn't care because
1179 			 * it only examines the dumpId field, but someday we might want to
1180 			 * try harder.
1181 			 */
1182 			sanitized_name = replace_line_endings(te->tag);
1183 			if (te->namespace)
1184 				sanitized_schema = replace_line_endings(te->namespace);
1185 			else
1186 				sanitized_schema = pg_strdup("-");
1187 			sanitized_owner = replace_line_endings(te->owner);
1188 
1189 			ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
1190 					 te->catalogId.tableoid, te->catalogId.oid,
1191 					 te->desc, sanitized_schema, sanitized_name,
1192 					 sanitized_owner);
1193 
1194 			free(sanitized_name);
1195 			free(sanitized_schema);
1196 			free(sanitized_owner);
1197 		}
1198 		if (ropt->verbose && te->nDeps > 0)
1199 		{
1200 			int			i;
1201 
1202 			ahprintf(AH, ";\tdepends on:");
1203 			for (i = 0; i < te->nDeps; i++)
1204 				ahprintf(AH, " %d", te->dependencies[i]);
1205 			ahprintf(AH, "\n");
1206 		}
1207 	}
1208 
1209 	/* Enforce strict names checking */
1210 	if (ropt->strict_names)
1211 		StrictNamesCheck(ropt);
1212 
1213 	if (ropt->filename)
1214 		RestoreOutput(AH, sav);
1215 }
1216 
1217 /***********
1218  * BLOB Archival
1219  ***********/
1220 
1221 /* Called by a dumper to signal start of a BLOB */
1222 int
StartBlob(Archive * AHX,Oid oid)1223 StartBlob(Archive *AHX, Oid oid)
1224 {
1225 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1226 
1227 	if (!AH->StartBlobPtr)
1228 		exit_horribly(modulename, "large-object output not supported in chosen format\n");
1229 
1230 	(*AH->StartBlobPtr) (AH, AH->currToc, oid);
1231 
1232 	return 1;
1233 }
1234 
1235 /* Called by a dumper to signal end of a BLOB */
1236 int
EndBlob(Archive * AHX,Oid oid)1237 EndBlob(Archive *AHX, Oid oid)
1238 {
1239 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1240 
1241 	if (AH->EndBlobPtr)
1242 		(*AH->EndBlobPtr) (AH, AH->currToc, oid);
1243 
1244 	return 1;
1245 }
1246 
1247 /**********
1248  * BLOB Restoration
1249  **********/
1250 
1251 /*
1252  * Called by a format handler before any blobs are restored
1253  */
1254 void
StartRestoreBlobs(ArchiveHandle * AH)1255 StartRestoreBlobs(ArchiveHandle *AH)
1256 {
1257 	RestoreOptions *ropt = AH->public.ropt;
1258 
1259 	if (!ropt->single_txn)
1260 	{
1261 		if (AH->connection)
1262 			StartTransaction(&AH->public);
1263 		else
1264 			ahprintf(AH, "BEGIN;\n\n");
1265 	}
1266 
1267 	AH->blobCount = 0;
1268 }
1269 
1270 /*
1271  * Called by a format handler after all blobs are restored
1272  */
1273 void
EndRestoreBlobs(ArchiveHandle * AH)1274 EndRestoreBlobs(ArchiveHandle *AH)
1275 {
1276 	RestoreOptions *ropt = AH->public.ropt;
1277 
1278 	if (!ropt->single_txn)
1279 	{
1280 		if (AH->connection)
1281 			CommitTransaction(&AH->public);
1282 		else
1283 			ahprintf(AH, "COMMIT;\n\n");
1284 	}
1285 
1286 	ahlog(AH, 1, ngettext("restored %d large object\n",
1287 						  "restored %d large objects\n",
1288 						  AH->blobCount),
1289 		  AH->blobCount);
1290 }
1291 
1292 
1293 /*
1294  * Called by a format handler to initiate restoration of a blob
1295  */
1296 void
StartRestoreBlob(ArchiveHandle * AH,Oid oid,bool drop)1297 StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
1298 {
1299 	bool		old_blob_style = (AH->version < K_VERS_1_12);
1300 	Oid			loOid;
1301 
1302 	AH->blobCount++;
1303 
1304 	/* Initialize the LO Buffer */
1305 	AH->lo_buf_used = 0;
1306 
1307 	ahlog(AH, 1, "restoring large object with OID %u\n", oid);
1308 
1309 	/* With an old archive we must do drop and create logic here */
1310 	if (old_blob_style && drop)
1311 		DropBlobIfExists(AH, oid);
1312 
1313 	if (AH->connection)
1314 	{
1315 		if (old_blob_style)
1316 		{
1317 			loOid = lo_create(AH->connection, oid);
1318 			if (loOid == 0 || loOid != oid)
1319 				exit_horribly(modulename, "could not create large object %u: %s",
1320 							  oid, PQerrorMessage(AH->connection));
1321 		}
1322 		AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
1323 		if (AH->loFd == -1)
1324 			exit_horribly(modulename, "could not open large object %u: %s",
1325 						  oid, PQerrorMessage(AH->connection));
1326 	}
1327 	else
1328 	{
1329 		if (old_blob_style)
1330 			ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
1331 					 oid, INV_WRITE);
1332 		else
1333 			ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
1334 					 oid, INV_WRITE);
1335 	}
1336 
1337 	AH->writingBlob = 1;
1338 }
1339 
1340 void
EndRestoreBlob(ArchiveHandle * AH,Oid oid)1341 EndRestoreBlob(ArchiveHandle *AH, Oid oid)
1342 {
1343 	if (AH->lo_buf_used > 0)
1344 	{
1345 		/* Write remaining bytes from the LO buffer */
1346 		dump_lo_buf(AH);
1347 	}
1348 
1349 	AH->writingBlob = 0;
1350 
1351 	if (AH->connection)
1352 	{
1353 		lo_close(AH->connection, AH->loFd);
1354 		AH->loFd = -1;
1355 	}
1356 	else
1357 	{
1358 		ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1359 	}
1360 }
1361 
1362 /***********
1363  * Sorting and Reordering
1364  ***********/
1365 
1366 void
SortTocFromFile(Archive * AHX)1367 SortTocFromFile(Archive *AHX)
1368 {
1369 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1370 	RestoreOptions *ropt = AH->public.ropt;
1371 	FILE	   *fh;
1372 	char		buf[100];
1373 	bool		incomplete_line;
1374 
1375 	/* Allocate space for the 'wanted' array, and init it */
1376 	ropt->idWanted = (bool *) pg_malloc(sizeof(bool) * AH->maxDumpId);
1377 	memset(ropt->idWanted, 0, sizeof(bool) * AH->maxDumpId);
1378 
1379 	/* Setup the file */
1380 	fh = fopen(ropt->tocFile, PG_BINARY_R);
1381 	if (!fh)
1382 		exit_horribly(modulename, "could not open TOC file \"%s\": %s\n",
1383 					  ropt->tocFile, strerror(errno));
1384 
1385 	incomplete_line = false;
1386 	while (fgets(buf, sizeof(buf), fh) != NULL)
1387 	{
1388 		bool		prev_incomplete_line = incomplete_line;
1389 		int			buflen;
1390 		char	   *cmnt;
1391 		char	   *endptr;
1392 		DumpId		id;
1393 		TocEntry   *te;
1394 
1395 		/*
1396 		 * Some lines in the file might be longer than sizeof(buf).  This is
1397 		 * no problem, since we only care about the leading numeric ID which
1398 		 * can be at most a few characters; but we have to skip continuation
1399 		 * bufferloads when processing a long line.
1400 		 */
1401 		buflen = strlen(buf);
1402 		if (buflen > 0 && buf[buflen - 1] == '\n')
1403 			incomplete_line = false;
1404 		else
1405 			incomplete_line = true;
1406 		if (prev_incomplete_line)
1407 			continue;
1408 
1409 		/* Truncate line at comment, if any */
1410 		cmnt = strchr(buf, ';');
1411 		if (cmnt != NULL)
1412 			cmnt[0] = '\0';
1413 
1414 		/* Ignore if all blank */
1415 		if (strspn(buf, " \t\r\n") == strlen(buf))
1416 			continue;
1417 
1418 		/* Get an ID, check it's valid and not already seen */
1419 		id = strtol(buf, &endptr, 10);
1420 		if (endptr == buf || id <= 0 || id > AH->maxDumpId ||
1421 			ropt->idWanted[id - 1])
1422 		{
1423 			write_msg(modulename, "WARNING: line ignored: %s\n", buf);
1424 			continue;
1425 		}
1426 
1427 		/* Find TOC entry */
1428 		te = getTocEntryByDumpId(AH, id);
1429 		if (!te)
1430 			exit_horribly(modulename, "could not find entry for ID %d\n",
1431 						  id);
1432 
1433 		/* Mark it wanted */
1434 		ropt->idWanted[id - 1] = true;
1435 
1436 		/*
1437 		 * Move each item to the end of the list as it is selected, so that
1438 		 * they are placed in the desired order.  Any unwanted items will end
1439 		 * up at the front of the list, which may seem unintuitive but it's
1440 		 * what we need.  In an ordinary serial restore that makes no
1441 		 * difference, but in a parallel restore we need to mark unrestored
1442 		 * items' dependencies as satisfied before we start examining
1443 		 * restorable items.  Otherwise they could have surprising
1444 		 * side-effects on the order in which restorable items actually get
1445 		 * restored.
1446 		 */
1447 		_moveBefore(AH, AH->toc, te);
1448 	}
1449 
1450 	if (fclose(fh) != 0)
1451 		exit_horribly(modulename, "could not close TOC file: %s\n",
1452 					  strerror(errno));
1453 }
1454 
1455 /**********************
1456  * 'Convenience functions that look like standard IO functions
1457  * for writing data when in dump mode.
1458  **********************/
1459 
1460 /* Public */
1461 void
archputs(const char * s,Archive * AH)1462 archputs(const char *s, Archive *AH)
1463 {
1464 	WriteData(AH, s, strlen(s));
1465 	return;
1466 }
1467 
1468 /* Public */
1469 int
archprintf(Archive * AH,const char * fmt,...)1470 archprintf(Archive *AH, const char *fmt,...)
1471 {
1472 	char	   *p;
1473 	size_t		len = 128;		/* initial assumption about buffer size */
1474 	size_t		cnt;
1475 
1476 	for (;;)
1477 	{
1478 		va_list		args;
1479 
1480 		/* Allocate work buffer. */
1481 		p = (char *) pg_malloc(len);
1482 
1483 		/* Try to format the data. */
1484 		va_start(args, fmt);
1485 		cnt = pvsnprintf(p, len, fmt, args);
1486 		va_end(args);
1487 
1488 		if (cnt < len)
1489 			break;				/* success */
1490 
1491 		/* Release buffer and loop around to try again with larger len. */
1492 		free(p);
1493 		len = cnt;
1494 	}
1495 
1496 	WriteData(AH, p, cnt);
1497 	free(p);
1498 	return (int) cnt;
1499 }
1500 
1501 
1502 /*******************************
1503  * Stuff below here should be 'private' to the archiver routines
1504  *******************************/
1505 
1506 static void
SetOutput(ArchiveHandle * AH,const char * filename,int compression)1507 SetOutput(ArchiveHandle *AH, const char *filename, int compression)
1508 {
1509 	int			fn;
1510 
1511 	if (filename)
1512 	{
1513 		if (strcmp(filename, "-") == 0)
1514 			fn = fileno(stdout);
1515 		else
1516 			fn = -1;
1517 	}
1518 	else if (AH->FH)
1519 		fn = fileno(AH->FH);
1520 	else if (AH->fSpec)
1521 	{
1522 		fn = -1;
1523 		filename = AH->fSpec;
1524 	}
1525 	else
1526 		fn = fileno(stdout);
1527 
1528 	/* If compression explicitly requested, use gzopen */
1529 #ifdef HAVE_LIBZ
1530 	if (compression != 0)
1531 	{
1532 		char		fmode[10];
1533 
1534 		/* Don't use PG_BINARY_x since this is zlib */
1535 		sprintf(fmode, "wb%d", compression);
1536 		if (fn >= 0)
1537 			AH->OF = gzdopen(dup(fn), fmode);
1538 		else
1539 			AH->OF = gzopen(filename, fmode);
1540 		AH->gzOut = 1;
1541 	}
1542 	else
1543 #endif
1544 	{							/* Use fopen */
1545 		if (AH->mode == archModeAppend)
1546 		{
1547 			if (fn >= 0)
1548 				AH->OF = fdopen(dup(fn), PG_BINARY_A);
1549 			else
1550 				AH->OF = fopen(filename, PG_BINARY_A);
1551 		}
1552 		else
1553 		{
1554 			if (fn >= 0)
1555 				AH->OF = fdopen(dup(fn), PG_BINARY_W);
1556 			else
1557 				AH->OF = fopen(filename, PG_BINARY_W);
1558 		}
1559 		AH->gzOut = 0;
1560 	}
1561 
1562 	if (!AH->OF)
1563 	{
1564 		if (filename)
1565 			exit_horribly(modulename, "could not open output file \"%s\": %s\n",
1566 						  filename, strerror(errno));
1567 		else
1568 			exit_horribly(modulename, "could not open output file: %s\n",
1569 						  strerror(errno));
1570 	}
1571 }
1572 
1573 static OutputContext
SaveOutput(ArchiveHandle * AH)1574 SaveOutput(ArchiveHandle *AH)
1575 {
1576 	OutputContext sav;
1577 
1578 	sav.OF = AH->OF;
1579 	sav.gzOut = AH->gzOut;
1580 
1581 	return sav;
1582 }
1583 
1584 static void
RestoreOutput(ArchiveHandle * AH,OutputContext savedContext)1585 RestoreOutput(ArchiveHandle *AH, OutputContext savedContext)
1586 {
1587 	int			res;
1588 
1589 	if (AH->gzOut)
1590 		res = GZCLOSE(AH->OF);
1591 	else
1592 		res = fclose(AH->OF);
1593 
1594 	if (res != 0)
1595 		exit_horribly(modulename, "could not close output file: %s\n",
1596 					  strerror(errno));
1597 
1598 	AH->gzOut = savedContext.gzOut;
1599 	AH->OF = savedContext.OF;
1600 }
1601 
1602 
1603 
1604 /*
1605  *	Print formatted text to the output file (usually stdout).
1606  */
1607 int
ahprintf(ArchiveHandle * AH,const char * fmt,...)1608 ahprintf(ArchiveHandle *AH, const char *fmt,...)
1609 {
1610 	char	   *p;
1611 	size_t		len = 128;		/* initial assumption about buffer size */
1612 	size_t		cnt;
1613 
1614 	for (;;)
1615 	{
1616 		va_list		args;
1617 
1618 		/* Allocate work buffer. */
1619 		p = (char *) pg_malloc(len);
1620 
1621 		/* Try to format the data. */
1622 		va_start(args, fmt);
1623 		cnt = pvsnprintf(p, len, fmt, args);
1624 		va_end(args);
1625 
1626 		if (cnt < len)
1627 			break;				/* success */
1628 
1629 		/* Release buffer and loop around to try again with larger len. */
1630 		free(p);
1631 		len = cnt;
1632 	}
1633 
1634 	ahwrite(p, 1, cnt, AH);
1635 	free(p);
1636 	return (int) cnt;
1637 }
1638 
1639 void
ahlog(ArchiveHandle * AH,int level,const char * fmt,...)1640 ahlog(ArchiveHandle *AH, int level, const char *fmt,...)
1641 {
1642 	va_list		ap;
1643 
1644 	if (AH->debugLevel < level && (!AH->public.verbose || level > 1))
1645 		return;
1646 
1647 	va_start(ap, fmt);
1648 	vwrite_msg(NULL, fmt, ap);
1649 	va_end(ap);
1650 }
1651 
1652 /*
1653  * Single place for logic which says 'We are restoring to a direct DB connection'.
1654  */
1655 static int
RestoringToDB(ArchiveHandle * AH)1656 RestoringToDB(ArchiveHandle *AH)
1657 {
1658 	RestoreOptions *ropt = AH->public.ropt;
1659 
1660 	return (ropt && ropt->useDB && AH->connection);
1661 }
1662 
1663 /*
1664  * Dump the current contents of the LO data buffer while writing a BLOB
1665  */
1666 static void
dump_lo_buf(ArchiveHandle * AH)1667 dump_lo_buf(ArchiveHandle *AH)
1668 {
1669 	if (AH->connection)
1670 	{
1671 		size_t		res;
1672 
1673 		res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1674 		ahlog(AH, 5, ngettext("wrote %lu byte of large object data (result = %lu)\n",
1675 					 "wrote %lu bytes of large object data (result = %lu)\n",
1676 							  AH->lo_buf_used),
1677 			  (unsigned long) AH->lo_buf_used, (unsigned long) res);
1678 		if (res != AH->lo_buf_used)
1679 			exit_horribly(modulename,
1680 			"could not write to large object (result: %lu, expected: %lu)\n",
1681 					   (unsigned long) res, (unsigned long) AH->lo_buf_used);
1682 	}
1683 	else
1684 	{
1685 		PQExpBuffer buf = createPQExpBuffer();
1686 
1687 		appendByteaLiteralAHX(buf,
1688 							  (const unsigned char *) AH->lo_buf,
1689 							  AH->lo_buf_used,
1690 							  AH);
1691 
1692 		/* Hack: turn off writingBlob so ahwrite doesn't recurse to here */
1693 		AH->writingBlob = 0;
1694 		ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1695 		AH->writingBlob = 1;
1696 
1697 		destroyPQExpBuffer(buf);
1698 	}
1699 	AH->lo_buf_used = 0;
1700 }
1701 
1702 
1703 /*
1704  *	Write buffer to the output file (usually stdout). This is used for
1705  *	outputting 'restore' scripts etc. It is even possible for an archive
1706  *	format to create a custom output routine to 'fake' a restore if it
1707  *	wants to generate a script (see TAR output).
1708  */
1709 void
ahwrite(const void * ptr,size_t size,size_t nmemb,ArchiveHandle * AH)1710 ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1711 {
1712 	int			bytes_written = 0;
1713 
1714 	if (AH->writingBlob)
1715 	{
1716 		size_t		remaining = size * nmemb;
1717 
1718 		while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1719 		{
1720 			size_t		avail = AH->lo_buf_size - AH->lo_buf_used;
1721 
1722 			memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1723 			ptr = (const void *) ((const char *) ptr + avail);
1724 			remaining -= avail;
1725 			AH->lo_buf_used += avail;
1726 			dump_lo_buf(AH);
1727 		}
1728 
1729 		memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1730 		AH->lo_buf_used += remaining;
1731 
1732 		bytes_written = size * nmemb;
1733 	}
1734 	else if (AH->gzOut)
1735 		bytes_written = GZWRITE(ptr, size, nmemb, AH->OF);
1736 	else if (AH->CustomOutPtr)
1737 		bytes_written = AH->CustomOutPtr (AH, ptr, size * nmemb);
1738 
1739 	else
1740 	{
1741 		/*
1742 		 * If we're doing a restore, and it's direct to DB, and we're
1743 		 * connected then send it to the DB.
1744 		 */
1745 		if (RestoringToDB(AH))
1746 			bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
1747 		else
1748 			bytes_written = fwrite(ptr, size, nmemb, AH->OF) * size;
1749 	}
1750 
1751 	if (bytes_written != size * nmemb)
1752 		WRITE_ERROR_EXIT;
1753 
1754 	return;
1755 }
1756 
1757 /* on some error, we may decide to go on... */
1758 void
warn_or_exit_horribly(ArchiveHandle * AH,const char * modulename,const char * fmt,...)1759 warn_or_exit_horribly(ArchiveHandle *AH,
1760 					  const char *modulename, const char *fmt,...)
1761 {
1762 	va_list		ap;
1763 
1764 	switch (AH->stage)
1765 	{
1766 
1767 		case STAGE_NONE:
1768 			/* Do nothing special */
1769 			break;
1770 
1771 		case STAGE_INITIALIZING:
1772 			if (AH->stage != AH->lastErrorStage)
1773 				write_msg(modulename, "Error while INITIALIZING:\n");
1774 			break;
1775 
1776 		case STAGE_PROCESSING:
1777 			if (AH->stage != AH->lastErrorStage)
1778 				write_msg(modulename, "Error while PROCESSING TOC:\n");
1779 			break;
1780 
1781 		case STAGE_FINALIZING:
1782 			if (AH->stage != AH->lastErrorStage)
1783 				write_msg(modulename, "Error while FINALIZING:\n");
1784 			break;
1785 	}
1786 	if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1787 	{
1788 		write_msg(modulename, "Error from TOC entry %d; %u %u %s %s %s\n",
1789 				  AH->currentTE->dumpId,
1790 				  AH->currentTE->catalogId.tableoid,
1791 				  AH->currentTE->catalogId.oid,
1792 				  AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
1793 				  AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
1794 				  AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
1795 	}
1796 	AH->lastErrorStage = AH->stage;
1797 	AH->lastErrorTE = AH->currentTE;
1798 
1799 	va_start(ap, fmt);
1800 	vwrite_msg(modulename, fmt, ap);
1801 	va_end(ap);
1802 
1803 	if (AH->public.exit_on_error)
1804 		exit_nicely(1);
1805 	else
1806 		AH->public.n_errors++;
1807 }
1808 
#ifdef NOT_USED

/*
 * _moveAfter
 *		Relink TOC entry "te" so that it directly follows "pos" in the
 *		circular doubly-linked TOC list.  Currently compiled out.
 */
static void
_moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
{
	TocEntry   *succ;

	/* Detach te from its current neighbours */
	te->prev->next = te->next;
	te->next->prev = te->prev;

	/* Splice it back in between pos and pos's (post-detach) successor */
	succ = pos->next;
	te->prev = pos;
	te->next = succ;
	succ->prev = te;
	pos->next = te;
}
#endif
1825 
1826 static void
_moveBefore(ArchiveHandle * AH,TocEntry * pos,TocEntry * te)1827 _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1828 {
1829 	/* Unlink te from list */
1830 	te->prev->next = te->next;
1831 	te->next->prev = te->prev;
1832 
1833 	/* and insert it before "pos" */
1834 	te->prev = pos->prev;
1835 	te->next = pos;
1836 	pos->prev->next = te;
1837 	pos->prev = te;
1838 }
1839 
1840 /*
1841  * Build index arrays for the TOC list
1842  *
1843  * This should be invoked only after we have created or read in all the TOC
1844  * items.
1845  *
1846  * The arrays are indexed by dump ID (so entry zero is unused).  Note that the
1847  * array entries run only up to maxDumpId.  We might see dependency dump IDs
1848  * beyond that (if the dump was partial); so always check the array bound
1849  * before trying to touch an array entry.
1850  */
1851 static void
buildTocEntryArrays(ArchiveHandle * AH)1852 buildTocEntryArrays(ArchiveHandle *AH)
1853 {
1854 	DumpId		maxDumpId = AH->maxDumpId;
1855 	TocEntry   *te;
1856 
1857 	AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
1858 	AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));
1859 
1860 	for (te = AH->toc->next; te != AH->toc; te = te->next)
1861 	{
1862 		/* this check is purely paranoia, maxDumpId should be correct */
1863 		if (te->dumpId <= 0 || te->dumpId > maxDumpId)
1864 			exit_horribly(modulename, "bad dumpId\n");
1865 
1866 		/* tocsByDumpId indexes all TOCs by their dump ID */
1867 		AH->tocsByDumpId[te->dumpId] = te;
1868 
1869 		/*
1870 		 * tableDataId provides the TABLE DATA item's dump ID for each TABLE
1871 		 * TOC entry that has a DATA item.  We compute this by reversing the
1872 		 * TABLE DATA item's dependency, knowing that a TABLE DATA item has
1873 		 * just one dependency and it is the TABLE item.
1874 		 */
1875 		if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
1876 		{
1877 			DumpId		tableId = te->dependencies[0];
1878 
1879 			/*
1880 			 * The TABLE item might not have been in the archive, if this was
1881 			 * a data-only dump; but its dump ID should be less than its data
1882 			 * item's dump ID, so there should be a place for it in the array.
1883 			 */
1884 			if (tableId <= 0 || tableId > maxDumpId)
1885 				exit_horribly(modulename, "bad table dumpId for TABLE DATA item\n");
1886 
1887 			AH->tableDataId[tableId] = te->dumpId;
1888 		}
1889 	}
1890 }
1891 
1892 TocEntry *
getTocEntryByDumpId(ArchiveHandle * AH,DumpId id)1893 getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
1894 {
1895 	/* build index arrays if we didn't already */
1896 	if (AH->tocsByDumpId == NULL)
1897 		buildTocEntryArrays(AH);
1898 
1899 	if (id > 0 && id <= AH->maxDumpId)
1900 		return AH->tocsByDumpId[id];
1901 
1902 	return NULL;
1903 }
1904 
1905 teReqs
TocIDRequired(ArchiveHandle * AH,DumpId id)1906 TocIDRequired(ArchiveHandle *AH, DumpId id)
1907 {
1908 	TocEntry   *te = getTocEntryByDumpId(AH, id);
1909 
1910 	if (!te)
1911 		return 0;
1912 
1913 	return te->reqs;
1914 }
1915 
1916 size_t
WriteOffset(ArchiveHandle * AH,pgoff_t o,int wasSet)1917 WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
1918 {
1919 	int			off;
1920 
1921 	/* Save the flag */
1922 	(*AH->WriteBytePtr) (AH, wasSet);
1923 
1924 	/* Write out pgoff_t smallest byte first, prevents endian mismatch */
1925 	for (off = 0; off < sizeof(pgoff_t); off++)
1926 	{
1927 		(*AH->WriteBytePtr) (AH, o & 0xFF);
1928 		o >>= 8;
1929 	}
1930 	return sizeof(pgoff_t) + 1;
1931 }
1932 
1933 int
ReadOffset(ArchiveHandle * AH,pgoff_t * o)1934 ReadOffset(ArchiveHandle *AH, pgoff_t * o)
1935 {
1936 	int			i;
1937 	int			off;
1938 	int			offsetFlg;
1939 
1940 	/* Initialize to zero */
1941 	*o = 0;
1942 
1943 	/* Check for old version */
1944 	if (AH->version < K_VERS_1_7)
1945 	{
1946 		/* Prior versions wrote offsets using WriteInt */
1947 		i = ReadInt(AH);
1948 		/* -1 means not set */
1949 		if (i < 0)
1950 			return K_OFFSET_POS_NOT_SET;
1951 		else if (i == 0)
1952 			return K_OFFSET_NO_DATA;
1953 
1954 		/* Cast to pgoff_t because it was written as an int. */
1955 		*o = (pgoff_t) i;
1956 		return K_OFFSET_POS_SET;
1957 	}
1958 
1959 	/*
1960 	 * Read the flag indicating the state of the data pointer. Check if valid
1961 	 * and die if not.
1962 	 *
1963 	 * This used to be handled by a negative or zero pointer, now we use an
1964 	 * extra byte specifically for the state.
1965 	 */
1966 	offsetFlg = (*AH->ReadBytePtr) (AH) & 0xFF;
1967 
1968 	switch (offsetFlg)
1969 	{
1970 		case K_OFFSET_POS_NOT_SET:
1971 		case K_OFFSET_NO_DATA:
1972 		case K_OFFSET_POS_SET:
1973 
1974 			break;
1975 
1976 		default:
1977 			exit_horribly(modulename, "unexpected data offset flag %d\n", offsetFlg);
1978 	}
1979 
1980 	/*
1981 	 * Read the bytes
1982 	 */
1983 	for (off = 0; off < AH->offSize; off++)
1984 	{
1985 		if (off < sizeof(pgoff_t))
1986 			*o |= ((pgoff_t) ((*AH->ReadBytePtr) (AH))) << (off * 8);
1987 		else
1988 		{
1989 			if ((*AH->ReadBytePtr) (AH) != 0)
1990 				exit_horribly(modulename, "file offset in dump file is too large\n");
1991 		}
1992 	}
1993 
1994 	return offsetFlg;
1995 }
1996 
1997 size_t
WriteInt(ArchiveHandle * AH,int i)1998 WriteInt(ArchiveHandle *AH, int i)
1999 {
2000 	int			b;
2001 
2002 	/*
2003 	 * This is a bit yucky, but I don't want to make the binary format very
2004 	 * dependent on representation, and not knowing much about it, I write out
2005 	 * a sign byte. If you change this, don't forget to change the file
2006 	 * version #, and modify readInt to read the new format AS WELL AS the old
2007 	 * formats.
2008 	 */
2009 
2010 	/* SIGN byte */
2011 	if (i < 0)
2012 	{
2013 		(*AH->WriteBytePtr) (AH, 1);
2014 		i = -i;
2015 	}
2016 	else
2017 		(*AH->WriteBytePtr) (AH, 0);
2018 
2019 	for (b = 0; b < AH->intSize; b++)
2020 	{
2021 		(*AH->WriteBytePtr) (AH, i & 0xFF);
2022 		i >>= 8;
2023 	}
2024 
2025 	return AH->intSize + 1;
2026 }
2027 
2028 int
ReadInt(ArchiveHandle * AH)2029 ReadInt(ArchiveHandle *AH)
2030 {
2031 	int			res = 0;
2032 	int			bv,
2033 				b;
2034 	int			sign = 0;		/* Default positive */
2035 	int			bitShift = 0;
2036 
2037 	if (AH->version > K_VERS_1_0)
2038 		/* Read a sign byte */
2039 		sign = (*AH->ReadBytePtr) (AH);
2040 
2041 	for (b = 0; b < AH->intSize; b++)
2042 	{
2043 		bv = (*AH->ReadBytePtr) (AH) & 0xFF;
2044 		if (bv != 0)
2045 			res = res + (bv << bitShift);
2046 		bitShift += 8;
2047 	}
2048 
2049 	if (sign)
2050 		res = -res;
2051 
2052 	return res;
2053 }
2054 
2055 size_t
WriteStr(ArchiveHandle * AH,const char * c)2056 WriteStr(ArchiveHandle *AH, const char *c)
2057 {
2058 	size_t		res;
2059 
2060 	if (c)
2061 	{
2062 		int			len = strlen(c);
2063 
2064 		res = WriteInt(AH, len);
2065 		(*AH->WriteBufPtr) (AH, c, len);
2066 		res += len;
2067 	}
2068 	else
2069 		res = WriteInt(AH, -1);
2070 
2071 	return res;
2072 }
2073 
2074 char *
ReadStr(ArchiveHandle * AH)2075 ReadStr(ArchiveHandle *AH)
2076 {
2077 	char	   *buf;
2078 	int			l;
2079 
2080 	l = ReadInt(AH);
2081 	if (l < 0)
2082 		buf = NULL;
2083 	else
2084 	{
2085 		buf = (char *) pg_malloc(l + 1);
2086 		(*AH->ReadBufPtr) (AH, (void *) buf, l);
2087 
2088 		buf[l] = '\0';
2089 	}
2090 
2091 	return buf;
2092 }
2093 
/*
 * Determine the format of the archive named by AH->fSpec (or of stdin when
 * fSpec is NULL), store it in AH->format and return it.
 *
 * - A directory is accepted as archDirectory if it contains a regular file
 *   "toc.dat" (or "toc.dat.gz" when built with HAVE_LIBZ).
 * - Otherwise the first 5 bytes are read; "PGDMP" means archCustom.
 * - Otherwise up to 512 bytes are read: a plain-text pg_dump/pg_dumpall
 *   header is rejected with a hint to use psql, and anything else must pass
 *   isValidTarHeader() to be accepted as archTar.
 * Every other case is reported with exit_horribly().
 *
 * The bytes consumed are kept in AH->lookahead (lookaheadLen).  If we
 * opened the file ourselves, it is closed again and the lookahead is
 * discarded, because the header is re-read after re-opening.
 */
static int
_discoverArchiveFormat(ArchiveHandle *AH)
{
	FILE	   *fh;
	char		sig[6];			/* More than enough */
	size_t		cnt;
	int			wantClose = 0;

#if 0
	write_msg(modulename, "attempting to ascertain archive format\n");
#endif

	/* Start over with a fresh, zero-filled 512-byte lookahead buffer */
	if (AH->lookahead)
		free(AH->lookahead);

	AH->readHeader = 0;
	AH->lookaheadSize = 512;
	AH->lookahead = pg_malloc0(512);
	AH->lookaheadLen = 0;
	AH->lookaheadPos = 0;

	if (AH->fSpec)
	{
		struct stat st;

		/* We open the file ourselves, so we must close it at the end */
		wantClose = 1;

		/*
		 * Check if the specified archive is a directory. If so, check if
		 * there's a "toc.dat" (or "toc.dat.gz") file in it.
		 */
		if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
		{
			char		buf[MAXPGPATH];

			if (snprintf(buf, MAXPGPATH, "%s/toc.dat", AH->fSpec) >= MAXPGPATH)
				exit_horribly(modulename, "directory name too long: \"%s\"\n",
							  AH->fSpec);
			if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
			{
				AH->format = archDirectory;
				return AH->format;
			}

#ifdef HAVE_LIBZ
			if (snprintf(buf, MAXPGPATH, "%s/toc.dat.gz", AH->fSpec) >= MAXPGPATH)
				exit_horribly(modulename, "directory name too long: \"%s\"\n",
							  AH->fSpec);
			if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
			{
				AH->format = archDirectory;
				return AH->format;
			}
#endif
			exit_horribly(modulename, "directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)\n",
						  AH->fSpec);
			fh = NULL;			/* keep compiler quiet */
		}
		else
		{
			fh = fopen(AH->fSpec, PG_BINARY_R);
			if (!fh)
				exit_horribly(modulename, "could not open input file \"%s\": %s\n",
							  AH->fSpec, strerror(errno));
		}
	}
	else
	{
		/* No file name: sniff the format from standard input */
		fh = stdin;
		if (!fh)
			exit_horribly(modulename, "could not open input file: %s\n",
						  strerror(errno));
	}

	/* Every supported non-directory format has at least 5 leading bytes */
	if ((cnt = fread(sig, 1, 5, fh)) != 5)
	{
		if (ferror(fh))
			exit_horribly(modulename, "could not read input file: %s\n", strerror(errno));
		else
			exit_horribly(modulename, "input file is too short (read %lu, expected 5)\n",
						  (unsigned long) cnt);
	}

	/* Save it, just in case we need it later */
	memcpy(&AH->lookahead[0], sig, 5);
	AH->lookaheadLen = 5;

	if (strncmp(sig, "PGDMP", 5) == 0)
	{
		/* It's custom format, stop here */
		AH->format = archCustom;
		AH->readHeader = 1;
	}
	else
	{
		/*
		 * *Maybe* we have a tar archive format file or a text dump ... So,
		 * read first 512 byte header...
		 */
		cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
		/* read failure is checked below */
		AH->lookaheadLen += cnt;

		/*
		 * Requiring the length of the longer header guarantees both strncmp
		 * calls only compare bytes that were actually read.
		 */
		if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
			(strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
			 strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
		{
			/*
			 * looks like it's probably a text format dump. so suggest they
			 * try psql
			 */
			exit_horribly(modulename, "input file appears to be a text format dump. Please use psql.\n");
		}

		/* A short read here is either EOF (input too small) or an I/O error */
		if (AH->lookaheadLen != 512)
		{
			if (feof(fh))
				exit_horribly(modulename, "input file does not appear to be a valid archive (too short?)\n");
			else
				READ_ERROR_EXIT(fh);
		}

		if (!isValidTarHeader(AH->lookahead))
			exit_horribly(modulename, "input file does not appear to be a valid archive\n");

		AH->format = archTar;
	}

	/* Close the file if we opened it */
	if (wantClose)
	{
		if (fclose(fh) != 0)
			exit_horribly(modulename, "could not close input file: %s\n",
						  strerror(errno));
		/* Forget lookahead, since we'll re-read header after re-opening */
		AH->readHeader = 0;
		AH->lookaheadLen = 0;
	}

	return AH->format;
}
2235 
2236 
2237 /*
2238  * Allocate an archive handle
2239  */
2240 static ArchiveHandle *
_allocAH(const char * FileSpec,const ArchiveFormat fmt,const int compression,ArchiveMode mode,SetupWorkerPtr setupWorkerPtr)2241 _allocAH(const char *FileSpec, const ArchiveFormat fmt,
2242 	  const int compression, ArchiveMode mode, SetupWorkerPtr setupWorkerPtr)
2243 {
2244 	ArchiveHandle *AH;
2245 
2246 #if 0
2247 	write_msg(modulename, "allocating AH for %s, format %d\n", FileSpec, fmt);
2248 #endif
2249 
2250 	AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
2251 
2252 	/* AH->debugLevel = 100; */
2253 
2254 	AH->vmaj = K_VERS_MAJOR;
2255 	AH->vmin = K_VERS_MINOR;
2256 	AH->vrev = K_VERS_REV;
2257 
2258 	/* Make a convenient integer <maj><min><rev>00 */
2259 	AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
2260 
2261 	/* initialize for backwards compatible string processing */
2262 	AH->public.encoding = 0;	/* PG_SQL_ASCII */
2263 	AH->public.std_strings = false;
2264 
2265 	/* sql error handling */
2266 	AH->public.exit_on_error = true;
2267 	AH->public.n_errors = 0;
2268 
2269 	AH->archiveDumpVersion = PG_VERSION;
2270 
2271 	AH->createDate = time(NULL);
2272 
2273 	AH->intSize = sizeof(int);
2274 	AH->offSize = sizeof(pgoff_t);
2275 	if (FileSpec)
2276 	{
2277 		AH->fSpec = pg_strdup(FileSpec);
2278 
2279 		/*
2280 		 * Not used; maybe later....
2281 		 *
2282 		 * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2283 		 * i--) if (AH->workDir[i-1] == '/')
2284 		 */
2285 	}
2286 	else
2287 		AH->fSpec = NULL;
2288 
2289 	AH->currUser = NULL;		/* unknown */
2290 	AH->currSchema = NULL;		/* ditto */
2291 	AH->currTablespace = NULL;	/* ditto */
2292 	AH->currWithOids = -1;		/* force SET */
2293 
2294 	AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2295 
2296 	AH->toc->next = AH->toc;
2297 	AH->toc->prev = AH->toc;
2298 
2299 	AH->mode = mode;
2300 	AH->compression = compression;
2301 
2302 	memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
2303 
2304 	/* Open stdout with no compression for AH output handle */
2305 	AH->gzOut = 0;
2306 	AH->OF = stdout;
2307 
2308 	/*
2309 	 * On Windows, we need to use binary mode to read/write non-text files,
2310 	 * which include all archive formats as well as compressed plain text.
2311 	 * Force stdin/stdout into binary mode if that is what we are using.
2312 	 */
2313 #ifdef WIN32
2314 	if ((fmt != archNull || compression != 0) &&
2315 		(AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2316 	{
2317 		if (mode == archModeWrite)
2318 			_setmode(fileno(stdout), O_BINARY);
2319 		else
2320 			_setmode(fileno(stdin), O_BINARY);
2321 	}
2322 #endif
2323 
2324 	AH->SetupWorkerPtr = setupWorkerPtr;
2325 
2326 	if (fmt == archUnknown)
2327 		AH->format = _discoverArchiveFormat(AH);
2328 	else
2329 		AH->format = fmt;
2330 
2331 	switch (AH->format)
2332 	{
2333 		case archCustom:
2334 			InitArchiveFmt_Custom(AH);
2335 			break;
2336 
2337 		case archNull:
2338 			InitArchiveFmt_Null(AH);
2339 			break;
2340 
2341 		case archDirectory:
2342 			InitArchiveFmt_Directory(AH);
2343 			break;
2344 
2345 		case archTar:
2346 			InitArchiveFmt_Tar(AH);
2347 			break;
2348 
2349 		default:
2350 			exit_horribly(modulename, "unrecognized file format \"%d\"\n", fmt);
2351 	}
2352 
2353 	return AH;
2354 }
2355 
2356 /*
2357  * Write out all data (tables & blobs)
2358  */
2359 void
WriteDataChunks(ArchiveHandle * AH,ParallelState * pstate)2360 WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
2361 {
2362 	TocEntry   *te;
2363 
2364 	for (te = AH->toc->next; te != AH->toc; te = te->next)
2365 	{
2366 		if (!te->dataDumper)
2367 			continue;
2368 
2369 		if ((te->reqs & REQ_DATA) == 0)
2370 			continue;
2371 
2372 		if (pstate && pstate->numWorkers > 1)
2373 		{
2374 			/*
2375 			 * If we are in a parallel backup, then we are always the master
2376 			 * process.  Dispatch each data-transfer job to a worker.
2377 			 */
2378 			EnsureIdleWorker(AH, pstate);
2379 			DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP);
2380 		}
2381 		else
2382 			WriteDataChunksForTocEntry(AH, te);
2383 	}
2384 
2385 	/*
2386 	 * If parallel, wait for workers to finish.
2387 	 */
2388 	EnsureWorkersFinished(AH, pstate);
2389 }
2390 
2391 void
WriteDataChunksForTocEntry(ArchiveHandle * AH,TocEntry * te)2392 WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
2393 {
2394 	StartDataPtr startPtr;
2395 	EndDataPtr	endPtr;
2396 
2397 	AH->currToc = te;
2398 
2399 	if (strcmp(te->desc, "BLOBS") == 0)
2400 	{
2401 		startPtr = AH->StartBlobsPtr;
2402 		endPtr = AH->EndBlobsPtr;
2403 	}
2404 	else
2405 	{
2406 		startPtr = AH->StartDataPtr;
2407 		endPtr = AH->EndDataPtr;
2408 	}
2409 
2410 	if (startPtr != NULL)
2411 		(*startPtr) (AH, te);
2412 
2413 	/*
2414 	 * The user-provided DataDumper routine needs to call AH->WriteData
2415 	 */
2416 	(*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
2417 
2418 	if (endPtr != NULL)
2419 		(*endPtr) (AH, te);
2420 
2421 	AH->currToc = NULL;
2422 }
2423 
2424 void
WriteToc(ArchiveHandle * AH)2425 WriteToc(ArchiveHandle *AH)
2426 {
2427 	TocEntry   *te;
2428 	char		workbuf[32];
2429 	int			tocCount;
2430 	int			i;
2431 
2432 	/* count entries that will actually be dumped */
2433 	tocCount = 0;
2434 	for (te = AH->toc->next; te != AH->toc; te = te->next)
2435 	{
2436 		if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) != 0)
2437 			tocCount++;
2438 	}
2439 
2440 	/* printf("%d TOC Entries to save\n", tocCount); */
2441 
2442 	WriteInt(AH, tocCount);
2443 
2444 	for (te = AH->toc->next; te != AH->toc; te = te->next)
2445 	{
2446 		if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) == 0)
2447 			continue;
2448 
2449 		WriteInt(AH, te->dumpId);
2450 		WriteInt(AH, te->dataDumper ? 1 : 0);
2451 
2452 		/* OID is recorded as a string for historical reasons */
2453 		sprintf(workbuf, "%u", te->catalogId.tableoid);
2454 		WriteStr(AH, workbuf);
2455 		sprintf(workbuf, "%u", te->catalogId.oid);
2456 		WriteStr(AH, workbuf);
2457 
2458 		WriteStr(AH, te->tag);
2459 		WriteStr(AH, te->desc);
2460 		WriteInt(AH, te->section);
2461 		WriteStr(AH, te->defn);
2462 		WriteStr(AH, te->dropStmt);
2463 		WriteStr(AH, te->copyStmt);
2464 		WriteStr(AH, te->namespace);
2465 		WriteStr(AH, te->tablespace);
2466 		WriteStr(AH, te->owner);
2467 		WriteStr(AH, te->withOids ? "true" : "false");
2468 
2469 		/* Dump list of dependencies */
2470 		for (i = 0; i < te->nDeps; i++)
2471 		{
2472 			sprintf(workbuf, "%d", te->dependencies[i]);
2473 			WriteStr(AH, workbuf);
2474 		}
2475 		WriteStr(AH, NULL);		/* Terminate List */
2476 
2477 		if (AH->WriteExtraTocPtr)
2478 			(*AH->WriteExtraTocPtr) (AH, te);
2479 	}
2480 }
2481 
2482 void
ReadToc(ArchiveHandle * AH)2483 ReadToc(ArchiveHandle *AH)
2484 {
2485 	int			i;
2486 	char	   *tmp;
2487 	DumpId	   *deps;
2488 	int			depIdx;
2489 	int			depSize;
2490 	TocEntry   *te;
2491 
2492 	AH->tocCount = ReadInt(AH);
2493 	AH->maxDumpId = 0;
2494 
2495 	for (i = 0; i < AH->tocCount; i++)
2496 	{
2497 		te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2498 		te->dumpId = ReadInt(AH);
2499 
2500 		if (te->dumpId > AH->maxDumpId)
2501 			AH->maxDumpId = te->dumpId;
2502 
2503 		/* Sanity check */
2504 		if (te->dumpId <= 0)
2505 			exit_horribly(modulename,
2506 					   "entry ID %d out of range -- perhaps a corrupt TOC\n",
2507 						  te->dumpId);
2508 
2509 		te->hadDumper = ReadInt(AH);
2510 
2511 		if (AH->version >= K_VERS_1_8)
2512 		{
2513 			tmp = ReadStr(AH);
2514 			sscanf(tmp, "%u", &te->catalogId.tableoid);
2515 			free(tmp);
2516 		}
2517 		else
2518 			te->catalogId.tableoid = InvalidOid;
2519 		tmp = ReadStr(AH);
2520 		sscanf(tmp, "%u", &te->catalogId.oid);
2521 		free(tmp);
2522 
2523 		te->tag = ReadStr(AH);
2524 		te->desc = ReadStr(AH);
2525 
2526 		if (AH->version >= K_VERS_1_11)
2527 		{
2528 			te->section = ReadInt(AH);
2529 		}
2530 		else
2531 		{
2532 			/*
2533 			 * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2534 			 * the entries into sections.  This list need not cover entry
2535 			 * types added later than 8.4.
2536 			 */
2537 			if (strcmp(te->desc, "COMMENT") == 0 ||
2538 				strcmp(te->desc, "ACL") == 0 ||
2539 				strcmp(te->desc, "ACL LANGUAGE") == 0)
2540 				te->section = SECTION_NONE;
2541 			else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2542 					 strcmp(te->desc, "BLOBS") == 0 ||
2543 					 strcmp(te->desc, "BLOB COMMENTS") == 0)
2544 				te->section = SECTION_DATA;
2545 			else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2546 					 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2547 					 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2548 					 strcmp(te->desc, "INDEX") == 0 ||
2549 					 strcmp(te->desc, "RULE") == 0 ||
2550 					 strcmp(te->desc, "TRIGGER") == 0)
2551 				te->section = SECTION_POST_DATA;
2552 			else
2553 				te->section = SECTION_PRE_DATA;
2554 		}
2555 
2556 		te->defn = ReadStr(AH);
2557 		te->dropStmt = ReadStr(AH);
2558 
2559 		if (AH->version >= K_VERS_1_3)
2560 			te->copyStmt = ReadStr(AH);
2561 
2562 		if (AH->version >= K_VERS_1_6)
2563 			te->namespace = ReadStr(AH);
2564 
2565 		if (AH->version >= K_VERS_1_10)
2566 			te->tablespace = ReadStr(AH);
2567 
2568 		te->owner = ReadStr(AH);
2569 		if (AH->version >= K_VERS_1_9)
2570 		{
2571 			if (strcmp(ReadStr(AH), "true") == 0)
2572 				te->withOids = true;
2573 			else
2574 				te->withOids = false;
2575 		}
2576 		else
2577 			te->withOids = true;
2578 
2579 		/* Read TOC entry dependencies */
2580 		if (AH->version >= K_VERS_1_5)
2581 		{
2582 			depSize = 100;
2583 			deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
2584 			depIdx = 0;
2585 			for (;;)
2586 			{
2587 				tmp = ReadStr(AH);
2588 				if (!tmp)
2589 					break;		/* end of list */
2590 				if (depIdx >= depSize)
2591 				{
2592 					depSize *= 2;
2593 					deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
2594 				}
2595 				sscanf(tmp, "%d", &deps[depIdx]);
2596 				free(tmp);
2597 				depIdx++;
2598 			}
2599 
2600 			if (depIdx > 0)		/* We have a non-null entry */
2601 			{
2602 				deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
2603 				te->dependencies = deps;
2604 				te->nDeps = depIdx;
2605 			}
2606 			else
2607 			{
2608 				free(deps);
2609 				te->dependencies = NULL;
2610 				te->nDeps = 0;
2611 			}
2612 		}
2613 		else
2614 		{
2615 			te->dependencies = NULL;
2616 			te->nDeps = 0;
2617 		}
2618 
2619 		if (AH->ReadExtraTocPtr)
2620 			(*AH->ReadExtraTocPtr) (AH, te);
2621 
2622 		ahlog(AH, 3, "read TOC entry %d (ID %d) for %s %s\n",
2623 			  i, te->dumpId, te->desc, te->tag);
2624 
2625 		/* link completed entry into TOC circular list */
2626 		te->prev = AH->toc->prev;
2627 		AH->toc->prev->next = te;
2628 		AH->toc->prev = te;
2629 		te->next = AH->toc;
2630 
2631 		/* special processing immediately upon read for some items */
2632 		if (strcmp(te->desc, "ENCODING") == 0)
2633 			processEncodingEntry(AH, te);
2634 		else if (strcmp(te->desc, "STDSTRINGS") == 0)
2635 			processStdStringsEntry(AH, te);
2636 		else if (strcmp(te->desc, "SEARCHPATH") == 0)
2637 			processSearchPathEntry(AH, te);
2638 	}
2639 }
2640 
2641 static void
processEncodingEntry(ArchiveHandle * AH,TocEntry * te)2642 processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2643 {
2644 	/* te->defn should have the form SET client_encoding = 'foo'; */
2645 	char	   *defn = pg_strdup(te->defn);
2646 	char	   *ptr1;
2647 	char	   *ptr2 = NULL;
2648 	int			encoding;
2649 
2650 	ptr1 = strchr(defn, '\'');
2651 	if (ptr1)
2652 		ptr2 = strchr(++ptr1, '\'');
2653 	if (ptr2)
2654 	{
2655 		*ptr2 = '\0';
2656 		encoding = pg_char_to_encoding(ptr1);
2657 		if (encoding < 0)
2658 			exit_horribly(modulename, "unrecognized encoding \"%s\"\n",
2659 						  ptr1);
2660 		AH->public.encoding = encoding;
2661 	}
2662 	else
2663 		exit_horribly(modulename, "invalid ENCODING item: %s\n",
2664 					  te->defn);
2665 
2666 	free(defn);
2667 }
2668 
2669 static void
processStdStringsEntry(ArchiveHandle * AH,TocEntry * te)2670 processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2671 {
2672 	/* te->defn should have the form SET standard_conforming_strings = 'x'; */
2673 	char	   *ptr1;
2674 
2675 	ptr1 = strchr(te->defn, '\'');
2676 	if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2677 		AH->public.std_strings = true;
2678 	else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2679 		AH->public.std_strings = false;
2680 	else
2681 		exit_horribly(modulename, "invalid STDSTRINGS item: %s\n",
2682 					  te->defn);
2683 }
2684 
2685 static void
processSearchPathEntry(ArchiveHandle * AH,TocEntry * te)2686 processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
2687 {
2688 	/*
2689 	 * te->defn should contain a command to set search_path.  We just copy it
2690 	 * verbatim for use later.
2691 	 */
2692 	AH->public.searchpath = pg_strdup(te->defn);
2693 }
2694 
2695 static void
StrictNamesCheck(RestoreOptions * ropt)2696 StrictNamesCheck(RestoreOptions *ropt)
2697 {
2698 	const char *missing_name;
2699 
2700 	Assert(ropt->strict_names);
2701 
2702 	if (ropt->schemaNames.head != NULL)
2703 	{
2704 		missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2705 		if (missing_name != NULL)
2706 			exit_horribly(modulename, "schema \"%s\" not found\n", missing_name);
2707 	}
2708 
2709 	if (ropt->tableNames.head != NULL)
2710 	{
2711 		missing_name = simple_string_list_not_touched(&ropt->tableNames);
2712 		if (missing_name != NULL)
2713 			exit_horribly(modulename, "table \"%s\" not found\n", missing_name);
2714 	}
2715 
2716 	if (ropt->indexNames.head != NULL)
2717 	{
2718 		missing_name = simple_string_list_not_touched(&ropt->indexNames);
2719 		if (missing_name != NULL)
2720 			exit_horribly(modulename, "index \"%s\" not found\n", missing_name);
2721 	}
2722 
2723 	if (ropt->functionNames.head != NULL)
2724 	{
2725 		missing_name = simple_string_list_not_touched(&ropt->functionNames);
2726 		if (missing_name != NULL)
2727 			exit_horribly(modulename, "function \"%s\" not found\n", missing_name);
2728 	}
2729 
2730 	if (ropt->triggerNames.head != NULL)
2731 	{
2732 		missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2733 		if (missing_name != NULL)
2734 			exit_horribly(modulename, "trigger \"%s\" not found\n", missing_name);
2735 	}
2736 }
2737 
/*
 * Decide which parts of a TOC entry should be processed, given the section
 * currently being handled and the restore options.
 *
 * Returns REQ_SPECIAL for the ENCODING, STDSTRINGS and SEARCHPATH entries.
 * Otherwise returns a subset of REQ_SCHEMA | REQ_DATA, where 0 means the
 * entry is skipped entirely.  Filters are applied in order: ACL and
 * security-label skipping, section selection, schema and object-type name
 * selection, the schema/data classification, --schema-only/--data-only
 * style masks (ropt->schemaOnly / ropt->dataOnly), empty definitions, and
 * finally the per-ID filter ropt->idWanted.
 */
static teReqs
_tocEntryRequired(TocEntry *te, teSection curSection, RestoreOptions *ropt)
{
	teReqs		res = REQ_SCHEMA | REQ_DATA;

	/* These items are treated specially */
	if (strcmp(te->desc, "ENCODING") == 0 ||
		strcmp(te->desc, "STDSTRINGS") == 0 ||
		strcmp(te->desc, "SEARCHPATH") == 0)
		return REQ_SPECIAL;

	/* If it's an ACL, maybe ignore it */
	if (ropt->aclsSkip && _tocEntryIsACL(te))
		return 0;

	/* If it's security labels, maybe ignore it */
	if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
		return 0;

	/* Ignore it if section is not to be dumped/restored */
	switch (curSection)
	{
		case SECTION_PRE_DATA:
			if (!(ropt->dumpSections & DUMP_PRE_DATA))
				return 0;
			break;
		case SECTION_DATA:
			if (!(ropt->dumpSections & DUMP_DATA))
				return 0;
			break;
		case SECTION_POST_DATA:
			if (!(ropt->dumpSections & DUMP_POST_DATA))
				return 0;
			break;
		default:
			/* shouldn't get here, really, but ignore it */
			return 0;
	}

	/* Check options for selective dump/restore */
	if (ropt->schemaNames.head != NULL)
	{
		/*
		 * Schemas were selected: an entry without a namespace belongs to
		 * none of them, so it is excluded.
		 */
		if (!te->namespace)
			return 0;
		if (!(simple_string_list_member(&ropt->schemaNames, te->namespace)))
			return 0;
	}

	/*
	 * Object-type selection: when selTypes is set, only the selected kinds
	 * of object (optionally restricted to the listed names) survive, and
	 * every other entry type is excluded.
	 */
	if (ropt->selTypes)
	{
		if (strcmp(te->desc, "TABLE") == 0 ||
			strcmp(te->desc, "TABLE DATA") == 0 ||
			strcmp(te->desc, "VIEW") == 0 ||
			strcmp(te->desc, "FOREIGN TABLE") == 0 ||
			strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
			strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
			strcmp(te->desc, "SEQUENCE") == 0 ||
			strcmp(te->desc, "SEQUENCE SET") == 0)
		{
			if (!ropt->selTable)
				return 0;
			if (ropt->tableNames.head != NULL && (!(simple_string_list_member(&ropt->tableNames, te->tag))))
				return 0;
		}
		else if (strcmp(te->desc, "INDEX") == 0)
		{
			if (!ropt->selIndex)
				return 0;
			if (ropt->indexNames.head != NULL && (!(simple_string_list_member(&ropt->indexNames, te->tag))))
				return 0;
		}
		else if (strcmp(te->desc, "FUNCTION") == 0)
		{
			if (!ropt->selFunction)
				return 0;
			if (ropt->functionNames.head != NULL && (!(simple_string_list_member(&ropt->functionNames, te->tag))))
				return 0;
		}
		else if (strcmp(te->desc, "TRIGGER") == 0)
		{
			if (!ropt->selTrigger)
				return 0;
			if (ropt->triggerNames.head != NULL && (!(simple_string_list_member(&ropt->triggerNames, te->tag))))
				return 0;
		}
		else
			return 0;
	}

	/*
	 * Check if we had a dataDumper. Indicates if the entry is schema or data
	 */
	if (!te->hadDumper)
	{
		/*
		 * Special Case: If 'SEQUENCE SET' or anything to do with BLOBs, then
		 * it is considered a data entry.  We don't need to check for the
		 * BLOBS entry or old-style BLOB COMMENTS, because they will have
		 * hadDumper = true ... but we do need to check new-style BLOB
		 * comments.
		 */
		if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
			strcmp(te->desc, "BLOB") == 0 ||
			(strcmp(te->desc, "ACL") == 0 &&
			 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
			(strcmp(te->desc, "COMMENT") == 0 &&
			 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
			(strcmp(te->desc, "SECURITY LABEL") == 0 &&
			 strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
			res = res & REQ_DATA;
		else
			res = res & ~REQ_DATA;
	}

	/*
	 * Special case: <Init> type with <Max OID> tag; this is obsolete and we
	 * always ignore it.
	 */
	if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
		return 0;

	/* Mask it if we only want schema */
	if (ropt->schemaOnly)
	{
		/*
		 * In binary-upgrade mode, even with schema-only set, we do not mask
		 * out large objects.  Only large object definitions, comments and
		 * other information should be generated in binary-upgrade mode (not
		 * the actual data).
		 */
		if (!(ropt->binary_upgrade &&
			  (strcmp(te->desc, "BLOB") == 0 ||
			   (strcmp(te->desc, "ACL") == 0 &&
				strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
			   (strcmp(te->desc, "COMMENT") == 0 &&
				strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
			   (strcmp(te->desc, "SECURITY LABEL") == 0 &&
				strncmp(te->tag, "LARGE OBJECT ", 13) == 0))))
			res = res & REQ_SCHEMA;
	}

	/* Mask it if we only want data */
	if (ropt->dataOnly)
		res = res & REQ_DATA;

	/* Mask it if we don't have a schema contribution */
	if (!te->defn || strlen(te->defn) == 0)
		res = res & ~REQ_SCHEMA;

	/* Finally, if there's a per-ID filter, limit based on that as well */
	if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
		return 0;

	return res;
}
2894 
2895 /*
2896  * Identify which pass we should restore this TOC entry in.
2897  *
2898  * See notes with the RestorePass typedef in pg_backup_archiver.h.
2899  */
2900 static RestorePass
_tocEntryRestorePass(TocEntry * te)2901 _tocEntryRestorePass(TocEntry *te)
2902 {
2903 	/* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
2904 	if (strcmp(te->desc, "ACL") == 0 ||
2905 		strcmp(te->desc, "ACL LANGUAGE") == 0 ||
2906 		strcmp(te->desc, "DEFAULT ACL") == 0)
2907 		return RESTORE_PASS_ACL;
2908 	if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
2909 		strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
2910 		return RESTORE_PASS_POST_ACL;
2911 
2912 	/*
2913 	 * Comments need to be emitted in the same pass as their parent objects.
2914 	 * ACLs haven't got comments, and neither do matview data objects, but
2915 	 * event triggers do.  (Fortunately, event triggers haven't got ACLs, or
2916 	 * we'd need yet another weird special case.)
2917 	 */
2918 	if (strcmp(te->desc, "COMMENT") == 0 &&
2919 		strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
2920 		return RESTORE_PASS_POST_ACL;
2921 
2922 	/* All else can be handled in the main pass. */
2923 	return RESTORE_PASS_MAIN;
2924 }
2925 
2926 /*
2927  * Identify TOC entries that are ACLs.
2928  *
2929  * Note: it seems worth duplicating some code here to avoid a hard-wired
2930  * assumption that these are exactly the same entries that we restore during
2931  * the RESTORE_PASS_ACL phase.
2932  */
2933 static bool
_tocEntryIsACL(TocEntry * te)2934 _tocEntryIsACL(TocEntry *te)
2935 {
2936 	/* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
2937 	if (strcmp(te->desc, "ACL") == 0 ||
2938 		strcmp(te->desc, "ACL LANGUAGE") == 0 ||
2939 		strcmp(te->desc, "DEFAULT ACL") == 0)
2940 		return true;
2941 	return false;
2942 }
2943 
2944 /*
2945  * Issue SET commands for parameters that we want to have set the same way
2946  * at all times during execution of a restore script.
2947  */
2948 static void
_doSetFixedOutputState(ArchiveHandle * AH)2949 _doSetFixedOutputState(ArchiveHandle *AH)
2950 {
2951 	RestoreOptions *ropt = AH->public.ropt;
2952 
2953 	/*
2954 	 * Disable timeouts to allow for slow commands, idle parallel workers, etc
2955 	 */
2956 	ahprintf(AH, "SET statement_timeout = 0;\n");
2957 	ahprintf(AH, "SET lock_timeout = 0;\n");
2958 	ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
2959 
2960 	/* Select the correct character set encoding */
2961 	ahprintf(AH, "SET client_encoding = '%s';\n",
2962 			 pg_encoding_to_char(AH->public.encoding));
2963 
2964 	/* Select the correct string literal syntax */
2965 	ahprintf(AH, "SET standard_conforming_strings = %s;\n",
2966 			 AH->public.std_strings ? "on" : "off");
2967 
2968 	/* Select the role to be used during restore */
2969 	if (ropt && ropt->use_role)
2970 		ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
2971 
2972 	/* Select the dump-time search_path */
2973 	if (AH->public.searchpath)
2974 		ahprintf(AH, "%s", AH->public.searchpath);
2975 
2976 	/* Make sure function checking is disabled */
2977 	ahprintf(AH, "SET check_function_bodies = false;\n");
2978 
2979 	/* Ensure that all valid XML data will be accepted */
2980 	ahprintf(AH, "SET xmloption = content;\n");
2981 
2982 	/* Avoid annoying notices etc */
2983 	ahprintf(AH, "SET client_min_messages = warning;\n");
2984 	if (!AH->public.std_strings)
2985 		ahprintf(AH, "SET escape_string_warning = off;\n");
2986 
2987 	/* Adjust row-security state */
2988 	if (ropt && ropt->enable_row_security)
2989 		ahprintf(AH, "SET row_security = on;\n");
2990 	else
2991 		ahprintf(AH, "SET row_security = off;\n");
2992 
2993 	ahprintf(AH, "\n");
2994 }
2995 
2996 /*
2997  * Issue a SET SESSION AUTHORIZATION command.  Caller is responsible
2998  * for updating state if appropriate.  If user is NULL or an empty string,
2999  * the specification DEFAULT will be used.
3000  */
3001 static void
_doSetSessionAuth(ArchiveHandle * AH,const char * user)3002 _doSetSessionAuth(ArchiveHandle *AH, const char *user)
3003 {
3004 	PQExpBuffer cmd = createPQExpBuffer();
3005 
3006 	appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3007 
3008 	/*
3009 	 * SQL requires a string literal here.  Might as well be correct.
3010 	 */
3011 	if (user && *user)
3012 		appendStringLiteralAHX(cmd, user, AH);
3013 	else
3014 		appendPQExpBufferStr(cmd, "DEFAULT");
3015 	appendPQExpBufferChar(cmd, ';');
3016 
3017 	if (RestoringToDB(AH))
3018 	{
3019 		PGresult   *res;
3020 
3021 		res = PQexec(AH->connection, cmd->data);
3022 
3023 		if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3024 			/* NOT warn_or_exit_horribly... use -O instead to skip this. */
3025 			exit_horribly(modulename, "could not set session user to \"%s\": %s",
3026 						  user, PQerrorMessage(AH->connection));
3027 
3028 		PQclear(res);
3029 	}
3030 	else
3031 		ahprintf(AH, "%s\n\n", cmd->data);
3032 
3033 	destroyPQExpBuffer(cmd);
3034 }
3035 
3036 
3037 /*
3038  * Issue a SET default_with_oids command.  Caller is responsible
3039  * for updating state if appropriate.
3040  */
3041 static void
_doSetWithOids(ArchiveHandle * AH,const bool withOids)3042 _doSetWithOids(ArchiveHandle *AH, const bool withOids)
3043 {
3044 	PQExpBuffer cmd = createPQExpBuffer();
3045 
3046 	appendPQExpBuffer(cmd, "SET default_with_oids = %s;", withOids ?
3047 					  "true" : "false");
3048 
3049 	if (RestoringToDB(AH))
3050 	{
3051 		PGresult   *res;
3052 
3053 		res = PQexec(AH->connection, cmd->data);
3054 
3055 		if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3056 			warn_or_exit_horribly(AH, modulename,
3057 								  "could not set default_with_oids: %s",
3058 								  PQerrorMessage(AH->connection));
3059 
3060 		PQclear(res);
3061 	}
3062 	else
3063 		ahprintf(AH, "%s\n\n", cmd->data);
3064 
3065 	destroyPQExpBuffer(cmd);
3066 }
3067 
3068 
3069 /*
3070  * Issue the commands to connect to the specified database.
3071  *
3072  * If we're currently restoring right into a database, this will
3073  * actually establish a connection. Otherwise it puts a \connect into
3074  * the script output.
3075  */
3076 static void
_reconnectToDB(ArchiveHandle * AH,const char * dbname)3077 _reconnectToDB(ArchiveHandle *AH, const char *dbname)
3078 {
3079 	if (RestoringToDB(AH))
3080 		ReconnectToServer(AH, dbname);
3081 	else
3082 	{
3083 		PQExpBufferData connectbuf;
3084 
3085 		initPQExpBuffer(&connectbuf);
3086 		appendPsqlMetaConnect(&connectbuf, dbname);
3087 		ahprintf(AH, "%s\n", connectbuf.data);
3088 		termPQExpBuffer(&connectbuf);
3089 	}
3090 
3091 	/*
3092 	 * NOTE: currUser keeps track of what the imaginary session user in our
3093 	 * script is.  It's now effectively reset to the original userID.
3094 	 */
3095 	if (AH->currUser)
3096 		free(AH->currUser);
3097 	AH->currUser = NULL;
3098 
3099 	/* don't assume we still know the output schema, tablespace, etc either */
3100 	if (AH->currSchema)
3101 		free(AH->currSchema);
3102 	AH->currSchema = NULL;
3103 	if (AH->currTablespace)
3104 		free(AH->currTablespace);
3105 	AH->currTablespace = NULL;
3106 	AH->currWithOids = -1;
3107 
3108 	/* re-establish fixed state */
3109 	_doSetFixedOutputState(AH);
3110 }
3111 
3112 /*
3113  * Become the specified user, and update state to avoid redundant commands
3114  *
3115  * NULL or empty argument is taken to mean restoring the session default
3116  */
3117 static void
_becomeUser(ArchiveHandle * AH,const char * user)3118 _becomeUser(ArchiveHandle *AH, const char *user)
3119 {
3120 	if (!user)
3121 		user = "";				/* avoid null pointers */
3122 
3123 	if (AH->currUser && strcmp(AH->currUser, user) == 0)
3124 		return;					/* no need to do anything */
3125 
3126 	_doSetSessionAuth(AH, user);
3127 
3128 	/*
3129 	 * NOTE: currUser keeps track of what the imaginary session user in our
3130 	 * script is
3131 	 */
3132 	if (AH->currUser)
3133 		free(AH->currUser);
3134 	AH->currUser = pg_strdup(user);
3135 }
3136 
3137 /*
3138  * Become the owner of the given TOC entry object.  If
3139  * changes in ownership are not allowed, this doesn't do anything.
3140  */
3141 static void
_becomeOwner(ArchiveHandle * AH,TocEntry * te)3142 _becomeOwner(ArchiveHandle *AH, TocEntry *te)
3143 {
3144 	RestoreOptions *ropt = AH->public.ropt;
3145 
3146 	if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3147 		return;
3148 
3149 	_becomeUser(AH, te->owner);
3150 }
3151 
3152 
3153 /*
3154  * Set the proper default_with_oids value for the table.
3155  */
3156 static void
_setWithOids(ArchiveHandle * AH,TocEntry * te)3157 _setWithOids(ArchiveHandle *AH, TocEntry *te)
3158 {
3159 	if (AH->currWithOids != te->withOids)
3160 	{
3161 		_doSetWithOids(AH, te->withOids);
3162 		AH->currWithOids = te->withOids;
3163 	}
3164 }
3165 
3166 
3167 /*
3168  * Issue the commands to select the specified schema as the current schema
3169  * in the target database.
3170  */
3171 static void
_selectOutputSchema(ArchiveHandle * AH,const char * schemaName)3172 _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
3173 {
3174 	PQExpBuffer qry;
3175 
3176 	/*
3177 	 * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3178 	 * that search_path rather than switching to entry-specific paths.
3179 	 * Otherwise, it's an old archive that will not restore correctly unless
3180 	 * we set the search_path as it's expecting.
3181 	 */
3182 	if (AH->public.searchpath)
3183 		return;
3184 
3185 	if (!schemaName || *schemaName == '\0' ||
3186 		(AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3187 		return;					/* no need to do anything */
3188 
3189 	qry = createPQExpBuffer();
3190 
3191 	appendPQExpBuffer(qry, "SET search_path = %s",
3192 					  fmtId(schemaName));
3193 	if (strcmp(schemaName, "pg_catalog") != 0)
3194 		appendPQExpBufferStr(qry, ", pg_catalog");
3195 
3196 	if (RestoringToDB(AH))
3197 	{
3198 		PGresult   *res;
3199 
3200 		res = PQexec(AH->connection, qry->data);
3201 
3202 		if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3203 			warn_or_exit_horribly(AH, modulename,
3204 								  "could not set search_path to \"%s\": %s",
3205 								  schemaName, PQerrorMessage(AH->connection));
3206 
3207 		PQclear(res);
3208 	}
3209 	else
3210 		ahprintf(AH, "%s;\n\n", qry->data);
3211 
3212 	if (AH->currSchema)
3213 		free(AH->currSchema);
3214 	AH->currSchema = pg_strdup(schemaName);
3215 
3216 	destroyPQExpBuffer(qry);
3217 }
3218 
3219 /*
3220  * Issue the commands to select the specified tablespace as the current one
3221  * in the target database.
3222  */
3223 static void
_selectTablespace(ArchiveHandle * AH,const char * tablespace)3224 _selectTablespace(ArchiveHandle *AH, const char *tablespace)
3225 {
3226 	RestoreOptions *ropt = AH->public.ropt;
3227 	PQExpBuffer qry;
3228 	const char *want,
3229 			   *have;
3230 
3231 	/* do nothing in --no-tablespaces mode */
3232 	if (ropt->noTablespace)
3233 		return;
3234 
3235 	have = AH->currTablespace;
3236 	want = tablespace;
3237 
3238 	/* no need to do anything for non-tablespace object */
3239 	if (!want)
3240 		return;
3241 
3242 	if (have && strcmp(want, have) == 0)
3243 		return;					/* no need to do anything */
3244 
3245 	qry = createPQExpBuffer();
3246 
3247 	if (strcmp(want, "") == 0)
3248 	{
3249 		/* We want the tablespace to be the database's default */
3250 		appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3251 	}
3252 	else
3253 	{
3254 		/* We want an explicit tablespace */
3255 		appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3256 	}
3257 
3258 	if (RestoringToDB(AH))
3259 	{
3260 		PGresult   *res;
3261 
3262 		res = PQexec(AH->connection, qry->data);
3263 
3264 		if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3265 			warn_or_exit_horribly(AH, modulename,
3266 								"could not set default_tablespace to %s: %s",
3267 								fmtId(want), PQerrorMessage(AH->connection));
3268 
3269 		PQclear(res);
3270 	}
3271 	else
3272 		ahprintf(AH, "%s;\n\n", qry->data);
3273 
3274 	if (AH->currTablespace)
3275 		free(AH->currTablespace);
3276 	AH->currTablespace = pg_strdup(want);
3277 
3278 	destroyPQExpBuffer(qry);
3279 }
3280 
3281 /*
3282  * Extract an object description for a TOC entry, and append it to buf.
3283  *
3284  * This is used for ALTER ... OWNER TO.
3285  */
3286 static void
_getObjectDescription(PQExpBuffer buf,TocEntry * te,ArchiveHandle * AH)3287 _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
3288 {
3289 	const char *type = te->desc;
3290 
3291 	/* Use ALTER TABLE for views and sequences */
3292 	if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0 ||
3293 		strcmp(type, "MATERIALIZED VIEW") == 0)
3294 		type = "TABLE";
3295 
3296 	/* objects that don't require special decoration */
3297 	if (strcmp(type, "COLLATION") == 0 ||
3298 		strcmp(type, "CONVERSION") == 0 ||
3299 		strcmp(type, "DOMAIN") == 0 ||
3300 		strcmp(type, "TABLE") == 0 ||
3301 		strcmp(type, "TYPE") == 0 ||
3302 		strcmp(type, "FOREIGN TABLE") == 0 ||
3303 		strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3304 		strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3305 	/* non-schema-specified objects */
3306 		strcmp(type, "DATABASE") == 0 ||
3307 		strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3308 		strcmp(type, "SCHEMA") == 0 ||
3309 		strcmp(type, "EVENT TRIGGER") == 0 ||
3310 		strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3311 		strcmp(type, "SERVER") == 0 ||
3312 		strcmp(type, "USER MAPPING") == 0)
3313 	{
3314 		appendPQExpBuffer(buf, "%s ", type);
3315 		if (te->namespace && *te->namespace)
3316 			appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3317 		appendPQExpBufferStr(buf, fmtId(te->tag));
3318 		return;
3319 	}
3320 
3321 	/* BLOBs just have a name, but it's numeric so must not use fmtId */
3322 	if (strcmp(type, "BLOB") == 0)
3323 	{
3324 		appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3325 		return;
3326 	}
3327 
3328 	/*
3329 	 * These object types require additional decoration.  Fortunately, the
3330 	 * information needed is exactly what's in the DROP command.
3331 	 */
3332 	if (strcmp(type, "AGGREGATE") == 0 ||
3333 		strcmp(type, "FUNCTION") == 0 ||
3334 		strcmp(type, "OPERATOR") == 0 ||
3335 		strcmp(type, "OPERATOR CLASS") == 0 ||
3336 		strcmp(type, "OPERATOR FAMILY") == 0)
3337 	{
3338 		/* Chop "DROP " off the front and make a modifiable copy */
3339 		char	   *first = pg_strdup(te->dropStmt + 5);
3340 		char	   *last;
3341 
3342 		/* point to last character in string */
3343 		last = first + strlen(first) - 1;
3344 
3345 		/* Strip off any ';' or '\n' at the end */
3346 		while (last >= first && (*last == '\n' || *last == ';'))
3347 			last--;
3348 		*(last + 1) = '\0';
3349 
3350 		appendPQExpBufferStr(buf, first);
3351 
3352 		free(first);
3353 		return;
3354 	}
3355 
3356 	write_msg(modulename, "WARNING: don't know how to set owner for object type \"%s\"\n",
3357 			  type);
3358 }
3359 
3360 /*
3361  * Emit the SQL commands to create the object represented by a TOC entry
3362  *
3363  * This now also includes issuing an ALTER OWNER command to restore the
3364  * object's ownership, if wanted.  But note that the object's permissions
3365  * will remain at default, until the matching ACL TOC entry is restored.
3366  */
3367 static void
_printTocEntry(ArchiveHandle * AH,TocEntry * te,bool isData)3368 _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
3369 {
3370 	RestoreOptions *ropt = AH->public.ropt;
3371 
3372 	/*
3373 	 * Avoid dumping the public schema, as it will already be created ...
3374 	 * unless we are using --clean mode (and *not* --create mode), in which
3375 	 * case we've previously issued a DROP for it so we'd better recreate it.
3376 	 *
3377 	 * Likewise for its comment, if any.  (We could try issuing the COMMENT
3378 	 * command anyway; but it'd fail if the restore is done as non-super-user,
3379 	 * so let's not.)
3380 	 *
3381 	 * XXX it looks pretty ugly to hard-wire the public schema like this, but
3382 	 * it sits in a sort of no-mans-land between being a system object and a
3383 	 * user object, so it really is special in a way.
3384 	 */
3385 	if (!(ropt->dropSchema && !ropt->createDB))
3386 	{
3387 		if (strcmp(te->desc, "SCHEMA") == 0 &&
3388 			strcmp(te->tag, "public") == 0)
3389 			return;
3390 		if (strcmp(te->desc, "COMMENT") == 0 &&
3391 			strcmp(te->tag, "SCHEMA public") == 0)
3392 			return;
3393 	}
3394 
3395 	/* Select owner, schema, and tablespace as necessary */
3396 	_becomeOwner(AH, te);
3397 	_selectOutputSchema(AH, te->namespace);
3398 	_selectTablespace(AH, te->tablespace);
3399 
3400 	/* Set up OID mode too */
3401 	if (strcmp(te->desc, "TABLE") == 0)
3402 		_setWithOids(AH, te);
3403 
3404 	/* Emit header comment for item */
3405 	if (!AH->noTocComments)
3406 	{
3407 		const char *pfx;
3408 		char	   *sanitized_name;
3409 		char	   *sanitized_schema;
3410 		char	   *sanitized_owner;
3411 
3412 		if (isData)
3413 			pfx = "Data for ";
3414 		else
3415 			pfx = "";
3416 
3417 		ahprintf(AH, "--\n");
3418 		if (AH->public.verbose)
3419 		{
3420 			ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
3421 					 te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
3422 			if (te->nDeps > 0)
3423 			{
3424 				int			i;
3425 
3426 				ahprintf(AH, "-- Dependencies:");
3427 				for (i = 0; i < te->nDeps; i++)
3428 					ahprintf(AH, " %d", te->dependencies[i]);
3429 				ahprintf(AH, "\n");
3430 			}
3431 		}
3432 
3433 		/*
3434 		 * Zap any line endings embedded in user-supplied fields, to prevent
3435 		 * corruption of the dump (which could, in the worst case, present an
3436 		 * SQL injection vulnerability if someone were to incautiously load a
3437 		 * dump containing objects with maliciously crafted names).
3438 		 */
3439 		sanitized_name = replace_line_endings(te->tag);
3440 		if (te->namespace)
3441 			sanitized_schema = replace_line_endings(te->namespace);
3442 		else
3443 			sanitized_schema = pg_strdup("-");
3444 		if (!ropt->noOwner)
3445 			sanitized_owner = replace_line_endings(te->owner);
3446 		else
3447 			sanitized_owner = pg_strdup("-");
3448 
3449 		ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3450 				 pfx, sanitized_name, te->desc, sanitized_schema,
3451 				 sanitized_owner);
3452 
3453 		free(sanitized_name);
3454 		free(sanitized_schema);
3455 		free(sanitized_owner);
3456 
3457 		if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
3458 		{
3459 			char	   *sanitized_tablespace;
3460 
3461 			sanitized_tablespace = replace_line_endings(te->tablespace);
3462 			ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
3463 			free(sanitized_tablespace);
3464 		}
3465 		ahprintf(AH, "\n");
3466 
3467 		if (AH->PrintExtraTocPtr !=NULL)
3468 			(*AH->PrintExtraTocPtr) (AH, te);
3469 		ahprintf(AH, "--\n\n");
3470 	}
3471 
3472 	/*
3473 	 * Actually print the definition.
3474 	 *
3475 	 * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
3476 	 * versions put into CREATE SCHEMA.  We have to do this when --no-owner
3477 	 * mode is selected.  This is ugly, but I see no other good way ...
3478 	 */
3479 	if (ropt->noOwner && strcmp(te->desc, "SCHEMA") == 0)
3480 	{
3481 		ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
3482 	}
3483 	else
3484 	{
3485 		if (strlen(te->defn) > 0)
3486 			ahprintf(AH, "%s\n\n", te->defn);
3487 	}
3488 
3489 	/*
3490 	 * If we aren't using SET SESSION AUTH to determine ownership, we must
3491 	 * instead issue an ALTER OWNER command.  We assume that anything without
3492 	 * a DROP command is not a separately ownable object.  All the categories
3493 	 * with DROP commands must appear in one list or the other.
3494 	 */
3495 	if (!ropt->noOwner && !ropt->use_setsessauth &&
3496 		strlen(te->owner) > 0 && strlen(te->dropStmt) > 0)
3497 	{
3498 		if (strcmp(te->desc, "AGGREGATE") == 0 ||
3499 			strcmp(te->desc, "BLOB") == 0 ||
3500 			strcmp(te->desc, "COLLATION") == 0 ||
3501 			strcmp(te->desc, "CONVERSION") == 0 ||
3502 			strcmp(te->desc, "DATABASE") == 0 ||
3503 			strcmp(te->desc, "DOMAIN") == 0 ||
3504 			strcmp(te->desc, "FUNCTION") == 0 ||
3505 			strcmp(te->desc, "OPERATOR") == 0 ||
3506 			strcmp(te->desc, "OPERATOR CLASS") == 0 ||
3507 			strcmp(te->desc, "OPERATOR FAMILY") == 0 ||
3508 			strcmp(te->desc, "PROCEDURAL LANGUAGE") == 0 ||
3509 			strcmp(te->desc, "SCHEMA") == 0 ||
3510 			strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3511 			strcmp(te->desc, "TABLE") == 0 ||
3512 			strcmp(te->desc, "TYPE") == 0 ||
3513 			strcmp(te->desc, "VIEW") == 0 ||
3514 			strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3515 			strcmp(te->desc, "SEQUENCE") == 0 ||
3516 			strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3517 			strcmp(te->desc, "TEXT SEARCH DICTIONARY") == 0 ||
3518 			strcmp(te->desc, "TEXT SEARCH CONFIGURATION") == 0 ||
3519 			strcmp(te->desc, "FOREIGN DATA WRAPPER") == 0 ||
3520 			strcmp(te->desc, "SERVER") == 0)
3521 		{
3522 			PQExpBuffer temp = createPQExpBuffer();
3523 
3524 			appendPQExpBufferStr(temp, "ALTER ");
3525 			_getObjectDescription(temp, te, AH);
3526 			appendPQExpBuffer(temp, " OWNER TO %s;", fmtId(te->owner));
3527 			ahprintf(AH, "%s\n\n", temp->data);
3528 			destroyPQExpBuffer(temp);
3529 		}
3530 		else if (strcmp(te->desc, "CAST") == 0 ||
3531 				 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
3532 				 strcmp(te->desc, "CONSTRAINT") == 0 ||
3533 				 strcmp(te->desc, "DEFAULT") == 0 ||
3534 				 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
3535 				 strcmp(te->desc, "INDEX") == 0 ||
3536 				 strcmp(te->desc, "RULE") == 0 ||
3537 				 strcmp(te->desc, "TRIGGER") == 0 ||
3538 				 strcmp(te->desc, "ROW SECURITY") == 0 ||
3539 				 strcmp(te->desc, "POLICY") == 0 ||
3540 				 strcmp(te->desc, "USER MAPPING") == 0)
3541 		{
3542 			/* these object types don't have separate owners */
3543 		}
3544 		else
3545 		{
3546 			write_msg(modulename, "WARNING: don't know how to set owner for object type \"%s\"\n",
3547 					  te->desc);
3548 		}
3549 	}
3550 
3551 	/*
3552 	 * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
3553 	 * commands, so we can no longer assume we know the current auth setting.
3554 	 */
3555 	if (_tocEntryIsACL(te))
3556 	{
3557 		if (AH->currUser)
3558 			free(AH->currUser);
3559 		AH->currUser = NULL;
3560 	}
3561 }
3562 
3563 /*
3564  * Sanitize a string to be included in an SQL comment or TOC listing,
3565  * by replacing any newlines with spaces.
3566  * The result is a freshly malloc'd string.
3567  */
3568 static char *
replace_line_endings(const char * str)3569 replace_line_endings(const char *str)
3570 {
3571 	char	   *result;
3572 	char	   *s;
3573 
3574 	result = pg_strdup(str);
3575 
3576 	for (s = result; *s != '\0'; s++)
3577 	{
3578 		if (*s == '\n' || *s == '\r')
3579 			*s = ' ';
3580 	}
3581 
3582 	return result;
3583 }
3584 
3585 /*
3586  * Write the file header for a custom-format archive
3587  */
3588 void
WriteHead(ArchiveHandle * AH)3589 WriteHead(ArchiveHandle *AH)
3590 {
3591 	struct tm	crtm;
3592 
3593 	(*AH->WriteBufPtr) (AH, "PGDMP", 5);		/* Magic code */
3594 	(*AH->WriteBytePtr) (AH, AH->vmaj);
3595 	(*AH->WriteBytePtr) (AH, AH->vmin);
3596 	(*AH->WriteBytePtr) (AH, AH->vrev);
3597 	(*AH->WriteBytePtr) (AH, AH->intSize);
3598 	(*AH->WriteBytePtr) (AH, AH->offSize);
3599 	(*AH->WriteBytePtr) (AH, AH->format);
3600 	WriteInt(AH, AH->compression);
3601 	crtm = *localtime(&AH->createDate);
3602 	WriteInt(AH, crtm.tm_sec);
3603 	WriteInt(AH, crtm.tm_min);
3604 	WriteInt(AH, crtm.tm_hour);
3605 	WriteInt(AH, crtm.tm_mday);
3606 	WriteInt(AH, crtm.tm_mon);
3607 	WriteInt(AH, crtm.tm_year);
3608 	WriteInt(AH, crtm.tm_isdst);
3609 	WriteStr(AH, PQdb(AH->connection));
3610 	WriteStr(AH, AH->public.remoteVersionStr);
3611 	WriteStr(AH, PG_VERSION);
3612 }
3613 
/*
 * ReadHead
 *		Read and validate the archive header, filling in AH's version,
 *		intSize, offSize, compression, createDate, archdbname and
 *		archive version-string fields.
 *
 * The fields are consumed in the order WriteHead() writes them.  Fields
 * added in later archive versions are read only when AH->version says they
 * are present.
 *
 * Fatal problems are reported with exit_horribly():
 *   - bad magic string;
 *   - unsupported version;
 *   - implausible integer size;
 *   - format mismatch.
 * Softer problems only produce warnings through write_msg().
 */
void
ReadHead(ArchiveHandle *AH)
{
	int			fmt;

	/*
	 * If we haven't already read the header, do so.
	 *
	 * NB: this code must agree with _discoverArchiveFormat().  Maybe find a
	 * way to unify the cases?
	 *
	 * Only the magic-string check depends on readHeader; everything below it
	 * is read unconditionally.  NOTE(review): presumably, when readHeader is
	 * set, _discoverArchiveFormat() has already consumed and verified the
	 * magic string -- confirm.
	 */
	if (!AH->readHeader)
	{
		char		tmpMag[7];

		(*AH->ReadBufPtr) (AH, tmpMag, 5);

		if (strncmp(tmpMag, "PGDMP", 5) != 0)
			exit_horribly(modulename, "did not find magic string in file header\n");
	}

	AH->vmaj = (*AH->ReadBytePtr) (AH);
	AH->vmin = (*AH->ReadBytePtr) (AH);

	/* The revision byte is only present in archives newer than 1.0. */
	if (AH->vmaj > 1 || ((AH->vmaj == 1) && (AH->vmin > 0)))	/* Version > 1.0 */
		AH->vrev = (*AH->ReadBytePtr) (AH);
	else
		AH->vrev = 0;

	/*
	 * Pack major, minor and revision into one integer (one byte each, low
	 * byte zero) so it can be compared against the K_VERS_* constants.
	 */
	AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;

	if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
		exit_horribly(modulename, "unsupported version (%d.%d) in file header\n",
					  AH->vmaj, AH->vmin);

	/*
	 * Reject absurd integer sizes outright.  A size merely larger than this
	 * machine's int is accepted with a warning.
	 */
	AH->intSize = (*AH->ReadBytePtr) (AH);
	if (AH->intSize > 32)
		exit_horribly(modulename, "sanity check on integer size (%lu) failed\n",
					  (unsigned long) AH->intSize);

	if (AH->intSize > sizeof(int))
		write_msg(modulename, "WARNING: archive was made on a machine with larger integers, some operations might fail\n");

	/* Archives before 1.7 don't record a separate offset size; assume intSize. */
	if (AH->version >= K_VERS_1_7)
		AH->offSize = (*AH->ReadBytePtr) (AH);
	else
		AH->offSize = AH->intSize;

	/* The stored format byte must match the format we are reading as. */
	fmt = (*AH->ReadBytePtr) (AH);

	if (AH->format != fmt)
		exit_horribly(modulename, "expected format (%d) differs from format found in file (%d)\n",
					  AH->format, fmt);

	/*
	 * The compression level depends on the archive version:
	 *   - before 1.2, it is absent and Z_DEFAULT_COMPRESSION is assumed;
	 *   - from 1.2 up to 1.4, it is stored as a single byte;
	 *   - from 1.4 on, it is stored as an int.
	 */
	if (AH->version >= K_VERS_1_2)
	{
		if (AH->version < K_VERS_1_4)
			AH->compression = (*AH->ReadBytePtr) (AH);
		else
			AH->compression = ReadInt(AH);
	}
	else
		AH->compression = Z_DEFAULT_COMPRESSION;

#ifndef HAVE_LIBZ
	/* Without zlib we can't decompress; warn rather than fail here. */
	if (AH->compression != 0)
		write_msg(modulename, "WARNING: archive is compressed, but this installation does not support compression -- no data will be available\n");
#endif

	/*
	 * From 1.4 on, the creation time is stored as broken-down struct tm
	 * fields (see WriteHead()), which we turn back into a time_t here.
	 */
	if (AH->version >= K_VERS_1_4)
	{
		struct tm	crtm;

		crtm.tm_sec = ReadInt(AH);
		crtm.tm_min = ReadInt(AH);
		crtm.tm_hour = ReadInt(AH);
		crtm.tm_mday = ReadInt(AH);
		crtm.tm_mon = ReadInt(AH);
		crtm.tm_year = ReadInt(AH);
		crtm.tm_isdst = ReadInt(AH);

		/*
		 * Newer versions of glibc have mktime() report failure if tm_isdst is
		 * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
		 * TZ=UTC.  This is problematic when restoring an archive under a
		 * different timezone setting.  If we get a failure, try again with
		 * tm_isdst set to -1 ("don't know").
		 *
		 * XXX with or without this hack, we reconstruct createDate
		 * incorrectly when the prevailing timezone is different from
		 * pg_dump's.  Next time we bump the archive version, we should flush
		 * this representation and store a plain seconds-since-the-Epoch
		 * timestamp instead.
		 */
		AH->createDate = mktime(&crtm);
		if (AH->createDate == (time_t) -1)
		{
			crtm.tm_isdst = -1;
			AH->createDate = mktime(&crtm);
			if (AH->createDate == (time_t) -1)
				write_msg(modulename,
						  "WARNING: invalid creation date in header\n");
		}
	}

	/* Source database name, also present from 1.4 on. */
	if (AH->version >= K_VERS_1_4)
	{
		AH->archdbname = ReadStr(AH);
	}

	/* Server and pg_dump version strings, present from 1.10 on. */
	if (AH->version >= K_VERS_1_10)
	{
		AH->archiveRemoteVersion = ReadStr(AH);
		AH->archiveDumpVersion = ReadStr(AH);
	}
}
3730 
3731 
3732 /*
3733  * checkSeek
3734  *	  check to see if ftell/fseek can be performed.
3735  */
3736 bool
checkSeek(FILE * fp)3737 checkSeek(FILE *fp)
3738 {
3739 	pgoff_t		tpos;
3740 
3741 	/*
3742 	 * If pgoff_t is wider than long, we must have "real" fseeko and not an
3743 	 * emulation using fseek.  Otherwise report no seek capability.
3744 	 */
3745 #ifndef HAVE_FSEEKO
3746 	if (sizeof(pgoff_t) > sizeof(long))
3747 		return false;
3748 #endif
3749 
3750 	/* Check that ftello works on this file */
3751 	tpos = ftello(fp);
3752 	if (tpos < 0)
3753 		return false;
3754 
3755 	/*
3756 	 * Check that fseeko(SEEK_SET) works, too.  NB: we used to try to test
3757 	 * this with fseeko(fp, 0, SEEK_CUR).  But some platforms treat that as a
3758 	 * successful no-op even on files that are otherwise unseekable.
3759 	 */
3760 	if (fseeko(fp, tpos, SEEK_SET) != 0)
3761 		return false;
3762 
3763 	return true;
3764 }
3765 
3766 
3767 /*
3768  * dumpTimestamp
3769  */
3770 static void
dumpTimestamp(ArchiveHandle * AH,const char * msg,time_t tim)3771 dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
3772 {
3773 	char		buf[64];
3774 
3775 	if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
3776 		ahprintf(AH, "-- %s %s\n\n", msg, buf);
3777 }
3778 
3779 /*
3780  * Main engine for parallel restore.
3781  *
3782  * Parallel restore is done in three phases.  In this first phase,
3783  * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
3784  * processed in the RESTORE_PASS_MAIN pass.  (In practice, that's all
3785  * PRE_DATA items other than ACLs.)  Entries we can't process now are
3786  * added to the pending_list for later phases to deal with.
3787  */
3788 static void
restore_toc_entries_prefork(ArchiveHandle * AH,TocEntry * pending_list)3789 restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
3790 {
3791 	bool		skipped_some;
3792 	TocEntry   *next_work_item;
3793 
3794 	ahlog(AH, 2, "entering restore_toc_entries_prefork\n");
3795 
3796 	/* Adjust dependency information */
3797 	fix_dependencies(AH);
3798 
3799 	/*
3800 	 * Do all the early stuff in a single connection in the parent. There's no
3801 	 * great point in running it in parallel, in fact it will actually run
3802 	 * faster in a single connection because we avoid all the connection and
3803 	 * setup overhead.  Also, pre-9.2 pg_dump versions were not very good
3804 	 * about showing all the dependencies of SECTION_PRE_DATA items, so we do
3805 	 * not risk trying to process them out-of-order.
3806 	 *
3807 	 * Stuff that we can't do immediately gets added to the pending_list.
3808 	 * Note: we don't yet filter out entries that aren't going to be restored.
3809 	 * They might participate in dependency chains connecting entries that
3810 	 * should be restored, so we treat them as live until we actually process
3811 	 * them.
3812 	 *
3813 	 * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
3814 	 * before DATA items, and all DATA items before POST_DATA items.  That is
3815 	 * not certain to be true in older archives, though, and in any case use
3816 	 * of a list file would destroy that ordering (cf. SortTocFromFile).  So
3817 	 * this loop cannot assume that it holds.
3818 	 */
3819 	AH->restorePass = RESTORE_PASS_MAIN;
3820 	skipped_some = false;
3821 	for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
3822 	{
3823 		bool		do_now = true;
3824 
3825 		if (next_work_item->section != SECTION_PRE_DATA)
3826 		{
3827 			/* DATA and POST_DATA items are just ignored for now */
3828 			if (next_work_item->section == SECTION_DATA ||
3829 				next_work_item->section == SECTION_POST_DATA)
3830 			{
3831 				do_now = false;
3832 				skipped_some = true;
3833 			}
3834 			else
3835 			{
3836 				/*
3837 				 * SECTION_NONE items, such as comments, can be processed now
3838 				 * if we are still in the PRE_DATA part of the archive.  Once
3839 				 * we've skipped any items, we have to consider whether the
3840 				 * comment's dependencies are satisfied, so skip it for now.
3841 				 */
3842 				if (skipped_some)
3843 					do_now = false;
3844 			}
3845 		}
3846 
3847 		/*
3848 		 * Also skip items that need to be forced into later passes.  We need
3849 		 * not set skipped_some in this case, since by assumption no main-pass
3850 		 * items could depend on these.
3851 		 */
3852 		if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
3853 			do_now = false;
3854 
3855 		if (do_now)
3856 		{
3857 			/* OK, restore the item and update its dependencies */
3858 			ahlog(AH, 1, "processing item %d %s %s\n",
3859 				  next_work_item->dumpId,
3860 				  next_work_item->desc, next_work_item->tag);
3861 
3862 			(void) restore_toc_entry(AH, next_work_item, false);
3863 
3864 			/* Reduce dependencies, but don't move anything to ready_list */
3865 			reduce_dependencies(AH, next_work_item, NULL);
3866 		}
3867 		else
3868 		{
3869 			/* Nope, so add it to pending_list */
3870 			par_list_append(pending_list, next_work_item);
3871 		}
3872 	}
3873 
3874 	/*
3875 	 * Now close parent connection in prep for parallel steps.  We do this
3876 	 * mainly to ensure that we don't exceed the specified number of parallel
3877 	 * connections.
3878 	 */
3879 	DisconnectDatabase(&AH->public);
3880 
3881 	/* blow away any transient state from the old connection */
3882 	if (AH->currUser)
3883 		free(AH->currUser);
3884 	AH->currUser = NULL;
3885 	if (AH->currSchema)
3886 		free(AH->currSchema);
3887 	AH->currSchema = NULL;
3888 	if (AH->currTablespace)
3889 		free(AH->currTablespace);
3890 	AH->currTablespace = NULL;
3891 	AH->currWithOids = -1;
3892 }
3893 
3894 /*
3895  * Main engine for parallel restore.
3896  *
3897  * Parallel restore is done in three phases.  In this second phase,
3898  * we process entries by dispatching them to parallel worker children
3899  * (processes on Unix, threads on Windows), each of which connects
3900  * separately to the database.  Inter-entry dependencies are respected,
3901  * and so is the RestorePass multi-pass structure.  When we can no longer
3902  * make any entries ready to process, we exit.  Normally, there will be
3903  * nothing left to do; but if there is, the third phase will mop up.
3904  */
3905 static void
restore_toc_entries_parallel(ArchiveHandle * AH,ParallelState * pstate,TocEntry * pending_list)3906 restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
3907 							 TocEntry *pending_list)
3908 {
3909 	TocEntry	ready_list;
3910 	TocEntry   *next_work_item;
3911 
3912 	ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
3913 
3914 	/*
3915 	 * The pending_list contains all items that we need to restore.  Move all
3916 	 * items that are available to process immediately into the ready_list.
3917 	 * After this setup, the pending list is everything that needs to be done
3918 	 * but is blocked by one or more dependencies, while the ready list
3919 	 * contains items that have no remaining dependencies and are OK to
3920 	 * process in the current restore pass.
3921 	 */
3922 	par_list_header_init(&ready_list);
3923 	AH->restorePass = RESTORE_PASS_MAIN;
3924 	move_to_ready_list(pending_list, &ready_list, AH->restorePass);
3925 
3926 	/*
3927 	 * main parent loop
3928 	 *
3929 	 * Keep going until there is no worker still running AND there is no work
3930 	 * left to be done.  Note invariant: at top of loop, there should always
3931 	 * be at least one worker available to dispatch a job to.
3932 	 */
3933 	ahlog(AH, 1, "entering main parallel loop\n");
3934 
3935 	for (;;)
3936 	{
3937 		/* Look for an item ready to be dispatched to a worker */
3938 		next_work_item = get_next_work_item(AH, &ready_list, pstate);
3939 		if (next_work_item != NULL)
3940 		{
3941 			/* If not to be restored, don't waste time launching a worker */
3942 			if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
3943 			{
3944 				ahlog(AH, 1, "skipping item %d %s %s\n",
3945 					  next_work_item->dumpId,
3946 					  next_work_item->desc, next_work_item->tag);
3947 				/* Drop it from ready_list, and update its dependencies */
3948 				par_list_remove(next_work_item);
3949 				reduce_dependencies(AH, next_work_item, &ready_list);
3950 				/* Loop around to see if anything else can be dispatched */
3951 				continue;
3952 			}
3953 
3954 			ahlog(AH, 1, "launching item %d %s %s\n",
3955 				  next_work_item->dumpId,
3956 				  next_work_item->desc, next_work_item->tag);
3957 
3958 			/* Remove it from ready_list, and dispatch to some worker */
3959 			par_list_remove(next_work_item);
3960 
3961 			DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE);
3962 		}
3963 		else if (IsEveryWorkerIdle(pstate))
3964 		{
3965 			/*
3966 			 * Nothing is ready and no worker is running, so we're done with
3967 			 * the current pass or maybe with the whole process.
3968 			 */
3969 			if (AH->restorePass == RESTORE_PASS_LAST)
3970 				break;			/* No more parallel processing is possible */
3971 
3972 			/* Advance to next restore pass */
3973 			AH->restorePass++;
3974 			/* That probably allows some stuff to be made ready */
3975 			move_to_ready_list(pending_list, &ready_list, AH->restorePass);
3976 			/* Loop around to see if anything's now ready */
3977 			continue;
3978 		}
3979 		else
3980 		{
3981 			/*
3982 			 * We have nothing ready, but at least one child is working, so
3983 			 * wait for some subjob to finish.
3984 			 */
3985 		}
3986 
3987 		for (;;)
3988 		{
3989 			int			nTerm = 0;
3990 			int			ret_child;
3991 			int			work_status;
3992 
3993 			/*
3994 			 * In order to reduce dependencies as soon as possible and
3995 			 * especially to reap the status of workers who are working on
3996 			 * items that pending items depend on, we do a non-blocking check
3997 			 * for ended workers first.
3998 			 *
3999 			 * However, if we do not have any other work items currently that
4000 			 * workers can work on, we do not busy-loop here but instead
4001 			 * really wait for at least one worker to terminate. Hence we call
4002 			 * ListenToWorkers(..., ..., do_wait = true) in this case.
4003 			 */
4004 			ListenToWorkers(AH, pstate, !next_work_item);
4005 
4006 			while ((ret_child = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
4007 			{
4008 				nTerm++;
4009 				mark_work_done(AH, &ready_list, ret_child, work_status, pstate);
4010 			}
4011 
4012 			/*
4013 			 * We need to make sure that we have an idle worker before
4014 			 * re-running the loop. If nTerm > 0 we already have that (quick
4015 			 * check).
4016 			 */
4017 			if (nTerm > 0)
4018 				break;
4019 
4020 			/* if nobody terminated, explicitly check for an idle worker */
4021 			if (GetIdleWorker(pstate) != NO_SLOT)
4022 				break;
4023 
4024 			/*
4025 			 * If we have no idle worker, read the result of one or more
4026 			 * workers and loop the loop to call ReapWorkerStatus() on them.
4027 			 */
4028 			ListenToWorkers(AH, pstate, true);
4029 		}
4030 	}
4031 
4032 	/* There should now be nothing in ready_list. */
4033 	Assert(ready_list.par_next == &ready_list);
4034 
4035 	ahlog(AH, 1, "finished main parallel loop\n");
4036 }
4037 
4038 /*
4039  * Main engine for parallel restore.
4040  *
4041  * Parallel restore is done in three phases.  In this third phase,
4042  * we mop up any remaining TOC entries by processing them serially.
4043  * This phase normally should have nothing to do, but if we've somehow
4044  * gotten stuck due to circular dependencies or some such, this provides
4045  * at least some chance of completing the restore successfully.
4046  */
4047 static void
restore_toc_entries_postfork(ArchiveHandle * AH,TocEntry * pending_list)4048 restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
4049 {
4050 	RestoreOptions *ropt = AH->public.ropt;
4051 	TocEntry   *te;
4052 
4053 	ahlog(AH, 2, "entering restore_toc_entries_postfork\n");
4054 
4055 	/*
4056 	 * Now reconnect the single parent connection.
4057 	 */
4058 	ConnectDatabase((Archive *) AH, &ropt->cparams, true);
4059 
4060 	/* re-establish fixed state */
4061 	_doSetFixedOutputState(AH);
4062 
4063 	/*
4064 	 * Make sure there is no work left due to, say, circular dependencies, or
4065 	 * some other pathological condition.  If so, do it in the single parent
4066 	 * connection.  We don't sweat about RestorePass ordering; it's likely we
4067 	 * already violated that.
4068 	 */
4069 	for (te = pending_list->par_next; te != pending_list; te = te->par_next)
4070 	{
4071 		ahlog(AH, 1, "processing missed item %d %s %s\n",
4072 			  te->dumpId, te->desc, te->tag);
4073 		(void) restore_toc_entry(AH, te, false);
4074 	}
4075 }
4076 
4077 /*
4078  * Check if te1 has an exclusive lock requirement for an item that te2 also
4079  * requires, whether or not te2's requirement is for an exclusive lock.
4080  */
4081 static bool
has_lock_conflicts(TocEntry * te1,TocEntry * te2)4082 has_lock_conflicts(TocEntry *te1, TocEntry *te2)
4083 {
4084 	int			j,
4085 				k;
4086 
4087 	for (j = 0; j < te1->nLockDeps; j++)
4088 	{
4089 		for (k = 0; k < te2->nDeps; k++)
4090 		{
4091 			if (te1->lockDeps[j] == te2->dependencies[k])
4092 				return true;
4093 		}
4094 	}
4095 	return false;
4096 }
4097 
4098 
4099 /*
4100  * Initialize the header of a parallel-processing list.
4101  *
4102  * These are circular lists with a dummy TocEntry as header, just like the
4103  * main TOC list; but we use separate list links so that an entry can be in
4104  * the main TOC list as well as in a parallel-processing list.
4105  */
4106 static void
par_list_header_init(TocEntry * l)4107 par_list_header_init(TocEntry *l)
4108 {
4109 	l->par_prev = l->par_next = l;
4110 }
4111 
4112 /* Append te to the end of the parallel-processing list headed by l */
4113 static void
par_list_append(TocEntry * l,TocEntry * te)4114 par_list_append(TocEntry *l, TocEntry *te)
4115 {
4116 	te->par_prev = l->par_prev;
4117 	l->par_prev->par_next = te;
4118 	l->par_prev = te;
4119 	te->par_next = l;
4120 }
4121 
4122 /* Remove te from whatever parallel-processing list it's in */
4123 static void
par_list_remove(TocEntry * te)4124 par_list_remove(TocEntry *te)
4125 {
4126 	te->par_prev->par_next = te->par_next;
4127 	te->par_next->par_prev = te->par_prev;
4128 	te->par_prev = NULL;
4129 	te->par_next = NULL;
4130 }
4131 
4132 
4133 /*
4134  * Move all immediately-ready items from pending_list to ready_list.
4135  *
4136  * Items are considered ready if they have no remaining dependencies and
4137  * they belong in the current restore pass.  (See also reduce_dependencies,
4138  * which applies the same logic one-at-a-time.)
4139  */
4140 static void
move_to_ready_list(TocEntry * pending_list,TocEntry * ready_list,RestorePass pass)4141 move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list,
4142 				   RestorePass pass)
4143 {
4144 	TocEntry   *te;
4145 	TocEntry   *next_te;
4146 
4147 	for (te = pending_list->par_next; te != pending_list; te = next_te)
4148 	{
4149 		/* must save list link before possibly moving te to other list */
4150 		next_te = te->par_next;
4151 
4152 		if (te->depCount == 0 &&
4153 			_tocEntryRestorePass(te) == pass)
4154 		{
4155 			/* Remove it from pending_list ... */
4156 			par_list_remove(te);
4157 			/* ... and add to ready_list */
4158 			par_list_append(ready_list, te);
4159 		}
4160 	}
4161 }
4162 
4163 /*
4164  * Find the next work item (if any) that is capable of being run now.
4165  *
4166  * To qualify, the item must have no remaining dependencies
4167  * and no requirements for locks that are incompatible with
4168  * items currently running.  Items in the ready_list are known to have
4169  * no remaining dependencies, but we have to check for lock conflicts.
4170  *
4171  * Note that the returned item has *not* been removed from ready_list.
4172  * The caller must do that after successfully dispatching the item.
4173  *
4174  * pref_non_data is for an alternative selection algorithm that gives
4175  * preference to non-data items if there is already a data load running.
4176  * It is currently disabled.
4177  */
4178 static TocEntry *
get_next_work_item(ArchiveHandle * AH,TocEntry * ready_list,ParallelState * pstate)4179 get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
4180 				   ParallelState *pstate)
4181 {
4182 	bool		pref_non_data = false;	/* or get from AH->ropt */
4183 	TocEntry   *data_te = NULL;
4184 	TocEntry   *te;
4185 	int			i,
4186 				k;
4187 
4188 	/*
4189 	 * Bogus heuristics for pref_non_data
4190 	 */
4191 	if (pref_non_data)
4192 	{
4193 		int			count = 0;
4194 
4195 		for (k = 0; k < pstate->numWorkers; k++)
4196 			if (pstate->parallelSlot[k].args->te != NULL &&
4197 				pstate->parallelSlot[k].args->te->section == SECTION_DATA)
4198 				count++;
4199 		if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
4200 			pref_non_data = false;
4201 	}
4202 
4203 	/*
4204 	 * Search the ready_list until we find a suitable item.
4205 	 */
4206 	for (te = ready_list->par_next; te != ready_list; te = te->par_next)
4207 	{
4208 		bool		conflicts = false;
4209 
4210 		/*
4211 		 * Check to see if the item would need exclusive lock on something
4212 		 * that a currently running item also needs lock on, or vice versa. If
4213 		 * so, we don't want to schedule them together.
4214 		 */
4215 		for (i = 0; i < pstate->numWorkers && !conflicts; i++)
4216 		{
4217 			TocEntry   *running_te;
4218 
4219 			if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING)
4220 				continue;
4221 			running_te = pstate->parallelSlot[i].args->te;
4222 
4223 			if (has_lock_conflicts(te, running_te) ||
4224 				has_lock_conflicts(running_te, te))
4225 			{
4226 				conflicts = true;
4227 				break;
4228 			}
4229 		}
4230 
4231 		if (conflicts)
4232 			continue;
4233 
4234 		if (pref_non_data && te->section == SECTION_DATA)
4235 		{
4236 			if (data_te == NULL)
4237 				data_te = te;
4238 			continue;
4239 		}
4240 
4241 		/* passed all tests, so this item can run */
4242 		return te;
4243 	}
4244 
4245 	if (data_te != NULL)
4246 		return data_te;
4247 
4248 	ahlog(AH, 2, "no item ready\n");
4249 	return NULL;
4250 }
4251 
4252 
4253 /*
4254  * Restore a single TOC item in parallel with others
4255  *
4256  * this is run in the worker, i.e. in a thread (Windows) or a separate process
4257  * (everything else). A worker process executes several such work items during
4258  * a parallel backup or restore. Once we terminate here and report back that
4259  * our work is finished, the master process will assign us a new work item.
4260  */
4261 int
parallel_restore(ParallelArgs * args)4262 parallel_restore(ParallelArgs *args)
4263 {
4264 	ArchiveHandle *AH = args->AH;
4265 	TocEntry   *te = args->te;
4266 	int			status;
4267 
4268 	Assert(AH->connection != NULL);
4269 
4270 	/* Count only errors associated with this TOC entry */
4271 	AH->public.n_errors = 0;
4272 
4273 	/* Restore the TOC item */
4274 	status = restore_toc_entry(AH, te, true);
4275 
4276 	return status;
4277 }
4278 
4279 
4280 /*
4281  * Housekeeping to be done after a step has been parallel restored.
4282  *
4283  * Clear the appropriate slot, free all the extra memory we allocated,
4284  * update status, and reduce the dependency count of any dependent items.
4285  */
4286 static void
mark_work_done(ArchiveHandle * AH,TocEntry * ready_list,int worker,int status,ParallelState * pstate)4287 mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
4288 			   int worker, int status,
4289 			   ParallelState *pstate)
4290 {
4291 	TocEntry   *te = NULL;
4292 
4293 	te = pstate->parallelSlot[worker].args->te;
4294 
4295 	if (te == NULL)
4296 		exit_horribly(modulename, "could not find slot of finished worker\n");
4297 
4298 	ahlog(AH, 1, "finished item %d %s %s\n",
4299 		  te->dumpId, te->desc, te->tag);
4300 
4301 	if (status == WORKER_CREATE_DONE)
4302 		mark_create_done(AH, te);
4303 	else if (status == WORKER_INHIBIT_DATA)
4304 	{
4305 		inhibit_data_for_failed_table(AH, te);
4306 		AH->public.n_errors++;
4307 	}
4308 	else if (status == WORKER_IGNORED_ERRORS)
4309 		AH->public.n_errors++;
4310 	else if (status != 0)
4311 		exit_horribly(modulename, "worker process failed: exit code %d\n",
4312 					  status);
4313 
4314 	reduce_dependencies(AH, te, ready_list);
4315 }
4316 
4317 
4318 /*
4319  * Process the dependency information into a form useful for parallel restore.
4320  *
4321  * This function takes care of fixing up some missing or badly designed
4322  * dependencies, and then prepares subsidiary data structures that will be
4323  * used in the main parallel-restore logic, including:
4324  * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4325  * 2. We set up depCount fields that are the number of as-yet-unprocessed
4326  * dependencies for each TOC entry.
4327  *
4328  * We also identify locking dependencies so that we can avoid trying to
4329  * schedule conflicting items at the same time.
4330  */
4331 static void
fix_dependencies(ArchiveHandle * AH)4332 fix_dependencies(ArchiveHandle *AH)
4333 {
4334 	TocEntry   *te;
4335 	int			i;
4336 
4337 	/*
4338 	 * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4339 	 * items are marked as not being in any parallel-processing list.
4340 	 */
4341 	for (te = AH->toc->next; te != AH->toc; te = te->next)
4342 	{
4343 		te->depCount = te->nDeps;
4344 		te->revDeps = NULL;
4345 		te->nRevDeps = 0;
4346 		te->par_prev = NULL;
4347 		te->par_next = NULL;
4348 	}
4349 
4350 	/*
4351 	 * POST_DATA items that are shown as depending on a table need to be
4352 	 * re-pointed to depend on that table's data, instead.  This ensures they
4353 	 * won't get scheduled until the data has been loaded.
4354 	 */
4355 	repoint_table_dependencies(AH);
4356 
4357 	/*
4358 	 * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4359 	 * COMMENTS to BLOBS.  Cope.  (We assume there's only one BLOBS and only
4360 	 * one BLOB COMMENTS in such files.)
4361 	 */
4362 	if (AH->version < K_VERS_1_11)
4363 	{
4364 		for (te = AH->toc->next; te != AH->toc; te = te->next)
4365 		{
4366 			if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4367 			{
4368 				TocEntry   *te2;
4369 
4370 				for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4371 				{
4372 					if (strcmp(te2->desc, "BLOBS") == 0)
4373 					{
4374 						te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
4375 						te->dependencies[0] = te2->dumpId;
4376 						te->nDeps++;
4377 						te->depCount++;
4378 						break;
4379 					}
4380 				}
4381 				break;
4382 			}
4383 		}
4384 	}
4385 
4386 	/*
4387 	 * At this point we start to build the revDeps reverse-dependency arrays,
4388 	 * so all changes of dependencies must be complete.
4389 	 */
4390 
4391 	/*
4392 	 * Count the incoming dependencies for each item.  Also, it is possible
4393 	 * that the dependencies list items that are not in the archive at all
4394 	 * (that should not happen in 9.2 and later, but is highly likely in older
4395 	 * archives).  Subtract such items from the depCounts.
4396 	 */
4397 	for (te = AH->toc->next; te != AH->toc; te = te->next)
4398 	{
4399 		for (i = 0; i < te->nDeps; i++)
4400 		{
4401 			DumpId		depid = te->dependencies[i];
4402 
4403 			if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4404 				AH->tocsByDumpId[depid]->nRevDeps++;
4405 			else
4406 				te->depCount--;
4407 		}
4408 	}
4409 
4410 	/*
4411 	 * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4412 	 * it as a counter below.
4413 	 */
4414 	for (te = AH->toc->next; te != AH->toc; te = te->next)
4415 	{
4416 		if (te->nRevDeps > 0)
4417 			te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
4418 		te->nRevDeps = 0;
4419 	}
4420 
4421 	/*
4422 	 * Build the revDeps[] arrays of incoming-dependency dumpIds.  This had
4423 	 * better agree with the loops above.
4424 	 */
4425 	for (te = AH->toc->next; te != AH->toc; te = te->next)
4426 	{
4427 		for (i = 0; i < te->nDeps; i++)
4428 		{
4429 			DumpId		depid = te->dependencies[i];
4430 
4431 			if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4432 			{
4433 				TocEntry   *otherte = AH->tocsByDumpId[depid];
4434 
4435 				otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4436 			}
4437 		}
4438 	}
4439 
4440 	/*
4441 	 * Lastly, work out the locking dependencies.
4442 	 */
4443 	for (te = AH->toc->next; te != AH->toc; te = te->next)
4444 	{
4445 		te->lockDeps = NULL;
4446 		te->nLockDeps = 0;
4447 		identify_locking_dependencies(AH, te);
4448 	}
4449 }
4450 
4451 /*
4452  * Change dependencies on table items to depend on table data items instead,
4453  * but only in POST_DATA items.
4454  */
4455 static void
repoint_table_dependencies(ArchiveHandle * AH)4456 repoint_table_dependencies(ArchiveHandle *AH)
4457 {
4458 	TocEntry   *te;
4459 	int			i;
4460 	DumpId		olddep;
4461 
4462 	for (te = AH->toc->next; te != AH->toc; te = te->next)
4463 	{
4464 		if (te->section != SECTION_POST_DATA)
4465 			continue;
4466 		for (i = 0; i < te->nDeps; i++)
4467 		{
4468 			olddep = te->dependencies[i];
4469 			if (olddep <= AH->maxDumpId &&
4470 				AH->tableDataId[olddep] != 0)
4471 			{
4472 				te->dependencies[i] = AH->tableDataId[olddep];
4473 				ahlog(AH, 2, "transferring dependency %d -> %d to %d\n",
4474 					  te->dumpId, olddep, AH->tableDataId[olddep]);
4475 			}
4476 		}
4477 	}
4478 }
4479 
4480 /*
4481  * Identify which objects we'll need exclusive lock on in order to restore
4482  * the given TOC entry (*other* than the one identified by the TOC entry
4483  * itself).  Record their dump IDs in the entry's lockDeps[] array.
4484  */
4485 static void
identify_locking_dependencies(ArchiveHandle * AH,TocEntry * te)4486 identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4487 {
4488 	DumpId	   *lockids;
4489 	int			nlockids;
4490 	int			i;
4491 
4492 	/*
4493 	 * We only care about this for POST_DATA items.  PRE_DATA items are not
4494 	 * run in parallel, and DATA items are all independent by assumption.
4495 	 */
4496 	if (te->section != SECTION_POST_DATA)
4497 		return;
4498 
4499 	/* Quick exit if no dependencies at all */
4500 	if (te->nDeps == 0)
4501 		return;
4502 
4503 	/*
4504 	 * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
4505 	 * and hence require exclusive lock.  However, we know that CREATE INDEX
4506 	 * does not.  (Maybe someday index-creating CONSTRAINTs will fall in that
4507 	 * category too ... but today is not that day.)
4508 	 */
4509 	if (strcmp(te->desc, "INDEX") == 0)
4510 		return;
4511 
4512 	/*
4513 	 * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
4514 	 * item listed among its dependencies.  Originally all of these would have
4515 	 * been TABLE items, but repoint_table_dependencies would have repointed
4516 	 * them to the TABLE DATA items if those are present (which they might not
4517 	 * be, eg in a schema-only dump).  Note that all of the entries we are
4518 	 * processing here are POST_DATA; otherwise there might be a significant
4519 	 * difference between a dependency on a table and a dependency on its
4520 	 * data, so that closer analysis would be needed here.
4521 	 */
4522 	lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
4523 	nlockids = 0;
4524 	for (i = 0; i < te->nDeps; i++)
4525 	{
4526 		DumpId		depid = te->dependencies[i];
4527 
4528 		if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
4529 			((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
4530 			 strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
4531 			lockids[nlockids++] = depid;
4532 	}
4533 
4534 	if (nlockids == 0)
4535 	{
4536 		free(lockids);
4537 		return;
4538 	}
4539 
4540 	te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
4541 	te->nLockDeps = nlockids;
4542 }
4543 
4544 /*
4545  * Remove the specified TOC entry from the depCounts of items that depend on
4546  * it, thereby possibly making them ready-to-run.  Any pending item that
4547  * becomes ready should be moved to the ready_list, if that's provided.
4548  */
4549 static void
reduce_dependencies(ArchiveHandle * AH,TocEntry * te,TocEntry * ready_list)4550 reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
4551 {
4552 	int			i;
4553 
4554 	ahlog(AH, 2, "reducing dependencies for %d\n", te->dumpId);
4555 
4556 	for (i = 0; i < te->nRevDeps; i++)
4557 	{
4558 		TocEntry   *otherte = AH->tocsByDumpId[te->revDeps[i]];
4559 
4560 		Assert(otherte->depCount > 0);
4561 		otherte->depCount--;
4562 
4563 		/*
4564 		 * It's ready if it has no remaining dependencies, and it belongs in
4565 		 * the current restore pass, and it is currently a member of the
4566 		 * pending list (that check is needed to prevent double restore in
4567 		 * some cases where a list-file forces out-of-order restoring).
4568 		 * However, if ready_list == NULL then caller doesn't want any list
4569 		 * memberships changed.
4570 		 */
4571 		if (otherte->depCount == 0 &&
4572 			_tocEntryRestorePass(otherte) == AH->restorePass &&
4573 			otherte->par_prev != NULL &&
4574 			ready_list != NULL)
4575 		{
4576 			/* Remove it from pending list ... */
4577 			par_list_remove(otherte);
4578 			/* ... and add to ready_list */
4579 			par_list_append(ready_list, otherte);
4580 		}
4581 	}
4582 }
4583 
4584 /*
4585  * Set the created flag on the DATA member corresponding to the given
4586  * TABLE member
4587  */
4588 static void
mark_create_done(ArchiveHandle * AH,TocEntry * te)4589 mark_create_done(ArchiveHandle *AH, TocEntry *te)
4590 {
4591 	if (AH->tableDataId[te->dumpId] != 0)
4592 	{
4593 		TocEntry   *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4594 
4595 		ted->created = true;
4596 	}
4597 }
4598 
4599 /*
4600  * Mark the DATA member corresponding to the given TABLE member
4601  * as not wanted
4602  */
4603 static void
inhibit_data_for_failed_table(ArchiveHandle * AH,TocEntry * te)4604 inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
4605 {
4606 	ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
4607 		  te->tag);
4608 
4609 	if (AH->tableDataId[te->dumpId] != 0)
4610 	{
4611 		TocEntry   *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4612 
4613 		ted->reqs = 0;
4614 	}
4615 }
4616 
4617 /*
4618  * Clone and de-clone routines used in parallel restoration.
4619  *
4620  * Enough of the structure is cloned to ensure that there is no
4621  * conflict between different threads each with their own clone.
4622  */
4623 ArchiveHandle *
CloneArchive(ArchiveHandle * AH)4624 CloneArchive(ArchiveHandle *AH)
4625 {
4626 	ArchiveHandle *clone;
4627 
4628 	/* Make a "flat" copy */
4629 	clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
4630 	memcpy(clone, AH, sizeof(ArchiveHandle));
4631 
4632 	/* Handle format-independent fields */
4633 	memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
4634 
4635 	/* The clone will have its own connection, so disregard connection state */
4636 	clone->connection = NULL;
4637 	clone->connCancel = NULL;
4638 	clone->currUser = NULL;
4639 	clone->currSchema = NULL;
4640 	clone->currTablespace = NULL;
4641 	clone->currWithOids = -1;
4642 
4643 	/* savedPassword must be local in case we change it while connecting */
4644 	if (clone->savedPassword)
4645 		clone->savedPassword = pg_strdup(clone->savedPassword);
4646 
4647 	/* clone has its own error count, too */
4648 	clone->public.n_errors = 0;
4649 
4650 	/*
4651 	 * Connect our new clone object to the database, using the same connection
4652 	 * parameters used for the original connection.
4653 	 */
4654 	ConnectDatabase((Archive *) clone, &clone->public.ropt->cparams, true);
4655 
4656 	/* re-establish fixed state */
4657 	if (AH->mode == archModeRead)
4658 		_doSetFixedOutputState(clone);
4659 	/* in write case, setupDumpWorker will fix up connection state */
4660 
4661 	/* Let the format-specific code have a chance too */
4662 	(clone->ClonePtr) (clone);
4663 
4664 	Assert(clone->connection != NULL);
4665 	return clone;
4666 }
4667 
4668 /*
4669  * Release clone-local storage.
4670  *
4671  * Note: we assume any clone-local connection was already closed.
4672  */
4673 void
DeCloneArchive(ArchiveHandle * AH)4674 DeCloneArchive(ArchiveHandle *AH)
4675 {
4676 	/* Should not have an open database connection */
4677 	Assert(AH->connection == NULL);
4678 
4679 	/* Clear format-specific state */
4680 	(AH->DeClonePtr) (AH);
4681 
4682 	/* Clear state allocated by CloneArchive */
4683 	if (AH->sqlparse.curCmd)
4684 		destroyPQExpBuffer(AH->sqlparse.curCmd);
4685 
4686 	/* Clear any connection-local state */
4687 	if (AH->currUser)
4688 		free(AH->currUser);
4689 	if (AH->currSchema)
4690 		free(AH->currSchema);
4691 	if (AH->currTablespace)
4692 		free(AH->currTablespace);
4693 	if (AH->savedPassword)
4694 		free(AH->savedPassword);
4695 
4696 	free(AH);
4697 }
4698