1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_custom.c
4  *
5  *	Implements the custom output format.
6  *
7  *	The comments with the routines in this code are a good place to
8  *	understand how to write a new format.
9  *
10  *	See the headers to pg_restore for more details.
11  *
12  * Copyright (c) 2000, Philip Warner
13  *		Rights are granted to use this software in any way so long
14  *		as this notice is not removed.
15  *
16  *	The author is not responsible for loss or damages that may
17  *	and any liability will be limited to the time taken to fix any
18  *	related bug.
19  *
20  *
21  * IDENTIFICATION
22  *		src/bin/pg_dump/pg_backup_custom.c
23  *
24  *-------------------------------------------------------------------------
25  */
26 #include "postgres_fe.h"
27 
28 #include "compress_io.h"
29 #include "parallel.h"
30 #include "pg_backup_utils.h"
31 #include "common/file_utils.h"
32 
33 
/*--------
 * Routines in the format interface
 *
 * These are installed into the ArchiveHandle's callback pointers by
 * InitArchiveFmt_Custom; the generic archiver calls them.
 *--------
 */

/* TOC-entry bookkeeping and table-data output */
static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
static void _StartData(ArchiveHandle *AH, TocEntry *te);
static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
static void _EndData(ArchiveHandle *AH, TocEntry *te);

/* Low-level byte/buffer I/O on AH->FH */
static int	_WriteByte(ArchiveHandle *AH, const int i);
static int	_ReadByte(ArchiveHandle *);
static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);

/* Archive lifecycle and TOC extras */
static void _CloseArchive(ArchiveHandle *AH);
static void _ReopenArchive(ArchiveHandle *AH);
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);

/* Reading/skipping data blocks during restore */
static void _PrintData(ArchiveHandle *AH);
static void _skipData(ArchiveHandle *AH);
static void _skipBlobs(ArchiveHandle *AH);

/* Large-object (BLOB) output and restore */
static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
static void _LoadBlobs(ArchiveHandle *AH, bool drop);

/* Parallel-restore support */
static void _PrepParallelRestore(ArchiveHandle *AH);
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);

static int	_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
69 
/*
 * Per-archive private state, stored in AH->formatData.
 */
typedef struct
{
	/* compressor for the data block currently being written, if any */
	CompressorState *cs;
	/* result of checkSeek() on the archive file */
	int			hasSeek;
	/* lastFilePos is used only when reading, and may be invalid if !hasSeek */
	pgoff_t		lastFilePos;	/* position after last data block we've read */
} lclContext;

/*
 * Per-TOC-entry private state, stored in te->formatData.
 */
typedef struct
{
	/* K_OFFSET_NO_DATA, K_OFFSET_POS_NOT_SET or K_OFFSET_POS_SET */
	int			dataState;
	pgoff_t		dataPos;		/* valid only if dataState=K_OFFSET_POS_SET */
} lclTocEntry;
83 
84 
/*------
 * Static declarations
 *
 * Internal helpers that are not part of the format interface.
 *------
 */
/* read a data block's type byte and dump ID */
static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
/* current position in AH->FH, or -1 if unavailable on a non-seekable file */
static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);

/* compressor callbacks: write/read one length-prefixed chunk */
static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
94 
95 
96 /*
97  *	Init routine required by ALL formats. This is a global routine
98  *	and should be declared in pg_backup_archiver.h
99  *
100  *	It's task is to create any extra archive context (using AH->formatData),
101  *	and to initialize the supported function pointers.
102  *
103  *	It should also prepare whatever it's input source is for reading/writing,
104  *	and in the case of a read mode connection, it should load the Header & TOC.
105  */
106 void
InitArchiveFmt_Custom(ArchiveHandle * AH)107 InitArchiveFmt_Custom(ArchiveHandle *AH)
108 {
109 	lclContext *ctx;
110 
111 	/* Assuming static functions, this can be copied for each format. */
112 	AH->ArchiveEntryPtr = _ArchiveEntry;
113 	AH->StartDataPtr = _StartData;
114 	AH->WriteDataPtr = _WriteData;
115 	AH->EndDataPtr = _EndData;
116 	AH->WriteBytePtr = _WriteByte;
117 	AH->ReadBytePtr = _ReadByte;
118 	AH->WriteBufPtr = _WriteBuf;
119 	AH->ReadBufPtr = _ReadBuf;
120 	AH->ClosePtr = _CloseArchive;
121 	AH->ReopenPtr = _ReopenArchive;
122 	AH->PrintTocDataPtr = _PrintTocData;
123 	AH->ReadExtraTocPtr = _ReadExtraToc;
124 	AH->WriteExtraTocPtr = _WriteExtraToc;
125 	AH->PrintExtraTocPtr = _PrintExtraToc;
126 
127 	AH->StartBlobsPtr = _StartBlobs;
128 	AH->StartBlobPtr = _StartBlob;
129 	AH->EndBlobPtr = _EndBlob;
130 	AH->EndBlobsPtr = _EndBlobs;
131 
132 	AH->PrepParallelRestorePtr = _PrepParallelRestore;
133 	AH->ClonePtr = _Clone;
134 	AH->DeClonePtr = _DeClone;
135 
136 	/* no parallel dump in the custom archive, only parallel restore */
137 	AH->WorkerJobDumpPtr = NULL;
138 	AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
139 
140 	/* Set up a private area. */
141 	ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
142 	AH->formatData = (void *) ctx;
143 
144 	/* Initialize LO buffering */
145 	AH->lo_buf_size = LOBBUFSIZE;
146 	AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
147 
148 	/*
149 	 * Now open the file
150 	 */
151 	if (AH->mode == archModeWrite)
152 	{
153 		if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
154 		{
155 			AH->FH = fopen(AH->fSpec, PG_BINARY_W);
156 			if (!AH->FH)
157 				fatal("could not open output file \"%s\": %m", AH->fSpec);
158 		}
159 		else
160 		{
161 			AH->FH = stdout;
162 			if (!AH->FH)
163 				fatal("could not open output file: %m");
164 		}
165 
166 		ctx->hasSeek = checkSeek(AH->FH);
167 	}
168 	else
169 	{
170 		if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
171 		{
172 			AH->FH = fopen(AH->fSpec, PG_BINARY_R);
173 			if (!AH->FH)
174 				fatal("could not open input file \"%s\": %m", AH->fSpec);
175 		}
176 		else
177 		{
178 			AH->FH = stdin;
179 			if (!AH->FH)
180 				fatal("could not open input file: %m");
181 		}
182 
183 		ctx->hasSeek = checkSeek(AH->FH);
184 
185 		ReadHead(AH);
186 		ReadToc(AH);
187 
188 		/*
189 		 * Remember location of first data block (i.e., the point after TOC)
190 		 * in case we have to search for desired data blocks.
191 		 */
192 		ctx->lastFilePos = _getFilePos(AH, ctx);
193 	}
194 }
195 
196 /*
197  * Called by the Archiver when the dumper creates a new TOC entry.
198  *
199  * Optional.
200  *
201  * Set up extract format-related TOC data.
202 */
203 static void
_ArchiveEntry(ArchiveHandle * AH,TocEntry * te)204 _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
205 {
206 	lclTocEntry *ctx;
207 
208 	ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
209 	if (te->dataDumper)
210 		ctx->dataState = K_OFFSET_POS_NOT_SET;
211 	else
212 		ctx->dataState = K_OFFSET_NO_DATA;
213 
214 	te->formatData = (void *) ctx;
215 }
216 
217 /*
218  * Called by the Archiver to save any extra format-related TOC entry
219  * data.
220  *
221  * Optional.
222  *
223  * Use the Archiver routines to write data - they are non-endian, and
224  * maintain other important file information.
225  */
226 static void
_WriteExtraToc(ArchiveHandle * AH,TocEntry * te)227 _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
228 {
229 	lclTocEntry *ctx = (lclTocEntry *) te->formatData;
230 
231 	WriteOffset(AH, ctx->dataPos, ctx->dataState);
232 }
233 
234 /*
235  * Called by the Archiver to read any extra format-related TOC data.
236  *
237  * Optional.
238  *
239  * Needs to match the order defined in _WriteExtraToc, and should also
240  * use the Archiver input routines.
241  */
242 static void
_ReadExtraToc(ArchiveHandle * AH,TocEntry * te)243 _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
244 {
245 	lclTocEntry *ctx = (lclTocEntry *) te->formatData;
246 
247 	if (ctx == NULL)
248 	{
249 		ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
250 		te->formatData = (void *) ctx;
251 	}
252 
253 	ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
254 
255 	/*
256 	 * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
257 	 * dump it at all.
258 	 */
259 	if (AH->version < K_VERS_1_7)
260 		ReadInt(AH);
261 }
262 
263 /*
264  * Called by the Archiver when restoring an archive to output a comment
265  * that includes useful information about the TOC entry.
266  *
267  * Optional.
268  *
269  */
270 static void
_PrintExtraToc(ArchiveHandle * AH,TocEntry * te)271 _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
272 {
273 	lclTocEntry *ctx = (lclTocEntry *) te->formatData;
274 
275 	if (AH->public.verbose)
276 		ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
277 				 (int64) ctx->dataPos);
278 }
279 
280 /*
281  * Called by the archiver when saving TABLE DATA (not schema). This routine
282  * should save whatever format-specific information is needed to read
283  * the archive back.
284  *
285  * It is called just prior to the dumper's 'DataDumper' routine being called.
286  *
287  * Optional, but strongly recommended.
288  *
289  */
290 static void
_StartData(ArchiveHandle * AH,TocEntry * te)291 _StartData(ArchiveHandle *AH, TocEntry *te)
292 {
293 	lclContext *ctx = (lclContext *) AH->formatData;
294 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
295 
296 	tctx->dataPos = _getFilePos(AH, ctx);
297 	if (tctx->dataPos >= 0)
298 		tctx->dataState = K_OFFSET_POS_SET;
299 
300 	_WriteByte(AH, BLK_DATA);	/* Block type */
301 	WriteInt(AH, te->dumpId);	/* For sanity check */
302 
303 	ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
304 }
305 
306 /*
307  * Called by archiver when dumper calls WriteData. This routine is
308  * called for both BLOB and TABLE data; it is the responsibility of
309  * the format to manage each kind of data using StartBlob/StartData.
310  *
311  * It should only be called from within a DataDumper routine.
312  *
313  * Mandatory.
314  */
315 static void
_WriteData(ArchiveHandle * AH,const void * data,size_t dLen)316 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
317 {
318 	lclContext *ctx = (lclContext *) AH->formatData;
319 	CompressorState *cs = ctx->cs;
320 
321 	if (dLen > 0)
322 		/* WriteDataToArchive() internally throws write errors */
323 		WriteDataToArchive(AH, cs, data, dLen);
324 
325 	return;
326 }
327 
328 /*
329  * Called by the archiver when a dumper's 'DataDumper' routine has
330  * finished.
331  *
332  * Optional.
333  *
334  */
335 static void
_EndData(ArchiveHandle * AH,TocEntry * te)336 _EndData(ArchiveHandle *AH, TocEntry *te)
337 {
338 	lclContext *ctx = (lclContext *) AH->formatData;
339 
340 	EndCompressor(AH, ctx->cs);
341 	/* Send the end marker */
342 	WriteInt(AH, 0);
343 }
344 
345 /*
346  * Called by the archiver when starting to save all BLOB DATA (not schema).
347  * This routine should save whatever format-specific information is needed
348  * to read the BLOBs back into memory.
349  *
350  * It is called just prior to the dumper's DataDumper routine.
351  *
352  * Optional, but strongly recommended.
353  */
354 static void
_StartBlobs(ArchiveHandle * AH,TocEntry * te)355 _StartBlobs(ArchiveHandle *AH, TocEntry *te)
356 {
357 	lclContext *ctx = (lclContext *) AH->formatData;
358 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
359 
360 	tctx->dataPos = _getFilePos(AH, ctx);
361 	if (tctx->dataPos >= 0)
362 		tctx->dataState = K_OFFSET_POS_SET;
363 
364 	_WriteByte(AH, BLK_BLOBS);	/* Block type */
365 	WriteInt(AH, te->dumpId);	/* For sanity check */
366 }
367 
368 /*
369  * Called by the archiver when the dumper calls StartBlob.
370  *
371  * Mandatory.
372  *
373  * Must save the passed OID for retrieval at restore-time.
374  */
375 static void
_StartBlob(ArchiveHandle * AH,TocEntry * te,Oid oid)376 _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
377 {
378 	lclContext *ctx = (lclContext *) AH->formatData;
379 
380 	if (oid == 0)
381 		fatal("invalid OID for large object");
382 
383 	WriteInt(AH, oid);
384 
385 	ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
386 }
387 
388 /*
389  * Called by the archiver when the dumper calls EndBlob.
390  *
391  * Optional.
392  */
393 static void
_EndBlob(ArchiveHandle * AH,TocEntry * te,Oid oid)394 _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
395 {
396 	lclContext *ctx = (lclContext *) AH->formatData;
397 
398 	EndCompressor(AH, ctx->cs);
399 	/* Send the end marker */
400 	WriteInt(AH, 0);
401 }
402 
403 /*
404  * Called by the archiver when finishing saving all BLOB DATA.
405  *
406  * Optional.
407  */
408 static void
_EndBlobs(ArchiveHandle * AH,TocEntry * te)409 _EndBlobs(ArchiveHandle *AH, TocEntry *te)
410 {
411 	/* Write out a fake zero OID to mark end-of-blobs. */
412 	WriteInt(AH, 0);
413 }
414 
/*
 * Print data for a given TOC entry
 *
 * Locates the data block whose dump ID matches te->dumpId and restores it:
 * BLK_DATA blocks go through _PrintData, BLK_BLOBS blocks through
 * _LoadBlobs.  Entries marked K_OFFSET_NO_DATA have nothing to restore.
 *
 * If the file is seekable and the entry's offset is known, we seek straight
 * to it.  Otherwise we scan forward block by block, recording the offsets
 * of skipped blocks in their own TOC entries so later calls can seek.
 *
 * Exits via fatal() if the block cannot be found, if the block found has an
 * unexpected dump ID, or if its type is unrecognized.
 */
static void
_PrintTocData(ArchiveHandle *AH, TocEntry *te)
{
	lclContext *ctx = (lclContext *) AH->formatData;
	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
	int			blkType;
	int			id;

	/* This entry has no data block at all. */
	if (tctx->dataState == K_OFFSET_NO_DATA)
		return;

	if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
	{
		/*
		 * We cannot seek directly to the desired block.  Instead, skip over
		 * block headers until we find the one we want.  Remember the
		 * positions of skipped-over blocks, so that if we later decide we
		 * need to read one, we'll be able to seek to it.
		 *
		 * When our input file is seekable, we can do the search starting from
		 * the point after the last data block we scanned in previous
		 * iterations of this function.
		 */
		if (ctx->hasSeek)
		{
			if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0)
				fatal("error during file seek: %m");
		}

		for (;;)
		{
			/* Position of the header we are about to read (-1 if unknown) */
			pgoff_t		thisBlkPos = _getFilePos(AH, ctx);

			_readBlockHeader(AH, &blkType, &id);

			/* Stop at end of file, or at the block we're looking for. */
			if (blkType == EOF || id == te->dumpId)
				break;

			/* Remember the block position, if we got one */
			if (thisBlkPos >= 0)
			{
				TocEntry   *otherte = getTocEntryByDumpId(AH, id);

				if (otherte && otherte->formatData)
				{
					lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData;

					/*
					 * Note: on Windows, multiple threads might access/update
					 * the same lclTocEntry concurrently, but that should be
					 * safe as long as we update dataPos before dataState.
					 * Ideally, we'd use pg_write_barrier() to enforce that,
					 * but the needed infrastructure doesn't exist in frontend
					 * code.  But Windows only runs on machines with strong
					 * store ordering, so it should be okay for now.
					 */
					if (othertctx->dataState == K_OFFSET_POS_NOT_SET)
					{
						othertctx->dataPos = thisBlkPos;
						othertctx->dataState = K_OFFSET_POS_SET;
					}
					else if (othertctx->dataPos != thisBlkPos ||
							 othertctx->dataState != K_OFFSET_POS_SET)
					{
						/* sanity check */
						pg_log_warning("data block %d has wrong seek position",
									   id);
					}
				}
			}

			/* Not our block: skip its contents to reach the next header. */
			switch (blkType)
			{
				case BLK_DATA:
					_skipData(AH);
					break;

				case BLK_BLOBS:
					_skipBlobs(AH);
					break;

				default:		/* Always have a default */
					fatal("unrecognized data block type (%d) while searching archive",
						  blkType);
					break;
			}
		}
	}
	else
	{
		/* We can just seek to the place we need to be. */
		if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
			fatal("error during file seek: %m");

		_readBlockHeader(AH, &blkType, &id);
	}

	/*
	 * If we reached EOF without finding the block we want, then either it
	 * doesn't exist, or it does but we lack the ability to seek back to it.
	 */
	if (blkType == EOF)
	{
		if (!ctx->hasSeek)
			fatal("could not find block ID %d in archive -- "
				  "possibly due to out-of-order restore request, "
				  "which cannot be handled due to non-seekable input file",
				  te->dumpId);
		else
			fatal("could not find block ID %d in archive -- "
				  "possibly corrupt archive",
				  te->dumpId);
	}

	/* Are we sane? */
	if (id != te->dumpId)
		fatal("found unexpected block ID (%d) when reading data -- expected %d",
			  id, te->dumpId);

	/* The file is now positioned just after this block's header. */
	switch (blkType)
	{
		case BLK_DATA:
			_PrintData(AH);
			break;

		case BLK_BLOBS:
			_LoadBlobs(AH, AH->public.ropt->dropSchema);
			break;

		default:				/* Always have a default */
			fatal("unrecognized data block type %d while restoring archive",
				  blkType);
			break;
	}

	/*
	 * If our input file is seekable but lacks data offsets, update our
	 * knowledge of where to start future searches from.  (Note that we did
	 * not update the current TE's dataState/dataPos.  We could have, but
	 * there is no point since it will not be visited again.)
	 */
	if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET)
	{
		pgoff_t		curPos = _getFilePos(AH, ctx);

		/* Only move forward; never rewind the search start point. */
		if (curPos > ctx->lastFilePos)
			ctx->lastFilePos = curPos;
	}
}
567 
568 /*
569  * Print data from current file position.
570 */
571 static void
_PrintData(ArchiveHandle * AH)572 _PrintData(ArchiveHandle *AH)
573 {
574 	ReadDataFromArchive(AH, AH->compression, _CustomReadFunc);
575 }
576 
577 static void
_LoadBlobs(ArchiveHandle * AH,bool drop)578 _LoadBlobs(ArchiveHandle *AH, bool drop)
579 {
580 	Oid			oid;
581 
582 	StartRestoreBlobs(AH);
583 
584 	oid = ReadInt(AH);
585 	while (oid != 0)
586 	{
587 		StartRestoreBlob(AH, oid, drop);
588 		_PrintData(AH);
589 		EndRestoreBlob(AH, oid);
590 		oid = ReadInt(AH);
591 	}
592 
593 	EndRestoreBlobs(AH);
594 }
595 
596 /*
597  * Skip the BLOBs from the current file position.
598  * BLOBS are written sequentially as data blocks (see below).
599  * Each BLOB is preceded by it's original OID.
600  * A zero OID indicated the end of the BLOBS
601  */
602 static void
_skipBlobs(ArchiveHandle * AH)603 _skipBlobs(ArchiveHandle *AH)
604 {
605 	Oid			oid;
606 
607 	oid = ReadInt(AH);
608 	while (oid != 0)
609 	{
610 		_skipData(AH);
611 		oid = ReadInt(AH);
612 	}
613 }
614 
615 /*
616  * Skip data from current file position.
617  * Data blocks are formatted as an integer length, followed by data.
618  * A zero length denoted the end of the block.
619 */
620 static void
_skipData(ArchiveHandle * AH)621 _skipData(ArchiveHandle *AH)
622 {
623 	lclContext *ctx = (lclContext *) AH->formatData;
624 	size_t		blkLen;
625 	char	   *buf = NULL;
626 	int			buflen = 0;
627 	size_t		cnt;
628 
629 	blkLen = ReadInt(AH);
630 	while (blkLen != 0)
631 	{
632 		if (ctx->hasSeek)
633 		{
634 			if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0)
635 				fatal("error during file seek: %m");
636 		}
637 		else
638 		{
639 			if (blkLen > buflen)
640 			{
641 				if (buf)
642 					free(buf);
643 				buf = (char *) pg_malloc(blkLen);
644 				buflen = blkLen;
645 			}
646 			if ((cnt = fread(buf, 1, blkLen, AH->FH)) != blkLen)
647 			{
648 				if (feof(AH->FH))
649 					fatal("could not read from input file: end of file");
650 				else
651 					fatal("could not read from input file: %m");
652 			}
653 		}
654 
655 		blkLen = ReadInt(AH);
656 	}
657 
658 	if (buf)
659 		free(buf);
660 }
661 
662 /*
663  * Write a byte of data to the archive.
664  *
665  * Mandatory.
666  *
667  * Called by the archiver to do integer & byte output to the archive.
668  */
669 static int
_WriteByte(ArchiveHandle * AH,const int i)670 _WriteByte(ArchiveHandle *AH, const int i)
671 {
672 	int			res;
673 
674 	if ((res = fputc(i, AH->FH)) == EOF)
675 		WRITE_ERROR_EXIT;
676 
677 	return 1;
678 }
679 
680 /*
681  * Read a byte of data from the archive.
682  *
683  * Mandatory
684  *
685  * Called by the archiver to read bytes & integers from the archive.
686  * EOF should be treated as a fatal error.
687  */
688 static int
_ReadByte(ArchiveHandle * AH)689 _ReadByte(ArchiveHandle *AH)
690 {
691 	int			res;
692 
693 	res = getc(AH->FH);
694 	if (res == EOF)
695 		READ_ERROR_EXIT(AH->FH);
696 	return res;
697 }
698 
699 /*
700  * Write a buffer of data to the archive.
701  *
702  * Mandatory.
703  *
704  * Called by the archiver to write a block of bytes to the archive.
705  */
706 static void
_WriteBuf(ArchiveHandle * AH,const void * buf,size_t len)707 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
708 {
709 	if (fwrite(buf, 1, len, AH->FH) != len)
710 		WRITE_ERROR_EXIT;
711 }
712 
713 /*
714  * Read a block of bytes from the archive.
715  *
716  * Mandatory.
717  *
718  * Called by the archiver to read a block of bytes from the archive
719  */
720 static void
_ReadBuf(ArchiveHandle * AH,void * buf,size_t len)721 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
722 {
723 	if (fread(buf, 1, len, AH->FH) != len)
724 		READ_ERROR_EXIT(AH->FH);
725 }
726 
727 /*
728  * Close the archive.
729  *
730  * Mandatory.
731  *
732  * When writing the archive, this is the routine that actually starts
733  * the process of saving it to files. No data should be written prior
734  * to this point, since the user could sort the TOC after creating it.
735  *
736  * If an archive is to be written, this routine must call:
737  *		WriteHead			to save the archive header
738  *		WriteToc			to save the TOC entries
739  *		WriteDataChunks		to save all DATA & BLOBs.
740  *
741  */
742 static void
_CloseArchive(ArchiveHandle * AH)743 _CloseArchive(ArchiveHandle *AH)
744 {
745 	lclContext *ctx = (lclContext *) AH->formatData;
746 	pgoff_t		tpos;
747 
748 	if (AH->mode == archModeWrite)
749 	{
750 		WriteHead(AH);
751 		/* Remember TOC's seek position for use below */
752 		tpos = ftello(AH->FH);
753 		if (tpos < 0 && ctx->hasSeek)
754 			fatal("could not determine seek position in archive file: %m");
755 		WriteToc(AH);
756 		WriteDataChunks(AH, NULL);
757 
758 		/*
759 		 * If possible, re-write the TOC in order to update the data offset
760 		 * information.  This is not essential, as pg_restore can cope in most
761 		 * cases without it; but it can make pg_restore significantly faster
762 		 * in some situations (especially parallel restore).
763 		 */
764 		if (ctx->hasSeek &&
765 			fseeko(AH->FH, tpos, SEEK_SET) == 0)
766 			WriteToc(AH);
767 	}
768 
769 	if (fclose(AH->FH) != 0)
770 		fatal("could not close archive file: %m");
771 
772 	/* Sync the output file if one is defined */
773 	if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
774 		(void) fsync_fname(AH->fSpec, false);
775 
776 	AH->FH = NULL;
777 }
778 
779 /*
780  * Reopen the archive's file handle.
781  *
782  * We close the original file handle, except on Windows.  (The difference
783  * is because on Windows, this is used within a multithreading context,
784  * and we don't want a thread closing the parent file handle.)
785  */
786 static void
_ReopenArchive(ArchiveHandle * AH)787 _ReopenArchive(ArchiveHandle *AH)
788 {
789 	lclContext *ctx = (lclContext *) AH->formatData;
790 	pgoff_t		tpos;
791 
792 	if (AH->mode == archModeWrite)
793 		fatal("can only reopen input archives");
794 
795 	/*
796 	 * These two cases are user-facing errors since they represent unsupported
797 	 * (but not invalid) use-cases.  Word the error messages appropriately.
798 	 */
799 	if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
800 		fatal("parallel restore from standard input is not supported");
801 	if (!ctx->hasSeek)
802 		fatal("parallel restore from non-seekable file is not supported");
803 
804 	tpos = ftello(AH->FH);
805 	if (tpos < 0)
806 		fatal("could not determine seek position in archive file: %m");
807 
808 #ifndef WIN32
809 	if (fclose(AH->FH) != 0)
810 		fatal("could not close archive file: %m");
811 #endif
812 
813 	AH->FH = fopen(AH->fSpec, PG_BINARY_R);
814 	if (!AH->FH)
815 		fatal("could not open input file \"%s\": %m", AH->fSpec);
816 
817 	if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
818 		fatal("could not set seek position in archive file: %m");
819 }
820 
821 /*
822  * Prepare for parallel restore.
823  *
824  * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
825  * TOC entries' dataLength fields with appropriate values to guide the
826  * ordering of restore jobs.  The source of said data is format-dependent,
827  * as is the exact meaning of the values.
828  *
829  * A format module might also choose to do other setup here.
830  */
831 static void
_PrepParallelRestore(ArchiveHandle * AH)832 _PrepParallelRestore(ArchiveHandle *AH)
833 {
834 	lclContext *ctx = (lclContext *) AH->formatData;
835 	TocEntry   *prev_te = NULL;
836 	lclTocEntry *prev_tctx = NULL;
837 	TocEntry   *te;
838 
839 	/*
840 	 * Knowing that the data items were dumped out in TOC order, we can
841 	 * reconstruct the length of each item as the delta to the start offset of
842 	 * the next data item.
843 	 */
844 	for (te = AH->toc->next; te != AH->toc; te = te->next)
845 	{
846 		lclTocEntry *tctx = (lclTocEntry *) te->formatData;
847 
848 		/*
849 		 * Ignore entries without a known data offset; if we were unable to
850 		 * seek to rewrite the TOC when creating the archive, this'll be all
851 		 * of them, and we'll end up with no size estimates.
852 		 */
853 		if (tctx->dataState != K_OFFSET_POS_SET)
854 			continue;
855 
856 		/* Compute previous data item's length */
857 		if (prev_te)
858 		{
859 			if (tctx->dataPos > prev_tctx->dataPos)
860 				prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
861 		}
862 
863 		prev_te = te;
864 		prev_tctx = tctx;
865 	}
866 
867 	/* If OK to seek, we can determine the length of the last item */
868 	if (prev_te && ctx->hasSeek)
869 	{
870 		pgoff_t		endpos;
871 
872 		if (fseeko(AH->FH, 0, SEEK_END) != 0)
873 			fatal("error during file seek: %m");
874 		endpos = ftello(AH->FH);
875 		if (endpos > prev_tctx->dataPos)
876 			prev_te->dataLength = endpos - prev_tctx->dataPos;
877 	}
878 }
879 
880 /*
881  * Clone format-specific fields during parallel restoration.
882  */
883 static void
_Clone(ArchiveHandle * AH)884 _Clone(ArchiveHandle *AH)
885 {
886 	lclContext *ctx = (lclContext *) AH->formatData;
887 
888 	/*
889 	 * Each thread must have private lclContext working state.
890 	 */
891 	AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
892 	memcpy(AH->formatData, ctx, sizeof(lclContext));
893 	ctx = (lclContext *) AH->formatData;
894 
895 	/* sanity check, shouldn't happen */
896 	if (ctx->cs != NULL)
897 		fatal("compressor active");
898 
899 	/*
900 	 * We intentionally do not clone TOC-entry-local state: it's useful to
901 	 * share knowledge about where the data blocks are across threads.
902 	 * _PrintTocData has to be careful about the order of operations on that
903 	 * state, though.
904 	 *
905 	 * Note: we do not make a local lo_buf because we expect at most one BLOBS
906 	 * entry per archive, so no parallelism is possible.
907 	 */
908 }
909 
910 static void
_DeClone(ArchiveHandle * AH)911 _DeClone(ArchiveHandle *AH)
912 {
913 	lclContext *ctx = (lclContext *) AH->formatData;
914 
915 	free(ctx);
916 }
917 
918 /*
919  * This function is executed in the child of a parallel restore from a
920  * custom-format archive and restores the actual data for one TOC entry.
921  */
922 static int
_WorkerJobRestoreCustom(ArchiveHandle * AH,TocEntry * te)923 _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
924 {
925 	return parallel_restore(AH, te);
926 }
927 
928 /*--------------------------------------------------
929  * END OF FORMAT CALLBACKS
930  *--------------------------------------------------
931  */
932 
933 /*
934  * Get the current position in the archive file.
935  *
936  * With a non-seekable archive file, we may not be able to obtain the
937  * file position.  If so, just return -1.  It's not too important in
938  * that case because we won't be able to rewrite the TOC to fill in
939  * data block offsets anyway.
940  */
941 static pgoff_t
_getFilePos(ArchiveHandle * AH,lclContext * ctx)942 _getFilePos(ArchiveHandle *AH, lclContext *ctx)
943 {
944 	pgoff_t		pos;
945 
946 	pos = ftello(AH->FH);
947 	if (pos < 0)
948 	{
949 		/* Not expected if we found we can seek. */
950 		if (ctx->hasSeek)
951 			fatal("could not determine seek position in archive file: %m");
952 	}
953 	return pos;
954 }
955 
956 /*
957  * Read a data block header. The format changed in V1.3, so we
958  * centralize the code here for simplicity.  Returns *type = EOF
959  * if at EOF.
960  */
961 static void
_readBlockHeader(ArchiveHandle * AH,int * type,int * id)962 _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
963 {
964 	int			byt;
965 
966 	/*
967 	 * Note: if we are at EOF with a pre-1.3 input file, we'll fatal() inside
968 	 * ReadInt rather than returning EOF.  It doesn't seem worth jumping
969 	 * through hoops to deal with that case better, because no such files are
970 	 * likely to exist in the wild: only some 7.1 development versions of
971 	 * pg_dump ever generated such files.
972 	 */
973 	if (AH->version < K_VERS_1_3)
974 		*type = BLK_DATA;
975 	else
976 	{
977 		byt = getc(AH->FH);
978 		*type = byt;
979 		if (byt == EOF)
980 		{
981 			*id = 0;			/* don't return an uninitialized value */
982 			return;
983 		}
984 	}
985 
986 	*id = ReadInt(AH);
987 }
988 
989 /*
990  * Callback function for WriteDataToArchive. Writes one block of (compressed)
991  * data to the archive.
992  */
993 static void
_CustomWriteFunc(ArchiveHandle * AH,const char * buf,size_t len)994 _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
995 {
996 	/* never write 0-byte blocks (this should not happen) */
997 	if (len > 0)
998 	{
999 		WriteInt(AH, len);
1000 		_WriteBuf(AH, buf, len);
1001 	}
1002 	return;
1003 }
1004 
1005 /*
1006  * Callback function for ReadDataFromArchive. To keep things simple, we
1007  * always read one compressed block at a time.
1008  */
1009 static size_t
_CustomReadFunc(ArchiveHandle * AH,char ** buf,size_t * buflen)1010 _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
1011 {
1012 	size_t		blkLen;
1013 
1014 	/* Read length */
1015 	blkLen = ReadInt(AH);
1016 	if (blkLen == 0)
1017 		return 0;
1018 
1019 	/* If the caller's buffer is not large enough, allocate a bigger one */
1020 	if (blkLen > *buflen)
1021 	{
1022 		free(*buf);
1023 		*buf = (char *) pg_malloc(blkLen);
1024 		*buflen = blkLen;
1025 	}
1026 
1027 	/* exits app on read errors */
1028 	_ReadBuf(AH, *buf, blkLen);
1029 
1030 	return blkLen;
1031 }
1032