1 /*-------------------------------------------------------------------------
2 *
3 * walmethods.c - implementations of different ways to write received wal
4 *
5 * NOTE! The caller must ensure that only one method is instantiated in
6 * any given program, and that it's only instantiated once!
7 *
8 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
9 *
10 * IDENTIFICATION
11 * src/bin/pg_basebackup/walmethods.c
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres_fe.h"
16
17 #include <sys/stat.h>
18 #include <time.h>
19 #include <unistd.h>
20 #ifdef HAVE_LIBZ
21 #include <zlib.h>
22 #endif
23
24 #include "pgtar.h"
25 #include "common/file_utils.h"
26
27 #include "receivelog.h"
28 #include "streamutil.h"
29
/*
 * Size, in bytes, of the buffer that receives zlib output for .tar.gz files
 * before that output is written to the tar file.
 */
#define ZLIB_OUT_SIZE 4096
32
33 /*-------------------------------------------------------------------------
34 * WalDirectoryMethod - write wal to a directory looking like pg_wal
35 *-------------------------------------------------------------------------
36 */
37
/*
 * Global static data for this method
 *
 * Filled in by CreateWalDirectoryMethod() and released by
 * FreeWalDirectoryMethod().
 */
typedef struct DirectoryMethodData
{
    char       *basedir;        /* directory in which the files are created */
    int         compression;    /* values > 0 enable gzip output and are
                                 * passed to gzsetparams() as the level */
    bool        sync;           /* fsync files and directories when true */
} DirectoryMethodData;
static DirectoryMethodData *dir_data = NULL;
48
/*
 * Local file handle
 *
 * Allocated by dir_open_for_write() and freed by dir_close().
 */
typedef struct DirectoryMethodFile
{
    int         fd;             /* descriptor of the open file */
    off_t       currpos;        /* bytes written so far, as counted by
                                 * dir_write() */
    char       *pathname;       /* copy of the name given at open time */
    char       *fullpath;       /* path of the file as it was opened, i.e.
                                 * including basedir and any suffixes */
    char       *temp_suffix;    /* copy of the temporary suffix, or NULL */
#ifdef HAVE_LIBZ
    gzFile      gzfp;           /* zlib handle layered on fd; only set when
                                 * compression is enabled */
#endif
} DirectoryMethodFile;
63
/*
 * Describe the most recent failure of the directory method.
 *
 * The directory method reports all of its failures through errno, so the
 * message is simply the system's text for the current errno value.
 */
static const char *
dir_getlasterror(void)
{
    const char *message = strerror(errno);

    return message;
}
70
71 static char *
dir_get_file_name(const char * pathname,const char * temp_suffix)72 dir_get_file_name(const char *pathname, const char *temp_suffix)
73 {
74 char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
75
76 snprintf(filename, MAXPGPATH, "%s%s%s",
77 pathname, dir_data->compression > 0 ? ".gz" : "",
78 temp_suffix ? temp_suffix : "");
79
80 return filename;
81 }
82
83 static Walfile
dir_open_for_write(const char * pathname,const char * temp_suffix,size_t pad_to_size)84 dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
85 {
86 static char tmppath[MAXPGPATH];
87 char *filename;
88 int fd;
89 DirectoryMethodFile *f;
90 #ifdef HAVE_LIBZ
91 gzFile gzfp = NULL;
92 #endif
93
94 filename = dir_get_file_name(pathname, temp_suffix);
95 snprintf(tmppath, sizeof(tmppath), "%s/%s",
96 dir_data->basedir, filename);
97 pg_free(filename);
98
99 /*
100 * Open a file for non-compressed as well as compressed files. Tracking
101 * the file descriptor is important for dir_sync() method as gzflush()
102 * does not do any system calls to fsync() to make changes permanent on
103 * disk.
104 */
105 fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
106 if (fd < 0)
107 return NULL;
108
109 #ifdef HAVE_LIBZ
110 if (dir_data->compression > 0)
111 {
112 gzfp = gzdopen(fd, "wb");
113 if (gzfp == NULL)
114 {
115 close(fd);
116 return NULL;
117 }
118
119 if (gzsetparams(gzfp, dir_data->compression,
120 Z_DEFAULT_STRATEGY) != Z_OK)
121 {
122 gzclose(gzfp);
123 return NULL;
124 }
125 }
126 #endif
127
128 /* Do pre-padding on non-compressed files */
129 if (pad_to_size && dir_data->compression == 0)
130 {
131 PGAlignedXLogBlock zerobuf;
132 int bytes;
133
134 memset(zerobuf.data, 0, XLOG_BLCKSZ);
135 for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ)
136 {
137 errno = 0;
138 if (write(fd, zerobuf.data, XLOG_BLCKSZ) != XLOG_BLCKSZ)
139 {
140 int save_errno = errno;
141
142 close(fd);
143
144 /*
145 * If write didn't set errno, assume problem is no disk space.
146 */
147 errno = save_errno ? save_errno : ENOSPC;
148 return NULL;
149 }
150 }
151
152 if (lseek(fd, 0, SEEK_SET) != 0)
153 {
154 int save_errno = errno;
155
156 close(fd);
157 errno = save_errno;
158 return NULL;
159 }
160 }
161
162 /*
163 * fsync WAL file and containing directory, to ensure the file is
164 * persistently created and zeroed (if padded). That's particularly
165 * important when using synchronous mode, where the file is modified and
166 * fsynced in-place, without a directory fsync.
167 */
168 if (dir_data->sync)
169 {
170 if (fsync_fname(tmppath, false, progname) != 0 ||
171 fsync_parent_path(tmppath, progname) != 0)
172 {
173 #ifdef HAVE_LIBZ
174 if (dir_data->compression > 0)
175 gzclose(gzfp);
176 else
177 #endif
178 close(fd);
179 return NULL;
180 }
181 }
182
183 f = pg_malloc0(sizeof(DirectoryMethodFile));
184 #ifdef HAVE_LIBZ
185 if (dir_data->compression > 0)
186 f->gzfp = gzfp;
187 #endif
188 f->fd = fd;
189 f->currpos = 0;
190 f->pathname = pg_strdup(pathname);
191 f->fullpath = pg_strdup(tmppath);
192 if (temp_suffix)
193 f->temp_suffix = pg_strdup(temp_suffix);
194
195 return f;
196 }
197
198 static ssize_t
dir_write(Walfile f,const void * buf,size_t count)199 dir_write(Walfile f, const void *buf, size_t count)
200 {
201 ssize_t r;
202 DirectoryMethodFile *df = (DirectoryMethodFile *) f;
203
204 Assert(f != NULL);
205
206 #ifdef HAVE_LIBZ
207 if (dir_data->compression > 0)
208 r = (ssize_t) gzwrite(df->gzfp, buf, count);
209 else
210 #endif
211 r = write(df->fd, buf, count);
212 if (r > 0)
213 df->currpos += r;
214 return r;
215 }
216
217 static off_t
dir_get_current_pos(Walfile f)218 dir_get_current_pos(Walfile f)
219 {
220 Assert(f != NULL);
221
222 /* Use a cached value to prevent lots of reseeks */
223 return ((DirectoryMethodFile *) f)->currpos;
224 }
225
226 static int
dir_close(Walfile f,WalCloseMethod method)227 dir_close(Walfile f, WalCloseMethod method)
228 {
229 int r;
230 DirectoryMethodFile *df = (DirectoryMethodFile *) f;
231 static char tmppath[MAXPGPATH];
232 static char tmppath2[MAXPGPATH];
233
234 Assert(f != NULL);
235
236 #ifdef HAVE_LIBZ
237 if (dir_data->compression > 0)
238 r = gzclose(df->gzfp);
239 else
240 #endif
241 r = close(df->fd);
242
243 if (r == 0)
244 {
245 /* Build path to the current version of the file */
246 if (method == CLOSE_NORMAL && df->temp_suffix)
247 {
248 char *filename;
249 char *filename2;
250
251 /*
252 * If we have a temp prefix, normal operation is to rename the
253 * file.
254 */
255 filename = dir_get_file_name(df->pathname, df->temp_suffix);
256 snprintf(tmppath, sizeof(tmppath), "%s/%s",
257 dir_data->basedir, filename);
258 pg_free(filename);
259
260 /* permanent name, so no need for the prefix */
261 filename2 = dir_get_file_name(df->pathname, NULL);
262 snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
263 dir_data->basedir, filename2);
264 pg_free(filename2);
265 r = durable_rename(tmppath, tmppath2, progname);
266 }
267 else if (method == CLOSE_UNLINK)
268 {
269 char *filename;
270
271 /* Unlink the file once it's closed */
272 filename = dir_get_file_name(df->pathname, df->temp_suffix);
273 snprintf(tmppath, sizeof(tmppath), "%s/%s",
274 dir_data->basedir, filename);
275 pg_free(filename);
276 r = unlink(tmppath);
277 }
278 else
279 {
280 /*
281 * Else either CLOSE_NORMAL and no temp suffix, or
282 * CLOSE_NO_RENAME. In this case, fsync the file and containing
283 * directory if sync mode is requested.
284 */
285 if (dir_data->sync)
286 {
287 r = fsync_fname(df->fullpath, false, progname);
288 if (r == 0)
289 r = fsync_parent_path(df->fullpath, progname);
290 }
291 }
292 }
293
294 pg_free(df->pathname);
295 pg_free(df->fullpath);
296 if (df->temp_suffix)
297 pg_free(df->temp_suffix);
298 pg_free(df);
299
300 return r;
301 }
302
303 static int
dir_sync(Walfile f)304 dir_sync(Walfile f)
305 {
306 Assert(f != NULL);
307
308 if (!dir_data->sync)
309 return 0;
310
311 #ifdef HAVE_LIBZ
312 if (dir_data->compression > 0)
313 {
314 if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
315 return -1;
316 }
317 #endif
318
319 return fsync(((DirectoryMethodFile *) f)->fd);
320 }
321
322 static ssize_t
dir_get_file_size(const char * pathname)323 dir_get_file_size(const char *pathname)
324 {
325 struct stat statbuf;
326 static char tmppath[MAXPGPATH];
327
328 snprintf(tmppath, sizeof(tmppath), "%s/%s",
329 dir_data->basedir, pathname);
330
331 if (stat(tmppath, &statbuf) != 0)
332 return -1;
333
334 return statbuf.st_size;
335 }
336
337 static int
dir_compression(void)338 dir_compression(void)
339 {
340 return dir_data->compression;
341 }
342
343 static bool
dir_existsfile(const char * pathname)344 dir_existsfile(const char *pathname)
345 {
346 static char tmppath[MAXPGPATH];
347 int fd;
348
349 snprintf(tmppath, sizeof(tmppath), "%s/%s",
350 dir_data->basedir, pathname);
351
352 fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
353 if (fd < 0)
354 return false;
355 close(fd);
356 return true;
357 }
358
359 static bool
dir_finish(void)360 dir_finish(void)
361 {
362 if (dir_data->sync)
363 {
364 /*
365 * Files are fsynced when they are closed, but we need to fsync the
366 * directory entry here as well.
367 */
368 if (fsync_fname(dir_data->basedir, true, progname) != 0)
369 return false;
370 }
371 return true;
372 }
373
374
375 WalWriteMethod *
CreateWalDirectoryMethod(const char * basedir,int compression,bool sync)376 CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
377 {
378 WalWriteMethod *method;
379
380 method = pg_malloc0(sizeof(WalWriteMethod));
381 method->open_for_write = dir_open_for_write;
382 method->write = dir_write;
383 method->get_current_pos = dir_get_current_pos;
384 method->get_file_size = dir_get_file_size;
385 method->get_file_name = dir_get_file_name;
386 method->compression = dir_compression;
387 method->close = dir_close;
388 method->sync = dir_sync;
389 method->existsfile = dir_existsfile;
390 method->finish = dir_finish;
391 method->getlasterror = dir_getlasterror;
392
393 dir_data = pg_malloc0(sizeof(DirectoryMethodData));
394 dir_data->compression = compression;
395 dir_data->basedir = pg_strdup(basedir);
396 dir_data->sync = sync;
397
398 return method;
399 }
400
401 void
FreeWalDirectoryMethod(void)402 FreeWalDirectoryMethod(void)
403 {
404 pg_free(dir_data->basedir);
405 pg_free(dir_data);
406 }
407
408
409 /*-------------------------------------------------------------------------
410 * WalTarMethod - write wal to a tar file containing pg_wal contents
411 *-------------------------------------------------------------------------
412 */
413
/*
 * Handle for the one file currently being written inside the tar archive.
 * Allocated by tar_open_for_write() and freed by tar_close().
 */
typedef struct TarMethodFile
{
    off_t       ofs_start;      /* Where does the *header* for this file start */
    off_t       currpos;        /* bytes of file content written so far */
    char        header[512];    /* tar header block; written at open time and
                                 * rewritten by tar_close() */
    char       *pathname;       /* copy of the name given at open time */
    size_t      pad_to_size;    /* padded size requested at open time, or 0 */
} TarMethodFile;
422
/*
 * Global static data for the tar method.
 *
 * Filled in by CreateWalTarMethod() and released by FreeWalTarMethod().
 */
typedef struct TarMethodData
{
    char       *tarfilename;    /* tar base name plus ".tar" or ".tar.gz" */
    int         fd;             /* descriptor of the tar file; -1 until the
                                 * first tar_open_for_write() and again after
                                 * tar_finish() */
    int         compression;    /* nonzero enables gzip output; used as the
                                 * deflate level */
    bool        sync;           /* fsync the tar file when true */
    TarMethodFile *currentfile; /* the file currently open, or NULL */
    char        lasterror[1024];    /* custom error message; empty when errno
                                     * describes the error instead */
#ifdef HAVE_LIBZ
    z_streamp   zp;             /* deflate stream, set up on first open */
    void       *zlibOut;        /* deflate output buffer (ZLIB_OUT_SIZE) */
#endif
} TarMethodData;
static TarMethodData *tar_data = NULL;
437
/*
 * Helpers for the tar method's custom error message.
 *
 * tar_clear_error() empties the message so that tar_getlasterror() falls
 * back to errno.  tar_set_error() stores the translated form of msg.
 * The expansion of tar_clear_error() is parenthesized so that it behaves
 * as a single expression wherever it is used.
 */
#define tar_clear_error() (tar_data->lasterror[0] = '\0')
#define tar_set_error(msg) strlcpy(tar_data->lasterror, _(msg), sizeof(tar_data->lasterror))
440
441 static const char *
tar_getlasterror(void)442 tar_getlasterror(void)
443 {
444 /*
445 * If a custom error is set, return that one. Otherwise, assume errno is
446 * set and return that one.
447 */
448 if (tar_data->lasterror[0])
449 return tar_data->lasterror;
450 return strerror(errno);
451 }
452
#ifdef HAVE_LIBZ
/*
 * Feed count bytes from buf through the deflate stream and write whatever
 * output it produces to the tar file.
 *
 * With flush set, deflate() is called with Z_FINISH until it reports the
 * end of the stream, and the stream is then reset so that it can be used
 * for further writing.  Without flush, the loop ends once all input has
 * been consumed.
 *
 * Returns false on failure.  Compression errors are reported through the
 * custom error message; write failures are reported through errno (set to
 * ENOSPC if write() did not set it).
 */
static bool
tar_write_compressed_data(void *buf, size_t count, bool flush)
{
    z_streamp   zs = tar_data->zp;
    const int   mode = flush ? Z_FINISH : Z_NO_FLUSH;

    zs->next_in = buf;
    zs->avail_in = count;

    for (;;)
    {
        int         rc;

        /* Done once the input is consumed, unless we are flushing */
        if (zs->avail_in == 0 && !flush)
            break;

        rc = deflate(zs, mode);
        if (rc == Z_STREAM_ERROR)
        {
            tar_set_error("could not compress data");
            return false;
        }

        /* Write out whatever deflate() placed in the output buffer */
        if (zs->avail_out < ZLIB_OUT_SIZE)
        {
            size_t      len = ZLIB_OUT_SIZE - zs->avail_out;

            errno = 0;
            if (write(tar_data->fd, tar_data->zlibOut, len) != len)
            {
                /*
                 * If write didn't set errno, assume problem is no disk space.
                 */
                if (errno == 0)
                    errno = ENOSPC;
                return false;
            }

            /* The whole buffer is available again */
            zs->next_out = tar_data->zlibOut;
            zs->avail_out = ZLIB_OUT_SIZE;
        }

        if (rc == Z_STREAM_END)
            break;
    }

    if (!flush)
        return true;

    /* Reset the stream for writing */
    if (deflateReset(zs) != Z_OK)
    {
        tar_set_error("could not reset compression stream");
        return false;
    }

    return true;
}
#endif
507
508 static ssize_t
tar_write(Walfile f,const void * buf,size_t count)509 tar_write(Walfile f, const void *buf, size_t count)
510 {
511 ssize_t r;
512
513 Assert(f != NULL);
514 tar_clear_error();
515
516 /* Tarfile will always be positioned at the end */
517 if (!tar_data->compression)
518 {
519 r = write(tar_data->fd, buf, count);
520 if (r > 0)
521 ((TarMethodFile *) f)->currpos += r;
522 return r;
523 }
524 #ifdef HAVE_LIBZ
525 else
526 {
527 if (!tar_write_compressed_data((void *) buf, count, false))
528 return -1;
529 ((TarMethodFile *) f)->currpos += count;
530 return count;
531 }
532 #else
533 else
534 /* Can't happen - compression enabled with no libz */
535 return -1;
536 #endif
537 }
538
539 static bool
tar_write_padding_data(TarMethodFile * f,size_t bytes)540 tar_write_padding_data(TarMethodFile *f, size_t bytes)
541 {
542 PGAlignedXLogBlock zerobuf;
543 size_t bytesleft = bytes;
544
545 memset(zerobuf.data, 0, XLOG_BLCKSZ);
546 while (bytesleft)
547 {
548 size_t bytestowrite = Min(bytesleft, XLOG_BLCKSZ);
549 ssize_t r = tar_write(f, zerobuf.data, bytestowrite);
550
551 if (r < 0)
552 return false;
553 bytesleft -= r;
554 }
555
556 return true;
557 }
558
559 static char *
tar_get_file_name(const char * pathname,const char * temp_suffix)560 tar_get_file_name(const char *pathname, const char *temp_suffix)
561 {
562 char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
563
564 snprintf(filename, MAXPGPATH, "%s%s",
565 pathname, temp_suffix ? temp_suffix : "");
566
567 return filename;
568 }
569
570 static Walfile
tar_open_for_write(const char * pathname,const char * temp_suffix,size_t pad_to_size)571 tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
572 {
573 int save_errno;
574 char *tmppath;
575
576 tar_clear_error();
577
578 if (tar_data->fd < 0)
579 {
580 /*
581 * We open the tar file only when we first try to write to it.
582 */
583 tar_data->fd = open(tar_data->tarfilename,
584 O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
585 if (tar_data->fd < 0)
586 return NULL;
587
588 #ifdef HAVE_LIBZ
589 if (tar_data->compression)
590 {
591 tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
592 tar_data->zp->zalloc = Z_NULL;
593 tar_data->zp->zfree = Z_NULL;
594 tar_data->zp->opaque = Z_NULL;
595 tar_data->zp->next_out = tar_data->zlibOut;
596 tar_data->zp->avail_out = ZLIB_OUT_SIZE;
597
598 /*
599 * Initialize deflation library. Adding the magic value 16 to the
600 * default 15 for the windowBits parameter makes the output be
601 * gzip instead of zlib.
602 */
603 if (deflateInit2(tar_data->zp, tar_data->compression, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
604 {
605 pg_free(tar_data->zp);
606 tar_data->zp = NULL;
607 tar_set_error("could not initialize compression library");
608 return NULL;
609 }
610 }
611 #endif
612
613 /* There's no tar header itself, the file starts with regular files */
614 }
615
616 Assert(tar_data->currentfile == NULL);
617 if (tar_data->currentfile != NULL)
618 {
619 tar_set_error("implementation error: tar files can't have more than one open file");
620 return NULL;
621 }
622
623 tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
624
625 tmppath = tar_get_file_name(pathname, temp_suffix);
626
627 /* Create a header with size set to 0 - we will fill out the size on close */
628 if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
629 {
630 pg_free(tar_data->currentfile);
631 pg_free(tmppath);
632 tar_data->currentfile = NULL;
633 tar_set_error("could not create tar header");
634 return NULL;
635 }
636
637 pg_free(tmppath);
638
639 #ifdef HAVE_LIBZ
640 if (tar_data->compression)
641 {
642 /* Flush existing data */
643 if (!tar_write_compressed_data(NULL, 0, true))
644 return NULL;
645
646 /* Turn off compression for header */
647 if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
648 {
649 tar_set_error("could not change compression parameters");
650 return NULL;
651 }
652 }
653 #endif
654
655 tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
656 if (tar_data->currentfile->ofs_start == -1)
657 {
658 save_errno = errno;
659 pg_free(tar_data->currentfile);
660 tar_data->currentfile = NULL;
661 errno = save_errno;
662 return NULL;
663 }
664 tar_data->currentfile->currpos = 0;
665
666 if (!tar_data->compression)
667 {
668 errno = 0;
669 if (write(tar_data->fd, tar_data->currentfile->header, 512) != 512)
670 {
671 save_errno = errno;
672 pg_free(tar_data->currentfile);
673 tar_data->currentfile = NULL;
674 /* if write didn't set errno, assume problem is no disk space */
675 errno = save_errno ? save_errno : ENOSPC;
676 return NULL;
677 }
678 }
679 #ifdef HAVE_LIBZ
680 else
681 {
682 /* Write header through the zlib APIs but with no compression */
683 if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
684 return NULL;
685
686 /* Re-enable compression for the rest of the file */
687 if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
688 {
689 tar_set_error("could not change compression parameters");
690 return NULL;
691 }
692 }
693 #endif
694
695 tar_data->currentfile->pathname = pg_strdup(pathname);
696
697 /*
698 * Uncompressed files are padded on creation, but for compression we can't
699 * do that
700 */
701 if (pad_to_size)
702 {
703 tar_data->currentfile->pad_to_size = pad_to_size;
704 if (!tar_data->compression)
705 {
706 /* Uncompressed, so pad now */
707 tar_write_padding_data(tar_data->currentfile, pad_to_size);
708 /* Seek back to start */
709 if (lseek(tar_data->fd, tar_data->currentfile->ofs_start + 512, SEEK_SET) != tar_data->currentfile->ofs_start + 512)
710 return NULL;
711
712 tar_data->currentfile->currpos = 0;
713 }
714 }
715
716 return tar_data->currentfile;
717 }
718
/*
 * Looking up file sizes is not implemented for the tar method: this always
 * fails, returning -1 with errno set to ENOSYS.
 */
static ssize_t
tar_get_file_size(const char *pathname)
{
    const ssize_t unsupported = -1;

    tar_clear_error();

    /* Nothing uses this for tar output at present, so it is not supported */
    errno = ENOSYS;
    return unsupported;
}
728
729 static int
tar_compression(void)730 tar_compression(void)
731 {
732 return tar_data->compression;
733 }
734
735 static off_t
tar_get_current_pos(Walfile f)736 tar_get_current_pos(Walfile f)
737 {
738 Assert(f != NULL);
739 tar_clear_error();
740
741 return ((TarMethodFile *) f)->currpos;
742 }
743
744 static int
tar_sync(Walfile f)745 tar_sync(Walfile f)
746 {
747 Assert(f != NULL);
748 tar_clear_error();
749
750 if (!tar_data->sync)
751 return 0;
752
753 /*
754 * Always sync the whole tarfile, because that's all we can do. This makes
755 * no sense on compressed files, so just ignore those.
756 */
757 if (tar_data->compression)
758 return 0;
759
760 return fsync(tar_data->fd);
761 }
762
/*
 * Close the file currently open inside the tar archive.
 *
 * f      - handle returned by tar_open_for_write()
 * method - CLOSE_UNLINK discards the file by truncating the archive back to
 *          the start of the file's header (not supported with compression);
 *          CLOSE_NORMAL stores the file's plain pathname in the header;
 *          any other method leaves the name in the header as it is.
 *
 * For everything but CLOSE_UNLINK, the file is padded to the size requested
 * at open time and then to a multiple of 512 bytes, after which the header
 * is rewritten with the final size and checksum.
 *
 * Returns 0 on success and -1 on failure.  Note that the failure paths
 * return without freeing the handle or clearing tar_data->currentfile.
 */
static int
tar_close(Walfile f, WalCloseMethod method)
{
    ssize_t     filesize;
    int         padding;
    TarMethodFile *tf = (TarMethodFile *) f;

    Assert(f != NULL);
    tar_clear_error();

    if (method == CLOSE_UNLINK)
    {
        if (tar_data->compression)
        {
            tar_set_error("unlink not supported with compression");
            return -1;
        }

        /*
         * Unlink the file that we just wrote to the tar. We do this by
         * truncating it to the start of the header. This is safe as we only
         * allow writing of the very last file.
         */
        if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
            return -1;

        pg_free(tf->pathname);
        pg_free(tf);
        tar_data->currentfile = NULL;

        return 0;
    }

    /*
     * Pad the file itself with zeroes if necessary. Note that this is
     * different from the tar format padding -- this is the padding we asked
     * for when the file was opened.
     */
    if (tf->pad_to_size)
    {
        if (tar_data->compression)
        {
            /*
             * A compressed tarfile is padded on close since we cannot know
             * the size of the compressed output until the end.
             */
            size_t      sizeleft = tf->pad_to_size - tf->currpos;

            if (sizeleft)
            {
                if (!tar_write_padding_data(tf, sizeleft))
                    return -1;
            }
        }
        else
        {
            /*
             * An uncompressed tarfile was padded on creation, so just adjust
             * the current position as if we seeked to the end.
             */
            tf->currpos = tf->pad_to_size;
        }
    }

    /*
     * Get the size of the file, and pad the current data up to the nearest
     * 512 byte boundary.
     */
    filesize = tar_get_current_pos(f);
    /* Round filesize up to a multiple of 512; padding is the difference */
    padding = ((filesize + 511) & ~511) - filesize;
    if (padding)
    {
        char        zerobuf[512];

        MemSet(zerobuf, 0, padding);
        if (tar_write(f, zerobuf, padding) != padding)
            return -1;
    }


#ifdef HAVE_LIBZ
    if (tar_data->compression)
    {
        /* Flush the current buffer */
        if (!tar_write_compressed_data(NULL, 0, true))
        {
            errno = EINVAL;
            return -1;
        }
    }
#endif

    /*
     * Now go back and update the header with the correct filesize and
     * possibly also renaming the file. We overwrite the entire current header
     * when done, including the checksum.
     */
    /* The size is stored in the 12 bytes at offset 124 of the header */
    print_tar_number(&(tf->header[124]), 12, filesize);

    if (method == CLOSE_NORMAL)

        /*
         * We overwrite it with what it was before if we have no tempname,
         * since we're going to write the buffer anyway.
         */
        strlcpy(&(tf->header[0]), tf->pathname, 100);

    /* The checksum is stored in the 8 bytes at offset 148 of the header */
    print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
    /* Reposition at the start of this file's header before rewriting it */
    if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
        return -1;
    if (!tar_data->compression)
    {
        errno = 0;
        if (write(tar_data->fd, tf->header, 512) != 512)
        {
            /* if write didn't set errno, assume problem is no disk space */
            if (errno == 0)
                errno = ENOSPC;
            return -1;
        }
    }
#ifdef HAVE_LIBZ
    else
    {
        /* Turn off compression */
        if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
        {
            tar_set_error("could not change compression parameters");
            return -1;
        }

        /* Overwrite the header, assuming the size will be the same */
        if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
            return -1;

        /* Turn compression back on */
        if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
        {
            tar_set_error("could not change compression parameters");
            return -1;
        }
    }
#endif

    /* Move file pointer back down to end, so we can write the next file */
    if (lseek(tar_data->fd, 0, SEEK_END) < 0)
        return -1;

    /* Always fsync on close, so the padding gets fsynced */
    if (tar_sync(f) < 0)
        return -1;

    /* Clean up and done */
    pg_free(tf->pathname);
    pg_free(tf);
    tar_data->currentfile = NULL;

    return 0;
}
922
/*
 * Report whether a file already exists in the tar archive.
 *
 * The tar method only ever deals with tar files it creates itself, so no
 * externally created file can exist and the answer is always false.
 */
static bool
tar_existsfile(const char *pathname)
{
    const bool  exists = false;

    tar_clear_error();

    return exists;
}
930
931 static bool
tar_finish(void)932 tar_finish(void)
933 {
934 char zerobuf[1024];
935
936 tar_clear_error();
937
938 if (tar_data->currentfile)
939 {
940 if (tar_close(tar_data->currentfile, CLOSE_NORMAL) != 0)
941 return false;
942 }
943
944 /* A tarfile always ends with two empty blocks */
945 MemSet(zerobuf, 0, sizeof(zerobuf));
946 if (!tar_data->compression)
947 {
948 errno = 0;
949 if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
950 {
951 /* if write didn't set errno, assume problem is no disk space */
952 if (errno == 0)
953 errno = ENOSPC;
954 return false;
955 }
956 }
957 #ifdef HAVE_LIBZ
958 else
959 {
960 if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
961 return false;
962
963 /* Also flush all data to make sure the gzip stream is finished */
964 tar_data->zp->next_in = NULL;
965 tar_data->zp->avail_in = 0;
966 while (true)
967 {
968 int r;
969
970 r = deflate(tar_data->zp, Z_FINISH);
971
972 if (r == Z_STREAM_ERROR)
973 {
974 tar_set_error("could not compress data");
975 return false;
976 }
977 if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
978 {
979 size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
980
981 errno = 0;
982 if (write(tar_data->fd, tar_data->zlibOut, len) != len)
983 {
984 /*
985 * If write didn't set errno, assume problem is no disk
986 * space.
987 */
988 if (errno == 0)
989 errno = ENOSPC;
990 return false;
991 }
992 }
993 if (r == Z_STREAM_END)
994 break;
995 }
996
997 if (deflateEnd(tar_data->zp) != Z_OK)
998 {
999 tar_set_error("could not close compression stream");
1000 return false;
1001 }
1002 }
1003 #endif
1004
1005 /* sync the empty blocks as well, since they're after the last file */
1006 if (tar_data->sync)
1007 {
1008 if (fsync(tar_data->fd) != 0)
1009 return false;
1010 }
1011
1012 if (close(tar_data->fd) != 0)
1013 return false;
1014
1015 tar_data->fd = -1;
1016
1017 if (tar_data->sync)
1018 {
1019 if (fsync_fname(tar_data->tarfilename, false, progname) != 0)
1020 return false;
1021 if (fsync_parent_path(tar_data->tarfilename, progname) != 0)
1022 return false;
1023 }
1024
1025 return true;
1026 }
1027
1028 WalWriteMethod *
CreateWalTarMethod(const char * tarbase,int compression,bool sync)1029 CreateWalTarMethod(const char *tarbase, int compression, bool sync)
1030 {
1031 WalWriteMethod *method;
1032 const char *suffix = (compression != 0) ? ".tar.gz" : ".tar";
1033
1034 method = pg_malloc0(sizeof(WalWriteMethod));
1035 method->open_for_write = tar_open_for_write;
1036 method->write = tar_write;
1037 method->get_current_pos = tar_get_current_pos;
1038 method->get_file_size = tar_get_file_size;
1039 method->get_file_name = tar_get_file_name;
1040 method->compression = tar_compression;
1041 method->close = tar_close;
1042 method->sync = tar_sync;
1043 method->existsfile = tar_existsfile;
1044 method->finish = tar_finish;
1045 method->getlasterror = tar_getlasterror;
1046
1047 tar_data = pg_malloc0(sizeof(TarMethodData));
1048 tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
1049 sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
1050 tar_data->fd = -1;
1051 tar_data->compression = compression;
1052 tar_data->sync = sync;
1053 #ifdef HAVE_LIBZ
1054 if (compression)
1055 tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
1056 #endif
1057
1058 return method;
1059 }
1060
1061 void
FreeWalTarMethod(void)1062 FreeWalTarMethod(void)
1063 {
1064 pg_free(tar_data->tarfilename);
1065 #ifdef HAVE_LIBZ
1066 if (tar_data->compression)
1067 pg_free(tar_data->zlibOut);
1068 #endif
1069 pg_free(tar_data);
1070 }
1071