1 /*-------------------------------------------------------------------------
2 *
3 * pg_backup_custom.c
4 *
5 * Implements the custom output format.
6 *
 *	The comments with the routines in this code are a good place to
 *	understand how to write a new format.
9 *
10 * See the headers to pg_restore for more details.
11 *
12 * Copyright (c) 2000, Philip Warner
13 * Rights are granted to use this software in any way so long
14 * as this notice is not removed.
15 *
16 * The author is not responsible for loss or damages that may
17 * and any liability will be limited to the time taken to fix any
18 * related bug.
19 *
20 *
21 * IDENTIFICATION
22 * src/bin/pg_dump/pg_backup_custom.c
23 *
24 *-------------------------------------------------------------------------
25 */
26 #include "postgres_fe.h"
27
28 #include "compress_io.h"
29 #include "parallel.h"
30 #include "pg_backup_utils.h"
31
32 /*--------
33 * Routines in the format interface
34 *--------
35 */
36
/* Callbacks installed into the ArchiveHandle by InitArchiveFmt_Custom */
static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
static void _StartData(ArchiveHandle *AH, TocEntry *te);
static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
static void _EndData(ArchiveHandle *AH, TocEntry *te);
static int	_WriteByte(ArchiveHandle *AH, const int i);
static int	_ReadByte(ArchiveHandle *);
static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
static void _CloseArchive(ArchiveHandle *AH);
static void _ReopenArchive(ArchiveHandle *AH);
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);

/* Helpers that print or skip the data block at the current file position */
static void _PrintData(ArchiveHandle *AH);
static void _skipData(ArchiveHandle *AH);
static void _skipBlobs(ArchiveHandle *AH);

/* Large-object (BLOB) callbacks, plus clone support for parallel restore */
static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
static void _LoadBlobs(ArchiveHandle *AH, bool drop);
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);

/*
 * Parallel restore support.  Note that _WorkerJobRestoreCustom is the only
 * routine here that is not declared static.
 */
static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
static int	_MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act);
char	   *_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
67
/*
 * Per-archive private state for the custom format, stored in
 * AH->formatData.
 */
typedef struct
{
	CompressorState *cs;		/* compressor set up by _StartData/_StartBlob
								 * and ended by _EndData/_EndBlob */
	int			hasSeek;		/* result of checkSeek() on the archive file */
	pgoff_t		filePos;		/* byte count maintained by this file's
								 * read/write routines */
	pgoff_t		dataStart;		/* file position recorded right after the TOC
								 * was read or written */
} lclContext;
75
/*
 * Per-TOC-entry private state for the custom format, stored in
 * te->formatData.
 */
typedef struct
{
	int			dataState;		/* one of the K_OFFSET_* values */
	pgoff_t		dataPos;		/* file position of the entry's data block,
								 * valid when dataState is K_OFFSET_POS_SET */
} lclTocEntry;
81
82
83 /*------
84 * Static declarations
85 *------
86 */
/* Internal helpers for locating and identifying data blocks */
static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);

/* Block-level I/O callbacks passed to the compressor routines */
static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);

/* Module name passed to exit_horribly() and write_msg() in this file */
/* translator: this is a module name */
static const char *modulename = gettext_noop("custom archiver");
95
96
97
98 /*
99 * Init routine required by ALL formats. This is a global routine
100 * and should be declared in pg_backup_archiver.h
101 *
102 * It's task is to create any extra archive context (using AH->formatData),
103 * and to initialize the supported function pointers.
104 *
105 * It should also prepare whatever it's input source is for reading/writing,
106 * and in the case of a read mode connection, it should load the Header & TOC.
107 */
108 void
InitArchiveFmt_Custom(ArchiveHandle * AH)109 InitArchiveFmt_Custom(ArchiveHandle *AH)
110 {
111 lclContext *ctx;
112
113 /* Assuming static functions, this can be copied for each format. */
114 AH->ArchiveEntryPtr = _ArchiveEntry;
115 AH->StartDataPtr = _StartData;
116 AH->WriteDataPtr = _WriteData;
117 AH->EndDataPtr = _EndData;
118 AH->WriteBytePtr = _WriteByte;
119 AH->ReadBytePtr = _ReadByte;
120 AH->WriteBufPtr = _WriteBuf;
121 AH->ReadBufPtr = _ReadBuf;
122 AH->ClosePtr = _CloseArchive;
123 AH->ReopenPtr = _ReopenArchive;
124 AH->PrintTocDataPtr = _PrintTocData;
125 AH->ReadExtraTocPtr = _ReadExtraToc;
126 AH->WriteExtraTocPtr = _WriteExtraToc;
127 AH->PrintExtraTocPtr = _PrintExtraToc;
128
129 AH->StartBlobsPtr = _StartBlobs;
130 AH->StartBlobPtr = _StartBlob;
131 AH->EndBlobPtr = _EndBlob;
132 AH->EndBlobsPtr = _EndBlobs;
133 AH->ClonePtr = _Clone;
134 AH->DeClonePtr = _DeClone;
135
136 AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
137 AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
138
139 /* no parallel dump in the custom archive, only parallel restore */
140 AH->WorkerJobDumpPtr = NULL;
141 AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
142
143 /* Set up a private area. */
144 ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
145 AH->formatData = (void *) ctx;
146
147 /* Initialize LO buffering */
148 AH->lo_buf_size = LOBBUFSIZE;
149 AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
150
151 ctx->filePos = 0;
152
153 /*
154 * Now open the file
155 */
156 if (AH->mode == archModeWrite)
157 {
158 if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
159 {
160 AH->FH = fopen(AH->fSpec, PG_BINARY_W);
161 if (!AH->FH)
162 exit_horribly(modulename, "could not open output file \"%s\": %s\n",
163 AH->fSpec, strerror(errno));
164 }
165 else
166 {
167 AH->FH = stdout;
168 if (!AH->FH)
169 exit_horribly(modulename, "could not open output file: %s\n",
170 strerror(errno));
171 }
172
173 ctx->hasSeek = checkSeek(AH->FH);
174 }
175 else
176 {
177 if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
178 {
179 AH->FH = fopen(AH->fSpec, PG_BINARY_R);
180 if (!AH->FH)
181 exit_horribly(modulename, "could not open input file \"%s\": %s\n",
182 AH->fSpec, strerror(errno));
183 }
184 else
185 {
186 AH->FH = stdin;
187 if (!AH->FH)
188 exit_horribly(modulename, "could not open input file: %s\n",
189 strerror(errno));
190 }
191
192 ctx->hasSeek = checkSeek(AH->FH);
193
194 ReadHead(AH);
195 ReadToc(AH);
196 ctx->dataStart = _getFilePos(AH, ctx);
197 }
198
199 }
200
201 /*
202 * Called by the Archiver when the dumper creates a new TOC entry.
203 *
204 * Optional.
205 *
206 * Set up extract format-related TOC data.
207 */
208 static void
_ArchiveEntry(ArchiveHandle * AH,TocEntry * te)209 _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
210 {
211 lclTocEntry *ctx;
212
213 ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
214 if (te->dataDumper)
215 ctx->dataState = K_OFFSET_POS_NOT_SET;
216 else
217 ctx->dataState = K_OFFSET_NO_DATA;
218
219 te->formatData = (void *) ctx;
220 }
221
222 /*
223 * Called by the Archiver to save any extra format-related TOC entry
224 * data.
225 *
226 * Optional.
227 *
228 * Use the Archiver routines to write data - they are non-endian, and
229 * maintain other important file information.
230 */
231 static void
_WriteExtraToc(ArchiveHandle * AH,TocEntry * te)232 _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
233 {
234 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
235
236 WriteOffset(AH, ctx->dataPos, ctx->dataState);
237 }
238
239 /*
240 * Called by the Archiver to read any extra format-related TOC data.
241 *
242 * Optional.
243 *
244 * Needs to match the order defined in _WriteExtraToc, and should also
245 * use the Archiver input routines.
246 */
247 static void
_ReadExtraToc(ArchiveHandle * AH,TocEntry * te)248 _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
249 {
250 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
251
252 if (ctx == NULL)
253 {
254 ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
255 te->formatData = (void *) ctx;
256 }
257
258 ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
259
260 /*
261 * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
262 * dump it at all.
263 */
264 if (AH->version < K_VERS_1_7)
265 ReadInt(AH);
266 }
267
268 /*
269 * Called by the Archiver when restoring an archive to output a comment
270 * that includes useful information about the TOC entry.
271 *
272 * Optional.
273 *
274 */
275 static void
_PrintExtraToc(ArchiveHandle * AH,TocEntry * te)276 _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
277 {
278 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
279
280 if (AH->public.verbose)
281 ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
282 (int64) ctx->dataPos);
283 }
284
285 /*
286 * Called by the archiver when saving TABLE DATA (not schema). This routine
287 * should save whatever format-specific information is needed to read
288 * the archive back.
289 *
290 * It is called just prior to the dumper's 'DataDumper' routine being called.
291 *
292 * Optional, but strongly recommended.
293 *
294 */
295 static void
_StartData(ArchiveHandle * AH,TocEntry * te)296 _StartData(ArchiveHandle *AH, TocEntry *te)
297 {
298 lclContext *ctx = (lclContext *) AH->formatData;
299 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
300
301 tctx->dataPos = _getFilePos(AH, ctx);
302 tctx->dataState = K_OFFSET_POS_SET;
303
304 _WriteByte(AH, BLK_DATA); /* Block type */
305 WriteInt(AH, te->dumpId); /* For sanity check */
306
307 ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
308 }
309
310 /*
311 * Called by archiver when dumper calls WriteData. This routine is
312 * called for both BLOB and TABLE data; it is the responsibility of
313 * the format to manage each kind of data using StartBlob/StartData.
314 *
315 * It should only be called from within a DataDumper routine.
316 *
317 * Mandatory.
318 */
319 static void
_WriteData(ArchiveHandle * AH,const void * data,size_t dLen)320 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
321 {
322 lclContext *ctx = (lclContext *) AH->formatData;
323 CompressorState *cs = ctx->cs;
324
325 if (dLen > 0)
326 /* WriteDataToArchive() internally throws write errors */
327 WriteDataToArchive(AH, cs, data, dLen);
328
329 return;
330 }
331
332 /*
333 * Called by the archiver when a dumper's 'DataDumper' routine has
334 * finished.
335 *
336 * Optional.
337 *
338 */
339 static void
_EndData(ArchiveHandle * AH,TocEntry * te)340 _EndData(ArchiveHandle *AH, TocEntry *te)
341 {
342 lclContext *ctx = (lclContext *) AH->formatData;
343
344 EndCompressor(AH, ctx->cs);
345 /* Send the end marker */
346 WriteInt(AH, 0);
347 }
348
349 /*
350 * Called by the archiver when starting to save all BLOB DATA (not schema).
351 * This routine should save whatever format-specific information is needed
352 * to read the BLOBs back into memory.
353 *
354 * It is called just prior to the dumper's DataDumper routine.
355 *
356 * Optional, but strongly recommended.
357 */
358 static void
_StartBlobs(ArchiveHandle * AH,TocEntry * te)359 _StartBlobs(ArchiveHandle *AH, TocEntry *te)
360 {
361 lclContext *ctx = (lclContext *) AH->formatData;
362 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
363
364 tctx->dataPos = _getFilePos(AH, ctx);
365 tctx->dataState = K_OFFSET_POS_SET;
366
367 _WriteByte(AH, BLK_BLOBS); /* Block type */
368 WriteInt(AH, te->dumpId); /* For sanity check */
369 }
370
371 /*
372 * Called by the archiver when the dumper calls StartBlob.
373 *
374 * Mandatory.
375 *
376 * Must save the passed OID for retrieval at restore-time.
377 */
378 static void
_StartBlob(ArchiveHandle * AH,TocEntry * te,Oid oid)379 _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
380 {
381 lclContext *ctx = (lclContext *) AH->formatData;
382
383 if (oid == 0)
384 exit_horribly(modulename, "invalid OID for large object\n");
385
386 WriteInt(AH, oid);
387
388 ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
389 }
390
391 /*
392 * Called by the archiver when the dumper calls EndBlob.
393 *
394 * Optional.
395 */
396 static void
_EndBlob(ArchiveHandle * AH,TocEntry * te,Oid oid)397 _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
398 {
399 lclContext *ctx = (lclContext *) AH->formatData;
400
401 EndCompressor(AH, ctx->cs);
402 /* Send the end marker */
403 WriteInt(AH, 0);
404 }
405
406 /*
407 * Called by the archiver when finishing saving all BLOB DATA.
408 *
409 * Optional.
410 */
411 static void
_EndBlobs(ArchiveHandle * AH,TocEntry * te)412 _EndBlobs(ArchiveHandle *AH, TocEntry *te)
413 {
414 /* Write out a fake zero OID to mark end-of-blobs. */
415 WriteInt(AH, 0);
416 }
417
418 /*
419 * Print data for a given TOC entry
420 */
421 static void
_PrintTocData(ArchiveHandle * AH,TocEntry * te)422 _PrintTocData(ArchiveHandle *AH, TocEntry *te)
423 {
424 lclContext *ctx = (lclContext *) AH->formatData;
425 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
426 int blkType;
427 int id;
428
429 if (tctx->dataState == K_OFFSET_NO_DATA)
430 return;
431
432 if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
433 {
434 /*
435 * We cannot seek directly to the desired block. Instead, skip over
436 * block headers until we find the one we want. This could fail if we
437 * are asked to restore items out-of-order.
438 */
439 _readBlockHeader(AH, &blkType, &id);
440
441 while (blkType != EOF && id != te->dumpId)
442 {
443 switch (blkType)
444 {
445 case BLK_DATA:
446 _skipData(AH);
447 break;
448
449 case BLK_BLOBS:
450 _skipBlobs(AH);
451 break;
452
453 default: /* Always have a default */
454 exit_horribly(modulename,
455 "unrecognized data block type (%d) while searching archive\n",
456 blkType);
457 break;
458 }
459 _readBlockHeader(AH, &blkType, &id);
460 }
461 }
462 else
463 {
464 /* We can just seek to the place we need to be. */
465 if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
466 exit_horribly(modulename, "error during file seek: %s\n",
467 strerror(errno));
468
469 _readBlockHeader(AH, &blkType, &id);
470 }
471
472 /* Produce suitable failure message if we fell off end of file */
473 if (blkType == EOF)
474 {
475 if (tctx->dataState == K_OFFSET_POS_NOT_SET)
476 exit_horribly(modulename, "could not find block ID %d in archive -- "
477 "possibly due to out-of-order restore request, "
478 "which cannot be handled due to lack of data offsets in archive\n",
479 te->dumpId);
480 else if (!ctx->hasSeek)
481 exit_horribly(modulename, "could not find block ID %d in archive -- "
482 "possibly due to out-of-order restore request, "
483 "which cannot be handled due to non-seekable input file\n",
484 te->dumpId);
485 else /* huh, the dataPos led us to EOF? */
486 exit_horribly(modulename, "could not find block ID %d in archive -- "
487 "possibly corrupt archive\n",
488 te->dumpId);
489 }
490
491 /* Are we sane? */
492 if (id != te->dumpId)
493 exit_horribly(modulename, "found unexpected block ID (%d) when reading data -- expected %d\n",
494 id, te->dumpId);
495
496 switch (blkType)
497 {
498 case BLK_DATA:
499 _PrintData(AH);
500 break;
501
502 case BLK_BLOBS:
503 _LoadBlobs(AH, AH->public.ropt->dropSchema);
504 break;
505
506 default: /* Always have a default */
507 exit_horribly(modulename, "unrecognized data block type %d while restoring archive\n",
508 blkType);
509 break;
510 }
511 }
512
513 /*
514 * Print data from current file position.
515 */
516 static void
_PrintData(ArchiveHandle * AH)517 _PrintData(ArchiveHandle *AH)
518 {
519 ReadDataFromArchive(AH, AH->compression, _CustomReadFunc);
520 }
521
522 static void
_LoadBlobs(ArchiveHandle * AH,bool drop)523 _LoadBlobs(ArchiveHandle *AH, bool drop)
524 {
525 Oid oid;
526
527 StartRestoreBlobs(AH);
528
529 oid = ReadInt(AH);
530 while (oid != 0)
531 {
532 StartRestoreBlob(AH, oid, drop);
533 _PrintData(AH);
534 EndRestoreBlob(AH, oid);
535 oid = ReadInt(AH);
536 }
537
538 EndRestoreBlobs(AH);
539 }
540
541 /*
542 * Skip the BLOBs from the current file position.
543 * BLOBS are written sequentially as data blocks (see below).
544 * Each BLOB is preceded by it's original OID.
545 * A zero OID indicated the end of the BLOBS
546 */
547 static void
_skipBlobs(ArchiveHandle * AH)548 _skipBlobs(ArchiveHandle *AH)
549 {
550 Oid oid;
551
552 oid = ReadInt(AH);
553 while (oid != 0)
554 {
555 _skipData(AH);
556 oid = ReadInt(AH);
557 }
558 }
559
560 /*
561 * Skip data from current file position.
562 * Data blocks are formatted as an integer length, followed by data.
563 * A zero length denoted the end of the block.
564 */
565 static void
_skipData(ArchiveHandle * AH)566 _skipData(ArchiveHandle *AH)
567 {
568 lclContext *ctx = (lclContext *) AH->formatData;
569 size_t blkLen;
570 char *buf = NULL;
571 int buflen = 0;
572 size_t cnt;
573
574 blkLen = ReadInt(AH);
575 while (blkLen != 0)
576 {
577 if (blkLen > buflen)
578 {
579 if (buf)
580 free(buf);
581 buf = (char *) pg_malloc(blkLen);
582 buflen = blkLen;
583 }
584 if ((cnt = fread(buf, 1, blkLen, AH->FH)) != blkLen)
585 {
586 if (feof(AH->FH))
587 exit_horribly(modulename,
588 "could not read from input file: end of file\n");
589 else
590 exit_horribly(modulename,
591 "could not read from input file: %s\n", strerror(errno));
592 }
593
594 ctx->filePos += blkLen;
595
596 blkLen = ReadInt(AH);
597 }
598
599 if (buf)
600 free(buf);
601 }
602
603 /*
604 * Write a byte of data to the archive.
605 *
606 * Mandatory.
607 *
608 * Called by the archiver to do integer & byte output to the archive.
609 */
610 static int
_WriteByte(ArchiveHandle * AH,const int i)611 _WriteByte(ArchiveHandle *AH, const int i)
612 {
613 lclContext *ctx = (lclContext *) AH->formatData;
614 int res;
615
616 if ((res = fputc(i, AH->FH)) == EOF)
617 WRITE_ERROR_EXIT;
618 ctx->filePos += 1;
619
620 return 1;
621 }
622
623 /*
624 * Read a byte of data from the archive.
625 *
626 * Mandatory
627 *
628 * Called by the archiver to read bytes & integers from the archive.
629 * EOF should be treated as a fatal error.
630 */
631 static int
_ReadByte(ArchiveHandle * AH)632 _ReadByte(ArchiveHandle *AH)
633 {
634 lclContext *ctx = (lclContext *) AH->formatData;
635 int res;
636
637 res = getc(AH->FH);
638 if (res == EOF)
639 READ_ERROR_EXIT(AH->FH);
640 ctx->filePos += 1;
641 return res;
642 }
643
644 /*
645 * Write a buffer of data to the archive.
646 *
647 * Mandatory.
648 *
649 * Called by the archiver to write a block of bytes to the archive.
650 */
651 static void
_WriteBuf(ArchiveHandle * AH,const void * buf,size_t len)652 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
653 {
654 lclContext *ctx = (lclContext *) AH->formatData;
655
656 if (fwrite(buf, 1, len, AH->FH) != len)
657 WRITE_ERROR_EXIT;
658 ctx->filePos += len;
659
660 return;
661 }
662
663 /*
664 * Read a block of bytes from the archive.
665 *
666 * Mandatory.
667 *
668 * Called by the archiver to read a block of bytes from the archive
669 */
670 static void
_ReadBuf(ArchiveHandle * AH,void * buf,size_t len)671 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
672 {
673 lclContext *ctx = (lclContext *) AH->formatData;
674
675 if (fread(buf, 1, len, AH->FH) != len)
676 READ_ERROR_EXIT(AH->FH);
677 ctx->filePos += len;
678
679 return;
680 }
681
682 /*
683 * Close the archive.
684 *
685 * Mandatory.
686 *
687 * When writing the archive, this is the routine that actually starts
688 * the process of saving it to files. No data should be written prior
689 * to this point, since the user could sort the TOC after creating it.
690 *
691 * If an archive is to be written, this routine must call:
692 * WriteHead to save the archive header
693 * WriteToc to save the TOC entries
694 * WriteDataChunks to save all DATA & BLOBs.
695 *
696 */
697 static void
_CloseArchive(ArchiveHandle * AH)698 _CloseArchive(ArchiveHandle *AH)
699 {
700 lclContext *ctx = (lclContext *) AH->formatData;
701 pgoff_t tpos;
702
703 if (AH->mode == archModeWrite)
704 {
705 WriteHead(AH);
706 /* Remember TOC's seek position for use below */
707 tpos = ftello(AH->FH);
708 if (tpos < 0 && ctx->hasSeek)
709 exit_horribly(modulename, "could not determine seek position in archive file: %s\n",
710 strerror(errno));
711 WriteToc(AH);
712 ctx->dataStart = _getFilePos(AH, ctx);
713 WriteDataChunks(AH, NULL);
714
715 /*
716 * If possible, re-write the TOC in order to update the data offset
717 * information. This is not essential, as pg_restore can cope in most
718 * cases without it; but it can make pg_restore significantly faster
719 * in some situations (especially parallel restore).
720 */
721 if (ctx->hasSeek &&
722 fseeko(AH->FH, tpos, SEEK_SET) == 0)
723 WriteToc(AH);
724 }
725
726 if (fclose(AH->FH) != 0)
727 exit_horribly(modulename, "could not close archive file: %s\n", strerror(errno));
728
729 AH->FH = NULL;
730 }
731
732 /*
733 * Reopen the archive's file handle.
734 *
735 * We close the original file handle, except on Windows. (The difference
736 * is because on Windows, this is used within a multithreading context,
737 * and we don't want a thread closing the parent file handle.)
738 */
739 static void
_ReopenArchive(ArchiveHandle * AH)740 _ReopenArchive(ArchiveHandle *AH)
741 {
742 lclContext *ctx = (lclContext *) AH->formatData;
743 pgoff_t tpos;
744
745 if (AH->mode == archModeWrite)
746 exit_horribly(modulename, "can only reopen input archives\n");
747
748 /*
749 * These two cases are user-facing errors since they represent unsupported
750 * (but not invalid) use-cases. Word the error messages appropriately.
751 */
752 if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
753 exit_horribly(modulename, "parallel restore from standard input is not supported\n");
754 if (!ctx->hasSeek)
755 exit_horribly(modulename, "parallel restore from non-seekable file is not supported\n");
756
757 tpos = ftello(AH->FH);
758 if (tpos < 0)
759 exit_horribly(modulename, "could not determine seek position in archive file: %s\n",
760 strerror(errno));
761
762 #ifndef WIN32
763 if (fclose(AH->FH) != 0)
764 exit_horribly(modulename, "could not close archive file: %s\n",
765 strerror(errno));
766 #endif
767
768 AH->FH = fopen(AH->fSpec, PG_BINARY_R);
769 if (!AH->FH)
770 exit_horribly(modulename, "could not open input file \"%s\": %s\n",
771 AH->fSpec, strerror(errno));
772
773 if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
774 exit_horribly(modulename, "could not set seek position in archive file: %s\n",
775 strerror(errno));
776 }
777
778 /*
779 * Clone format-specific fields during parallel restoration.
780 */
781 static void
_Clone(ArchiveHandle * AH)782 _Clone(ArchiveHandle *AH)
783 {
784 lclContext *ctx = (lclContext *) AH->formatData;
785
786 AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
787 memcpy(AH->formatData, ctx, sizeof(lclContext));
788 ctx = (lclContext *) AH->formatData;
789
790 /* sanity check, shouldn't happen */
791 if (ctx->cs != NULL)
792 exit_horribly(modulename, "compressor active\n");
793
794 /*
795 * Note: we do not make a local lo_buf because we expect at most one BLOBS
796 * entry per archive, so no parallelism is possible. Likewise,
797 * TOC-entry-local state isn't an issue because any one TOC entry is
798 * touched by just one worker child.
799 */
800 }
801
802 static void
_DeClone(ArchiveHandle * AH)803 _DeClone(ArchiveHandle *AH)
804 {
805 lclContext *ctx = (lclContext *) AH->formatData;
806
807 free(ctx);
808 }
809
810 /*
811 * This function is executed in the child of a parallel backup for the
812 * custom format archive and dumps the actual data.
813 */
814 char *
_WorkerJobRestoreCustom(ArchiveHandle * AH,TocEntry * te)815 _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
816 {
817 /*
818 * short fixed-size string + some ID so far, this needs to be malloc'ed
819 * instead of static because we work with threads on windows
820 */
821 const int buflen = 64;
822 char *buf = (char *) pg_malloc(buflen);
823 ParallelArgs pargs;
824 int status;
825
826 pargs.AH = AH;
827 pargs.te = te;
828
829 status = parallel_restore(&pargs);
830
831 snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
832 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
833
834 return buf;
835 }
836
837 /*
838 * This function is executed in the parent process. Depending on the desired
839 * action (dump or restore) it creates a string that is understood by the
840 * _WorkerJobDump /_WorkerJobRestore functions of the dump format.
841 */
842 static char *
_MasterStartParallelItem(ArchiveHandle * AH,TocEntry * te,T_Action act)843 _MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
844 {
845 /*
846 * A static char is okay here, even on Windows because we call this
847 * function only from one process (the master).
848 */
849 static char buf[64]; /* short fixed-size string + number */
850
851 /* no parallel dump in the custom archive format */
852 Assert(act == ACT_RESTORE);
853
854 snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
855
856 return buf;
857 }
858
859 /*
860 * This function is executed in the parent process. It analyzes the response of
861 * the _WorkerJobDump / _WorkerJobRestore functions of the dump format.
862 */
863 static int
_MasterEndParallelItem(ArchiveHandle * AH,TocEntry * te,const char * str,T_Action act)864 _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
865 {
866 DumpId dumpId;
867 int nBytes,
868 status,
869 n_errors;
870
871 /* no parallel dump in the custom archive */
872 Assert(act == ACT_RESTORE);
873
874 sscanf(str, "%d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
875
876 Assert(nBytes == strlen(str));
877 Assert(dumpId == te->dumpId);
878
879 AH->public.n_errors += n_errors;
880
881 return status;
882 }
883
884 /*--------------------------------------------------
885 * END OF FORMAT CALLBACKS
886 *--------------------------------------------------
887 */
888
889 /*
890 * Get the current position in the archive file.
891 */
892 static pgoff_t
_getFilePos(ArchiveHandle * AH,lclContext * ctx)893 _getFilePos(ArchiveHandle *AH, lclContext *ctx)
894 {
895 pgoff_t pos;
896
897 if (ctx->hasSeek)
898 {
899 /*
900 * Prior to 1.7 (pg7.3) we relied on the internally maintained
901 * pointer. Now we rely on ftello() always, unless the file has been
902 * found to not support it. For debugging purposes, print a warning
903 * if the internal pointer disagrees, so that we're more likely to
904 * notice if something's broken about the internal position tracking.
905 */
906 pos = ftello(AH->FH);
907 if (pos < 0)
908 exit_horribly(modulename, "could not determine seek position in archive file: %s\n",
909 strerror(errno));
910
911 if (pos != ctx->filePos)
912 write_msg(modulename, "WARNING: ftell mismatch with expected position -- ftell used\n");
913 }
914 else
915 pos = ctx->filePos;
916 return pos;
917 }
918
919 /*
920 * Read a data block header. The format changed in V1.3, so we
921 * centralize the code here for simplicity. Returns *type = EOF
922 * if at EOF.
923 */
924 static void
_readBlockHeader(ArchiveHandle * AH,int * type,int * id)925 _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
926 {
927 lclContext *ctx = (lclContext *) AH->formatData;
928 int byt;
929
930 /*
931 * Note: if we are at EOF with a pre-1.3 input file, we'll exit_horribly
932 * inside ReadInt rather than returning EOF. It doesn't seem worth
933 * jumping through hoops to deal with that case better, because no such
934 * files are likely to exist in the wild: only some 7.1 development
935 * versions of pg_dump ever generated such files.
936 */
937 if (AH->version < K_VERS_1_3)
938 *type = BLK_DATA;
939 else
940 {
941 byt = getc(AH->FH);
942 *type = byt;
943 if (byt == EOF)
944 {
945 *id = 0; /* don't return an uninitialized value */
946 return;
947 }
948 ctx->filePos += 1;
949 }
950
951 *id = ReadInt(AH);
952 }
953
954 /*
955 * Callback function for WriteDataToArchive. Writes one block of (compressed)
956 * data to the archive.
957 */
958 static void
_CustomWriteFunc(ArchiveHandle * AH,const char * buf,size_t len)959 _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
960 {
961 /* never write 0-byte blocks (this should not happen) */
962 if (len > 0)
963 {
964 WriteInt(AH, len);
965 _WriteBuf(AH, buf, len);
966 }
967 return;
968 }
969
970 /*
971 * Callback function for ReadDataFromArchive. To keep things simple, we
972 * always read one compressed block at a time.
973 */
974 static size_t
_CustomReadFunc(ArchiveHandle * AH,char ** buf,size_t * buflen)975 _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
976 {
977 size_t blkLen;
978
979 /* Read length */
980 blkLen = ReadInt(AH);
981 if (blkLen == 0)
982 return 0;
983
984 /* If the caller's buffer is not large enough, allocate a bigger one */
985 if (blkLen > *buflen)
986 {
987 free(*buf);
988 *buf = (char *) pg_malloc(blkLen);
989 *buflen = blkLen;
990 }
991
992 /* exits app on read errors */
993 _ReadBuf(AH, *buf, blkLen);
994
995 return blkLen;
996 }
997