1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_custom.c
4  *
5  *	Implements the custom output format.
6  *
 *	The comments with the routines in this code are a good place to
 *	understand how to write a new format.
9  *
10  *	See the headers to pg_restore for more details.
11  *
12  * Copyright (c) 2000, Philip Warner
13  *		Rights are granted to use this software in any way so long
14  *		as this notice is not removed.
15  *
16  *	The author is not responsible for loss or damages that may
17  *	and any liability will be limited to the time taken to fix any
18  *	related bug.
19  *
20  *
21  * IDENTIFICATION
22  *		src/bin/pg_dump/pg_backup_custom.c
23  *
24  *-------------------------------------------------------------------------
25  */
26 #include "postgres_fe.h"
27 
28 #include "compress_io.h"
29 #include "parallel.h"
30 #include "pg_backup_utils.h"
31 
32 /*--------
33  * Routines in the format interface
34  *--------
35  */
36 
/* Callbacks that InitArchiveFmt_Custom() installs in the ArchiveHandle */
static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
static void _StartData(ArchiveHandle *AH, TocEntry *te);
static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
static void _EndData(ArchiveHandle *AH, TocEntry *te);
static int	_WriteByte(ArchiveHandle *AH, const int i);
static int	_ReadByte(ArchiveHandle *);
static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
static void _CloseArchive(ArchiveHandle *AH);
static void _ReopenArchive(ArchiveHandle *AH);
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);

/* Internal helpers that read or skip data blocks; not installed as callbacks */
static void _PrintData(ArchiveHandle *AH);
static void _skipData(ArchiveHandle *AH);
static void _skipBlobs(ArchiveHandle *AH);

/*
 * Large-object callbacks and clone support.  _LoadBlobs is an internal
 * helper called from _PrintTocData rather than an installed callback.
 */
static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
static void _LoadBlobs(ArchiveHandle *AH, bool drop);
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);

/*
 * Parallel-restore support.  Note that _WorkerJobRestoreCustom is declared
 * without "static", i.e. it has external linkage.
 */
static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
static int	_MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act);
char	   *_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
67 
/*
 * Format-private state of an archive handle; a pointer to it is kept in
 * AH->formatData.
 */
typedef struct
{
	/* compressor set up by _StartData/_StartBlob for the block being written */
	CompressorState *cs;
	/* result of checkSeek() on the archive file */
	int			hasSeek;
	/* byte position as counted by this module's own read/write routines */
	pgoff_t		filePos;
	/* file position recorded immediately after the TOC was read or written */
	pgoff_t		dataStart;
} lclContext;
75 
/*
 * Format-private state of a TOC entry; a pointer to it is kept in
 * te->formatData.
 */
typedef struct
{
	/* one of the K_OFFSET_* values describing whether dataPos is meaningful */
	int			dataState;
	/* file position of the entry's data block, as stored in the TOC */
	pgoff_t		dataPos;
} lclTocEntry;
81 
82 
83 /*------
84  * Static declarations
85  *------
86  */
/* Reads the header (block type and ID) that precedes each data block */
static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
/* Reports the current archive position, using ftello() when seekable */
static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);

/* Block I/O callbacks passed to AllocateCompressor / ReadDataFromArchive */
static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);

/* translator: this is a module name */
static const char *modulename = gettext_noop("custom archiver");
95 
96 
97 
98 /*
99  *	Init routine required by ALL formats. This is a global routine
100  *	and should be declared in pg_backup_archiver.h
101  *
102  *	It's task is to create any extra archive context (using AH->formatData),
103  *	and to initialize the supported function pointers.
104  *
105  *	It should also prepare whatever it's input source is for reading/writing,
106  *	and in the case of a read mode connection, it should load the Header & TOC.
107  */
108 void
InitArchiveFmt_Custom(ArchiveHandle * AH)109 InitArchiveFmt_Custom(ArchiveHandle *AH)
110 {
111 	lclContext *ctx;
112 
113 	/* Assuming static functions, this can be copied for each format. */
114 	AH->ArchiveEntryPtr = _ArchiveEntry;
115 	AH->StartDataPtr = _StartData;
116 	AH->WriteDataPtr = _WriteData;
117 	AH->EndDataPtr = _EndData;
118 	AH->WriteBytePtr = _WriteByte;
119 	AH->ReadBytePtr = _ReadByte;
120 	AH->WriteBufPtr = _WriteBuf;
121 	AH->ReadBufPtr = _ReadBuf;
122 	AH->ClosePtr = _CloseArchive;
123 	AH->ReopenPtr = _ReopenArchive;
124 	AH->PrintTocDataPtr = _PrintTocData;
125 	AH->ReadExtraTocPtr = _ReadExtraToc;
126 	AH->WriteExtraTocPtr = _WriteExtraToc;
127 	AH->PrintExtraTocPtr = _PrintExtraToc;
128 
129 	AH->StartBlobsPtr = _StartBlobs;
130 	AH->StartBlobPtr = _StartBlob;
131 	AH->EndBlobPtr = _EndBlob;
132 	AH->EndBlobsPtr = _EndBlobs;
133 	AH->ClonePtr = _Clone;
134 	AH->DeClonePtr = _DeClone;
135 
136 	AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
137 	AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
138 
139 	/* no parallel dump in the custom archive, only parallel restore */
140 	AH->WorkerJobDumpPtr = NULL;
141 	AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
142 
143 	/* Set up a private area. */
144 	ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
145 	AH->formatData = (void *) ctx;
146 
147 	/* Initialize LO buffering */
148 	AH->lo_buf_size = LOBBUFSIZE;
149 	AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
150 
151 	ctx->filePos = 0;
152 
153 	/*
154 	 * Now open the file
155 	 */
156 	if (AH->mode == archModeWrite)
157 	{
158 		if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
159 		{
160 			AH->FH = fopen(AH->fSpec, PG_BINARY_W);
161 			if (!AH->FH)
162 				exit_horribly(modulename, "could not open output file \"%s\": %s\n",
163 							  AH->fSpec, strerror(errno));
164 		}
165 		else
166 		{
167 			AH->FH = stdout;
168 			if (!AH->FH)
169 				exit_horribly(modulename, "could not open output file: %s\n",
170 							  strerror(errno));
171 		}
172 
173 		ctx->hasSeek = checkSeek(AH->FH);
174 	}
175 	else
176 	{
177 		if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
178 		{
179 			AH->FH = fopen(AH->fSpec, PG_BINARY_R);
180 			if (!AH->FH)
181 				exit_horribly(modulename, "could not open input file \"%s\": %s\n",
182 							  AH->fSpec, strerror(errno));
183 		}
184 		else
185 		{
186 			AH->FH = stdin;
187 			if (!AH->FH)
188 				exit_horribly(modulename, "could not open input file: %s\n",
189 							  strerror(errno));
190 		}
191 
192 		ctx->hasSeek = checkSeek(AH->FH);
193 
194 		ReadHead(AH);
195 		ReadToc(AH);
196 		ctx->dataStart = _getFilePos(AH, ctx);
197 	}
198 
199 }
200 
201 /*
202  * Called by the Archiver when the dumper creates a new TOC entry.
203  *
204  * Optional.
205  *
206  * Set up extract format-related TOC data.
207 */
208 static void
_ArchiveEntry(ArchiveHandle * AH,TocEntry * te)209 _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
210 {
211 	lclTocEntry *ctx;
212 
213 	ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
214 	if (te->dataDumper)
215 		ctx->dataState = K_OFFSET_POS_NOT_SET;
216 	else
217 		ctx->dataState = K_OFFSET_NO_DATA;
218 
219 	te->formatData = (void *) ctx;
220 }
221 
222 /*
223  * Called by the Archiver to save any extra format-related TOC entry
224  * data.
225  *
226  * Optional.
227  *
228  * Use the Archiver routines to write data - they are non-endian, and
229  * maintain other important file information.
230  */
231 static void
_WriteExtraToc(ArchiveHandle * AH,TocEntry * te)232 _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
233 {
234 	lclTocEntry *ctx = (lclTocEntry *) te->formatData;
235 
236 	WriteOffset(AH, ctx->dataPos, ctx->dataState);
237 }
238 
239 /*
240  * Called by the Archiver to read any extra format-related TOC data.
241  *
242  * Optional.
243  *
244  * Needs to match the order defined in _WriteExtraToc, and should also
245  * use the Archiver input routines.
246  */
247 static void
_ReadExtraToc(ArchiveHandle * AH,TocEntry * te)248 _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
249 {
250 	lclTocEntry *ctx = (lclTocEntry *) te->formatData;
251 
252 	if (ctx == NULL)
253 	{
254 		ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
255 		te->formatData = (void *) ctx;
256 	}
257 
258 	ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
259 
260 	/*
261 	 * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
262 	 * dump it at all.
263 	 */
264 	if (AH->version < K_VERS_1_7)
265 		ReadInt(AH);
266 }
267 
268 /*
269  * Called by the Archiver when restoring an archive to output a comment
270  * that includes useful information about the TOC entry.
271  *
272  * Optional.
273  *
274  */
275 static void
_PrintExtraToc(ArchiveHandle * AH,TocEntry * te)276 _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
277 {
278 	lclTocEntry *ctx = (lclTocEntry *) te->formatData;
279 
280 	if (AH->public.verbose)
281 		ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
282 				 (int64) ctx->dataPos);
283 }
284 
285 /*
286  * Called by the archiver when saving TABLE DATA (not schema). This routine
287  * should save whatever format-specific information is needed to read
288  * the archive back.
289  *
290  * It is called just prior to the dumper's 'DataDumper' routine being called.
291  *
292  * Optional, but strongly recommended.
293  *
294  */
295 static void
_StartData(ArchiveHandle * AH,TocEntry * te)296 _StartData(ArchiveHandle *AH, TocEntry *te)
297 {
298 	lclContext *ctx = (lclContext *) AH->formatData;
299 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
300 
301 	tctx->dataPos = _getFilePos(AH, ctx);
302 	tctx->dataState = K_OFFSET_POS_SET;
303 
304 	_WriteByte(AH, BLK_DATA);	/* Block type */
305 	WriteInt(AH, te->dumpId);	/* For sanity check */
306 
307 	ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
308 }
309 
310 /*
311  * Called by archiver when dumper calls WriteData. This routine is
312  * called for both BLOB and TABLE data; it is the responsibility of
313  * the format to manage each kind of data using StartBlob/StartData.
314  *
315  * It should only be called from within a DataDumper routine.
316  *
317  * Mandatory.
318  */
319 static void
_WriteData(ArchiveHandle * AH,const void * data,size_t dLen)320 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
321 {
322 	lclContext *ctx = (lclContext *) AH->formatData;
323 	CompressorState *cs = ctx->cs;
324 
325 	if (dLen > 0)
326 		/* WriteDataToArchive() internally throws write errors */
327 		WriteDataToArchive(AH, cs, data, dLen);
328 
329 	return;
330 }
331 
332 /*
333  * Called by the archiver when a dumper's 'DataDumper' routine has
334  * finished.
335  *
336  * Optional.
337  *
338  */
339 static void
_EndData(ArchiveHandle * AH,TocEntry * te)340 _EndData(ArchiveHandle *AH, TocEntry *te)
341 {
342 	lclContext *ctx = (lclContext *) AH->formatData;
343 
344 	EndCompressor(AH, ctx->cs);
345 	/* Send the end marker */
346 	WriteInt(AH, 0);
347 }
348 
349 /*
350  * Called by the archiver when starting to save all BLOB DATA (not schema).
351  * This routine should save whatever format-specific information is needed
352  * to read the BLOBs back into memory.
353  *
354  * It is called just prior to the dumper's DataDumper routine.
355  *
356  * Optional, but strongly recommended.
357  */
358 static void
_StartBlobs(ArchiveHandle * AH,TocEntry * te)359 _StartBlobs(ArchiveHandle *AH, TocEntry *te)
360 {
361 	lclContext *ctx = (lclContext *) AH->formatData;
362 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
363 
364 	tctx->dataPos = _getFilePos(AH, ctx);
365 	tctx->dataState = K_OFFSET_POS_SET;
366 
367 	_WriteByte(AH, BLK_BLOBS);	/* Block type */
368 	WriteInt(AH, te->dumpId);	/* For sanity check */
369 }
370 
371 /*
372  * Called by the archiver when the dumper calls StartBlob.
373  *
374  * Mandatory.
375  *
376  * Must save the passed OID for retrieval at restore-time.
377  */
378 static void
_StartBlob(ArchiveHandle * AH,TocEntry * te,Oid oid)379 _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
380 {
381 	lclContext *ctx = (lclContext *) AH->formatData;
382 
383 	if (oid == 0)
384 		exit_horribly(modulename, "invalid OID for large object\n");
385 
386 	WriteInt(AH, oid);
387 
388 	ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
389 }
390 
391 /*
392  * Called by the archiver when the dumper calls EndBlob.
393  *
394  * Optional.
395  */
396 static void
_EndBlob(ArchiveHandle * AH,TocEntry * te,Oid oid)397 _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
398 {
399 	lclContext *ctx = (lclContext *) AH->formatData;
400 
401 	EndCompressor(AH, ctx->cs);
402 	/* Send the end marker */
403 	WriteInt(AH, 0);
404 }
405 
406 /*
407  * Called by the archiver when finishing saving all BLOB DATA.
408  *
409  * Optional.
410  */
411 static void
_EndBlobs(ArchiveHandle * AH,TocEntry * te)412 _EndBlobs(ArchiveHandle *AH, TocEntry *te)
413 {
414 	/* Write out a fake zero OID to mark end-of-blobs. */
415 	WriteInt(AH, 0);
416 }
417 
418 /*
419  * Print data for a given TOC entry
420  */
421 static void
_PrintTocData(ArchiveHandle * AH,TocEntry * te)422 _PrintTocData(ArchiveHandle *AH, TocEntry *te)
423 {
424 	lclContext *ctx = (lclContext *) AH->formatData;
425 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
426 	int			blkType;
427 	int			id;
428 
429 	if (tctx->dataState == K_OFFSET_NO_DATA)
430 		return;
431 
432 	if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
433 	{
434 		/*
435 		 * We cannot seek directly to the desired block.  Instead, skip over
436 		 * block headers until we find the one we want.  This could fail if we
437 		 * are asked to restore items out-of-order.
438 		 */
439 		_readBlockHeader(AH, &blkType, &id);
440 
441 		while (blkType != EOF && id != te->dumpId)
442 		{
443 			switch (blkType)
444 			{
445 				case BLK_DATA:
446 					_skipData(AH);
447 					break;
448 
449 				case BLK_BLOBS:
450 					_skipBlobs(AH);
451 					break;
452 
453 				default:		/* Always have a default */
454 					exit_horribly(modulename,
455 								  "unrecognized data block type (%d) while searching archive\n",
456 								  blkType);
457 					break;
458 			}
459 			_readBlockHeader(AH, &blkType, &id);
460 		}
461 	}
462 	else
463 	{
464 		/* We can just seek to the place we need to be. */
465 		if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
466 			exit_horribly(modulename, "error during file seek: %s\n",
467 						  strerror(errno));
468 
469 		_readBlockHeader(AH, &blkType, &id);
470 	}
471 
472 	/* Produce suitable failure message if we fell off end of file */
473 	if (blkType == EOF)
474 	{
475 		if (tctx->dataState == K_OFFSET_POS_NOT_SET)
476 			exit_horribly(modulename, "could not find block ID %d in archive -- "
477 						  "possibly due to out-of-order restore request, "
478 						  "which cannot be handled due to lack of data offsets in archive\n",
479 						  te->dumpId);
480 		else if (!ctx->hasSeek)
481 			exit_horribly(modulename, "could not find block ID %d in archive -- "
482 						  "possibly due to out-of-order restore request, "
483 				  "which cannot be handled due to non-seekable input file\n",
484 						  te->dumpId);
485 		else	/* huh, the dataPos led us to EOF? */
486 			exit_horribly(modulename, "could not find block ID %d in archive -- "
487 						  "possibly corrupt archive\n",
488 						  te->dumpId);
489 	}
490 
491 	/* Are we sane? */
492 	if (id != te->dumpId)
493 		exit_horribly(modulename, "found unexpected block ID (%d) when reading data -- expected %d\n",
494 					  id, te->dumpId);
495 
496 	switch (blkType)
497 	{
498 		case BLK_DATA:
499 			_PrintData(AH);
500 			break;
501 
502 		case BLK_BLOBS:
503 			_LoadBlobs(AH, AH->public.ropt->dropSchema);
504 			break;
505 
506 		default:				/* Always have a default */
507 			exit_horribly(modulename, "unrecognized data block type %d while restoring archive\n",
508 						  blkType);
509 			break;
510 	}
511 }
512 
513 /*
514  * Print data from current file position.
515 */
516 static void
_PrintData(ArchiveHandle * AH)517 _PrintData(ArchiveHandle *AH)
518 {
519 	ReadDataFromArchive(AH, AH->compression, _CustomReadFunc);
520 }
521 
522 static void
_LoadBlobs(ArchiveHandle * AH,bool drop)523 _LoadBlobs(ArchiveHandle *AH, bool drop)
524 {
525 	Oid			oid;
526 
527 	StartRestoreBlobs(AH);
528 
529 	oid = ReadInt(AH);
530 	while (oid != 0)
531 	{
532 		StartRestoreBlob(AH, oid, drop);
533 		_PrintData(AH);
534 		EndRestoreBlob(AH, oid);
535 		oid = ReadInt(AH);
536 	}
537 
538 	EndRestoreBlobs(AH);
539 }
540 
541 /*
542  * Skip the BLOBs from the current file position.
543  * BLOBS are written sequentially as data blocks (see below).
544  * Each BLOB is preceded by it's original OID.
545  * A zero OID indicated the end of the BLOBS
546  */
547 static void
_skipBlobs(ArchiveHandle * AH)548 _skipBlobs(ArchiveHandle *AH)
549 {
550 	Oid			oid;
551 
552 	oid = ReadInt(AH);
553 	while (oid != 0)
554 	{
555 		_skipData(AH);
556 		oid = ReadInt(AH);
557 	}
558 }
559 
560 /*
561  * Skip data from current file position.
562  * Data blocks are formatted as an integer length, followed by data.
563  * A zero length denoted the end of the block.
564 */
565 static void
_skipData(ArchiveHandle * AH)566 _skipData(ArchiveHandle *AH)
567 {
568 	lclContext *ctx = (lclContext *) AH->formatData;
569 	size_t		blkLen;
570 	char	   *buf = NULL;
571 	int			buflen = 0;
572 	size_t		cnt;
573 
574 	blkLen = ReadInt(AH);
575 	while (blkLen != 0)
576 	{
577 		if (blkLen > buflen)
578 		{
579 			if (buf)
580 				free(buf);
581 			buf = (char *) pg_malloc(blkLen);
582 			buflen = blkLen;
583 		}
584 		if ((cnt = fread(buf, 1, blkLen, AH->FH)) != blkLen)
585 		{
586 			if (feof(AH->FH))
587 				exit_horribly(modulename,
588 							"could not read from input file: end of file\n");
589 			else
590 				exit_horribly(modulename,
591 					"could not read from input file: %s\n", strerror(errno));
592 		}
593 
594 		ctx->filePos += blkLen;
595 
596 		blkLen = ReadInt(AH);
597 	}
598 
599 	if (buf)
600 		free(buf);
601 }
602 
603 /*
604  * Write a byte of data to the archive.
605  *
606  * Mandatory.
607  *
608  * Called by the archiver to do integer & byte output to the archive.
609  */
610 static int
_WriteByte(ArchiveHandle * AH,const int i)611 _WriteByte(ArchiveHandle *AH, const int i)
612 {
613 	lclContext *ctx = (lclContext *) AH->formatData;
614 	int			res;
615 
616 	if ((res = fputc(i, AH->FH)) == EOF)
617 		WRITE_ERROR_EXIT;
618 	ctx->filePos += 1;
619 
620 	return 1;
621 }
622 
623 /*
624  * Read a byte of data from the archive.
625  *
626  * Mandatory
627  *
628  * Called by the archiver to read bytes & integers from the archive.
629  * EOF should be treated as a fatal error.
630  */
631 static int
_ReadByte(ArchiveHandle * AH)632 _ReadByte(ArchiveHandle *AH)
633 {
634 	lclContext *ctx = (lclContext *) AH->formatData;
635 	int			res;
636 
637 	res = getc(AH->FH);
638 	if (res == EOF)
639 		READ_ERROR_EXIT(AH->FH);
640 	ctx->filePos += 1;
641 	return res;
642 }
643 
644 /*
645  * Write a buffer of data to the archive.
646  *
647  * Mandatory.
648  *
649  * Called by the archiver to write a block of bytes to the archive.
650  */
651 static void
_WriteBuf(ArchiveHandle * AH,const void * buf,size_t len)652 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
653 {
654 	lclContext *ctx = (lclContext *) AH->formatData;
655 
656 	if (fwrite(buf, 1, len, AH->FH) != len)
657 		WRITE_ERROR_EXIT;
658 	ctx->filePos += len;
659 
660 	return;
661 }
662 
663 /*
664  * Read a block of bytes from the archive.
665  *
666  * Mandatory.
667  *
668  * Called by the archiver to read a block of bytes from the archive
669  */
670 static void
_ReadBuf(ArchiveHandle * AH,void * buf,size_t len)671 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
672 {
673 	lclContext *ctx = (lclContext *) AH->formatData;
674 
675 	if (fread(buf, 1, len, AH->FH) != len)
676 		READ_ERROR_EXIT(AH->FH);
677 	ctx->filePos += len;
678 
679 	return;
680 }
681 
682 /*
683  * Close the archive.
684  *
685  * Mandatory.
686  *
687  * When writing the archive, this is the routine that actually starts
688  * the process of saving it to files. No data should be written prior
689  * to this point, since the user could sort the TOC after creating it.
690  *
691  * If an archive is to be written, this routine must call:
692  *		WriteHead			to save the archive header
693  *		WriteToc			to save the TOC entries
694  *		WriteDataChunks		to save all DATA & BLOBs.
695  *
696  */
697 static void
_CloseArchive(ArchiveHandle * AH)698 _CloseArchive(ArchiveHandle *AH)
699 {
700 	lclContext *ctx = (lclContext *) AH->formatData;
701 	pgoff_t		tpos;
702 
703 	if (AH->mode == archModeWrite)
704 	{
705 		WriteHead(AH);
706 		/* Remember TOC's seek position for use below */
707 		tpos = ftello(AH->FH);
708 		if (tpos < 0 && ctx->hasSeek)
709 			exit_horribly(modulename, "could not determine seek position in archive file: %s\n",
710 						  strerror(errno));
711 		WriteToc(AH);
712 		ctx->dataStart = _getFilePos(AH, ctx);
713 		WriteDataChunks(AH, NULL);
714 
715 		/*
716 		 * If possible, re-write the TOC in order to update the data offset
717 		 * information.  This is not essential, as pg_restore can cope in most
718 		 * cases without it; but it can make pg_restore significantly faster
719 		 * in some situations (especially parallel restore).
720 		 */
721 		if (ctx->hasSeek &&
722 			fseeko(AH->FH, tpos, SEEK_SET) == 0)
723 			WriteToc(AH);
724 	}
725 
726 	if (fclose(AH->FH) != 0)
727 		exit_horribly(modulename, "could not close archive file: %s\n", strerror(errno));
728 
729 	AH->FH = NULL;
730 }
731 
732 /*
733  * Reopen the archive's file handle.
734  *
735  * We close the original file handle, except on Windows.  (The difference
736  * is because on Windows, this is used within a multithreading context,
737  * and we don't want a thread closing the parent file handle.)
738  */
739 static void
_ReopenArchive(ArchiveHandle * AH)740 _ReopenArchive(ArchiveHandle *AH)
741 {
742 	lclContext *ctx = (lclContext *) AH->formatData;
743 	pgoff_t		tpos;
744 
745 	if (AH->mode == archModeWrite)
746 		exit_horribly(modulename, "can only reopen input archives\n");
747 
748 	/*
749 	 * These two cases are user-facing errors since they represent unsupported
750 	 * (but not invalid) use-cases.  Word the error messages appropriately.
751 	 */
752 	if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
753 		exit_horribly(modulename, "parallel restore from standard input is not supported\n");
754 	if (!ctx->hasSeek)
755 		exit_horribly(modulename, "parallel restore from non-seekable file is not supported\n");
756 
757 	tpos = ftello(AH->FH);
758 	if (tpos < 0)
759 		exit_horribly(modulename, "could not determine seek position in archive file: %s\n",
760 					  strerror(errno));
761 
762 #ifndef WIN32
763 	if (fclose(AH->FH) != 0)
764 		exit_horribly(modulename, "could not close archive file: %s\n",
765 					  strerror(errno));
766 #endif
767 
768 	AH->FH = fopen(AH->fSpec, PG_BINARY_R);
769 	if (!AH->FH)
770 		exit_horribly(modulename, "could not open input file \"%s\": %s\n",
771 					  AH->fSpec, strerror(errno));
772 
773 	if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
774 		exit_horribly(modulename, "could not set seek position in archive file: %s\n",
775 					  strerror(errno));
776 }
777 
778 /*
779  * Clone format-specific fields during parallel restoration.
780  */
781 static void
_Clone(ArchiveHandle * AH)782 _Clone(ArchiveHandle *AH)
783 {
784 	lclContext *ctx = (lclContext *) AH->formatData;
785 
786 	AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
787 	memcpy(AH->formatData, ctx, sizeof(lclContext));
788 	ctx = (lclContext *) AH->formatData;
789 
790 	/* sanity check, shouldn't happen */
791 	if (ctx->cs != NULL)
792 		exit_horribly(modulename, "compressor active\n");
793 
794 	/*
795 	 * Note: we do not make a local lo_buf because we expect at most one BLOBS
796 	 * entry per archive, so no parallelism is possible.  Likewise,
797 	 * TOC-entry-local state isn't an issue because any one TOC entry is
798 	 * touched by just one worker child.
799 	 */
800 }
801 
802 static void
_DeClone(ArchiveHandle * AH)803 _DeClone(ArchiveHandle *AH)
804 {
805 	lclContext *ctx = (lclContext *) AH->formatData;
806 
807 	free(ctx);
808 }
809 
810 /*
811  * This function is executed in the child of a parallel backup for the
812  * custom format archive and dumps the actual data.
813  */
814 char *
_WorkerJobRestoreCustom(ArchiveHandle * AH,TocEntry * te)815 _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
816 {
817 	/*
818 	 * short fixed-size string + some ID so far, this needs to be malloc'ed
819 	 * instead of static because we work with threads on windows
820 	 */
821 	const int	buflen = 64;
822 	char	   *buf = (char *) pg_malloc(buflen);
823 	ParallelArgs pargs;
824 	int			status;
825 
826 	pargs.AH = AH;
827 	pargs.te = te;
828 
829 	status = parallel_restore(&pargs);
830 
831 	snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
832 			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
833 
834 	return buf;
835 }
836 
837 /*
838  * This function is executed in the parent process. Depending on the desired
839  * action (dump or restore) it creates a string that is understood by the
840  * _WorkerJobDump /_WorkerJobRestore functions of the dump format.
841  */
842 static char *
_MasterStartParallelItem(ArchiveHandle * AH,TocEntry * te,T_Action act)843 _MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
844 {
845 	/*
846 	 * A static char is okay here, even on Windows because we call this
847 	 * function only from one process (the master).
848 	 */
849 	static char buf[64];		/* short fixed-size string + number */
850 
851 	/* no parallel dump in the custom archive format */
852 	Assert(act == ACT_RESTORE);
853 
854 	snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
855 
856 	return buf;
857 }
858 
859 /*
860  * This function is executed in the parent process. It analyzes the response of
861  * the _WorkerJobDump / _WorkerJobRestore functions of the dump format.
862  */
863 static int
_MasterEndParallelItem(ArchiveHandle * AH,TocEntry * te,const char * str,T_Action act)864 _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
865 {
866 	DumpId		dumpId;
867 	int			nBytes,
868 				status,
869 				n_errors;
870 
871 	/* no parallel dump in the custom archive */
872 	Assert(act == ACT_RESTORE);
873 
874 	sscanf(str, "%d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
875 
876 	Assert(nBytes == strlen(str));
877 	Assert(dumpId == te->dumpId);
878 
879 	AH->public.n_errors += n_errors;
880 
881 	return status;
882 }
883 
884 /*--------------------------------------------------
885  * END OF FORMAT CALLBACKS
886  *--------------------------------------------------
887  */
888 
889 /*
890  * Get the current position in the archive file.
891  */
892 static pgoff_t
_getFilePos(ArchiveHandle * AH,lclContext * ctx)893 _getFilePos(ArchiveHandle *AH, lclContext *ctx)
894 {
895 	pgoff_t		pos;
896 
897 	if (ctx->hasSeek)
898 	{
899 		/*
900 		 * Prior to 1.7 (pg7.3) we relied on the internally maintained
901 		 * pointer.  Now we rely on ftello() always, unless the file has been
902 		 * found to not support it.  For debugging purposes, print a warning
903 		 * if the internal pointer disagrees, so that we're more likely to
904 		 * notice if something's broken about the internal position tracking.
905 		 */
906 		pos = ftello(AH->FH);
907 		if (pos < 0)
908 			exit_horribly(modulename, "could not determine seek position in archive file: %s\n",
909 						  strerror(errno));
910 
911 		if (pos != ctx->filePos)
912 			write_msg(modulename, "WARNING: ftell mismatch with expected position -- ftell used\n");
913 	}
914 	else
915 		pos = ctx->filePos;
916 	return pos;
917 }
918 
919 /*
920  * Read a data block header. The format changed in V1.3, so we
921  * centralize the code here for simplicity.  Returns *type = EOF
922  * if at EOF.
923  */
924 static void
_readBlockHeader(ArchiveHandle * AH,int * type,int * id)925 _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
926 {
927 	lclContext *ctx = (lclContext *) AH->formatData;
928 	int			byt;
929 
930 	/*
931 	 * Note: if we are at EOF with a pre-1.3 input file, we'll exit_horribly
932 	 * inside ReadInt rather than returning EOF.  It doesn't seem worth
933 	 * jumping through hoops to deal with that case better, because no such
934 	 * files are likely to exist in the wild: only some 7.1 development
935 	 * versions of pg_dump ever generated such files.
936 	 */
937 	if (AH->version < K_VERS_1_3)
938 		*type = BLK_DATA;
939 	else
940 	{
941 		byt = getc(AH->FH);
942 		*type = byt;
943 		if (byt == EOF)
944 		{
945 			*id = 0;			/* don't return an uninitialized value */
946 			return;
947 		}
948 		ctx->filePos += 1;
949 	}
950 
951 	*id = ReadInt(AH);
952 }
953 
954 /*
955  * Callback function for WriteDataToArchive. Writes one block of (compressed)
956  * data to the archive.
957  */
958 static void
_CustomWriteFunc(ArchiveHandle * AH,const char * buf,size_t len)959 _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
960 {
961 	/* never write 0-byte blocks (this should not happen) */
962 	if (len > 0)
963 	{
964 		WriteInt(AH, len);
965 		_WriteBuf(AH, buf, len);
966 	}
967 	return;
968 }
969 
970 /*
971  * Callback function for ReadDataFromArchive. To keep things simple, we
972  * always read one compressed block at a time.
973  */
974 static size_t
_CustomReadFunc(ArchiveHandle * AH,char ** buf,size_t * buflen)975 _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
976 {
977 	size_t		blkLen;
978 
979 	/* Read length */
980 	blkLen = ReadInt(AH);
981 	if (blkLen == 0)
982 		return 0;
983 
984 	/* If the caller's buffer is not large enough, allocate a bigger one */
985 	if (blkLen > *buflen)
986 	{
987 		free(*buf);
988 		*buf = (char *) pg_malloc(blkLen);
989 		*buflen = blkLen;
990 	}
991 
992 	/* exits app on read errors */
993 	_ReadBuf(AH, *buf, blkLen);
994 
995 	return blkLen;
996 }
997