1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_archiver.c
4  *
5  *	Private implementation of the archiver routines.
6  *
7  *	See the headers to pg_restore for more details.
8  *
9  * Copyright (c) 2000, Philip Warner
10  *	Rights are granted to use this software in any way so long
11  *	as this notice is not removed.
12  *
13  *	The author is not responsible for loss or damages that may
14  *	result from its use.
15  *
16  *
17  * IDENTIFICATION
18  *		src/bin/pg_dump/pg_backup_archiver.c
19  *
20  *-------------------------------------------------------------------------
21  */
22 #include "postgres_fe.h"
23 
24 #include <ctype.h>
25 #include <fcntl.h>
26 #include <unistd.h>
27 #include <sys/stat.h>
28 #include <sys/wait.h>
29 #ifdef WIN32
30 #include <io.h>
31 #endif
32 
33 #include "dumputils.h"
34 #include "fe_utils/string_utils.h"
35 #include "libpq/libpq-fs.h"
36 #include "parallel.h"
37 #include "pg_backup_archiver.h"
38 #include "pg_backup_db.h"
39 #include "pg_backup_utils.h"
40 
/*
 * Banner text for plain-text script output: the header of a single-database
 * dump, and the header of a whole-cluster dump.
 */
#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
#define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
43 
/*
 * State needed to save/restore an archive's output target.  SaveOutput()
 * returns one of these and RestoreOutput() takes it back, so a restore can
 * temporarily redirect output and then put the original target back.
 */
typedef struct _outputContext
{
	void	   *OF;				/* output handle; presumably a FILE * or a gz
								 * handle depending on gzOut -- see CloseArchive */
	int			gzOut;			/* nonzero if OF is a compressed stream */
} OutputContext;
50 
/*
 * State for tracking TocEntrys that are ready to process during a parallel
 * restore.  (This used to be a list, and we still call it that, though now
 * it's really an array so that we can apply qsort to it.)
 *
 * tes[] is sized large enough that we can't overrun it (presumably from the
 * TOC entry count given to ready_list_init()).
 * The valid entries are indexed first_te .. last_te inclusive.
 * We periodically sort the array to bring larger-by-dataLength entries to
 * the front; "sorted" is true if the valid entries are known sorted.
 */
typedef struct _parallelReadyList
{
	TocEntry  **tes;			/* Ready-to-dump TocEntrys */
	int			first_te;		/* index of first valid entry in tes[] */
	int			last_te;		/* index of last valid entry in tes[] */
	bool		sorted;			/* are valid entries currently sorted? */
} ParallelReadyList;
68 
69 
/*
 * Forward declarations of file-local routines.
 */
static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
							   const int compression, bool dosync, ArchiveMode mode,
							   SetupWorkerPtrType setupWorkerPtr);
static void _getObjectDescription(PQExpBuffer buf, TocEntry *te,
								  ArchiveHandle *AH);
static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData);
static char *sanitize_line(const char *str, bool want_hyphen);
static void _doSetFixedOutputState(ArchiveHandle *AH);
static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
static void _becomeUser(ArchiveHandle *AH, const char *user);
static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
static teReqs _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH);
static RestorePass _tocEntryRestorePass(TocEntry *te);
static bool _tocEntryIsACL(TocEntry *te);
static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
static void buildTocEntryArrays(ArchiveHandle *AH);
static void _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
static int	_discoverArchiveFormat(ArchiveHandle *AH);

static int	RestoringToDB(ArchiveHandle *AH);
static void dump_lo_buf(ArchiveHandle *AH);
static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
/* output-target redirection; see OutputContext */
static void SetOutput(ArchiveHandle *AH, const char *filename, int compression);
static OutputContext SaveOutput(ArchiveHandle *AH);
static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);

/* restore driver, plus the parallel-restore scheduling machinery */
static int	restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
static void restore_toc_entries_prefork(ArchiveHandle *AH,
										TocEntry *pending_list);
static void restore_toc_entries_parallel(ArchiveHandle *AH,
										 ParallelState *pstate,
										 TocEntry *pending_list);
static void restore_toc_entries_postfork(ArchiveHandle *AH,
										 TocEntry *pending_list);
static void pending_list_header_init(TocEntry *l);
static void pending_list_append(TocEntry *l, TocEntry *te);
static void pending_list_remove(TocEntry *te);
static void ready_list_init(ParallelReadyList *ready_list, int tocCount);
static void ready_list_free(ParallelReadyList *ready_list);
static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te);
static void ready_list_remove(ParallelReadyList *ready_list, int i);
static void ready_list_sort(ParallelReadyList *ready_list);
static int	TocEntrySizeCompare(const void *p1, const void *p2);
static void move_to_ready_list(TocEntry *pending_list,
							   ParallelReadyList *ready_list,
							   RestorePass pass);
static TocEntry *pop_next_work_item(ArchiveHandle *AH,
									ParallelReadyList *ready_list,
									ParallelState *pstate);
static void mark_dump_job_done(ArchiveHandle *AH,
							   TocEntry *te,
							   int status,
							   void *callback_data);
static void mark_restore_job_done(ArchiveHandle *AH,
								  TocEntry *te,
								  int status,
								  void *callback_data);
static void fix_dependencies(ArchiveHandle *AH);
static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
static void repoint_table_dependencies(ArchiveHandle *AH);
static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
								ParallelReadyList *ready_list);
static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);

static void StrictNamesCheck(RestoreOptions *ropt);
145 
146 
147 /*
148  * Allocate a new DumpOptions block containing all default values.
149  */
150 DumpOptions *
NewDumpOptions(void)151 NewDumpOptions(void)
152 {
153 	DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
154 
155 	InitDumpOptions(opts);
156 	return opts;
157 }
158 
159 /*
160  * Initialize a DumpOptions struct to all default values
161  */
162 void
InitDumpOptions(DumpOptions * opts)163 InitDumpOptions(DumpOptions *opts)
164 {
165 	memset(opts, 0, sizeof(DumpOptions));
166 	/* set any fields that shouldn't default to zeroes */
167 	opts->include_everything = true;
168 	opts->cparams.promptPassword = TRI_DEFAULT;
169 	opts->dumpSections = DUMP_UNSECTIONED;
170 }
171 
172 /*
173  * Create a freshly allocated DumpOptions with options equivalent to those
174  * found in the given RestoreOptions.
175  */
176 DumpOptions *
dumpOptionsFromRestoreOptions(RestoreOptions * ropt)177 dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
178 {
179 	DumpOptions *dopt = NewDumpOptions();
180 
181 	/* this is the inverse of what's at the end of pg_dump.c's main() */
182 	dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
183 	dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
184 	dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
185 	dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
186 	dopt->cparams.promptPassword = ropt->cparams.promptPassword;
187 	dopt->outputClean = ropt->dropSchema;
188 	dopt->dataOnly = ropt->dataOnly;
189 	dopt->schemaOnly = ropt->schemaOnly;
190 	dopt->if_exists = ropt->if_exists;
191 	dopt->column_inserts = ropt->column_inserts;
192 	dopt->dumpSections = ropt->dumpSections;
193 	dopt->aclsSkip = ropt->aclsSkip;
194 	dopt->outputSuperuser = ropt->superuser;
195 	dopt->outputCreateDB = ropt->createDB;
196 	dopt->outputNoOwner = ropt->noOwner;
197 	dopt->outputNoTablespaces = ropt->noTablespace;
198 	dopt->disable_triggers = ropt->disable_triggers;
199 	dopt->use_setsessauth = ropt->use_setsessauth;
200 	dopt->disable_dollar_quoting = ropt->disable_dollar_quoting;
201 	dopt->dump_inserts = ropt->dump_inserts;
202 	dopt->no_comments = ropt->no_comments;
203 	dopt->no_publications = ropt->no_publications;
204 	dopt->no_security_labels = ropt->no_security_labels;
205 	dopt->no_subscriptions = ropt->no_subscriptions;
206 	dopt->lockWaitTimeout = ropt->lockWaitTimeout;
207 	dopt->include_everything = ropt->include_everything;
208 	dopt->enable_row_security = ropt->enable_row_security;
209 	dopt->sequence_data = ropt->sequence_data;
210 
211 	return dopt;
212 }
213 
214 
/*
 *	Wrapper functions.
 *
 *	The objective is to make writing new formats and dumpers as simple
 *	as possible, if necessary at the expense of extra function calls etc.
 *
 */
222 
223 /*
224  * The dump worker setup needs lots of knowledge of the internals of pg_dump,
225  * so It's defined in pg_dump.c and passed into OpenArchive. The restore worker
226  * setup doesn't need to know anything much, so it's defined here.
227  */
228 static void
setupRestoreWorker(Archive * AHX)229 setupRestoreWorker(Archive *AHX)
230 {
231 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
232 
233 	AH->ReopenPtr(AH);
234 }
235 
236 
237 /* Create a new archive */
238 /* Public */
239 Archive *
CreateArchive(const char * FileSpec,const ArchiveFormat fmt,const int compression,bool dosync,ArchiveMode mode,SetupWorkerPtrType setupDumpWorker)240 CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
241 			  const int compression, bool dosync, ArchiveMode mode,
242 			  SetupWorkerPtrType setupDumpWorker)
243 
244 {
245 	ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, dosync,
246 								 mode, setupDumpWorker);
247 
248 	return (Archive *) AH;
249 }
250 
251 /* Open an existing archive */
252 /* Public */
253 Archive *
OpenArchive(const char * FileSpec,const ArchiveFormat fmt)254 OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
255 {
256 	ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, true, archModeRead, setupRestoreWorker);
257 
258 	return (Archive *) AH;
259 }
260 
261 /* Public */
262 void
CloseArchive(Archive * AHX)263 CloseArchive(Archive *AHX)
264 {
265 	int			res = 0;
266 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
267 
268 	AH->ClosePtr(AH);
269 
270 	/* Close the output */
271 	if (AH->gzOut)
272 		res = GZCLOSE(AH->OF);
273 	else if (AH->OF != stdout)
274 		res = fclose(AH->OF);
275 
276 	if (res != 0)
277 		fatal("could not close output file: %m");
278 }
279 
280 /* Public */
281 void
SetArchiveOptions(Archive * AH,DumpOptions * dopt,RestoreOptions * ropt)282 SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
283 {
284 	/* Caller can omit dump options, in which case we synthesize them */
285 	if (dopt == NULL && ropt != NULL)
286 		dopt = dumpOptionsFromRestoreOptions(ropt);
287 
288 	/* Save options for later access */
289 	AH->dopt = dopt;
290 	AH->ropt = ropt;
291 }
292 
293 /* Public */
294 void
ProcessArchiveRestoreOptions(Archive * AHX)295 ProcessArchiveRestoreOptions(Archive *AHX)
296 {
297 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
298 	RestoreOptions *ropt = AH->public.ropt;
299 	TocEntry   *te;
300 	teSection	curSection;
301 
302 	/* Decide which TOC entries will be dumped/restored, and mark them */
303 	curSection = SECTION_PRE_DATA;
304 	for (te = AH->toc->next; te != AH->toc; te = te->next)
305 	{
306 		/*
307 		 * When writing an archive, we also take this opportunity to check
308 		 * that we have generated the entries in a sane order that respects
309 		 * the section divisions.  When reading, don't complain, since buggy
310 		 * old versions of pg_dump might generate out-of-order archives.
311 		 */
312 		if (AH->mode != archModeRead)
313 		{
314 			switch (te->section)
315 			{
316 				case SECTION_NONE:
317 					/* ok to be anywhere */
318 					break;
319 				case SECTION_PRE_DATA:
320 					if (curSection != SECTION_PRE_DATA)
321 						pg_log_warning("archive items not in correct section order");
322 					break;
323 				case SECTION_DATA:
324 					if (curSection == SECTION_POST_DATA)
325 						pg_log_warning("archive items not in correct section order");
326 					break;
327 				case SECTION_POST_DATA:
328 					/* ok no matter which section we were in */
329 					break;
330 				default:
331 					fatal("unexpected section code %d",
332 						  (int) te->section);
333 					break;
334 			}
335 		}
336 
337 		if (te->section != SECTION_NONE)
338 			curSection = te->section;
339 
340 		te->reqs = _tocEntryRequired(te, curSection, AH);
341 	}
342 
343 	/* Enforce strict names checking */
344 	if (ropt->strict_names)
345 		StrictNamesCheck(ropt);
346 }
347 
348 /* Public */
349 void
RestoreArchive(Archive * AHX)350 RestoreArchive(Archive *AHX)
351 {
352 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
353 	RestoreOptions *ropt = AH->public.ropt;
354 	bool		parallel_mode;
355 	TocEntry   *te;
356 	OutputContext sav;
357 
358 	AH->stage = STAGE_INITIALIZING;
359 
360 	/*
361 	 * If we're going to do parallel restore, there are some restrictions.
362 	 */
363 	parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
364 	if (parallel_mode)
365 	{
366 		/* We haven't got round to making this work for all archive formats */
367 		if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
368 			fatal("parallel restore is not supported with this archive file format");
369 
370 		/* Doesn't work if the archive represents dependencies as OIDs */
371 		if (AH->version < K_VERS_1_8)
372 			fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");
373 
374 		/*
375 		 * It's also not gonna work if we can't reopen the input file, so
376 		 * let's try that immediately.
377 		 */
378 		AH->ReopenPtr(AH);
379 	}
380 
381 	/*
382 	 * Make sure we won't need (de)compression we haven't got
383 	 */
384 #ifndef HAVE_LIBZ
385 	if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
386 	{
387 		for (te = AH->toc->next; te != AH->toc; te = te->next)
388 		{
389 			if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
390 				fatal("cannot restore from compressed archive (compression not supported in this installation)");
391 		}
392 	}
393 #endif
394 
395 	/*
396 	 * Prepare index arrays, so we can assume we have them throughout restore.
397 	 * It's possible we already did this, though.
398 	 */
399 	if (AH->tocsByDumpId == NULL)
400 		buildTocEntryArrays(AH);
401 
402 	/*
403 	 * If we're using a DB connection, then connect it.
404 	 */
405 	if (ropt->useDB)
406 	{
407 		pg_log_info("connecting to database for restore");
408 		if (AH->version < K_VERS_1_3)
409 			fatal("direct database connections are not supported in pre-1.3 archives");
410 
411 		/*
412 		 * We don't want to guess at whether the dump will successfully
413 		 * restore; allow the attempt regardless of the version of the restore
414 		 * target.
415 		 */
416 		AHX->minRemoteVersion = 0;
417 		AHX->maxRemoteVersion = 9999999;
418 
419 		ConnectDatabase(AHX, &ropt->cparams, false);
420 
421 		/*
422 		 * If we're talking to the DB directly, don't send comments since they
423 		 * obscure SQL when displaying errors
424 		 */
425 		AH->noTocComments = 1;
426 	}
427 
428 	/*
429 	 * Work out if we have an implied data-only restore. This can happen if
430 	 * the dump was data only or if the user has used a toc list to exclude
431 	 * all of the schema data. All we do is look for schema entries - if none
432 	 * are found then we set the dataOnly flag.
433 	 *
434 	 * We could scan for wanted TABLE entries, but that is not the same as
435 	 * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
436 	 */
437 	if (!ropt->dataOnly)
438 	{
439 		int			impliedDataOnly = 1;
440 
441 		for (te = AH->toc->next; te != AH->toc; te = te->next)
442 		{
443 			if ((te->reqs & REQ_SCHEMA) != 0)
444 			{					/* It's schema, and it's wanted */
445 				impliedDataOnly = 0;
446 				break;
447 			}
448 		}
449 		if (impliedDataOnly)
450 		{
451 			ropt->dataOnly = impliedDataOnly;
452 			pg_log_info("implied data-only restore");
453 		}
454 	}
455 
456 	/*
457 	 * Setup the output file if necessary.
458 	 */
459 	sav = SaveOutput(AH);
460 	if (ropt->filename || ropt->compression)
461 		SetOutput(AH, ropt->filename, ropt->compression);
462 
463 	ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
464 
465 	if (AH->archiveRemoteVersion)
466 		ahprintf(AH, "-- Dumped from database version %s\n",
467 				 AH->archiveRemoteVersion);
468 	if (AH->archiveDumpVersion)
469 		ahprintf(AH, "-- Dumped by pg_dump version %s\n",
470 				 AH->archiveDumpVersion);
471 
472 	ahprintf(AH, "\n");
473 
474 	if (AH->public.verbose)
475 		dumpTimestamp(AH, "Started on", AH->createDate);
476 
477 	if (ropt->single_txn)
478 	{
479 		if (AH->connection)
480 			StartTransaction(AHX);
481 		else
482 			ahprintf(AH, "BEGIN;\n\n");
483 	}
484 
485 	/*
486 	 * Establish important parameter values right away.
487 	 */
488 	_doSetFixedOutputState(AH);
489 
490 	AH->stage = STAGE_PROCESSING;
491 
492 	/*
493 	 * Drop the items at the start, in reverse order
494 	 */
495 	if (ropt->dropSchema)
496 	{
497 		for (te = AH->toc->prev; te != AH->toc; te = te->prev)
498 		{
499 			AH->currentTE = te;
500 
501 			/*
502 			 * In createDB mode, issue a DROP *only* for the database as a
503 			 * whole.  Issuing drops against anything else would be wrong,
504 			 * because at this point we're connected to the wrong database.
505 			 * (The DATABASE PROPERTIES entry, if any, should be treated like
506 			 * the DATABASE entry.)
507 			 */
508 			if (ropt->createDB)
509 			{
510 				if (strcmp(te->desc, "DATABASE") != 0 &&
511 					strcmp(te->desc, "DATABASE PROPERTIES") != 0)
512 					continue;
513 			}
514 
515 			/* Otherwise, drop anything that's selected and has a dropStmt */
516 			if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
517 			{
518 				pg_log_info("dropping %s %s", te->desc, te->tag);
519 				/* Select owner and schema as necessary */
520 				_becomeOwner(AH, te);
521 				_selectOutputSchema(AH, te->namespace);
522 
523 				/*
524 				 * Now emit the DROP command, if the object has one.  Note we
525 				 * don't necessarily emit it verbatim; at this point we add an
526 				 * appropriate IF EXISTS clause, if the user requested it.
527 				 */
528 				if (*te->dropStmt != '\0')
529 				{
530 					if (!ropt->if_exists)
531 					{
532 						/* No --if-exists?	Then just use the original */
533 						ahprintf(AH, "%s", te->dropStmt);
534 					}
535 					else
536 					{
537 						/*
538 						 * Inject an appropriate spelling of "if exists".  For
539 						 * large objects, we have a separate routine that
540 						 * knows how to do it, without depending on
541 						 * te->dropStmt; use that.  For other objects we need
542 						 * to parse the command.
543 						 */
544 						if (strncmp(te->desc, "BLOB", 4) == 0)
545 						{
546 							DropBlobIfExists(AH, te->catalogId.oid);
547 						}
548 						else
549 						{
550 							char	   *dropStmt = pg_strdup(te->dropStmt);
551 							char	   *dropStmtOrig = dropStmt;
552 							PQExpBuffer ftStmt = createPQExpBuffer();
553 
554 							/*
555 							 * Need to inject IF EXISTS clause after ALTER
556 							 * TABLE part in ALTER TABLE .. DROP statement
557 							 */
558 							if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
559 							{
560 								appendPQExpBufferStr(ftStmt,
561 													 "ALTER TABLE IF EXISTS");
562 								dropStmt = dropStmt + 11;
563 							}
564 
565 							/*
566 							 * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
567 							 * not support the IF EXISTS clause, and therefore
568 							 * we simply emit the original command for DEFAULT
569 							 * objects (modulo the adjustment made above).
570 							 *
571 							 * Likewise, don't mess with DATABASE PROPERTIES.
572 							 *
573 							 * If we used CREATE OR REPLACE VIEW as a means of
574 							 * quasi-dropping an ON SELECT rule, that should
575 							 * be emitted unchanged as well.
576 							 *
577 							 * For other object types, we need to extract the
578 							 * first part of the DROP which includes the
579 							 * object type.  Most of the time this matches
580 							 * te->desc, so search for that; however for the
581 							 * different kinds of CONSTRAINTs, we know to
582 							 * search for hardcoded "DROP CONSTRAINT" instead.
583 							 */
584 							if (strcmp(te->desc, "DEFAULT") == 0 ||
585 								strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
586 								strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
587 								appendPQExpBufferStr(ftStmt, dropStmt);
588 							else
589 							{
590 								char		buffer[40];
591 								char	   *mark;
592 
593 								if (strcmp(te->desc, "CONSTRAINT") == 0 ||
594 									strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
595 									strcmp(te->desc, "FK CONSTRAINT") == 0)
596 									strcpy(buffer, "DROP CONSTRAINT");
597 								else
598 									snprintf(buffer, sizeof(buffer), "DROP %s",
599 											 te->desc);
600 
601 								mark = strstr(dropStmt, buffer);
602 
603 								if (mark)
604 								{
605 									*mark = '\0';
606 									appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
607 													  dropStmt, buffer,
608 													  mark + strlen(buffer));
609 								}
610 								else
611 								{
612 									/* complain and emit unmodified command */
613 									pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
614 												   dropStmtOrig);
615 									appendPQExpBufferStr(ftStmt, dropStmt);
616 								}
617 							}
618 
619 							ahprintf(AH, "%s", ftStmt->data);
620 
621 							destroyPQExpBuffer(ftStmt);
622 							pg_free(dropStmtOrig);
623 						}
624 					}
625 				}
626 			}
627 		}
628 
629 		/*
630 		 * _selectOutputSchema may have set currSchema to reflect the effect
631 		 * of a "SET search_path" command it emitted.  However, by now we may
632 		 * have dropped that schema; or it might not have existed in the first
633 		 * place.  In either case the effective value of search_path will not
634 		 * be what we think.  Forcibly reset currSchema so that we will
635 		 * re-establish the search_path setting when needed (after creating
636 		 * the schema).
637 		 *
638 		 * If we treated users as pg_dump'able objects then we'd need to reset
639 		 * currUser here too.
640 		 */
641 		if (AH->currSchema)
642 			free(AH->currSchema);
643 		AH->currSchema = NULL;
644 	}
645 
646 	if (parallel_mode)
647 	{
648 		/*
649 		 * In parallel mode, turn control over to the parallel-restore logic.
650 		 */
651 		ParallelState *pstate;
652 		TocEntry	pending_list;
653 
654 		/* The archive format module may need some setup for this */
655 		if (AH->PrepParallelRestorePtr)
656 			AH->PrepParallelRestorePtr(AH);
657 
658 		pending_list_header_init(&pending_list);
659 
660 		/* This runs PRE_DATA items and then disconnects from the database */
661 		restore_toc_entries_prefork(AH, &pending_list);
662 		Assert(AH->connection == NULL);
663 
664 		/* ParallelBackupStart() will actually fork the processes */
665 		pstate = ParallelBackupStart(AH);
666 		restore_toc_entries_parallel(AH, pstate, &pending_list);
667 		ParallelBackupEnd(AH, pstate);
668 
669 		/* reconnect the master and see if we missed something */
670 		restore_toc_entries_postfork(AH, &pending_list);
671 		Assert(AH->connection != NULL);
672 	}
673 	else
674 	{
675 		/*
676 		 * In serial mode, process everything in three phases: normal items,
677 		 * then ACLs, then post-ACL items.  We might be able to skip one or
678 		 * both extra phases in some cases, eg data-only restores.
679 		 */
680 		bool		haveACL = false;
681 		bool		havePostACL = false;
682 
683 		for (te = AH->toc->next; te != AH->toc; te = te->next)
684 		{
685 			if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
686 				continue;		/* ignore if not to be dumped at all */
687 
688 			switch (_tocEntryRestorePass(te))
689 			{
690 				case RESTORE_PASS_MAIN:
691 					(void) restore_toc_entry(AH, te, false);
692 					break;
693 				case RESTORE_PASS_ACL:
694 					haveACL = true;
695 					break;
696 				case RESTORE_PASS_POST_ACL:
697 					havePostACL = true;
698 					break;
699 			}
700 		}
701 
702 		if (haveACL)
703 		{
704 			for (te = AH->toc->next; te != AH->toc; te = te->next)
705 			{
706 				if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
707 					_tocEntryRestorePass(te) == RESTORE_PASS_ACL)
708 					(void) restore_toc_entry(AH, te, false);
709 			}
710 		}
711 
712 		if (havePostACL)
713 		{
714 			for (te = AH->toc->next; te != AH->toc; te = te->next)
715 			{
716 				if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
717 					_tocEntryRestorePass(te) == RESTORE_PASS_POST_ACL)
718 					(void) restore_toc_entry(AH, te, false);
719 			}
720 		}
721 	}
722 
723 	if (ropt->single_txn)
724 	{
725 		if (AH->connection)
726 			CommitTransaction(AHX);
727 		else
728 			ahprintf(AH, "COMMIT;\n\n");
729 	}
730 
731 	if (AH->public.verbose)
732 		dumpTimestamp(AH, "Completed on", time(NULL));
733 
734 	ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
735 
736 	/*
737 	 * Clean up & we're done.
738 	 */
739 	AH->stage = STAGE_FINALIZING;
740 
741 	if (ropt->filename || ropt->compression)
742 		RestoreOutput(AH, sav);
743 
744 	if (ropt->useDB)
745 		DisconnectDatabase(&AH->public);
746 }
747 
/*
 * Restore a single TOC item.  Used in both parallel and non-parallel restore;
 * is_parallel is true if we are in a worker child process.
 *
 * The schema part is emitted if te->reqs includes REQ_SCHEMA, and then the
 * data part if it includes REQ_DATA.  Some bookkeeping (marking a table as
 * created, or inhibiting data for a table whose creation failed) must be done
 * in the parallel parent.  In serial mode it is done directly here; in a
 * worker it is reported back through the return value instead.
 *
 * Returns WORKER_OK normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
 * the parallel parent has to make the corresponding status update.  If no
 * such status applies but errors have been counted (AH->public.n_errors > 0),
 * WORKER_IGNORED_ERRORS is returned.
 */
static int
restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
{
	RestoreOptions *ropt = AH->public.ropt;
	int			status = WORKER_OK;
	teReqs		reqs;
	bool		defnDumped;

	AH->currentTE = te;

	/* Dump any relevant dump warnings to stderr */
	if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
	{
		if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
			pg_log_warning("warning from original dump file: %s", te->defn);
		else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
			pg_log_warning("warning from original dump file: %s", te->copyStmt);
	}

	/* Work out what, if anything, we want from this entry */
	reqs = te->reqs;

	/* tracks whether _printTocEntry() already emitted the definition */
	defnDumped = false;

	/*
	 * If it has a schema component that we want, then process that
	 */
	if ((reqs & REQ_SCHEMA) != 0)
	{
		/* Show namespace in log message if available */
		if (te->namespace)
			pg_log_info("creating %s \"%s.%s\"",
						te->desc, te->namespace, te->tag);
		else
			pg_log_info("creating %s \"%s\"",
						te->desc, te->tag);

		_printTocEntry(AH, te, false);
		defnDumped = true;

		if (strcmp(te->desc, "TABLE") == 0)
		{
			/* lastErrorTE == te means an error occurred while creating it */
			if (AH->lastErrorTE == te)
			{
				/*
				 * We failed to create the table. If
				 * --no-data-for-failed-tables was given, mark the
				 * corresponding TABLE DATA to be ignored.
				 *
				 * In the parallel case this must be done in the parent, so we
				 * just set the return value.
				 */
				if (ropt->noDataForFailedTables)
				{
					if (is_parallel)
						status = WORKER_INHIBIT_DATA;
					else
						inhibit_data_for_failed_table(AH, te);
				}
			}
			else
			{
				/*
				 * We created the table successfully.  Mark the corresponding
				 * TABLE DATA for possible truncation.
				 *
				 * In the parallel case this must be done in the parent, so we
				 * just set the return value.
				 */
				if (is_parallel)
					status = WORKER_CREATE_DONE;
				else
					mark_create_done(AH, te);
			}
		}

		/*
		 * If we created a DB, connect to it.  Also, if we changed DB
		 * properties, reconnect to ensure that relevant GUC settings are
		 * applied to our session.
		 */
		if (strcmp(te->desc, "DATABASE") == 0 ||
			strcmp(te->desc, "DATABASE PROPERTIES") == 0)
		{
			pg_log_info("connecting to new database \"%s\"", te->tag);
			_reconnectToDB(AH, te->tag);
		}
	}

	/*
	 * If it has a data component that we want, then process that
	 */
	if ((reqs & REQ_DATA) != 0)
	{
		/*
		 * hadDumper will be set if there is genuine data component for this
		 * node. Otherwise, we need to check the defn field for statements
		 * that need to be executed in data-only restores.
		 */
		if (te->hadDumper)
		{
			/*
			 * If we can output the data, then restore it.
			 */
			if (AH->PrintTocDataPtr != NULL)
			{
				_printTocEntry(AH, te, true);

				if (strcmp(te->desc, "BLOBS") == 0 ||
					strcmp(te->desc, "BLOB COMMENTS") == 0)
				{
					pg_log_info("processing %s", te->desc);

					_selectOutputSchema(AH, "pg_catalog");

					/* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
					if (strcmp(te->desc, "BLOB COMMENTS") == 0)
						AH->outputKind = OUTPUT_OTHERDATA;

					AH->PrintTocDataPtr(AH, te);

					/* back to the default output kind */
					AH->outputKind = OUTPUT_SQLCMDS;
				}
				else
				{
					/* ordinary table data; paired with the enable call below */
					_disableTriggersIfNecessary(AH, te);

					/* Select owner and schema as necessary */
					_becomeOwner(AH, te);
					_selectOutputSchema(AH, te->namespace);

					pg_log_info("processing data for table \"%s.%s\"",
								te->namespace, te->tag);

					/*
					 * In parallel restore, if we created the table earlier in
					 * the run then we wrap the COPY in a transaction and
					 * precede it with a TRUNCATE.  If archiving is not on
					 * this prevents WAL-logging the COPY.  This obtains a
					 * speedup similar to that from using single_txn mode in
					 * non-parallel restores.
					 */
					if (is_parallel && te->created)
					{
						/*
						 * Parallel restore is always talking directly to a
						 * server, so no need to see if we should issue BEGIN.
						 */
						StartTransaction(&AH->public);

						/*
						 * If the server version is >= 8.4, make sure we issue
						 * TRUNCATE with ONLY so that child tables are not
						 * wiped.
						 */
						ahprintf(AH, "TRUNCATE TABLE %s%s;\n\n",
								 (PQserverVersion(AH->connection) >= 80400 ?
								  "ONLY " : ""),
								 fmtQualifiedId(te->namespace, te->tag));
					}

					/*
					 * If we have a copy statement, use it.
					 */
					if (te->copyStmt && strlen(te->copyStmt) > 0)
					{
						ahprintf(AH, "%s", te->copyStmt);
						AH->outputKind = OUTPUT_COPYDATA;
					}
					else
						AH->outputKind = OUTPUT_OTHERDATA;

					AH->PrintTocDataPtr(AH, te);

					/*
					 * Terminate COPY if needed.
					 */
					if (AH->outputKind == OUTPUT_COPYDATA &&
						RestoringToDB(AH))
						EndDBCopyMode(&AH->public, te->tag);
					AH->outputKind = OUTPUT_SQLCMDS;

					/* close out the transaction started above */
					if (is_parallel && te->created)
						CommitTransaction(&AH->public);

					_enableTriggersIfNecessary(AH, te);
				}
			}
		}
		else if (!defnDumped)
		{
			/* If we haven't already dumped the defn part, do so now */
			pg_log_info("executing %s %s", te->desc, te->tag);
			_printTocEntry(AH, te, false);
		}
	}

	/* surface errors accumulated so far, unless a more specific status is set */
	if (AH->public.n_errors > 0 && status == WORKER_OK)
		status = WORKER_IGNORED_ERRORS;

	return status;
}
958 
959 /*
960  * Allocate a new RestoreOptions block.
961  * This is mainly so we can initialize it, but also for future expansion,
962  */
963 RestoreOptions *
NewRestoreOptions(void)964 NewRestoreOptions(void)
965 {
966 	RestoreOptions *opts;
967 
968 	opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
969 
970 	/* set any fields that shouldn't default to zeroes */
971 	opts->format = archUnknown;
972 	opts->cparams.promptPassword = TRI_DEFAULT;
973 	opts->dumpSections = DUMP_UNSECTIONED;
974 
975 	return opts;
976 }
977 
978 static void
_disableTriggersIfNecessary(ArchiveHandle * AH,TocEntry * te)979 _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
980 {
981 	RestoreOptions *ropt = AH->public.ropt;
982 
983 	/* This hack is only needed in a data-only restore */
984 	if (!ropt->dataOnly || !ropt->disable_triggers)
985 		return;
986 
987 	pg_log_info("disabling triggers for %s", te->tag);
988 
989 	/*
990 	 * Become superuser if possible, since they are the only ones who can
991 	 * disable constraint triggers.  If -S was not given, assume the initial
992 	 * user identity is a superuser.  (XXX would it be better to become the
993 	 * table owner?)
994 	 */
995 	_becomeUser(AH, ropt->superuser);
996 
997 	/*
998 	 * Disable them.
999 	 */
1000 	ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
1001 			 fmtQualifiedId(te->namespace, te->tag));
1002 }
1003 
1004 static void
_enableTriggersIfNecessary(ArchiveHandle * AH,TocEntry * te)1005 _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1006 {
1007 	RestoreOptions *ropt = AH->public.ropt;
1008 
1009 	/* This hack is only needed in a data-only restore */
1010 	if (!ropt->dataOnly || !ropt->disable_triggers)
1011 		return;
1012 
1013 	pg_log_info("enabling triggers for %s", te->tag);
1014 
1015 	/*
1016 	 * Become superuser if possible, since they are the only ones who can
1017 	 * disable constraint triggers.  If -S was not given, assume the initial
1018 	 * user identity is a superuser.  (XXX would it be better to become the
1019 	 * table owner?)
1020 	 */
1021 	_becomeUser(AH, ropt->superuser);
1022 
1023 	/*
1024 	 * Enable them.
1025 	 */
1026 	ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1027 			 fmtQualifiedId(te->namespace, te->tag));
1028 }
1029 
1030 /*
1031  * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1032  */
1033 
1034 /* Public */
1035 void
WriteData(Archive * AHX,const void * data,size_t dLen)1036 WriteData(Archive *AHX, const void *data, size_t dLen)
1037 {
1038 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1039 
1040 	if (!AH->currToc)
1041 		fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
1042 
1043 	AH->WriteDataPtr(AH, data, dLen);
1044 }
1045 
1046 /*
1047  * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1048  * repository for all metadata. But the name has stuck.
1049  *
1050  * The new entry is added to the Archive's TOC list.  Most callers can ignore
1051  * the result value because nothing else need be done, but a few want to
1052  * manipulate the TOC entry further.
1053  */
1054 
1055 /* Public */
1056 TocEntry *
ArchiveEntry(Archive * AHX,CatalogId catalogId,DumpId dumpId,ArchiveOpts * opts)1057 ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
1058 			 ArchiveOpts *opts)
1059 {
1060 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1061 	TocEntry   *newToc;
1062 
1063 	newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
1064 
1065 	AH->tocCount++;
1066 	if (dumpId > AH->maxDumpId)
1067 		AH->maxDumpId = dumpId;
1068 
1069 	newToc->prev = AH->toc->prev;
1070 	newToc->next = AH->toc;
1071 	AH->toc->prev->next = newToc;
1072 	AH->toc->prev = newToc;
1073 
1074 	newToc->catalogId = catalogId;
1075 	newToc->dumpId = dumpId;
1076 	newToc->section = opts->section;
1077 
1078 	newToc->tag = pg_strdup(opts->tag);
1079 	newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
1080 	newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
1081 	newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
1082 	newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
1083 	newToc->desc = pg_strdup(opts->description);
1084 	newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
1085 	newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
1086 	newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;
1087 
1088 	if (opts->nDeps > 0)
1089 	{
1090 		newToc->dependencies = (DumpId *) pg_malloc(opts->nDeps * sizeof(DumpId));
1091 		memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
1092 		newToc->nDeps = opts->nDeps;
1093 	}
1094 	else
1095 	{
1096 		newToc->dependencies = NULL;
1097 		newToc->nDeps = 0;
1098 	}
1099 
1100 	newToc->dataDumper = opts->dumpFn;
1101 	newToc->dataDumperArg = opts->dumpArg;
1102 	newToc->hadDumper = opts->dumpFn ? true : false;
1103 
1104 	newToc->formatData = NULL;
1105 	newToc->dataLength = 0;
1106 
1107 	if (AH->ArchiveEntryPtr != NULL)
1108 		AH->ArchiveEntryPtr(AH, newToc);
1109 
1110 	return newToc;
1111 }
1112 
1113 /* Public */
1114 void
PrintTOCSummary(Archive * AHX)1115 PrintTOCSummary(Archive *AHX)
1116 {
1117 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1118 	RestoreOptions *ropt = AH->public.ropt;
1119 	TocEntry   *te;
1120 	teSection	curSection;
1121 	OutputContext sav;
1122 	const char *fmtName;
1123 	char		stamp_str[64];
1124 
1125 	sav = SaveOutput(AH);
1126 	if (ropt->filename)
1127 		SetOutput(AH, ropt->filename, 0 /* no compression */ );
1128 
1129 	if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
1130 				 localtime(&AH->createDate)) == 0)
1131 		strcpy(stamp_str, "[unknown]");
1132 
1133 	ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
1134 	ahprintf(AH, ";     dbname: %s\n;     TOC Entries: %d\n;     Compression: %d\n",
1135 			 sanitize_line(AH->archdbname, false),
1136 			 AH->tocCount, AH->compression);
1137 
1138 	switch (AH->format)
1139 	{
1140 		case archCustom:
1141 			fmtName = "CUSTOM";
1142 			break;
1143 		case archDirectory:
1144 			fmtName = "DIRECTORY";
1145 			break;
1146 		case archTar:
1147 			fmtName = "TAR";
1148 			break;
1149 		default:
1150 			fmtName = "UNKNOWN";
1151 	}
1152 
1153 	ahprintf(AH, ";     Dump Version: %d.%d-%d\n",
1154 			 ARCHIVE_MAJOR(AH->version), ARCHIVE_MINOR(AH->version), ARCHIVE_REV(AH->version));
1155 	ahprintf(AH, ";     Format: %s\n", fmtName);
1156 	ahprintf(AH, ";     Integer: %d bytes\n", (int) AH->intSize);
1157 	ahprintf(AH, ";     Offset: %d bytes\n", (int) AH->offSize);
1158 	if (AH->archiveRemoteVersion)
1159 		ahprintf(AH, ";     Dumped from database version: %s\n",
1160 				 AH->archiveRemoteVersion);
1161 	if (AH->archiveDumpVersion)
1162 		ahprintf(AH, ";     Dumped by pg_dump version: %s\n",
1163 				 AH->archiveDumpVersion);
1164 
1165 	ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
1166 
1167 	curSection = SECTION_PRE_DATA;
1168 	for (te = AH->toc->next; te != AH->toc; te = te->next)
1169 	{
1170 		if (te->section != SECTION_NONE)
1171 			curSection = te->section;
1172 		if (ropt->verbose ||
1173 			(_tocEntryRequired(te, curSection, AH) & (REQ_SCHEMA | REQ_DATA)) != 0)
1174 		{
1175 			char	   *sanitized_name;
1176 			char	   *sanitized_schema;
1177 			char	   *sanitized_owner;
1178 
1179 			/*
1180 			 */
1181 			sanitized_name = sanitize_line(te->tag, false);
1182 			sanitized_schema = sanitize_line(te->namespace, true);
1183 			sanitized_owner = sanitize_line(te->owner, false);
1184 
1185 			ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
1186 					 te->catalogId.tableoid, te->catalogId.oid,
1187 					 te->desc, sanitized_schema, sanitized_name,
1188 					 sanitized_owner);
1189 
1190 			free(sanitized_name);
1191 			free(sanitized_schema);
1192 			free(sanitized_owner);
1193 		}
1194 		if (ropt->verbose && te->nDeps > 0)
1195 		{
1196 			int			i;
1197 
1198 			ahprintf(AH, ";\tdepends on:");
1199 			for (i = 0; i < te->nDeps; i++)
1200 				ahprintf(AH, " %d", te->dependencies[i]);
1201 			ahprintf(AH, "\n");
1202 		}
1203 	}
1204 
1205 	/* Enforce strict names checking */
1206 	if (ropt->strict_names)
1207 		StrictNamesCheck(ropt);
1208 
1209 	if (ropt->filename)
1210 		RestoreOutput(AH, sav);
1211 }
1212 
1213 /***********
1214  * BLOB Archival
1215  ***********/
1216 
1217 /* Called by a dumper to signal start of a BLOB */
1218 int
StartBlob(Archive * AHX,Oid oid)1219 StartBlob(Archive *AHX, Oid oid)
1220 {
1221 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1222 
1223 	if (!AH->StartBlobPtr)
1224 		fatal("large-object output not supported in chosen format");
1225 
1226 	AH->StartBlobPtr(AH, AH->currToc, oid);
1227 
1228 	return 1;
1229 }
1230 
1231 /* Called by a dumper to signal end of a BLOB */
1232 int
EndBlob(Archive * AHX,Oid oid)1233 EndBlob(Archive *AHX, Oid oid)
1234 {
1235 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1236 
1237 	if (AH->EndBlobPtr)
1238 		AH->EndBlobPtr(AH, AH->currToc, oid);
1239 
1240 	return 1;
1241 }
1242 
1243 /**********
1244  * BLOB Restoration
1245  **********/
1246 
1247 /*
1248  * Called by a format handler before any blobs are restored
1249  */
1250 void
StartRestoreBlobs(ArchiveHandle * AH)1251 StartRestoreBlobs(ArchiveHandle *AH)
1252 {
1253 	RestoreOptions *ropt = AH->public.ropt;
1254 
1255 	if (!ropt->single_txn)
1256 	{
1257 		if (AH->connection)
1258 			StartTransaction(&AH->public);
1259 		else
1260 			ahprintf(AH, "BEGIN;\n\n");
1261 	}
1262 
1263 	AH->blobCount = 0;
1264 }
1265 
1266 /*
1267  * Called by a format handler after all blobs are restored
1268  */
1269 void
EndRestoreBlobs(ArchiveHandle * AH)1270 EndRestoreBlobs(ArchiveHandle *AH)
1271 {
1272 	RestoreOptions *ropt = AH->public.ropt;
1273 
1274 	if (!ropt->single_txn)
1275 	{
1276 		if (AH->connection)
1277 			CommitTransaction(&AH->public);
1278 		else
1279 			ahprintf(AH, "COMMIT;\n\n");
1280 	}
1281 
1282 	pg_log_info(ngettext("restored %d large object",
1283 						 "restored %d large objects",
1284 						 AH->blobCount),
1285 				AH->blobCount);
1286 }
1287 
1288 
1289 /*
1290  * Called by a format handler to initiate restoration of a blob
1291  */
1292 void
StartRestoreBlob(ArchiveHandle * AH,Oid oid,bool drop)1293 StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
1294 {
1295 	bool		old_blob_style = (AH->version < K_VERS_1_12);
1296 	Oid			loOid;
1297 
1298 	AH->blobCount++;
1299 
1300 	/* Initialize the LO Buffer */
1301 	AH->lo_buf_used = 0;
1302 
1303 	pg_log_info("restoring large object with OID %u", oid);
1304 
1305 	/* With an old archive we must do drop and create logic here */
1306 	if (old_blob_style && drop)
1307 		DropBlobIfExists(AH, oid);
1308 
1309 	if (AH->connection)
1310 	{
1311 		if (old_blob_style)
1312 		{
1313 			loOid = lo_create(AH->connection, oid);
1314 			if (loOid == 0 || loOid != oid)
1315 				fatal("could not create large object %u: %s",
1316 					  oid, PQerrorMessage(AH->connection));
1317 		}
1318 		AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
1319 		if (AH->loFd == -1)
1320 			fatal("could not open large object %u: %s",
1321 				  oid, PQerrorMessage(AH->connection));
1322 	}
1323 	else
1324 	{
1325 		if (old_blob_style)
1326 			ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
1327 					 oid, INV_WRITE);
1328 		else
1329 			ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
1330 					 oid, INV_WRITE);
1331 	}
1332 
1333 	AH->writingBlob = 1;
1334 }
1335 
1336 void
EndRestoreBlob(ArchiveHandle * AH,Oid oid)1337 EndRestoreBlob(ArchiveHandle *AH, Oid oid)
1338 {
1339 	if (AH->lo_buf_used > 0)
1340 	{
1341 		/* Write remaining bytes from the LO buffer */
1342 		dump_lo_buf(AH);
1343 	}
1344 
1345 	AH->writingBlob = 0;
1346 
1347 	if (AH->connection)
1348 	{
1349 		lo_close(AH->connection, AH->loFd);
1350 		AH->loFd = -1;
1351 	}
1352 	else
1353 	{
1354 		ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1355 	}
1356 }
1357 
1358 /***********
1359  * Sorting and Reordering
1360  ***********/
1361 
1362 void
SortTocFromFile(Archive * AHX)1363 SortTocFromFile(Archive *AHX)
1364 {
1365 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1366 	RestoreOptions *ropt = AH->public.ropt;
1367 	FILE	   *fh;
1368 	char		buf[100];
1369 	bool		incomplete_line;
1370 
1371 	/* Allocate space for the 'wanted' array, and init it */
1372 	ropt->idWanted = (bool *) pg_malloc0(sizeof(bool) * AH->maxDumpId);
1373 
1374 	/* Setup the file */
1375 	fh = fopen(ropt->tocFile, PG_BINARY_R);
1376 	if (!fh)
1377 		fatal("could not open TOC file \"%s\": %m", ropt->tocFile);
1378 
1379 	incomplete_line = false;
1380 	while (fgets(buf, sizeof(buf), fh) != NULL)
1381 	{
1382 		bool		prev_incomplete_line = incomplete_line;
1383 		int			buflen;
1384 		char	   *cmnt;
1385 		char	   *endptr;
1386 		DumpId		id;
1387 		TocEntry   *te;
1388 
1389 		/*
1390 		 * Some lines in the file might be longer than sizeof(buf).  This is
1391 		 * no problem, since we only care about the leading numeric ID which
1392 		 * can be at most a few characters; but we have to skip continuation
1393 		 * bufferloads when processing a long line.
1394 		 */
1395 		buflen = strlen(buf);
1396 		if (buflen > 0 && buf[buflen - 1] == '\n')
1397 			incomplete_line = false;
1398 		else
1399 			incomplete_line = true;
1400 		if (prev_incomplete_line)
1401 			continue;
1402 
1403 		/* Truncate line at comment, if any */
1404 		cmnt = strchr(buf, ';');
1405 		if (cmnt != NULL)
1406 			cmnt[0] = '\0';
1407 
1408 		/* Ignore if all blank */
1409 		if (strspn(buf, " \t\r\n") == strlen(buf))
1410 			continue;
1411 
1412 		/* Get an ID, check it's valid and not already seen */
1413 		id = strtol(buf, &endptr, 10);
1414 		if (endptr == buf || id <= 0 || id > AH->maxDumpId ||
1415 			ropt->idWanted[id - 1])
1416 		{
1417 			pg_log_warning("line ignored: %s", buf);
1418 			continue;
1419 		}
1420 
1421 		/* Find TOC entry */
1422 		te = getTocEntryByDumpId(AH, id);
1423 		if (!te)
1424 			fatal("could not find entry for ID %d",
1425 				  id);
1426 
1427 		/* Mark it wanted */
1428 		ropt->idWanted[id - 1] = true;
1429 
1430 		/*
1431 		 * Move each item to the end of the list as it is selected, so that
1432 		 * they are placed in the desired order.  Any unwanted items will end
1433 		 * up at the front of the list, which may seem unintuitive but it's
1434 		 * what we need.  In an ordinary serial restore that makes no
1435 		 * difference, but in a parallel restore we need to mark unrestored
1436 		 * items' dependencies as satisfied before we start examining
1437 		 * restorable items.  Otherwise they could have surprising
1438 		 * side-effects on the order in which restorable items actually get
1439 		 * restored.
1440 		 */
1441 		_moveBefore(AH, AH->toc, te);
1442 	}
1443 
1444 	if (fclose(fh) != 0)
1445 		fatal("could not close TOC file: %m");
1446 }
1447 
1448 /**********************
 * Convenience functions that look like standard IO functions
1450  * for writing data when in dump mode.
1451  **********************/
1452 
1453 /* Public */
1454 void
archputs(const char * s,Archive * AH)1455 archputs(const char *s, Archive *AH)
1456 {
1457 	WriteData(AH, s, strlen(s));
1458 }
1459 
1460 /* Public */
1461 int
archprintf(Archive * AH,const char * fmt,...)1462 archprintf(Archive *AH, const char *fmt,...)
1463 {
1464 	int			save_errno = errno;
1465 	char	   *p;
1466 	size_t		len = 128;		/* initial assumption about buffer size */
1467 	size_t		cnt;
1468 
1469 	for (;;)
1470 	{
1471 		va_list		args;
1472 
1473 		/* Allocate work buffer. */
1474 		p = (char *) pg_malloc(len);
1475 
1476 		/* Try to format the data. */
1477 		errno = save_errno;
1478 		va_start(args, fmt);
1479 		cnt = pvsnprintf(p, len, fmt, args);
1480 		va_end(args);
1481 
1482 		if (cnt < len)
1483 			break;				/* success */
1484 
1485 		/* Release buffer and loop around to try again with larger len. */
1486 		free(p);
1487 		len = cnt;
1488 	}
1489 
1490 	WriteData(AH, p, cnt);
1491 	free(p);
1492 	return (int) cnt;
1493 }
1494 
1495 
1496 /*******************************
1497  * Stuff below here should be 'private' to the archiver routines
1498  *******************************/
1499 
1500 static void
SetOutput(ArchiveHandle * AH,const char * filename,int compression)1501 SetOutput(ArchiveHandle *AH, const char *filename, int compression)
1502 {
1503 	int			fn;
1504 
1505 	if (filename)
1506 	{
1507 		if (strcmp(filename, "-") == 0)
1508 			fn = fileno(stdout);
1509 		else
1510 			fn = -1;
1511 	}
1512 	else if (AH->FH)
1513 		fn = fileno(AH->FH);
1514 	else if (AH->fSpec)
1515 	{
1516 		fn = -1;
1517 		filename = AH->fSpec;
1518 	}
1519 	else
1520 		fn = fileno(stdout);
1521 
1522 	/* If compression explicitly requested, use gzopen */
1523 #ifdef HAVE_LIBZ
1524 	if (compression != 0)
1525 	{
1526 		char		fmode[14];
1527 
1528 		/* Don't use PG_BINARY_x since this is zlib */
1529 		sprintf(fmode, "wb%d", compression);
1530 		if (fn >= 0)
1531 			AH->OF = gzdopen(dup(fn), fmode);
1532 		else
1533 			AH->OF = gzopen(filename, fmode);
1534 		AH->gzOut = 1;
1535 	}
1536 	else
1537 #endif
1538 	{							/* Use fopen */
1539 		if (AH->mode == archModeAppend)
1540 		{
1541 			if (fn >= 0)
1542 				AH->OF = fdopen(dup(fn), PG_BINARY_A);
1543 			else
1544 				AH->OF = fopen(filename, PG_BINARY_A);
1545 		}
1546 		else
1547 		{
1548 			if (fn >= 0)
1549 				AH->OF = fdopen(dup(fn), PG_BINARY_W);
1550 			else
1551 				AH->OF = fopen(filename, PG_BINARY_W);
1552 		}
1553 		AH->gzOut = 0;
1554 	}
1555 
1556 	if (!AH->OF)
1557 	{
1558 		if (filename)
1559 			fatal("could not open output file \"%s\": %m", filename);
1560 		else
1561 			fatal("could not open output file: %m");
1562 	}
1563 }
1564 
1565 static OutputContext
SaveOutput(ArchiveHandle * AH)1566 SaveOutput(ArchiveHandle *AH)
1567 {
1568 	OutputContext sav;
1569 
1570 	sav.OF = AH->OF;
1571 	sav.gzOut = AH->gzOut;
1572 
1573 	return sav;
1574 }
1575 
1576 static void
RestoreOutput(ArchiveHandle * AH,OutputContext savedContext)1577 RestoreOutput(ArchiveHandle *AH, OutputContext savedContext)
1578 {
1579 	int			res;
1580 
1581 	if (AH->gzOut)
1582 		res = GZCLOSE(AH->OF);
1583 	else
1584 		res = fclose(AH->OF);
1585 
1586 	if (res != 0)
1587 		fatal("could not close output file: %m");
1588 
1589 	AH->gzOut = savedContext.gzOut;
1590 	AH->OF = savedContext.OF;
1591 }
1592 
1593 
1594 
1595 /*
1596  *	Print formatted text to the output file (usually stdout).
1597  */
1598 int
ahprintf(ArchiveHandle * AH,const char * fmt,...)1599 ahprintf(ArchiveHandle *AH, const char *fmt,...)
1600 {
1601 	int			save_errno = errno;
1602 	char	   *p;
1603 	size_t		len = 128;		/* initial assumption about buffer size */
1604 	size_t		cnt;
1605 
1606 	for (;;)
1607 	{
1608 		va_list		args;
1609 
1610 		/* Allocate work buffer. */
1611 		p = (char *) pg_malloc(len);
1612 
1613 		/* Try to format the data. */
1614 		errno = save_errno;
1615 		va_start(args, fmt);
1616 		cnt = pvsnprintf(p, len, fmt, args);
1617 		va_end(args);
1618 
1619 		if (cnt < len)
1620 			break;				/* success */
1621 
1622 		/* Release buffer and loop around to try again with larger len. */
1623 		free(p);
1624 		len = cnt;
1625 	}
1626 
1627 	ahwrite(p, 1, cnt, AH);
1628 	free(p);
1629 	return (int) cnt;
1630 }
1631 
1632 /*
1633  * Single place for logic which says 'We are restoring to a direct DB connection'.
1634  */
1635 static int
RestoringToDB(ArchiveHandle * AH)1636 RestoringToDB(ArchiveHandle *AH)
1637 {
1638 	RestoreOptions *ropt = AH->public.ropt;
1639 
1640 	return (ropt && ropt->useDB && AH->connection);
1641 }
1642 
1643 /*
1644  * Dump the current contents of the LO data buffer while writing a BLOB
1645  */
1646 static void
dump_lo_buf(ArchiveHandle * AH)1647 dump_lo_buf(ArchiveHandle *AH)
1648 {
1649 	if (AH->connection)
1650 	{
1651 		size_t		res;
1652 
1653 		res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1654 		pg_log_debug(ngettext("wrote %lu byte of large object data (result = %lu)",
1655 							  "wrote %lu bytes of large object data (result = %lu)",
1656 							  AH->lo_buf_used),
1657 					 (unsigned long) AH->lo_buf_used, (unsigned long) res);
1658 		if (res != AH->lo_buf_used)
1659 			fatal("could not write to large object (result: %lu, expected: %lu)",
1660 				  (unsigned long) res, (unsigned long) AH->lo_buf_used);
1661 	}
1662 	else
1663 	{
1664 		PQExpBuffer buf = createPQExpBuffer();
1665 
1666 		appendByteaLiteralAHX(buf,
1667 							  (const unsigned char *) AH->lo_buf,
1668 							  AH->lo_buf_used,
1669 							  AH);
1670 
1671 		/* Hack: turn off writingBlob so ahwrite doesn't recurse to here */
1672 		AH->writingBlob = 0;
1673 		ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1674 		AH->writingBlob = 1;
1675 
1676 		destroyPQExpBuffer(buf);
1677 	}
1678 	AH->lo_buf_used = 0;
1679 }
1680 
1681 
1682 /*
1683  *	Write buffer to the output file (usually stdout). This is used for
1684  *	outputting 'restore' scripts etc. It is even possible for an archive
1685  *	format to create a custom output routine to 'fake' a restore if it
1686  *	wants to generate a script (see TAR output).
1687  */
1688 void
ahwrite(const void * ptr,size_t size,size_t nmemb,ArchiveHandle * AH)1689 ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1690 {
1691 	int			bytes_written = 0;
1692 
1693 	if (AH->writingBlob)
1694 	{
1695 		size_t		remaining = size * nmemb;
1696 
1697 		while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1698 		{
1699 			size_t		avail = AH->lo_buf_size - AH->lo_buf_used;
1700 
1701 			memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1702 			ptr = (const void *) ((const char *) ptr + avail);
1703 			remaining -= avail;
1704 			AH->lo_buf_used += avail;
1705 			dump_lo_buf(AH);
1706 		}
1707 
1708 		memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1709 		AH->lo_buf_used += remaining;
1710 
1711 		bytes_written = size * nmemb;
1712 	}
1713 	else if (AH->gzOut)
1714 		bytes_written = GZWRITE(ptr, size, nmemb, AH->OF);
1715 	else if (AH->CustomOutPtr)
1716 		bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
1717 
1718 	else
1719 	{
1720 		/*
1721 		 * If we're doing a restore, and it's direct to DB, and we're
1722 		 * connected then send it to the DB.
1723 		 */
1724 		if (RestoringToDB(AH))
1725 			bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
1726 		else
1727 			bytes_written = fwrite(ptr, size, nmemb, AH->OF) * size;
1728 	}
1729 
1730 	if (bytes_written != size * nmemb)
1731 		WRITE_ERROR_EXIT;
1732 }
1733 
1734 /* on some error, we may decide to go on... */
1735 void
warn_or_exit_horribly(ArchiveHandle * AH,const char * fmt,...)1736 warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...)
1737 {
1738 	va_list		ap;
1739 
1740 	switch (AH->stage)
1741 	{
1742 
1743 		case STAGE_NONE:
1744 			/* Do nothing special */
1745 			break;
1746 
1747 		case STAGE_INITIALIZING:
1748 			if (AH->stage != AH->lastErrorStage)
1749 				pg_log_generic(PG_LOG_INFO, "while INITIALIZING:");
1750 			break;
1751 
1752 		case STAGE_PROCESSING:
1753 			if (AH->stage != AH->lastErrorStage)
1754 				pg_log_generic(PG_LOG_INFO, "while PROCESSING TOC:");
1755 			break;
1756 
1757 		case STAGE_FINALIZING:
1758 			if (AH->stage != AH->lastErrorStage)
1759 				pg_log_generic(PG_LOG_INFO, "while FINALIZING:");
1760 			break;
1761 	}
1762 	if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1763 	{
1764 		pg_log_generic(PG_LOG_INFO, "from TOC entry %d; %u %u %s %s %s",
1765 					   AH->currentTE->dumpId,
1766 					   AH->currentTE->catalogId.tableoid,
1767 					   AH->currentTE->catalogId.oid,
1768 					   AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
1769 					   AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
1770 					   AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
1771 	}
1772 	AH->lastErrorStage = AH->stage;
1773 	AH->lastErrorTE = AH->currentTE;
1774 
1775 	va_start(ap, fmt);
1776 	pg_log_generic_v(PG_LOG_ERROR, fmt, ap);
1777 	va_end(ap);
1778 
1779 	if (AH->public.exit_on_error)
1780 		exit_nicely(1);
1781 	else
1782 		AH->public.n_errors++;
1783 }
1784 
#ifdef NOT_USED

/*
 * _moveAfter
 *		Relocate "te" within the circular TOC list so that it directly
 *		follows "pos".  (Currently unused.)
 */
static void
_moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
{
	TocEntry   *before = te->prev;
	TocEntry   *after = te->next;

	/* detach te from its current neighbours */
	before->next = after;
	after->prev = before;

	/* splice te in between pos and pos->next */
	te->prev = pos;
	te->next = pos->next;
	te->next->prev = te;
	pos->next = te;
}
#endif
1801 
1802 static void
_moveBefore(ArchiveHandle * AH,TocEntry * pos,TocEntry * te)1803 _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1804 {
1805 	/* Unlink te from list */
1806 	te->prev->next = te->next;
1807 	te->next->prev = te->prev;
1808 
1809 	/* and insert it before "pos" */
1810 	te->prev = pos->prev;
1811 	te->next = pos;
1812 	pos->prev->next = te;
1813 	pos->prev = te;
1814 }
1815 
1816 /*
1817  * Build index arrays for the TOC list
1818  *
1819  * This should be invoked only after we have created or read in all the TOC
1820  * items.
1821  *
1822  * The arrays are indexed by dump ID (so entry zero is unused).  Note that the
1823  * array entries run only up to maxDumpId.  We might see dependency dump IDs
1824  * beyond that (if the dump was partial); so always check the array bound
1825  * before trying to touch an array entry.
1826  */
1827 static void
buildTocEntryArrays(ArchiveHandle * AH)1828 buildTocEntryArrays(ArchiveHandle *AH)
1829 {
1830 	DumpId		maxDumpId = AH->maxDumpId;
1831 	TocEntry   *te;
1832 
1833 	AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
1834 	AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));
1835 
1836 	for (te = AH->toc->next; te != AH->toc; te = te->next)
1837 	{
1838 		/* this check is purely paranoia, maxDumpId should be correct */
1839 		if (te->dumpId <= 0 || te->dumpId > maxDumpId)
1840 			fatal("bad dumpId");
1841 
1842 		/* tocsByDumpId indexes all TOCs by their dump ID */
1843 		AH->tocsByDumpId[te->dumpId] = te;
1844 
1845 		/*
1846 		 * tableDataId provides the TABLE DATA item's dump ID for each TABLE
1847 		 * TOC entry that has a DATA item.  We compute this by reversing the
1848 		 * TABLE DATA item's dependency, knowing that a TABLE DATA item has
1849 		 * just one dependency and it is the TABLE item.
1850 		 */
1851 		if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
1852 		{
1853 			DumpId		tableId = te->dependencies[0];
1854 
1855 			/*
1856 			 * The TABLE item might not have been in the archive, if this was
1857 			 * a data-only dump; but its dump ID should be less than its data
1858 			 * item's dump ID, so there should be a place for it in the array.
1859 			 */
1860 			if (tableId <= 0 || tableId > maxDumpId)
1861 				fatal("bad table dumpId for TABLE DATA item");
1862 
1863 			AH->tableDataId[tableId] = te->dumpId;
1864 		}
1865 	}
1866 }
1867 
1868 TocEntry *
getTocEntryByDumpId(ArchiveHandle * AH,DumpId id)1869 getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
1870 {
1871 	/* build index arrays if we didn't already */
1872 	if (AH->tocsByDumpId == NULL)
1873 		buildTocEntryArrays(AH);
1874 
1875 	if (id > 0 && id <= AH->maxDumpId)
1876 		return AH->tocsByDumpId[id];
1877 
1878 	return NULL;
1879 }
1880 
1881 teReqs
TocIDRequired(ArchiveHandle * AH,DumpId id)1882 TocIDRequired(ArchiveHandle *AH, DumpId id)
1883 {
1884 	TocEntry   *te = getTocEntryByDumpId(AH, id);
1885 
1886 	if (!te)
1887 		return 0;
1888 
1889 	return te->reqs;
1890 }
1891 
1892 size_t
WriteOffset(ArchiveHandle * AH,pgoff_t o,int wasSet)1893 WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
1894 {
1895 	int			off;
1896 
1897 	/* Save the flag */
1898 	AH->WriteBytePtr(AH, wasSet);
1899 
1900 	/* Write out pgoff_t smallest byte first, prevents endian mismatch */
1901 	for (off = 0; off < sizeof(pgoff_t); off++)
1902 	{
1903 		AH->WriteBytePtr(AH, o & 0xFF);
1904 		o >>= 8;
1905 	}
1906 	return sizeof(pgoff_t) + 1;
1907 }
1908 
1909 int
ReadOffset(ArchiveHandle * AH,pgoff_t * o)1910 ReadOffset(ArchiveHandle *AH, pgoff_t * o)
1911 {
1912 	int			i;
1913 	int			off;
1914 	int			offsetFlg;
1915 
1916 	/* Initialize to zero */
1917 	*o = 0;
1918 
1919 	/* Check for old version */
1920 	if (AH->version < K_VERS_1_7)
1921 	{
1922 		/* Prior versions wrote offsets using WriteInt */
1923 		i = ReadInt(AH);
1924 		/* -1 means not set */
1925 		if (i < 0)
1926 			return K_OFFSET_POS_NOT_SET;
1927 		else if (i == 0)
1928 			return K_OFFSET_NO_DATA;
1929 
1930 		/* Cast to pgoff_t because it was written as an int. */
1931 		*o = (pgoff_t) i;
1932 		return K_OFFSET_POS_SET;
1933 	}
1934 
1935 	/*
1936 	 * Read the flag indicating the state of the data pointer. Check if valid
1937 	 * and die if not.
1938 	 *
1939 	 * This used to be handled by a negative or zero pointer, now we use an
1940 	 * extra byte specifically for the state.
1941 	 */
1942 	offsetFlg = AH->ReadBytePtr(AH) & 0xFF;
1943 
1944 	switch (offsetFlg)
1945 	{
1946 		case K_OFFSET_POS_NOT_SET:
1947 		case K_OFFSET_NO_DATA:
1948 		case K_OFFSET_POS_SET:
1949 
1950 			break;
1951 
1952 		default:
1953 			fatal("unexpected data offset flag %d", offsetFlg);
1954 	}
1955 
1956 	/*
1957 	 * Read the bytes
1958 	 */
1959 	for (off = 0; off < AH->offSize; off++)
1960 	{
1961 		if (off < sizeof(pgoff_t))
1962 			*o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
1963 		else
1964 		{
1965 			if (AH->ReadBytePtr(AH) != 0)
1966 				fatal("file offset in dump file is too large");
1967 		}
1968 	}
1969 
1970 	return offsetFlg;
1971 }
1972 
1973 size_t
WriteInt(ArchiveHandle * AH,int i)1974 WriteInt(ArchiveHandle *AH, int i)
1975 {
1976 	int			b;
1977 
1978 	/*
1979 	 * This is a bit yucky, but I don't want to make the binary format very
1980 	 * dependent on representation, and not knowing much about it, I write out
1981 	 * a sign byte. If you change this, don't forget to change the file
1982 	 * version #, and modify ReadInt to read the new format AS WELL AS the old
1983 	 * formats.
1984 	 */
1985 
1986 	/* SIGN byte */
1987 	if (i < 0)
1988 	{
1989 		AH->WriteBytePtr(AH, 1);
1990 		i = -i;
1991 	}
1992 	else
1993 		AH->WriteBytePtr(AH, 0);
1994 
1995 	for (b = 0; b < AH->intSize; b++)
1996 	{
1997 		AH->WriteBytePtr(AH, i & 0xFF);
1998 		i >>= 8;
1999 	}
2000 
2001 	return AH->intSize + 1;
2002 }
2003 
2004 int
ReadInt(ArchiveHandle * AH)2005 ReadInt(ArchiveHandle *AH)
2006 {
2007 	int			res = 0;
2008 	int			bv,
2009 				b;
2010 	int			sign = 0;		/* Default positive */
2011 	int			bitShift = 0;
2012 
2013 	if (AH->version > K_VERS_1_0)
2014 		/* Read a sign byte */
2015 		sign = AH->ReadBytePtr(AH);
2016 
2017 	for (b = 0; b < AH->intSize; b++)
2018 	{
2019 		bv = AH->ReadBytePtr(AH) & 0xFF;
2020 		if (bv != 0)
2021 			res = res + (bv << bitShift);
2022 		bitShift += 8;
2023 	}
2024 
2025 	if (sign)
2026 		res = -res;
2027 
2028 	return res;
2029 }
2030 
2031 size_t
WriteStr(ArchiveHandle * AH,const char * c)2032 WriteStr(ArchiveHandle *AH, const char *c)
2033 {
2034 	size_t		res;
2035 
2036 	if (c)
2037 	{
2038 		int			len = strlen(c);
2039 
2040 		res = WriteInt(AH, len);
2041 		AH->WriteBufPtr(AH, c, len);
2042 		res += len;
2043 	}
2044 	else
2045 		res = WriteInt(AH, -1);
2046 
2047 	return res;
2048 }
2049 
2050 char *
ReadStr(ArchiveHandle * AH)2051 ReadStr(ArchiveHandle *AH)
2052 {
2053 	char	   *buf;
2054 	int			l;
2055 
2056 	l = ReadInt(AH);
2057 	if (l < 0)
2058 		buf = NULL;
2059 	else
2060 	{
2061 		buf = (char *) pg_malloc(l + 1);
2062 		AH->ReadBufPtr(AH, (void *) buf, l);
2063 
2064 		buf[l] = '\0';
2065 	}
2066 
2067 	return buf;
2068 }
2069 
2070 static int
_discoverArchiveFormat(ArchiveHandle * AH)2071 _discoverArchiveFormat(ArchiveHandle *AH)
2072 {
2073 	FILE	   *fh;
2074 	char		sig[6];			/* More than enough */
2075 	size_t		cnt;
2076 	int			wantClose = 0;
2077 
2078 	pg_log_debug("attempting to ascertain archive format");
2079 
2080 	if (AH->lookahead)
2081 		free(AH->lookahead);
2082 
2083 	AH->readHeader = 0;
2084 	AH->lookaheadSize = 512;
2085 	AH->lookahead = pg_malloc0(512);
2086 	AH->lookaheadLen = 0;
2087 	AH->lookaheadPos = 0;
2088 
2089 	if (AH->fSpec)
2090 	{
2091 		struct stat st;
2092 
2093 		wantClose = 1;
2094 
2095 		/*
2096 		 * Check if the specified archive is a directory. If so, check if
2097 		 * there's a "toc.dat" (or "toc.dat.gz") file in it.
2098 		 */
2099 		if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
2100 		{
2101 			char		buf[MAXPGPATH];
2102 
2103 			if (snprintf(buf, MAXPGPATH, "%s/toc.dat", AH->fSpec) >= MAXPGPATH)
2104 				fatal("directory name too long: \"%s\"",
2105 					  AH->fSpec);
2106 			if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
2107 			{
2108 				AH->format = archDirectory;
2109 				return AH->format;
2110 			}
2111 
2112 #ifdef HAVE_LIBZ
2113 			if (snprintf(buf, MAXPGPATH, "%s/toc.dat.gz", AH->fSpec) >= MAXPGPATH)
2114 				fatal("directory name too long: \"%s\"",
2115 					  AH->fSpec);
2116 			if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
2117 			{
2118 				AH->format = archDirectory;
2119 				return AH->format;
2120 			}
2121 #endif
2122 			fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
2123 				  AH->fSpec);
2124 			fh = NULL;			/* keep compiler quiet */
2125 		}
2126 		else
2127 		{
2128 			fh = fopen(AH->fSpec, PG_BINARY_R);
2129 			if (!fh)
2130 				fatal("could not open input file \"%s\": %m", AH->fSpec);
2131 		}
2132 	}
2133 	else
2134 	{
2135 		fh = stdin;
2136 		if (!fh)
2137 			fatal("could not open input file: %m");
2138 	}
2139 
2140 	if ((cnt = fread(sig, 1, 5, fh)) != 5)
2141 	{
2142 		if (ferror(fh))
2143 			fatal("could not read input file: %m");
2144 		else
2145 			fatal("input file is too short (read %lu, expected 5)",
2146 				  (unsigned long) cnt);
2147 	}
2148 
2149 	/* Save it, just in case we need it later */
2150 	memcpy(&AH->lookahead[0], sig, 5);
2151 	AH->lookaheadLen = 5;
2152 
2153 	if (strncmp(sig, "PGDMP", 5) == 0)
2154 	{
2155 		/* It's custom format, stop here */
2156 		AH->format = archCustom;
2157 		AH->readHeader = 1;
2158 	}
2159 	else
2160 	{
2161 		/*
2162 		 * *Maybe* we have a tar archive format file or a text dump ... So,
2163 		 * read first 512 byte header...
2164 		 */
2165 		cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
2166 		/* read failure is checked below */
2167 		AH->lookaheadLen += cnt;
2168 
2169 		if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
2170 			(strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
2171 			 strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
2172 		{
2173 			/*
2174 			 * looks like it's probably a text format dump. so suggest they
2175 			 * try psql
2176 			 */
2177 			fatal("input file appears to be a text format dump. Please use psql.");
2178 		}
2179 
2180 		if (AH->lookaheadLen != 512)
2181 		{
2182 			if (feof(fh))
2183 				fatal("input file does not appear to be a valid archive (too short?)");
2184 			else
2185 				READ_ERROR_EXIT(fh);
2186 		}
2187 
2188 		if (!isValidTarHeader(AH->lookahead))
2189 			fatal("input file does not appear to be a valid archive");
2190 
2191 		AH->format = archTar;
2192 	}
2193 
2194 	/* Close the file if we opened it */
2195 	if (wantClose)
2196 	{
2197 		if (fclose(fh) != 0)
2198 			fatal("could not close input file: %m");
2199 		/* Forget lookahead, since we'll re-read header after re-opening */
2200 		AH->readHeader = 0;
2201 		AH->lookaheadLen = 0;
2202 	}
2203 
2204 	return AH->format;
2205 }
2206 
2207 
2208 /*
2209  * Allocate an archive handle
2210  */
2211 static ArchiveHandle *
_allocAH(const char * FileSpec,const ArchiveFormat fmt,const int compression,bool dosync,ArchiveMode mode,SetupWorkerPtrType setupWorkerPtr)2212 _allocAH(const char *FileSpec, const ArchiveFormat fmt,
2213 		 const int compression, bool dosync, ArchiveMode mode,
2214 		 SetupWorkerPtrType setupWorkerPtr)
2215 {
2216 	ArchiveHandle *AH;
2217 
2218 	pg_log_debug("allocating AH for %s, format %d", FileSpec, fmt);
2219 
2220 	AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
2221 
2222 	AH->version = K_VERS_SELF;
2223 
2224 	/* initialize for backwards compatible string processing */
2225 	AH->public.encoding = 0;	/* PG_SQL_ASCII */
2226 	AH->public.std_strings = false;
2227 
2228 	/* sql error handling */
2229 	AH->public.exit_on_error = true;
2230 	AH->public.n_errors = 0;
2231 
2232 	AH->archiveDumpVersion = PG_VERSION;
2233 
2234 	AH->createDate = time(NULL);
2235 
2236 	AH->intSize = sizeof(int);
2237 	AH->offSize = sizeof(pgoff_t);
2238 	if (FileSpec)
2239 	{
2240 		AH->fSpec = pg_strdup(FileSpec);
2241 
2242 		/*
2243 		 * Not used; maybe later....
2244 		 *
2245 		 * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2246 		 * i--) if (AH->workDir[i-1] == '/')
2247 		 */
2248 	}
2249 	else
2250 		AH->fSpec = NULL;
2251 
2252 	AH->currUser = NULL;		/* unknown */
2253 	AH->currSchema = NULL;		/* ditto */
2254 	AH->currTablespace = NULL;	/* ditto */
2255 	AH->currTableAm = NULL;		/* ditto */
2256 
2257 	AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2258 
2259 	AH->toc->next = AH->toc;
2260 	AH->toc->prev = AH->toc;
2261 
2262 	AH->mode = mode;
2263 	AH->compression = compression;
2264 	AH->dosync = dosync;
2265 
2266 	memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
2267 
2268 	/* Open stdout with no compression for AH output handle */
2269 	AH->gzOut = 0;
2270 	AH->OF = stdout;
2271 
2272 	/*
2273 	 * On Windows, we need to use binary mode to read/write non-text files,
2274 	 * which include all archive formats as well as compressed plain text.
2275 	 * Force stdin/stdout into binary mode if that is what we are using.
2276 	 */
2277 #ifdef WIN32
2278 	if ((fmt != archNull || compression != 0) &&
2279 		(AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2280 	{
2281 		if (mode == archModeWrite)
2282 			_setmode(fileno(stdout), O_BINARY);
2283 		else
2284 			_setmode(fileno(stdin), O_BINARY);
2285 	}
2286 #endif
2287 
2288 	AH->SetupWorkerPtr = setupWorkerPtr;
2289 
2290 	if (fmt == archUnknown)
2291 		AH->format = _discoverArchiveFormat(AH);
2292 	else
2293 		AH->format = fmt;
2294 
2295 	switch (AH->format)
2296 	{
2297 		case archCustom:
2298 			InitArchiveFmt_Custom(AH);
2299 			break;
2300 
2301 		case archNull:
2302 			InitArchiveFmt_Null(AH);
2303 			break;
2304 
2305 		case archDirectory:
2306 			InitArchiveFmt_Directory(AH);
2307 			break;
2308 
2309 		case archTar:
2310 			InitArchiveFmt_Tar(AH);
2311 			break;
2312 
2313 		default:
2314 			fatal("unrecognized file format \"%d\"", fmt);
2315 	}
2316 
2317 	return AH;
2318 }
2319 
2320 /*
2321  * Write out all data (tables & blobs)
2322  */
2323 void
WriteDataChunks(ArchiveHandle * AH,ParallelState * pstate)2324 WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
2325 {
2326 	TocEntry   *te;
2327 
2328 	if (pstate && pstate->numWorkers > 1)
2329 	{
2330 		/*
2331 		 * In parallel mode, this code runs in the master process.  We
2332 		 * construct an array of candidate TEs, then sort it into decreasing
2333 		 * size order, then dispatch each TE to a data-transfer worker.  By
2334 		 * dumping larger tables first, we avoid getting into a situation
2335 		 * where we're down to one job and it's big, losing parallelism.
2336 		 */
2337 		TocEntry  **tes;
2338 		int			ntes;
2339 
2340 		tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
2341 		ntes = 0;
2342 		for (te = AH->toc->next; te != AH->toc; te = te->next)
2343 		{
2344 			/* Consider only TEs with dataDumper functions ... */
2345 			if (!te->dataDumper)
2346 				continue;
2347 			/* ... and ignore ones not enabled for dump */
2348 			if ((te->reqs & REQ_DATA) == 0)
2349 				continue;
2350 
2351 			tes[ntes++] = te;
2352 		}
2353 
2354 		if (ntes > 1)
2355 			qsort((void *) tes, ntes, sizeof(TocEntry *),
2356 				  TocEntrySizeCompare);
2357 
2358 		for (int i = 0; i < ntes; i++)
2359 			DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
2360 								   mark_dump_job_done, NULL);
2361 
2362 		pg_free(tes);
2363 
2364 		/* Now wait for workers to finish. */
2365 		WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
2366 	}
2367 	else
2368 	{
2369 		/* Non-parallel mode: just dump all candidate TEs sequentially. */
2370 		for (te = AH->toc->next; te != AH->toc; te = te->next)
2371 		{
2372 			/* Must have same filter conditions as above */
2373 			if (!te->dataDumper)
2374 				continue;
2375 			if ((te->reqs & REQ_DATA) == 0)
2376 				continue;
2377 
2378 			WriteDataChunksForTocEntry(AH, te);
2379 		}
2380 	}
2381 }
2382 
2383 
2384 /*
2385  * Callback function that's invoked in the master process after a step has
2386  * been parallel dumped.
2387  *
2388  * We don't need to do anything except check for worker failure.
2389  */
2390 static void
mark_dump_job_done(ArchiveHandle * AH,TocEntry * te,int status,void * callback_data)2391 mark_dump_job_done(ArchiveHandle *AH,
2392 				   TocEntry *te,
2393 				   int status,
2394 				   void *callback_data)
2395 {
2396 	pg_log_info("finished item %d %s %s",
2397 				te->dumpId, te->desc, te->tag);
2398 
2399 	if (status != 0)
2400 		fatal("worker process failed: exit code %d",
2401 			  status);
2402 }
2403 
2404 
2405 void
WriteDataChunksForTocEntry(ArchiveHandle * AH,TocEntry * te)2406 WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
2407 {
2408 	StartDataPtrType startPtr;
2409 	EndDataPtrType endPtr;
2410 
2411 	AH->currToc = te;
2412 
2413 	if (strcmp(te->desc, "BLOBS") == 0)
2414 	{
2415 		startPtr = AH->StartBlobsPtr;
2416 		endPtr = AH->EndBlobsPtr;
2417 	}
2418 	else
2419 	{
2420 		startPtr = AH->StartDataPtr;
2421 		endPtr = AH->EndDataPtr;
2422 	}
2423 
2424 	if (startPtr != NULL)
2425 		(*startPtr) (AH, te);
2426 
2427 	/*
2428 	 * The user-provided DataDumper routine needs to call AH->WriteData
2429 	 */
2430 	te->dataDumper((Archive *) AH, te->dataDumperArg);
2431 
2432 	if (endPtr != NULL)
2433 		(*endPtr) (AH, te);
2434 
2435 	AH->currToc = NULL;
2436 }
2437 
2438 void
WriteToc(ArchiveHandle * AH)2439 WriteToc(ArchiveHandle *AH)
2440 {
2441 	TocEntry   *te;
2442 	char		workbuf[32];
2443 	int			tocCount;
2444 	int			i;
2445 
2446 	/* count entries that will actually be dumped */
2447 	tocCount = 0;
2448 	for (te = AH->toc->next; te != AH->toc; te = te->next)
2449 	{
2450 		if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) != 0)
2451 			tocCount++;
2452 	}
2453 
2454 	/* printf("%d TOC Entries to save\n", tocCount); */
2455 
2456 	WriteInt(AH, tocCount);
2457 
2458 	for (te = AH->toc->next; te != AH->toc; te = te->next)
2459 	{
2460 		if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) == 0)
2461 			continue;
2462 
2463 		WriteInt(AH, te->dumpId);
2464 		WriteInt(AH, te->dataDumper ? 1 : 0);
2465 
2466 		/* OID is recorded as a string for historical reasons */
2467 		sprintf(workbuf, "%u", te->catalogId.tableoid);
2468 		WriteStr(AH, workbuf);
2469 		sprintf(workbuf, "%u", te->catalogId.oid);
2470 		WriteStr(AH, workbuf);
2471 
2472 		WriteStr(AH, te->tag);
2473 		WriteStr(AH, te->desc);
2474 		WriteInt(AH, te->section);
2475 		WriteStr(AH, te->defn);
2476 		WriteStr(AH, te->dropStmt);
2477 		WriteStr(AH, te->copyStmt);
2478 		WriteStr(AH, te->namespace);
2479 		WriteStr(AH, te->tablespace);
2480 		WriteStr(AH, te->tableam);
2481 		WriteStr(AH, te->owner);
2482 		WriteStr(AH, "false");
2483 
2484 		/* Dump list of dependencies */
2485 		for (i = 0; i < te->nDeps; i++)
2486 		{
2487 			sprintf(workbuf, "%d", te->dependencies[i]);
2488 			WriteStr(AH, workbuf);
2489 		}
2490 		WriteStr(AH, NULL);		/* Terminate List */
2491 
2492 		if (AH->WriteExtraTocPtr)
2493 			AH->WriteExtraTocPtr(AH, te);
2494 	}
2495 }
2496 
2497 void
ReadToc(ArchiveHandle * AH)2498 ReadToc(ArchiveHandle *AH)
2499 {
2500 	int			i;
2501 	char	   *tmp;
2502 	DumpId	   *deps;
2503 	int			depIdx;
2504 	int			depSize;
2505 	TocEntry   *te;
2506 
2507 	AH->tocCount = ReadInt(AH);
2508 	AH->maxDumpId = 0;
2509 
2510 	for (i = 0; i < AH->tocCount; i++)
2511 	{
2512 		te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2513 		te->dumpId = ReadInt(AH);
2514 
2515 		if (te->dumpId > AH->maxDumpId)
2516 			AH->maxDumpId = te->dumpId;
2517 
2518 		/* Sanity check */
2519 		if (te->dumpId <= 0)
2520 			fatal("entry ID %d out of range -- perhaps a corrupt TOC",
2521 				  te->dumpId);
2522 
2523 		te->hadDumper = ReadInt(AH);
2524 
2525 		if (AH->version >= K_VERS_1_8)
2526 		{
2527 			tmp = ReadStr(AH);
2528 			sscanf(tmp, "%u", &te->catalogId.tableoid);
2529 			free(tmp);
2530 		}
2531 		else
2532 			te->catalogId.tableoid = InvalidOid;
2533 		tmp = ReadStr(AH);
2534 		sscanf(tmp, "%u", &te->catalogId.oid);
2535 		free(tmp);
2536 
2537 		te->tag = ReadStr(AH);
2538 		te->desc = ReadStr(AH);
2539 
2540 		if (AH->version >= K_VERS_1_11)
2541 		{
2542 			te->section = ReadInt(AH);
2543 		}
2544 		else
2545 		{
2546 			/*
2547 			 * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2548 			 * the entries into sections.  This list need not cover entry
2549 			 * types added later than 8.4.
2550 			 */
2551 			if (strcmp(te->desc, "COMMENT") == 0 ||
2552 				strcmp(te->desc, "ACL") == 0 ||
2553 				strcmp(te->desc, "ACL LANGUAGE") == 0)
2554 				te->section = SECTION_NONE;
2555 			else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2556 					 strcmp(te->desc, "BLOBS") == 0 ||
2557 					 strcmp(te->desc, "BLOB COMMENTS") == 0)
2558 				te->section = SECTION_DATA;
2559 			else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2560 					 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2561 					 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2562 					 strcmp(te->desc, "INDEX") == 0 ||
2563 					 strcmp(te->desc, "RULE") == 0 ||
2564 					 strcmp(te->desc, "TRIGGER") == 0)
2565 				te->section = SECTION_POST_DATA;
2566 			else
2567 				te->section = SECTION_PRE_DATA;
2568 		}
2569 
2570 		te->defn = ReadStr(AH);
2571 		te->dropStmt = ReadStr(AH);
2572 
2573 		if (AH->version >= K_VERS_1_3)
2574 			te->copyStmt = ReadStr(AH);
2575 
2576 		if (AH->version >= K_VERS_1_6)
2577 			te->namespace = ReadStr(AH);
2578 
2579 		if (AH->version >= K_VERS_1_10)
2580 			te->tablespace = ReadStr(AH);
2581 
2582 		if (AH->version >= K_VERS_1_14)
2583 			te->tableam = ReadStr(AH);
2584 
2585 		te->owner = ReadStr(AH);
2586 		if (AH->version < K_VERS_1_9 || strcmp(ReadStr(AH), "true") == 0)
2587 			pg_log_warning("restoring tables WITH OIDS is not supported anymore");
2588 
2589 		/* Read TOC entry dependencies */
2590 		if (AH->version >= K_VERS_1_5)
2591 		{
2592 			depSize = 100;
2593 			deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
2594 			depIdx = 0;
2595 			for (;;)
2596 			{
2597 				tmp = ReadStr(AH);
2598 				if (!tmp)
2599 					break;		/* end of list */
2600 				if (depIdx >= depSize)
2601 				{
2602 					depSize *= 2;
2603 					deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
2604 				}
2605 				sscanf(tmp, "%d", &deps[depIdx]);
2606 				free(tmp);
2607 				depIdx++;
2608 			}
2609 
2610 			if (depIdx > 0)		/* We have a non-null entry */
2611 			{
2612 				deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
2613 				te->dependencies = deps;
2614 				te->nDeps = depIdx;
2615 			}
2616 			else
2617 			{
2618 				free(deps);
2619 				te->dependencies = NULL;
2620 				te->nDeps = 0;
2621 			}
2622 		}
2623 		else
2624 		{
2625 			te->dependencies = NULL;
2626 			te->nDeps = 0;
2627 		}
2628 		te->dataLength = 0;
2629 
2630 		if (AH->ReadExtraTocPtr)
2631 			AH->ReadExtraTocPtr(AH, te);
2632 
2633 		pg_log_debug("read TOC entry %d (ID %d) for %s %s",
2634 					 i, te->dumpId, te->desc, te->tag);
2635 
2636 		/* link completed entry into TOC circular list */
2637 		te->prev = AH->toc->prev;
2638 		AH->toc->prev->next = te;
2639 		AH->toc->prev = te;
2640 		te->next = AH->toc;
2641 
2642 		/* special processing immediately upon read for some items */
2643 		if (strcmp(te->desc, "ENCODING") == 0)
2644 			processEncodingEntry(AH, te);
2645 		else if (strcmp(te->desc, "STDSTRINGS") == 0)
2646 			processStdStringsEntry(AH, te);
2647 		else if (strcmp(te->desc, "SEARCHPATH") == 0)
2648 			processSearchPathEntry(AH, te);
2649 	}
2650 }
2651 
2652 static void
processEncodingEntry(ArchiveHandle * AH,TocEntry * te)2653 processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2654 {
2655 	/* te->defn should have the form SET client_encoding = 'foo'; */
2656 	char	   *defn = pg_strdup(te->defn);
2657 	char	   *ptr1;
2658 	char	   *ptr2 = NULL;
2659 	int			encoding;
2660 
2661 	ptr1 = strchr(defn, '\'');
2662 	if (ptr1)
2663 		ptr2 = strchr(++ptr1, '\'');
2664 	if (ptr2)
2665 	{
2666 		*ptr2 = '\0';
2667 		encoding = pg_char_to_encoding(ptr1);
2668 		if (encoding < 0)
2669 			fatal("unrecognized encoding \"%s\"",
2670 				  ptr1);
2671 		AH->public.encoding = encoding;
2672 	}
2673 	else
2674 		fatal("invalid ENCODING item: %s",
2675 			  te->defn);
2676 
2677 	free(defn);
2678 }
2679 
2680 static void
processStdStringsEntry(ArchiveHandle * AH,TocEntry * te)2681 processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2682 {
2683 	/* te->defn should have the form SET standard_conforming_strings = 'x'; */
2684 	char	   *ptr1;
2685 
2686 	ptr1 = strchr(te->defn, '\'');
2687 	if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2688 		AH->public.std_strings = true;
2689 	else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2690 		AH->public.std_strings = false;
2691 	else
2692 		fatal("invalid STDSTRINGS item: %s",
2693 			  te->defn);
2694 }
2695 
2696 static void
processSearchPathEntry(ArchiveHandle * AH,TocEntry * te)2697 processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
2698 {
2699 	/*
2700 	 * te->defn should contain a command to set search_path.  We just copy it
2701 	 * verbatim for use later.
2702 	 */
2703 	AH->public.searchpath = pg_strdup(te->defn);
2704 }
2705 
2706 static void
StrictNamesCheck(RestoreOptions * ropt)2707 StrictNamesCheck(RestoreOptions *ropt)
2708 {
2709 	const char *missing_name;
2710 
2711 	Assert(ropt->strict_names);
2712 
2713 	if (ropt->schemaNames.head != NULL)
2714 	{
2715 		missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2716 		if (missing_name != NULL)
2717 			fatal("schema \"%s\" not found", missing_name);
2718 	}
2719 
2720 	if (ropt->tableNames.head != NULL)
2721 	{
2722 		missing_name = simple_string_list_not_touched(&ropt->tableNames);
2723 		if (missing_name != NULL)
2724 			fatal("table \"%s\" not found", missing_name);
2725 	}
2726 
2727 	if (ropt->indexNames.head != NULL)
2728 	{
2729 		missing_name = simple_string_list_not_touched(&ropt->indexNames);
2730 		if (missing_name != NULL)
2731 			fatal("index \"%s\" not found", missing_name);
2732 	}
2733 
2734 	if (ropt->functionNames.head != NULL)
2735 	{
2736 		missing_name = simple_string_list_not_touched(&ropt->functionNames);
2737 		if (missing_name != NULL)
2738 			fatal("function \"%s\" not found", missing_name);
2739 	}
2740 
2741 	if (ropt->triggerNames.head != NULL)
2742 	{
2743 		missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2744 		if (missing_name != NULL)
2745 			fatal("trigger \"%s\" not found", missing_name);
2746 	}
2747 }
2748 
2749 /*
2750  * Determine whether we want to restore this TOC entry.
2751  *
2752  * Returns 0 if entry should be skipped, or some combination of the
2753  * REQ_SCHEMA and REQ_DATA bits if we want to restore schema and/or data
2754  * portions of this TOC entry, or REQ_SPECIAL if it's a special entry.
2755  */
2756 static teReqs
_tocEntryRequired(TocEntry * te,teSection curSection,ArchiveHandle * AH)2757 _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
2758 {
2759 	teReqs		res = REQ_SCHEMA | REQ_DATA;
2760 	RestoreOptions *ropt = AH->public.ropt;
2761 
2762 	/* These items are treated specially */
2763 	if (strcmp(te->desc, "ENCODING") == 0 ||
2764 		strcmp(te->desc, "STDSTRINGS") == 0 ||
2765 		strcmp(te->desc, "SEARCHPATH") == 0)
2766 		return REQ_SPECIAL;
2767 
2768 	/*
2769 	 * DATABASE and DATABASE PROPERTIES also have a special rule: they are
2770 	 * restored in createDB mode, and not restored otherwise, independently of
2771 	 * all else.
2772 	 */
2773 	if (strcmp(te->desc, "DATABASE") == 0 ||
2774 		strcmp(te->desc, "DATABASE PROPERTIES") == 0)
2775 	{
2776 		if (ropt->createDB)
2777 			return REQ_SCHEMA;
2778 		else
2779 			return 0;
2780 	}
2781 
2782 	/*
2783 	 * Process exclusions that affect certain classes of TOC entries.
2784 	 */
2785 
2786 	/* If it's an ACL, maybe ignore it */
2787 	if (ropt->aclsSkip && _tocEntryIsACL(te))
2788 		return 0;
2789 
2790 	/* If it's a comment, maybe ignore it */
2791 	if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
2792 		return 0;
2793 
2794 	/*
2795 	 * If it's a publication or a table part of a publication, maybe ignore
2796 	 * it.
2797 	 */
2798 	if (ropt->no_publications &&
2799 		(strcmp(te->desc, "PUBLICATION") == 0 ||
2800 		 strcmp(te->desc, "PUBLICATION TABLE") == 0))
2801 		return 0;
2802 
2803 	/* If it's a security label, maybe ignore it */
2804 	if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
2805 		return 0;
2806 
2807 	/* If it's a subscription, maybe ignore it */
2808 	if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
2809 		return 0;
2810 
2811 	/* Ignore it if section is not to be dumped/restored */
2812 	switch (curSection)
2813 	{
2814 		case SECTION_PRE_DATA:
2815 			if (!(ropt->dumpSections & DUMP_PRE_DATA))
2816 				return 0;
2817 			break;
2818 		case SECTION_DATA:
2819 			if (!(ropt->dumpSections & DUMP_DATA))
2820 				return 0;
2821 			break;
2822 		case SECTION_POST_DATA:
2823 			if (!(ropt->dumpSections & DUMP_POST_DATA))
2824 				return 0;
2825 			break;
2826 		default:
2827 			/* shouldn't get here, really, but ignore it */
2828 			return 0;
2829 	}
2830 
2831 	/* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
2832 	if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
2833 		return 0;
2834 
2835 	/*
2836 	 * Check options for selective dump/restore.
2837 	 */
2838 	if (strcmp(te->desc, "ACL") == 0 ||
2839 		strcmp(te->desc, "COMMENT") == 0 ||
2840 		strcmp(te->desc, "SECURITY LABEL") == 0)
2841 	{
2842 		/* Database properties react to createDB, not selectivity options. */
2843 		if (strncmp(te->tag, "DATABASE ", 9) == 0)
2844 		{
2845 			if (!ropt->createDB)
2846 				return 0;
2847 		}
2848 		else if (ropt->schemaNames.head != NULL ||
2849 				 ropt->schemaExcludeNames.head != NULL ||
2850 				 ropt->selTypes)
2851 		{
2852 			/*
2853 			 * In a selective dump/restore, we want to restore these dependent
2854 			 * TOC entry types only if their parent object is being restored.
2855 			 * Without selectivity options, we let through everything in the
2856 			 * archive.  Note there may be such entries with no parent, eg
2857 			 * non-default ACLs for built-in objects.
2858 			 *
2859 			 * This code depends on the parent having been marked already,
2860 			 * which should be the case; if it isn't, perhaps due to
2861 			 * SortTocFromFile rearrangement, skipping the dependent entry
2862 			 * seems prudent anyway.
2863 			 *
2864 			 * Ideally we'd handle, eg, table CHECK constraints this way too.
2865 			 * But it's hard to tell which of their dependencies is the one to
2866 			 * consult.
2867 			 */
2868 			if (te->nDeps != 1 ||
2869 				TocIDRequired(AH, te->dependencies[0]) == 0)
2870 				return 0;
2871 		}
2872 	}
2873 	else
2874 	{
2875 		/* Apply selective-restore rules for standalone TOC entries. */
2876 		if (ropt->schemaNames.head != NULL)
2877 		{
2878 			/* If no namespace is specified, it means all. */
2879 			if (!te->namespace)
2880 				return 0;
2881 			if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
2882 				return 0;
2883 		}
2884 
2885 		if (ropt->schemaExcludeNames.head != NULL &&
2886 			te->namespace &&
2887 			simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
2888 			return 0;
2889 
2890 		if (ropt->selTypes)
2891 		{
2892 			if (strcmp(te->desc, "TABLE") == 0 ||
2893 				strcmp(te->desc, "TABLE DATA") == 0 ||
2894 				strcmp(te->desc, "VIEW") == 0 ||
2895 				strcmp(te->desc, "FOREIGN TABLE") == 0 ||
2896 				strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
2897 				strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
2898 				strcmp(te->desc, "SEQUENCE") == 0 ||
2899 				strcmp(te->desc, "SEQUENCE SET") == 0)
2900 			{
2901 				if (!ropt->selTable)
2902 					return 0;
2903 				if (ropt->tableNames.head != NULL &&
2904 					!simple_string_list_member(&ropt->tableNames, te->tag))
2905 					return 0;
2906 			}
2907 			else if (strcmp(te->desc, "INDEX") == 0)
2908 			{
2909 				if (!ropt->selIndex)
2910 					return 0;
2911 				if (ropt->indexNames.head != NULL &&
2912 					!simple_string_list_member(&ropt->indexNames, te->tag))
2913 					return 0;
2914 			}
2915 			else if (strcmp(te->desc, "FUNCTION") == 0 ||
2916 					 strcmp(te->desc, "AGGREGATE") == 0 ||
2917 					 strcmp(te->desc, "PROCEDURE") == 0)
2918 			{
2919 				if (!ropt->selFunction)
2920 					return 0;
2921 				if (ropt->functionNames.head != NULL &&
2922 					!simple_string_list_member(&ropt->functionNames, te->tag))
2923 					return 0;
2924 			}
2925 			else if (strcmp(te->desc, "TRIGGER") == 0)
2926 			{
2927 				if (!ropt->selTrigger)
2928 					return 0;
2929 				if (ropt->triggerNames.head != NULL &&
2930 					!simple_string_list_member(&ropt->triggerNames, te->tag))
2931 					return 0;
2932 			}
2933 			else
2934 				return 0;
2935 		}
2936 	}
2937 
2938 	/*
2939 	 * Determine whether the TOC entry contains schema and/or data components,
2940 	 * and mask off inapplicable REQ bits.  If it had a dataDumper, assume
2941 	 * it's both schema and data.  Otherwise it's probably schema-only, but
2942 	 * there are exceptions.
2943 	 */
2944 	if (!te->hadDumper)
2945 	{
2946 		/*
2947 		 * Special Case: If 'SEQUENCE SET' or anything to do with BLOBs, then
2948 		 * it is considered a data entry.  We don't need to check for the
2949 		 * BLOBS entry or old-style BLOB COMMENTS, because they will have
2950 		 * hadDumper = true ... but we do need to check new-style BLOB ACLs,
2951 		 * comments, etc.
2952 		 */
2953 		if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
2954 			strcmp(te->desc, "BLOB") == 0 ||
2955 			(strcmp(te->desc, "ACL") == 0 &&
2956 			 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
2957 			(strcmp(te->desc, "COMMENT") == 0 &&
2958 			 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
2959 			(strcmp(te->desc, "SECURITY LABEL") == 0 &&
2960 			 strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
2961 			res = res & REQ_DATA;
2962 		else
2963 			res = res & ~REQ_DATA;
2964 	}
2965 
2966 	/* If there's no definition command, there's no schema component */
2967 	if (!te->defn || !te->defn[0])
2968 		res = res & ~REQ_SCHEMA;
2969 
2970 	/*
2971 	 * Special case: <Init> type with <Max OID> tag; this is obsolete and we
2972 	 * always ignore it.
2973 	 */
2974 	if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
2975 		return 0;
2976 
2977 	/* Mask it if we only want schema */
2978 	if (ropt->schemaOnly)
2979 	{
2980 		/*
2981 		 * The sequence_data option overrides schemaOnly for SEQUENCE SET.
2982 		 *
2983 		 * In binary-upgrade mode, even with schemaOnly set, we do not mask
2984 		 * out large objects.  (Only large object definitions, comments and
2985 		 * other metadata should be generated in binary-upgrade mode, not the
2986 		 * actual data, but that need not concern us here.)
2987 		 */
2988 		if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
2989 			!(ropt->binary_upgrade &&
2990 			  (strcmp(te->desc, "BLOB") == 0 ||
2991 			   (strcmp(te->desc, "ACL") == 0 &&
2992 				strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
2993 			   (strcmp(te->desc, "COMMENT") == 0 &&
2994 				strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
2995 			   (strcmp(te->desc, "SECURITY LABEL") == 0 &&
2996 				strncmp(te->tag, "LARGE OBJECT ", 13) == 0))))
2997 			res = res & REQ_SCHEMA;
2998 	}
2999 
3000 	/* Mask it if we only want data */
3001 	if (ropt->dataOnly)
3002 		res = res & REQ_DATA;
3003 
3004 	return res;
3005 }
3006 
3007 /*
3008  * Identify which pass we should restore this TOC entry in.
3009  *
3010  * See notes with the RestorePass typedef in pg_backup_archiver.h.
3011  */
3012 static RestorePass
_tocEntryRestorePass(TocEntry * te)3013 _tocEntryRestorePass(TocEntry *te)
3014 {
3015 	/* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3016 	if (strcmp(te->desc, "ACL") == 0 ||
3017 		strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3018 		strcmp(te->desc, "DEFAULT ACL") == 0)
3019 		return RESTORE_PASS_ACL;
3020 	if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3021 		strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
3022 		return RESTORE_PASS_POST_ACL;
3023 
3024 	/*
3025 	 * Comments need to be emitted in the same pass as their parent objects.
3026 	 * ACLs haven't got comments, and neither do matview data objects, but
3027 	 * event triggers do.  (Fortunately, event triggers haven't got ACLs, or
3028 	 * we'd need yet another weird special case.)
3029 	 */
3030 	if (strcmp(te->desc, "COMMENT") == 0 &&
3031 		strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
3032 		return RESTORE_PASS_POST_ACL;
3033 
3034 	/* All else can be handled in the main pass. */
3035 	return RESTORE_PASS_MAIN;
3036 }
3037 
3038 /*
3039  * Identify TOC entries that are ACLs.
3040  *
3041  * Note: it seems worth duplicating some code here to avoid a hard-wired
3042  * assumption that these are exactly the same entries that we restore during
3043  * the RESTORE_PASS_ACL phase.
3044  */
3045 static bool
_tocEntryIsACL(TocEntry * te)3046 _tocEntryIsACL(TocEntry *te)
3047 {
3048 	/* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3049 	if (strcmp(te->desc, "ACL") == 0 ||
3050 		strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3051 		strcmp(te->desc, "DEFAULT ACL") == 0)
3052 		return true;
3053 	return false;
3054 }
3055 
3056 /*
3057  * Issue SET commands for parameters that we want to have set the same way
3058  * at all times during execution of a restore script.
3059  */
3060 static void
_doSetFixedOutputState(ArchiveHandle * AH)3061 _doSetFixedOutputState(ArchiveHandle *AH)
3062 {
3063 	RestoreOptions *ropt = AH->public.ropt;
3064 
3065 	/*
3066 	 * Disable timeouts to allow for slow commands, idle parallel workers, etc
3067 	 */
3068 	ahprintf(AH, "SET statement_timeout = 0;\n");
3069 	ahprintf(AH, "SET lock_timeout = 0;\n");
3070 	ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
3071 
3072 	/* Select the correct character set encoding */
3073 	ahprintf(AH, "SET client_encoding = '%s';\n",
3074 			 pg_encoding_to_char(AH->public.encoding));
3075 
3076 	/* Select the correct string literal syntax */
3077 	ahprintf(AH, "SET standard_conforming_strings = %s;\n",
3078 			 AH->public.std_strings ? "on" : "off");
3079 
3080 	/* Select the role to be used during restore */
3081 	if (ropt && ropt->use_role)
3082 		ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
3083 
3084 	/* Select the dump-time search_path */
3085 	if (AH->public.searchpath)
3086 		ahprintf(AH, "%s", AH->public.searchpath);
3087 
3088 	/* Make sure function checking is disabled */
3089 	ahprintf(AH, "SET check_function_bodies = false;\n");
3090 
3091 	/* Ensure that all valid XML data will be accepted */
3092 	ahprintf(AH, "SET xmloption = content;\n");
3093 
3094 	/* Avoid annoying notices etc */
3095 	ahprintf(AH, "SET client_min_messages = warning;\n");
3096 	if (!AH->public.std_strings)
3097 		ahprintf(AH, "SET escape_string_warning = off;\n");
3098 
3099 	/* Adjust row-security state */
3100 	if (ropt && ropt->enable_row_security)
3101 		ahprintf(AH, "SET row_security = on;\n");
3102 	else
3103 		ahprintf(AH, "SET row_security = off;\n");
3104 
3105 	ahprintf(AH, "\n");
3106 }
3107 
3108 /*
3109  * Issue a SET SESSION AUTHORIZATION command.  Caller is responsible
3110  * for updating state if appropriate.  If user is NULL or an empty string,
3111  * the specification DEFAULT will be used.
3112  */
3113 static void
_doSetSessionAuth(ArchiveHandle * AH,const char * user)3114 _doSetSessionAuth(ArchiveHandle *AH, const char *user)
3115 {
3116 	PQExpBuffer cmd = createPQExpBuffer();
3117 
3118 	appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3119 
3120 	/*
3121 	 * SQL requires a string literal here.  Might as well be correct.
3122 	 */
3123 	if (user && *user)
3124 		appendStringLiteralAHX(cmd, user, AH);
3125 	else
3126 		appendPQExpBufferStr(cmd, "DEFAULT");
3127 	appendPQExpBufferChar(cmd, ';');
3128 
3129 	if (RestoringToDB(AH))
3130 	{
3131 		PGresult   *res;
3132 
3133 		res = PQexec(AH->connection, cmd->data);
3134 
3135 		if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3136 			/* NOT warn_or_exit_horribly... use -O instead to skip this. */
3137 			fatal("could not set session user to \"%s\": %s",
3138 				  user, PQerrorMessage(AH->connection));
3139 
3140 		PQclear(res);
3141 	}
3142 	else
3143 		ahprintf(AH, "%s\n\n", cmd->data);
3144 
3145 	destroyPQExpBuffer(cmd);
3146 }
3147 
3148 
3149 /*
3150  * Issue the commands to connect to the specified database.
3151  *
3152  * If we're currently restoring right into a database, this will
3153  * actually establish a connection. Otherwise it puts a \connect into
3154  * the script output.
3155  */
3156 static void
_reconnectToDB(ArchiveHandle * AH,const char * dbname)3157 _reconnectToDB(ArchiveHandle *AH, const char *dbname)
3158 {
3159 	if (RestoringToDB(AH))
3160 		ReconnectToServer(AH, dbname);
3161 	else
3162 	{
3163 		PQExpBufferData connectbuf;
3164 
3165 		initPQExpBuffer(&connectbuf);
3166 		appendPsqlMetaConnect(&connectbuf, dbname);
3167 		ahprintf(AH, "%s\n", connectbuf.data);
3168 		termPQExpBuffer(&connectbuf);
3169 	}
3170 
3171 	/*
3172 	 * NOTE: currUser keeps track of what the imaginary session user in our
3173 	 * script is.  It's now effectively reset to the original userID.
3174 	 */
3175 	if (AH->currUser)
3176 		free(AH->currUser);
3177 	AH->currUser = NULL;
3178 
3179 	/* don't assume we still know the output schema, tablespace, etc either */
3180 	if (AH->currSchema)
3181 		free(AH->currSchema);
3182 	AH->currSchema = NULL;
3183 	if (AH->currTablespace)
3184 		free(AH->currTablespace);
3185 	AH->currTablespace = NULL;
3186 
3187 	/* re-establish fixed state */
3188 	_doSetFixedOutputState(AH);
3189 }
3190 
3191 /*
3192  * Become the specified user, and update state to avoid redundant commands
3193  *
3194  * NULL or empty argument is taken to mean restoring the session default
3195  */
3196 static void
_becomeUser(ArchiveHandle * AH,const char * user)3197 _becomeUser(ArchiveHandle *AH, const char *user)
3198 {
3199 	if (!user)
3200 		user = "";				/* avoid null pointers */
3201 
3202 	if (AH->currUser && strcmp(AH->currUser, user) == 0)
3203 		return;					/* no need to do anything */
3204 
3205 	_doSetSessionAuth(AH, user);
3206 
3207 	/*
3208 	 * NOTE: currUser keeps track of what the imaginary session user in our
3209 	 * script is
3210 	 */
3211 	if (AH->currUser)
3212 		free(AH->currUser);
3213 	AH->currUser = pg_strdup(user);
3214 }
3215 
3216 /*
3217  * Become the owner of the given TOC entry object.  If
3218  * changes in ownership are not allowed, this doesn't do anything.
3219  */
3220 static void
_becomeOwner(ArchiveHandle * AH,TocEntry * te)3221 _becomeOwner(ArchiveHandle *AH, TocEntry *te)
3222 {
3223 	RestoreOptions *ropt = AH->public.ropt;
3224 
3225 	if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3226 		return;
3227 
3228 	_becomeUser(AH, te->owner);
3229 }
3230 
3231 
3232 /*
3233  * Issue the commands to select the specified schema as the current schema
3234  * in the target database.
3235  */
3236 static void
_selectOutputSchema(ArchiveHandle * AH,const char * schemaName)3237 _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
3238 {
3239 	PQExpBuffer qry;
3240 
3241 	/*
3242 	 * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3243 	 * that search_path rather than switching to entry-specific paths.
3244 	 * Otherwise, it's an old archive that will not restore correctly unless
3245 	 * we set the search_path as it's expecting.
3246 	 */
3247 	if (AH->public.searchpath)
3248 		return;
3249 
3250 	if (!schemaName || *schemaName == '\0' ||
3251 		(AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3252 		return;					/* no need to do anything */
3253 
3254 	qry = createPQExpBuffer();
3255 
3256 	appendPQExpBuffer(qry, "SET search_path = %s",
3257 					  fmtId(schemaName));
3258 	if (strcmp(schemaName, "pg_catalog") != 0)
3259 		appendPQExpBufferStr(qry, ", pg_catalog");
3260 
3261 	if (RestoringToDB(AH))
3262 	{
3263 		PGresult   *res;
3264 
3265 		res = PQexec(AH->connection, qry->data);
3266 
3267 		if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3268 			warn_or_exit_horribly(AH,
3269 								  "could not set search_path to \"%s\": %s",
3270 								  schemaName, PQerrorMessage(AH->connection));
3271 
3272 		PQclear(res);
3273 	}
3274 	else
3275 		ahprintf(AH, "%s;\n\n", qry->data);
3276 
3277 	if (AH->currSchema)
3278 		free(AH->currSchema);
3279 	AH->currSchema = pg_strdup(schemaName);
3280 
3281 	destroyPQExpBuffer(qry);
3282 }
3283 
3284 /*
3285  * Issue the commands to select the specified tablespace as the current one
3286  * in the target database.
3287  */
3288 static void
_selectTablespace(ArchiveHandle * AH,const char * tablespace)3289 _selectTablespace(ArchiveHandle *AH, const char *tablespace)
3290 {
3291 	RestoreOptions *ropt = AH->public.ropt;
3292 	PQExpBuffer qry;
3293 	const char *want,
3294 			   *have;
3295 
3296 	/* do nothing in --no-tablespaces mode */
3297 	if (ropt->noTablespace)
3298 		return;
3299 
3300 	have = AH->currTablespace;
3301 	want = tablespace;
3302 
3303 	/* no need to do anything for non-tablespace object */
3304 	if (!want)
3305 		return;
3306 
3307 	if (have && strcmp(want, have) == 0)
3308 		return;					/* no need to do anything */
3309 
3310 	qry = createPQExpBuffer();
3311 
3312 	if (strcmp(want, "") == 0)
3313 	{
3314 		/* We want the tablespace to be the database's default */
3315 		appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3316 	}
3317 	else
3318 	{
3319 		/* We want an explicit tablespace */
3320 		appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3321 	}
3322 
3323 	if (RestoringToDB(AH))
3324 	{
3325 		PGresult   *res;
3326 
3327 		res = PQexec(AH->connection, qry->data);
3328 
3329 		if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3330 			warn_or_exit_horribly(AH,
3331 								  "could not set default_tablespace to %s: %s",
3332 								  fmtId(want), PQerrorMessage(AH->connection));
3333 
3334 		PQclear(res);
3335 	}
3336 	else
3337 		ahprintf(AH, "%s;\n\n", qry->data);
3338 
3339 	if (AH->currTablespace)
3340 		free(AH->currTablespace);
3341 	AH->currTablespace = pg_strdup(want);
3342 
3343 	destroyPQExpBuffer(qry);
3344 }
3345 
3346 /*
3347  * Set the proper default_table_access_method value for the table.
3348  */
3349 static void
_selectTableAccessMethod(ArchiveHandle * AH,const char * tableam)3350 _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
3351 {
3352 	PQExpBuffer cmd;
3353 	const char *want,
3354 			   *have;
3355 
3356 	have = AH->currTableAm;
3357 	want = tableam;
3358 
3359 	if (!want)
3360 		return;
3361 
3362 	if (have && strcmp(want, have) == 0)
3363 		return;
3364 
3365 	cmd = createPQExpBuffer();
3366 	appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
3367 
3368 	if (RestoringToDB(AH))
3369 	{
3370 		PGresult   *res;
3371 
3372 		res = PQexec(AH->connection, cmd->data);
3373 
3374 		if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3375 			warn_or_exit_horribly(AH,
3376 								  "could not set default_table_access_method: %s",
3377 								  PQerrorMessage(AH->connection));
3378 
3379 		PQclear(res);
3380 	}
3381 	else
3382 		ahprintf(AH, "%s\n\n", cmd->data);
3383 
3384 	destroyPQExpBuffer(cmd);
3385 
3386 	AH->currTableAm = pg_strdup(want);
3387 }
3388 
3389 /*
3390  * Extract an object description for a TOC entry, and append it to buf.
3391  *
3392  * This is used for ALTER ... OWNER TO.
3393  */
3394 static void
_getObjectDescription(PQExpBuffer buf,TocEntry * te,ArchiveHandle * AH)3395 _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
3396 {
3397 	const char *type = te->desc;
3398 
3399 	/* Use ALTER TABLE for views and sequences */
3400 	if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0 ||
3401 		strcmp(type, "MATERIALIZED VIEW") == 0)
3402 		type = "TABLE";
3403 
3404 	/* objects that don't require special decoration */
3405 	if (strcmp(type, "COLLATION") == 0 ||
3406 		strcmp(type, "CONVERSION") == 0 ||
3407 		strcmp(type, "DOMAIN") == 0 ||
3408 		strcmp(type, "TABLE") == 0 ||
3409 		strcmp(type, "TYPE") == 0 ||
3410 		strcmp(type, "FOREIGN TABLE") == 0 ||
3411 		strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3412 		strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3413 		strcmp(type, "STATISTICS") == 0 ||
3414 	/* non-schema-specified objects */
3415 		strcmp(type, "DATABASE") == 0 ||
3416 		strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3417 		strcmp(type, "SCHEMA") == 0 ||
3418 		strcmp(type, "EVENT TRIGGER") == 0 ||
3419 		strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3420 		strcmp(type, "SERVER") == 0 ||
3421 		strcmp(type, "PUBLICATION") == 0 ||
3422 		strcmp(type, "SUBSCRIPTION") == 0 ||
3423 		strcmp(type, "USER MAPPING") == 0)
3424 	{
3425 		appendPQExpBuffer(buf, "%s ", type);
3426 		if (te->namespace && *te->namespace)
3427 			appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3428 		appendPQExpBufferStr(buf, fmtId(te->tag));
3429 		return;
3430 	}
3431 
3432 	/* BLOBs just have a name, but it's numeric so must not use fmtId */
3433 	if (strcmp(type, "BLOB") == 0)
3434 	{
3435 		appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3436 		return;
3437 	}
3438 
3439 	/*
3440 	 * These object types require additional decoration.  Fortunately, the
3441 	 * information needed is exactly what's in the DROP command.
3442 	 */
3443 	if (strcmp(type, "AGGREGATE") == 0 ||
3444 		strcmp(type, "FUNCTION") == 0 ||
3445 		strcmp(type, "OPERATOR") == 0 ||
3446 		strcmp(type, "OPERATOR CLASS") == 0 ||
3447 		strcmp(type, "OPERATOR FAMILY") == 0 ||
3448 		strcmp(type, "PROCEDURE") == 0)
3449 	{
3450 		/* Chop "DROP " off the front and make a modifiable copy */
3451 		char	   *first = pg_strdup(te->dropStmt + 5);
3452 		char	   *last;
3453 
3454 		/* point to last character in string */
3455 		last = first + strlen(first) - 1;
3456 
3457 		/* Strip off any ';' or '\n' at the end */
3458 		while (last >= first && (*last == '\n' || *last == ';'))
3459 			last--;
3460 		*(last + 1) = '\0';
3461 
3462 		appendPQExpBufferStr(buf, first);
3463 
3464 		free(first);
3465 		return;
3466 	}
3467 
3468 	pg_log_warning("don't know how to set owner for object type \"%s\"",
3469 				   type);
3470 }
3471 
3472 /*
3473  * Emit the SQL commands to create the object represented by a TOC entry
3474  *
3475  * This now also includes issuing an ALTER OWNER command to restore the
3476  * object's ownership, if wanted.  But note that the object's permissions
3477  * will remain at default, until the matching ACL TOC entry is restored.
3478  */
3479 static void
_printTocEntry(ArchiveHandle * AH,TocEntry * te,bool isData)3480 _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
3481 {
3482 	RestoreOptions *ropt = AH->public.ropt;
3483 
3484 	/* Select owner, schema, tablespace and default AM as necessary */
3485 	_becomeOwner(AH, te);
3486 	_selectOutputSchema(AH, te->namespace);
3487 	_selectTablespace(AH, te->tablespace);
3488 	_selectTableAccessMethod(AH, te->tableam);
3489 
3490 	/* Emit header comment for item */
3491 	if (!AH->noTocComments)
3492 	{
3493 		const char *pfx;
3494 		char	   *sanitized_name;
3495 		char	   *sanitized_schema;
3496 		char	   *sanitized_owner;
3497 
3498 		if (isData)
3499 			pfx = "Data for ";
3500 		else
3501 			pfx = "";
3502 
3503 		ahprintf(AH, "--\n");
3504 		if (AH->public.verbose)
3505 		{
3506 			ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
3507 					 te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
3508 			if (te->nDeps > 0)
3509 			{
3510 				int			i;
3511 
3512 				ahprintf(AH, "-- Dependencies:");
3513 				for (i = 0; i < te->nDeps; i++)
3514 					ahprintf(AH, " %d", te->dependencies[i]);
3515 				ahprintf(AH, "\n");
3516 			}
3517 		}
3518 
3519 		sanitized_name = sanitize_line(te->tag, false);
3520 		sanitized_schema = sanitize_line(te->namespace, true);
3521 		sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);
3522 
3523 		ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3524 				 pfx, sanitized_name, te->desc, sanitized_schema,
3525 				 sanitized_owner);
3526 
3527 		free(sanitized_name);
3528 		free(sanitized_schema);
3529 		free(sanitized_owner);
3530 
3531 		if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
3532 		{
3533 			char	   *sanitized_tablespace;
3534 
3535 			sanitized_tablespace = sanitize_line(te->tablespace, false);
3536 			ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
3537 			free(sanitized_tablespace);
3538 		}
3539 		ahprintf(AH, "\n");
3540 
3541 		if (AH->PrintExtraTocPtr != NULL)
3542 			AH->PrintExtraTocPtr(AH, te);
3543 		ahprintf(AH, "--\n\n");
3544 	}
3545 
3546 	/*
3547 	 * Actually print the definition.
3548 	 *
3549 	 * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
3550 	 * versions put into CREATE SCHEMA.  We have to do this when --no-owner
3551 	 * mode is selected.  This is ugly, but I see no other good way ...
3552 	 */
3553 	if (ropt->noOwner && strcmp(te->desc, "SCHEMA") == 0)
3554 	{
3555 		ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
3556 	}
3557 	else
3558 	{
3559 		if (te->defn && strlen(te->defn) > 0)
3560 			ahprintf(AH, "%s\n\n", te->defn);
3561 	}
3562 
3563 	/*
3564 	 * If we aren't using SET SESSION AUTH to determine ownership, we must
3565 	 * instead issue an ALTER OWNER command.  We assume that anything without
3566 	 * a DROP command is not a separately ownable object.  All the categories
3567 	 * with DROP commands must appear in one list or the other.
3568 	 */
3569 	if (!ropt->noOwner && !ropt->use_setsessauth &&
3570 		te->owner && strlen(te->owner) > 0 &&
3571 		te->dropStmt && strlen(te->dropStmt) > 0)
3572 	{
3573 		if (strcmp(te->desc, "AGGREGATE") == 0 ||
3574 			strcmp(te->desc, "BLOB") == 0 ||
3575 			strcmp(te->desc, "COLLATION") == 0 ||
3576 			strcmp(te->desc, "CONVERSION") == 0 ||
3577 			strcmp(te->desc, "DATABASE") == 0 ||
3578 			strcmp(te->desc, "DOMAIN") == 0 ||
3579 			strcmp(te->desc, "FUNCTION") == 0 ||
3580 			strcmp(te->desc, "OPERATOR") == 0 ||
3581 			strcmp(te->desc, "OPERATOR CLASS") == 0 ||
3582 			strcmp(te->desc, "OPERATOR FAMILY") == 0 ||
3583 			strcmp(te->desc, "PROCEDURE") == 0 ||
3584 			strcmp(te->desc, "PROCEDURAL LANGUAGE") == 0 ||
3585 			strcmp(te->desc, "SCHEMA") == 0 ||
3586 			strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3587 			strcmp(te->desc, "TABLE") == 0 ||
3588 			strcmp(te->desc, "TYPE") == 0 ||
3589 			strcmp(te->desc, "VIEW") == 0 ||
3590 			strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3591 			strcmp(te->desc, "SEQUENCE") == 0 ||
3592 			strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3593 			strcmp(te->desc, "TEXT SEARCH DICTIONARY") == 0 ||
3594 			strcmp(te->desc, "TEXT SEARCH CONFIGURATION") == 0 ||
3595 			strcmp(te->desc, "FOREIGN DATA WRAPPER") == 0 ||
3596 			strcmp(te->desc, "SERVER") == 0 ||
3597 			strcmp(te->desc, "STATISTICS") == 0 ||
3598 			strcmp(te->desc, "PUBLICATION") == 0 ||
3599 			strcmp(te->desc, "SUBSCRIPTION") == 0)
3600 		{
3601 			PQExpBuffer temp = createPQExpBuffer();
3602 
3603 			appendPQExpBufferStr(temp, "ALTER ");
3604 			_getObjectDescription(temp, te, AH);
3605 			appendPQExpBuffer(temp, " OWNER TO %s;", fmtId(te->owner));
3606 			ahprintf(AH, "%s\n\n", temp->data);
3607 			destroyPQExpBuffer(temp);
3608 		}
3609 		else if (strcmp(te->desc, "CAST") == 0 ||
3610 				 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
3611 				 strcmp(te->desc, "CONSTRAINT") == 0 ||
3612 				 strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
3613 				 strcmp(te->desc, "DEFAULT") == 0 ||
3614 				 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
3615 				 strcmp(te->desc, "INDEX") == 0 ||
3616 				 strcmp(te->desc, "RULE") == 0 ||
3617 				 strcmp(te->desc, "TRIGGER") == 0 ||
3618 				 strcmp(te->desc, "ROW SECURITY") == 0 ||
3619 				 strcmp(te->desc, "POLICY") == 0 ||
3620 				 strcmp(te->desc, "USER MAPPING") == 0)
3621 		{
3622 			/* these object types don't have separate owners */
3623 		}
3624 		else
3625 		{
3626 			pg_log_warning("don't know how to set owner for object type \"%s\"",
3627 						   te->desc);
3628 		}
3629 	}
3630 
3631 	/*
3632 	 * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
3633 	 * commands, so we can no longer assume we know the current auth setting.
3634 	 */
3635 	if (_tocEntryIsACL(te))
3636 	{
3637 		if (AH->currUser)
3638 			free(AH->currUser);
3639 		AH->currUser = NULL;
3640 	}
3641 }
3642 
3643 /*
3644  * Sanitize a string to be included in an SQL comment or TOC listing, by
3645  * replacing any newlines with spaces.  This ensures each logical output line
3646  * is in fact one physical output line, to prevent corruption of the dump
3647  * (which could, in the worst case, present an SQL injection vulnerability
3648  * if someone were to incautiously load a dump containing objects with
3649  * maliciously crafted names).
3650  *
3651  * The result is a freshly malloc'd string.  If the input string is NULL,
3652  * return a malloc'ed empty string, unless want_hyphen, in which case return a
3653  * malloc'ed hyphen.
3654  *
3655  * Note that we currently don't bother to quote names, meaning that the name
3656  * fields aren't automatically parseable.  "pg_restore -L" doesn't care because
3657  * it only examines the dumpId field, but someday we might want to try harder.
3658  */
3659 static char *
sanitize_line(const char * str,bool want_hyphen)3660 sanitize_line(const char *str, bool want_hyphen)
3661 {
3662 	char	   *result;
3663 	char	   *s;
3664 
3665 	if (!str)
3666 		return pg_strdup(want_hyphen ? "-" : "");
3667 
3668 	result = pg_strdup(str);
3669 
3670 	for (s = result; *s != '\0'; s++)
3671 	{
3672 		if (*s == '\n' || *s == '\r')
3673 			*s = ' ';
3674 	}
3675 
3676 	return result;
3677 }
3678 
3679 /*
3680  * Write the file header for a custom-format archive
3681  */
3682 void
WriteHead(ArchiveHandle * AH)3683 WriteHead(ArchiveHandle *AH)
3684 {
3685 	struct tm	crtm;
3686 
3687 	AH->WriteBufPtr(AH, "PGDMP", 5);	/* Magic code */
3688 	AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
3689 	AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
3690 	AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
3691 	AH->WriteBytePtr(AH, AH->intSize);
3692 	AH->WriteBytePtr(AH, AH->offSize);
3693 	AH->WriteBytePtr(AH, AH->format);
3694 	WriteInt(AH, AH->compression);
3695 	crtm = *localtime(&AH->createDate);
3696 	WriteInt(AH, crtm.tm_sec);
3697 	WriteInt(AH, crtm.tm_min);
3698 	WriteInt(AH, crtm.tm_hour);
3699 	WriteInt(AH, crtm.tm_mday);
3700 	WriteInt(AH, crtm.tm_mon);
3701 	WriteInt(AH, crtm.tm_year);
3702 	WriteInt(AH, crtm.tm_isdst);
3703 	WriteStr(AH, PQdb(AH->connection));
3704 	WriteStr(AH, AH->public.remoteVersionStr);
3705 	WriteStr(AH, PG_VERSION);
3706 }
3707 
/*
 * Read the archive file header (as laid out by WriteHead()) and fill in AH:
 * version, intSize, offSize, compression, createDate, archdbname,
 * archiveRemoteVersion and archiveDumpVersion.  Several fields exist only in
 * newer archive versions, so each one is read only if the version found in
 * the file says it was written.
 *
 * Exits via fatal() on a missing magic string, an unsupported archive
 * version, an implausible integer size, or a format byte that differs from
 * AH->format.
 */
void
ReadHead(ArchiveHandle *AH)
{
	char		vmaj,
				vmin,
				vrev;
	int			fmt;

	/*
	 * If we haven't already read the header, do so.
	 *
	 * NB: this code must agree with _discoverArchiveFormat().  Maybe find a
	 * way to unify the cases?
	 *
	 * When AH->readHeader is already set, the 5-byte magic is not read here;
	 * presumably the caller consumed it already -- NOTE(review): confirm.
	 */
	if (!AH->readHeader)
	{
		char		tmpMag[7];

		AH->ReadBufPtr(AH, tmpMag, 5);

		if (strncmp(tmpMag, "PGDMP", 5) != 0)
			fatal("did not find magic string in file header");
	}

	vmaj = AH->ReadBytePtr(AH);
	vmin = AH->ReadBytePtr(AH);

	/* Only archives newer than 1.0 carry a revision byte */
	if (vmaj > 1 || (vmaj == 1 && vmin > 0))	/* Version > 1.0 */
		vrev = AH->ReadBytePtr(AH);
	else
		vrev = 0;

	AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);

	if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
		fatal("unsupported version (%d.%d) in file header",
			  vmaj, vmin);

	/* Reject an implausible integer size before using it */
	AH->intSize = AH->ReadBytePtr(AH);
	if (AH->intSize > 32)
		fatal("sanity check on integer size (%lu) failed",
			  (unsigned long) AH->intSize);

	if (AH->intSize > sizeof(int))
		pg_log_warning("archive was made on a machine with larger integers, some operations might fail");

	/* Before 1.7, offsets were stored with the same size as integers */
	if (AH->version >= K_VERS_1_7)
		AH->offSize = AH->ReadBytePtr(AH);
	else
		AH->offSize = AH->intSize;

	/* The stored format must match the format AH was set up for */
	fmt = AH->ReadBytePtr(AH);

	if (AH->format != fmt)
		fatal("expected format (%d) differs from format found in file (%d)",
			  AH->format, fmt);

	/*
	 * Compression level: not recorded before 1.2 (assume the default),
	 * a single byte before 1.4, an integer from 1.4 on.
	 */
	if (AH->version >= K_VERS_1_2)
	{
		if (AH->version < K_VERS_1_4)
			AH->compression = AH->ReadBytePtr(AH);
		else
			AH->compression = ReadInt(AH);
	}
	else
		AH->compression = Z_DEFAULT_COMPRESSION;

#ifndef HAVE_LIBZ
	if (AH->compression != 0)
		pg_log_warning("archive is compressed, but this installation does not support compression -- no data will be available");
#endif

	/* Creation time, stored as broken-down struct tm fields since 1.4 */
	if (AH->version >= K_VERS_1_4)
	{
		struct tm	crtm;

		crtm.tm_sec = ReadInt(AH);
		crtm.tm_min = ReadInt(AH);
		crtm.tm_hour = ReadInt(AH);
		crtm.tm_mday = ReadInt(AH);
		crtm.tm_mon = ReadInt(AH);
		crtm.tm_year = ReadInt(AH);
		crtm.tm_isdst = ReadInt(AH);

		/*
		 * Newer versions of glibc have mktime() report failure if tm_isdst is
		 * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
		 * TZ=UTC.  This is problematic when restoring an archive under a
		 * different timezone setting.  If we get a failure, try again with
		 * tm_isdst set to -1 ("don't know").
		 *
		 * XXX with or without this hack, we reconstruct createDate
		 * incorrectly when the prevailing timezone is different from
		 * pg_dump's.  Next time we bump the archive version, we should flush
		 * this representation and store a plain seconds-since-the-Epoch
		 * timestamp instead.
		 */
		AH->createDate = mktime(&crtm);
		if (AH->createDate == (time_t) -1)
		{
			crtm.tm_isdst = -1;
			AH->createDate = mktime(&crtm);
			if (AH->createDate == (time_t) -1)
				pg_log_warning("invalid creation date in header");
		}
	}

	/* Database name was added in 1.4 */
	if (AH->version >= K_VERS_1_4)
	{
		AH->archdbname = ReadStr(AH);
	}

	/* Server and pg_dump version strings were added in 1.10 */
	if (AH->version >= K_VERS_1_10)
	{
		AH->archiveRemoteVersion = ReadStr(AH);
		AH->archiveDumpVersion = ReadStr(AH);
	}
}
3826 
3827 
3828 /*
3829  * checkSeek
3830  *	  check to see if ftell/fseek can be performed.
3831  */
3832 bool
checkSeek(FILE * fp)3833 checkSeek(FILE *fp)
3834 {
3835 	pgoff_t		tpos;
3836 
3837 	/* Check that ftello works on this file */
3838 	tpos = ftello(fp);
3839 	if (tpos < 0)
3840 		return false;
3841 
3842 	/*
3843 	 * Check that fseeko(SEEK_SET) works, too.  NB: we used to try to test
3844 	 * this with fseeko(fp, 0, SEEK_CUR).  But some platforms treat that as a
3845 	 * successful no-op even on files that are otherwise unseekable.
3846 	 */
3847 	if (fseeko(fp, tpos, SEEK_SET) != 0)
3848 		return false;
3849 
3850 	return true;
3851 }
3852 
3853 
3854 /*
3855  * dumpTimestamp
3856  */
3857 static void
dumpTimestamp(ArchiveHandle * AH,const char * msg,time_t tim)3858 dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
3859 {
3860 	char		buf[64];
3861 
3862 	if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
3863 		ahprintf(AH, "-- %s %s\n\n", msg, buf);
3864 }
3865 
3866 /*
3867  * Main engine for parallel restore.
3868  *
3869  * Parallel restore is done in three phases.  In this first phase,
3870  * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
3871  * processed in the RESTORE_PASS_MAIN pass.  (In practice, that's all
3872  * PRE_DATA items other than ACLs.)  Entries we can't process now are
3873  * added to the pending_list for later phases to deal with.
3874  */
3875 static void
restore_toc_entries_prefork(ArchiveHandle * AH,TocEntry * pending_list)3876 restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
3877 {
3878 	bool		skipped_some;
3879 	TocEntry   *next_work_item;
3880 
3881 	pg_log_debug("entering restore_toc_entries_prefork");
3882 
3883 	/* Adjust dependency information */
3884 	fix_dependencies(AH);
3885 
3886 	/*
3887 	 * Do all the early stuff in a single connection in the parent. There's no
3888 	 * great point in running it in parallel, in fact it will actually run
3889 	 * faster in a single connection because we avoid all the connection and
3890 	 * setup overhead.  Also, pre-9.2 pg_dump versions were not very good
3891 	 * about showing all the dependencies of SECTION_PRE_DATA items, so we do
3892 	 * not risk trying to process them out-of-order.
3893 	 *
3894 	 * Stuff that we can't do immediately gets added to the pending_list.
3895 	 * Note: we don't yet filter out entries that aren't going to be restored.
3896 	 * They might participate in dependency chains connecting entries that
3897 	 * should be restored, so we treat them as live until we actually process
3898 	 * them.
3899 	 *
3900 	 * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
3901 	 * before DATA items, and all DATA items before POST_DATA items.  That is
3902 	 * not certain to be true in older archives, though, and in any case use
3903 	 * of a list file would destroy that ordering (cf. SortTocFromFile).  So
3904 	 * this loop cannot assume that it holds.
3905 	 */
3906 	AH->restorePass = RESTORE_PASS_MAIN;
3907 	skipped_some = false;
3908 	for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
3909 	{
3910 		bool		do_now = true;
3911 
3912 		if (next_work_item->section != SECTION_PRE_DATA)
3913 		{
3914 			/* DATA and POST_DATA items are just ignored for now */
3915 			if (next_work_item->section == SECTION_DATA ||
3916 				next_work_item->section == SECTION_POST_DATA)
3917 			{
3918 				do_now = false;
3919 				skipped_some = true;
3920 			}
3921 			else
3922 			{
3923 				/*
3924 				 * SECTION_NONE items, such as comments, can be processed now
3925 				 * if we are still in the PRE_DATA part of the archive.  Once
3926 				 * we've skipped any items, we have to consider whether the
3927 				 * comment's dependencies are satisfied, so skip it for now.
3928 				 */
3929 				if (skipped_some)
3930 					do_now = false;
3931 			}
3932 		}
3933 
3934 		/*
3935 		 * Also skip items that need to be forced into later passes.  We need
3936 		 * not set skipped_some in this case, since by assumption no main-pass
3937 		 * items could depend on these.
3938 		 */
3939 		if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
3940 			do_now = false;
3941 
3942 		if (do_now)
3943 		{
3944 			/* OK, restore the item and update its dependencies */
3945 			pg_log_info("processing item %d %s %s",
3946 						next_work_item->dumpId,
3947 						next_work_item->desc, next_work_item->tag);
3948 
3949 			(void) restore_toc_entry(AH, next_work_item, false);
3950 
3951 			/* Reduce dependencies, but don't move anything to ready_list */
3952 			reduce_dependencies(AH, next_work_item, NULL);
3953 		}
3954 		else
3955 		{
3956 			/* Nope, so add it to pending_list */
3957 			pending_list_append(pending_list, next_work_item);
3958 		}
3959 	}
3960 
3961 	/*
3962 	 * Now close parent connection in prep for parallel steps.  We do this
3963 	 * mainly to ensure that we don't exceed the specified number of parallel
3964 	 * connections.
3965 	 */
3966 	DisconnectDatabase(&AH->public);
3967 
3968 	/* blow away any transient state from the old connection */
3969 	if (AH->currUser)
3970 		free(AH->currUser);
3971 	AH->currUser = NULL;
3972 	if (AH->currSchema)
3973 		free(AH->currSchema);
3974 	AH->currSchema = NULL;
3975 	if (AH->currTablespace)
3976 		free(AH->currTablespace);
3977 	AH->currTablespace = NULL;
3978 	if (AH->currTableAm)
3979 		free(AH->currTableAm);
3980 	AH->currTableAm = NULL;
3981 }
3982 
3983 /*
3984  * Main engine for parallel restore.
3985  *
3986  * Parallel restore is done in three phases.  In this second phase,
3987  * we process entries by dispatching them to parallel worker children
3988  * (processes on Unix, threads on Windows), each of which connects
3989  * separately to the database.  Inter-entry dependencies are respected,
3990  * and so is the RestorePass multi-pass structure.  When we can no longer
3991  * make any entries ready to process, we exit.  Normally, there will be
3992  * nothing left to do; but if there is, the third phase will mop up.
3993  */
static void
restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
							 TocEntry *pending_list)
{
	ParallelReadyList ready_list;
	TocEntry   *next_work_item;

	pg_log_debug("entering restore_toc_entries_parallel");

	/*
	 * Set up ready_list with enough room for all known TocEntrys.  Every
	 * entry can be inserted into ready_list at most once, since it is
	 * removed from pending_list when it is inserted.
	 */
	ready_list_init(&ready_list, AH->tocCount);

	/*
	 * The pending_list contains all items that we need to restore.  Move all
	 * items that are available to process immediately into the ready_list.
	 * After this setup, the pending list is everything that needs to be done
	 * but is blocked by one or more dependencies, while the ready list
	 * contains items that have no remaining dependencies and are OK to
	 * process in the current restore pass.
	 */
	AH->restorePass = RESTORE_PASS_MAIN;
	move_to_ready_list(pending_list, &ready_list, AH->restorePass);

	/*
	 * main parent loop
	 *
	 * Keep going until there is no worker still running AND there is no work
	 * left to be done.  Note invariant: at top of loop, there should always
	 * be at least one worker available to dispatch a job to.
	 */
	pg_log_info("entering main parallel loop");

	for (;;)
	{
		/* Look for an item ready to be dispatched to a worker */
		next_work_item = pop_next_work_item(AH, &ready_list, pstate);
		if (next_work_item != NULL)
		{
			/* If not to be restored, don't waste time launching a worker */
			if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
			{
				pg_log_info("skipping item %d %s %s",
							next_work_item->dumpId,
							next_work_item->desc, next_work_item->tag);
				/* Update its dependencies as though we'd completed it */
				reduce_dependencies(AH, next_work_item, &ready_list);
				/* Loop around to see if anything else can be dispatched */
				continue;
			}

			pg_log_info("launching item %d %s %s",
						next_work_item->dumpId,
						next_work_item->desc, next_work_item->tag);

			/*
			 * Dispatch to some worker.  mark_restore_job_done will be called
			 * with &ready_list when the job finishes, so that the item's
			 * dependents can be moved to the ready list.
			 */
			DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
								   mark_restore_job_done, &ready_list);
		}
		else if (IsEveryWorkerIdle(pstate))
		{
			/*
			 * Nothing is ready and no worker is running, so we're done with
			 * the current pass or maybe with the whole process.
			 */
			if (AH->restorePass == RESTORE_PASS_LAST)
				break;			/* No more parallel processing is possible */

			/*
			 * Advance to next restore pass.  move_to_ready_list only takes
			 * items whose restore pass matches the current one, so items for
			 * later passes have been waiting in pending_list until now.
			 */
			AH->restorePass++;
			/* That probably allows some stuff to be made ready */
			move_to_ready_list(pending_list, &ready_list, AH->restorePass);
			/* Loop around to see if anything's now ready */
			continue;
		}
		else
		{
			/*
			 * We have nothing ready, but at least one child is working, so
			 * wait for some subjob to finish.  (Nothing to do here; we fall
			 * through to the WaitForWorkers call below.)
			 */
		}

		/*
		 * Before dispatching another job, check to see if anything has
		 * finished.  We should check every time through the loop so as to
		 * reduce dependencies as soon as possible.  If we were unable to
		 * dispatch any job this time through, wait until some worker finishes
		 * (and, hopefully, unblocks some pending item).  If we did dispatch
		 * something, continue as soon as there's at least one idle worker.
		 * Note that in either case, there's guaranteed to be at least one
		 * idle worker when we return to the top of the loop.  This ensures we
		 * won't block inside DispatchJobForTocEntry, which would be
		 * undesirable: we'd rather postpone dispatching until we see what's
		 * been unblocked by finished jobs.
		 */
		WaitForWorkers(AH, pstate,
					   next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
	}

	/*
	 * There should now be nothing in ready_list.  Anything left in
	 * pending_list is handled serially by restore_toc_entries_postfork.
	 */
	Assert(ready_list.first_te > ready_list.last_te);

	ready_list_free(&ready_list);

	pg_log_info("finished main parallel loop");
}
4100 
4101 /*
4102  * Main engine for parallel restore.
4103  *
4104  * Parallel restore is done in three phases.  In this third phase,
4105  * we mop up any remaining TOC entries by processing them serially.
4106  * This phase normally should have nothing to do, but if we've somehow
4107  * gotten stuck due to circular dependencies or some such, this provides
4108  * at least some chance of completing the restore successfully.
4109  */
4110 static void
restore_toc_entries_postfork(ArchiveHandle * AH,TocEntry * pending_list)4111 restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
4112 {
4113 	RestoreOptions *ropt = AH->public.ropt;
4114 	TocEntry   *te;
4115 
4116 	pg_log_debug("entering restore_toc_entries_postfork");
4117 
4118 	/*
4119 	 * Now reconnect the single parent connection.
4120 	 */
4121 	ConnectDatabase((Archive *) AH, &ropt->cparams, true);
4122 
4123 	/* re-establish fixed state */
4124 	_doSetFixedOutputState(AH);
4125 
4126 	/*
4127 	 * Make sure there is no work left due to, say, circular dependencies, or
4128 	 * some other pathological condition.  If so, do it in the single parent
4129 	 * connection.  We don't sweat about RestorePass ordering; it's likely we
4130 	 * already violated that.
4131 	 */
4132 	for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
4133 	{
4134 		pg_log_info("processing missed item %d %s %s",
4135 					te->dumpId, te->desc, te->tag);
4136 		(void) restore_toc_entry(AH, te, false);
4137 	}
4138 }
4139 
4140 /*
4141  * Check if te1 has an exclusive lock requirement for an item that te2 also
4142  * requires, whether or not te2's requirement is for an exclusive lock.
4143  */
4144 static bool
has_lock_conflicts(TocEntry * te1,TocEntry * te2)4145 has_lock_conflicts(TocEntry *te1, TocEntry *te2)
4146 {
4147 	int			j,
4148 				k;
4149 
4150 	for (j = 0; j < te1->nLockDeps; j++)
4151 	{
4152 		for (k = 0; k < te2->nDeps; k++)
4153 		{
4154 			if (te1->lockDeps[j] == te2->dependencies[k])
4155 				return true;
4156 		}
4157 	}
4158 	return false;
4159 }
4160 
4161 
4162 /*
4163  * Initialize the header of the pending-items list.
4164  *
4165  * This is a circular list with a dummy TocEntry as header, just like the
4166  * main TOC list; but we use separate list links so that an entry can be in
4167  * the main TOC list as well as in the pending list.
4168  */
4169 static void
pending_list_header_init(TocEntry * l)4170 pending_list_header_init(TocEntry *l)
4171 {
4172 	l->pending_prev = l->pending_next = l;
4173 }
4174 
4175 /* Append te to the end of the pending-list headed by l */
4176 static void
pending_list_append(TocEntry * l,TocEntry * te)4177 pending_list_append(TocEntry *l, TocEntry *te)
4178 {
4179 	te->pending_prev = l->pending_prev;
4180 	l->pending_prev->pending_next = te;
4181 	l->pending_prev = te;
4182 	te->pending_next = l;
4183 }
4184 
4185 /* Remove te from the pending-list */
4186 static void
pending_list_remove(TocEntry * te)4187 pending_list_remove(TocEntry *te)
4188 {
4189 	te->pending_prev->pending_next = te->pending_next;
4190 	te->pending_next->pending_prev = te->pending_prev;
4191 	te->pending_prev = NULL;
4192 	te->pending_next = NULL;
4193 }
4194 
4195 
4196 /*
4197  * Initialize the ready_list with enough room for up to tocCount entries.
4198  */
4199 static void
ready_list_init(ParallelReadyList * ready_list,int tocCount)4200 ready_list_init(ParallelReadyList *ready_list, int tocCount)
4201 {
4202 	ready_list->tes = (TocEntry **)
4203 		pg_malloc(tocCount * sizeof(TocEntry *));
4204 	ready_list->first_te = 0;
4205 	ready_list->last_te = -1;
4206 	ready_list->sorted = false;
4207 }
4208 
4209 /*
4210  * Free storage for a ready_list.
4211  */
4212 static void
ready_list_free(ParallelReadyList * ready_list)4213 ready_list_free(ParallelReadyList *ready_list)
4214 {
4215 	pg_free(ready_list->tes);
4216 }
4217 
4218 /* Add te to the ready_list */
4219 static void
ready_list_insert(ParallelReadyList * ready_list,TocEntry * te)4220 ready_list_insert(ParallelReadyList *ready_list, TocEntry *te)
4221 {
4222 	ready_list->tes[++ready_list->last_te] = te;
4223 	/* List is (probably) not sorted anymore. */
4224 	ready_list->sorted = false;
4225 }
4226 
4227 /* Remove the i'th entry in the ready_list */
4228 static void
ready_list_remove(ParallelReadyList * ready_list,int i)4229 ready_list_remove(ParallelReadyList *ready_list, int i)
4230 {
4231 	int			f = ready_list->first_te;
4232 
4233 	Assert(i >= f && i <= ready_list->last_te);
4234 
4235 	/*
4236 	 * In the typical case where the item to be removed is the first ready
4237 	 * entry, we need only increment first_te to remove it.  Otherwise, move
4238 	 * the entries before it to compact the list.  (This preserves sortedness,
4239 	 * if any.)  We could alternatively move the entries after i, but there
4240 	 * are typically many more of those.
4241 	 */
4242 	if (i > f)
4243 	{
4244 		TocEntry  **first_te_ptr = &ready_list->tes[f];
4245 
4246 		memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *));
4247 	}
4248 	ready_list->first_te++;
4249 }
4250 
4251 /* Sort the ready_list into the desired order */
4252 static void
ready_list_sort(ParallelReadyList * ready_list)4253 ready_list_sort(ParallelReadyList *ready_list)
4254 {
4255 	if (!ready_list->sorted)
4256 	{
4257 		int			n = ready_list->last_te - ready_list->first_te + 1;
4258 
4259 		if (n > 1)
4260 			qsort(ready_list->tes + ready_list->first_te, n,
4261 				  sizeof(TocEntry *),
4262 				  TocEntrySizeCompare);
4263 		ready_list->sorted = true;
4264 	}
4265 }
4266 
4267 /* qsort comparator for sorting TocEntries by dataLength */
4268 static int
TocEntrySizeCompare(const void * p1,const void * p2)4269 TocEntrySizeCompare(const void *p1, const void *p2)
4270 {
4271 	const TocEntry *te1 = *(const TocEntry *const *) p1;
4272 	const TocEntry *te2 = *(const TocEntry *const *) p2;
4273 
4274 	/* Sort by decreasing dataLength */
4275 	if (te1->dataLength > te2->dataLength)
4276 		return -1;
4277 	if (te1->dataLength < te2->dataLength)
4278 		return 1;
4279 
4280 	/* For equal dataLengths, sort by dumpId, just to be stable */
4281 	if (te1->dumpId < te2->dumpId)
4282 		return -1;
4283 	if (te1->dumpId > te2->dumpId)
4284 		return 1;
4285 
4286 	return 0;
4287 }
4288 
4289 
4290 /*
4291  * Move all immediately-ready items from pending_list to ready_list.
4292  *
4293  * Items are considered ready if they have no remaining dependencies and
4294  * they belong in the current restore pass.  (See also reduce_dependencies,
4295  * which applies the same logic one-at-a-time.)
4296  */
4297 static void
move_to_ready_list(TocEntry * pending_list,ParallelReadyList * ready_list,RestorePass pass)4298 move_to_ready_list(TocEntry *pending_list,
4299 				   ParallelReadyList *ready_list,
4300 				   RestorePass pass)
4301 {
4302 	TocEntry   *te;
4303 	TocEntry   *next_te;
4304 
4305 	for (te = pending_list->pending_next; te != pending_list; te = next_te)
4306 	{
4307 		/* must save list link before possibly removing te from list */
4308 		next_te = te->pending_next;
4309 
4310 		if (te->depCount == 0 &&
4311 			_tocEntryRestorePass(te) == pass)
4312 		{
4313 			/* Remove it from pending_list ... */
4314 			pending_list_remove(te);
4315 			/* ... and add to ready_list */
4316 			ready_list_insert(ready_list, te);
4317 		}
4318 	}
4319 }
4320 
4321 /*
4322  * Find the next work item (if any) that is capable of being run now,
4323  * and remove it from the ready_list.
4324  *
4325  * Returns the item, or NULL if nothing is runnable.
4326  *
4327  * To qualify, the item must have no remaining dependencies
4328  * and no requirements for locks that are incompatible with
4329  * items currently running.  Items in the ready_list are known to have
4330  * no remaining dependencies, but we have to check for lock conflicts.
4331  */
4332 static TocEntry *
pop_next_work_item(ArchiveHandle * AH,ParallelReadyList * ready_list,ParallelState * pstate)4333 pop_next_work_item(ArchiveHandle *AH, ParallelReadyList *ready_list,
4334 				   ParallelState *pstate)
4335 {
4336 	/*
4337 	 * Sort the ready_list so that we'll tackle larger jobs first.
4338 	 */
4339 	ready_list_sort(ready_list);
4340 
4341 	/*
4342 	 * Search the ready_list until we find a suitable item.
4343 	 */
4344 	for (int i = ready_list->first_te; i <= ready_list->last_te; i++)
4345 	{
4346 		TocEntry   *te = ready_list->tes[i];
4347 		bool		conflicts = false;
4348 
4349 		/*
4350 		 * Check to see if the item would need exclusive lock on something
4351 		 * that a currently running item also needs lock on, or vice versa. If
4352 		 * so, we don't want to schedule them together.
4353 		 */
4354 		for (int k = 0; k < pstate->numWorkers; k++)
4355 		{
4356 			TocEntry   *running_te = pstate->te[k];
4357 
4358 			if (running_te == NULL)
4359 				continue;
4360 			if (has_lock_conflicts(te, running_te) ||
4361 				has_lock_conflicts(running_te, te))
4362 			{
4363 				conflicts = true;
4364 				break;
4365 			}
4366 		}
4367 
4368 		if (conflicts)
4369 			continue;
4370 
4371 		/* passed all tests, so this item can run */
4372 		ready_list_remove(ready_list, i);
4373 		return te;
4374 	}
4375 
4376 	pg_log_debug("no item ready");
4377 	return NULL;
4378 }
4379 
4380 
4381 /*
4382  * Restore a single TOC item in parallel with others
4383  *
4384  * this is run in the worker, i.e. in a thread (Windows) or a separate process
4385  * (everything else). A worker process executes several such work items during
4386  * a parallel backup or restore. Once we terminate here and report back that
4387  * our work is finished, the master process will assign us a new work item.
4388  */
4389 int
parallel_restore(ArchiveHandle * AH,TocEntry * te)4390 parallel_restore(ArchiveHandle *AH, TocEntry *te)
4391 {
4392 	int			status;
4393 
4394 	Assert(AH->connection != NULL);
4395 
4396 	/* Count only errors associated with this TOC entry */
4397 	AH->public.n_errors = 0;
4398 
4399 	/* Restore the TOC item */
4400 	status = restore_toc_entry(AH, te, true);
4401 
4402 	return status;
4403 }
4404 
4405 
4406 /*
4407  * Callback function that's invoked in the master process after a step has
4408  * been parallel restored.
4409  *
4410  * Update status and reduce the dependency count of any dependent items.
4411  */
4412 static void
mark_restore_job_done(ArchiveHandle * AH,TocEntry * te,int status,void * callback_data)4413 mark_restore_job_done(ArchiveHandle *AH,
4414 					  TocEntry *te,
4415 					  int status,
4416 					  void *callback_data)
4417 {
4418 	ParallelReadyList *ready_list = (ParallelReadyList *) callback_data;
4419 
4420 	pg_log_info("finished item %d %s %s",
4421 				te->dumpId, te->desc, te->tag);
4422 
4423 	if (status == WORKER_CREATE_DONE)
4424 		mark_create_done(AH, te);
4425 	else if (status == WORKER_INHIBIT_DATA)
4426 	{
4427 		inhibit_data_for_failed_table(AH, te);
4428 		AH->public.n_errors++;
4429 	}
4430 	else if (status == WORKER_IGNORED_ERRORS)
4431 		AH->public.n_errors++;
4432 	else if (status != 0)
4433 		fatal("worker process failed: exit code %d",
4434 			  status);
4435 
4436 	reduce_dependencies(AH, te, ready_list);
4437 }
4438 
4439 
4440 /*
4441  * Process the dependency information into a form useful for parallel restore.
4442  *
4443  * This function takes care of fixing up some missing or badly designed
4444  * dependencies, and then prepares subsidiary data structures that will be
4445  * used in the main parallel-restore logic, including:
4446  * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4447  * 2. We set up depCount fields that are the number of as-yet-unprocessed
4448  * dependencies for each TOC entry.
4449  *
4450  * We also identify locking dependencies so that we can avoid trying to
4451  * schedule conflicting items at the same time.
4452  */
4453 static void
fix_dependencies(ArchiveHandle * AH)4454 fix_dependencies(ArchiveHandle *AH)
4455 {
4456 	TocEntry   *te;
4457 	int			i;
4458 
4459 	/*
4460 	 * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4461 	 * items are marked as not being in any parallel-processing list.
4462 	 */
4463 	for (te = AH->toc->next; te != AH->toc; te = te->next)
4464 	{
4465 		te->depCount = te->nDeps;
4466 		te->revDeps = NULL;
4467 		te->nRevDeps = 0;
4468 		te->pending_prev = NULL;
4469 		te->pending_next = NULL;
4470 	}
4471 
4472 	/*
4473 	 * POST_DATA items that are shown as depending on a table need to be
4474 	 * re-pointed to depend on that table's data, instead.  This ensures they
4475 	 * won't get scheduled until the data has been loaded.
4476 	 */
4477 	repoint_table_dependencies(AH);
4478 
4479 	/*
4480 	 * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4481 	 * COMMENTS to BLOBS.  Cope.  (We assume there's only one BLOBS and only
4482 	 * one BLOB COMMENTS in such files.)
4483 	 */
4484 	if (AH->version < K_VERS_1_11)
4485 	{
4486 		for (te = AH->toc->next; te != AH->toc; te = te->next)
4487 		{
4488 			if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4489 			{
4490 				TocEntry   *te2;
4491 
4492 				for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4493 				{
4494 					if (strcmp(te2->desc, "BLOBS") == 0)
4495 					{
4496 						te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
4497 						te->dependencies[0] = te2->dumpId;
4498 						te->nDeps++;
4499 						te->depCount++;
4500 						break;
4501 					}
4502 				}
4503 				break;
4504 			}
4505 		}
4506 	}
4507 
4508 	/*
4509 	 * At this point we start to build the revDeps reverse-dependency arrays,
4510 	 * so all changes of dependencies must be complete.
4511 	 */
4512 
4513 	/*
4514 	 * Count the incoming dependencies for each item.  Also, it is possible
4515 	 * that the dependencies list items that are not in the archive at all
4516 	 * (that should not happen in 9.2 and later, but is highly likely in older
4517 	 * archives).  Subtract such items from the depCounts.
4518 	 */
4519 	for (te = AH->toc->next; te != AH->toc; te = te->next)
4520 	{
4521 		for (i = 0; i < te->nDeps; i++)
4522 		{
4523 			DumpId		depid = te->dependencies[i];
4524 
4525 			if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4526 				AH->tocsByDumpId[depid]->nRevDeps++;
4527 			else
4528 				te->depCount--;
4529 		}
4530 	}
4531 
4532 	/*
4533 	 * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4534 	 * it as a counter below.
4535 	 */
4536 	for (te = AH->toc->next; te != AH->toc; te = te->next)
4537 	{
4538 		if (te->nRevDeps > 0)
4539 			te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
4540 		te->nRevDeps = 0;
4541 	}
4542 
4543 	/*
4544 	 * Build the revDeps[] arrays of incoming-dependency dumpIds.  This had
4545 	 * better agree with the loops above.
4546 	 */
4547 	for (te = AH->toc->next; te != AH->toc; te = te->next)
4548 	{
4549 		for (i = 0; i < te->nDeps; i++)
4550 		{
4551 			DumpId		depid = te->dependencies[i];
4552 
4553 			if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4554 			{
4555 				TocEntry   *otherte = AH->tocsByDumpId[depid];
4556 
4557 				otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4558 			}
4559 		}
4560 	}
4561 
4562 	/*
4563 	 * Lastly, work out the locking dependencies.
4564 	 */
4565 	for (te = AH->toc->next; te != AH->toc; te = te->next)
4566 	{
4567 		te->lockDeps = NULL;
4568 		te->nLockDeps = 0;
4569 		identify_locking_dependencies(AH, te);
4570 	}
4571 }
4572 
4573 /*
4574  * Change dependencies on table items to depend on table data items instead,
4575  * but only in POST_DATA items.
4576  *
4577  * Also, for any item having such dependency(s), set its dataLength to the
4578  * largest dataLength of the table data items it depends on.  This ensures
4579  * that parallel restore will prioritize larger jobs (index builds, FK
4580  * constraint checks, etc) over smaller ones, avoiding situations where we
4581  * end a restore with only one active job working on a large table.
4582  */
4583 static void
repoint_table_dependencies(ArchiveHandle * AH)4584 repoint_table_dependencies(ArchiveHandle *AH)
4585 {
4586 	TocEntry   *te;
4587 	int			i;
4588 	DumpId		olddep;
4589 
4590 	for (te = AH->toc->next; te != AH->toc; te = te->next)
4591 	{
4592 		if (te->section != SECTION_POST_DATA)
4593 			continue;
4594 		for (i = 0; i < te->nDeps; i++)
4595 		{
4596 			olddep = te->dependencies[i];
4597 			if (olddep <= AH->maxDumpId &&
4598 				AH->tableDataId[olddep] != 0)
4599 			{
4600 				DumpId		tabledataid = AH->tableDataId[olddep];
4601 				TocEntry   *tabledatate = AH->tocsByDumpId[tabledataid];
4602 
4603 				te->dependencies[i] = tabledataid;
4604 				te->dataLength = Max(te->dataLength, tabledatate->dataLength);
4605 				pg_log_debug("transferring dependency %d -> %d to %d",
4606 							 te->dumpId, olddep, tabledataid);
4607 			}
4608 		}
4609 	}
4610 }
4611 
4612 /*
4613  * Identify which objects we'll need exclusive lock on in order to restore
4614  * the given TOC entry (*other* than the one identified by the TOC entry
4615  * itself).  Record their dump IDs in the entry's lockDeps[] array.
4616  */
4617 static void
identify_locking_dependencies(ArchiveHandle * AH,TocEntry * te)4618 identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4619 {
4620 	DumpId	   *lockids;
4621 	int			nlockids;
4622 	int			i;
4623 
4624 	/*
4625 	 * We only care about this for POST_DATA items.  PRE_DATA items are not
4626 	 * run in parallel, and DATA items are all independent by assumption.
4627 	 */
4628 	if (te->section != SECTION_POST_DATA)
4629 		return;
4630 
4631 	/* Quick exit if no dependencies at all */
4632 	if (te->nDeps == 0)
4633 		return;
4634 
4635 	/*
4636 	 * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
4637 	 * and hence require exclusive lock.  However, we know that CREATE INDEX
4638 	 * does not.  (Maybe someday index-creating CONSTRAINTs will fall in that
4639 	 * category too ... but today is not that day.)
4640 	 */
4641 	if (strcmp(te->desc, "INDEX") == 0)
4642 		return;
4643 
4644 	/*
4645 	 * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
4646 	 * item listed among its dependencies.  Originally all of these would have
4647 	 * been TABLE items, but repoint_table_dependencies would have repointed
4648 	 * them to the TABLE DATA items if those are present (which they might not
4649 	 * be, eg in a schema-only dump).  Note that all of the entries we are
4650 	 * processing here are POST_DATA; otherwise there might be a significant
4651 	 * difference between a dependency on a table and a dependency on its
4652 	 * data, so that closer analysis would be needed here.
4653 	 */
4654 	lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
4655 	nlockids = 0;
4656 	for (i = 0; i < te->nDeps; i++)
4657 	{
4658 		DumpId		depid = te->dependencies[i];
4659 
4660 		if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
4661 			((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
4662 			 strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
4663 			lockids[nlockids++] = depid;
4664 	}
4665 
4666 	if (nlockids == 0)
4667 	{
4668 		free(lockids);
4669 		return;
4670 	}
4671 
4672 	te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
4673 	te->nLockDeps = nlockids;
4674 }
4675 
4676 /*
4677  * Remove the specified TOC entry from the depCounts of items that depend on
4678  * it, thereby possibly making them ready-to-run.  Any pending item that
4679  * becomes ready should be moved to the ready_list, if that's provided.
4680  */
4681 static void
reduce_dependencies(ArchiveHandle * AH,TocEntry * te,ParallelReadyList * ready_list)4682 reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
4683 					ParallelReadyList *ready_list)
4684 {
4685 	int			i;
4686 
4687 	pg_log_debug("reducing dependencies for %d", te->dumpId);
4688 
4689 	for (i = 0; i < te->nRevDeps; i++)
4690 	{
4691 		TocEntry   *otherte = AH->tocsByDumpId[te->revDeps[i]];
4692 
4693 		Assert(otherte->depCount > 0);
4694 		otherte->depCount--;
4695 
4696 		/*
4697 		 * It's ready if it has no remaining dependencies, and it belongs in
4698 		 * the current restore pass, and it is currently a member of the
4699 		 * pending list (that check is needed to prevent double restore in
4700 		 * some cases where a list-file forces out-of-order restoring).
4701 		 * However, if ready_list == NULL then caller doesn't want any list
4702 		 * memberships changed.
4703 		 */
4704 		if (otherte->depCount == 0 &&
4705 			_tocEntryRestorePass(otherte) == AH->restorePass &&
4706 			otherte->pending_prev != NULL &&
4707 			ready_list != NULL)
4708 		{
4709 			/* Remove it from pending list ... */
4710 			pending_list_remove(otherte);
4711 			/* ... and add to ready_list */
4712 			ready_list_insert(ready_list, otherte);
4713 		}
4714 	}
4715 }
4716 
4717 /*
4718  * Set the created flag on the DATA member corresponding to the given
4719  * TABLE member
4720  */
4721 static void
mark_create_done(ArchiveHandle * AH,TocEntry * te)4722 mark_create_done(ArchiveHandle *AH, TocEntry *te)
4723 {
4724 	if (AH->tableDataId[te->dumpId] != 0)
4725 	{
4726 		TocEntry   *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4727 
4728 		ted->created = true;
4729 	}
4730 }
4731 
4732 /*
4733  * Mark the DATA member corresponding to the given TABLE member
4734  * as not wanted
4735  */
4736 static void
inhibit_data_for_failed_table(ArchiveHandle * AH,TocEntry * te)4737 inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
4738 {
4739 	pg_log_info("table \"%s\" could not be created, will not restore its data",
4740 				te->tag);
4741 
4742 	if (AH->tableDataId[te->dumpId] != 0)
4743 	{
4744 		TocEntry   *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4745 
4746 		ted->reqs = 0;
4747 	}
4748 }
4749 
4750 /*
4751  * Clone and de-clone routines used in parallel restoration.
4752  *
4753  * Enough of the structure is cloned to ensure that there is no
4754  * conflict between different threads each with their own clone.
4755  */
4756 ArchiveHandle *
CloneArchive(ArchiveHandle * AH)4757 CloneArchive(ArchiveHandle *AH)
4758 {
4759 	ArchiveHandle *clone;
4760 
4761 	/* Make a "flat" copy */
4762 	clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
4763 	memcpy(clone, AH, sizeof(ArchiveHandle));
4764 
4765 	/* Handle format-independent fields */
4766 	memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
4767 
4768 	/* The clone will have its own connection, so disregard connection state */
4769 	clone->connection = NULL;
4770 	clone->connCancel = NULL;
4771 	clone->currUser = NULL;
4772 	clone->currSchema = NULL;
4773 	clone->currTablespace = NULL;
4774 
4775 	/* savedPassword must be local in case we change it while connecting */
4776 	if (clone->savedPassword)
4777 		clone->savedPassword = pg_strdup(clone->savedPassword);
4778 
4779 	/* clone has its own error count, too */
4780 	clone->public.n_errors = 0;
4781 
4782 	/*
4783 	 * Connect our new clone object to the database, using the same connection
4784 	 * parameters used for the original connection.
4785 	 */
4786 	ConnectDatabase((Archive *) clone, &clone->public.ropt->cparams, true);
4787 
4788 	/* re-establish fixed state */
4789 	if (AH->mode == archModeRead)
4790 		_doSetFixedOutputState(clone);
4791 	/* in write case, setupDumpWorker will fix up connection state */
4792 
4793 	/* Let the format-specific code have a chance too */
4794 	clone->ClonePtr(clone);
4795 
4796 	Assert(clone->connection != NULL);
4797 	return clone;
4798 }
4799 
4800 /*
4801  * Release clone-local storage.
4802  *
4803  * Note: we assume any clone-local connection was already closed.
4804  */
4805 void
DeCloneArchive(ArchiveHandle * AH)4806 DeCloneArchive(ArchiveHandle *AH)
4807 {
4808 	/* Should not have an open database connection */
4809 	Assert(AH->connection == NULL);
4810 
4811 	/* Clear format-specific state */
4812 	AH->DeClonePtr(AH);
4813 
4814 	/* Clear state allocated by CloneArchive */
4815 	if (AH->sqlparse.curCmd)
4816 		destroyPQExpBuffer(AH->sqlparse.curCmd);
4817 
4818 	/* Clear any connection-local state */
4819 	if (AH->currUser)
4820 		free(AH->currUser);
4821 	if (AH->currSchema)
4822 		free(AH->currSchema);
4823 	if (AH->currTablespace)
4824 		free(AH->currTablespace);
4825 	if (AH->currTableAm)
4826 		free(AH->currTableAm);
4827 	if (AH->savedPassword)
4828 		free(AH->savedPassword);
4829 
4830 	free(AH);
4831 }
4832