1 /*-------------------------------------------------------------------------
2  *
3  * compress_io.c
4  *	 Routines for archivers to write an uncompressed or compressed data
5  *	 stream.
6  *
7  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * This file includes two APIs for dealing with compressed data. The first
11  * provides more flexibility, using callbacks to read/write data from the
12  * underlying stream. The second API is a wrapper around fopen/gzopen and
13  * friends, providing an interface similar to those, but abstracts away
14  * the possible compression. Both APIs use libz for the compression, but
15  * the second API uses gzip headers, so the resulting files can be easily
16  * manipulated with the gzip utility.
17  *
18  * Compressor API
19  * --------------
20  *
21  *	The interface for writing to an archive consists of three functions:
22  *	AllocateCompressor, WriteDataToArchive and EndCompressor. First you call
23  *	AllocateCompressor, then write all the data by calling WriteDataToArchive
24  *	as many times as needed, and finally EndCompressor. WriteDataToArchive
25  *	and EndCompressor will call the WriteFunc that was provided to
26  *	AllocateCompressor for each chunk of compressed data.
27  *
28  *	The interface for reading an archive consists of just one function:
29  *	ReadDataFromArchive. ReadDataFromArchive reads the whole compressed input
30  *	stream, by repeatedly calling the given ReadFunc. ReadFunc returns the
31  *	compressed data one chunk at a time, and ReadDataFromArchive decompresses it
32  *	and passes the decompressed data to ahwrite(), until ReadFunc returns 0
33  *	to signal EOF.
34  *
35  *	The interface is the same for compressed and uncompressed streams.
36  *
37  * Compressed stream API
38  * ----------------------
39  *
40  *	The compressed stream API is a wrapper around the C standard fopen() and
41  *	libz's gzopen() APIs. It allows you to use the same functions for
42  *	compressed and uncompressed streams. cfopen_read() first tries to open
43  *	the file with given name, and if it fails, it tries to open the same
44  *	file with the .gz suffix. cfopen_write() opens a file for writing, an
45  *	extra argument specifies if the file should be compressed, and adds the
46  *	.gz suffix to the filename if so. This allows you to easily handle both
47  *	compressed and uncompressed files.
48  *
49  * IDENTIFICATION
50  *	   src/bin/pg_dump/compress_io.c
51  *
52  *-------------------------------------------------------------------------
53  */
54 
55 #include "compress_io.h"
56 #include "pg_backup_utils.h"
57 #include "parallel.h"
58 
59 /*----------------------
60  * Compressor API
61  *----------------------
62  */
63 
64 /* typedef appears in compress_io.h */
65 struct CompressorState
66 {
67 	CompressionAlgorithm comprAlg;
68 	WriteFunc	writeF;
69 
70 #ifdef HAVE_LIBZ
71 	z_streamp	zp;
72 	char	   *zlibOut;
73 	size_t		zlibOutSize;
74 #endif
75 };
76 
77 /* translator: this is a module name */
78 static const char *modulename = gettext_noop("compress_io");
79 
80 static void ParseCompressionOption(int compression, CompressionAlgorithm *alg,
81 					   int *level);
82 
83 /* Routines that support zlib compressed data I/O */
84 #ifdef HAVE_LIBZ
85 static void InitCompressorZlib(CompressorState *cs, int level);
86 static void DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs,
87 					  bool flush);
88 static void ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF);
89 static void WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
90 					   const char *data, size_t dLen);
91 static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs);
92 #endif
93 
94 /* Routines that support uncompressed data I/O */
95 static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF);
96 static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs,
97 					   const char *data, size_t dLen);
98 
99 /*
100  * Interprets a numeric 'compression' value. The algorithm implied by the
101  * value (zlib or none at the moment), is returned in *alg, and the
102  * zlib compression level in *level.
103  */
104 static void
ParseCompressionOption(int compression,CompressionAlgorithm * alg,int * level)105 ParseCompressionOption(int compression, CompressionAlgorithm *alg, int *level)
106 {
107 	if (compression == Z_DEFAULT_COMPRESSION ||
108 		(compression > 0 && compression <= 9))
109 		*alg = COMPR_ALG_LIBZ;
110 	else if (compression == 0)
111 		*alg = COMPR_ALG_NONE;
112 	else
113 	{
114 		exit_horribly(modulename, "invalid compression code: %d\n",
115 					  compression);
116 		*alg = COMPR_ALG_NONE;	/* keep compiler quiet */
117 	}
118 
119 	/* The level is just the passed-in value. */
120 	if (level)
121 		*level = compression;
122 }
123 
124 /* Public interface routines */
125 
126 /* Allocate a new compressor */
127 CompressorState *
AllocateCompressor(int compression,WriteFunc writeF)128 AllocateCompressor(int compression, WriteFunc writeF)
129 {
130 	CompressorState *cs;
131 	CompressionAlgorithm alg;
132 	int			level;
133 
134 	ParseCompressionOption(compression, &alg, &level);
135 
136 #ifndef HAVE_LIBZ
137 	if (alg == COMPR_ALG_LIBZ)
138 		exit_horribly(modulename, "not built with zlib support\n");
139 #endif
140 
141 	cs = (CompressorState *) pg_malloc0(sizeof(CompressorState));
142 	cs->writeF = writeF;
143 	cs->comprAlg = alg;
144 
145 	/*
146 	 * Perform compression algorithm specific initialization.
147 	 */
148 #ifdef HAVE_LIBZ
149 	if (alg == COMPR_ALG_LIBZ)
150 		InitCompressorZlib(cs, level);
151 #endif
152 
153 	return cs;
154 }
155 
156 /*
157  * Read all compressed data from the input stream (via readF) and print it
158  * out with ahwrite().
159  */
160 void
ReadDataFromArchive(ArchiveHandle * AH,int compression,ReadFunc readF)161 ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF)
162 {
163 	CompressionAlgorithm alg;
164 
165 	ParseCompressionOption(compression, &alg, NULL);
166 
167 	if (alg == COMPR_ALG_NONE)
168 		ReadDataFromArchiveNone(AH, readF);
169 	if (alg == COMPR_ALG_LIBZ)
170 	{
171 #ifdef HAVE_LIBZ
172 		ReadDataFromArchiveZlib(AH, readF);
173 #else
174 		exit_horribly(modulename, "not built with zlib support\n");
175 #endif
176 	}
177 }
178 
179 /*
180  * Compress and write data to the output stream (via writeF).
181  */
182 void
WriteDataToArchive(ArchiveHandle * AH,CompressorState * cs,const void * data,size_t dLen)183 WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
184 				   const void *data, size_t dLen)
185 {
186 	/* Are we aborting? */
187 	checkAborting(AH);
188 
189 	switch (cs->comprAlg)
190 	{
191 		case COMPR_ALG_LIBZ:
192 #ifdef HAVE_LIBZ
193 			WriteDataToArchiveZlib(AH, cs, data, dLen);
194 #else
195 			exit_horribly(modulename, "not built with zlib support\n");
196 #endif
197 			break;
198 		case COMPR_ALG_NONE:
199 			WriteDataToArchiveNone(AH, cs, data, dLen);
200 			break;
201 	}
202 	return;
203 }
204 
205 /*
206  * Terminate compression library context and flush its buffers.
207  */
208 void
EndCompressor(ArchiveHandle * AH,CompressorState * cs)209 EndCompressor(ArchiveHandle *AH, CompressorState *cs)
210 {
211 #ifdef HAVE_LIBZ
212 	if (cs->comprAlg == COMPR_ALG_LIBZ)
213 		EndCompressorZlib(AH, cs);
214 #endif
215 	free(cs);
216 }
217 
218 /* Private routines, specific to each compression method. */
219 
220 #ifdef HAVE_LIBZ
221 /*
222  * Functions for zlib compressed output.
223  */
224 
225 static void
InitCompressorZlib(CompressorState * cs,int level)226 InitCompressorZlib(CompressorState *cs, int level)
227 {
228 	z_streamp	zp;
229 
230 	zp = cs->zp = (z_streamp) pg_malloc(sizeof(z_stream));
231 	zp->zalloc = Z_NULL;
232 	zp->zfree = Z_NULL;
233 	zp->opaque = Z_NULL;
234 
235 	/*
236 	 * zlibOutSize is the buffer size we tell zlib it can output to.  We
237 	 * actually allocate one extra byte because some routines want to append a
238 	 * trailing zero byte to the zlib output.
239 	 */
240 	cs->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
241 	cs->zlibOutSize = ZLIB_OUT_SIZE;
242 
243 	if (deflateInit(zp, level) != Z_OK)
244 		exit_horribly(modulename,
245 					  "could not initialize compression library: %s\n",
246 					  zp->msg);
247 
248 	/* Just be paranoid - maybe End is called after Start, with no Write */
249 	zp->next_out = (void *) cs->zlibOut;
250 	zp->avail_out = cs->zlibOutSize;
251 }
252 
253 static void
EndCompressorZlib(ArchiveHandle * AH,CompressorState * cs)254 EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs)
255 {
256 	z_streamp	zp = cs->zp;
257 
258 	zp->next_in = NULL;
259 	zp->avail_in = 0;
260 
261 	/* Flush any remaining data from zlib buffer */
262 	DeflateCompressorZlib(AH, cs, true);
263 
264 	if (deflateEnd(zp) != Z_OK)
265 		exit_horribly(modulename,
266 					  "could not close compression stream: %s\n", zp->msg);
267 
268 	free(cs->zlibOut);
269 	free(cs->zp);
270 }
271 
272 static void
DeflateCompressorZlib(ArchiveHandle * AH,CompressorState * cs,bool flush)273 DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, bool flush)
274 {
275 	z_streamp	zp = cs->zp;
276 	char	   *out = cs->zlibOut;
277 	int			res = Z_OK;
278 
279 	while (cs->zp->avail_in != 0 || flush)
280 	{
281 		res = deflate(zp, flush ? Z_FINISH : Z_NO_FLUSH);
282 		if (res == Z_STREAM_ERROR)
283 			exit_horribly(modulename,
284 						  "could not compress data: %s\n", zp->msg);
285 		if ((flush && (zp->avail_out < cs->zlibOutSize))
286 			|| (zp->avail_out == 0)
287 			|| (zp->avail_in != 0)
288 			)
289 		{
290 			/*
291 			 * Extra paranoia: avoid zero-length chunks, since a zero length
292 			 * chunk is the EOF marker in the custom format. This should never
293 			 * happen but...
294 			 */
295 			if (zp->avail_out < cs->zlibOutSize)
296 			{
297 				/*
298 				 * Any write function shoud do its own error checking but to
299 				 * make sure we do a check here as well...
300 				 */
301 				size_t		len = cs->zlibOutSize - zp->avail_out;
302 
303 				cs->writeF(AH, out, len);
304 			}
305 			zp->next_out = (void *) out;
306 			zp->avail_out = cs->zlibOutSize;
307 		}
308 
309 		if (res == Z_STREAM_END)
310 			break;
311 	}
312 }
313 
314 static void
WriteDataToArchiveZlib(ArchiveHandle * AH,CompressorState * cs,const char * data,size_t dLen)315 WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
316 					   const char *data, size_t dLen)
317 {
318 	cs->zp->next_in = (void *) data;
319 	cs->zp->avail_in = dLen;
320 	DeflateCompressorZlib(AH, cs, false);
321 
322 	return;
323 }
324 
325 static void
ReadDataFromArchiveZlib(ArchiveHandle * AH,ReadFunc readF)326 ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF)
327 {
328 	z_streamp	zp;
329 	char	   *out;
330 	int			res = Z_OK;
331 	size_t		cnt;
332 	char	   *buf;
333 	size_t		buflen;
334 
335 	zp = (z_streamp) pg_malloc(sizeof(z_stream));
336 	zp->zalloc = Z_NULL;
337 	zp->zfree = Z_NULL;
338 	zp->opaque = Z_NULL;
339 
340 	buf = pg_malloc(ZLIB_IN_SIZE);
341 	buflen = ZLIB_IN_SIZE;
342 
343 	out = pg_malloc(ZLIB_OUT_SIZE + 1);
344 
345 	if (inflateInit(zp) != Z_OK)
346 		exit_horribly(modulename,
347 					  "could not initialize compression library: %s\n",
348 					  zp->msg);
349 
350 	/* no minimal chunk size for zlib */
351 	while ((cnt = readF(AH, &buf, &buflen)))
352 	{
353 		/* Are we aborting? */
354 		checkAborting(AH);
355 
356 		zp->next_in = (void *) buf;
357 		zp->avail_in = cnt;
358 
359 		while (zp->avail_in > 0)
360 		{
361 			zp->next_out = (void *) out;
362 			zp->avail_out = ZLIB_OUT_SIZE;
363 
364 			res = inflate(zp, 0);
365 			if (res != Z_OK && res != Z_STREAM_END)
366 				exit_horribly(modulename,
367 							  "could not uncompress data: %s\n", zp->msg);
368 
369 			out[ZLIB_OUT_SIZE - zp->avail_out] = '\0';
370 			ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH);
371 		}
372 	}
373 
374 	zp->next_in = NULL;
375 	zp->avail_in = 0;
376 	while (res != Z_STREAM_END)
377 	{
378 		zp->next_out = (void *) out;
379 		zp->avail_out = ZLIB_OUT_SIZE;
380 		res = inflate(zp, 0);
381 		if (res != Z_OK && res != Z_STREAM_END)
382 			exit_horribly(modulename,
383 						  "could not uncompress data: %s\n", zp->msg);
384 
385 		out[ZLIB_OUT_SIZE - zp->avail_out] = '\0';
386 		ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH);
387 	}
388 
389 	if (inflateEnd(zp) != Z_OK)
390 		exit_horribly(modulename,
391 					  "could not close compression library: %s\n", zp->msg);
392 
393 	free(buf);
394 	free(out);
395 	free(zp);
396 }
397 #endif   /* HAVE_LIBZ */
398 
399 
400 /*
401  * Functions for uncompressed output.
402  */
403 
404 static void
ReadDataFromArchiveNone(ArchiveHandle * AH,ReadFunc readF)405 ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF)
406 {
407 	size_t		cnt;
408 	char	   *buf;
409 	size_t		buflen;
410 
411 	buf = pg_malloc(ZLIB_OUT_SIZE);
412 	buflen = ZLIB_OUT_SIZE;
413 
414 	while ((cnt = readF(AH, &buf, &buflen)))
415 	{
416 		/* Are we aborting? */
417 		checkAborting(AH);
418 
419 		ahwrite(buf, 1, cnt, AH);
420 	}
421 
422 	free(buf);
423 }
424 
425 static void
WriteDataToArchiveNone(ArchiveHandle * AH,CompressorState * cs,const char * data,size_t dLen)426 WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs,
427 					   const char *data, size_t dLen)
428 {
429 	cs->writeF(AH, data, dLen);
430 	return;
431 }
432 
433 
434 /*----------------------
435  * Compressed stream API
436  *----------------------
437  */
438 
/*
 * An open stream as handed out by the cfopen family.  It wraps either a
 * plain FILE or a zlib gzFile; cfopen() leaves the handle that is not in
 * use set to NULL.  Callers treat this struct as opaque.
 */
struct cfp
{
	FILE	   *uncompressedfp; /* set when opened with fopen() */
#ifdef HAVE_LIBZ
	gzFile		compressedfp;	/* set when opened with gzopen() */
#endif
};

#ifdef HAVE_LIBZ
/* Suffix test used by cfopen_read() to recognize ".gz" file names */
static int	hasSuffix(const char *filename, const char *suffix);
#endif
454 
/*
 * free() that leaves errno as it found it.  The open routines below use it
 * to release memory on a failure path without disturbing the error code
 * they are about to return to the caller.
 */
static void
free_keep_errno(void *p)
{
	const int	errno_on_entry = errno;

	free(p);
	errno = errno_on_entry;
}
464 
465 /*
466  * Open a file for reading. 'path' is the file to open, and 'mode' should
467  * be either "r" or "rb".
468  *
469  * If the file at 'path' does not exist, we append the ".gz" suffix (if 'path'
470  * doesn't already have it) and try again. So if you pass "foo" as 'path',
471  * this will open either "foo" or "foo.gz".
472  *
473  * On failure, return NULL with an error code in errno.
474  */
475 cfp *
cfopen_read(const char * path,const char * mode)476 cfopen_read(const char *path, const char *mode)
477 {
478 	cfp		   *fp;
479 
480 #ifdef HAVE_LIBZ
481 	if (hasSuffix(path, ".gz"))
482 		fp = cfopen(path, mode, 1);
483 	else
484 #endif
485 	{
486 		fp = cfopen(path, mode, 0);
487 #ifdef HAVE_LIBZ
488 		if (fp == NULL)
489 		{
490 			char	   *fname;
491 
492 			fname = psprintf("%s.gz", path);
493 			fp = cfopen(fname, mode, 1);
494 			free_keep_errno(fname);
495 		}
496 #endif
497 	}
498 	return fp;
499 }
500 
501 /*
502  * Open a file for writing. 'path' indicates the path name, and 'mode' must
503  * be a filemode as accepted by fopen() and gzopen() that indicates writing
504  * ("w", "wb", "a", or "ab").
505  *
506  * If 'compression' is non-zero, a gzip compressed stream is opened, and
507  * 'compression' indicates the compression level used. The ".gz" suffix
508  * is automatically added to 'path' in that case.
509  *
510  * On failure, return NULL with an error code in errno.
511  */
512 cfp *
cfopen_write(const char * path,const char * mode,int compression)513 cfopen_write(const char *path, const char *mode, int compression)
514 {
515 	cfp		   *fp;
516 
517 	if (compression == 0)
518 		fp = cfopen(path, mode, 0);
519 	else
520 	{
521 #ifdef HAVE_LIBZ
522 		char	   *fname;
523 
524 		fname = psprintf("%s.gz", path);
525 		fp = cfopen(fname, mode, compression);
526 		free_keep_errno(fname);
527 #else
528 		exit_horribly(modulename, "not built with zlib support\n");
529 		fp = NULL;				/* keep compiler quiet */
530 #endif
531 	}
532 	return fp;
533 }
534 
535 /*
536  * Opens file 'path' in 'mode'. If 'compression' is non-zero, the file
537  * is opened with libz gzopen(), otherwise with plain fopen().
538  *
539  * On failure, return NULL with an error code in errno.
540  */
541 cfp *
cfopen(const char * path,const char * mode,int compression)542 cfopen(const char *path, const char *mode, int compression)
543 {
544 	cfp		   *fp = pg_malloc(sizeof(cfp));
545 
546 	if (compression != 0)
547 	{
548 #ifdef HAVE_LIBZ
549 		if (compression != Z_DEFAULT_COMPRESSION)
550 		{
551 			/* user has specified a compression level, so tell zlib to use it */
552 			char		mode_compression[32];
553 
554 			snprintf(mode_compression, sizeof(mode_compression), "%s%d",
555 					 mode, compression);
556 			fp->compressedfp = gzopen(path, mode_compression);
557 		}
558 		else
559 		{
560 			/* don't specify a level, just use the zlib default */
561 			fp->compressedfp = gzopen(path, mode);
562 		}
563 
564 		fp->uncompressedfp = NULL;
565 		if (fp->compressedfp == NULL)
566 		{
567 			free_keep_errno(fp);
568 			fp = NULL;
569 		}
570 #else
571 		exit_horribly(modulename, "not built with zlib support\n");
572 #endif
573 	}
574 	else
575 	{
576 #ifdef HAVE_LIBZ
577 		fp->compressedfp = NULL;
578 #endif
579 		fp->uncompressedfp = fopen(path, mode);
580 		if (fp->uncompressedfp == NULL)
581 		{
582 			free_keep_errno(fp);
583 			fp = NULL;
584 		}
585 	}
586 
587 	return fp;
588 }
589 
590 
591 int
cfread(void * ptr,int size,cfp * fp)592 cfread(void *ptr, int size, cfp *fp)
593 {
594 	int			ret;
595 
596 	if (size == 0)
597 		return 0;
598 
599 #ifdef HAVE_LIBZ
600 	if (fp->compressedfp)
601 	{
602 		ret = gzread(fp->compressedfp, ptr, size);
603 		if (ret != size && !gzeof(fp->compressedfp))
604 			exit_horribly(modulename,
605 					"could not read from input file: %s\n", strerror(errno));
606 	}
607 	else
608 #endif
609 	{
610 		ret = fread(ptr, 1, size, fp->uncompressedfp);
611 		if (ret != size && !feof(fp->uncompressedfp))
612 			READ_ERROR_EXIT(fp->uncompressedfp);
613 	}
614 	return ret;
615 }
616 
617 int
cfwrite(const void * ptr,int size,cfp * fp)618 cfwrite(const void *ptr, int size, cfp *fp)
619 {
620 #ifdef HAVE_LIBZ
621 	if (fp->compressedfp)
622 		return gzwrite(fp->compressedfp, ptr, size);
623 	else
624 #endif
625 		return fwrite(ptr, 1, size, fp->uncompressedfp);
626 }
627 
628 int
cfgetc(cfp * fp)629 cfgetc(cfp *fp)
630 {
631 	int			ret;
632 
633 #ifdef HAVE_LIBZ
634 	if (fp->compressedfp)
635 	{
636 		ret = gzgetc(fp->compressedfp);
637 		if (ret == EOF)
638 		{
639 			if (!gzeof(fp->compressedfp))
640 				exit_horribly(modulename,
641 					"could not read from input file: %s\n", strerror(errno));
642 			else
643 				exit_horribly(modulename,
644 							"could not read from input file: end of file\n");
645 		}
646 	}
647 	else
648 #endif
649 	{
650 		ret = fgetc(fp->uncompressedfp);
651 		if (ret == EOF)
652 			READ_ERROR_EXIT(fp->uncompressedfp);
653 	}
654 
655 	return ret;
656 }
657 
658 char *
cfgets(cfp * fp,char * buf,int len)659 cfgets(cfp *fp, char *buf, int len)
660 {
661 #ifdef HAVE_LIBZ
662 	if (fp->compressedfp)
663 		return gzgets(fp->compressedfp, buf, len);
664 	else
665 #endif
666 		return fgets(buf, len, fp->uncompressedfp);
667 }
668 
669 int
cfclose(cfp * fp)670 cfclose(cfp *fp)
671 {
672 	int			result;
673 
674 	if (fp == NULL)
675 	{
676 		errno = EBADF;
677 		return EOF;
678 	}
679 #ifdef HAVE_LIBZ
680 	if (fp->compressedfp)
681 	{
682 		result = gzclose(fp->compressedfp);
683 		fp->compressedfp = NULL;
684 	}
685 	else
686 #endif
687 	{
688 		result = fclose(fp->uncompressedfp);
689 		fp->uncompressedfp = NULL;
690 	}
691 	free_keep_errno(fp);
692 
693 	return result;
694 }
695 
696 int
cfeof(cfp * fp)697 cfeof(cfp *fp)
698 {
699 #ifdef HAVE_LIBZ
700 	if (fp->compressedfp)
701 		return gzeof(fp->compressedfp);
702 	else
703 #endif
704 		return feof(fp->uncompressedfp);
705 }
706 
707 #ifdef HAVE_LIBZ
/*
 * Does 'filename' end with 'suffix'?
 *
 * Returns 1 if the last strlen(suffix) characters of 'filename' equal
 * 'suffix' (an empty suffix always matches), 0 otherwise.  Both arguments
 * must be NUL-terminated strings.
 */
static int
hasSuffix(const char *filename, const char *suffix)
{
	/* keep lengths as size_t; strlen() results need not fit in an int */
	size_t		filenamelen = strlen(filename);
	size_t		suffixlen = strlen(suffix);

	if (filenamelen < suffixlen)
		return 0;

	return memcmp(filename + (filenamelen - suffixlen),
				  suffix,
				  suffixlen) == 0;
}
721 
722 #endif
723