1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_directory.c
4  *
5  *	A directory format dump is a directory, which contains a "toc.dat" file
6  *	for the TOC, and a separate file for each data entry, named "<oid>.dat".
7  *	Large objects (BLOBs) are stored in separate files named "blob_<oid>.dat",
8  *	and there's a plain-text TOC file for them called "blobs.toc". If
9  *	compression is used, each data file is individually compressed and the
10  *	".gz" suffix is added to the filenames. The TOC files are never
11  *	compressed by pg_dump, however they are accepted with the .gz suffix too,
12  *	in case the user has manually compressed them with 'gzip'.
13  *
14  *	NOTE: This format is identical to the files written in the tar file in
15  *	the 'tar' format, except that we don't write the restore.sql file (TODO),
16  *	and the tar format doesn't support compression. Please keep the formats in
17  *	sync.
18  *
19  *
20  *	Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
21  *	Portions Copyright (c) 1994, Regents of the University of California
22  *	Portions Copyright (c) 2000, Philip Warner
23  *
24  *	Rights are granted to use this software in any way so long
25  *	as this notice is not removed.
26  *
27  *	The author is not responsible for loss or damages that may
28  *	result from its use.
29  *
30  * IDENTIFICATION
31  *		src/bin/pg_dump/pg_backup_directory.c
32  *
33  *-------------------------------------------------------------------------
34  */
35 #include "postgres_fe.h"
36 
37 #include <dirent.h>
38 #include <sys/stat.h>
39 
40 #include "common/file_utils.h"
41 #include "compress_io.h"
42 #include "parallel.h"
43 #include "pg_backup_utils.h"
44 
45 typedef struct
46 {
47 	/*
48 	 * Our archive location. This is basically what the user specified as his
49 	 * backup file but of course here it is a directory.
50 	 */
51 	char	   *directory;
52 
53 	cfp		   *dataFH;			/* currently open data file */
54 
55 	cfp		   *blobsTocFH;		/* file handle for blobs.toc */
56 	ParallelState *pstate;		/* for parallel backup / restore */
57 } lclContext;
58 
/* Private per-TOC-entry state kept by the directory format. */
typedef struct
{
	char	   *filename;		/* basename only (no directory part), or NULL */
} lclTocEntry;
63 
64 /* prototypes for private functions */
65 static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
66 static void _StartData(ArchiveHandle *AH, TocEntry *te);
67 static void _EndData(ArchiveHandle *AH, TocEntry *te);
68 static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
69 static int	_WriteByte(ArchiveHandle *AH, const int i);
70 static int	_ReadByte(ArchiveHandle *);
71 static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
72 static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
73 static void _CloseArchive(ArchiveHandle *AH);
74 static void _ReopenArchive(ArchiveHandle *AH);
75 static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
76 
77 static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
78 static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
79 static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
80 
81 static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
82 static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
83 static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
84 static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
85 static void _LoadBlobs(ArchiveHandle *AH);
86 
87 static void _PrepParallelRestore(ArchiveHandle *AH);
88 static void _Clone(ArchiveHandle *AH);
89 static void _DeClone(ArchiveHandle *AH);
90 
91 static int	_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te);
92 static int	_WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te);
93 
94 static void setFilePath(ArchiveHandle *AH, char *buf,
95 						const char *relativeFilename);
96 
97 /*
98  *	Init routine required by ALL formats. This is a global routine
99  *	and should be declared in pg_backup_archiver.h
100  *
101  *	Its task is to create any extra archive context (using AH->formatData),
102  *	and to initialize the supported function pointers.
103  *
104  *	It should also prepare whatever its input source is for reading/writing,
105  *	and in the case of a read mode connection, it should load the Header & TOC.
106  */
107 void
InitArchiveFmt_Directory(ArchiveHandle * AH)108 InitArchiveFmt_Directory(ArchiveHandle *AH)
109 {
110 	lclContext *ctx;
111 
112 	/* Assuming static functions, this can be copied for each format. */
113 	AH->ArchiveEntryPtr = _ArchiveEntry;
114 	AH->StartDataPtr = _StartData;
115 	AH->WriteDataPtr = _WriteData;
116 	AH->EndDataPtr = _EndData;
117 	AH->WriteBytePtr = _WriteByte;
118 	AH->ReadBytePtr = _ReadByte;
119 	AH->WriteBufPtr = _WriteBuf;
120 	AH->ReadBufPtr = _ReadBuf;
121 	AH->ClosePtr = _CloseArchive;
122 	AH->ReopenPtr = _ReopenArchive;
123 	AH->PrintTocDataPtr = _PrintTocData;
124 	AH->ReadExtraTocPtr = _ReadExtraToc;
125 	AH->WriteExtraTocPtr = _WriteExtraToc;
126 	AH->PrintExtraTocPtr = _PrintExtraToc;
127 
128 	AH->StartBlobsPtr = _StartBlobs;
129 	AH->StartBlobPtr = _StartBlob;
130 	AH->EndBlobPtr = _EndBlob;
131 	AH->EndBlobsPtr = _EndBlobs;
132 
133 	AH->PrepParallelRestorePtr = _PrepParallelRestore;
134 	AH->ClonePtr = _Clone;
135 	AH->DeClonePtr = _DeClone;
136 
137 	AH->WorkerJobRestorePtr = _WorkerJobRestoreDirectory;
138 	AH->WorkerJobDumpPtr = _WorkerJobDumpDirectory;
139 
140 	/* Set up our private context */
141 	ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
142 	AH->formatData = (void *) ctx;
143 
144 	ctx->dataFH = NULL;
145 	ctx->blobsTocFH = NULL;
146 
147 	/* Initialize LO buffering */
148 	AH->lo_buf_size = LOBBUFSIZE;
149 	AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
150 
151 	/*
152 	 * Now open the TOC file
153 	 */
154 
155 	if (!AH->fSpec || strcmp(AH->fSpec, "") == 0)
156 		fatal("no output directory specified");
157 
158 	ctx->directory = AH->fSpec;
159 
160 	if (AH->mode == archModeWrite)
161 	{
162 		struct stat st;
163 		bool		is_empty = false;
164 
165 		/* we accept an empty existing directory */
166 		if (stat(ctx->directory, &st) == 0 && S_ISDIR(st.st_mode))
167 		{
168 			DIR		   *dir = opendir(ctx->directory);
169 
170 			if (dir)
171 			{
172 				struct dirent *d;
173 
174 				is_empty = true;
175 				while (errno = 0, (d = readdir(dir)))
176 				{
177 					if (strcmp(d->d_name, ".") != 0 && strcmp(d->d_name, "..") != 0)
178 					{
179 						is_empty = false;
180 						break;
181 					}
182 				}
183 
184 				if (errno)
185 					fatal("could not read directory \"%s\": %m",
186 						  ctx->directory);
187 
188 				if (closedir(dir))
189 					fatal("could not close directory \"%s\": %m",
190 						  ctx->directory);
191 			}
192 		}
193 
194 		if (!is_empty && mkdir(ctx->directory, 0700) < 0)
195 			fatal("could not create directory \"%s\": %m",
196 				  ctx->directory);
197 	}
198 	else
199 	{							/* Read Mode */
200 		char		fname[MAXPGPATH];
201 		cfp		   *tocFH;
202 
203 		setFilePath(AH, fname, "toc.dat");
204 
205 		tocFH = cfopen_read(fname, PG_BINARY_R);
206 		if (tocFH == NULL)
207 			fatal("could not open input file \"%s\": %m", fname);
208 
209 		ctx->dataFH = tocFH;
210 
211 		/*
212 		 * The TOC of a directory format dump shares the format code of the
213 		 * tar format.
214 		 */
215 		AH->format = archTar;
216 		ReadHead(AH);
217 		AH->format = archDirectory;
218 		ReadToc(AH);
219 
220 		/* Nothing else in the file, so close it again... */
221 		if (cfclose(tocFH) != 0)
222 			fatal("could not close TOC file: %m");
223 		ctx->dataFH = NULL;
224 	}
225 }
226 
227 /*
228  * Called by the Archiver when the dumper creates a new TOC entry.
229  *
230  * We determine the filename for this entry.
231 */
232 static void
_ArchiveEntry(ArchiveHandle * AH,TocEntry * te)233 _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
234 {
235 	lclTocEntry *tctx;
236 	char		fn[MAXPGPATH];
237 
238 	tctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
239 	if (strcmp(te->desc, "BLOBS") == 0)
240 		tctx->filename = pg_strdup("blobs.toc");
241 	else if (te->dataDumper)
242 	{
243 		snprintf(fn, MAXPGPATH, "%d.dat", te->dumpId);
244 		tctx->filename = pg_strdup(fn);
245 	}
246 	else
247 		tctx->filename = NULL;
248 
249 	te->formatData = (void *) tctx;
250 }
251 
252 /*
253  * Called by the Archiver to save any extra format-related TOC entry
254  * data.
255  *
256  * Use the Archiver routines to write data - they are non-endian, and
257  * maintain other important file information.
258  */
259 static void
_WriteExtraToc(ArchiveHandle * AH,TocEntry * te)260 _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
261 {
262 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
263 
264 	/*
265 	 * A dumpable object has set tctx->filename, any other object has not.
266 	 * (see _ArchiveEntry).
267 	 */
268 	if (tctx->filename)
269 		WriteStr(AH, tctx->filename);
270 	else
271 		WriteStr(AH, "");
272 }
273 
274 /*
275  * Called by the Archiver to read any extra format-related TOC data.
276  *
277  * Needs to match the order defined in _WriteExtraToc, and should also
278  * use the Archiver input routines.
279  */
280 static void
_ReadExtraToc(ArchiveHandle * AH,TocEntry * te)281 _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
282 {
283 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
284 
285 	if (tctx == NULL)
286 	{
287 		tctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
288 		te->formatData = (void *) tctx;
289 	}
290 
291 	tctx->filename = ReadStr(AH);
292 	if (strlen(tctx->filename) == 0)
293 	{
294 		free(tctx->filename);
295 		tctx->filename = NULL;
296 	}
297 }
298 
299 /*
300  * Called by the Archiver when restoring an archive to output a comment
301  * that includes useful information about the TOC entry.
302  */
303 static void
_PrintExtraToc(ArchiveHandle * AH,TocEntry * te)304 _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
305 {
306 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
307 
308 	if (AH->public.verbose && tctx->filename)
309 		ahprintf(AH, "-- File: %s\n", tctx->filename);
310 }
311 
312 /*
313  * Called by the archiver when saving TABLE DATA (not schema). This routine
314  * should save whatever format-specific information is needed to read
315  * the archive back.
316  *
317  * It is called just prior to the dumper's 'DataDumper' routine being called.
318  *
319  * We create the data file for writing.
320  */
321 static void
_StartData(ArchiveHandle * AH,TocEntry * te)322 _StartData(ArchiveHandle *AH, TocEntry *te)
323 {
324 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
325 	lclContext *ctx = (lclContext *) AH->formatData;
326 	char		fname[MAXPGPATH];
327 
328 	setFilePath(AH, fname, tctx->filename);
329 
330 	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression);
331 	if (ctx->dataFH == NULL)
332 		fatal("could not open output file \"%s\": %m", fname);
333 }
334 
335 /*
336  * Called by archiver when dumper calls WriteData. This routine is
337  * called for both BLOB and TABLE data; it is the responsibility of
338  * the format to manage each kind of data using StartBlob/StartData.
339  *
340  * It should only be called from within a DataDumper routine.
341  *
342  * We write the data to the open data file.
343  */
344 static void
_WriteData(ArchiveHandle * AH,const void * data,size_t dLen)345 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
346 {
347 	lclContext *ctx = (lclContext *) AH->formatData;
348 
349 	errno = 0;
350 	if (dLen > 0 && cfwrite(data, dLen, ctx->dataFH) != dLen)
351 	{
352 		/* if write didn't set errno, assume problem is no disk space */
353 		if (errno == 0)
354 			errno = ENOSPC;
355 		fatal("could not write to output file: %s",
356 			  get_cfp_error(ctx->dataFH));
357 	}
358 }
359 
360 /*
361  * Called by the archiver when a dumper's 'DataDumper' routine has
362  * finished.
363  *
364  * We close the data file.
365  */
366 static void
_EndData(ArchiveHandle * AH,TocEntry * te)367 _EndData(ArchiveHandle *AH, TocEntry *te)
368 {
369 	lclContext *ctx = (lclContext *) AH->formatData;
370 
371 	/* Close the file */
372 	cfclose(ctx->dataFH);
373 
374 	ctx->dataFH = NULL;
375 }
376 
377 /*
378  * Print data for a given file (can be a BLOB as well)
379  */
380 static void
_PrintFileData(ArchiveHandle * AH,char * filename)381 _PrintFileData(ArchiveHandle *AH, char *filename)
382 {
383 	size_t		cnt;
384 	char	   *buf;
385 	size_t		buflen;
386 	cfp		   *cfp;
387 
388 	if (!filename)
389 		return;
390 
391 	cfp = cfopen_read(filename, PG_BINARY_R);
392 
393 	if (!cfp)
394 		fatal("could not open input file \"%s\": %m", filename);
395 
396 	buf = pg_malloc(ZLIB_OUT_SIZE);
397 	buflen = ZLIB_OUT_SIZE;
398 
399 	while ((cnt = cfread(buf, buflen, cfp)))
400 	{
401 		ahwrite(buf, 1, cnt, AH);
402 	}
403 
404 	free(buf);
405 	if (cfclose(cfp) != 0)
406 		fatal("could not close data file \"%s\": %m", filename);
407 }
408 
409 /*
410  * Print data for a given TOC entry
411 */
412 static void
_PrintTocData(ArchiveHandle * AH,TocEntry * te)413 _PrintTocData(ArchiveHandle *AH, TocEntry *te)
414 {
415 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
416 
417 	if (!tctx->filename)
418 		return;
419 
420 	if (strcmp(te->desc, "BLOBS") == 0)
421 		_LoadBlobs(AH);
422 	else
423 	{
424 		char		fname[MAXPGPATH];
425 
426 		setFilePath(AH, fname, tctx->filename);
427 		_PrintFileData(AH, fname);
428 	}
429 }
430 
431 static void
_LoadBlobs(ArchiveHandle * AH)432 _LoadBlobs(ArchiveHandle *AH)
433 {
434 	Oid			oid;
435 	lclContext *ctx = (lclContext *) AH->formatData;
436 	char		tocfname[MAXPGPATH];
437 	char		line[MAXPGPATH];
438 
439 	StartRestoreBlobs(AH);
440 
441 	setFilePath(AH, tocfname, "blobs.toc");
442 
443 	ctx->blobsTocFH = cfopen_read(tocfname, PG_BINARY_R);
444 
445 	if (ctx->blobsTocFH == NULL)
446 		fatal("could not open large object TOC file \"%s\" for input: %m",
447 			  tocfname);
448 
449 	/* Read the blobs TOC file line-by-line, and process each blob */
450 	while ((cfgets(ctx->blobsTocFH, line, MAXPGPATH)) != NULL)
451 	{
452 		char		blobfname[MAXPGPATH + 1];
453 		char		path[MAXPGPATH];
454 
455 		/* Can't overflow because line and blobfname are the same length */
456 		if (sscanf(line, "%u %" CppAsString2(MAXPGPATH) "s\n", &oid, blobfname) != 2)
457 			fatal("invalid line in large object TOC file \"%s\": \"%s\"",
458 				  tocfname, line);
459 
460 		StartRestoreBlob(AH, oid, AH->public.ropt->dropSchema);
461 		snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, blobfname);
462 		_PrintFileData(AH, path);
463 		EndRestoreBlob(AH, oid);
464 	}
465 	if (!cfeof(ctx->blobsTocFH))
466 		fatal("error reading large object TOC file \"%s\"",
467 			  tocfname);
468 
469 	if (cfclose(ctx->blobsTocFH) != 0)
470 		fatal("could not close large object TOC file \"%s\": %m",
471 			  tocfname);
472 
473 	ctx->blobsTocFH = NULL;
474 
475 	EndRestoreBlobs(AH);
476 }
477 
478 
479 /*
480  * Write a byte of data to the archive.
481  * Called by the archiver to do integer & byte output to the archive.
482  * These routines are only used to read & write the headers & TOC.
483  */
484 static int
_WriteByte(ArchiveHandle * AH,const int i)485 _WriteByte(ArchiveHandle *AH, const int i)
486 {
487 	unsigned char c = (unsigned char) i;
488 	lclContext *ctx = (lclContext *) AH->formatData;
489 
490 	errno = 0;
491 	if (cfwrite(&c, 1, ctx->dataFH) != 1)
492 	{
493 		/* if write didn't set errno, assume problem is no disk space */
494 		if (errno == 0)
495 			errno = ENOSPC;
496 		fatal("could not write to output file: %s",
497 			  get_cfp_error(ctx->dataFH));
498 	}
499 
500 	return 1;
501 }
502 
503 /*
504  * Read a byte of data from the archive.
505  * Called by the archiver to read bytes & integers from the archive.
506  * These routines are only used to read & write headers & TOC.
507  * EOF should be treated as a fatal error.
508  */
509 static int
_ReadByte(ArchiveHandle * AH)510 _ReadByte(ArchiveHandle *AH)
511 {
512 	lclContext *ctx = (lclContext *) AH->formatData;
513 
514 	return cfgetc(ctx->dataFH);
515 }
516 
517 /*
518  * Write a buffer of data to the archive.
519  * Called by the archiver to write a block of bytes to the TOC or a data file.
520  */
521 static void
_WriteBuf(ArchiveHandle * AH,const void * buf,size_t len)522 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
523 {
524 	lclContext *ctx = (lclContext *) AH->formatData;
525 
526 	errno = 0;
527 	if (cfwrite(buf, len, ctx->dataFH) != len)
528 	{
529 		/* if write didn't set errno, assume problem is no disk space */
530 		if (errno == 0)
531 			errno = ENOSPC;
532 		fatal("could not write to output file: %s",
533 			  get_cfp_error(ctx->dataFH));
534 	}
535 }
536 
537 /*
538  * Read a block of bytes from the archive.
539  *
540  * Called by the archiver to read a block of bytes from the archive
541  */
542 static void
_ReadBuf(ArchiveHandle * AH,void * buf,size_t len)543 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
544 {
545 	lclContext *ctx = (lclContext *) AH->formatData;
546 
547 	/*
548 	 * If there was an I/O error, we already exited in cfread(), so here we
549 	 * exit on short reads.
550 	 */
551 	if (cfread(buf, len, ctx->dataFH) != len)
552 		fatal("could not read from input file: end of file");
553 }
554 
555 /*
556  * Close the archive.
557  *
558  * When writing the archive, this is the routine that actually starts
559  * the process of saving it to files. No data should be written prior
560  * to this point, since the user could sort the TOC after creating it.
561  *
562  * If an archive is to be written, this routine must call:
563  *		WriteHead			to save the archive header
564  *		WriteToc			to save the TOC entries
565  *		WriteDataChunks		to save all DATA & BLOBs.
566  */
567 static void
_CloseArchive(ArchiveHandle * AH)568 _CloseArchive(ArchiveHandle *AH)
569 {
570 	lclContext *ctx = (lclContext *) AH->formatData;
571 
572 	if (AH->mode == archModeWrite)
573 	{
574 		cfp		   *tocFH;
575 		char		fname[MAXPGPATH];
576 
577 		setFilePath(AH, fname, "toc.dat");
578 
579 		/* this will actually fork the processes for a parallel backup */
580 		ctx->pstate = ParallelBackupStart(AH);
581 
582 		/* The TOC is always created uncompressed */
583 		tocFH = cfopen_write(fname, PG_BINARY_W, 0);
584 		if (tocFH == NULL)
585 			fatal("could not open output file \"%s\": %m", fname);
586 		ctx->dataFH = tocFH;
587 
588 		/*
589 		 * Write 'tar' in the format field of the toc.dat file. The directory
590 		 * is compatible with 'tar', so there's no point having a different
591 		 * format code for it.
592 		 */
593 		AH->format = archTar;
594 		WriteHead(AH);
595 		AH->format = archDirectory;
596 		WriteToc(AH);
597 		if (cfclose(tocFH) != 0)
598 			fatal("could not close TOC file: %m");
599 		WriteDataChunks(AH, ctx->pstate);
600 
601 		ParallelBackupEnd(AH, ctx->pstate);
602 
603 		/*
604 		 * In directory mode, there is no need to sync all the entries
605 		 * individually. Just recurse once through all the files generated.
606 		 */
607 		if (AH->dosync)
608 			fsync_dir_recurse(ctx->directory);
609 	}
610 	AH->FH = NULL;
611 }
612 
613 /*
614  * Reopen the archive's file handle.
615  */
616 static void
_ReopenArchive(ArchiveHandle * AH)617 _ReopenArchive(ArchiveHandle *AH)
618 {
619 	/*
620 	 * Our TOC is in memory, our data files are opened by each child anyway as
621 	 * they are separate. We support reopening the archive by just doing
622 	 * nothing.
623 	 */
624 }
625 
626 /*
627  * BLOB support
628  */
629 
630 /*
631  * Called by the archiver when starting to save all BLOB DATA (not schema).
632  * It is called just prior to the dumper's DataDumper routine.
633  *
634  * We open the large object TOC file here, so that we can append a line to
635  * it for each blob.
636  */
637 static void
_StartBlobs(ArchiveHandle * AH,TocEntry * te)638 _StartBlobs(ArchiveHandle *AH, TocEntry *te)
639 {
640 	lclContext *ctx = (lclContext *) AH->formatData;
641 	char		fname[MAXPGPATH];
642 
643 	setFilePath(AH, fname, "blobs.toc");
644 
645 	/* The blob TOC file is never compressed */
646 	ctx->blobsTocFH = cfopen_write(fname, "ab", 0);
647 	if (ctx->blobsTocFH == NULL)
648 		fatal("could not open output file \"%s\": %m", fname);
649 }
650 
651 /*
652  * Called by the archiver when we're about to start dumping a blob.
653  *
654  * We create a file to write the blob to.
655  */
656 static void
_StartBlob(ArchiveHandle * AH,TocEntry * te,Oid oid)657 _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
658 {
659 	lclContext *ctx = (lclContext *) AH->formatData;
660 	char		fname[MAXPGPATH];
661 
662 	snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
663 
664 	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression);
665 
666 	if (ctx->dataFH == NULL)
667 		fatal("could not open output file \"%s\": %m", fname);
668 }
669 
670 /*
671  * Called by the archiver when the dumper is finished writing a blob.
672  *
673  * We close the blob file and write an entry to the blob TOC file for it.
674  */
675 static void
_EndBlob(ArchiveHandle * AH,TocEntry * te,Oid oid)676 _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
677 {
678 	lclContext *ctx = (lclContext *) AH->formatData;
679 	char		buf[50];
680 	int			len;
681 
682 	/* Close the BLOB data file itself */
683 	cfclose(ctx->dataFH);
684 	ctx->dataFH = NULL;
685 
686 	/* register the blob in blobs.toc */
687 	len = snprintf(buf, sizeof(buf), "%u blob_%u.dat\n", oid, oid);
688 	if (cfwrite(buf, len, ctx->blobsTocFH) != len)
689 		fatal("could not write to blobs TOC file");
690 }
691 
692 /*
693  * Called by the archiver when finishing saving all BLOB DATA.
694  *
695  * We close the blobs TOC file.
696  */
697 static void
_EndBlobs(ArchiveHandle * AH,TocEntry * te)698 _EndBlobs(ArchiveHandle *AH, TocEntry *te)
699 {
700 	lclContext *ctx = (lclContext *) AH->formatData;
701 
702 	cfclose(ctx->blobsTocFH);
703 	ctx->blobsTocFH = NULL;
704 }
705 
706 /*
707  * Gets a relative file name and prepends the output directory, writing the
708  * result to buf. The caller needs to make sure that buf is MAXPGPATH bytes
709  * big. Can't use a static char[MAXPGPATH] inside the function because we run
710  * multithreaded on Windows.
711  */
712 static void
setFilePath(ArchiveHandle * AH,char * buf,const char * relativeFilename)713 setFilePath(ArchiveHandle *AH, char *buf, const char *relativeFilename)
714 {
715 	lclContext *ctx = (lclContext *) AH->formatData;
716 	char	   *dname;
717 
718 	dname = ctx->directory;
719 
720 	if (strlen(dname) + 1 + strlen(relativeFilename) + 1 > MAXPGPATH)
721 		fatal("file name too long: \"%s\"", dname);
722 
723 	strcpy(buf, dname);
724 	strcat(buf, "/");
725 	strcat(buf, relativeFilename);
726 }
727 
728 /*
729  * Prepare for parallel restore.
730  *
731  * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
732  * TOC entries' dataLength fields with appropriate values to guide the
733  * ordering of restore jobs.  The source of said data is format-dependent,
734  * as is the exact meaning of the values.
735  *
736  * A format module might also choose to do other setup here.
737  */
738 static void
_PrepParallelRestore(ArchiveHandle * AH)739 _PrepParallelRestore(ArchiveHandle *AH)
740 {
741 	TocEntry   *te;
742 
743 	for (te = AH->toc->next; te != AH->toc; te = te->next)
744 	{
745 		lclTocEntry *tctx = (lclTocEntry *) te->formatData;
746 		char		fname[MAXPGPATH];
747 		struct stat st;
748 
749 		/*
750 		 * A dumpable object has set tctx->filename, any other object has not.
751 		 * (see _ArchiveEntry).
752 		 */
753 		if (tctx->filename == NULL)
754 			continue;
755 
756 		/* We may ignore items not due to be restored */
757 		if ((te->reqs & REQ_DATA) == 0)
758 			continue;
759 
760 		/*
761 		 * Stat the file and, if successful, put its size in dataLength.  When
762 		 * using compression, the physical file size might not be a very good
763 		 * guide to the amount of work involved in restoring the file, but we
764 		 * only need an approximate indicator of that.
765 		 */
766 		setFilePath(AH, fname, tctx->filename);
767 
768 		if (stat(fname, &st) == 0)
769 			te->dataLength = st.st_size;
770 		else
771 		{
772 			/* It might be compressed */
773 			strlcat(fname, ".gz", sizeof(fname));
774 			if (stat(fname, &st) == 0)
775 				te->dataLength = st.st_size;
776 		}
777 
778 		/*
779 		 * If this is the BLOBS entry, what we stat'd was blobs.toc, which
780 		 * most likely is a lot smaller than the actual blob data.  We don't
781 		 * have a cheap way to estimate how much smaller, but fortunately it
782 		 * doesn't matter too much as long as we get the blobs processed
783 		 * reasonably early.  Arbitrarily scale up by a factor of 1K.
784 		 */
785 		if (strcmp(te->desc, "BLOBS") == 0)
786 			te->dataLength *= 1024;
787 	}
788 }
789 
790 /*
791  * Clone format-specific fields during parallel restoration.
792  */
793 static void
_Clone(ArchiveHandle * AH)794 _Clone(ArchiveHandle *AH)
795 {
796 	lclContext *ctx = (lclContext *) AH->formatData;
797 
798 	AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
799 	memcpy(AH->formatData, ctx, sizeof(lclContext));
800 	ctx = (lclContext *) AH->formatData;
801 
802 	/*
803 	 * Note: we do not make a local lo_buf because we expect at most one BLOBS
804 	 * entry per archive, so no parallelism is possible.  Likewise,
805 	 * TOC-entry-local state isn't an issue because any one TOC entry is
806 	 * touched by just one worker child.
807 	 */
808 
809 	/*
810 	 * We also don't copy the ParallelState pointer (pstate), only the leader
811 	 * process ever writes to it.
812 	 */
813 }
814 
815 static void
_DeClone(ArchiveHandle * AH)816 _DeClone(ArchiveHandle *AH)
817 {
818 	lclContext *ctx = (lclContext *) AH->formatData;
819 
820 	free(ctx);
821 }
822 
823 /*
824  * This function is executed in the child of a parallel backup for a
825  * directory-format archive and dumps the actual data for one TOC entry.
826  */
827 static int
_WorkerJobDumpDirectory(ArchiveHandle * AH,TocEntry * te)828 _WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te)
829 {
830 	/*
831 	 * This function returns void. We either fail and die horribly or
832 	 * succeed... A failure will be detected by the parent when the child dies
833 	 * unexpectedly.
834 	 */
835 	WriteDataChunksForTocEntry(AH, te);
836 
837 	return 0;
838 }
839 
840 /*
841  * This function is executed in the child of a parallel restore from a
842  * directory-format archive and restores the actual data for one TOC entry.
843  */
844 static int
_WorkerJobRestoreDirectory(ArchiveHandle * AH,TocEntry * te)845 _WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te)
846 {
847 	return parallel_restore(AH, te);
848 }
849