1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_directory.c
4  *
5  *	A directory format dump is a directory, which contains a "toc.dat" file
6  *	for the TOC, and a separate file for each data entry, named "<oid>.dat".
7  *	Large objects (BLOBs) are stored in separate files named "blob_<uid>.dat",
8  *	and there's a plain-text TOC file for them called "blobs.toc". If
9  *	compression is used, each data file is individually compressed and the
10  *	".gz" suffix is added to the filenames. The TOC files are never
11  *	compressed by pg_dump, however they are accepted with the .gz suffix too,
12  *	in case the user has manually compressed them with 'gzip'.
13  *
14  *	NOTE: This format is identical to the files written in the tar file in
15  *	the 'tar' format, except that we don't write the restore.sql file (TODO),
16  *	and the tar format doesn't support compression. Please keep the formats in
17  *	sync.
18  *
19  *
20  *	Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
21  *	Portions Copyright (c) 1994, Regents of the University of California
22  *	Portions Copyright (c) 2000, Philip Warner
23  *
24  *	Rights are granted to use this software in any way so long
25  *	as this notice is not removed.
26  *
27  *	The author is not responsible for loss or damages that may
28  *	result from it's use.
29  *
30  * IDENTIFICATION
31  *		src/bin/pg_dump/pg_backup_directory.c
32  *
33  *-------------------------------------------------------------------------
34  */
35 #include "postgres_fe.h"
36 
37 #include "compress_io.h"
38 #include "parallel.h"
39 #include "pg_backup_utils.h"
40 
41 #include <dirent.h>
42 #include <sys/stat.h>
43 
44 typedef struct
45 {
46 	/*
47 	 * Our archive location. This is basically what the user specified as his
48 	 * backup file but of course here it is a directory.
49 	 */
50 	char	   *directory;
51 
52 	cfp		   *dataFH;			/* currently open data file */
53 
54 	cfp		   *blobsTocFH;		/* file handle for blobs.toc */
55 	ParallelState *pstate;		/* for parallel backup / restore */
56 } lclContext;
57 
/*
 * Format-private data attached to each TOC entry (te->formatData).
 */
typedef struct
{
	char	   *filename;		/* basename of the entry's file inside the
								 * archive directory, or NULL if it has none */
} lclTocEntry;
62 
63 /* translator: this is a module name */
64 static const char *modulename = gettext_noop("directory archiver");
65 
66 /* prototypes for private functions */
67 static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
68 static void _StartData(ArchiveHandle *AH, TocEntry *te);
69 static void _EndData(ArchiveHandle *AH, TocEntry *te);
70 static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
71 static int	_WriteByte(ArchiveHandle *AH, const int i);
72 static int	_ReadByte(ArchiveHandle *);
73 static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
74 static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
75 static void _CloseArchive(ArchiveHandle *AH);
76 static void _ReopenArchive(ArchiveHandle *AH);
77 static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
78 
79 static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
80 static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
81 static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
82 
83 static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
84 static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
85 static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
86 static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
87 static void _LoadBlobs(ArchiveHandle *AH);
88 
89 static void _Clone(ArchiveHandle *AH);
90 static void _DeClone(ArchiveHandle *AH);
91 
92 static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
93 static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te,
94 					   const char *str, T_Action act);
95 static char *_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te);
96 static char *_WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te);
97 
98 static void setFilePath(ArchiveHandle *AH, char *buf,
99 			const char *relativeFilename);
100 
101 /*
102  *	Init routine required by ALL formats. This is a global routine
103  *	and should be declared in pg_backup_archiver.h
104  *
105  *	Its task is to create any extra archive context (using AH->formatData),
106  *	and to initialize the supported function pointers.
107  *
108  *	It should also prepare whatever its input source is for reading/writing,
109  *	and in the case of a read mode connection, it should load the Header & TOC.
110  */
111 void
InitArchiveFmt_Directory(ArchiveHandle * AH)112 InitArchiveFmt_Directory(ArchiveHandle *AH)
113 {
114 	lclContext *ctx;
115 
116 	/* Assuming static functions, this can be copied for each format. */
117 	AH->ArchiveEntryPtr = _ArchiveEntry;
118 	AH->StartDataPtr = _StartData;
119 	AH->WriteDataPtr = _WriteData;
120 	AH->EndDataPtr = _EndData;
121 	AH->WriteBytePtr = _WriteByte;
122 	AH->ReadBytePtr = _ReadByte;
123 	AH->WriteBufPtr = _WriteBuf;
124 	AH->ReadBufPtr = _ReadBuf;
125 	AH->ClosePtr = _CloseArchive;
126 	AH->ReopenPtr = _ReopenArchive;
127 	AH->PrintTocDataPtr = _PrintTocData;
128 	AH->ReadExtraTocPtr = _ReadExtraToc;
129 	AH->WriteExtraTocPtr = _WriteExtraToc;
130 	AH->PrintExtraTocPtr = _PrintExtraToc;
131 
132 	AH->StartBlobsPtr = _StartBlobs;
133 	AH->StartBlobPtr = _StartBlob;
134 	AH->EndBlobPtr = _EndBlob;
135 	AH->EndBlobsPtr = _EndBlobs;
136 
137 	AH->ClonePtr = _Clone;
138 	AH->DeClonePtr = _DeClone;
139 
140 	AH->WorkerJobRestorePtr = _WorkerJobRestoreDirectory;
141 	AH->WorkerJobDumpPtr = _WorkerJobDumpDirectory;
142 
143 	AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
144 	AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
145 
146 	/* Set up our private context */
147 	ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
148 	AH->formatData = (void *) ctx;
149 
150 	ctx->dataFH = NULL;
151 	ctx->blobsTocFH = NULL;
152 
153 	/* Initialize LO buffering */
154 	AH->lo_buf_size = LOBBUFSIZE;
155 	AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
156 
157 	/*
158 	 * Now open the TOC file
159 	 */
160 
161 	if (!AH->fSpec || strcmp(AH->fSpec, "") == 0)
162 		exit_horribly(modulename, "no output directory specified\n");
163 
164 	ctx->directory = AH->fSpec;
165 
166 	if (AH->mode == archModeWrite)
167 	{
168 		struct stat st;
169 		bool		is_empty = false;
170 
171 		/* we accept an empty existing directory */
172 		if (stat(ctx->directory, &st) == 0 && S_ISDIR(st.st_mode))
173 		{
174 			DIR		   *dir = opendir(ctx->directory);
175 
176 			if (dir)
177 			{
178 				struct dirent *d;
179 
180 				is_empty = true;
181 				while (errno = 0, (d = readdir(dir)))
182 				{
183 					if (strcmp(d->d_name, ".") != 0 && strcmp(d->d_name, "..") != 0)
184 					{
185 						is_empty = false;
186 						break;
187 					}
188 				}
189 
190 				if (errno)
191 					exit_horribly(modulename, "could not read directory \"%s\": %s\n",
192 								  ctx->directory, strerror(errno));
193 
194 				if (closedir(dir))
195 					exit_horribly(modulename, "could not close directory \"%s\": %s\n",
196 								  ctx->directory, strerror(errno));
197 			}
198 		}
199 
200 		if (!is_empty && mkdir(ctx->directory, 0700) < 0)
201 			exit_horribly(modulename, "could not create directory \"%s\": %s\n",
202 						  ctx->directory, strerror(errno));
203 	}
204 	else
205 	{							/* Read Mode */
206 		char		fname[MAXPGPATH];
207 		cfp		   *tocFH;
208 
209 		setFilePath(AH, fname, "toc.dat");
210 
211 		tocFH = cfopen_read(fname, PG_BINARY_R);
212 		if (tocFH == NULL)
213 			exit_horribly(modulename,
214 						  "could not open input file \"%s\": %s\n",
215 						  fname, strerror(errno));
216 
217 		ctx->dataFH = tocFH;
218 
219 		/*
220 		 * The TOC of a directory format dump shares the format code of the
221 		 * tar format.
222 		 */
223 		AH->format = archTar;
224 		ReadHead(AH);
225 		AH->format = archDirectory;
226 		ReadToc(AH);
227 
228 		/* Nothing else in the file, so close it again... */
229 		if (cfclose(tocFH) != 0)
230 			exit_horribly(modulename, "could not close TOC file: %s\n",
231 						  strerror(errno));
232 		ctx->dataFH = NULL;
233 	}
234 }
235 
236 /*
237  * Called by the Archiver when the dumper creates a new TOC entry.
238  *
239  * We determine the filename for this entry.
240 */
241 static void
_ArchiveEntry(ArchiveHandle * AH,TocEntry * te)242 _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
243 {
244 	lclTocEntry *tctx;
245 	char		fn[MAXPGPATH];
246 
247 	tctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
248 	if (te->dataDumper)
249 	{
250 		snprintf(fn, MAXPGPATH, "%d.dat", te->dumpId);
251 		tctx->filename = pg_strdup(fn);
252 	}
253 	else if (strcmp(te->desc, "BLOBS") == 0)
254 		tctx->filename = pg_strdup("blobs.toc");
255 	else
256 		tctx->filename = NULL;
257 
258 	te->formatData = (void *) tctx;
259 }
260 
261 /*
262  * Called by the Archiver to save any extra format-related TOC entry
263  * data.
264  *
265  * Use the Archiver routines to write data - they are non-endian, and
266  * maintain other important file information.
267  */
268 static void
_WriteExtraToc(ArchiveHandle * AH,TocEntry * te)269 _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
270 {
271 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
272 
273 	/*
274 	 * A dumpable object has set tctx->filename, any other object has not.
275 	 * (see _ArchiveEntry).
276 	 */
277 	if (tctx->filename)
278 		WriteStr(AH, tctx->filename);
279 	else
280 		WriteStr(AH, "");
281 }
282 
283 /*
284  * Called by the Archiver to read any extra format-related TOC data.
285  *
286  * Needs to match the order defined in _WriteExtraToc, and should also
287  * use the Archiver input routines.
288  */
289 static void
_ReadExtraToc(ArchiveHandle * AH,TocEntry * te)290 _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
291 {
292 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
293 
294 	if (tctx == NULL)
295 	{
296 		tctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
297 		te->formatData = (void *) tctx;
298 	}
299 
300 	tctx->filename = ReadStr(AH);
301 	if (strlen(tctx->filename) == 0)
302 	{
303 		free(tctx->filename);
304 		tctx->filename = NULL;
305 	}
306 }
307 
308 /*
309  * Called by the Archiver when restoring an archive to output a comment
310  * that includes useful information about the TOC entry.
311  */
312 static void
_PrintExtraToc(ArchiveHandle * AH,TocEntry * te)313 _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
314 {
315 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
316 
317 	if (AH->public.verbose && tctx->filename)
318 		ahprintf(AH, "-- File: %s\n", tctx->filename);
319 }
320 
321 /*
322  * Called by the archiver when saving TABLE DATA (not schema). This routine
323  * should save whatever format-specific information is needed to read
324  * the archive back.
325  *
326  * It is called just prior to the dumper's 'DataDumper' routine being called.
327  *
328  * We create the data file for writing.
329  */
330 static void
_StartData(ArchiveHandle * AH,TocEntry * te)331 _StartData(ArchiveHandle *AH, TocEntry *te)
332 {
333 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
334 	lclContext *ctx = (lclContext *) AH->formatData;
335 	char		fname[MAXPGPATH];
336 
337 	setFilePath(AH, fname, tctx->filename);
338 
339 	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression);
340 	if (ctx->dataFH == NULL)
341 		exit_horribly(modulename, "could not open output file \"%s\": %s\n",
342 					  fname, strerror(errno));
343 }
344 
345 /*
346  * Called by archiver when dumper calls WriteData. This routine is
347  * called for both BLOB and TABLE data; it is the responsibility of
348  * the format to manage each kind of data using StartBlob/StartData.
349  *
350  * It should only be called from within a DataDumper routine.
351  *
352  * We write the data to the open data file.
353  */
354 static void
_WriteData(ArchiveHandle * AH,const void * data,size_t dLen)355 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
356 {
357 	lclContext *ctx = (lclContext *) AH->formatData;
358 
359 	errno = 0;
360 	if (dLen > 0 && cfwrite(data, dLen, ctx->dataFH) != dLen)
361 	{
362 		/* if write didn't set errno, assume problem is no disk space */
363 		if (errno == 0)
364 			errno = ENOSPC;
365 		exit_horribly(modulename, "could not write to output file: %s\n",
366 					  get_cfp_error(ctx->dataFH));
367 	}
368 
369 	return;
370 }
371 
372 /*
373  * Called by the archiver when a dumper's 'DataDumper' routine has
374  * finished.
375  *
376  * We close the data file.
377  */
378 static void
_EndData(ArchiveHandle * AH,TocEntry * te)379 _EndData(ArchiveHandle *AH, TocEntry *te)
380 {
381 	lclContext *ctx = (lclContext *) AH->formatData;
382 
383 	/* Close the file */
384 	cfclose(ctx->dataFH);
385 
386 	ctx->dataFH = NULL;
387 }
388 
389 /*
390  * Print data for a given file (can be a BLOB as well)
391  */
392 static void
_PrintFileData(ArchiveHandle * AH,char * filename)393 _PrintFileData(ArchiveHandle *AH, char *filename)
394 {
395 	size_t		cnt;
396 	char	   *buf;
397 	size_t		buflen;
398 	cfp		   *cfp;
399 
400 	if (!filename)
401 		return;
402 
403 	cfp = cfopen_read(filename, PG_BINARY_R);
404 
405 	if (!cfp)
406 		exit_horribly(modulename, "could not open input file \"%s\": %s\n",
407 					  filename, strerror(errno));
408 
409 	buf = pg_malloc(ZLIB_OUT_SIZE);
410 	buflen = ZLIB_OUT_SIZE;
411 
412 	while ((cnt = cfread(buf, buflen, cfp)))
413 	{
414 		ahwrite(buf, 1, cnt, AH);
415 	}
416 
417 	free(buf);
418 	if (cfclose(cfp) !=0)
419 		exit_horribly(modulename, "could not close data file \"%s\": %s\n",
420 					  filename, strerror(errno));
421 }
422 
423 /*
424  * Print data for a given TOC entry
425 */
426 static void
_PrintTocData(ArchiveHandle * AH,TocEntry * te)427 _PrintTocData(ArchiveHandle *AH, TocEntry *te)
428 {
429 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
430 
431 	if (!tctx->filename)
432 		return;
433 
434 	if (strcmp(te->desc, "BLOBS") == 0)
435 		_LoadBlobs(AH);
436 	else
437 	{
438 		char		fname[MAXPGPATH];
439 
440 		setFilePath(AH, fname, tctx->filename);
441 		_PrintFileData(AH, fname);
442 	}
443 }
444 
445 static void
_LoadBlobs(ArchiveHandle * AH)446 _LoadBlobs(ArchiveHandle *AH)
447 {
448 	Oid			oid;
449 	lclContext *ctx = (lclContext *) AH->formatData;
450 	char		tocfname[MAXPGPATH];
451 	char		line[MAXPGPATH];
452 
453 	StartRestoreBlobs(AH);
454 
455 	setFilePath(AH, tocfname, "blobs.toc");
456 
457 	ctx->blobsTocFH = cfopen_read(tocfname, PG_BINARY_R);
458 
459 	if (ctx->blobsTocFH == NULL)
460 		exit_horribly(modulename, "could not open large object TOC file \"%s\" for input: %s\n",
461 					  tocfname, strerror(errno));
462 
463 	/* Read the blobs TOC file line-by-line, and process each blob */
464 	while ((cfgets(ctx->blobsTocFH, line, MAXPGPATH)) != NULL)
465 	{
466 		char		blobfname[MAXPGPATH + 1];
467 		char		path[MAXPGPATH];
468 
469 		/* Can't overflow because line and blobfname are the same length. */
470 		if (sscanf(line, "%u %" CppAsString2(MAXPGPATH) "s\n", &oid, blobfname) != 2)
471 			exit_horribly(modulename, "invalid line in large object TOC file \"%s\": \"%s\"\n",
472 						  tocfname, line);
473 
474 		StartRestoreBlob(AH, oid, AH->public.ropt->dropSchema);
475 		snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, blobfname);
476 		_PrintFileData(AH, path);
477 		EndRestoreBlob(AH, oid);
478 	}
479 	if (!cfeof(ctx->blobsTocFH))
480 		exit_horribly(modulename, "error reading large object TOC file \"%s\"\n",
481 					  tocfname);
482 
483 	if (cfclose(ctx->blobsTocFH) != 0)
484 		exit_horribly(modulename, "could not close large object TOC file \"%s\": %s\n",
485 					  tocfname, strerror(errno));
486 
487 	ctx->blobsTocFH = NULL;
488 
489 	EndRestoreBlobs(AH);
490 }
491 
492 
493 /*
494  * Write a byte of data to the archive.
495  * Called by the archiver to do integer & byte output to the archive.
496  * These routines are only used to read & write the headers & TOC.
497  */
498 static int
_WriteByte(ArchiveHandle * AH,const int i)499 _WriteByte(ArchiveHandle *AH, const int i)
500 {
501 	unsigned char c = (unsigned char) i;
502 	lclContext *ctx = (lclContext *) AH->formatData;
503 
504 	errno = 0;
505 	if (cfwrite(&c, 1, ctx->dataFH) != 1)
506 	{
507 		/* if write didn't set errno, assume problem is no disk space */
508 		if (errno == 0)
509 			errno = ENOSPC;
510 		exit_horribly(modulename, "could not write to output file: %s\n",
511 					  get_cfp_error(ctx->dataFH));
512 	}
513 
514 	return 1;
515 }
516 
517 /*
518  * Read a byte of data from the archive.
519  * Called by the archiver to read bytes & integers from the archive.
520  * These routines are only used to read & write headers & TOC.
521  * EOF should be treated as a fatal error.
522  */
523 static int
_ReadByte(ArchiveHandle * AH)524 _ReadByte(ArchiveHandle *AH)
525 {
526 	lclContext *ctx = (lclContext *) AH->formatData;
527 
528 	return cfgetc(ctx->dataFH);
529 }
530 
531 /*
532  * Write a buffer of data to the archive.
533  * Called by the archiver to write a block of bytes to the TOC or a data file.
534  */
535 static void
_WriteBuf(ArchiveHandle * AH,const void * buf,size_t len)536 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
537 {
538 	lclContext *ctx = (lclContext *) AH->formatData;
539 
540 	errno = 0;
541 	if (cfwrite(buf, len, ctx->dataFH) != len)
542 	{
543 		/* if write didn't set errno, assume problem is no disk space */
544 		if (errno == 0)
545 			errno = ENOSPC;
546 		exit_horribly(modulename, "could not write to output file: %s\n",
547 					  get_cfp_error(ctx->dataFH));
548 	}
549 
550 	return;
551 }
552 
553 /*
554  * Read a block of bytes from the archive.
555  *
556  * Called by the archiver to read a block of bytes from the archive
557  */
558 static void
_ReadBuf(ArchiveHandle * AH,void * buf,size_t len)559 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
560 {
561 	lclContext *ctx = (lclContext *) AH->formatData;
562 
563 	/*
564 	 * If there was an I/O error, we already exited in cfread(), so here we
565 	 * exit on short reads.
566 	 */
567 	if (cfread(buf, len, ctx->dataFH) != len)
568 		exit_horribly(modulename,
569 					  "could not read from input file: end of file\n");
570 
571 	return;
572 }
573 
574 /*
575  * Close the archive.
576  *
577  * When writing the archive, this is the routine that actually starts
578  * the process of saving it to files. No data should be written prior
579  * to this point, since the user could sort the TOC after creating it.
580  *
581  * If an archive is to be written, this routine must call:
582  *		WriteHead			to save the archive header
583  *		WriteToc			to save the TOC entries
584  *		WriteDataChunks		to save all DATA & BLOBs.
585  */
586 static void
_CloseArchive(ArchiveHandle * AH)587 _CloseArchive(ArchiveHandle *AH)
588 {
589 	lclContext *ctx = (lclContext *) AH->formatData;
590 
591 	if (AH->mode == archModeWrite)
592 	{
593 		cfp		   *tocFH;
594 		char		fname[MAXPGPATH];
595 
596 		setFilePath(AH, fname, "toc.dat");
597 
598 		/* this will actually fork the processes for a parallel backup */
599 		ctx->pstate = ParallelBackupStart(AH);
600 
601 		/* The TOC is always created uncompressed */
602 		tocFH = cfopen_write(fname, PG_BINARY_W, 0);
603 		if (tocFH == NULL)
604 			exit_horribly(modulename, "could not open output file \"%s\": %s\n",
605 						  fname, strerror(errno));
606 		ctx->dataFH = tocFH;
607 
608 		/*
609 		 * Write 'tar' in the format field of the toc.dat file. The directory
610 		 * is compatible with 'tar', so there's no point having a different
611 		 * format code for it.
612 		 */
613 		AH->format = archTar;
614 		WriteHead(AH);
615 		AH->format = archDirectory;
616 		WriteToc(AH);
617 		if (cfclose(tocFH) != 0)
618 			exit_horribly(modulename, "could not close TOC file: %s\n",
619 						  strerror(errno));
620 		WriteDataChunks(AH, ctx->pstate);
621 
622 		ParallelBackupEnd(AH, ctx->pstate);
623 	}
624 	AH->FH = NULL;
625 }
626 
627 /*
628  * Reopen the archive's file handle.
629  */
630 static void
_ReopenArchive(ArchiveHandle * AH)631 _ReopenArchive(ArchiveHandle *AH)
632 {
633 	/*
634 	 * Our TOC is in memory, our data files are opened by each child anyway as
635 	 * they are separate. We support reopening the archive by just doing
636 	 * nothing.
637 	 */
638 }
639 
640 /*
641  * BLOB support
642  */
643 
644 /*
645  * Called by the archiver when starting to save all BLOB DATA (not schema).
646  * It is called just prior to the dumper's DataDumper routine.
647  *
648  * We open the large object TOC file here, so that we can append a line to
649  * it for each blob.
650  */
651 static void
_StartBlobs(ArchiveHandle * AH,TocEntry * te)652 _StartBlobs(ArchiveHandle *AH, TocEntry *te)
653 {
654 	lclContext *ctx = (lclContext *) AH->formatData;
655 	char		fname[MAXPGPATH];
656 
657 	setFilePath(AH, fname, "blobs.toc");
658 
659 	/* The blob TOC file is never compressed */
660 	ctx->blobsTocFH = cfopen_write(fname, "ab", 0);
661 	if (ctx->blobsTocFH == NULL)
662 		exit_horribly(modulename, "could not open output file \"%s\": %s\n",
663 					  fname, strerror(errno));
664 }
665 
666 /*
667  * Called by the archiver when we're about to start dumping a blob.
668  *
669  * We create a file to write the blob to.
670  */
671 static void
_StartBlob(ArchiveHandle * AH,TocEntry * te,Oid oid)672 _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
673 {
674 	lclContext *ctx = (lclContext *) AH->formatData;
675 	char		fname[MAXPGPATH];
676 
677 	snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
678 
679 	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression);
680 
681 	if (ctx->dataFH == NULL)
682 		exit_horribly(modulename, "could not open output file \"%s\": %s\n",
683 					  fname, strerror(errno));
684 }
685 
686 /*
687  * Called by the archiver when the dumper is finished writing a blob.
688  *
689  * We close the blob file and write an entry to the blob TOC file for it.
690  */
691 static void
_EndBlob(ArchiveHandle * AH,TocEntry * te,Oid oid)692 _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
693 {
694 	lclContext *ctx = (lclContext *) AH->formatData;
695 	char		buf[50];
696 	int			len;
697 
698 	/* Close the BLOB data file itself */
699 	cfclose(ctx->dataFH);
700 	ctx->dataFH = NULL;
701 
702 	/* register the blob in blobs.toc */
703 	len = snprintf(buf, sizeof(buf), "%u blob_%u.dat\n", oid, oid);
704 	if (cfwrite(buf, len, ctx->blobsTocFH) != len)
705 		exit_horribly(modulename, "could not write to blobs TOC file\n");
706 }
707 
708 /*
709  * Called by the archiver when finishing saving all BLOB DATA.
710  *
711  * We close the blobs TOC file.
712  */
713 static void
_EndBlobs(ArchiveHandle * AH,TocEntry * te)714 _EndBlobs(ArchiveHandle *AH, TocEntry *te)
715 {
716 	lclContext *ctx = (lclContext *) AH->formatData;
717 
718 	cfclose(ctx->blobsTocFH);
719 	ctx->blobsTocFH = NULL;
720 }
721 
722 /*
723  * Gets a relative file name and prepends the output directory, writing the
724  * result to buf. The caller needs to make sure that buf is MAXPGPATH bytes
725  * big. Can't use a static char[MAXPGPATH] inside the function because we run
726  * multithreaded on Windows.
727  */
728 static void
setFilePath(ArchiveHandle * AH,char * buf,const char * relativeFilename)729 setFilePath(ArchiveHandle *AH, char *buf, const char *relativeFilename)
730 {
731 	lclContext *ctx = (lclContext *) AH->formatData;
732 	char	   *dname;
733 
734 	dname = ctx->directory;
735 
736 	if (strlen(dname) + 1 + strlen(relativeFilename) + 1 > MAXPGPATH)
737 		exit_horribly(modulename, "file name too long: \"%s\"\n", dname);
738 
739 	strcpy(buf, dname);
740 	strcat(buf, "/");
741 	strcat(buf, relativeFilename);
742 }
743 
744 /*
745  * Clone format-specific fields during parallel restoration.
746  */
747 static void
_Clone(ArchiveHandle * AH)748 _Clone(ArchiveHandle *AH)
749 {
750 	lclContext *ctx = (lclContext *) AH->formatData;
751 
752 	AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
753 	memcpy(AH->formatData, ctx, sizeof(lclContext));
754 	ctx = (lclContext *) AH->formatData;
755 
756 	/*
757 	 * Note: we do not make a local lo_buf because we expect at most one BLOBS
758 	 * entry per archive, so no parallelism is possible.  Likewise,
759 	 * TOC-entry-local state isn't an issue because any one TOC entry is
760 	 * touched by just one worker child.
761 	 */
762 
763 	/*
764 	 * We also don't copy the ParallelState pointer (pstate), only the master
765 	 * process ever writes to it.
766 	 */
767 }
768 
769 static void
_DeClone(ArchiveHandle * AH)770 _DeClone(ArchiveHandle *AH)
771 {
772 	lclContext *ctx = (lclContext *) AH->formatData;
773 
774 	free(ctx);
775 }
776 
777 /*
778  * This function is executed in the parent process. Depending on the desired
779  * action (dump or restore) it creates a string that is understood by the
780  * _WorkerJobDump /_WorkerJobRestore functions of the dump format.
781  */
782 static char *
_MasterStartParallelItem(ArchiveHandle * AH,TocEntry * te,T_Action act)783 _MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
784 {
785 	/*
786 	 * A static char is okay here, even on Windows because we call this
787 	 * function only from one process (the master).
788 	 */
789 	static char buf[64];
790 
791 	if (act == ACT_DUMP)
792 		snprintf(buf, sizeof(buf), "DUMP %d", te->dumpId);
793 	else if (act == ACT_RESTORE)
794 		snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
795 
796 	return buf;
797 }
798 
799 /*
800  * This function is executed in the child of a parallel backup for the
801  * directory archive and dumps the actual data.
802  *
803  * We are currently returning only the DumpId so theoretically we could
804  * make this function returning an int (or a DumpId). However, to
805  * facilitate further enhancements and because sooner or later we need to
806  * convert this to a string and send it via a message anyway, we stick with
807  * char *. It is parsed on the other side by the _EndMasterParallel()
808  * function of the respective dump format.
809  */
810 static char *
_WorkerJobDumpDirectory(ArchiveHandle * AH,TocEntry * te)811 _WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te)
812 {
813 	/*
814 	 * short fixed-size string + some ID so far, this needs to be malloc'ed
815 	 * instead of static because we work with threads on windows
816 	 */
817 	const int	buflen = 64;
818 	char	   *buf = (char *) pg_malloc(buflen);
819 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
820 
821 	/* This should never happen */
822 	if (!tctx)
823 		exit_horribly(modulename, "error during backup\n");
824 
825 	/*
826 	 * This function returns void. We either fail and die horribly or
827 	 * succeed... A failure will be detected by the parent when the child dies
828 	 * unexpectedly.
829 	 */
830 	WriteDataChunksForTocEntry(AH, te);
831 
832 	snprintf(buf, buflen, "OK DUMP %d", te->dumpId);
833 
834 	return buf;
835 }
836 
837 /*
838  * This function is executed in the child of a parallel backup for the
839  * directory archive and dumps the actual data.
840  */
841 static char *
_WorkerJobRestoreDirectory(ArchiveHandle * AH,TocEntry * te)842 _WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te)
843 {
844 	/*
845 	 * short fixed-size string + some ID so far, this needs to be malloc'ed
846 	 * instead of static because we work with threads on windows
847 	 */
848 	const int	buflen = 64;
849 	char	   *buf = (char *) pg_malloc(buflen);
850 	ParallelArgs pargs;
851 	int			status;
852 
853 	pargs.AH = AH;
854 	pargs.te = te;
855 
856 	status = parallel_restore(&pargs);
857 
858 	snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
859 			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
860 
861 	return buf;
862 }
863 
864 /*
865  * This function is executed in the parent process. It analyzes the response of
866  * the _WorkerJobDumpDirectory/_WorkerJobRestoreDirectory functions of the
867  * respective dump format.
868  */
869 static int
_MasterEndParallelItem(ArchiveHandle * AH,TocEntry * te,const char * str,T_Action act)870 _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
871 {
872 	DumpId		dumpId;
873 	int			nBytes,
874 				n_errors;
875 	int			status = 0;
876 
877 	if (act == ACT_DUMP)
878 	{
879 		sscanf(str, "%d%n", &dumpId, &nBytes);
880 
881 		Assert(dumpId == te->dumpId);
882 		Assert(nBytes == strlen(str));
883 	}
884 	else if (act == ACT_RESTORE)
885 	{
886 		sscanf(str, "%d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
887 
888 		Assert(dumpId == te->dumpId);
889 		Assert(nBytes == strlen(str));
890 
891 		AH->public.n_errors += n_errors;
892 	}
893 
894 	return status;
895 }
896