1 /*-------------------------------------------------------------------------
2 *
3 * pg_backup_custom.c
4 *
5 * Implements the custom output format.
6 *
7 * The comments with the routines in this code are a good place to
8 * understand how to write a new format.
9 *
10 * See the headers to pg_restore for more details.
11 *
12 * Copyright (c) 2000, Philip Warner
13 * Rights are granted to use this software in any way so long
14 * as this notice is not removed.
15 *
16 * The author is not responsible for loss or damages that may
17 * and any liability will be limited to the time taken to fix any
18 * related bug.
19 *
20 *
21 * IDENTIFICATION
22 * src/bin/pg_dump/pg_backup_custom.c
23 *
24 *-------------------------------------------------------------------------
25 */
26 #include "postgres_fe.h"
27
28 #include "common/file_utils.h"
29 #include "compress_io.h"
30 #include "parallel.h"
31 #include "pg_backup_utils.h"
32
33 /*--------
34 * Routines in the format interface
35 *--------
36 */
37
38 static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
39 static void _StartData(ArchiveHandle *AH, TocEntry *te);
40 static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
41 static void _EndData(ArchiveHandle *AH, TocEntry *te);
42 static int _WriteByte(ArchiveHandle *AH, const int i);
43 static int _ReadByte(ArchiveHandle *);
44 static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
45 static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
46 static void _CloseArchive(ArchiveHandle *AH);
47 static void _ReopenArchive(ArchiveHandle *AH);
48 static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
49 static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
50 static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
51 static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
52 /* Helpers used while reading an archive: emit or skip block contents */
53 static void _PrintData(ArchiveHandle *AH);
54 static void _skipData(ArchiveHandle *AH);
55 static void _skipBlobs(ArchiveHandle *AH);
56 /* Large-object (BLOB) support: four write-side callbacks plus the read-side loader */
57 static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
58 static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
59 static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
60 static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
61 static void _LoadBlobs(ArchiveHandle *AH, bool drop);
62 /* Parallel-restore support callbacks */
63 static void _PrepParallelRestore(ArchiveHandle *AH);
64 static void _Clone(ArchiveHandle *AH);
65 static void _DeClone(ArchiveHandle *AH);
66 /* Installed as AH->WorkerJobRestorePtr; there is no dump-side counterpart */
67 static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
68
69 typedef struct /* per-archive private state, stored in AH->formatData */
70 {
71 CompressorState *cs; /* set by AllocateCompressor() in _StartData/_StartBlob, passed to EndCompressor() */
72 int hasSeek; /* result of checkSeek(AH->FH); tested as a boolean before seeking */
73 /* lastFilePos is used only when reading, and may be invalid if !hasSeek */
74 pgoff_t lastFilePos; /* position after last data block we've read */
75 } lclContext;
76
77 typedef struct /* per-TOC-entry private state, stored in te->formatData */
78 {
79 int dataState; /* one of K_OFFSET_NO_DATA, K_OFFSET_POS_NOT_SET, K_OFFSET_POS_SET */
80 pgoff_t dataPos; /* valid only if dataState=K_OFFSET_POS_SET */
81 } lclTocEntry;
82
83
84 /*------
85 * Static declarations
86 *------
87 */
88 static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
89 static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);
90 /* Block I/O callbacks passed to AllocateCompressor() and ReadDataFromArchive() */
91 static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
92 static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
93
94
95 /*
96 * Init routine required by ALL formats. This is a global routine
97 * and should be declared in pg_backup_archiver.h
98 *
99 * It's task is to create any extra archive context (using AH->formatData),
100 * and to initialize the supported function pointers.
101 *
102 * It should also prepare whatever it's input source is for reading/writing,
103 * and in the case of a read mode connection, it should load the Header & TOC.
104 */
105 void
InitArchiveFmt_Custom(ArchiveHandle * AH)106 InitArchiveFmt_Custom(ArchiveHandle *AH)
107 {
108 lclContext *ctx;
109
110 /* Assuming static functions, this can be copied for each format. */
111 AH->ArchiveEntryPtr = _ArchiveEntry;
112 AH->StartDataPtr = _StartData;
113 AH->WriteDataPtr = _WriteData;
114 AH->EndDataPtr = _EndData;
115 AH->WriteBytePtr = _WriteByte;
116 AH->ReadBytePtr = _ReadByte;
117 AH->WriteBufPtr = _WriteBuf;
118 AH->ReadBufPtr = _ReadBuf;
119 AH->ClosePtr = _CloseArchive;
120 AH->ReopenPtr = _ReopenArchive;
121 AH->PrintTocDataPtr = _PrintTocData;
122 AH->ReadExtraTocPtr = _ReadExtraToc;
123 AH->WriteExtraTocPtr = _WriteExtraToc;
124 AH->PrintExtraTocPtr = _PrintExtraToc;
125
126 AH->StartBlobsPtr = _StartBlobs;
127 AH->StartBlobPtr = _StartBlob;
128 AH->EndBlobPtr = _EndBlob;
129 AH->EndBlobsPtr = _EndBlobs;
130
131 AH->PrepParallelRestorePtr = _PrepParallelRestore;
132 AH->ClonePtr = _Clone;
133 AH->DeClonePtr = _DeClone;
134
135 /* no parallel dump in the custom archive, only parallel restore */
136 AH->WorkerJobDumpPtr = NULL;
137 AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
138
139 /* Set up a private area. */
140 ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
141 AH->formatData = (void *) ctx;
142
143 /* Initialize LO buffering */
144 AH->lo_buf_size = LOBBUFSIZE;
145 AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
146
147 /*
148 * Now open the file
149 */
150 if (AH->mode == archModeWrite)
151 {
152 if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
153 {
154 AH->FH = fopen(AH->fSpec, PG_BINARY_W);
155 if (!AH->FH)
156 fatal("could not open output file \"%s\": %m", AH->fSpec);
157 }
158 else
159 {
160 AH->FH = stdout;
161 if (!AH->FH)
162 fatal("could not open output file: %m");
163 }
164
165 ctx->hasSeek = checkSeek(AH->FH);
166 }
167 else
168 {
169 if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
170 {
171 AH->FH = fopen(AH->fSpec, PG_BINARY_R);
172 if (!AH->FH)
173 fatal("could not open input file \"%s\": %m", AH->fSpec);
174 }
175 else
176 {
177 AH->FH = stdin;
178 if (!AH->FH)
179 fatal("could not open input file: %m");
180 }
181
182 ctx->hasSeek = checkSeek(AH->FH);
183
184 ReadHead(AH);
185 ReadToc(AH);
186
187 /*
188 * Remember location of first data block (i.e., the point after TOC)
189 * in case we have to search for desired data blocks.
190 */
191 ctx->lastFilePos = _getFilePos(AH, ctx);
192 }
193 }
194
195 /*
196 * Called by the Archiver when the dumper creates a new TOC entry.
197 *
198 * Optional.
199 *
200 * Set up extract format-related TOC data.
201 */
202 static void
_ArchiveEntry(ArchiveHandle * AH,TocEntry * te)203 _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
204 {
205 lclTocEntry *ctx;
206
207 ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
208 if (te->dataDumper)
209 ctx->dataState = K_OFFSET_POS_NOT_SET;
210 else
211 ctx->dataState = K_OFFSET_NO_DATA;
212
213 te->formatData = (void *) ctx;
214 }
215
216 /*
217 * Called by the Archiver to save any extra format-related TOC entry
218 * data.
219 *
220 * Optional.
221 *
222 * Use the Archiver routines to write data - they are non-endian, and
223 * maintain other important file information.
224 */
225 static void
_WriteExtraToc(ArchiveHandle * AH,TocEntry * te)226 _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
227 {
228 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
229
230 WriteOffset(AH, ctx->dataPos, ctx->dataState);
231 }
232
233 /*
234 * Called by the Archiver to read any extra format-related TOC data.
235 *
236 * Optional.
237 *
238 * Needs to match the order defined in _WriteExtraToc, and should also
239 * use the Archiver input routines.
240 */
241 static void
_ReadExtraToc(ArchiveHandle * AH,TocEntry * te)242 _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
243 {
244 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
245
246 if (ctx == NULL)
247 {
248 ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
249 te->formatData = (void *) ctx;
250 }
251
252 ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
253
254 /*
255 * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
256 * dump it at all.
257 */
258 if (AH->version < K_VERS_1_7)
259 ReadInt(AH);
260 }
261
262 /*
263 * Called by the Archiver when restoring an archive to output a comment
264 * that includes useful information about the TOC entry.
265 *
266 * Optional.
267 *
268 */
269 static void
_PrintExtraToc(ArchiveHandle * AH,TocEntry * te)270 _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
271 {
272 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
273
274 if (AH->public.verbose)
275 ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
276 (int64) ctx->dataPos);
277 }
278
279 /*
280 * Called by the archiver when saving TABLE DATA (not schema). This routine
281 * should save whatever format-specific information is needed to read
282 * the archive back.
283 *
284 * It is called just prior to the dumper's 'DataDumper' routine being called.
285 *
286 * Optional, but strongly recommended.
287 *
288 */
289 static void
_StartData(ArchiveHandle * AH,TocEntry * te)290 _StartData(ArchiveHandle *AH, TocEntry *te)
291 {
292 lclContext *ctx = (lclContext *) AH->formatData;
293 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
294
295 tctx->dataPos = _getFilePos(AH, ctx);
296 if (tctx->dataPos >= 0)
297 tctx->dataState = K_OFFSET_POS_SET;
298
299 _WriteByte(AH, BLK_DATA); /* Block type */
300 WriteInt(AH, te->dumpId); /* For sanity check */
301
302 ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
303 }
304
305 /*
306 * Called by archiver when dumper calls WriteData. This routine is
307 * called for both BLOB and TABLE data; it is the responsibility of
308 * the format to manage each kind of data using StartBlob/StartData.
309 *
310 * It should only be called from within a DataDumper routine.
311 *
312 * Mandatory.
313 */
314 static void
_WriteData(ArchiveHandle * AH,const void * data,size_t dLen)315 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
316 {
317 lclContext *ctx = (lclContext *) AH->formatData;
318 CompressorState *cs = ctx->cs;
319
320 if (dLen > 0)
321 /* WriteDataToArchive() internally throws write errors */
322 WriteDataToArchive(AH, cs, data, dLen);
323 }
324
325 /*
326 * Called by the archiver when a dumper's 'DataDumper' routine has
327 * finished.
328 *
329 * Optional.
330 *
331 */
332 static void
_EndData(ArchiveHandle * AH,TocEntry * te)333 _EndData(ArchiveHandle *AH, TocEntry *te)
334 {
335 lclContext *ctx = (lclContext *) AH->formatData;
336
337 EndCompressor(AH, ctx->cs);
338 /* Send the end marker */
339 WriteInt(AH, 0);
340 }
341
342 /*
343 * Called by the archiver when starting to save all BLOB DATA (not schema).
344 * This routine should save whatever format-specific information is needed
345 * to read the BLOBs back into memory.
346 *
347 * It is called just prior to the dumper's DataDumper routine.
348 *
349 * Optional, but strongly recommended.
350 */
351 static void
_StartBlobs(ArchiveHandle * AH,TocEntry * te)352 _StartBlobs(ArchiveHandle *AH, TocEntry *te)
353 {
354 lclContext *ctx = (lclContext *) AH->formatData;
355 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
356
357 tctx->dataPos = _getFilePos(AH, ctx);
358 if (tctx->dataPos >= 0)
359 tctx->dataState = K_OFFSET_POS_SET;
360
361 _WriteByte(AH, BLK_BLOBS); /* Block type */
362 WriteInt(AH, te->dumpId); /* For sanity check */
363 }
364
365 /*
366 * Called by the archiver when the dumper calls StartBlob.
367 *
368 * Mandatory.
369 *
370 * Must save the passed OID for retrieval at restore-time.
371 */
372 static void
_StartBlob(ArchiveHandle * AH,TocEntry * te,Oid oid)373 _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
374 {
375 lclContext *ctx = (lclContext *) AH->formatData;
376
377 if (oid == 0)
378 fatal("invalid OID for large object");
379
380 WriteInt(AH, oid);
381
382 ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
383 }
384
385 /*
386 * Called by the archiver when the dumper calls EndBlob.
387 *
388 * Optional.
389 */
390 static void
_EndBlob(ArchiveHandle * AH,TocEntry * te,Oid oid)391 _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
392 {
393 lclContext *ctx = (lclContext *) AH->formatData;
394
395 EndCompressor(AH, ctx->cs);
396 /* Send the end marker */
397 WriteInt(AH, 0);
398 }
399
400 /*
401 * Called by the archiver when finishing saving all BLOB DATA.
402 *
403 * Optional.
404 */
405 static void
_EndBlobs(ArchiveHandle * AH,TocEntry * te)406 _EndBlobs(ArchiveHandle *AH, TocEntry *te)
407 {
408 /* Write out a fake zero OID to mark end-of-blobs. */
409 WriteInt(AH, 0);
410 }
411
412 /*
413 * Print data for a given TOC entry
414 */
415 static void
_PrintTocData(ArchiveHandle * AH,TocEntry * te)416 _PrintTocData(ArchiveHandle *AH, TocEntry *te)
417 {
418 lclContext *ctx = (lclContext *) AH->formatData;
419 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
420 int blkType;
421 int id;
422
423 if (tctx->dataState == K_OFFSET_NO_DATA)
424 return;
425
426 if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
427 {
428 /*
429 * We cannot seek directly to the desired block. Instead, skip over
430 * block headers until we find the one we want. Remember the
431 * positions of skipped-over blocks, so that if we later decide we
432 * need to read one, we'll be able to seek to it.
433 *
434 * When our input file is seekable, we can do the search starting from
435 * the point after the last data block we scanned in previous
436 * iterations of this function.
437 */
438 if (ctx->hasSeek)
439 {
440 if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0)
441 fatal("error during file seek: %m");
442 }
443
444 for (;;)
445 {
446 pgoff_t thisBlkPos = _getFilePos(AH, ctx);
447
448 _readBlockHeader(AH, &blkType, &id);
449
450 if (blkType == EOF || id == te->dumpId)
451 break;
452
453 /* Remember the block position, if we got one */
454 if (thisBlkPos >= 0)
455 {
456 TocEntry *otherte = getTocEntryByDumpId(AH, id);
457
458 if (otherte && otherte->formatData)
459 {
460 lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData;
461
462 /*
463 * Note: on Windows, multiple threads might access/update
464 * the same lclTocEntry concurrently, but that should be
465 * safe as long as we update dataPos before dataState.
466 * Ideally, we'd use pg_write_barrier() to enforce that,
467 * but the needed infrastructure doesn't exist in frontend
468 * code. But Windows only runs on machines with strong
469 * store ordering, so it should be okay for now.
470 */
471 if (othertctx->dataState == K_OFFSET_POS_NOT_SET)
472 {
473 othertctx->dataPos = thisBlkPos;
474 othertctx->dataState = K_OFFSET_POS_SET;
475 }
476 else if (othertctx->dataPos != thisBlkPos ||
477 othertctx->dataState != K_OFFSET_POS_SET)
478 {
479 /* sanity check */
480 pg_log_warning("data block %d has wrong seek position",
481 id);
482 }
483 }
484 }
485
486 switch (blkType)
487 {
488 case BLK_DATA:
489 _skipData(AH);
490 break;
491
492 case BLK_BLOBS:
493 _skipBlobs(AH);
494 break;
495
496 default: /* Always have a default */
497 fatal("unrecognized data block type (%d) while searching archive",
498 blkType);
499 break;
500 }
501 }
502 }
503 else
504 {
505 /* We can just seek to the place we need to be. */
506 if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
507 fatal("error during file seek: %m");
508
509 _readBlockHeader(AH, &blkType, &id);
510 }
511
512 /*
513 * If we reached EOF without finding the block we want, then either it
514 * doesn't exist, or it does but we lack the ability to seek back to it.
515 */
516 if (blkType == EOF)
517 {
518 if (!ctx->hasSeek)
519 fatal("could not find block ID %d in archive -- "
520 "possibly due to out-of-order restore request, "
521 "which cannot be handled due to non-seekable input file",
522 te->dumpId);
523 else
524 fatal("could not find block ID %d in archive -- "
525 "possibly corrupt archive",
526 te->dumpId);
527 }
528
529 /* Are we sane? */
530 if (id != te->dumpId)
531 fatal("found unexpected block ID (%d) when reading data -- expected %d",
532 id, te->dumpId);
533
534 switch (blkType)
535 {
536 case BLK_DATA:
537 _PrintData(AH);
538 break;
539
540 case BLK_BLOBS:
541 _LoadBlobs(AH, AH->public.ropt->dropSchema);
542 break;
543
544 default: /* Always have a default */
545 fatal("unrecognized data block type %d while restoring archive",
546 blkType);
547 break;
548 }
549
550 /*
551 * If our input file is seekable but lacks data offsets, update our
552 * knowledge of where to start future searches from. (Note that we did
553 * not update the current TE's dataState/dataPos. We could have, but
554 * there is no point since it will not be visited again.)
555 */
556 if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET)
557 {
558 pgoff_t curPos = _getFilePos(AH, ctx);
559
560 if (curPos > ctx->lastFilePos)
561 ctx->lastFilePos = curPos;
562 }
563 }
564
565 /*
566 * Print data from current file position.
567 */
568 static void
_PrintData(ArchiveHandle * AH)569 _PrintData(ArchiveHandle *AH)
570 {
571 ReadDataFromArchive(AH, AH->compression, _CustomReadFunc);
572 }
573
574 static void
_LoadBlobs(ArchiveHandle * AH,bool drop)575 _LoadBlobs(ArchiveHandle *AH, bool drop)
576 {
577 Oid oid;
578
579 StartRestoreBlobs(AH);
580
581 oid = ReadInt(AH);
582 while (oid != 0)
583 {
584 StartRestoreBlob(AH, oid, drop);
585 _PrintData(AH);
586 EndRestoreBlob(AH, oid);
587 oid = ReadInt(AH);
588 }
589
590 EndRestoreBlobs(AH);
591 }
592
593 /*
594 * Skip the BLOBs from the current file position.
595 * BLOBS are written sequentially as data blocks (see below).
596 * Each BLOB is preceded by it's original OID.
597 * A zero OID indicated the end of the BLOBS
598 */
599 static void
_skipBlobs(ArchiveHandle * AH)600 _skipBlobs(ArchiveHandle *AH)
601 {
602 Oid oid;
603
604 oid = ReadInt(AH);
605 while (oid != 0)
606 {
607 _skipData(AH);
608 oid = ReadInt(AH);
609 }
610 }
611
612 /*
613 * Skip data from current file position.
614 * Data blocks are formatted as an integer length, followed by data.
615 * A zero length denoted the end of the block.
616 */
617 static void
_skipData(ArchiveHandle * AH)618 _skipData(ArchiveHandle *AH)
619 {
620 lclContext *ctx = (lclContext *) AH->formatData;
621 size_t blkLen;
622 char *buf = NULL;
623 int buflen = 0;
624 size_t cnt;
625
626 blkLen = ReadInt(AH);
627 while (blkLen != 0)
628 {
629 if (ctx->hasSeek)
630 {
631 if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0)
632 fatal("error during file seek: %m");
633 }
634 else
635 {
636 if (blkLen > buflen)
637 {
638 if (buf)
639 free(buf);
640 buf = (char *) pg_malloc(blkLen);
641 buflen = blkLen;
642 }
643 if ((cnt = fread(buf, 1, blkLen, AH->FH)) != blkLen)
644 {
645 if (feof(AH->FH))
646 fatal("could not read from input file: end of file");
647 else
648 fatal("could not read from input file: %m");
649 }
650 }
651
652 blkLen = ReadInt(AH);
653 }
654
655 if (buf)
656 free(buf);
657 }
658
659 /*
660 * Write a byte of data to the archive.
661 *
662 * Mandatory.
663 *
664 * Called by the archiver to do integer & byte output to the archive.
665 */
666 static int
_WriteByte(ArchiveHandle * AH,const int i)667 _WriteByte(ArchiveHandle *AH, const int i)
668 {
669 int res;
670
671 if ((res = fputc(i, AH->FH)) == EOF)
672 WRITE_ERROR_EXIT;
673
674 return 1;
675 }
676
677 /*
678 * Read a byte of data from the archive.
679 *
680 * Mandatory
681 *
682 * Called by the archiver to read bytes & integers from the archive.
683 * EOF should be treated as a fatal error.
684 */
685 static int
_ReadByte(ArchiveHandle * AH)686 _ReadByte(ArchiveHandle *AH)
687 {
688 int res;
689
690 res = getc(AH->FH);
691 if (res == EOF)
692 READ_ERROR_EXIT(AH->FH);
693 return res;
694 }
695
696 /*
697 * Write a buffer of data to the archive.
698 *
699 * Mandatory.
700 *
701 * Called by the archiver to write a block of bytes to the archive.
702 */
703 static void
_WriteBuf(ArchiveHandle * AH,const void * buf,size_t len)704 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
705 {
706 if (fwrite(buf, 1, len, AH->FH) != len)
707 WRITE_ERROR_EXIT;
708 }
709
710 /*
711 * Read a block of bytes from the archive.
712 *
713 * Mandatory.
714 *
715 * Called by the archiver to read a block of bytes from the archive
716 */
717 static void
_ReadBuf(ArchiveHandle * AH,void * buf,size_t len)718 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
719 {
720 if (fread(buf, 1, len, AH->FH) != len)
721 READ_ERROR_EXIT(AH->FH);
722 }
723
724 /*
725 * Close the archive.
726 *
727 * Mandatory.
728 *
729 * When writing the archive, this is the routine that actually starts
730 * the process of saving it to files. No data should be written prior
731 * to this point, since the user could sort the TOC after creating it.
732 *
733 * If an archive is to be written, this routine must call:
734 * WriteHead to save the archive header
735 * WriteToc to save the TOC entries
736 * WriteDataChunks to save all DATA & BLOBs.
737 *
738 */
739 static void
_CloseArchive(ArchiveHandle * AH)740 _CloseArchive(ArchiveHandle *AH)
741 {
742 lclContext *ctx = (lclContext *) AH->formatData;
743 pgoff_t tpos;
744
745 if (AH->mode == archModeWrite)
746 {
747 WriteHead(AH);
748 /* Remember TOC's seek position for use below */
749 tpos = ftello(AH->FH);
750 if (tpos < 0 && ctx->hasSeek)
751 fatal("could not determine seek position in archive file: %m");
752 WriteToc(AH);
753 WriteDataChunks(AH, NULL);
754
755 /*
756 * If possible, re-write the TOC in order to update the data offset
757 * information. This is not essential, as pg_restore can cope in most
758 * cases without it; but it can make pg_restore significantly faster
759 * in some situations (especially parallel restore).
760 */
761 if (ctx->hasSeek &&
762 fseeko(AH->FH, tpos, SEEK_SET) == 0)
763 WriteToc(AH);
764 }
765
766 if (fclose(AH->FH) != 0)
767 fatal("could not close archive file: %m");
768
769 /* Sync the output file if one is defined */
770 if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
771 (void) fsync_fname(AH->fSpec, false);
772
773 AH->FH = NULL;
774 }
775
776 /*
777 * Reopen the archive's file handle.
778 *
779 * We close the original file handle, except on Windows. (The difference
780 * is because on Windows, this is used within a multithreading context,
781 * and we don't want a thread closing the parent file handle.)
782 */
783 static void
_ReopenArchive(ArchiveHandle * AH)784 _ReopenArchive(ArchiveHandle *AH)
785 {
786 lclContext *ctx = (lclContext *) AH->formatData;
787 pgoff_t tpos;
788
789 if (AH->mode == archModeWrite)
790 fatal("can only reopen input archives");
791
792 /*
793 * These two cases are user-facing errors since they represent unsupported
794 * (but not invalid) use-cases. Word the error messages appropriately.
795 */
796 if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
797 fatal("parallel restore from standard input is not supported");
798 if (!ctx->hasSeek)
799 fatal("parallel restore from non-seekable file is not supported");
800
801 tpos = ftello(AH->FH);
802 if (tpos < 0)
803 fatal("could not determine seek position in archive file: %m");
804
805 #ifndef WIN32
806 if (fclose(AH->FH) != 0)
807 fatal("could not close archive file: %m");
808 #endif
809
810 AH->FH = fopen(AH->fSpec, PG_BINARY_R);
811 if (!AH->FH)
812 fatal("could not open input file \"%s\": %m", AH->fSpec);
813
814 if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
815 fatal("could not set seek position in archive file: %m");
816 }
817
818 /*
819 * Prepare for parallel restore.
820 *
821 * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
822 * TOC entries' dataLength fields with appropriate values to guide the
823 * ordering of restore jobs. The source of said data is format-dependent,
824 * as is the exact meaning of the values.
825 *
826 * A format module might also choose to do other setup here.
827 */
828 static void
_PrepParallelRestore(ArchiveHandle * AH)829 _PrepParallelRestore(ArchiveHandle *AH)
830 {
831 lclContext *ctx = (lclContext *) AH->formatData;
832 TocEntry *prev_te = NULL;
833 lclTocEntry *prev_tctx = NULL;
834 TocEntry *te;
835
836 /*
837 * Knowing that the data items were dumped out in TOC order, we can
838 * reconstruct the length of each item as the delta to the start offset of
839 * the next data item.
840 */
841 for (te = AH->toc->next; te != AH->toc; te = te->next)
842 {
843 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
844
845 /*
846 * Ignore entries without a known data offset; if we were unable to
847 * seek to rewrite the TOC when creating the archive, this'll be all
848 * of them, and we'll end up with no size estimates.
849 */
850 if (tctx->dataState != K_OFFSET_POS_SET)
851 continue;
852
853 /* Compute previous data item's length */
854 if (prev_te)
855 {
856 if (tctx->dataPos > prev_tctx->dataPos)
857 prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
858 }
859
860 prev_te = te;
861 prev_tctx = tctx;
862 }
863
864 /* If OK to seek, we can determine the length of the last item */
865 if (prev_te && ctx->hasSeek)
866 {
867 pgoff_t endpos;
868
869 if (fseeko(AH->FH, 0, SEEK_END) != 0)
870 fatal("error during file seek: %m");
871 endpos = ftello(AH->FH);
872 if (endpos > prev_tctx->dataPos)
873 prev_te->dataLength = endpos - prev_tctx->dataPos;
874 }
875 }
876
877 /*
878 * Clone format-specific fields during parallel restoration.
879 */
880 static void
_Clone(ArchiveHandle * AH)881 _Clone(ArchiveHandle *AH)
882 {
883 lclContext *ctx = (lclContext *) AH->formatData;
884
885 /*
886 * Each thread must have private lclContext working state.
887 */
888 AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
889 memcpy(AH->formatData, ctx, sizeof(lclContext));
890 ctx = (lclContext *) AH->formatData;
891
892 /* sanity check, shouldn't happen */
893 if (ctx->cs != NULL)
894 fatal("compressor active");
895
896 /*
897 * We intentionally do not clone TOC-entry-local state: it's useful to
898 * share knowledge about where the data blocks are across threads.
899 * _PrintTocData has to be careful about the order of operations on that
900 * state, though.
901 *
902 * Note: we do not make a local lo_buf because we expect at most one BLOBS
903 * entry per archive, so no parallelism is possible.
904 */
905 }
906
907 static void
_DeClone(ArchiveHandle * AH)908 _DeClone(ArchiveHandle *AH)
909 {
910 lclContext *ctx = (lclContext *) AH->formatData;
911
912 free(ctx);
913 }
914
915 /*
916 * This function is executed in the child of a parallel restore from a
917 * custom-format archive and restores the actual data for one TOC entry.
918 */
919 static int
_WorkerJobRestoreCustom(ArchiveHandle * AH,TocEntry * te)920 _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
921 {
922 return parallel_restore(AH, te);
923 }
924
925 /*--------------------------------------------------
926 * END OF FORMAT CALLBACKS
927 *--------------------------------------------------
928 */
929
930 /*
931 * Get the current position in the archive file.
932 *
933 * With a non-seekable archive file, we may not be able to obtain the
934 * file position. If so, just return -1. It's not too important in
935 * that case because we won't be able to rewrite the TOC to fill in
936 * data block offsets anyway.
937 */
938 static pgoff_t
_getFilePos(ArchiveHandle * AH,lclContext * ctx)939 _getFilePos(ArchiveHandle *AH, lclContext *ctx)
940 {
941 pgoff_t pos;
942
943 pos = ftello(AH->FH);
944 if (pos < 0)
945 {
946 /* Not expected if we found we can seek. */
947 if (ctx->hasSeek)
948 fatal("could not determine seek position in archive file: %m");
949 }
950 return pos;
951 }
952
953 /*
954 * Read a data block header. The format changed in V1.3, so we
955 * centralize the code here for simplicity. Returns *type = EOF
956 * if at EOF.
957 */
958 static void
_readBlockHeader(ArchiveHandle * AH,int * type,int * id)959 _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
960 {
961 int byt;
962
963 /*
964 * Note: if we are at EOF with a pre-1.3 input file, we'll fatal() inside
965 * ReadInt rather than returning EOF. It doesn't seem worth jumping
966 * through hoops to deal with that case better, because no such files are
967 * likely to exist in the wild: only some 7.1 development versions of
968 * pg_dump ever generated such files.
969 */
970 if (AH->version < K_VERS_1_3)
971 *type = BLK_DATA;
972 else
973 {
974 byt = getc(AH->FH);
975 *type = byt;
976 if (byt == EOF)
977 {
978 *id = 0; /* don't return an uninitialized value */
979 return;
980 }
981 }
982
983 *id = ReadInt(AH);
984 }
985
986 /*
987 * Callback function for WriteDataToArchive. Writes one block of (compressed)
988 * data to the archive.
989 */
990 static void
_CustomWriteFunc(ArchiveHandle * AH,const char * buf,size_t len)991 _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
992 {
993 /* never write 0-byte blocks (this should not happen) */
994 if (len > 0)
995 {
996 WriteInt(AH, len);
997 _WriteBuf(AH, buf, len);
998 }
999 }
1000
1001 /*
1002 * Callback function for ReadDataFromArchive. To keep things simple, we
1003 * always read one compressed block at a time.
1004 */
1005 static size_t
_CustomReadFunc(ArchiveHandle * AH,char ** buf,size_t * buflen)1006 _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
1007 {
1008 size_t blkLen;
1009
1010 /* Read length */
1011 blkLen = ReadInt(AH);
1012 if (blkLen == 0)
1013 return 0;
1014
1015 /* If the caller's buffer is not large enough, allocate a bigger one */
1016 if (blkLen > *buflen)
1017 {
1018 free(*buf);
1019 *buf = (char *) pg_malloc(blkLen);
1020 *buflen = blkLen;
1021 }
1022
1023 /* exits app on read errors */
1024 _ReadBuf(AH, *buf, blkLen);
1025
1026 return blkLen;
1027 }
1028