1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_custom.c
4  *
5  *	Implements the custom output format.
6  *
 *	The comments with the routines in this code are a good place to
 *	understand how to write a new format.
9  *
10  *	See the headers to pg_restore for more details.
11  *
12  * Copyright (c) 2000, Philip Warner
13  *		Rights are granted to use this software in any way so long
14  *		as this notice is not removed.
15  *
16  *	The author is not responsible for loss or damages that may
17  *	and any liability will be limited to the time taken to fix any
18  *	related bug.
19  *
20  *
21  * IDENTIFICATION
22  *		src/bin/pg_dump/pg_backup_custom.c
23  *
24  *-------------------------------------------------------------------------
25  */
26 #include "postgres_fe.h"
27 
28 #include "compress_io.h"
29 #include "parallel.h"
30 #include "pg_backup_utils.h"
31 #include "common/file_utils.h"
32 
33 /*--------
34  * Routines in the format interface
35  *--------
36  */
37 
38 static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
39 static void _StartData(ArchiveHandle *AH, TocEntry *te);
40 static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
41 static void _EndData(ArchiveHandle *AH, TocEntry *te);
42 static int	_WriteByte(ArchiveHandle *AH, const int i);
43 static int	_ReadByte(ArchiveHandle *);
44 static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
45 static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
46 static void _CloseArchive(ArchiveHandle *AH);
47 static void _ReopenArchive(ArchiveHandle *AH);
48 static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
49 static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
50 static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
51 static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
52 
53 static void _PrintData(ArchiveHandle *AH);
54 static void _skipData(ArchiveHandle *AH);
55 static void _skipBlobs(ArchiveHandle *AH);
56 
57 static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
58 static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
59 static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
60 static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
61 static void _LoadBlobs(ArchiveHandle *AH, bool drop);
62 static void _Clone(ArchiveHandle *AH);
63 static void _DeClone(ArchiveHandle *AH);
64 
65 static int	_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
66 
67 typedef struct
68 {
69 	CompressorState *cs;
70 	int			hasSeek;
71 	pgoff_t		filePos;
72 	pgoff_t		dataStart;
73 } lclContext;
74 
75 typedef struct
76 {
77 	int			dataState;
78 	pgoff_t		dataPos;
79 } lclTocEntry;
80 
81 
82 /*------
83  * Static declarations
84  *------
85  */
86 static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
87 static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);
88 
89 static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
90 static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
91 
92 /* translator: this is a module name */
93 static const char *modulename = gettext_noop("custom archiver");
94 
95 
96 
97 /*
98  *	Init routine required by ALL formats. This is a global routine
99  *	and should be declared in pg_backup_archiver.h
100  *
101  *	It's task is to create any extra archive context (using AH->formatData),
102  *	and to initialize the supported function pointers.
103  *
104  *	It should also prepare whatever it's input source is for reading/writing,
105  *	and in the case of a read mode connection, it should load the Header & TOC.
106  */
107 void
InitArchiveFmt_Custom(ArchiveHandle * AH)108 InitArchiveFmt_Custom(ArchiveHandle *AH)
109 {
110 	lclContext *ctx;
111 
112 	/* Assuming static functions, this can be copied for each format. */
113 	AH->ArchiveEntryPtr = _ArchiveEntry;
114 	AH->StartDataPtr = _StartData;
115 	AH->WriteDataPtr = _WriteData;
116 	AH->EndDataPtr = _EndData;
117 	AH->WriteBytePtr = _WriteByte;
118 	AH->ReadBytePtr = _ReadByte;
119 	AH->WriteBufPtr = _WriteBuf;
120 	AH->ReadBufPtr = _ReadBuf;
121 	AH->ClosePtr = _CloseArchive;
122 	AH->ReopenPtr = _ReopenArchive;
123 	AH->PrintTocDataPtr = _PrintTocData;
124 	AH->ReadExtraTocPtr = _ReadExtraToc;
125 	AH->WriteExtraTocPtr = _WriteExtraToc;
126 	AH->PrintExtraTocPtr = _PrintExtraToc;
127 
128 	AH->StartBlobsPtr = _StartBlobs;
129 	AH->StartBlobPtr = _StartBlob;
130 	AH->EndBlobPtr = _EndBlob;
131 	AH->EndBlobsPtr = _EndBlobs;
132 	AH->ClonePtr = _Clone;
133 	AH->DeClonePtr = _DeClone;
134 
135 	/* no parallel dump in the custom archive, only parallel restore */
136 	AH->WorkerJobDumpPtr = NULL;
137 	AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
138 
139 	/* Set up a private area. */
140 	ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
141 	AH->formatData = (void *) ctx;
142 
143 	/* Initialize LO buffering */
144 	AH->lo_buf_size = LOBBUFSIZE;
145 	AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
146 
147 	ctx->filePos = 0;
148 
149 	/*
150 	 * Now open the file
151 	 */
152 	if (AH->mode == archModeWrite)
153 	{
154 		if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
155 		{
156 			AH->FH = fopen(AH->fSpec, PG_BINARY_W);
157 			if (!AH->FH)
158 				exit_horribly(modulename, "could not open output file \"%s\": %s\n",
159 							  AH->fSpec, strerror(errno));
160 		}
161 		else
162 		{
163 			AH->FH = stdout;
164 			if (!AH->FH)
165 				exit_horribly(modulename, "could not open output file: %s\n",
166 							  strerror(errno));
167 		}
168 
169 		ctx->hasSeek = checkSeek(AH->FH);
170 	}
171 	else
172 	{
173 		if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
174 		{
175 			AH->FH = fopen(AH->fSpec, PG_BINARY_R);
176 			if (!AH->FH)
177 				exit_horribly(modulename, "could not open input file \"%s\": %s\n",
178 							  AH->fSpec, strerror(errno));
179 		}
180 		else
181 		{
182 			AH->FH = stdin;
183 			if (!AH->FH)
184 				exit_horribly(modulename, "could not open input file: %s\n",
185 							  strerror(errno));
186 		}
187 
188 		ctx->hasSeek = checkSeek(AH->FH);
189 
190 		ReadHead(AH);
191 		ReadToc(AH);
192 		ctx->dataStart = _getFilePos(AH, ctx);
193 	}
194 
195 }
196 
197 /*
198  * Called by the Archiver when the dumper creates a new TOC entry.
199  *
200  * Optional.
201  *
202  * Set up extract format-related TOC data.
203 */
204 static void
_ArchiveEntry(ArchiveHandle * AH,TocEntry * te)205 _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
206 {
207 	lclTocEntry *ctx;
208 
209 	ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
210 	if (te->dataDumper)
211 		ctx->dataState = K_OFFSET_POS_NOT_SET;
212 	else
213 		ctx->dataState = K_OFFSET_NO_DATA;
214 
215 	te->formatData = (void *) ctx;
216 }
217 
218 /*
219  * Called by the Archiver to save any extra format-related TOC entry
220  * data.
221  *
222  * Optional.
223  *
224  * Use the Archiver routines to write data - they are non-endian, and
225  * maintain other important file information.
226  */
227 static void
_WriteExtraToc(ArchiveHandle * AH,TocEntry * te)228 _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
229 {
230 	lclTocEntry *ctx = (lclTocEntry *) te->formatData;
231 
232 	WriteOffset(AH, ctx->dataPos, ctx->dataState);
233 }
234 
235 /*
236  * Called by the Archiver to read any extra format-related TOC data.
237  *
238  * Optional.
239  *
240  * Needs to match the order defined in _WriteExtraToc, and should also
241  * use the Archiver input routines.
242  */
243 static void
_ReadExtraToc(ArchiveHandle * AH,TocEntry * te)244 _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
245 {
246 	lclTocEntry *ctx = (lclTocEntry *) te->formatData;
247 
248 	if (ctx == NULL)
249 	{
250 		ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
251 		te->formatData = (void *) ctx;
252 	}
253 
254 	ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
255 
256 	/*
257 	 * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
258 	 * dump it at all.
259 	 */
260 	if (AH->version < K_VERS_1_7)
261 		ReadInt(AH);
262 }
263 
264 /*
265  * Called by the Archiver when restoring an archive to output a comment
266  * that includes useful information about the TOC entry.
267  *
268  * Optional.
269  *
270  */
271 static void
_PrintExtraToc(ArchiveHandle * AH,TocEntry * te)272 _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
273 {
274 	lclTocEntry *ctx = (lclTocEntry *) te->formatData;
275 
276 	if (AH->public.verbose)
277 		ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
278 				 (int64) ctx->dataPos);
279 }
280 
281 /*
282  * Called by the archiver when saving TABLE DATA (not schema). This routine
283  * should save whatever format-specific information is needed to read
284  * the archive back.
285  *
286  * It is called just prior to the dumper's 'DataDumper' routine being called.
287  *
288  * Optional, but strongly recommended.
289  *
290  */
291 static void
_StartData(ArchiveHandle * AH,TocEntry * te)292 _StartData(ArchiveHandle *AH, TocEntry *te)
293 {
294 	lclContext *ctx = (lclContext *) AH->formatData;
295 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
296 
297 	tctx->dataPos = _getFilePos(AH, ctx);
298 	tctx->dataState = K_OFFSET_POS_SET;
299 
300 	_WriteByte(AH, BLK_DATA);	/* Block type */
301 	WriteInt(AH, te->dumpId);	/* For sanity check */
302 
303 	ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
304 }
305 
306 /*
307  * Called by archiver when dumper calls WriteData. This routine is
308  * called for both BLOB and TABLE data; it is the responsibility of
309  * the format to manage each kind of data using StartBlob/StartData.
310  *
311  * It should only be called from within a DataDumper routine.
312  *
313  * Mandatory.
314  */
315 static void
_WriteData(ArchiveHandle * AH,const void * data,size_t dLen)316 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
317 {
318 	lclContext *ctx = (lclContext *) AH->formatData;
319 	CompressorState *cs = ctx->cs;
320 
321 	if (dLen > 0)
322 		/* WriteDataToArchive() internally throws write errors */
323 		WriteDataToArchive(AH, cs, data, dLen);
324 
325 	return;
326 }
327 
328 /*
329  * Called by the archiver when a dumper's 'DataDumper' routine has
330  * finished.
331  *
332  * Optional.
333  *
334  */
335 static void
_EndData(ArchiveHandle * AH,TocEntry * te)336 _EndData(ArchiveHandle *AH, TocEntry *te)
337 {
338 	lclContext *ctx = (lclContext *) AH->formatData;
339 
340 	EndCompressor(AH, ctx->cs);
341 	/* Send the end marker */
342 	WriteInt(AH, 0);
343 }
344 
345 /*
346  * Called by the archiver when starting to save all BLOB DATA (not schema).
347  * This routine should save whatever format-specific information is needed
348  * to read the BLOBs back into memory.
349  *
350  * It is called just prior to the dumper's DataDumper routine.
351  *
352  * Optional, but strongly recommended.
353  */
354 static void
_StartBlobs(ArchiveHandle * AH,TocEntry * te)355 _StartBlobs(ArchiveHandle *AH, TocEntry *te)
356 {
357 	lclContext *ctx = (lclContext *) AH->formatData;
358 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
359 
360 	tctx->dataPos = _getFilePos(AH, ctx);
361 	tctx->dataState = K_OFFSET_POS_SET;
362 
363 	_WriteByte(AH, BLK_BLOBS);	/* Block type */
364 	WriteInt(AH, te->dumpId);	/* For sanity check */
365 }
366 
367 /*
368  * Called by the archiver when the dumper calls StartBlob.
369  *
370  * Mandatory.
371  *
372  * Must save the passed OID for retrieval at restore-time.
373  */
374 static void
_StartBlob(ArchiveHandle * AH,TocEntry * te,Oid oid)375 _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
376 {
377 	lclContext *ctx = (lclContext *) AH->formatData;
378 
379 	if (oid == 0)
380 		exit_horribly(modulename, "invalid OID for large object\n");
381 
382 	WriteInt(AH, oid);
383 
384 	ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
385 }
386 
387 /*
388  * Called by the archiver when the dumper calls EndBlob.
389  *
390  * Optional.
391  */
392 static void
_EndBlob(ArchiveHandle * AH,TocEntry * te,Oid oid)393 _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
394 {
395 	lclContext *ctx = (lclContext *) AH->formatData;
396 
397 	EndCompressor(AH, ctx->cs);
398 	/* Send the end marker */
399 	WriteInt(AH, 0);
400 }
401 
402 /*
403  * Called by the archiver when finishing saving all BLOB DATA.
404  *
405  * Optional.
406  */
407 static void
_EndBlobs(ArchiveHandle * AH,TocEntry * te)408 _EndBlobs(ArchiveHandle *AH, TocEntry *te)
409 {
410 	/* Write out a fake zero OID to mark end-of-blobs. */
411 	WriteInt(AH, 0);
412 }
413 
414 /*
415  * Print data for a given TOC entry
416  */
417 static void
_PrintTocData(ArchiveHandle * AH,TocEntry * te)418 _PrintTocData(ArchiveHandle *AH, TocEntry *te)
419 {
420 	lclContext *ctx = (lclContext *) AH->formatData;
421 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
422 	int			blkType;
423 	int			id;
424 
425 	if (tctx->dataState == K_OFFSET_NO_DATA)
426 		return;
427 
428 	if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
429 	{
430 		/*
431 		 * We cannot seek directly to the desired block.  Instead, skip over
432 		 * block headers until we find the one we want.  This could fail if we
433 		 * are asked to restore items out-of-order.
434 		 */
435 		_readBlockHeader(AH, &blkType, &id);
436 
437 		while (blkType != EOF && id != te->dumpId)
438 		{
439 			switch (blkType)
440 			{
441 				case BLK_DATA:
442 					_skipData(AH);
443 					break;
444 
445 				case BLK_BLOBS:
446 					_skipBlobs(AH);
447 					break;
448 
449 				default:		/* Always have a default */
450 					exit_horribly(modulename,
451 								  "unrecognized data block type (%d) while searching archive\n",
452 								  blkType);
453 					break;
454 			}
455 			_readBlockHeader(AH, &blkType, &id);
456 		}
457 	}
458 	else
459 	{
460 		/* We can just seek to the place we need to be. */
461 		if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
462 			exit_horribly(modulename, "error during file seek: %s\n",
463 						  strerror(errno));
464 
465 		_readBlockHeader(AH, &blkType, &id);
466 	}
467 
468 	/* Produce suitable failure message if we fell off end of file */
469 	if (blkType == EOF)
470 	{
471 		if (tctx->dataState == K_OFFSET_POS_NOT_SET)
472 			exit_horribly(modulename, "could not find block ID %d in archive -- "
473 						  "possibly due to out-of-order restore request, "
474 						  "which cannot be handled due to lack of data offsets in archive\n",
475 						  te->dumpId);
476 		else if (!ctx->hasSeek)
477 			exit_horribly(modulename, "could not find block ID %d in archive -- "
478 						  "possibly due to out-of-order restore request, "
479 						  "which cannot be handled due to non-seekable input file\n",
480 						  te->dumpId);
481 		else					/* huh, the dataPos led us to EOF? */
482 			exit_horribly(modulename, "could not find block ID %d in archive -- "
483 						  "possibly corrupt archive\n",
484 						  te->dumpId);
485 	}
486 
487 	/* Are we sane? */
488 	if (id != te->dumpId)
489 		exit_horribly(modulename, "found unexpected block ID (%d) when reading data -- expected %d\n",
490 					  id, te->dumpId);
491 
492 	switch (blkType)
493 	{
494 		case BLK_DATA:
495 			_PrintData(AH);
496 			break;
497 
498 		case BLK_BLOBS:
499 			_LoadBlobs(AH, AH->public.ropt->dropSchema);
500 			break;
501 
502 		default:				/* Always have a default */
503 			exit_horribly(modulename, "unrecognized data block type %d while restoring archive\n",
504 						  blkType);
505 			break;
506 	}
507 }
508 
509 /*
510  * Print data from current file position.
511 */
512 static void
_PrintData(ArchiveHandle * AH)513 _PrintData(ArchiveHandle *AH)
514 {
515 	ReadDataFromArchive(AH, AH->compression, _CustomReadFunc);
516 }
517 
518 static void
_LoadBlobs(ArchiveHandle * AH,bool drop)519 _LoadBlobs(ArchiveHandle *AH, bool drop)
520 {
521 	Oid			oid;
522 
523 	StartRestoreBlobs(AH);
524 
525 	oid = ReadInt(AH);
526 	while (oid != 0)
527 	{
528 		StartRestoreBlob(AH, oid, drop);
529 		_PrintData(AH);
530 		EndRestoreBlob(AH, oid);
531 		oid = ReadInt(AH);
532 	}
533 
534 	EndRestoreBlobs(AH);
535 }
536 
537 /*
538  * Skip the BLOBs from the current file position.
539  * BLOBS are written sequentially as data blocks (see below).
540  * Each BLOB is preceded by it's original OID.
541  * A zero OID indicated the end of the BLOBS
542  */
543 static void
_skipBlobs(ArchiveHandle * AH)544 _skipBlobs(ArchiveHandle *AH)
545 {
546 	Oid			oid;
547 
548 	oid = ReadInt(AH);
549 	while (oid != 0)
550 	{
551 		_skipData(AH);
552 		oid = ReadInt(AH);
553 	}
554 }
555 
556 /*
557  * Skip data from current file position.
558  * Data blocks are formatted as an integer length, followed by data.
559  * A zero length denoted the end of the block.
560 */
561 static void
_skipData(ArchiveHandle * AH)562 _skipData(ArchiveHandle *AH)
563 {
564 	lclContext *ctx = (lclContext *) AH->formatData;
565 	size_t		blkLen;
566 	char	   *buf = NULL;
567 	int			buflen = 0;
568 	size_t		cnt;
569 
570 	blkLen = ReadInt(AH);
571 	while (blkLen != 0)
572 	{
573 		if (blkLen > buflen)
574 		{
575 			if (buf)
576 				free(buf);
577 			buf = (char *) pg_malloc(blkLen);
578 			buflen = blkLen;
579 		}
580 		if ((cnt = fread(buf, 1, blkLen, AH->FH)) != blkLen)
581 		{
582 			if (feof(AH->FH))
583 				exit_horribly(modulename,
584 							  "could not read from input file: end of file\n");
585 			else
586 				exit_horribly(modulename,
587 							  "could not read from input file: %s\n", strerror(errno));
588 		}
589 
590 		ctx->filePos += blkLen;
591 
592 		blkLen = ReadInt(AH);
593 	}
594 
595 	if (buf)
596 		free(buf);
597 }
598 
599 /*
600  * Write a byte of data to the archive.
601  *
602  * Mandatory.
603  *
604  * Called by the archiver to do integer & byte output to the archive.
605  */
606 static int
_WriteByte(ArchiveHandle * AH,const int i)607 _WriteByte(ArchiveHandle *AH, const int i)
608 {
609 	lclContext *ctx = (lclContext *) AH->formatData;
610 	int			res;
611 
612 	if ((res = fputc(i, AH->FH)) == EOF)
613 		WRITE_ERROR_EXIT;
614 	ctx->filePos += 1;
615 
616 	return 1;
617 }
618 
619 /*
620  * Read a byte of data from the archive.
621  *
622  * Mandatory
623  *
624  * Called by the archiver to read bytes & integers from the archive.
625  * EOF should be treated as a fatal error.
626  */
627 static int
_ReadByte(ArchiveHandle * AH)628 _ReadByte(ArchiveHandle *AH)
629 {
630 	lclContext *ctx = (lclContext *) AH->formatData;
631 	int			res;
632 
633 	res = getc(AH->FH);
634 	if (res == EOF)
635 		READ_ERROR_EXIT(AH->FH);
636 	ctx->filePos += 1;
637 	return res;
638 }
639 
640 /*
641  * Write a buffer of data to the archive.
642  *
643  * Mandatory.
644  *
645  * Called by the archiver to write a block of bytes to the archive.
646  */
647 static void
_WriteBuf(ArchiveHandle * AH,const void * buf,size_t len)648 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
649 {
650 	lclContext *ctx = (lclContext *) AH->formatData;
651 
652 	if (fwrite(buf, 1, len, AH->FH) != len)
653 		WRITE_ERROR_EXIT;
654 	ctx->filePos += len;
655 
656 	return;
657 }
658 
659 /*
660  * Read a block of bytes from the archive.
661  *
662  * Mandatory.
663  *
664  * Called by the archiver to read a block of bytes from the archive
665  */
666 static void
_ReadBuf(ArchiveHandle * AH,void * buf,size_t len)667 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
668 {
669 	lclContext *ctx = (lclContext *) AH->formatData;
670 
671 	if (fread(buf, 1, len, AH->FH) != len)
672 		READ_ERROR_EXIT(AH->FH);
673 	ctx->filePos += len;
674 
675 	return;
676 }
677 
678 /*
679  * Close the archive.
680  *
681  * Mandatory.
682  *
683  * When writing the archive, this is the routine that actually starts
684  * the process of saving it to files. No data should be written prior
685  * to this point, since the user could sort the TOC after creating it.
686  *
687  * If an archive is to be written, this routine must call:
688  *		WriteHead			to save the archive header
689  *		WriteToc			to save the TOC entries
690  *		WriteDataChunks		to save all DATA & BLOBs.
691  *
692  */
693 static void
_CloseArchive(ArchiveHandle * AH)694 _CloseArchive(ArchiveHandle *AH)
695 {
696 	lclContext *ctx = (lclContext *) AH->formatData;
697 	pgoff_t		tpos;
698 
699 	if (AH->mode == archModeWrite)
700 	{
701 		WriteHead(AH);
702 		/* Remember TOC's seek position for use below */
703 		tpos = ftello(AH->FH);
704 		if (tpos < 0 && ctx->hasSeek)
705 			exit_horribly(modulename, "could not determine seek position in archive file: %s\n",
706 						  strerror(errno));
707 		WriteToc(AH);
708 		ctx->dataStart = _getFilePos(AH, ctx);
709 		WriteDataChunks(AH, NULL);
710 
711 		/*
712 		 * If possible, re-write the TOC in order to update the data offset
713 		 * information.  This is not essential, as pg_restore can cope in most
714 		 * cases without it; but it can make pg_restore significantly faster
715 		 * in some situations (especially parallel restore).
716 		 */
717 		if (ctx->hasSeek &&
718 			fseeko(AH->FH, tpos, SEEK_SET) == 0)
719 			WriteToc(AH);
720 	}
721 
722 	if (fclose(AH->FH) != 0)
723 		exit_horribly(modulename, "could not close archive file: %s\n", strerror(errno));
724 
725 	/* Sync the output file if one is defined */
726 	if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
727 		(void) fsync_fname(AH->fSpec, false, progname);
728 
729 	AH->FH = NULL;
730 }
731 
732 /*
733  * Reopen the archive's file handle.
734  *
735  * We close the original file handle, except on Windows.  (The difference
736  * is because on Windows, this is used within a multithreading context,
737  * and we don't want a thread closing the parent file handle.)
738  */
739 static void
_ReopenArchive(ArchiveHandle * AH)740 _ReopenArchive(ArchiveHandle *AH)
741 {
742 	lclContext *ctx = (lclContext *) AH->formatData;
743 	pgoff_t		tpos;
744 
745 	if (AH->mode == archModeWrite)
746 		exit_horribly(modulename, "can only reopen input archives\n");
747 
748 	/*
749 	 * These two cases are user-facing errors since they represent unsupported
750 	 * (but not invalid) use-cases.  Word the error messages appropriately.
751 	 */
752 	if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
753 		exit_horribly(modulename, "parallel restore from standard input is not supported\n");
754 	if (!ctx->hasSeek)
755 		exit_horribly(modulename, "parallel restore from non-seekable file is not supported\n");
756 
757 	tpos = ftello(AH->FH);
758 	if (tpos < 0)
759 		exit_horribly(modulename, "could not determine seek position in archive file: %s\n",
760 					  strerror(errno));
761 
762 #ifndef WIN32
763 	if (fclose(AH->FH) != 0)
764 		exit_horribly(modulename, "could not close archive file: %s\n",
765 					  strerror(errno));
766 #endif
767 
768 	AH->FH = fopen(AH->fSpec, PG_BINARY_R);
769 	if (!AH->FH)
770 		exit_horribly(modulename, "could not open input file \"%s\": %s\n",
771 					  AH->fSpec, strerror(errno));
772 
773 	if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
774 		exit_horribly(modulename, "could not set seek position in archive file: %s\n",
775 					  strerror(errno));
776 }
777 
778 /*
779  * Clone format-specific fields during parallel restoration.
780  */
781 static void
_Clone(ArchiveHandle * AH)782 _Clone(ArchiveHandle *AH)
783 {
784 	lclContext *ctx = (lclContext *) AH->formatData;
785 
786 	AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
787 	memcpy(AH->formatData, ctx, sizeof(lclContext));
788 	ctx = (lclContext *) AH->formatData;
789 
790 	/* sanity check, shouldn't happen */
791 	if (ctx->cs != NULL)
792 		exit_horribly(modulename, "compressor active\n");
793 
794 	/*
795 	 * Note: we do not make a local lo_buf because we expect at most one BLOBS
796 	 * entry per archive, so no parallelism is possible.  Likewise,
797 	 * TOC-entry-local state isn't an issue because any one TOC entry is
798 	 * touched by just one worker child.
799 	 */
800 }
801 
802 static void
_DeClone(ArchiveHandle * AH)803 _DeClone(ArchiveHandle *AH)
804 {
805 	lclContext *ctx = (lclContext *) AH->formatData;
806 
807 	free(ctx);
808 }
809 
810 /*
811  * This function is executed in the child of a parallel restore from a
812  * custom-format archive and restores the actual data for one TOC entry.
813  */
814 static int
_WorkerJobRestoreCustom(ArchiveHandle * AH,TocEntry * te)815 _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
816 {
817 	return parallel_restore(AH, te);
818 }
819 
820 /*--------------------------------------------------
821  * END OF FORMAT CALLBACKS
822  *--------------------------------------------------
823  */
824 
825 /*
826  * Get the current position in the archive file.
827  */
828 static pgoff_t
_getFilePos(ArchiveHandle * AH,lclContext * ctx)829 _getFilePos(ArchiveHandle *AH, lclContext *ctx)
830 {
831 	pgoff_t		pos;
832 
833 	if (ctx->hasSeek)
834 	{
835 		/*
836 		 * Prior to 1.7 (pg7.3) we relied on the internally maintained
837 		 * pointer.  Now we rely on ftello() always, unless the file has been
838 		 * found to not support it.  For debugging purposes, print a warning
839 		 * if the internal pointer disagrees, so that we're more likely to
840 		 * notice if something's broken about the internal position tracking.
841 		 */
842 		pos = ftello(AH->FH);
843 		if (pos < 0)
844 			exit_horribly(modulename, "could not determine seek position in archive file: %s\n",
845 						  strerror(errno));
846 
847 		if (pos != ctx->filePos)
848 			write_msg(modulename, "WARNING: ftell mismatch with expected position -- ftell used\n");
849 	}
850 	else
851 		pos = ctx->filePos;
852 	return pos;
853 }
854 
855 /*
856  * Read a data block header. The format changed in V1.3, so we
857  * centralize the code here for simplicity.  Returns *type = EOF
858  * if at EOF.
859  */
860 static void
_readBlockHeader(ArchiveHandle * AH,int * type,int * id)861 _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
862 {
863 	lclContext *ctx = (lclContext *) AH->formatData;
864 	int			byt;
865 
866 	/*
867 	 * Note: if we are at EOF with a pre-1.3 input file, we'll exit_horribly
868 	 * inside ReadInt rather than returning EOF.  It doesn't seem worth
869 	 * jumping through hoops to deal with that case better, because no such
870 	 * files are likely to exist in the wild: only some 7.1 development
871 	 * versions of pg_dump ever generated such files.
872 	 */
873 	if (AH->version < K_VERS_1_3)
874 		*type = BLK_DATA;
875 	else
876 	{
877 		byt = getc(AH->FH);
878 		*type = byt;
879 		if (byt == EOF)
880 		{
881 			*id = 0;			/* don't return an uninitialized value */
882 			return;
883 		}
884 		ctx->filePos += 1;
885 	}
886 
887 	*id = ReadInt(AH);
888 }
889 
890 /*
891  * Callback function for WriteDataToArchive. Writes one block of (compressed)
892  * data to the archive.
893  */
894 static void
_CustomWriteFunc(ArchiveHandle * AH,const char * buf,size_t len)895 _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
896 {
897 	/* never write 0-byte blocks (this should not happen) */
898 	if (len > 0)
899 	{
900 		WriteInt(AH, len);
901 		_WriteBuf(AH, buf, len);
902 	}
903 	return;
904 }
905 
906 /*
907  * Callback function for ReadDataFromArchive. To keep things simple, we
908  * always read one compressed block at a time.
909  */
910 static size_t
_CustomReadFunc(ArchiveHandle * AH,char ** buf,size_t * buflen)911 _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
912 {
913 	size_t		blkLen;
914 
915 	/* Read length */
916 	blkLen = ReadInt(AH);
917 	if (blkLen == 0)
918 		return 0;
919 
920 	/* If the caller's buffer is not large enough, allocate a bigger one */
921 	if (blkLen > *buflen)
922 	{
923 		free(*buf);
924 		*buf = (char *) pg_malloc(blkLen);
925 		*buflen = blkLen;
926 	}
927 
928 	/* exits app on read errors */
929 	_ReadBuf(AH, *buf, blkLen);
930 
931 	return blkLen;
932 }
933