1 /*-------------------------------------------------------------------------
2  *
3  * walmethods.c - implementations of different ways to write received wal
4  *
5  * NOTE! The caller must ensure that only one method is instantiated in
6  *		 any given program, and that it's only instantiated once!
7  *
8  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  *		  src/bin/pg_basebackup/walmethods.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres_fe.h"
16 
17 #include <sys/stat.h>
18 #include <time.h>
19 #include <unistd.h>
20 #ifdef HAVE_LIBZ
21 #include <zlib.h>
22 #endif
23 
24 #include "pgtar.h"
25 #include "common/file_utils.h"
26 
27 #include "receivelog.h"
28 #include "streamutil.h"
29 
/*
 * Size of the zlib output buffer used when writing a compressed (.tar.gz)
 * tar file: compressed output is written to the tar file in chunks of at
 * most this many bytes.
 */
#define ZLIB_OUT_SIZE 4096
32 
33 /*-------------------------------------------------------------------------
34  * WalDirectoryMethod - write wal to a directory looking like pg_wal
35  *-------------------------------------------------------------------------
36  */
37 
/*
 * Global static data for the directory method, allocated by
 * CreateWalDirectoryMethod() and released by FreeWalDirectoryMethod().
 */
typedef struct DirectoryMethodData
{
	char	   *basedir;		/* directory WAL files are written into */
	int			compression;	/* zlib level for gzip output; 0 = none */
	bool		sync;			/* fsync files and directory entries? */
} DirectoryMethodData;
static DirectoryMethodData *dir_data = NULL;
48 
/*
 * Local file handle: the per-file state behind a Walfile returned by
 * dir_open_for_write().
 */
typedef struct DirectoryMethodFile
{
	int			fd;				/* OS descriptor; kept for fsync even when
								 * writing through gzfp */
	off_t		currpos;		/* bytes accepted by dir_write() so far
								 * (uncompressed count when compressing) */
	char	   *pathname;		/* name as given by caller, no suffixes */
	char	   *fullpath;		/* basedir/filename as actually opened */
	char	   *temp_suffix;	/* temporary suffix, or NULL if none */
#ifdef HAVE_LIBZ
	gzFile		gzfp;			/* zlib handle on fd when compressing */
#endif
} DirectoryMethodFile;
63 
/*
 * Describe the most recent directory-method failure.
 *
 * Every failing operation of this method leaves errno set, so the system
 * error text is all there is to report.
 */
static const char *
dir_getlasterror(void)
{
	int			err = errno;

	return strerror(err);
}
70 
71 static char *
dir_get_file_name(const char * pathname,const char * temp_suffix)72 dir_get_file_name(const char *pathname, const char *temp_suffix)
73 {
74 	char	   *filename = pg_malloc0(MAXPGPATH * sizeof(char));
75 
76 	snprintf(filename, MAXPGPATH, "%s%s%s",
77 			 pathname, dir_data->compression > 0 ? ".gz" : "",
78 			 temp_suffix ? temp_suffix : "");
79 
80 	return filename;
81 }
82 
83 static Walfile
dir_open_for_write(const char * pathname,const char * temp_suffix,size_t pad_to_size)84 dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
85 {
86 	static char tmppath[MAXPGPATH];
87 	char	   *filename;
88 	int			fd;
89 	DirectoryMethodFile *f;
90 #ifdef HAVE_LIBZ
91 	gzFile		gzfp = NULL;
92 #endif
93 
94 	filename = dir_get_file_name(pathname, temp_suffix);
95 	snprintf(tmppath, sizeof(tmppath), "%s/%s",
96 			 dir_data->basedir, filename);
97 	pg_free(filename);
98 
99 	/*
100 	 * Open a file for non-compressed as well as compressed files. Tracking
101 	 * the file descriptor is important for dir_sync() method as gzflush()
102 	 * does not do any system calls to fsync() to make changes permanent on
103 	 * disk.
104 	 */
105 	fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
106 	if (fd < 0)
107 		return NULL;
108 
109 #ifdef HAVE_LIBZ
110 	if (dir_data->compression > 0)
111 	{
112 		gzfp = gzdopen(fd, "wb");
113 		if (gzfp == NULL)
114 		{
115 			close(fd);
116 			return NULL;
117 		}
118 
119 		if (gzsetparams(gzfp, dir_data->compression,
120 						Z_DEFAULT_STRATEGY) != Z_OK)
121 		{
122 			gzclose(gzfp);
123 			return NULL;
124 		}
125 	}
126 #endif
127 
128 	/* Do pre-padding on non-compressed files */
129 	if (pad_to_size && dir_data->compression == 0)
130 	{
131 		PGAlignedXLogBlock zerobuf;
132 		int			bytes;
133 
134 		memset(zerobuf.data, 0, XLOG_BLCKSZ);
135 		for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ)
136 		{
137 			errno = 0;
138 			if (write(fd, zerobuf.data, XLOG_BLCKSZ) != XLOG_BLCKSZ)
139 			{
140 				int			save_errno = errno;
141 
142 				close(fd);
143 
144 				/*
145 				 * If write didn't set errno, assume problem is no disk space.
146 				 */
147 				errno = save_errno ? save_errno : ENOSPC;
148 				return NULL;
149 			}
150 		}
151 
152 		if (lseek(fd, 0, SEEK_SET) != 0)
153 		{
154 			int			save_errno = errno;
155 
156 			close(fd);
157 			errno = save_errno;
158 			return NULL;
159 		}
160 	}
161 
162 	/*
163 	 * fsync WAL file and containing directory, to ensure the file is
164 	 * persistently created and zeroed (if padded). That's particularly
165 	 * important when using synchronous mode, where the file is modified and
166 	 * fsynced in-place, without a directory fsync.
167 	 */
168 	if (dir_data->sync)
169 	{
170 		if (fsync_fname(tmppath, false, progname) != 0 ||
171 			fsync_parent_path(tmppath, progname) != 0)
172 		{
173 #ifdef HAVE_LIBZ
174 			if (dir_data->compression > 0)
175 				gzclose(gzfp);
176 			else
177 #endif
178 				close(fd);
179 			return NULL;
180 		}
181 	}
182 
183 	f = pg_malloc0(sizeof(DirectoryMethodFile));
184 #ifdef HAVE_LIBZ
185 	if (dir_data->compression > 0)
186 		f->gzfp = gzfp;
187 #endif
188 	f->fd = fd;
189 	f->currpos = 0;
190 	f->pathname = pg_strdup(pathname);
191 	f->fullpath = pg_strdup(tmppath);
192 	if (temp_suffix)
193 		f->temp_suffix = pg_strdup(temp_suffix);
194 
195 	return f;
196 }
197 
198 static ssize_t
dir_write(Walfile f,const void * buf,size_t count)199 dir_write(Walfile f, const void *buf, size_t count)
200 {
201 	ssize_t		r;
202 	DirectoryMethodFile *df = (DirectoryMethodFile *) f;
203 
204 	Assert(f != NULL);
205 
206 #ifdef HAVE_LIBZ
207 	if (dir_data->compression > 0)
208 		r = (ssize_t) gzwrite(df->gzfp, buf, count);
209 	else
210 #endif
211 		r = write(df->fd, buf, count);
212 	if (r > 0)
213 		df->currpos += r;
214 	return r;
215 }
216 
217 static off_t
dir_get_current_pos(Walfile f)218 dir_get_current_pos(Walfile f)
219 {
220 	Assert(f != NULL);
221 
222 	/* Use a cached value to prevent lots of reseeks */
223 	return ((DirectoryMethodFile *) f)->currpos;
224 }
225 
226 static int
dir_close(Walfile f,WalCloseMethod method)227 dir_close(Walfile f, WalCloseMethod method)
228 {
229 	int			r;
230 	DirectoryMethodFile *df = (DirectoryMethodFile *) f;
231 	static char tmppath[MAXPGPATH];
232 	static char tmppath2[MAXPGPATH];
233 
234 	Assert(f != NULL);
235 
236 #ifdef HAVE_LIBZ
237 	if (dir_data->compression > 0)
238 		r = gzclose(df->gzfp);
239 	else
240 #endif
241 		r = close(df->fd);
242 
243 	if (r == 0)
244 	{
245 		/* Build path to the current version of the file */
246 		if (method == CLOSE_NORMAL && df->temp_suffix)
247 		{
248 			char	   *filename;
249 			char	   *filename2;
250 
251 			/*
252 			 * If we have a temp prefix, normal operation is to rename the
253 			 * file.
254 			 */
255 			filename = dir_get_file_name(df->pathname, df->temp_suffix);
256 			snprintf(tmppath, sizeof(tmppath), "%s/%s",
257 					 dir_data->basedir, filename);
258 			pg_free(filename);
259 
260 			/* permanent name, so no need for the prefix */
261 			filename2 = dir_get_file_name(df->pathname, NULL);
262 			snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
263 					 dir_data->basedir, filename2);
264 			pg_free(filename2);
265 			r = durable_rename(tmppath, tmppath2, progname);
266 		}
267 		else if (method == CLOSE_UNLINK)
268 		{
269 			char	   *filename;
270 
271 			/* Unlink the file once it's closed */
272 			filename = dir_get_file_name(df->pathname, df->temp_suffix);
273 			snprintf(tmppath, sizeof(tmppath), "%s/%s",
274 					 dir_data->basedir, filename);
275 			pg_free(filename);
276 			r = unlink(tmppath);
277 		}
278 		else
279 		{
280 			/*
281 			 * Else either CLOSE_NORMAL and no temp suffix, or
282 			 * CLOSE_NO_RENAME. In this case, fsync the file and containing
283 			 * directory if sync mode is requested.
284 			 */
285 			if (dir_data->sync)
286 			{
287 				r = fsync_fname(df->fullpath, false, progname);
288 				if (r == 0)
289 					r = fsync_parent_path(df->fullpath, progname);
290 			}
291 		}
292 	}
293 
294 	pg_free(df->pathname);
295 	pg_free(df->fullpath);
296 	if (df->temp_suffix)
297 		pg_free(df->temp_suffix);
298 	pg_free(df);
299 
300 	return r;
301 }
302 
303 static int
dir_sync(Walfile f)304 dir_sync(Walfile f)
305 {
306 	Assert(f != NULL);
307 
308 	if (!dir_data->sync)
309 		return 0;
310 
311 #ifdef HAVE_LIBZ
312 	if (dir_data->compression > 0)
313 	{
314 		if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
315 			return -1;
316 	}
317 #endif
318 
319 	return fsync(((DirectoryMethodFile *) f)->fd);
320 }
321 
322 static ssize_t
dir_get_file_size(const char * pathname)323 dir_get_file_size(const char *pathname)
324 {
325 	struct stat statbuf;
326 	static char tmppath[MAXPGPATH];
327 
328 	snprintf(tmppath, sizeof(tmppath), "%s/%s",
329 			 dir_data->basedir, pathname);
330 
331 	if (stat(tmppath, &statbuf) != 0)
332 		return -1;
333 
334 	return statbuf.st_size;
335 }
336 
337 static int
dir_compression(void)338 dir_compression(void)
339 {
340 	return dir_data->compression;
341 }
342 
343 static bool
dir_existsfile(const char * pathname)344 dir_existsfile(const char *pathname)
345 {
346 	static char tmppath[MAXPGPATH];
347 	int			fd;
348 
349 	snprintf(tmppath, sizeof(tmppath), "%s/%s",
350 			 dir_data->basedir, pathname);
351 
352 	fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
353 	if (fd < 0)
354 		return false;
355 	close(fd);
356 	return true;
357 }
358 
359 static bool
dir_finish(void)360 dir_finish(void)
361 {
362 	if (dir_data->sync)
363 	{
364 		/*
365 		 * Files are fsynced when they are closed, but we need to fsync the
366 		 * directory entry here as well.
367 		 */
368 		if (fsync_fname(dir_data->basedir, true, progname) != 0)
369 			return false;
370 	}
371 	return true;
372 }
373 
374 
375 WalWriteMethod *
CreateWalDirectoryMethod(const char * basedir,int compression,bool sync)376 CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
377 {
378 	WalWriteMethod *method;
379 
380 	method = pg_malloc0(sizeof(WalWriteMethod));
381 	method->open_for_write = dir_open_for_write;
382 	method->write = dir_write;
383 	method->get_current_pos = dir_get_current_pos;
384 	method->get_file_size = dir_get_file_size;
385 	method->get_file_name = dir_get_file_name;
386 	method->compression = dir_compression;
387 	method->close = dir_close;
388 	method->sync = dir_sync;
389 	method->existsfile = dir_existsfile;
390 	method->finish = dir_finish;
391 	method->getlasterror = dir_getlasterror;
392 
393 	dir_data = pg_malloc0(sizeof(DirectoryMethodData));
394 	dir_data->compression = compression;
395 	dir_data->basedir = pg_strdup(basedir);
396 	dir_data->sync = sync;
397 
398 	return method;
399 }
400 
401 void
FreeWalDirectoryMethod(void)402 FreeWalDirectoryMethod(void)
403 {
404 	pg_free(dir_data->basedir);
405 	pg_free(dir_data);
406 }
407 
408 
409 /*-------------------------------------------------------------------------
410  * WalTarMethod - write wal to a tar file containing pg_wal contents
411  *-------------------------------------------------------------------------
412  */
413 
/*
 * Per-file state for the tar method: the member currently being written
 * into the tar file (at most one at a time).
 */
typedef struct TarMethodFile
{
	off_t		ofs_start;		/* Where does the *header* for this file start
								 * (offset in the compressed output when
								 * compressing) */
	off_t		currpos;		/* member data bytes written so far, before
								 * compression */
	char		header[512];	/* tar header; size, name and checksum are
								 * rewritten on close */
	char	   *pathname;		/* member name without temp suffix */
	size_t		pad_to_size;	/* size requested at open time, or 0 */
} TarMethodFile;
422 
/*
 * Global static data for the tar method, allocated by CreateWalTarMethod().
 */
typedef struct TarMethodData
{
	char	   *tarfilename;	/* tarbase plus ".tar" or ".tar.gz" */
	int			fd;				/* tar file descriptor; -1 while not open
								 * (it is opened lazily on first write) */
	int			compression;	/* zlib compression level; 0 = none */
	bool		sync;			/* fsync the tar file? */
	TarMethodFile *currentfile; /* member being written, or NULL */
	char		lasterror[1024];	/* custom error message; "" if none, in
									 * which case errno describes the error */
#ifdef HAVE_LIBZ
	z_streamp	zp;				/* deflate stream, created on first open */
	void	   *zlibOut;		/* deflate output buffer (ZLIB_OUT_SIZE) */
#endif
} TarMethodData;
static TarMethodData *tar_data = NULL;
437 
/*
 * Manage the tar method's custom error message.
 *
 * tar_clear_error() empties lasterror, so tar_getlasterror() falls back to
 * strerror(errno).  tar_set_error() stores the translated message msg, which
 * then takes precedence over errno.  Each expansion is parenthesized so the
 * macro behaves as a single expression wherever it is used.
 */
#define tar_clear_error() (tar_data->lasterror[0] = '\0')
#define tar_set_error(msg) \
	((void) strlcpy(tar_data->lasterror, _(msg), sizeof(tar_data->lasterror)))
440 
441 static const char *
tar_getlasterror(void)442 tar_getlasterror(void)
443 {
444 	/*
445 	 * If a custom error is set, return that one. Otherwise, assume errno is
446 	 * set and return that one.
447 	 */
448 	if (tar_data->lasterror[0])
449 		return tar_data->lasterror;
450 	return strerror(errno);
451 }
452 
#ifdef HAVE_LIBZ
/*
 * Feed count bytes from buf through the deflate stream, writing every chunk
 * of compressed output to the tar file.
 *
 * With flush = true the stream is driven with Z_FINISH until it ends and is
 * then reset with deflateReset() for the next gzip member.
 *
 * Returns true on success.  On failure returns false with either a custom
 * error (tar_set_error) or errno set; a short write that left errno at 0 is
 * reported as ENOSPC.
 */
static bool
tar_write_compressed_data(void *buf, size_t count, bool flush)
{
	z_streamp	zs = tar_data->zp;
	int			mode = flush ? Z_FINISH : Z_NO_FLUSH;

	zs->next_in = buf;
	zs->avail_in = count;

	for (;;)
	{
		int			ret;

		/* Without a flush request we are done once all input is consumed */
		if (zs->avail_in == 0 && !flush)
			break;

		ret = deflate(zs, mode);
		if (ret == Z_STREAM_ERROR)
		{
			tar_set_error("could not compress data");
			return false;
		}

		/* Drain whatever deflate produced and hand the buffer back */
		if (zs->avail_out < ZLIB_OUT_SIZE)
		{
			size_t		pending = ZLIB_OUT_SIZE - zs->avail_out;

			errno = 0;
			if (write(tar_data->fd, tar_data->zlibOut, pending) != pending)
			{
				/* A write that didn't set errno most likely hit a full disk */
				if (errno == 0)
					errno = ENOSPC;
				return false;
			}

			zs->next_out = tar_data->zlibOut;
			zs->avail_out = ZLIB_OUT_SIZE;
		}

		if (ret == Z_STREAM_END)
			break;
	}

	if (flush && deflateReset(zs) != Z_OK)
	{
		tar_set_error("could not reset compression stream");
		return false;
	}

	return true;
}
#endif
507 
508 static ssize_t
tar_write(Walfile f,const void * buf,size_t count)509 tar_write(Walfile f, const void *buf, size_t count)
510 {
511 	ssize_t		r;
512 
513 	Assert(f != NULL);
514 	tar_clear_error();
515 
516 	/* Tarfile will always be positioned at the end */
517 	if (!tar_data->compression)
518 	{
519 		r = write(tar_data->fd, buf, count);
520 		if (r > 0)
521 			((TarMethodFile *) f)->currpos += r;
522 		return r;
523 	}
524 #ifdef HAVE_LIBZ
525 	else
526 	{
527 		if (!tar_write_compressed_data((void *) buf, count, false))
528 			return -1;
529 		((TarMethodFile *) f)->currpos += count;
530 		return count;
531 	}
532 #else
533 	else
534 		/* Can't happen - compression enabled with no libz */
535 		return -1;
536 #endif
537 }
538 
539 static bool
tar_write_padding_data(TarMethodFile * f,size_t bytes)540 tar_write_padding_data(TarMethodFile *f, size_t bytes)
541 {
542 	PGAlignedXLogBlock zerobuf;
543 	size_t		bytesleft = bytes;
544 
545 	memset(zerobuf.data, 0, XLOG_BLCKSZ);
546 	while (bytesleft)
547 	{
548 		size_t		bytestowrite = Min(bytesleft, XLOG_BLCKSZ);
549 		ssize_t		r = tar_write(f, zerobuf.data, bytestowrite);
550 
551 		if (r < 0)
552 			return false;
553 		bytesleft -= r;
554 	}
555 
556 	return true;
557 }
558 
559 static char *
tar_get_file_name(const char * pathname,const char * temp_suffix)560 tar_get_file_name(const char *pathname, const char *temp_suffix)
561 {
562 	char	   *filename = pg_malloc0(MAXPGPATH * sizeof(char));
563 
564 	snprintf(filename, MAXPGPATH, "%s%s",
565 			 pathname, temp_suffix ? temp_suffix : "");
566 
567 	return filename;
568 }
569 
570 static Walfile
tar_open_for_write(const char * pathname,const char * temp_suffix,size_t pad_to_size)571 tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
572 {
573 	int			save_errno;
574 	char	   *tmppath;
575 
576 	tar_clear_error();
577 
578 	if (tar_data->fd < 0)
579 	{
580 		/*
581 		 * We open the tar file only when we first try to write to it.
582 		 */
583 		tar_data->fd = open(tar_data->tarfilename,
584 							O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
585 		if (tar_data->fd < 0)
586 			return NULL;
587 
588 #ifdef HAVE_LIBZ
589 		if (tar_data->compression)
590 		{
591 			tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
592 			tar_data->zp->zalloc = Z_NULL;
593 			tar_data->zp->zfree = Z_NULL;
594 			tar_data->zp->opaque = Z_NULL;
595 			tar_data->zp->next_out = tar_data->zlibOut;
596 			tar_data->zp->avail_out = ZLIB_OUT_SIZE;
597 
598 			/*
599 			 * Initialize deflation library. Adding the magic value 16 to the
600 			 * default 15 for the windowBits parameter makes the output be
601 			 * gzip instead of zlib.
602 			 */
603 			if (deflateInit2(tar_data->zp, tar_data->compression, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
604 			{
605 				pg_free(tar_data->zp);
606 				tar_data->zp = NULL;
607 				tar_set_error("could not initialize compression library");
608 				return NULL;
609 			}
610 		}
611 #endif
612 
613 		/* There's no tar header itself, the file starts with regular files */
614 	}
615 
616 	Assert(tar_data->currentfile == NULL);
617 	if (tar_data->currentfile != NULL)
618 	{
619 		tar_set_error("implementation error: tar files can't have more than one open file");
620 		return NULL;
621 	}
622 
623 	tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
624 
625 	tmppath = tar_get_file_name(pathname, temp_suffix);
626 
627 	/* Create a header with size set to 0 - we will fill out the size on close */
628 	if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
629 	{
630 		pg_free(tar_data->currentfile);
631 		pg_free(tmppath);
632 		tar_data->currentfile = NULL;
633 		tar_set_error("could not create tar header");
634 		return NULL;
635 	}
636 
637 	pg_free(tmppath);
638 
639 #ifdef HAVE_LIBZ
640 	if (tar_data->compression)
641 	{
642 		/* Flush existing data */
643 		if (!tar_write_compressed_data(NULL, 0, true))
644 			return NULL;
645 
646 		/* Turn off compression for header */
647 		if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
648 		{
649 			tar_set_error("could not change compression parameters");
650 			return NULL;
651 		}
652 	}
653 #endif
654 
655 	tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
656 	if (tar_data->currentfile->ofs_start == -1)
657 	{
658 		save_errno = errno;
659 		pg_free(tar_data->currentfile);
660 		tar_data->currentfile = NULL;
661 		errno = save_errno;
662 		return NULL;
663 	}
664 	tar_data->currentfile->currpos = 0;
665 
666 	if (!tar_data->compression)
667 	{
668 		errno = 0;
669 		if (write(tar_data->fd, tar_data->currentfile->header, 512) != 512)
670 		{
671 			save_errno = errno;
672 			pg_free(tar_data->currentfile);
673 			tar_data->currentfile = NULL;
674 			/* if write didn't set errno, assume problem is no disk space */
675 			errno = save_errno ? save_errno : ENOSPC;
676 			return NULL;
677 		}
678 	}
679 #ifdef HAVE_LIBZ
680 	else
681 	{
682 		/* Write header through the zlib APIs but with no compression */
683 		if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
684 			return NULL;
685 
686 		/* Re-enable compression for the rest of the file */
687 		if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
688 		{
689 			tar_set_error("could not change compression parameters");
690 			return NULL;
691 		}
692 	}
693 #endif
694 
695 	tar_data->currentfile->pathname = pg_strdup(pathname);
696 
697 	/*
698 	 * Uncompressed files are padded on creation, but for compression we can't
699 	 * do that
700 	 */
701 	if (pad_to_size)
702 	{
703 		tar_data->currentfile->pad_to_size = pad_to_size;
704 		if (!tar_data->compression)
705 		{
706 			/* Uncompressed, so pad now */
707 			tar_write_padding_data(tar_data->currentfile, pad_to_size);
708 			/* Seek back to start */
709 			if (lseek(tar_data->fd, tar_data->currentfile->ofs_start + 512, SEEK_SET) != tar_data->currentfile->ofs_start + 512)
710 				return NULL;
711 
712 			tar_data->currentfile->currpos = 0;
713 		}
714 	}
715 
716 	return tar_data->currentfile;
717 }
718 
/*
 * File sizes are not available for the tar method.  No caller needs them,
 * so this always fails with errno = ENOSYS.
 */
static ssize_t
tar_get_file_size(const char *pathname)
{
	(void) pathname;

	tar_clear_error();
	errno = ENOSYS;

	return -1;
}
728 
729 static int
tar_compression(void)730 tar_compression(void)
731 {
732 	return tar_data->compression;
733 }
734 
735 static off_t
tar_get_current_pos(Walfile f)736 tar_get_current_pos(Walfile f)
737 {
738 	Assert(f != NULL);
739 	tar_clear_error();
740 
741 	return ((TarMethodFile *) f)->currpos;
742 }
743 
744 static int
tar_sync(Walfile f)745 tar_sync(Walfile f)
746 {
747 	Assert(f != NULL);
748 	tar_clear_error();
749 
750 	if (!tar_data->sync)
751 		return 0;
752 
753 	/*
754 	 * Always sync the whole tarfile, because that's all we can do. This makes
755 	 * no sense on compressed files, so just ignore those.
756 	 */
757 	if (tar_data->compression)
758 		return 0;
759 
760 	return fsync(tar_data->fd);
761 }
762 
763 static int
tar_close(Walfile f,WalCloseMethod method)764 tar_close(Walfile f, WalCloseMethod method)
765 {
766 	ssize_t		filesize;
767 	int			padding;
768 	TarMethodFile *tf = (TarMethodFile *) f;
769 
770 	Assert(f != NULL);
771 	tar_clear_error();
772 
773 	if (method == CLOSE_UNLINK)
774 	{
775 		if (tar_data->compression)
776 		{
777 			tar_set_error("unlink not supported with compression");
778 			return -1;
779 		}
780 
781 		/*
782 		 * Unlink the file that we just wrote to the tar. We do this by
783 		 * truncating it to the start of the header. This is safe as we only
784 		 * allow writing of the very last file.
785 		 */
786 		if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
787 			return -1;
788 
789 		pg_free(tf->pathname);
790 		pg_free(tf);
791 		tar_data->currentfile = NULL;
792 
793 		return 0;
794 	}
795 
796 	/*
797 	 * Pad the file itself with zeroes if necessary. Note that this is
798 	 * different from the tar format padding -- this is the padding we asked
799 	 * for when the file was opened.
800 	 */
801 	if (tf->pad_to_size)
802 	{
803 		if (tar_data->compression)
804 		{
805 			/*
806 			 * A compressed tarfile is padded on close since we cannot know
807 			 * the size of the compressed output until the end.
808 			 */
809 			size_t		sizeleft = tf->pad_to_size - tf->currpos;
810 
811 			if (sizeleft)
812 			{
813 				if (!tar_write_padding_data(tf, sizeleft))
814 					return -1;
815 			}
816 		}
817 		else
818 		{
819 			/*
820 			 * An uncompressed tarfile was padded on creation, so just adjust
821 			 * the current position as if we seeked to the end.
822 			 */
823 			tf->currpos = tf->pad_to_size;
824 		}
825 	}
826 
827 	/*
828 	 * Get the size of the file, and pad the current data up to the nearest
829 	 * 512 byte boundary.
830 	 */
831 	filesize = tar_get_current_pos(f);
832 	padding = ((filesize + 511) & ~511) - filesize;
833 	if (padding)
834 	{
835 		char		zerobuf[512];
836 
837 		MemSet(zerobuf, 0, padding);
838 		if (tar_write(f, zerobuf, padding) != padding)
839 			return -1;
840 	}
841 
842 
843 #ifdef HAVE_LIBZ
844 	if (tar_data->compression)
845 	{
846 		/* Flush the current buffer */
847 		if (!tar_write_compressed_data(NULL, 0, true))
848 		{
849 			errno = EINVAL;
850 			return -1;
851 		}
852 	}
853 #endif
854 
855 	/*
856 	 * Now go back and update the header with the correct filesize and
857 	 * possibly also renaming the file. We overwrite the entire current header
858 	 * when done, including the checksum.
859 	 */
860 	print_tar_number(&(tf->header[124]), 12, filesize);
861 
862 	if (method == CLOSE_NORMAL)
863 
864 		/*
865 		 * We overwrite it with what it was before if we have no tempname,
866 		 * since we're going to write the buffer anyway.
867 		 */
868 		strlcpy(&(tf->header[0]), tf->pathname, 100);
869 
870 	print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
871 	if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
872 		return -1;
873 	if (!tar_data->compression)
874 	{
875 		errno = 0;
876 		if (write(tar_data->fd, tf->header, 512) != 512)
877 		{
878 			/* if write didn't set errno, assume problem is no disk space */
879 			if (errno == 0)
880 				errno = ENOSPC;
881 			return -1;
882 		}
883 	}
884 #ifdef HAVE_LIBZ
885 	else
886 	{
887 		/* Turn off compression */
888 		if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
889 		{
890 			tar_set_error("could not change compression parameters");
891 			return -1;
892 		}
893 
894 		/* Overwrite the header, assuming the size will be the same */
895 		if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
896 			return -1;
897 
898 		/* Turn compression back on */
899 		if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
900 		{
901 			tar_set_error("could not change compression parameters");
902 			return -1;
903 		}
904 	}
905 #endif
906 
907 	/* Move file pointer back down to end, so we can write the next file */
908 	if (lseek(tar_data->fd, 0, SEEK_END) < 0)
909 		return -1;
910 
911 	/* Always fsync on close, so the padding gets fsynced */
912 	if (tar_sync(f) < 0)
913 		return -1;
914 
915 	/* Clean up and done */
916 	pg_free(tf->pathname);
917 	pg_free(tf);
918 	tar_data->currentfile = NULL;
919 
920 	return 0;
921 }
922 
/*
 * Report whether pathname exists.  Always false: the tar method only writes
 * tar files it creates itself, so nothing created externally can exist.
 */
static bool
tar_existsfile(const char *pathname)
{
	(void) pathname;

	tar_clear_error();

	return false;
}
930 
931 static bool
tar_finish(void)932 tar_finish(void)
933 {
934 	char		zerobuf[1024];
935 
936 	tar_clear_error();
937 
938 	if (tar_data->currentfile)
939 	{
940 		if (tar_close(tar_data->currentfile, CLOSE_NORMAL) != 0)
941 			return false;
942 	}
943 
944 	/* A tarfile always ends with two empty blocks */
945 	MemSet(zerobuf, 0, sizeof(zerobuf));
946 	if (!tar_data->compression)
947 	{
948 		errno = 0;
949 		if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
950 		{
951 			/* if write didn't set errno, assume problem is no disk space */
952 			if (errno == 0)
953 				errno = ENOSPC;
954 			return false;
955 		}
956 	}
957 #ifdef HAVE_LIBZ
958 	else
959 	{
960 		if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
961 			return false;
962 
963 		/* Also flush all data to make sure the gzip stream is finished */
964 		tar_data->zp->next_in = NULL;
965 		tar_data->zp->avail_in = 0;
966 		while (true)
967 		{
968 			int			r;
969 
970 			r = deflate(tar_data->zp, Z_FINISH);
971 
972 			if (r == Z_STREAM_ERROR)
973 			{
974 				tar_set_error("could not compress data");
975 				return false;
976 			}
977 			if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
978 			{
979 				size_t		len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
980 
981 				errno = 0;
982 				if (write(tar_data->fd, tar_data->zlibOut, len) != len)
983 				{
984 					/*
985 					 * If write didn't set errno, assume problem is no disk
986 					 * space.
987 					 */
988 					if (errno == 0)
989 						errno = ENOSPC;
990 					return false;
991 				}
992 			}
993 			if (r == Z_STREAM_END)
994 				break;
995 		}
996 
997 		if (deflateEnd(tar_data->zp) != Z_OK)
998 		{
999 			tar_set_error("could not close compression stream");
1000 			return false;
1001 		}
1002 	}
1003 #endif
1004 
1005 	/* sync the empty blocks as well, since they're after the last file */
1006 	if (tar_data->sync)
1007 	{
1008 		if (fsync(tar_data->fd) != 0)
1009 			return false;
1010 	}
1011 
1012 	if (close(tar_data->fd) != 0)
1013 		return false;
1014 
1015 	tar_data->fd = -1;
1016 
1017 	if (tar_data->sync)
1018 	{
1019 		if (fsync_fname(tar_data->tarfilename, false, progname) != 0)
1020 			return false;
1021 		if (fsync_parent_path(tar_data->tarfilename, progname) != 0)
1022 			return false;
1023 	}
1024 
1025 	return true;
1026 }
1027 
1028 WalWriteMethod *
CreateWalTarMethod(const char * tarbase,int compression,bool sync)1029 CreateWalTarMethod(const char *tarbase, int compression, bool sync)
1030 {
1031 	WalWriteMethod *method;
1032 	const char *suffix = (compression != 0) ? ".tar.gz" : ".tar";
1033 
1034 	method = pg_malloc0(sizeof(WalWriteMethod));
1035 	method->open_for_write = tar_open_for_write;
1036 	method->write = tar_write;
1037 	method->get_current_pos = tar_get_current_pos;
1038 	method->get_file_size = tar_get_file_size;
1039 	method->get_file_name = tar_get_file_name;
1040 	method->compression = tar_compression;
1041 	method->close = tar_close;
1042 	method->sync = tar_sync;
1043 	method->existsfile = tar_existsfile;
1044 	method->finish = tar_finish;
1045 	method->getlasterror = tar_getlasterror;
1046 
1047 	tar_data = pg_malloc0(sizeof(TarMethodData));
1048 	tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
1049 	sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
1050 	tar_data->fd = -1;
1051 	tar_data->compression = compression;
1052 	tar_data->sync = sync;
1053 #ifdef HAVE_LIBZ
1054 	if (compression)
1055 		tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
1056 #endif
1057 
1058 	return method;
1059 }
1060 
1061 void
FreeWalTarMethod(void)1062 FreeWalTarMethod(void)
1063 {
1064 	pg_free(tar_data->tarfilename);
1065 #ifdef HAVE_LIBZ
1066 	if (tar_data->compression)
1067 		pg_free(tar_data->zlibOut);
1068 #endif
1069 	pg_free(tar_data);
1070 }
1071