1 /*-------------------------------------------------------------------------
2 *
3 * pg_backup_custom.c
4 *
5 * Implements the custom output format.
6 *
 * The comments with the routines in this code are a good place to
8 * understand how to write a new format.
9 *
10 * See the headers to pg_restore for more details.
11 *
12 * Copyright (c) 2000, Philip Warner
13 * Rights are granted to use this software in any way so long
14 * as this notice is not removed.
15 *
16 * The author is not responsible for loss or damages that may
17 * and any liability will be limited to the time taken to fix any
18 * related bug.
19 *
20 *
21 * IDENTIFICATION
22 * src/bin/pg_dump/pg_backup_custom.c
23 *
24 *-------------------------------------------------------------------------
25 */
26 #include "postgres_fe.h"
27
28 #include "compress_io.h"
29 #include "parallel.h"
30 #include "pg_backup_utils.h"
31 #include "common/file_utils.h"
32
33 /*--------
34 * Routines in the format interface
35 *--------
36 */
37
38 static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
39 static void _StartData(ArchiveHandle *AH, TocEntry *te);
40 static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
41 static void _EndData(ArchiveHandle *AH, TocEntry *te);
42 static int _WriteByte(ArchiveHandle *AH, const int i);
43 static int _ReadByte(ArchiveHandle *);
44 static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
45 static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
46 static void _CloseArchive(ArchiveHandle *AH);
47 static void _ReopenArchive(ArchiveHandle *AH);
48 static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
49 static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
50 static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
51 static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
52
53 static void _PrintData(ArchiveHandle *AH);
54 static void _skipData(ArchiveHandle *AH);
55 static void _skipBlobs(ArchiveHandle *AH);
56
57 static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
58 static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
59 static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
60 static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
61 static void _LoadBlobs(ArchiveHandle *AH, bool drop);
62 static void _Clone(ArchiveHandle *AH);
63 static void _DeClone(ArchiveHandle *AH);
64
65 static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
66
67 typedef struct
68 {
69 CompressorState *cs;
70 int hasSeek;
71 pgoff_t filePos;
72 pgoff_t dataStart;
73 } lclContext;
74
75 typedef struct
76 {
77 int dataState;
78 pgoff_t dataPos;
79 } lclTocEntry;
80
81
82 /*------
83 * Static declarations
84 *------
85 */
86 static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
87 static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);
88
89 static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
90 static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
91
92 /* translator: this is a module name */
93 static const char *modulename = gettext_noop("custom archiver");
94
95
96
97 /*
98 * Init routine required by ALL formats. This is a global routine
99 * and should be declared in pg_backup_archiver.h
100 *
101 * It's task is to create any extra archive context (using AH->formatData),
102 * and to initialize the supported function pointers.
103 *
104 * It should also prepare whatever it's input source is for reading/writing,
105 * and in the case of a read mode connection, it should load the Header & TOC.
106 */
107 void
InitArchiveFmt_Custom(ArchiveHandle * AH)108 InitArchiveFmt_Custom(ArchiveHandle *AH)
109 {
110 lclContext *ctx;
111
112 /* Assuming static functions, this can be copied for each format. */
113 AH->ArchiveEntryPtr = _ArchiveEntry;
114 AH->StartDataPtr = _StartData;
115 AH->WriteDataPtr = _WriteData;
116 AH->EndDataPtr = _EndData;
117 AH->WriteBytePtr = _WriteByte;
118 AH->ReadBytePtr = _ReadByte;
119 AH->WriteBufPtr = _WriteBuf;
120 AH->ReadBufPtr = _ReadBuf;
121 AH->ClosePtr = _CloseArchive;
122 AH->ReopenPtr = _ReopenArchive;
123 AH->PrintTocDataPtr = _PrintTocData;
124 AH->ReadExtraTocPtr = _ReadExtraToc;
125 AH->WriteExtraTocPtr = _WriteExtraToc;
126 AH->PrintExtraTocPtr = _PrintExtraToc;
127
128 AH->StartBlobsPtr = _StartBlobs;
129 AH->StartBlobPtr = _StartBlob;
130 AH->EndBlobPtr = _EndBlob;
131 AH->EndBlobsPtr = _EndBlobs;
132 AH->ClonePtr = _Clone;
133 AH->DeClonePtr = _DeClone;
134
135 /* no parallel dump in the custom archive, only parallel restore */
136 AH->WorkerJobDumpPtr = NULL;
137 AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
138
139 /* Set up a private area. */
140 ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
141 AH->formatData = (void *) ctx;
142
143 /* Initialize LO buffering */
144 AH->lo_buf_size = LOBBUFSIZE;
145 AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
146
147 ctx->filePos = 0;
148
149 /*
150 * Now open the file
151 */
152 if (AH->mode == archModeWrite)
153 {
154 if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
155 {
156 AH->FH = fopen(AH->fSpec, PG_BINARY_W);
157 if (!AH->FH)
158 exit_horribly(modulename, "could not open output file \"%s\": %s\n",
159 AH->fSpec, strerror(errno));
160 }
161 else
162 {
163 AH->FH = stdout;
164 if (!AH->FH)
165 exit_horribly(modulename, "could not open output file: %s\n",
166 strerror(errno));
167 }
168
169 ctx->hasSeek = checkSeek(AH->FH);
170 }
171 else
172 {
173 if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
174 {
175 AH->FH = fopen(AH->fSpec, PG_BINARY_R);
176 if (!AH->FH)
177 exit_horribly(modulename, "could not open input file \"%s\": %s\n",
178 AH->fSpec, strerror(errno));
179 }
180 else
181 {
182 AH->FH = stdin;
183 if (!AH->FH)
184 exit_horribly(modulename, "could not open input file: %s\n",
185 strerror(errno));
186 }
187
188 ctx->hasSeek = checkSeek(AH->FH);
189
190 ReadHead(AH);
191 ReadToc(AH);
192 ctx->dataStart = _getFilePos(AH, ctx);
193 }
194
195 }
196
197 /*
198 * Called by the Archiver when the dumper creates a new TOC entry.
199 *
200 * Optional.
201 *
202 * Set up extract format-related TOC data.
203 */
204 static void
_ArchiveEntry(ArchiveHandle * AH,TocEntry * te)205 _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
206 {
207 lclTocEntry *ctx;
208
209 ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
210 if (te->dataDumper)
211 ctx->dataState = K_OFFSET_POS_NOT_SET;
212 else
213 ctx->dataState = K_OFFSET_NO_DATA;
214
215 te->formatData = (void *) ctx;
216 }
217
218 /*
219 * Called by the Archiver to save any extra format-related TOC entry
220 * data.
221 *
222 * Optional.
223 *
224 * Use the Archiver routines to write data - they are non-endian, and
225 * maintain other important file information.
226 */
227 static void
_WriteExtraToc(ArchiveHandle * AH,TocEntry * te)228 _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
229 {
230 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
231
232 WriteOffset(AH, ctx->dataPos, ctx->dataState);
233 }
234
235 /*
236 * Called by the Archiver to read any extra format-related TOC data.
237 *
238 * Optional.
239 *
240 * Needs to match the order defined in _WriteExtraToc, and should also
241 * use the Archiver input routines.
242 */
243 static void
_ReadExtraToc(ArchiveHandle * AH,TocEntry * te)244 _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
245 {
246 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
247
248 if (ctx == NULL)
249 {
250 ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
251 te->formatData = (void *) ctx;
252 }
253
254 ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
255
256 /*
257 * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
258 * dump it at all.
259 */
260 if (AH->version < K_VERS_1_7)
261 ReadInt(AH);
262 }
263
264 /*
265 * Called by the Archiver when restoring an archive to output a comment
266 * that includes useful information about the TOC entry.
267 *
268 * Optional.
269 *
270 */
271 static void
_PrintExtraToc(ArchiveHandle * AH,TocEntry * te)272 _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
273 {
274 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
275
276 if (AH->public.verbose)
277 ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
278 (int64) ctx->dataPos);
279 }
280
281 /*
282 * Called by the archiver when saving TABLE DATA (not schema). This routine
283 * should save whatever format-specific information is needed to read
284 * the archive back.
285 *
286 * It is called just prior to the dumper's 'DataDumper' routine being called.
287 *
288 * Optional, but strongly recommended.
289 *
290 */
291 static void
_StartData(ArchiveHandle * AH,TocEntry * te)292 _StartData(ArchiveHandle *AH, TocEntry *te)
293 {
294 lclContext *ctx = (lclContext *) AH->formatData;
295 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
296
297 tctx->dataPos = _getFilePos(AH, ctx);
298 tctx->dataState = K_OFFSET_POS_SET;
299
300 _WriteByte(AH, BLK_DATA); /* Block type */
301 WriteInt(AH, te->dumpId); /* For sanity check */
302
303 ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
304 }
305
306 /*
307 * Called by archiver when dumper calls WriteData. This routine is
308 * called for both BLOB and TABLE data; it is the responsibility of
309 * the format to manage each kind of data using StartBlob/StartData.
310 *
311 * It should only be called from within a DataDumper routine.
312 *
313 * Mandatory.
314 */
315 static void
_WriteData(ArchiveHandle * AH,const void * data,size_t dLen)316 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
317 {
318 lclContext *ctx = (lclContext *) AH->formatData;
319 CompressorState *cs = ctx->cs;
320
321 if (dLen > 0)
322 /* WriteDataToArchive() internally throws write errors */
323 WriteDataToArchive(AH, cs, data, dLen);
324
325 return;
326 }
327
328 /*
329 * Called by the archiver when a dumper's 'DataDumper' routine has
330 * finished.
331 *
332 * Optional.
333 *
334 */
335 static void
_EndData(ArchiveHandle * AH,TocEntry * te)336 _EndData(ArchiveHandle *AH, TocEntry *te)
337 {
338 lclContext *ctx = (lclContext *) AH->formatData;
339
340 EndCompressor(AH, ctx->cs);
341 /* Send the end marker */
342 WriteInt(AH, 0);
343 }
344
345 /*
346 * Called by the archiver when starting to save all BLOB DATA (not schema).
347 * This routine should save whatever format-specific information is needed
348 * to read the BLOBs back into memory.
349 *
350 * It is called just prior to the dumper's DataDumper routine.
351 *
352 * Optional, but strongly recommended.
353 */
354 static void
_StartBlobs(ArchiveHandle * AH,TocEntry * te)355 _StartBlobs(ArchiveHandle *AH, TocEntry *te)
356 {
357 lclContext *ctx = (lclContext *) AH->formatData;
358 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
359
360 tctx->dataPos = _getFilePos(AH, ctx);
361 tctx->dataState = K_OFFSET_POS_SET;
362
363 _WriteByte(AH, BLK_BLOBS); /* Block type */
364 WriteInt(AH, te->dumpId); /* For sanity check */
365 }
366
367 /*
368 * Called by the archiver when the dumper calls StartBlob.
369 *
370 * Mandatory.
371 *
372 * Must save the passed OID for retrieval at restore-time.
373 */
374 static void
_StartBlob(ArchiveHandle * AH,TocEntry * te,Oid oid)375 _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
376 {
377 lclContext *ctx = (lclContext *) AH->formatData;
378
379 if (oid == 0)
380 exit_horribly(modulename, "invalid OID for large object\n");
381
382 WriteInt(AH, oid);
383
384 ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
385 }
386
387 /*
388 * Called by the archiver when the dumper calls EndBlob.
389 *
390 * Optional.
391 */
392 static void
_EndBlob(ArchiveHandle * AH,TocEntry * te,Oid oid)393 _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
394 {
395 lclContext *ctx = (lclContext *) AH->formatData;
396
397 EndCompressor(AH, ctx->cs);
398 /* Send the end marker */
399 WriteInt(AH, 0);
400 }
401
402 /*
403 * Called by the archiver when finishing saving all BLOB DATA.
404 *
405 * Optional.
406 */
407 static void
_EndBlobs(ArchiveHandle * AH,TocEntry * te)408 _EndBlobs(ArchiveHandle *AH, TocEntry *te)
409 {
410 /* Write out a fake zero OID to mark end-of-blobs. */
411 WriteInt(AH, 0);
412 }
413
414 /*
415 * Print data for a given TOC entry
416 */
417 static void
_PrintTocData(ArchiveHandle * AH,TocEntry * te)418 _PrintTocData(ArchiveHandle *AH, TocEntry *te)
419 {
420 lclContext *ctx = (lclContext *) AH->formatData;
421 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
422 int blkType;
423 int id;
424
425 if (tctx->dataState == K_OFFSET_NO_DATA)
426 return;
427
428 if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
429 {
430 /*
431 * We cannot seek directly to the desired block. Instead, skip over
432 * block headers until we find the one we want. This could fail if we
433 * are asked to restore items out-of-order.
434 */
435 _readBlockHeader(AH, &blkType, &id);
436
437 while (blkType != EOF && id != te->dumpId)
438 {
439 switch (blkType)
440 {
441 case BLK_DATA:
442 _skipData(AH);
443 break;
444
445 case BLK_BLOBS:
446 _skipBlobs(AH);
447 break;
448
449 default: /* Always have a default */
450 exit_horribly(modulename,
451 "unrecognized data block type (%d) while searching archive\n",
452 blkType);
453 break;
454 }
455 _readBlockHeader(AH, &blkType, &id);
456 }
457 }
458 else
459 {
460 /* We can just seek to the place we need to be. */
461 if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
462 exit_horribly(modulename, "error during file seek: %s\n",
463 strerror(errno));
464
465 _readBlockHeader(AH, &blkType, &id);
466 }
467
468 /* Produce suitable failure message if we fell off end of file */
469 if (blkType == EOF)
470 {
471 if (tctx->dataState == K_OFFSET_POS_NOT_SET)
472 exit_horribly(modulename, "could not find block ID %d in archive -- "
473 "possibly due to out-of-order restore request, "
474 "which cannot be handled due to lack of data offsets in archive\n",
475 te->dumpId);
476 else if (!ctx->hasSeek)
477 exit_horribly(modulename, "could not find block ID %d in archive -- "
478 "possibly due to out-of-order restore request, "
479 "which cannot be handled due to non-seekable input file\n",
480 te->dumpId);
481 else /* huh, the dataPos led us to EOF? */
482 exit_horribly(modulename, "could not find block ID %d in archive -- "
483 "possibly corrupt archive\n",
484 te->dumpId);
485 }
486
487 /* Are we sane? */
488 if (id != te->dumpId)
489 exit_horribly(modulename, "found unexpected block ID (%d) when reading data -- expected %d\n",
490 id, te->dumpId);
491
492 switch (blkType)
493 {
494 case BLK_DATA:
495 _PrintData(AH);
496 break;
497
498 case BLK_BLOBS:
499 _LoadBlobs(AH, AH->public.ropt->dropSchema);
500 break;
501
502 default: /* Always have a default */
503 exit_horribly(modulename, "unrecognized data block type %d while restoring archive\n",
504 blkType);
505 break;
506 }
507 }
508
509 /*
510 * Print data from current file position.
511 */
512 static void
_PrintData(ArchiveHandle * AH)513 _PrintData(ArchiveHandle *AH)
514 {
515 ReadDataFromArchive(AH, AH->compression, _CustomReadFunc);
516 }
517
518 static void
_LoadBlobs(ArchiveHandle * AH,bool drop)519 _LoadBlobs(ArchiveHandle *AH, bool drop)
520 {
521 Oid oid;
522
523 StartRestoreBlobs(AH);
524
525 oid = ReadInt(AH);
526 while (oid != 0)
527 {
528 StartRestoreBlob(AH, oid, drop);
529 _PrintData(AH);
530 EndRestoreBlob(AH, oid);
531 oid = ReadInt(AH);
532 }
533
534 EndRestoreBlobs(AH);
535 }
536
537 /*
538 * Skip the BLOBs from the current file position.
539 * BLOBS are written sequentially as data blocks (see below).
540 * Each BLOB is preceded by it's original OID.
541 * A zero OID indicated the end of the BLOBS
542 */
543 static void
_skipBlobs(ArchiveHandle * AH)544 _skipBlobs(ArchiveHandle *AH)
545 {
546 Oid oid;
547
548 oid = ReadInt(AH);
549 while (oid != 0)
550 {
551 _skipData(AH);
552 oid = ReadInt(AH);
553 }
554 }
555
556 /*
557 * Skip data from current file position.
558 * Data blocks are formatted as an integer length, followed by data.
559 * A zero length denoted the end of the block.
560 */
561 static void
_skipData(ArchiveHandle * AH)562 _skipData(ArchiveHandle *AH)
563 {
564 lclContext *ctx = (lclContext *) AH->formatData;
565 size_t blkLen;
566 char *buf = NULL;
567 int buflen = 0;
568 size_t cnt;
569
570 blkLen = ReadInt(AH);
571 while (blkLen != 0)
572 {
573 if (blkLen > buflen)
574 {
575 if (buf)
576 free(buf);
577 buf = (char *) pg_malloc(blkLen);
578 buflen = blkLen;
579 }
580 if ((cnt = fread(buf, 1, blkLen, AH->FH)) != blkLen)
581 {
582 if (feof(AH->FH))
583 exit_horribly(modulename,
584 "could not read from input file: end of file\n");
585 else
586 exit_horribly(modulename,
587 "could not read from input file: %s\n", strerror(errno));
588 }
589
590 ctx->filePos += blkLen;
591
592 blkLen = ReadInt(AH);
593 }
594
595 if (buf)
596 free(buf);
597 }
598
599 /*
600 * Write a byte of data to the archive.
601 *
602 * Mandatory.
603 *
604 * Called by the archiver to do integer & byte output to the archive.
605 */
606 static int
_WriteByte(ArchiveHandle * AH,const int i)607 _WriteByte(ArchiveHandle *AH, const int i)
608 {
609 lclContext *ctx = (lclContext *) AH->formatData;
610 int res;
611
612 if ((res = fputc(i, AH->FH)) == EOF)
613 WRITE_ERROR_EXIT;
614 ctx->filePos += 1;
615
616 return 1;
617 }
618
619 /*
620 * Read a byte of data from the archive.
621 *
622 * Mandatory
623 *
624 * Called by the archiver to read bytes & integers from the archive.
625 * EOF should be treated as a fatal error.
626 */
627 static int
_ReadByte(ArchiveHandle * AH)628 _ReadByte(ArchiveHandle *AH)
629 {
630 lclContext *ctx = (lclContext *) AH->formatData;
631 int res;
632
633 res = getc(AH->FH);
634 if (res == EOF)
635 READ_ERROR_EXIT(AH->FH);
636 ctx->filePos += 1;
637 return res;
638 }
639
640 /*
641 * Write a buffer of data to the archive.
642 *
643 * Mandatory.
644 *
645 * Called by the archiver to write a block of bytes to the archive.
646 */
647 static void
_WriteBuf(ArchiveHandle * AH,const void * buf,size_t len)648 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
649 {
650 lclContext *ctx = (lclContext *) AH->formatData;
651
652 if (fwrite(buf, 1, len, AH->FH) != len)
653 WRITE_ERROR_EXIT;
654 ctx->filePos += len;
655
656 return;
657 }
658
659 /*
660 * Read a block of bytes from the archive.
661 *
662 * Mandatory.
663 *
664 * Called by the archiver to read a block of bytes from the archive
665 */
666 static void
_ReadBuf(ArchiveHandle * AH,void * buf,size_t len)667 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
668 {
669 lclContext *ctx = (lclContext *) AH->formatData;
670
671 if (fread(buf, 1, len, AH->FH) != len)
672 READ_ERROR_EXIT(AH->FH);
673 ctx->filePos += len;
674
675 return;
676 }
677
678 /*
679 * Close the archive.
680 *
681 * Mandatory.
682 *
683 * When writing the archive, this is the routine that actually starts
684 * the process of saving it to files. No data should be written prior
685 * to this point, since the user could sort the TOC after creating it.
686 *
687 * If an archive is to be written, this routine must call:
688 * WriteHead to save the archive header
689 * WriteToc to save the TOC entries
690 * WriteDataChunks to save all DATA & BLOBs.
691 *
692 */
693 static void
_CloseArchive(ArchiveHandle * AH)694 _CloseArchive(ArchiveHandle *AH)
695 {
696 lclContext *ctx = (lclContext *) AH->formatData;
697 pgoff_t tpos;
698
699 if (AH->mode == archModeWrite)
700 {
701 WriteHead(AH);
702 /* Remember TOC's seek position for use below */
703 tpos = ftello(AH->FH);
704 if (tpos < 0 && ctx->hasSeek)
705 exit_horribly(modulename, "could not determine seek position in archive file: %s\n",
706 strerror(errno));
707 WriteToc(AH);
708 ctx->dataStart = _getFilePos(AH, ctx);
709 WriteDataChunks(AH, NULL);
710
711 /*
712 * If possible, re-write the TOC in order to update the data offset
713 * information. This is not essential, as pg_restore can cope in most
714 * cases without it; but it can make pg_restore significantly faster
715 * in some situations (especially parallel restore).
716 */
717 if (ctx->hasSeek &&
718 fseeko(AH->FH, tpos, SEEK_SET) == 0)
719 WriteToc(AH);
720 }
721
722 if (fclose(AH->FH) != 0)
723 exit_horribly(modulename, "could not close archive file: %s\n", strerror(errno));
724
725 /* Sync the output file if one is defined */
726 if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
727 (void) fsync_fname(AH->fSpec, false, progname);
728
729 AH->FH = NULL;
730 }
731
732 /*
733 * Reopen the archive's file handle.
734 *
735 * We close the original file handle, except on Windows. (The difference
736 * is because on Windows, this is used within a multithreading context,
737 * and we don't want a thread closing the parent file handle.)
738 */
739 static void
_ReopenArchive(ArchiveHandle * AH)740 _ReopenArchive(ArchiveHandle *AH)
741 {
742 lclContext *ctx = (lclContext *) AH->formatData;
743 pgoff_t tpos;
744
745 if (AH->mode == archModeWrite)
746 exit_horribly(modulename, "can only reopen input archives\n");
747
748 /*
749 * These two cases are user-facing errors since they represent unsupported
750 * (but not invalid) use-cases. Word the error messages appropriately.
751 */
752 if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
753 exit_horribly(modulename, "parallel restore from standard input is not supported\n");
754 if (!ctx->hasSeek)
755 exit_horribly(modulename, "parallel restore from non-seekable file is not supported\n");
756
757 tpos = ftello(AH->FH);
758 if (tpos < 0)
759 exit_horribly(modulename, "could not determine seek position in archive file: %s\n",
760 strerror(errno));
761
762 #ifndef WIN32
763 if (fclose(AH->FH) != 0)
764 exit_horribly(modulename, "could not close archive file: %s\n",
765 strerror(errno));
766 #endif
767
768 AH->FH = fopen(AH->fSpec, PG_BINARY_R);
769 if (!AH->FH)
770 exit_horribly(modulename, "could not open input file \"%s\": %s\n",
771 AH->fSpec, strerror(errno));
772
773 if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
774 exit_horribly(modulename, "could not set seek position in archive file: %s\n",
775 strerror(errno));
776 }
777
778 /*
779 * Clone format-specific fields during parallel restoration.
780 */
781 static void
_Clone(ArchiveHandle * AH)782 _Clone(ArchiveHandle *AH)
783 {
784 lclContext *ctx = (lclContext *) AH->formatData;
785
786 AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
787 memcpy(AH->formatData, ctx, sizeof(lclContext));
788 ctx = (lclContext *) AH->formatData;
789
790 /* sanity check, shouldn't happen */
791 if (ctx->cs != NULL)
792 exit_horribly(modulename, "compressor active\n");
793
794 /*
795 * Note: we do not make a local lo_buf because we expect at most one BLOBS
796 * entry per archive, so no parallelism is possible. Likewise,
797 * TOC-entry-local state isn't an issue because any one TOC entry is
798 * touched by just one worker child.
799 */
800 }
801
802 static void
_DeClone(ArchiveHandle * AH)803 _DeClone(ArchiveHandle *AH)
804 {
805 lclContext *ctx = (lclContext *) AH->formatData;
806
807 free(ctx);
808 }
809
810 /*
811 * This function is executed in the child of a parallel restore from a
812 * custom-format archive and restores the actual data for one TOC entry.
813 */
814 static int
_WorkerJobRestoreCustom(ArchiveHandle * AH,TocEntry * te)815 _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
816 {
817 return parallel_restore(AH, te);
818 }
819
820 /*--------------------------------------------------
821 * END OF FORMAT CALLBACKS
822 *--------------------------------------------------
823 */
824
825 /*
826 * Get the current position in the archive file.
827 */
828 static pgoff_t
_getFilePos(ArchiveHandle * AH,lclContext * ctx)829 _getFilePos(ArchiveHandle *AH, lclContext *ctx)
830 {
831 pgoff_t pos;
832
833 if (ctx->hasSeek)
834 {
835 /*
836 * Prior to 1.7 (pg7.3) we relied on the internally maintained
837 * pointer. Now we rely on ftello() always, unless the file has been
838 * found to not support it. For debugging purposes, print a warning
839 * if the internal pointer disagrees, so that we're more likely to
840 * notice if something's broken about the internal position tracking.
841 */
842 pos = ftello(AH->FH);
843 if (pos < 0)
844 exit_horribly(modulename, "could not determine seek position in archive file: %s\n",
845 strerror(errno));
846
847 if (pos != ctx->filePos)
848 write_msg(modulename, "WARNING: ftell mismatch with expected position -- ftell used\n");
849 }
850 else
851 pos = ctx->filePos;
852 return pos;
853 }
854
855 /*
856 * Read a data block header. The format changed in V1.3, so we
857 * centralize the code here for simplicity. Returns *type = EOF
858 * if at EOF.
859 */
860 static void
_readBlockHeader(ArchiveHandle * AH,int * type,int * id)861 _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
862 {
863 lclContext *ctx = (lclContext *) AH->formatData;
864 int byt;
865
866 /*
867 * Note: if we are at EOF with a pre-1.3 input file, we'll exit_horribly
868 * inside ReadInt rather than returning EOF. It doesn't seem worth
869 * jumping through hoops to deal with that case better, because no such
870 * files are likely to exist in the wild: only some 7.1 development
871 * versions of pg_dump ever generated such files.
872 */
873 if (AH->version < K_VERS_1_3)
874 *type = BLK_DATA;
875 else
876 {
877 byt = getc(AH->FH);
878 *type = byt;
879 if (byt == EOF)
880 {
881 *id = 0; /* don't return an uninitialized value */
882 return;
883 }
884 ctx->filePos += 1;
885 }
886
887 *id = ReadInt(AH);
888 }
889
890 /*
891 * Callback function for WriteDataToArchive. Writes one block of (compressed)
892 * data to the archive.
893 */
894 static void
_CustomWriteFunc(ArchiveHandle * AH,const char * buf,size_t len)895 _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
896 {
897 /* never write 0-byte blocks (this should not happen) */
898 if (len > 0)
899 {
900 WriteInt(AH, len);
901 _WriteBuf(AH, buf, len);
902 }
903 return;
904 }
905
906 /*
907 * Callback function for ReadDataFromArchive. To keep things simple, we
908 * always read one compressed block at a time.
909 */
910 static size_t
_CustomReadFunc(ArchiveHandle * AH,char ** buf,size_t * buflen)911 _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
912 {
913 size_t blkLen;
914
915 /* Read length */
916 blkLen = ReadInt(AH);
917 if (blkLen == 0)
918 return 0;
919
920 /* If the caller's buffer is not large enough, allocate a bigger one */
921 if (blkLen > *buflen)
922 {
923 free(*buf);
924 *buf = (char *) pg_malloc(blkLen);
925 *buflen = blkLen;
926 }
927
928 /* exits app on read errors */
929 _ReadBuf(AH, *buf, blkLen);
930
931 return blkLen;
932 }
933