1 /*-------------------------------------------------------------------------
2 *
3 * pg_backup_custom.c
4 *
5 * Implements the custom output format.
6 *
7 * The comments with the routines in this code are a good place to
8 * understand how to write a new format.
9 *
10 * See the headers to pg_restore for more details.
11 *
12 * Copyright (c) 2000, Philip Warner
13 * Rights are granted to use this software in any way so long
14 * as this notice is not removed.
15 *
16 * The author is not responsible for loss or damages that may
17 * and any liability will be limited to the time taken to fix any
18 * related bug.
19 *
20 *
21 * IDENTIFICATION
22 * src/bin/pg_dump/pg_backup_custom.c
23 *
24 *-------------------------------------------------------------------------
25 */
26 #include "postgres_fe.h"
27
28 #include "compress_io.h"
29 #include "parallel.h"
30 #include "pg_backup_utils.h"
31 #include "common/file_utils.h"
32
33
34 /*--------
35 * Routines in the format interface
36 *--------
37 */
38
39 static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
40 static void _StartData(ArchiveHandle *AH, TocEntry *te);
41 static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
42 static void _EndData(ArchiveHandle *AH, TocEntry *te);
43 static int _WriteByte(ArchiveHandle *AH, const int i);
44 static int _ReadByte(ArchiveHandle *);
45 static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
46 static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
47 static void _CloseArchive(ArchiveHandle *AH);
48 static void _ReopenArchive(ArchiveHandle *AH);
49 static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
50 static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
51 static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
52 static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
53
54 static void _PrintData(ArchiveHandle *AH);
55 static void _skipData(ArchiveHandle *AH);
56 static void _skipBlobs(ArchiveHandle *AH);
57
58 static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
59 static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
60 static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
61 static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
62 static void _LoadBlobs(ArchiveHandle *AH, bool drop);
63
64 static void _PrepParallelRestore(ArchiveHandle *AH);
65 static void _Clone(ArchiveHandle *AH);
66 static void _DeClone(ArchiveHandle *AH);
67
68 static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
69
70 typedef struct
71 {
72 CompressorState *cs;
73 int hasSeek;
74 /* lastFilePos is used only when reading, and may be invalid if !hasSeek */
75 pgoff_t lastFilePos; /* position after last data block we've read */
76 } lclContext;
77
78 typedef struct
79 {
80 int dataState;
81 pgoff_t dataPos; /* valid only if dataState=K_OFFSET_POS_SET */
82 } lclTocEntry;
83
84
85 /*------
86 * Static declarations
87 *------
88 */
89 static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
90 static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);
91
92 static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
93 static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
94
95
96 /*
97 * Init routine required by ALL formats. This is a global routine
98 * and should be declared in pg_backup_archiver.h
99 *
100 * It's task is to create any extra archive context (using AH->formatData),
101 * and to initialize the supported function pointers.
102 *
103 * It should also prepare whatever it's input source is for reading/writing,
104 * and in the case of a read mode connection, it should load the Header & TOC.
105 */
106 void
InitArchiveFmt_Custom(ArchiveHandle * AH)107 InitArchiveFmt_Custom(ArchiveHandle *AH)
108 {
109 lclContext *ctx;
110
111 /* Assuming static functions, this can be copied for each format. */
112 AH->ArchiveEntryPtr = _ArchiveEntry;
113 AH->StartDataPtr = _StartData;
114 AH->WriteDataPtr = _WriteData;
115 AH->EndDataPtr = _EndData;
116 AH->WriteBytePtr = _WriteByte;
117 AH->ReadBytePtr = _ReadByte;
118 AH->WriteBufPtr = _WriteBuf;
119 AH->ReadBufPtr = _ReadBuf;
120 AH->ClosePtr = _CloseArchive;
121 AH->ReopenPtr = _ReopenArchive;
122 AH->PrintTocDataPtr = _PrintTocData;
123 AH->ReadExtraTocPtr = _ReadExtraToc;
124 AH->WriteExtraTocPtr = _WriteExtraToc;
125 AH->PrintExtraTocPtr = _PrintExtraToc;
126
127 AH->StartBlobsPtr = _StartBlobs;
128 AH->StartBlobPtr = _StartBlob;
129 AH->EndBlobPtr = _EndBlob;
130 AH->EndBlobsPtr = _EndBlobs;
131
132 AH->PrepParallelRestorePtr = _PrepParallelRestore;
133 AH->ClonePtr = _Clone;
134 AH->DeClonePtr = _DeClone;
135
136 /* no parallel dump in the custom archive, only parallel restore */
137 AH->WorkerJobDumpPtr = NULL;
138 AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
139
140 /* Set up a private area. */
141 ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
142 AH->formatData = (void *) ctx;
143
144 /* Initialize LO buffering */
145 AH->lo_buf_size = LOBBUFSIZE;
146 AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
147
148 /*
149 * Now open the file
150 */
151 if (AH->mode == archModeWrite)
152 {
153 if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
154 {
155 AH->FH = fopen(AH->fSpec, PG_BINARY_W);
156 if (!AH->FH)
157 fatal("could not open output file \"%s\": %m", AH->fSpec);
158 }
159 else
160 {
161 AH->FH = stdout;
162 if (!AH->FH)
163 fatal("could not open output file: %m");
164 }
165
166 ctx->hasSeek = checkSeek(AH->FH);
167 }
168 else
169 {
170 if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
171 {
172 AH->FH = fopen(AH->fSpec, PG_BINARY_R);
173 if (!AH->FH)
174 fatal("could not open input file \"%s\": %m", AH->fSpec);
175 }
176 else
177 {
178 AH->FH = stdin;
179 if (!AH->FH)
180 fatal("could not open input file: %m");
181 }
182
183 ctx->hasSeek = checkSeek(AH->FH);
184
185 ReadHead(AH);
186 ReadToc(AH);
187
188 /*
189 * Remember location of first data block (i.e., the point after TOC)
190 * in case we have to search for desired data blocks.
191 */
192 ctx->lastFilePos = _getFilePos(AH, ctx);
193 }
194 }
195
196 /*
197 * Called by the Archiver when the dumper creates a new TOC entry.
198 *
199 * Optional.
200 *
201 * Set up extract format-related TOC data.
202 */
203 static void
_ArchiveEntry(ArchiveHandle * AH,TocEntry * te)204 _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
205 {
206 lclTocEntry *ctx;
207
208 ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
209 if (te->dataDumper)
210 ctx->dataState = K_OFFSET_POS_NOT_SET;
211 else
212 ctx->dataState = K_OFFSET_NO_DATA;
213
214 te->formatData = (void *) ctx;
215 }
216
217 /*
218 * Called by the Archiver to save any extra format-related TOC entry
219 * data.
220 *
221 * Optional.
222 *
223 * Use the Archiver routines to write data - they are non-endian, and
224 * maintain other important file information.
225 */
226 static void
_WriteExtraToc(ArchiveHandle * AH,TocEntry * te)227 _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
228 {
229 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
230
231 WriteOffset(AH, ctx->dataPos, ctx->dataState);
232 }
233
234 /*
235 * Called by the Archiver to read any extra format-related TOC data.
236 *
237 * Optional.
238 *
239 * Needs to match the order defined in _WriteExtraToc, and should also
240 * use the Archiver input routines.
241 */
242 static void
_ReadExtraToc(ArchiveHandle * AH,TocEntry * te)243 _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
244 {
245 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
246
247 if (ctx == NULL)
248 {
249 ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
250 te->formatData = (void *) ctx;
251 }
252
253 ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
254
255 /*
256 * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
257 * dump it at all.
258 */
259 if (AH->version < K_VERS_1_7)
260 ReadInt(AH);
261 }
262
263 /*
264 * Called by the Archiver when restoring an archive to output a comment
265 * that includes useful information about the TOC entry.
266 *
267 * Optional.
268 *
269 */
270 static void
_PrintExtraToc(ArchiveHandle * AH,TocEntry * te)271 _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
272 {
273 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
274
275 if (AH->public.verbose)
276 ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
277 (int64) ctx->dataPos);
278 }
279
280 /*
281 * Called by the archiver when saving TABLE DATA (not schema). This routine
282 * should save whatever format-specific information is needed to read
283 * the archive back.
284 *
285 * It is called just prior to the dumper's 'DataDumper' routine being called.
286 *
287 * Optional, but strongly recommended.
288 *
289 */
290 static void
_StartData(ArchiveHandle * AH,TocEntry * te)291 _StartData(ArchiveHandle *AH, TocEntry *te)
292 {
293 lclContext *ctx = (lclContext *) AH->formatData;
294 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
295
296 tctx->dataPos = _getFilePos(AH, ctx);
297 if (tctx->dataPos >= 0)
298 tctx->dataState = K_OFFSET_POS_SET;
299
300 _WriteByte(AH, BLK_DATA); /* Block type */
301 WriteInt(AH, te->dumpId); /* For sanity check */
302
303 ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
304 }
305
306 /*
307 * Called by archiver when dumper calls WriteData. This routine is
308 * called for both BLOB and TABLE data; it is the responsibility of
309 * the format to manage each kind of data using StartBlob/StartData.
310 *
311 * It should only be called from within a DataDumper routine.
312 *
313 * Mandatory.
314 */
315 static void
_WriteData(ArchiveHandle * AH,const void * data,size_t dLen)316 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
317 {
318 lclContext *ctx = (lclContext *) AH->formatData;
319 CompressorState *cs = ctx->cs;
320
321 if (dLen > 0)
322 /* WriteDataToArchive() internally throws write errors */
323 WriteDataToArchive(AH, cs, data, dLen);
324
325 return;
326 }
327
328 /*
329 * Called by the archiver when a dumper's 'DataDumper' routine has
330 * finished.
331 *
332 * Optional.
333 *
334 */
335 static void
_EndData(ArchiveHandle * AH,TocEntry * te)336 _EndData(ArchiveHandle *AH, TocEntry *te)
337 {
338 lclContext *ctx = (lclContext *) AH->formatData;
339
340 EndCompressor(AH, ctx->cs);
341 /* Send the end marker */
342 WriteInt(AH, 0);
343 }
344
345 /*
346 * Called by the archiver when starting to save all BLOB DATA (not schema).
347 * This routine should save whatever format-specific information is needed
348 * to read the BLOBs back into memory.
349 *
350 * It is called just prior to the dumper's DataDumper routine.
351 *
352 * Optional, but strongly recommended.
353 */
354 static void
_StartBlobs(ArchiveHandle * AH,TocEntry * te)355 _StartBlobs(ArchiveHandle *AH, TocEntry *te)
356 {
357 lclContext *ctx = (lclContext *) AH->formatData;
358 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
359
360 tctx->dataPos = _getFilePos(AH, ctx);
361 if (tctx->dataPos >= 0)
362 tctx->dataState = K_OFFSET_POS_SET;
363
364 _WriteByte(AH, BLK_BLOBS); /* Block type */
365 WriteInt(AH, te->dumpId); /* For sanity check */
366 }
367
368 /*
369 * Called by the archiver when the dumper calls StartBlob.
370 *
371 * Mandatory.
372 *
373 * Must save the passed OID for retrieval at restore-time.
374 */
375 static void
_StartBlob(ArchiveHandle * AH,TocEntry * te,Oid oid)376 _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
377 {
378 lclContext *ctx = (lclContext *) AH->formatData;
379
380 if (oid == 0)
381 fatal("invalid OID for large object");
382
383 WriteInt(AH, oid);
384
385 ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
386 }
387
388 /*
389 * Called by the archiver when the dumper calls EndBlob.
390 *
391 * Optional.
392 */
393 static void
_EndBlob(ArchiveHandle * AH,TocEntry * te,Oid oid)394 _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
395 {
396 lclContext *ctx = (lclContext *) AH->formatData;
397
398 EndCompressor(AH, ctx->cs);
399 /* Send the end marker */
400 WriteInt(AH, 0);
401 }
402
403 /*
404 * Called by the archiver when finishing saving all BLOB DATA.
405 *
406 * Optional.
407 */
408 static void
_EndBlobs(ArchiveHandle * AH,TocEntry * te)409 _EndBlobs(ArchiveHandle *AH, TocEntry *te)
410 {
411 /* Write out a fake zero OID to mark end-of-blobs. */
412 WriteInt(AH, 0);
413 }
414
415 /*
416 * Print data for a given TOC entry
417 */
418 static void
_PrintTocData(ArchiveHandle * AH,TocEntry * te)419 _PrintTocData(ArchiveHandle *AH, TocEntry *te)
420 {
421 lclContext *ctx = (lclContext *) AH->formatData;
422 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
423 int blkType;
424 int id;
425
426 if (tctx->dataState == K_OFFSET_NO_DATA)
427 return;
428
429 if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
430 {
431 /*
432 * We cannot seek directly to the desired block. Instead, skip over
433 * block headers until we find the one we want. Remember the
434 * positions of skipped-over blocks, so that if we later decide we
435 * need to read one, we'll be able to seek to it.
436 *
437 * When our input file is seekable, we can do the search starting from
438 * the point after the last data block we scanned in previous
439 * iterations of this function.
440 */
441 if (ctx->hasSeek)
442 {
443 if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0)
444 fatal("error during file seek: %m");
445 }
446
447 for (;;)
448 {
449 pgoff_t thisBlkPos = _getFilePos(AH, ctx);
450
451 _readBlockHeader(AH, &blkType, &id);
452
453 if (blkType == EOF || id == te->dumpId)
454 break;
455
456 /* Remember the block position, if we got one */
457 if (thisBlkPos >= 0)
458 {
459 TocEntry *otherte = getTocEntryByDumpId(AH, id);
460
461 if (otherte && otherte->formatData)
462 {
463 lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData;
464
465 /*
466 * Note: on Windows, multiple threads might access/update
467 * the same lclTocEntry concurrently, but that should be
468 * safe as long as we update dataPos before dataState.
469 * Ideally, we'd use pg_write_barrier() to enforce that,
470 * but the needed infrastructure doesn't exist in frontend
471 * code. But Windows only runs on machines with strong
472 * store ordering, so it should be okay for now.
473 */
474 if (othertctx->dataState == K_OFFSET_POS_NOT_SET)
475 {
476 othertctx->dataPos = thisBlkPos;
477 othertctx->dataState = K_OFFSET_POS_SET;
478 }
479 else if (othertctx->dataPos != thisBlkPos ||
480 othertctx->dataState != K_OFFSET_POS_SET)
481 {
482 /* sanity check */
483 pg_log_warning("data block %d has wrong seek position",
484 id);
485 }
486 }
487 }
488
489 switch (blkType)
490 {
491 case BLK_DATA:
492 _skipData(AH);
493 break;
494
495 case BLK_BLOBS:
496 _skipBlobs(AH);
497 break;
498
499 default: /* Always have a default */
500 fatal("unrecognized data block type (%d) while searching archive",
501 blkType);
502 break;
503 }
504 }
505 }
506 else
507 {
508 /* We can just seek to the place we need to be. */
509 if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
510 fatal("error during file seek: %m");
511
512 _readBlockHeader(AH, &blkType, &id);
513 }
514
515 /*
516 * If we reached EOF without finding the block we want, then either it
517 * doesn't exist, or it does but we lack the ability to seek back to it.
518 */
519 if (blkType == EOF)
520 {
521 if (!ctx->hasSeek)
522 fatal("could not find block ID %d in archive -- "
523 "possibly due to out-of-order restore request, "
524 "which cannot be handled due to non-seekable input file",
525 te->dumpId);
526 else
527 fatal("could not find block ID %d in archive -- "
528 "possibly corrupt archive",
529 te->dumpId);
530 }
531
532 /* Are we sane? */
533 if (id != te->dumpId)
534 fatal("found unexpected block ID (%d) when reading data -- expected %d",
535 id, te->dumpId);
536
537 switch (blkType)
538 {
539 case BLK_DATA:
540 _PrintData(AH);
541 break;
542
543 case BLK_BLOBS:
544 _LoadBlobs(AH, AH->public.ropt->dropSchema);
545 break;
546
547 default: /* Always have a default */
548 fatal("unrecognized data block type %d while restoring archive",
549 blkType);
550 break;
551 }
552
553 /*
554 * If our input file is seekable but lacks data offsets, update our
555 * knowledge of where to start future searches from. (Note that we did
556 * not update the current TE's dataState/dataPos. We could have, but
557 * there is no point since it will not be visited again.)
558 */
559 if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET)
560 {
561 pgoff_t curPos = _getFilePos(AH, ctx);
562
563 if (curPos > ctx->lastFilePos)
564 ctx->lastFilePos = curPos;
565 }
566 }
567
568 /*
569 * Print data from current file position.
570 */
571 static void
_PrintData(ArchiveHandle * AH)572 _PrintData(ArchiveHandle *AH)
573 {
574 ReadDataFromArchive(AH, AH->compression, _CustomReadFunc);
575 }
576
577 static void
_LoadBlobs(ArchiveHandle * AH,bool drop)578 _LoadBlobs(ArchiveHandle *AH, bool drop)
579 {
580 Oid oid;
581
582 StartRestoreBlobs(AH);
583
584 oid = ReadInt(AH);
585 while (oid != 0)
586 {
587 StartRestoreBlob(AH, oid, drop);
588 _PrintData(AH);
589 EndRestoreBlob(AH, oid);
590 oid = ReadInt(AH);
591 }
592
593 EndRestoreBlobs(AH);
594 }
595
596 /*
597 * Skip the BLOBs from the current file position.
598 * BLOBS are written sequentially as data blocks (see below).
599 * Each BLOB is preceded by it's original OID.
600 * A zero OID indicated the end of the BLOBS
601 */
602 static void
_skipBlobs(ArchiveHandle * AH)603 _skipBlobs(ArchiveHandle *AH)
604 {
605 Oid oid;
606
607 oid = ReadInt(AH);
608 while (oid != 0)
609 {
610 _skipData(AH);
611 oid = ReadInt(AH);
612 }
613 }
614
615 /*
616 * Skip data from current file position.
617 * Data blocks are formatted as an integer length, followed by data.
618 * A zero length denoted the end of the block.
619 */
620 static void
_skipData(ArchiveHandle * AH)621 _skipData(ArchiveHandle *AH)
622 {
623 lclContext *ctx = (lclContext *) AH->formatData;
624 size_t blkLen;
625 char *buf = NULL;
626 int buflen = 0;
627 size_t cnt;
628
629 blkLen = ReadInt(AH);
630 while (blkLen != 0)
631 {
632 if (ctx->hasSeek)
633 {
634 if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0)
635 fatal("error during file seek: %m");
636 }
637 else
638 {
639 if (blkLen > buflen)
640 {
641 if (buf)
642 free(buf);
643 buf = (char *) pg_malloc(blkLen);
644 buflen = blkLen;
645 }
646 if ((cnt = fread(buf, 1, blkLen, AH->FH)) != blkLen)
647 {
648 if (feof(AH->FH))
649 fatal("could not read from input file: end of file");
650 else
651 fatal("could not read from input file: %m");
652 }
653 }
654
655 blkLen = ReadInt(AH);
656 }
657
658 if (buf)
659 free(buf);
660 }
661
662 /*
663 * Write a byte of data to the archive.
664 *
665 * Mandatory.
666 *
667 * Called by the archiver to do integer & byte output to the archive.
668 */
669 static int
_WriteByte(ArchiveHandle * AH,const int i)670 _WriteByte(ArchiveHandle *AH, const int i)
671 {
672 int res;
673
674 if ((res = fputc(i, AH->FH)) == EOF)
675 WRITE_ERROR_EXIT;
676
677 return 1;
678 }
679
680 /*
681 * Read a byte of data from the archive.
682 *
683 * Mandatory
684 *
685 * Called by the archiver to read bytes & integers from the archive.
686 * EOF should be treated as a fatal error.
687 */
688 static int
_ReadByte(ArchiveHandle * AH)689 _ReadByte(ArchiveHandle *AH)
690 {
691 int res;
692
693 res = getc(AH->FH);
694 if (res == EOF)
695 READ_ERROR_EXIT(AH->FH);
696 return res;
697 }
698
699 /*
700 * Write a buffer of data to the archive.
701 *
702 * Mandatory.
703 *
704 * Called by the archiver to write a block of bytes to the archive.
705 */
706 static void
_WriteBuf(ArchiveHandle * AH,const void * buf,size_t len)707 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
708 {
709 if (fwrite(buf, 1, len, AH->FH) != len)
710 WRITE_ERROR_EXIT;
711 }
712
713 /*
714 * Read a block of bytes from the archive.
715 *
716 * Mandatory.
717 *
718 * Called by the archiver to read a block of bytes from the archive
719 */
720 static void
_ReadBuf(ArchiveHandle * AH,void * buf,size_t len)721 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
722 {
723 if (fread(buf, 1, len, AH->FH) != len)
724 READ_ERROR_EXIT(AH->FH);
725 }
726
727 /*
728 * Close the archive.
729 *
730 * Mandatory.
731 *
732 * When writing the archive, this is the routine that actually starts
733 * the process of saving it to files. No data should be written prior
734 * to this point, since the user could sort the TOC after creating it.
735 *
736 * If an archive is to be written, this routine must call:
737 * WriteHead to save the archive header
738 * WriteToc to save the TOC entries
739 * WriteDataChunks to save all DATA & BLOBs.
740 *
741 */
742 static void
_CloseArchive(ArchiveHandle * AH)743 _CloseArchive(ArchiveHandle *AH)
744 {
745 lclContext *ctx = (lclContext *) AH->formatData;
746 pgoff_t tpos;
747
748 if (AH->mode == archModeWrite)
749 {
750 WriteHead(AH);
751 /* Remember TOC's seek position for use below */
752 tpos = ftello(AH->FH);
753 if (tpos < 0 && ctx->hasSeek)
754 fatal("could not determine seek position in archive file: %m");
755 WriteToc(AH);
756 WriteDataChunks(AH, NULL);
757
758 /*
759 * If possible, re-write the TOC in order to update the data offset
760 * information. This is not essential, as pg_restore can cope in most
761 * cases without it; but it can make pg_restore significantly faster
762 * in some situations (especially parallel restore).
763 */
764 if (ctx->hasSeek &&
765 fseeko(AH->FH, tpos, SEEK_SET) == 0)
766 WriteToc(AH);
767 }
768
769 if (fclose(AH->FH) != 0)
770 fatal("could not close archive file: %m");
771
772 /* Sync the output file if one is defined */
773 if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
774 (void) fsync_fname(AH->fSpec, false);
775
776 AH->FH = NULL;
777 }
778
779 /*
780 * Reopen the archive's file handle.
781 *
782 * We close the original file handle, except on Windows. (The difference
783 * is because on Windows, this is used within a multithreading context,
784 * and we don't want a thread closing the parent file handle.)
785 */
786 static void
_ReopenArchive(ArchiveHandle * AH)787 _ReopenArchive(ArchiveHandle *AH)
788 {
789 lclContext *ctx = (lclContext *) AH->formatData;
790 pgoff_t tpos;
791
792 if (AH->mode == archModeWrite)
793 fatal("can only reopen input archives");
794
795 /*
796 * These two cases are user-facing errors since they represent unsupported
797 * (but not invalid) use-cases. Word the error messages appropriately.
798 */
799 if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
800 fatal("parallel restore from standard input is not supported");
801 if (!ctx->hasSeek)
802 fatal("parallel restore from non-seekable file is not supported");
803
804 tpos = ftello(AH->FH);
805 if (tpos < 0)
806 fatal("could not determine seek position in archive file: %m");
807
808 #ifndef WIN32
809 if (fclose(AH->FH) != 0)
810 fatal("could not close archive file: %m");
811 #endif
812
813 AH->FH = fopen(AH->fSpec, PG_BINARY_R);
814 if (!AH->FH)
815 fatal("could not open input file \"%s\": %m", AH->fSpec);
816
817 if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
818 fatal("could not set seek position in archive file: %m");
819 }
820
821 /*
822 * Prepare for parallel restore.
823 *
824 * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
825 * TOC entries' dataLength fields with appropriate values to guide the
826 * ordering of restore jobs. The source of said data is format-dependent,
827 * as is the exact meaning of the values.
828 *
829 * A format module might also choose to do other setup here.
830 */
831 static void
_PrepParallelRestore(ArchiveHandle * AH)832 _PrepParallelRestore(ArchiveHandle *AH)
833 {
834 lclContext *ctx = (lclContext *) AH->formatData;
835 TocEntry *prev_te = NULL;
836 lclTocEntry *prev_tctx = NULL;
837 TocEntry *te;
838
839 /*
840 * Knowing that the data items were dumped out in TOC order, we can
841 * reconstruct the length of each item as the delta to the start offset of
842 * the next data item.
843 */
844 for (te = AH->toc->next; te != AH->toc; te = te->next)
845 {
846 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
847
848 /*
849 * Ignore entries without a known data offset; if we were unable to
850 * seek to rewrite the TOC when creating the archive, this'll be all
851 * of them, and we'll end up with no size estimates.
852 */
853 if (tctx->dataState != K_OFFSET_POS_SET)
854 continue;
855
856 /* Compute previous data item's length */
857 if (prev_te)
858 {
859 if (tctx->dataPos > prev_tctx->dataPos)
860 prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
861 }
862
863 prev_te = te;
864 prev_tctx = tctx;
865 }
866
867 /* If OK to seek, we can determine the length of the last item */
868 if (prev_te && ctx->hasSeek)
869 {
870 pgoff_t endpos;
871
872 if (fseeko(AH->FH, 0, SEEK_END) != 0)
873 fatal("error during file seek: %m");
874 endpos = ftello(AH->FH);
875 if (endpos > prev_tctx->dataPos)
876 prev_te->dataLength = endpos - prev_tctx->dataPos;
877 }
878 }
879
880 /*
881 * Clone format-specific fields during parallel restoration.
882 */
883 static void
_Clone(ArchiveHandle * AH)884 _Clone(ArchiveHandle *AH)
885 {
886 lclContext *ctx = (lclContext *) AH->formatData;
887
888 /*
889 * Each thread must have private lclContext working state.
890 */
891 AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
892 memcpy(AH->formatData, ctx, sizeof(lclContext));
893 ctx = (lclContext *) AH->formatData;
894
895 /* sanity check, shouldn't happen */
896 if (ctx->cs != NULL)
897 fatal("compressor active");
898
899 /*
900 * We intentionally do not clone TOC-entry-local state: it's useful to
901 * share knowledge about where the data blocks are across threads.
902 * _PrintTocData has to be careful about the order of operations on that
903 * state, though.
904 *
905 * Note: we do not make a local lo_buf because we expect at most one BLOBS
906 * entry per archive, so no parallelism is possible.
907 */
908 }
909
910 static void
_DeClone(ArchiveHandle * AH)911 _DeClone(ArchiveHandle *AH)
912 {
913 lclContext *ctx = (lclContext *) AH->formatData;
914
915 free(ctx);
916 }
917
918 /*
919 * This function is executed in the child of a parallel restore from a
920 * custom-format archive and restores the actual data for one TOC entry.
921 */
922 static int
_WorkerJobRestoreCustom(ArchiveHandle * AH,TocEntry * te)923 _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
924 {
925 return parallel_restore(AH, te);
926 }
927
928 /*--------------------------------------------------
929 * END OF FORMAT CALLBACKS
930 *--------------------------------------------------
931 */
932
933 /*
934 * Get the current position in the archive file.
935 *
936 * With a non-seekable archive file, we may not be able to obtain the
937 * file position. If so, just return -1. It's not too important in
938 * that case because we won't be able to rewrite the TOC to fill in
939 * data block offsets anyway.
940 */
941 static pgoff_t
_getFilePos(ArchiveHandle * AH,lclContext * ctx)942 _getFilePos(ArchiveHandle *AH, lclContext *ctx)
943 {
944 pgoff_t pos;
945
946 pos = ftello(AH->FH);
947 if (pos < 0)
948 {
949 /* Not expected if we found we can seek. */
950 if (ctx->hasSeek)
951 fatal("could not determine seek position in archive file: %m");
952 }
953 return pos;
954 }
955
956 /*
957 * Read a data block header. The format changed in V1.3, so we
958 * centralize the code here for simplicity. Returns *type = EOF
959 * if at EOF.
960 */
961 static void
_readBlockHeader(ArchiveHandle * AH,int * type,int * id)962 _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
963 {
964 int byt;
965
966 /*
967 * Note: if we are at EOF with a pre-1.3 input file, we'll fatal() inside
968 * ReadInt rather than returning EOF. It doesn't seem worth jumping
969 * through hoops to deal with that case better, because no such files are
970 * likely to exist in the wild: only some 7.1 development versions of
971 * pg_dump ever generated such files.
972 */
973 if (AH->version < K_VERS_1_3)
974 *type = BLK_DATA;
975 else
976 {
977 byt = getc(AH->FH);
978 *type = byt;
979 if (byt == EOF)
980 {
981 *id = 0; /* don't return an uninitialized value */
982 return;
983 }
984 }
985
986 *id = ReadInt(AH);
987 }
988
989 /*
990 * Callback function for WriteDataToArchive. Writes one block of (compressed)
991 * data to the archive.
992 */
993 static void
_CustomWriteFunc(ArchiveHandle * AH,const char * buf,size_t len)994 _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
995 {
996 /* never write 0-byte blocks (this should not happen) */
997 if (len > 0)
998 {
999 WriteInt(AH, len);
1000 _WriteBuf(AH, buf, len);
1001 }
1002 return;
1003 }
1004
1005 /*
1006 * Callback function for ReadDataFromArchive. To keep things simple, we
1007 * always read one compressed block at a time.
1008 */
1009 static size_t
_CustomReadFunc(ArchiveHandle * AH,char ** buf,size_t * buflen)1010 _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
1011 {
1012 size_t blkLen;
1013
1014 /* Read length */
1015 blkLen = ReadInt(AH);
1016 if (blkLen == 0)
1017 return 0;
1018
1019 /* If the caller's buffer is not large enough, allocate a bigger one */
1020 if (blkLen > *buflen)
1021 {
1022 free(*buf);
1023 *buf = (char *) pg_malloc(blkLen);
1024 *buflen = blkLen;
1025 }
1026
1027 /* exits app on read errors */
1028 _ReadBuf(AH, *buf, blkLen);
1029
1030 return blkLen;
1031 }
1032