1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_custom.c
4  *
5  *	Implements the custom output format.
6  *
7  *	The comments with the routines in this code are a good place to
8  *	understand how to write a new format.
9  *
10  *	See the headers to pg_restore for more details.
11  *
12  * Copyright (c) 2000, Philip Warner
13  *		Rights are granted to use this software in any way so long
14  *		as this notice is not removed.
15  *
16  *	The author is not responsible for loss or damages that may
17  *	and any liability will be limited to the time taken to fix any
18  *	related bug.
19  *
20  *
21  * IDENTIFICATION
22  *		src/bin/pg_dump/pg_backup_custom.c
23  *
24  *-------------------------------------------------------------------------
25  */
26 #include "postgres_fe.h"
27 
28 #include "common/file_utils.h"
29 #include "compress_io.h"
30 #include "parallel.h"
31 #include "pg_backup_utils.h"
32 
/*--------
 * Routines in the format interface
 *
 * The first group is installed into the ArchiveHandle's function pointers
 * by InitArchiveFmt_Custom; the remaining declarations are internal helpers
 * used by those callbacks.
 *--------
 */

static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
static void _StartData(ArchiveHandle *AH, TocEntry *te);
static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
static void _EndData(ArchiveHandle *AH, TocEntry *te);
static int	_WriteByte(ArchiveHandle *AH, const int i);
static int	_ReadByte(ArchiveHandle *);
static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
static void _CloseArchive(ArchiveHandle *AH);
static void _ReopenArchive(ArchiveHandle *AH);
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);

/* Restore-side helpers for reading or skipping data blocks */
static void _PrintData(ArchiveHandle *AH);
static void _skipData(ArchiveHandle *AH);
static void _skipBlobs(ArchiveHandle *AH);

/* Large-object (BLOB) output callbacks, plus the restore-side loader */
static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
static void _LoadBlobs(ArchiveHandle *AH, bool drop);

/* Parallel restore support */
static void _PrepParallelRestore(ArchiveHandle *AH);
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);

static int	_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
68 
/*
 * Per-archive private state for the custom format, stored in
 * AH->formatData.
 */
typedef struct
{
	/* compressor for the block being written (set by _StartData/_StartBlob) */
	CompressorState *cs;
	/* result of checkSeek() on AH->FH when the archive was opened */
	int			hasSeek;
	/* lastFilePos is used only when reading, and may be invalid if !hasSeek */
	pgoff_t		lastFilePos;	/* position after last data block we've read */
} lclContext;

/*
 * Per-TOC-entry private state for the custom format, stored in
 * te->formatData.
 */
typedef struct
{
	/* K_OFFSET_NO_DATA, K_OFFSET_POS_NOT_SET or K_OFFSET_POS_SET */
	int			dataState;
	pgoff_t		dataPos;		/* valid only if dataState=K_OFFSET_POS_SET */
} lclTocEntry;
82 
83 
/*------
 * Static declarations
 *------
 */
/* Read a data block header (block type and dump ID). */
static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
/* Current position in AH->FH; -1 if it cannot be obtained on a non-seekable file. */
static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);

/* Block write/read callbacks passed to AllocateCompressor / ReadDataFromArchive. */
static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
93 
94 
95 /*
96  *	Init routine required by ALL formats. This is a global routine
97  *	and should be declared in pg_backup_archiver.h
98  *
99  *	It's task is to create any extra archive context (using AH->formatData),
100  *	and to initialize the supported function pointers.
101  *
102  *	It should also prepare whatever it's input source is for reading/writing,
103  *	and in the case of a read mode connection, it should load the Header & TOC.
104  */
105 void
InitArchiveFmt_Custom(ArchiveHandle * AH)106 InitArchiveFmt_Custom(ArchiveHandle *AH)
107 {
108 	lclContext *ctx;
109 
110 	/* Assuming static functions, this can be copied for each format. */
111 	AH->ArchiveEntryPtr = _ArchiveEntry;
112 	AH->StartDataPtr = _StartData;
113 	AH->WriteDataPtr = _WriteData;
114 	AH->EndDataPtr = _EndData;
115 	AH->WriteBytePtr = _WriteByte;
116 	AH->ReadBytePtr = _ReadByte;
117 	AH->WriteBufPtr = _WriteBuf;
118 	AH->ReadBufPtr = _ReadBuf;
119 	AH->ClosePtr = _CloseArchive;
120 	AH->ReopenPtr = _ReopenArchive;
121 	AH->PrintTocDataPtr = _PrintTocData;
122 	AH->ReadExtraTocPtr = _ReadExtraToc;
123 	AH->WriteExtraTocPtr = _WriteExtraToc;
124 	AH->PrintExtraTocPtr = _PrintExtraToc;
125 
126 	AH->StartBlobsPtr = _StartBlobs;
127 	AH->StartBlobPtr = _StartBlob;
128 	AH->EndBlobPtr = _EndBlob;
129 	AH->EndBlobsPtr = _EndBlobs;
130 
131 	AH->PrepParallelRestorePtr = _PrepParallelRestore;
132 	AH->ClonePtr = _Clone;
133 	AH->DeClonePtr = _DeClone;
134 
135 	/* no parallel dump in the custom archive, only parallel restore */
136 	AH->WorkerJobDumpPtr = NULL;
137 	AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
138 
139 	/* Set up a private area. */
140 	ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
141 	AH->formatData = (void *) ctx;
142 
143 	/* Initialize LO buffering */
144 	AH->lo_buf_size = LOBBUFSIZE;
145 	AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
146 
147 	/*
148 	 * Now open the file
149 	 */
150 	if (AH->mode == archModeWrite)
151 	{
152 		if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
153 		{
154 			AH->FH = fopen(AH->fSpec, PG_BINARY_W);
155 			if (!AH->FH)
156 				fatal("could not open output file \"%s\": %m", AH->fSpec);
157 		}
158 		else
159 		{
160 			AH->FH = stdout;
161 			if (!AH->FH)
162 				fatal("could not open output file: %m");
163 		}
164 
165 		ctx->hasSeek = checkSeek(AH->FH);
166 	}
167 	else
168 	{
169 		if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
170 		{
171 			AH->FH = fopen(AH->fSpec, PG_BINARY_R);
172 			if (!AH->FH)
173 				fatal("could not open input file \"%s\": %m", AH->fSpec);
174 		}
175 		else
176 		{
177 			AH->FH = stdin;
178 			if (!AH->FH)
179 				fatal("could not open input file: %m");
180 		}
181 
182 		ctx->hasSeek = checkSeek(AH->FH);
183 
184 		ReadHead(AH);
185 		ReadToc(AH);
186 
187 		/*
188 		 * Remember location of first data block (i.e., the point after TOC)
189 		 * in case we have to search for desired data blocks.
190 		 */
191 		ctx->lastFilePos = _getFilePos(AH, ctx);
192 	}
193 }
194 
195 /*
196  * Called by the Archiver when the dumper creates a new TOC entry.
197  *
198  * Optional.
199  *
200  * Set up extract format-related TOC data.
201 */
202 static void
_ArchiveEntry(ArchiveHandle * AH,TocEntry * te)203 _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
204 {
205 	lclTocEntry *ctx;
206 
207 	ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
208 	if (te->dataDumper)
209 		ctx->dataState = K_OFFSET_POS_NOT_SET;
210 	else
211 		ctx->dataState = K_OFFSET_NO_DATA;
212 
213 	te->formatData = (void *) ctx;
214 }
215 
216 /*
217  * Called by the Archiver to save any extra format-related TOC entry
218  * data.
219  *
220  * Optional.
221  *
222  * Use the Archiver routines to write data - they are non-endian, and
223  * maintain other important file information.
224  */
225 static void
_WriteExtraToc(ArchiveHandle * AH,TocEntry * te)226 _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
227 {
228 	lclTocEntry *ctx = (lclTocEntry *) te->formatData;
229 
230 	WriteOffset(AH, ctx->dataPos, ctx->dataState);
231 }
232 
233 /*
234  * Called by the Archiver to read any extra format-related TOC data.
235  *
236  * Optional.
237  *
238  * Needs to match the order defined in _WriteExtraToc, and should also
239  * use the Archiver input routines.
240  */
241 static void
_ReadExtraToc(ArchiveHandle * AH,TocEntry * te)242 _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
243 {
244 	lclTocEntry *ctx = (lclTocEntry *) te->formatData;
245 
246 	if (ctx == NULL)
247 	{
248 		ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
249 		te->formatData = (void *) ctx;
250 	}
251 
252 	ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
253 
254 	/*
255 	 * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
256 	 * dump it at all.
257 	 */
258 	if (AH->version < K_VERS_1_7)
259 		ReadInt(AH);
260 }
261 
262 /*
263  * Called by the Archiver when restoring an archive to output a comment
264  * that includes useful information about the TOC entry.
265  *
266  * Optional.
267  *
268  */
269 static void
_PrintExtraToc(ArchiveHandle * AH,TocEntry * te)270 _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
271 {
272 	lclTocEntry *ctx = (lclTocEntry *) te->formatData;
273 
274 	if (AH->public.verbose)
275 		ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
276 				 (int64) ctx->dataPos);
277 }
278 
279 /*
280  * Called by the archiver when saving TABLE DATA (not schema). This routine
281  * should save whatever format-specific information is needed to read
282  * the archive back.
283  *
284  * It is called just prior to the dumper's 'DataDumper' routine being called.
285  *
286  * Optional, but strongly recommended.
287  *
288  */
289 static void
_StartData(ArchiveHandle * AH,TocEntry * te)290 _StartData(ArchiveHandle *AH, TocEntry *te)
291 {
292 	lclContext *ctx = (lclContext *) AH->formatData;
293 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
294 
295 	tctx->dataPos = _getFilePos(AH, ctx);
296 	if (tctx->dataPos >= 0)
297 		tctx->dataState = K_OFFSET_POS_SET;
298 
299 	_WriteByte(AH, BLK_DATA);	/* Block type */
300 	WriteInt(AH, te->dumpId);	/* For sanity check */
301 
302 	ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
303 }
304 
305 /*
306  * Called by archiver when dumper calls WriteData. This routine is
307  * called for both BLOB and TABLE data; it is the responsibility of
308  * the format to manage each kind of data using StartBlob/StartData.
309  *
310  * It should only be called from within a DataDumper routine.
311  *
312  * Mandatory.
313  */
314 static void
_WriteData(ArchiveHandle * AH,const void * data,size_t dLen)315 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
316 {
317 	lclContext *ctx = (lclContext *) AH->formatData;
318 	CompressorState *cs = ctx->cs;
319 
320 	if (dLen > 0)
321 		/* WriteDataToArchive() internally throws write errors */
322 		WriteDataToArchive(AH, cs, data, dLen);
323 }
324 
325 /*
326  * Called by the archiver when a dumper's 'DataDumper' routine has
327  * finished.
328  *
329  * Optional.
330  *
331  */
332 static void
_EndData(ArchiveHandle * AH,TocEntry * te)333 _EndData(ArchiveHandle *AH, TocEntry *te)
334 {
335 	lclContext *ctx = (lclContext *) AH->formatData;
336 
337 	EndCompressor(AH, ctx->cs);
338 	/* Send the end marker */
339 	WriteInt(AH, 0);
340 }
341 
342 /*
343  * Called by the archiver when starting to save all BLOB DATA (not schema).
344  * This routine should save whatever format-specific information is needed
345  * to read the BLOBs back into memory.
346  *
347  * It is called just prior to the dumper's DataDumper routine.
348  *
349  * Optional, but strongly recommended.
350  */
351 static void
_StartBlobs(ArchiveHandle * AH,TocEntry * te)352 _StartBlobs(ArchiveHandle *AH, TocEntry *te)
353 {
354 	lclContext *ctx = (lclContext *) AH->formatData;
355 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
356 
357 	tctx->dataPos = _getFilePos(AH, ctx);
358 	if (tctx->dataPos >= 0)
359 		tctx->dataState = K_OFFSET_POS_SET;
360 
361 	_WriteByte(AH, BLK_BLOBS);	/* Block type */
362 	WriteInt(AH, te->dumpId);	/* For sanity check */
363 }
364 
365 /*
366  * Called by the archiver when the dumper calls StartBlob.
367  *
368  * Mandatory.
369  *
370  * Must save the passed OID for retrieval at restore-time.
371  */
372 static void
_StartBlob(ArchiveHandle * AH,TocEntry * te,Oid oid)373 _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
374 {
375 	lclContext *ctx = (lclContext *) AH->formatData;
376 
377 	if (oid == 0)
378 		fatal("invalid OID for large object");
379 
380 	WriteInt(AH, oid);
381 
382 	ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
383 }
384 
385 /*
386  * Called by the archiver when the dumper calls EndBlob.
387  *
388  * Optional.
389  */
390 static void
_EndBlob(ArchiveHandle * AH,TocEntry * te,Oid oid)391 _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
392 {
393 	lclContext *ctx = (lclContext *) AH->formatData;
394 
395 	EndCompressor(AH, ctx->cs);
396 	/* Send the end marker */
397 	WriteInt(AH, 0);
398 }
399 
400 /*
401  * Called by the archiver when finishing saving all BLOB DATA.
402  *
403  * Optional.
404  */
405 static void
_EndBlobs(ArchiveHandle * AH,TocEntry * te)406 _EndBlobs(ArchiveHandle *AH, TocEntry *te)
407 {
408 	/* Write out a fake zero OID to mark end-of-blobs. */
409 	WriteInt(AH, 0);
410 }
411 
/*
 * Print data for a given TOC entry
 *
 * Locates the data block belonging to te and restores it: BLK_DATA blocks
 * go through _PrintData, BLK_BLOBS blocks through _LoadBlobs.  Entries
 * marked K_OFFSET_NO_DATA are ignored.
 *
 * If the input is seekable and te's block offset is known, we seek straight
 * to it.  Otherwise we scan forward block by block (starting from
 * ctx->lastFilePos when seekable), recording the offsets of the blocks we
 * pass over in their TOC entries so that later lookups can seek directly.
 *
 * Failure to locate or recognize the block is fatal.
 */
static void
_PrintTocData(ArchiveHandle *AH, TocEntry *te)
{
	lclContext *ctx = (lclContext *) AH->formatData;
	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
	int			blkType;
	int			id;

	/* Nothing to restore for entries that never had data. */
	if (tctx->dataState == K_OFFSET_NO_DATA)
		return;

	if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
	{
		/*
		 * We cannot seek directly to the desired block.  Instead, skip over
		 * block headers until we find the one we want.  Remember the
		 * positions of skipped-over blocks, so that if we later decide we
		 * need to read one, we'll be able to seek to it.
		 *
		 * When our input file is seekable, we can do the search starting from
		 * the point after the last data block we scanned in previous
		 * iterations of this function.
		 */
		if (ctx->hasSeek)
		{
			if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0)
				fatal("error during file seek: %m");
		}

		for (;;)
		{
			/* Offset of the header we are about to read (-1 if unknown). */
			pgoff_t		thisBlkPos = _getFilePos(AH, ctx);

			_readBlockHeader(AH, &blkType, &id);

			if (blkType == EOF || id == te->dumpId)
				break;

			/* Remember the block position, if we got one */
			if (thisBlkPos >= 0)
			{
				TocEntry   *otherte = getTocEntryByDumpId(AH, id);

				if (otherte && otherte->formatData)
				{
					lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData;

					/*
					 * Note: on Windows, multiple threads might access/update
					 * the same lclTocEntry concurrently, but that should be
					 * safe as long as we update dataPos before dataState.
					 * Ideally, we'd use pg_write_barrier() to enforce that,
					 * but the needed infrastructure doesn't exist in frontend
					 * code.  But Windows only runs on machines with strong
					 * store ordering, so it should be okay for now.
					 */
					if (othertctx->dataState == K_OFFSET_POS_NOT_SET)
					{
						othertctx->dataPos = thisBlkPos;
						othertctx->dataState = K_OFFSET_POS_SET;
					}
					else if (othertctx->dataPos != thisBlkPos ||
							 othertctx->dataState != K_OFFSET_POS_SET)
					{
						/* sanity check */
						pg_log_warning("data block %d has wrong seek position",
									   id);
					}
				}
			}

			/* Not the block we want: consume its contents and keep going. */
			switch (blkType)
			{
				case BLK_DATA:
					_skipData(AH);
					break;

				case BLK_BLOBS:
					_skipBlobs(AH);
					break;

				default:		/* Always have a default */
					fatal("unrecognized data block type (%d) while searching archive",
						  blkType);
					break;
			}
		}
	}
	else
	{
		/* We can just seek to the place we need to be. */
		if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
			fatal("error during file seek: %m");

		_readBlockHeader(AH, &blkType, &id);
	}

	/*
	 * If we reached EOF without finding the block we want, then either it
	 * doesn't exist, or it does but we lack the ability to seek back to it.
	 */
	if (blkType == EOF)
	{
		if (!ctx->hasSeek)
			fatal("could not find block ID %d in archive -- "
				  "possibly due to out-of-order restore request, "
				  "which cannot be handled due to non-seekable input file",
				  te->dumpId);
		else
			fatal("could not find block ID %d in archive -- "
				  "possibly corrupt archive",
				  te->dumpId);
	}

	/* Are we sane? */
	if (id != te->dumpId)
		fatal("found unexpected block ID (%d) when reading data -- expected %d",
			  id, te->dumpId);

	/* Restore the block's contents according to its type. */
	switch (blkType)
	{
		case BLK_DATA:
			_PrintData(AH);
			break;

		case BLK_BLOBS:
			_LoadBlobs(AH, AH->public.ropt->dropSchema);
			break;

		default:				/* Always have a default */
			fatal("unrecognized data block type %d while restoring archive",
				  blkType);
			break;
	}

	/*
	 * If our input file is seekable but lacks data offsets, update our
	 * knowledge of where to start future searches from.  (Note that we did
	 * not update the current TE's dataState/dataPos.  We could have, but
	 * there is no point since it will not be visited again.)
	 */
	if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET)
	{
		pgoff_t		curPos = _getFilePos(AH, ctx);

		if (curPos > ctx->lastFilePos)
			ctx->lastFilePos = curPos;
	}
}
564 
565 /*
566  * Print data from current file position.
567 */
568 static void
_PrintData(ArchiveHandle * AH)569 _PrintData(ArchiveHandle *AH)
570 {
571 	ReadDataFromArchive(AH, AH->compression, _CustomReadFunc);
572 }
573 
574 static void
_LoadBlobs(ArchiveHandle * AH,bool drop)575 _LoadBlobs(ArchiveHandle *AH, bool drop)
576 {
577 	Oid			oid;
578 
579 	StartRestoreBlobs(AH);
580 
581 	oid = ReadInt(AH);
582 	while (oid != 0)
583 	{
584 		StartRestoreBlob(AH, oid, drop);
585 		_PrintData(AH);
586 		EndRestoreBlob(AH, oid);
587 		oid = ReadInt(AH);
588 	}
589 
590 	EndRestoreBlobs(AH);
591 }
592 
593 /*
594  * Skip the BLOBs from the current file position.
595  * BLOBS are written sequentially as data blocks (see below).
596  * Each BLOB is preceded by it's original OID.
597  * A zero OID indicated the end of the BLOBS
598  */
599 static void
_skipBlobs(ArchiveHandle * AH)600 _skipBlobs(ArchiveHandle *AH)
601 {
602 	Oid			oid;
603 
604 	oid = ReadInt(AH);
605 	while (oid != 0)
606 	{
607 		_skipData(AH);
608 		oid = ReadInt(AH);
609 	}
610 }
611 
612 /*
613  * Skip data from current file position.
614  * Data blocks are formatted as an integer length, followed by data.
615  * A zero length denoted the end of the block.
616 */
617 static void
_skipData(ArchiveHandle * AH)618 _skipData(ArchiveHandle *AH)
619 {
620 	lclContext *ctx = (lclContext *) AH->formatData;
621 	size_t		blkLen;
622 	char	   *buf = NULL;
623 	int			buflen = 0;
624 	size_t		cnt;
625 
626 	blkLen = ReadInt(AH);
627 	while (blkLen != 0)
628 	{
629 		if (ctx->hasSeek)
630 		{
631 			if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0)
632 				fatal("error during file seek: %m");
633 		}
634 		else
635 		{
636 			if (blkLen > buflen)
637 			{
638 				if (buf)
639 					free(buf);
640 				buf = (char *) pg_malloc(blkLen);
641 				buflen = blkLen;
642 			}
643 			if ((cnt = fread(buf, 1, blkLen, AH->FH)) != blkLen)
644 			{
645 				if (feof(AH->FH))
646 					fatal("could not read from input file: end of file");
647 				else
648 					fatal("could not read from input file: %m");
649 			}
650 		}
651 
652 		blkLen = ReadInt(AH);
653 	}
654 
655 	if (buf)
656 		free(buf);
657 }
658 
659 /*
660  * Write a byte of data to the archive.
661  *
662  * Mandatory.
663  *
664  * Called by the archiver to do integer & byte output to the archive.
665  */
666 static int
_WriteByte(ArchiveHandle * AH,const int i)667 _WriteByte(ArchiveHandle *AH, const int i)
668 {
669 	int			res;
670 
671 	if ((res = fputc(i, AH->FH)) == EOF)
672 		WRITE_ERROR_EXIT;
673 
674 	return 1;
675 }
676 
677 /*
678  * Read a byte of data from the archive.
679  *
680  * Mandatory
681  *
682  * Called by the archiver to read bytes & integers from the archive.
683  * EOF should be treated as a fatal error.
684  */
685 static int
_ReadByte(ArchiveHandle * AH)686 _ReadByte(ArchiveHandle *AH)
687 {
688 	int			res;
689 
690 	res = getc(AH->FH);
691 	if (res == EOF)
692 		READ_ERROR_EXIT(AH->FH);
693 	return res;
694 }
695 
696 /*
697  * Write a buffer of data to the archive.
698  *
699  * Mandatory.
700  *
701  * Called by the archiver to write a block of bytes to the archive.
702  */
703 static void
_WriteBuf(ArchiveHandle * AH,const void * buf,size_t len)704 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
705 {
706 	if (fwrite(buf, 1, len, AH->FH) != len)
707 		WRITE_ERROR_EXIT;
708 }
709 
710 /*
711  * Read a block of bytes from the archive.
712  *
713  * Mandatory.
714  *
715  * Called by the archiver to read a block of bytes from the archive
716  */
717 static void
_ReadBuf(ArchiveHandle * AH,void * buf,size_t len)718 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
719 {
720 	if (fread(buf, 1, len, AH->FH) != len)
721 		READ_ERROR_EXIT(AH->FH);
722 }
723 
724 /*
725  * Close the archive.
726  *
727  * Mandatory.
728  *
729  * When writing the archive, this is the routine that actually starts
730  * the process of saving it to files. No data should be written prior
731  * to this point, since the user could sort the TOC after creating it.
732  *
733  * If an archive is to be written, this routine must call:
734  *		WriteHead			to save the archive header
735  *		WriteToc			to save the TOC entries
736  *		WriteDataChunks		to save all DATA & BLOBs.
737  *
738  */
739 static void
_CloseArchive(ArchiveHandle * AH)740 _CloseArchive(ArchiveHandle *AH)
741 {
742 	lclContext *ctx = (lclContext *) AH->formatData;
743 	pgoff_t		tpos;
744 
745 	if (AH->mode == archModeWrite)
746 	{
747 		WriteHead(AH);
748 		/* Remember TOC's seek position for use below */
749 		tpos = ftello(AH->FH);
750 		if (tpos < 0 && ctx->hasSeek)
751 			fatal("could not determine seek position in archive file: %m");
752 		WriteToc(AH);
753 		WriteDataChunks(AH, NULL);
754 
755 		/*
756 		 * If possible, re-write the TOC in order to update the data offset
757 		 * information.  This is not essential, as pg_restore can cope in most
758 		 * cases without it; but it can make pg_restore significantly faster
759 		 * in some situations (especially parallel restore).
760 		 */
761 		if (ctx->hasSeek &&
762 			fseeko(AH->FH, tpos, SEEK_SET) == 0)
763 			WriteToc(AH);
764 	}
765 
766 	if (fclose(AH->FH) != 0)
767 		fatal("could not close archive file: %m");
768 
769 	/* Sync the output file if one is defined */
770 	if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
771 		(void) fsync_fname(AH->fSpec, false);
772 
773 	AH->FH = NULL;
774 }
775 
776 /*
777  * Reopen the archive's file handle.
778  *
779  * We close the original file handle, except on Windows.  (The difference
780  * is because on Windows, this is used within a multithreading context,
781  * and we don't want a thread closing the parent file handle.)
782  */
783 static void
_ReopenArchive(ArchiveHandle * AH)784 _ReopenArchive(ArchiveHandle *AH)
785 {
786 	lclContext *ctx = (lclContext *) AH->formatData;
787 	pgoff_t		tpos;
788 
789 	if (AH->mode == archModeWrite)
790 		fatal("can only reopen input archives");
791 
792 	/*
793 	 * These two cases are user-facing errors since they represent unsupported
794 	 * (but not invalid) use-cases.  Word the error messages appropriately.
795 	 */
796 	if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
797 		fatal("parallel restore from standard input is not supported");
798 	if (!ctx->hasSeek)
799 		fatal("parallel restore from non-seekable file is not supported");
800 
801 	tpos = ftello(AH->FH);
802 	if (tpos < 0)
803 		fatal("could not determine seek position in archive file: %m");
804 
805 #ifndef WIN32
806 	if (fclose(AH->FH) != 0)
807 		fatal("could not close archive file: %m");
808 #endif
809 
810 	AH->FH = fopen(AH->fSpec, PG_BINARY_R);
811 	if (!AH->FH)
812 		fatal("could not open input file \"%s\": %m", AH->fSpec);
813 
814 	if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
815 		fatal("could not set seek position in archive file: %m");
816 }
817 
818 /*
819  * Prepare for parallel restore.
820  *
821  * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
822  * TOC entries' dataLength fields with appropriate values to guide the
823  * ordering of restore jobs.  The source of said data is format-dependent,
824  * as is the exact meaning of the values.
825  *
826  * A format module might also choose to do other setup here.
827  */
828 static void
_PrepParallelRestore(ArchiveHandle * AH)829 _PrepParallelRestore(ArchiveHandle *AH)
830 {
831 	lclContext *ctx = (lclContext *) AH->formatData;
832 	TocEntry   *prev_te = NULL;
833 	lclTocEntry *prev_tctx = NULL;
834 	TocEntry   *te;
835 
836 	/*
837 	 * Knowing that the data items were dumped out in TOC order, we can
838 	 * reconstruct the length of each item as the delta to the start offset of
839 	 * the next data item.
840 	 */
841 	for (te = AH->toc->next; te != AH->toc; te = te->next)
842 	{
843 		lclTocEntry *tctx = (lclTocEntry *) te->formatData;
844 
845 		/*
846 		 * Ignore entries without a known data offset; if we were unable to
847 		 * seek to rewrite the TOC when creating the archive, this'll be all
848 		 * of them, and we'll end up with no size estimates.
849 		 */
850 		if (tctx->dataState != K_OFFSET_POS_SET)
851 			continue;
852 
853 		/* Compute previous data item's length */
854 		if (prev_te)
855 		{
856 			if (tctx->dataPos > prev_tctx->dataPos)
857 				prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
858 		}
859 
860 		prev_te = te;
861 		prev_tctx = tctx;
862 	}
863 
864 	/* If OK to seek, we can determine the length of the last item */
865 	if (prev_te && ctx->hasSeek)
866 	{
867 		pgoff_t		endpos;
868 
869 		if (fseeko(AH->FH, 0, SEEK_END) != 0)
870 			fatal("error during file seek: %m");
871 		endpos = ftello(AH->FH);
872 		if (endpos > prev_tctx->dataPos)
873 			prev_te->dataLength = endpos - prev_tctx->dataPos;
874 	}
875 }
876 
877 /*
878  * Clone format-specific fields during parallel restoration.
879  */
880 static void
_Clone(ArchiveHandle * AH)881 _Clone(ArchiveHandle *AH)
882 {
883 	lclContext *ctx = (lclContext *) AH->formatData;
884 
885 	/*
886 	 * Each thread must have private lclContext working state.
887 	 */
888 	AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
889 	memcpy(AH->formatData, ctx, sizeof(lclContext));
890 	ctx = (lclContext *) AH->formatData;
891 
892 	/* sanity check, shouldn't happen */
893 	if (ctx->cs != NULL)
894 		fatal("compressor active");
895 
896 	/*
897 	 * We intentionally do not clone TOC-entry-local state: it's useful to
898 	 * share knowledge about where the data blocks are across threads.
899 	 * _PrintTocData has to be careful about the order of operations on that
900 	 * state, though.
901 	 *
902 	 * Note: we do not make a local lo_buf because we expect at most one BLOBS
903 	 * entry per archive, so no parallelism is possible.
904 	 */
905 }
906 
907 static void
_DeClone(ArchiveHandle * AH)908 _DeClone(ArchiveHandle *AH)
909 {
910 	lclContext *ctx = (lclContext *) AH->formatData;
911 
912 	free(ctx);
913 }
914 
915 /*
916  * This function is executed in the child of a parallel restore from a
917  * custom-format archive and restores the actual data for one TOC entry.
918  */
919 static int
_WorkerJobRestoreCustom(ArchiveHandle * AH,TocEntry * te)920 _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
921 {
922 	return parallel_restore(AH, te);
923 }
924 
925 /*--------------------------------------------------
926  * END OF FORMAT CALLBACKS
927  *--------------------------------------------------
928  */
929 
930 /*
931  * Get the current position in the archive file.
932  *
933  * With a non-seekable archive file, we may not be able to obtain the
934  * file position.  If so, just return -1.  It's not too important in
935  * that case because we won't be able to rewrite the TOC to fill in
936  * data block offsets anyway.
937  */
938 static pgoff_t
_getFilePos(ArchiveHandle * AH,lclContext * ctx)939 _getFilePos(ArchiveHandle *AH, lclContext *ctx)
940 {
941 	pgoff_t		pos;
942 
943 	pos = ftello(AH->FH);
944 	if (pos < 0)
945 	{
946 		/* Not expected if we found we can seek. */
947 		if (ctx->hasSeek)
948 			fatal("could not determine seek position in archive file: %m");
949 	}
950 	return pos;
951 }
952 
953 /*
954  * Read a data block header. The format changed in V1.3, so we
955  * centralize the code here for simplicity.  Returns *type = EOF
956  * if at EOF.
957  */
958 static void
_readBlockHeader(ArchiveHandle * AH,int * type,int * id)959 _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
960 {
961 	int			byt;
962 
963 	/*
964 	 * Note: if we are at EOF with a pre-1.3 input file, we'll fatal() inside
965 	 * ReadInt rather than returning EOF.  It doesn't seem worth jumping
966 	 * through hoops to deal with that case better, because no such files are
967 	 * likely to exist in the wild: only some 7.1 development versions of
968 	 * pg_dump ever generated such files.
969 	 */
970 	if (AH->version < K_VERS_1_3)
971 		*type = BLK_DATA;
972 	else
973 	{
974 		byt = getc(AH->FH);
975 		*type = byt;
976 		if (byt == EOF)
977 		{
978 			*id = 0;			/* don't return an uninitialized value */
979 			return;
980 		}
981 	}
982 
983 	*id = ReadInt(AH);
984 }
985 
986 /*
987  * Callback function for WriteDataToArchive. Writes one block of (compressed)
988  * data to the archive.
989  */
990 static void
_CustomWriteFunc(ArchiveHandle * AH,const char * buf,size_t len)991 _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
992 {
993 	/* never write 0-byte blocks (this should not happen) */
994 	if (len > 0)
995 	{
996 		WriteInt(AH, len);
997 		_WriteBuf(AH, buf, len);
998 	}
999 }
1000 
1001 /*
1002  * Callback function for ReadDataFromArchive. To keep things simple, we
1003  * always read one compressed block at a time.
1004  */
1005 static size_t
_CustomReadFunc(ArchiveHandle * AH,char ** buf,size_t * buflen)1006 _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
1007 {
1008 	size_t		blkLen;
1009 
1010 	/* Read length */
1011 	blkLen = ReadInt(AH);
1012 	if (blkLen == 0)
1013 		return 0;
1014 
1015 	/* If the caller's buffer is not large enough, allocate a bigger one */
1016 	if (blkLen > *buflen)
1017 	{
1018 		free(*buf);
1019 		*buf = (char *) pg_malloc(blkLen);
1020 		*buflen = blkLen;
1021 	}
1022 
1023 	/* exits app on read errors */
1024 	_ReadBuf(AH, *buf, blkLen);
1025 
1026 	return blkLen;
1027 }
1028