1 /* pigz.c -- parallel implementation of gzip
2  * Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012, 2013 Mark Adler
3  * Version 2.3  3 Mar 2013  Mark Adler
4  */
5 
6 // Modified by Christopher Chang (chrchang@alumni.caltech.edu) to export
7 // parallel_compress() as a library function (except on Windows, where the
// yarn.c threading library doesn't yet work).
9 
10 /*
11   This software is provided 'as-is', without any express or implied
12   warranty.  In no event will the author be held liable for any damages
13   arising from the use of this software.
14 
15   Permission is granted to anyone to use this software for any purpose,
16   including commercial applications, and to alter it and redistribute it
17   freely, subject to the following restrictions:
18 
19   1. The origin of this software must not be misrepresented; you must not
20      claim that you wrote the original software. If you use this software
21      in a product, an acknowledgment in the product documentation would be
22      appreciated but is not required.
23   2. Altered source versions must be plainly marked as such, and must not be
24      misrepresented as being the original software.  (THIS IS AN ALTERED
25      VERSION.)
26   3. This notice may not be removed or altered from any source distribution.
27 
28   Mark Adler
29   madler@alumni.caltech.edu
30 
31   Mark accepts donations for providing this software.  Donations are not
32   required or expected.  Any amount that you feel is appropriate would be
33   appreciated.  You can use this link:
34 
35   https://www.paypal.com/cgi-bin/webscr?cmd=_s-xclick&hosted_button_id=536055
36 
37  */
38 
39 /* Version history:
40    1.0    17 Jan 2007  First version, pipe only
41    1.1    28 Jan 2007  Avoid void * arithmetic (some compilers don't get that)
42                        Add note about requiring zlib 1.2.3
43                        Allow compression level 0 (no compression)
44                        Completely rewrite parallelism -- add a write thread
45                        Use deflateSetDictionary() to make use of history
46                        Tune argument defaults to best performance on four cores
47    1.2.1   1 Feb 2007  Add long command line options, add all gzip options
48                        Add debugging options
49    1.2.2  19 Feb 2007  Add list (--list) function
50                        Process file names on command line, write .gz output
51                        Write name and time in gzip header, set output file time
52                        Implement all command line options except --recursive
53                        Add --keep option to prevent deleting input files
54                        Add thread tracing information with -vv used
55                        Copy crc32_combine() from zlib (shared libraries issue)
56    1.3    25 Feb 2007  Implement --recursive
57                        Expand help to show all options
58                        Show help if no arguments or output piping are provided
59                        Process options in GZIP environment variable
60                        Add progress indicator to write thread if --verbose
61    1.4     4 Mar 2007  Add --independent to facilitate damaged file recovery
62                        Reallocate jobs for new --blocksize or --processes
63                        Do not delete original if writing to stdout
64                        Allow --processes 1, which does no threading
65                        Add NOTHREAD define to compile without threads
66                        Incorporate license text from zlib in source code
67    1.5    25 Mar 2007  Reinitialize jobs for new compression level
68                        Copy attributes and owner from input file to output file
69                        Add decompression and testing
70                        Add -lt (or -ltv) to show all entries and proper lengths
71                        Add decompression, testing, listing of LZW (.Z) files
72                        Only generate and show trace log if DEBUG defined
73                        Take "-" argument to mean read file from stdin
74    1.6    30 Mar 2007  Add zlib stream compression (--zlib), and decompression
75    1.7    29 Apr 2007  Decompress first entry of a zip file (if deflated)
76                        Avoid empty deflate blocks at end of deflate stream
77                        Show zlib check value (Adler-32) when listing
78                        Don't complain when decompressing empty file
79                        Warn about trailing junk for gzip and zlib streams
80                        Make listings consistent, ignore gzip extra flags
81                        Add zip stream compression (--zip)
82    1.8    13 May 2007  Document --zip option in help output
83    2.0    19 Oct 2008  Complete rewrite of thread usage and synchronization
84                        Use polling threads and a pool of memory buffers
85                        Remove direct pthread library use, hide in yarn.c
86    2.0.1  20 Oct 2008  Check version of zlib at compile time, need >= 1.2.3
87    2.1    24 Oct 2008  Decompress with read, write, inflate, and check threads
88                        Remove spurious use of ctime_r(), ctime() more portable
89                        Change application of job->calc lock to be a semaphore
90                        Detect size of off_t at run time to select %lu vs. %llu
91                        #define large file support macro even if not __linux__
92                        Remove _LARGEFILE64_SOURCE, _FILE_OFFSET_BITS is enough
93                        Detect file-too-large error and report, blame build
94                        Replace check combination routines with those from zlib
95    2.1.1  28 Oct 2008  Fix a leak for files with an integer number of blocks
96                        Update for yarn 1.1 (yarn_prefix and yarn_abort)
97    2.1.2  30 Oct 2008  Work around use of beta zlib in production systems
98    2.1.3   8 Nov 2008  Don't use zlib combination routines, put back in pigz
99    2.1.4   9 Nov 2008  Fix bug when decompressing very short files
100    2.1.5  20 Jul 2009  Added 2008, 2009 to --license statement
101                        Allow numeric parameter immediately after -p or -b
102                        Enforce parameter after -p, -b, -s, before other options
103                        Enforce numeric parameters to have only numeric digits
104                        Try to determine the number of processors for -p default
105                        Fix --suffix short option to be -S to match gzip [Bloch]
106                        Decompress if executable named "unpigz" [Amundsen]
107                        Add a little bit of testing to Makefile
108    2.1.6  17 Jan 2010  Added pigz.spec to distribution for RPM systems [Brown]
109                        Avoid some compiler warnings
110                        Process symbolic links if piping to stdout [Hoffstätte]
111                        Decompress if executable named "gunzip" [Hoffstätte]
112                        Allow ".tgz" suffix [Chernookiy]
113                        Fix adler32 comparison on .zz files
114    2.1.7  17 Dec 2011  Avoid unused parameter warning in reenter()
115                        Don't assume 2's complement ints in compress_thread()
116                        Replicate gzip -cdf cat-like behavior
117                        Replicate gzip -- option to suppress option decoding
118                        Test output from make test instead of showing it
119                        Updated pigz.spec to install unpigz, pigz.1 [Obermaier]
120                        Add PIGZ environment variable [Mueller]
121                        Replicate gzip suffix search when decoding or listing
122                        Fix bug in load() to set in_left to zero on end of file
123                        Do not check suffix when input file won't be modified
124                        Decompress to stdout if name is "*cat" [Hayasaka]
125                        Write data descriptor signature to be like Info-ZIP
126                        Update and sort options list in help
127                        Use CC variable for compiler in Makefile
128                        Exit with code 2 if a warning has been issued
129                        Fix thread synchronization problem when tracing
130                        Change macro name MAX to MAX2 to avoid library conflicts
131                        Determine number of processors on HP-UX [Lloyd]
132    2.2    31 Dec 2011  Check for expansion bound busting (e.g. modified zlib)
133                        Make the "threads" list head global variable volatile
134                        Fix construction and printing of 32-bit check values
135                        Add --rsyncable functionality
136    2.2.1   1 Jan 2012  Fix bug in --rsyncable buffer management
137    2.2.2   1 Jan 2012  Fix another bug in --rsyncable buffer management
138    2.2.3  15 Jan 2012  Remove volatile in yarn.c
139                        Reduce the number of input buffers
140                        Change initial rsyncable hash to comparison value
141                        Improve the efficiency of arriving at a byte boundary
142                        Add thread portability #defines from yarn.c
143                        Have rsyncable compression be independent of threading
144                        Fix bug where constructed dictionaries not being used
145    2.2.4  11 Mar 2012  Avoid some return value warnings
146                        Improve the portability of printing the off_t type
147                        Check for existence of compress binary before using
148                        Update zlib version checking to 1.2.6 for new functions
149                        Fix bug in zip (-K) output
150                        Fix license in pigz.spec
151                        Remove thread portability #defines in pigz.c
152    2.2.5  28 Jul 2012  Avoid race condition in free_pool()
153                        Change suffix to .tar when decompressing or listing .tgz
154                        Print name of executable in error messages
155                        Show help properly when the name is unpigz or gunzip
156                        Fix permissions security problem before output is closed
157    2.3     3 Mar 2013  Don't complain about missing suffix when not writing output file
158                        Put all global variables in one global structure for readability
159                        Do not decompress concatenated zlib streams -- only gzip streams
160                        Add option for compression level 11 to use zopfli
161                        Fix handling of junk after compressed data
162  */
163 
164 /* To-do:
165     - make source portable for Windows, VMS, etc. (see gzip source code)
166     - make build portable (currently good for Unixish)
167  */
168 
169 /*
170    pigz compresses using threads to make use of multiple processors and cores.
171    The input is broken up into 128 KB chunks with each compressed in parallel.
172    The individual check value for each chunk is also calculated in parallel.
173    The compressed data is written in order to the output, and a combined check
174    value is calculated from the individual check values.
175 
176    The compressed data format generated is in the gzip, zlib, or single-entry
177    zip format using the deflate compression method.  The compression produces
178    partial raw deflate streams which are concatenated by a single write thread
179    and wrapped with the appropriate header and trailer, where the trailer
180    contains the combined check value.
181 
182    Each partial raw deflate stream is terminated by an empty stored block
183    (using the Z_SYNC_FLUSH option of zlib), in order to end that partial bit
184    stream at a byte boundary, unless that partial stream happens to already end
185    at a byte boundary (the latter requires zlib 1.2.6 or later).  Ending on a
186    byte boundary allows the partial streams to be concatenated simply as
187    sequences of bytes.  This adds a very small four to five byte overhead
188    (average 3.75 bytes) to the output for each input chunk.
189 
190    The default input block size is 128K, but can be changed with the -b option.
191    The number of compress threads is set by default to 8, which can be changed
192    using the -p option.  Specifying -p 1 avoids the use of threads entirely.
193    pigz will try to determine the number of processors in the machine, in which
194    case if that number is two or greater, pigz will use that as the default for
195    -p instead of 8.
196 
197    The input blocks, while compressed independently, have the last 32K of the
198    previous block loaded as a preset dictionary to preserve the compression
199    effectiveness of deflating in a single thread.  This can be turned off using
200    the --independent or -i option, so that the blocks can be decompressed
201    independently for partial error recovery or for random access.
202 
203    Decompression can't be parallelized, at least not without specially prepared
204    deflate streams for that purpose.  As a result, pigz uses a single thread
205    (the main thread) for decompression, but will create three other threads for
206    reading, writing, and check calculation, which can speed up decompression
207    under some circumstances.  Parallel decompression can be turned off by
208    specifying one process (-dp 1 or -tp 1).
209 
210    pigz requires zlib 1.2.1 or later to allow setting the dictionary when doing
211    raw deflate.  Since zlib 1.2.3 corrects security vulnerabilities in zlib
212    version 1.2.1 and 1.2.2, conditionals check for zlib 1.2.3 or later during
213    the compilation of pigz.c.  zlib 1.2.4 includes some improvements to
214    Z_FULL_FLUSH and deflateSetDictionary() that permit identical output for
215    pigz with and without threads, which is not possible with zlib 1.2.3.  This
216    may be important for uses of pigz -R where small changes in the contents
217    should result in small changes in the archive for rsync.  Note that due to
218    the details of how the lower levels of compression result in greater speed,
219    compression level 3 and below does not permit identical pigz output with
220    and without threads.
221 
222    pigz uses the POSIX pthread library for thread control and communication,
223    through the yarn.h interface to yarn.c.  yarn.c can be replaced with
224    equivalent implementations using other thread libraries.  pigz can be
225    compiled with NOTHREAD #defined to not use threads at all (in which case
226    pigz will not be able to live up to the "parallel" in its name).
227  */
228 
229 /*
230    Details of parallel compression implementation:
231 
232    When doing parallel compression, pigz uses the main thread to read the input
233    in 'size' sized chunks (see -b), and puts those in a compression job list,
234    each with a sequence number to keep track of the ordering.  If it is not the
235    first chunk, then that job also points to the previous input buffer, from
236    which the last 32K will be used as a dictionary (unless -i is specified).
237    This sets a lower limit of 32K on 'size'.
238 
239    pigz launches up to 'procs' compression threads (see -p).  Each compression
240    thread continues to look for jobs in the compression list and perform those
241    jobs until instructed to return.  When a job is pulled, the dictionary, if
242    provided, will be loaded into the deflate engine and then that input buffer
243    is dropped for reuse.  Then the input data is compressed into an output
244    buffer that grows in size if necessary to hold the compressed data. The job
245    is then put into the write job list, sorted by the sequence number. The
246    compress thread however continues to calculate the check value on the input
247    data, either a CRC-32 or Adler-32, possibly in parallel with the write
248    thread writing the output data.  Once that's done, the compress thread drops
249    the input buffer and also releases the lock on the check value so that the
250    write thread can combine it with the previous check values.  The compress
251    thread has then completed that job, and goes to look for another.
252 
253    All of the compress threads are left running and waiting even after the last
254    chunk is processed, so that they can support the next input to be compressed
255    (more than one input file on the command line).  Once pigz is done, it will
256    call all the compress threads home (that'll do pig, that'll do).
257 
258    Before starting to read the input, the main thread launches the write thread
   so that it is ready to pick up jobs immediately.  The compress thread puts
   the write jobs in the list in sequence sorted order, so that the first job
   in the list always has the lowest sequence number.  The write thread waits
262    for the next write job in sequence, and then gets that job.  The job still
263    holds its input buffer, from which the write thread gets the input buffer
264    length for use in check value combination.  Then the write thread drops that
265    input buffer to allow its reuse.  Holding on to the input buffer until the
266    write thread starts also has the benefit that the read and compress threads
267    can't get way ahead of the write thread and build up a large backlog of
268    unwritten compressed data.  The write thread will write the compressed data,
269    drop the output buffer, and then wait for the check value to be unlocked
270    by the compress thread.  Then the write thread combines the check value for
271    this chunk with the total check value for eventual use in the trailer.  If
272    this is not the last chunk, the write thread then goes back to look for the
273    next output chunk in sequence.  After the last chunk, the write thread
274    returns and joins the main thread.  Unlike the compress threads, a new write
275    thread is launched for each input stream.  The write thread writes the
276    appropriate header and trailer around the compressed data.
277 
278    The input and output buffers are reused through their collection in pools.
279    Each buffer has a use count, which when decremented to zero returns the
280    buffer to the respective pool.  Each input buffer has up to three parallel
281    uses: as the input for compression, as the data for the check value
282    calculation, and as a dictionary for compression.  Each output buffer has
283    only one use, which is as the output of compression followed serially as
284    data to be written.  The input pool is limited in the number of buffers, so
285    that reading does not get way ahead of compression and eat up memory with
286    more input than can be used.  The limit is approximately two times the
287    number of compression threads.  In the case that reading is fast as compared
288    to compression, that number allows a second set of buffers to be read while
289    the first set of compressions are being performed.  The number of output
290    buffers is not directly limited, but is indirectly limited by the release of
291    input buffers to about the same number.
292  */
293 
294 #ifdef _WIN32
295 // stopgap non-parallel code for Windows
296 
297 #include <stdio.h>
298 #include <stdlib.h>
299 #include <windows.h>
300 #ifdef DYNAMIC_ZLIB
301   #include <zlib.h>
302 #else
303   #include "../zlib-1.2.11/zlib.h"
304 #endif
305 
306 #include "pigz.h"
307 
308 #ifdef _WIN64
309   #define putc_unlocked _fputc_nolock
310 #else
311   #define putc_unlocked putc
312 #endif
313 
/* Windows stub for pigz_init(): this build uses the stopgap non-parallel
   code, so the requested processor count is accepted and ignored. */
void pigz_init(uint32_t setprocs) {
  (void)setprocs;
}
317 
parallel_compress(char * out_fname,unsigned char * overflow_buf,uint32_t do_append,uint32_t (* emitn)(uint32_t,unsigned char *))318 void parallel_compress(char* out_fname, unsigned char* overflow_buf, uint32_t do_append, uint32_t(* emitn)(uint32_t, unsigned char*)) {
319   // minor issue: this currently writes \n instead of \r\n linebreaks.
320   uint32_t overflow_ct = 0;
321   gzFile gz_outfile = gzopen(out_fname, do_append? "ab": "wb");
322   unsigned char* write_ptr;
323   uint32_t last_size;
324   if (!gz_outfile) {
325     putc_unlocked('\n', stdout);
326     fflush(stdout);
327     fprintf(stderr, "Error: Failed to open %s.\n", out_fname);
328     exit(2);
329   }
330   do {
331     last_size = emitn(overflow_ct, overflow_buf);
332     if (last_size > PIGZ_BLOCK_SIZE) {
333       overflow_ct = last_size - PIGZ_BLOCK_SIZE;
334       last_size = PIGZ_BLOCK_SIZE;
335     } else {
336       overflow_ct = 0;
337     }
338     if (last_size) {
339       if (!gzwrite(gz_outfile, overflow_buf, last_size)) {
340 	putc_unlocked('\n', stdout);
341 	fflush(stdout);
342 	fputs("Error: File write failure.\n", stderr);
343 	gzclose(gz_outfile);
344 	exit(6);
345       }
346     }
347     if (overflow_ct) {
348       write_ptr = &(overflow_buf[PIGZ_BLOCK_SIZE]);
349       while (overflow_ct > PIGZ_BLOCK_SIZE) {
350 	if (!gzwrite(gz_outfile, write_ptr, PIGZ_BLOCK_SIZE)) {
351 	  putc_unlocked('\n', stdout);
352 	  fflush(stdout);
353 	  fputs("Error: File write failure.\n", stderr);
354 	  gzclose(gz_outfile);
355 	  exit(6);
356 	}
357 	write_ptr = &(write_ptr[PIGZ_BLOCK_SIZE]);
358 	overflow_ct -= PIGZ_BLOCK_SIZE;
359       }
360       memcpy(overflow_buf, write_ptr, overflow_ct);
361     }
362   } while (last_size);
363   if (gzclose(gz_outfile) != Z_OK) {
364     putc_unlocked('\n', stdout);
365     fflush(stdout);
366     fputs("Error: File write failure.\n", stderr);
367     exit(6);
368   }
369 }
370 
/* Prepares ps_ptr for plain (uncompressed) output to out_fname.

   The file is opened with mode "ab" when do_append is nonzero and "wb"
   otherwise; ps_ptr->gz_outfile is cleared.  On success overflow_buf is
   recorded in ps_ptr and 0 is returned.  If the file cannot be opened, a
   newline is written to stdout, an error message to stderr, and 2
   (RET_OPEN_FAIL) is returned without touching ps_ptr->overflow_buf. */
int32_t pzwrite_init(char* out_fname, unsigned char* overflow_buf, uint32_t do_append, Pigz_state* ps_ptr) {
    const char* open_mode = do_append? "ab" : "wb";
    ps_ptr->gz_outfile = NULL;
    ps_ptr->outfile = fopen(out_fname, open_mode);
    if (ps_ptr->outfile) {
        ps_ptr->overflow_buf = overflow_buf;
        return 0;
    }
    putc_unlocked('\n', stdout);
    fflush(stdout);
    fprintf(stderr, "Error: Failed to open %s.\n", out_fname);
    return 2; // RET_OPEN_FAIL
}
383 
/* Prepares ps_ptr for gzip-compressed output to out_fname.

   ps_ptr->outfile is cleared and the file is opened through gzopen() with
   mode "ab" when do_append is nonzero and "wb" otherwise.  On success
   overflow_buf is recorded in ps_ptr.  If the open fails, a newline is
   written to stdout, an error message to stderr, and the process exits with
   status 2; the function does not return in that case. */
void compressed_pzwrite_init(char* out_fname, unsigned char* overflow_buf, uint32_t do_append, Pigz_state* ps_ptr) {
    const char* open_mode = do_append? "ab" : "wb";
    ps_ptr->outfile = NULL;
    ps_ptr->gz_outfile = gzopen(out_fname, open_mode);
    if (ps_ptr->gz_outfile) {
        ps_ptr->overflow_buf = overflow_buf;
        return;
    }
    putc_unlocked('\n', stdout);
    fflush(stdout);
    fprintf(stderr, "Error: Failed to open %s.\n", out_fname);
    exit(2);
}
395 
/* Initializes ps_ptr for either kind of output, selected by output_gz:
   nonzero picks compressed_pzwrite_init() (which exits on failure, so 0 is
   always returned here), zero picks pzwrite_init() and forwards its return
   code. */
int32_t flex_pzwrite_init(uint32_t output_gz, char* out_fname, unsigned char* overflow_buf, uint32_t do_append, Pigz_state* ps_ptr) {
    if (output_gz) {
        compressed_pzwrite_init(out_fname, overflow_buf, do_append, ps_ptr);
        return 0;
    }
    return pzwrite_init(out_fname, overflow_buf, do_append, ps_ptr);
}
404 
/* Flushes everything buffered between ps_ptr->overflow_buf and *writep_ptr
   to the plain output file, then rewinds *writep_ptr to the buffer start.

   Nothing is written when the buffer is empty.  write_min is not used by
   this implementation.  Returns 0 on success and 6 (RET_WRITE_FAIL) if
   fwrite() reports failure, in which case *writep_ptr is left unchanged. */
int32_t force_pzwrite(Pigz_state* ps_ptr, char** writep_ptr, uint32_t write_min) {
    unsigned char* buf_start = ps_ptr->overflow_buf;
    unsigned char* buf_end = (unsigned char*)(*writep_ptr);
    if (buf_end == buf_start) {
        return 0;
    }
    // the pending bytes are written as a single item, so fwrite() yields 0 or 1
    if (!fwrite(buf_start, buf_end - buf_start, 1, ps_ptr->outfile)) {
        return 6; // RET_WRITE_FAIL
    }
    *writep_ptr = (char*)buf_start;
    return 0;
}
415 
/* Flushes everything buffered between ps_ptr->overflow_buf and *writep_ptr
   to the gzip output file, then rewinds *writep_ptr to the buffer start.

   Nothing is written when the buffer is empty.  write_min is not used by
   this implementation.  If gzwrite() reports failure, a newline is written
   to stdout, an error message to stderr, the gzip file is closed, and the
   process exits with status 6. */
void force_compressed_pzwrite(Pigz_state* ps_ptr, char** writep_ptr, uint32_t write_min) {
    unsigned char* buf_start = ps_ptr->overflow_buf;
    unsigned char* buf_end = (unsigned char*)(*writep_ptr);
    if (buf_end == buf_start) {
        return;
    }
    if (gzwrite(ps_ptr->gz_outfile, buf_start, buf_end - buf_start)) {
        *writep_ptr = (char*)buf_start;
        return;
    }
    putc_unlocked('\n', stdout);
    fflush(stdout);
    fputs("Error: File write failure.\n", stderr);
    gzclose(ps_ptr->gz_outfile);
    exit(6);
}
429 
/* Appends the sslen bytes at ss to the output buffer of ps_ptr, starting at
   *writep_ptr, for either plain or gzip output.

   The buffer is treated as 2 * PIGZ_BLOCK_SIZE bytes long.  Whenever the
   remaining input does not fit, the buffer is topped up, written out in
   full, and filling restarts from the beginning of the buffer.  The final
   partial piece is copied in, *writep_ptr is advanced past it, and the
   result of flex_pzwrite() is returned.

   Returns 6 if a plain-file fwrite() fails.  A gzwrite() failure prints a
   newline to stdout and an error message to stderr, closes the gzip file,
   and exits with status 6. */
int32_t flex_pzputs_std(Pigz_state* ps_ptr, char** writep_ptr, char* ss, uint32_t sslen) {
    unsigned char* src = (unsigned char*)ss;
    unsigned char* dst = (unsigned char*)(*writep_ptr);
    // free space left between the current write position and the buffer end
    uint32_t room = 2 * PIGZ_BLOCK_SIZE - ((uintptr_t)(dst - ps_ptr->overflow_buf));
    for (; sslen > room; room = 2 * PIGZ_BLOCK_SIZE) {
        // fill the buffer completely, then write the whole buffer out
        memcpy(dst, src, room);
        if (is_uncompressed_pzwrite(ps_ptr)) {
            if (!fwrite(ps_ptr->overflow_buf, 2 * PIGZ_BLOCK_SIZE, 1, ps_ptr->outfile)) {
                return 6;
            }
        } else if (!gzwrite(ps_ptr->gz_outfile, ps_ptr->overflow_buf, 2 * PIGZ_BLOCK_SIZE)) {
            putc_unlocked('\n', stdout);
            fflush(stdout);
            fputs("Error: File write failure.\n", stderr);
            gzclose(ps_ptr->gz_outfile);
            exit(6);
        }
        dst = ps_ptr->overflow_buf;
        src += room;
        sslen -= room;
    }
    memcpy(dst, src, sslen);
    *writep_ptr = (char*)(dst + sslen);
    return flex_pzwrite(ps_ptr, writep_ptr);
}
458 
/* Flushes any bytes still buffered before writep to the plain output file,
   closes the file, and clears ps_ptr->overflow_buf.

   The return code of force_pzwrite() is not inspected; the stream's error
   indicator is checked with ferror() instead.  Returns 0 when neither
   ferror() nor fclose() reports a problem, and 1 otherwise. */
int32_t pzwrite_close_null(Pigz_state* ps_ptr, char* writep) {
    int32_t stream_err;
    int32_t close_err;
    force_pzwrite(ps_ptr, &writep, 0);
    stream_err = ferror(ps_ptr->outfile);
    close_err = fclose(ps_ptr->outfile);
    ps_ptr->overflow_buf = NULL;
    return stream_err || close_err;
}
466 
/* Flushes any bytes still buffered before writep to the gzip output file,
   clears ps_ptr->overflow_buf, and closes the gzip file.

   If gzclose() does not return Z_OK, a newline is written to stdout, an
   error message to stderr, and the process exits with status 6. */
void compressed_pzwrite_close_null(Pigz_state* ps_ptr, char* writep) {
    force_compressed_pzwrite(ps_ptr, &writep, 0);
    ps_ptr->overflow_buf = NULL;
    if (gzclose(ps_ptr->gz_outfile) == Z_OK) {
        return;
    }
    putc_unlocked('\n', stdout);
    fflush(stdout);
    fputs("Error: File write failure.\n", stderr);
    exit(6);
}
477 
/* Flushes and closes whichever kind of output ps_ptr holds.  For plain
   output the result of pzwrite_close_null() is returned; for gzip output
   compressed_pzwrite_close_null() is called (it exits on failure) and 0 is
   returned. */
int32_t flex_pzwrite_close_null(Pigz_state* ps_ptr, char* writep) {
    if (is_uncompressed_pzwrite(ps_ptr)) {
        return pzwrite_close_null(ps_ptr, writep);
    }
    compressed_pzwrite_close_null(ps_ptr, writep);
    return 0;
}
486 #else
487 
488 #define VERSION "pigz 2.3\n"
489 
490 /* use large file functions if available */
491 #define _FILE_OFFSET_BITS 64
492 
493 /* included headers and what is expected from each */
494 #include <stdio.h>      /* fflush(), fprintf(), fputs(), getchar(), putc(), */
495                         /* puts(), printf(), vasprintf(), stderr, EOF, NULL,
496                            SEEK_END, size_t, off_t */
497 #include <stdlib.h>     /* exit(), malloc(), free(), realloc(), atol(), */
498                         /* atoi(), getenv() */
499 #include <stdarg.h>     /* va_start(), va_end(), va_list */
500 #include <string.h>     /* memset(), memchr(), memcpy(), strcmp(), strcpy() */
501                         /* strncpy(), strlen(), strcat(), strrchr() */
502 #include <errno.h>      /* errno, EEXIST */
503 #include <assert.h>     /* assert() */
504 #include <time.h>       /* ctime(), time(), time_t, mktime() */
505 #include <signal.h>     /* signal(), SIGINT */
506 #include <sys/types.h>  /* ssize_t */
507 #include <sys/stat.h>   /* chmod(), stat(), fstat(), lstat(), struct stat, */
508                         /* S_IFDIR, S_IFLNK, S_IFMT, S_IFREG */
509 #include <sys/time.h>   /* utimes(), gettimeofday(), struct timeval */
510 #include <unistd.h>     /* unlink(), _exit(), read(), write(), close(), */
511                         /* lseek(), isatty(), chown() */
512 #include <fcntl.h>      /* open(), O_CREAT, O_EXCL, O_RDONLY, O_TRUNC, */
513                         /* O_WRONLY */
514 #include <dirent.h>     /* opendir(), readdir(), closedir(), DIR, */
515                         /* struct dirent */
516 #include <limits.h>     /* PATH_MAX, UINT_MAX */
517 #if __STDC_VERSION__-0 >= 199901L || __GNUC__-0 >= 3
518 #  include <inttypes.h> /* intmax_t */
519 #endif
520 
521 #ifdef __hpux
522 #  include <sys/param.h>
523 #  include <sys/pstat.h>
524 #endif
525 
526 #ifdef DYNAMIC_ZLIB
527   #include <zlib.h>
528 #else
529   #include "../zlib-1.2.11/zlib.h" /* deflateInit2(), deflateReset(), deflate(), */
530                         /* deflateEnd(), deflateSetDictionary(), crc32(),
531                            inflateBackInit(), inflateBack(), inflateBackEnd(),
532                            Z_DEFAULT_COMPRESSION, Z_DEFAULT_STRATEGY,
533                            Z_DEFLATED, Z_NO_FLUSH, Z_NULL, Z_OK,
534                            Z_SYNC_FLUSH, z_stream */
535 #endif
536 #if !defined(ZLIB_VERNUM) || ZLIB_VERNUM < 0x1230
537 #  error Need zlib version 1.2.3 or later
538 #endif
539 
540 #ifndef NOTHREAD
541 #  include "yarn.h"     /* thread, launch(), join(), join_all(), */
542                         /* lock, new_lock(), possess(), twist(), wait_for(),
543                            release(), peek_lock(), free_lock(), yarn_name */
544 #endif
545 
546 #include "pigz.h"
547 
548 
/* for local functions and globals -- "local" expands to static, so anything
   declared with it has internal linkage */
#define local static
551 
/* prevent end-of-line conversions on MSDOSish operating systems --
   SET_BINARY_MODE(fd) calls setmode(fd, O_BINARY) when one of the listed
   platform macros is defined, and expands to nothing otherwise */
#if defined(MSDOS) || defined(OS2) || defined(WIN32) || defined(__CYGWIN__)
#  include <io.h>       /* setmode(), O_BINARY */
#  define SET_BINARY_MODE(fd) setmode(fd, O_BINARY)
#else
#  define SET_BINARY_MODE(fd)
#endif
559 
/* release an allocated pointer, if allocated, and mark as unallocated --
   ptr must be a modifiable pointer lvalue without side effects, since the
   macro evaluates it more than once; calling RELEASE() again on the same
   pointer is harmless because it is NULL after the first call */
#define RELEASE(ptr) \
    do { \
        if ((ptr) != NULL) { \
            free(ptr); \
            (ptr) = NULL; \
        } \
    } while (0)
568 
/* sliding dictionary size for deflate (32768 bytes = 32K) */
#define DICT 32768U

/* largest power of 2 that fits in an unsigned int -- used to limit requests
   to zlib functions that use unsigned int lengths; computed as UINT_MAX minus
   its lower half, which leaves only the top bit set */
#define MAXP2 (UINT_MAX - (UINT_MAX >> 1))
575 
576 /* rsyncable constants -- RSYNCBITS is the number of bits in the mask for
577    comparison.  For random input data, there will be a hit on average every
578    1<<RSYNCBITS bytes.  So for an RSYNCBITS of 12, there will be an average of
579    one hit every 4096 bytes, resulting in a mean block size of 4096.  RSYNCMASK
580    is the resulting bit mask.  RSYNCHIT is what the hash value is compared to
581    after applying the mask.
582 
583    The choice of 12 for RSYNCBITS is consistent with the original rsyncable
584    patch for gzip which also uses a 12-bit mask.  This results in a relatively
585    small hit to compression, on the order of 1.5% to 3%.  A mask of 13 bits can
586    be used instead if a hit of less than 1% to the compression is desired, at
587    the expense of more blocks transmitted for rsync updates.  (Your mileage may
588    vary.)
589 
590    This implementation of rsyncable uses a different hash algorithm than what
591    the gzip rsyncable patch uses in order to provide better performance in
592    several regards.  The algorithm is simply to shift the hash value left one
593    bit and exclusive-or that with the next byte.  This is masked to the number
594    of hash bits (RSYNCMASK) and compared to all ones except for a zero in the
595    top bit (RSYNCHIT). This rolling hash has a very small window of 19 bytes
596    (RSYNCBITS+7).  The small window provides the benefit of much more rapid
597    resynchronization after a change, than does the 4096-byte window of the gzip
598    rsyncable patch.
599 
600    The comparison value is chosen to avoid matching any repeated bytes or short
601    sequences.  The gzip rsyncable patch on the other hand uses a sum and zero
602    for comparison, which results in certain bad behaviors, such as always
603    matching everywhere in a long sequence of zeros.  Such sequences occur
604    frequently in tar files.
605 
606    This hash efficiently discards history older than 19 bytes simply by
607    shifting that data past the top of the mask -- no history needs to be
608    retained to undo its impact on the hash value, as is needed for a sum.
609 
610    The choice of the comparison value (RSYNCHIT) has the virtue of avoiding
611    extremely short blocks.  The shortest block is five bytes (RSYNCBITS-7) from
612    hit to hit, and is unlikely.  Whereas with the gzip rsyncable algorithm,
613    blocks of one byte are not only possible, but in fact are the most likely
614    block size.
615 
616    Thanks and acknowledgement to Kevin Day for his experimentation and insights
617    on rsyncable hash characteristics that led to some of the choices here.
618  */
#define RSYNCBITS 12                        /* number of bits in the hash mask */
#define RSYNCMASK ((1U << RSYNCBITS) - 1)   /* low RSYNCBITS bits all set */
#define RSYNCHIT (RSYNCMASK >> 1)           /* all ones except the top mask bit */
622 
/* initial pool counts and sizes -- INBUFS is the limit on the number of input
   spaces as a function of the number of processors (used to throttle the
   creation of compression jobs), OUTPOOL is the initial size of the output
   data buffer, chosen to make resizing of the buffer very unlikely and to
   allow prepending with a dictionary for use as an input buffer for zopfli */
#define INBUFS(p) (((p)<<1)+3)          /* 2*p + 3 input spaces */
#define OUTPOOL(s) ((s)+((s)>>4)+DICT)  /* s + s/16 + DICT bytes */

/* input buffer size (32K bytes) */
#define BUF 32768U
633 
/* globals (modified by main thread only when it's the only thread) -- the
   single instance g holds the settings and output state that the header
   writer, the error reporting routines, and the threads all read */
local struct {
    char *prog;             /* name by which pigz was invoked (printed as the
                               prefix of every complaint) */
    int outd;               /* output file descriptor, -1 when none is open */
    char *outf;             /* output file name (allocated if not NULL) */
    int verbosity;          /* 0 = quiet, 1 = normal, 2 = verbose, 3 = trace */
    time_t mtime;           /* time stamp from input file for gzip header */
    int level;              /* compression level */
    int procs;              /* maximum number of compression threads (>= 1) */
    size_t block;           /* uncompressed input size per thread (>= 32K) */
    int warned;             /* true if a warning has been given */
} g;
646 
647 /* display a complaint with the program name on stderr */
complain(const char * fmt,...)648 local int complain(const char *fmt, ...)
649 {
650     va_list ap;
651 
652     if (g.verbosity > 0) {
653         fprintf(stderr, "%s: ", g.prog);
654         va_start(ap, fmt);
655         vfprintf(stderr, fmt, ap);
656         va_end(ap);
657         putc_unlocked('\n', stderr);
658         fflush(stderr);
659         g.warned = 1;
660     }
661     return 0;
662 }
663 
664 /* exit with error, delete output file if in the middle of writing it */
bail(const char * why,const char * what)665 local int bail(const char *why, const char *what)
666 {
667     if (g.outd != -1 && g.outf != NULL)
668         unlink(g.outf);
669     complain("abort: %s%s", why, what);
670     exit(1);
671     return 0;
672 }
673 
674 /* write len bytes, repeating write() calls as needed */
writen(int desc,unsigned char * buf,size_t len)675 local void writen(int desc, unsigned char *buf, size_t len)
676 {
677     ssize_t ret;
678 
679     while (len) {
680         ret = write(desc, buf, len);
681         if (ret < 1) {
682             complain("write error code %d", errno);
683             bail("write error on ", g.outf);
684         }
685         buf += ret;
686         len -= ret;
687     }
688 }
689 
/* store an integer into an unsigned char array: PUT2L writes the low two
   bytes of b at a[0..1] and PUT4L the low four bytes at a[0..3], both least
   significant byte first; PUT4M writes four bytes at a[0..3] most significant
   byte first -- both arguments are evaluated more than once */
#define PUT2L(a,b) ((a)[0] = (b) & 0xff, (a)[1] = (b) >> 8)
#define PUT4L(a,b) (PUT2L((a), (b) & 0xffff), PUT2L((a) + 2, (b) >> 16))
#define PUT4M(a,b) \
    ((a)[0] = (b) >> 24, (a)[1] = (b) >> 16, (a)[2] = (b) >> 8, (a)[3] = (b))
694 
695 /* write a gzip, zlib, or zip header using the information in the globals */
put_header(void)696 local unsigned long put_header(void)
697 {
698     unsigned long len;
699     unsigned char head[30];
700 
701     head[0] = 31;
702     head[1] = 139;
703     head[2] = 8;                /* deflate */
704     head[3] = 0;
705     PUT4L(head + 4, g.mtime);
706     head[8] = g.level >= 9 ? 2 : (g.level == 1 ? 4 : 0);
707     head[9] = 3;                /* unix */
708     writen(g.outd, head, 10);
709     len = 10;
710     return len;
711 }
712 
713 /* write a gzip, zlib, or zip trailer */
put_trailer(unsigned long ulen,unsigned long clen,unsigned long check,unsigned long head)714 local void put_trailer(unsigned long ulen, unsigned long clen,
715                        unsigned long check, unsigned long head)
716 {
717     unsigned char tail[46];
718 
719     PUT4L(tail, check);
720     PUT4L(tail + 4, ulen);
721     writen(g.outd, tail, 8);
722 }
723 
/* compute check value depending on format -- here always the crc-32, via
   zlib's crc32(): a is the running check value, b the data, c its length */
#define CHECK(a,b,c) crc32(a,b,c)
726 
727 #ifndef NOTHREAD
728 /* -- threaded portions of pigz -- */
729 
/* -- check value combination routines for parallel calculation -- */

/* COMB(a,b,c) merges check value a with check value b, where b was computed
   over c bytes -- it forwards directly to crc32_comb() */
#define COMB(a,b,c) crc32_comb(a,b,c)
/* combine two crc-32's or two adler-32's (copied from zlib 1.2.3 so that pigz
   can be compatible with older versions of zlib) */

/* we copy the combination routines from zlib here, in order to avoid
   linkage issues with the zlib 1.2.3 builds on Sun, Ubuntu, and others */
738 
/* multiply the GF(2) matrix mat by the bit vector vec: the result is the
   exclusive-or of mat[k] for every bit k that is set in vec -- mat must have
   an entry for each bit position up to the highest set bit of vec */
local unsigned long gf2_matrix_times(unsigned long *mat, unsigned long vec)
{
    unsigned long acc = 0;
    const unsigned long *row = mat;

    for (; vec != 0; vec >>= 1, row++) {
        if (vec & 1)
            acc ^= *row;
    }
    return acc;
}
752 
gf2_matrix_square(unsigned long * square,unsigned long * mat)753 local void gf2_matrix_square(unsigned long *square, unsigned long *mat)
754 {
755     int n;
756 
757     for (n = 0; n < 32; n++)
758         square[n] = gf2_matrix_times(mat, mat[n]);
759 }
760 
crc32_comb(unsigned long crc1,unsigned long crc2,size_t len2)761 local unsigned long crc32_comb(unsigned long crc1, unsigned long crc2,
762                                size_t len2)
763 {
764     int n;
765     unsigned long row;
766     unsigned long even[32];     /* even-power-of-two zeros operator */
767     unsigned long odd[32];      /* odd-power-of-two zeros operator */
768 
769     /* degenerate case */
770     if (len2 == 0)
771         return crc1;
772 
773     /* put operator for one zero bit in odd */
774     odd[0] = 0xedb88320UL;          /* CRC-32 polynomial */
775     row = 1;
776     for (n = 1; n < 32; n++) {
777         odd[n] = row;
778         row <<= 1;
779     }
780 
781     /* put operator for two zero bits in even */
782     gf2_matrix_square(even, odd);
783 
784     /* put operator for four zero bits in odd */
785     gf2_matrix_square(odd, even);
786 
787     /* apply len2 zeros to crc1 (first square will put the operator for one
788        zero byte, eight zero bits, in even) */
789     do {
790         /* apply zeros operator for this bit of len2 */
791         gf2_matrix_square(even, odd);
792         if (len2 & 1)
793             crc1 = gf2_matrix_times(even, crc1);
794         len2 >>= 1;
795 
796         /* if no more bits set, then done */
797         if (len2 == 0)
798             break;
799 
800         /* another iteration of the loop with odd and even swapped */
801         gf2_matrix_square(odd, even);
802         if (len2 & 1)
803             crc1 = gf2_matrix_times(odd, crc1);
804         len2 >>= 1;
805 
806         /* if no more bits set, then done */
807     } while (len2 != 0);
808 
809     /* return combined crc */
810     crc1 ^= crc2;
811     return crc1;
812 }
813 
/* NOTE(review): neither constant is referenced by the crc-32 combination
   code; presumably they belong to an adler-32 combination routine -- confirm
   whether anything else in the file still uses them */
#define BASE 65521U     /* largest prime smaller than 65536 */
#define LOW16 0xffff    /* mask lower 16 bits */
816 
817 /* initialize a pool (pool structure itself provided, not allocated) -- the
818    limit is the maximum number of spaces in the pool, or -1 to indicate no
819    limit, i.e., to never wait for a buffer to return to the pool */
new_pool(struct pool * pool,size_t size,int limit)820 local void new_pool(struct pool *pool, size_t size, int limit)
821 {
822     pool->have = new_lock(0);
823     pool->head = NULL;
824     pool->size = size;
825     pool->limit = limit;
826     pool->made = 0;
827 }
828 
829 /* get a space from a pool -- the use count is initially set to one, so there
830    is no need to call use_space() for the first use */
get_space(struct pool * pool)831 local struct space *get_space(struct pool *pool)
832 {
833     struct space *space;
834 
835     /* if can't create any more, wait for a space to show up */
836     possess(pool->have);
837     if (pool->limit == 0)
838         wait_for(pool->have, NOT_TO_BE, 0);
839 
840     /* if a space is available, pull it from the list and return it */
841     if (pool->head != NULL) {
842         space = pool->head;
843         possess(space->use);
844         pool->head = space->next;
845         twist(pool->have, BY, -1);      /* one less in pool */
846         twist(space->use, TO, 1);       /* initially one user */
847         space->len = 0;
848         return space;
849     }
850 
851     /* nothing available, don't want to wait, make a new space */
852     assert(pool->limit != 0);
853     if (pool->limit > 0)
854         pool->limit--;
855     pool->made++;
856     release(pool->have);
857     space = (struct space*)malloc(sizeof(struct space));
858     if (space == NULL)
859         bail("not enough memory", "");
860     space->use = new_lock(1);           /* initially one user */
861     space->buf = (unsigned char*)malloc(pool->size);
862     if (space->buf == NULL)
863         bail("not enough memory", "");
864     space->size = pool->size;
865     space->len = 0;
866     space->pool = pool;                 /* remember the pool this belongs to */
867     return space;
868 }
869 
/* compute next size up by multiplying by about 2**(1/3) and round to the next
   power of 2 if we're close (so three applications results in doubling) -- if
   small, go up to at least 16, if overflow, go to max size_t value

   size is the current size; the return value is the suggested larger size,
   which equals size only when size is already the maximum size_t value.  The
   rounding step never shifts by the full width of size_t: when the next power
   of 2 cannot be represented, the maximum size_t value is used instead. */
local size_t grow(size_t size)
{
    size_t was, top, unit;
    int shift;

    was = size;
    size += size >> 2;                  /* times 1.25 (may wrap around) */

    /* reduce to the top three bits, counting the bits shifted out */
    top = size;
    for (shift = 0; top > 7; shift++)
        top >>= 1;

    /* top bits 111 means close to a power of 2 -- round up to it */
    if (top == 7) {
        unit = (size_t)1 << shift;      /* shift is less than the width */
        if (unit > (((size_t)0 - 1) >> 3))
            size = (size_t)0 - 1;       /* unit << 3 would not fit */
        else
            size = unit << 3;
    }
    if (size < 16)
        size = 16;
    if (size <= was)                    /* wrapped around, or no progress */
        size = (size_t)0 - 1;
    return size;
}
891 
892 /* increase the size of the buffer in space */
grow_space(struct space * space)893 local void grow_space(struct space *space)
894 {
895     size_t more;
896 
897     /* compute next size up */
898     more = grow(space->size);
899     if (more == space->size)
900         bail("not enough memory", "");
901 
902     /* reallocate the buffer */
903     space->buf = (unsigned char*)realloc(space->buf, more);
904     if (space->buf == NULL)
905         bail("not enough memory", "");
906     space->size = more;
907 }
908 
909 /* increment the use count to require one more drop before returning this space
910    to the pool */
use_space(struct space * space)911 local void use_space(struct space *space)
912 {
913     possess(space->use);
914     twist(space->use, BY, +1);
915 }
916 
917 /* drop a space, returning it to the pool if the use count is zero */
drop_space(struct space * space)918 local void drop_space(struct space *space)
919 {
920     int use;
921     struct pool *pool;
922 
923     possess(space->use);
924     use = peek_lock(space->use);
925     assert(use != 0);
926     if (use == 1) {
927         pool = space->pool;
928         possess(pool->have);
929         space->next = pool->head;
930         pool->head = space;
931         twist(pool->have, BY, +1);
932     }
933     twist(space->use, BY, -1);
934 }
935 
936 /* free the memory and lock resources of a pool -- return number of spaces for
937    debugging and resource usage measurement */
free_pool(struct pool * pool)938 local int free_pool(struct pool *pool)
939 {
940     int count;
941     struct space *space;
942 
943     possess(pool->have);
944     count = 0;
945     while ((space = pool->head) != NULL) {
946         pool->head = space->next;
947         free(space->buf);
948         free_lock(space->use);
949         free(space);
950         count++;
951     }
952     assert(count == pool->made);
953     release(pool->have);
954     free_lock(pool->have);
955     return count;
956 }
957 
/* input and output buffer pools (initialized by setup_jobs(), released by
   finish_jobs()) */
local struct pool in_pool;      /* uncompressed input blocks */
local struct pool out_pool;     /* compressed output of each job */
local struct pool dict_pool;    /* dictionaries pieced together when an input
                                   block is shorter than DICT */
local struct pool lens_pool;    /* presumably for the coded block length
                                   lists (job->lens) -- no space is drawn
                                   from it in the nearby code; confirm */
963 
964 /* -- parallel compression -- */
965 
/* compress or write job (passed from compress list to write list) -- if seq is
   equal to -1, compress_thread is instructed to return; if more is false then
   this is the last chunk, which after writing tells write_thread to return */
struct job {
    long seq;                   /* sequence number (write order; -1 means
                                   "compress threads should return") */
    int more;                   /* true if this is not the last chunk */
    struct space *in;           /* input data to compress */
    struct space *out;          /* dictionary or resulting compressed data:
                                   the dictionary (or NULL) when the job is
                                   queued, the compressed data afterwards */
    struct space *lens;         /* coded list of flush block lengths, or NULL
                                   to treat the input as a single block */
    unsigned long check;        /* check value for input data */
    lock *calc;                 /* released when check calculation complete
                                   (set to 1 once check is valid) */
    struct job *next;           /* next job in the list (either list) */
};
979 
/* list of compress jobs (with tail for appending to list) -- compress_have
   being NULL also means the job lists have not been set up */
local lock *compress_have = NULL;   /* number of compress jobs waiting */
local struct job *compress_head, **compress_tail;

/* list of write jobs (kept sorted by sequence number) */
local lock *write_first;            /* lowest sequence number in list, or -1
                                       when the list is empty */
local struct job *write_head;

/* number of compression threads running */
local int cthreads = 0;

/* write thread if running */
local thread *writeth = NULL;
993 
994 /* setup job lists (call from main thread) */
setup_jobs(void)995 local void setup_jobs(void)
996 {
997     /* set up only if not already set up*/
998     if (compress_have != NULL)
999         return;
1000 
1001     /* allocate locks and initialize lists */
1002     compress_have = new_lock(0);
1003     compress_head = NULL;
1004     compress_tail = &compress_head;
1005     write_first = new_lock(-1);
1006     write_head = NULL;
1007 
1008     /* initialize buffer pools (initial size for out_pool not critical, since
1009        buffers will be grown in size if needed -- initial size chosen to make
1010        this unlikely -- same for lens_pool) */
1011     new_pool(&in_pool, g.block, INBUFS(g.procs));
1012     new_pool(&out_pool, OUTPOOL(g.block), -1);
1013     new_pool(&dict_pool, DICT, -1);
1014     new_pool(&lens_pool, g.block >> (RSYNCBITS - 1), -1);
1015 }
1016 
1017 /* command the compress threads to all return, then join them all (call from
1018    main thread), free all the thread-related resources */
finish_jobs(void)1019 local void finish_jobs(void)
1020 {
1021     struct job job;
1022     int caught;
1023 
1024     /* only do this once */
1025     if (compress_have == NULL)
1026         return;
1027 
1028     /* command all of the extant compress threads to return */
1029     possess(compress_have);
1030     job.seq = -1;
1031     job.next = NULL;
1032     compress_head = &job;
1033     compress_tail = &(job.next);
1034     twist(compress_have, BY, +1);       /* will wake them all up */
1035 
1036     /* join all of the compress threads, verify they all came back */
1037     caught = join_all();
1038     assert(caught == cthreads);
1039     cthreads = 0;
1040 
1041     /* free the resources */
1042     caught = free_pool(&lens_pool);
1043     caught = free_pool(&dict_pool);
1044     caught = free_pool(&out_pool);
1045     caught = free_pool(&in_pool);
1046     free_lock(write_first);
1047     free_lock(compress_have);
1048     compress_have = NULL;
1049     close(g.outd);
1050     g.outd = -1;
1051 }
1052 
1053 /* compress all strm->avail_in bytes at strm->next_in to out->buf, updating
1054    out->len, grow the size of the buffer (out->size) if necessary -- respect
1055    the size limitations of the zlib stream data types (size_t may be larger
1056    than unsigned) */
deflate_engine(z_stream * strm,struct space * out,int flush)1057 local void deflate_engine(z_stream *strm, struct space *out, int flush)
1058 {
1059     size_t room;
1060 
1061     do {
1062         room = out->size - out->len;
1063         if (room == 0) {
1064             grow_space(out);
1065             room = out->size - out->len;
1066         }
1067         strm->next_out = out->buf + out->len;
1068         strm->avail_out = room < UINT_MAX ? (unsigned)room : UINT_MAX;
1069         (void)deflate(strm, flush);
1070         out->len = strm->next_out - out->buf;
1071     } while (strm->avail_out == 0);
1072     assert(strm->avail_in == 0);
1073 }
1074 
/* get the next compression job from the head of the list, compress and compute
   the check value on the input, and put a job in the write list with the
   results -- keep looking for more jobs, returning when a job is found with a
   sequence number of -1 (leave that job in the list for other incarnations to
   find)

   dummy is not used.  One deflate stream is set up per thread and reset for
   every job.  Running out of memory is fatal (bail()). */
local void compress_thread(void *dummy)
{
    struct job *job;                /* job pulled and working on */
    struct job *here, **prior;      /* pointers for inserting in write list */
    unsigned long check;            /* check value of input */
    unsigned char *next;            /* pointer for blocks, check value data */
    size_t left;                    /* input left to process */
    size_t len;                     /* remaining bytes to compress/check */
#if ZLIB_VERNUM >= 0x1260
    int bits;                       /* deflate pending bits */
#endif
    z_stream strm;                  /* deflate stream */

    (void)dummy;

    /* initialize the deflate stream for this thread (the level given here is
       replaced by g.level through deflateParams() for every job) */
    strm.zfree = Z_NULL;
    strm.zalloc = Z_NULL;
    strm.opaque = Z_NULL;
    if (deflateInit2(&strm, 6, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY) != Z_OK)
        bail("not enough memory", "");

    /* keep looking for work */
    for (;;) {
        /* get a job (like I tell my son) -- on the break for seq == -1 the
           compress_have lock is still possessed and the job stays at the head
           of the list; the lock is released after the loop */
        possess(compress_have);
        wait_for(compress_have, NOT_TO_BE, 0);
        job = compress_head;
        assert(job != NULL);
        if (job->seq == -1)
            break;
        compress_head = job->next;
        if (job->next == NULL)
            compress_tail = &compress_head;
        twist(compress_have, BY, -1);

        /* got a job -- initialize and set the compression level (note that if
           deflateParams() is called immediately after deflateReset(), there is
           no need to initialize the input/output for the stream) */
	(void)deflateReset(&strm);
	(void)deflateParams(&strm, g.level, Z_DEFAULT_STRATEGY);

        /* set dictionary if provided, release that input or dictionary buffer
           (not NULL if dict is true and if this is not the first work unit) --
           at most the last DICT bytes of the provided data are used */
        if (job->out != NULL) {
            len = job->out->len;
            left = len < DICT ? len : DICT;
	    deflateSetDictionary(&strm, job->out->buf + (len - left),
				 left);
            drop_space(job->out);
        }

        /* set up input and output (job->out now becomes the buffer for the
           compressed data) */
        job->out = get_space(&out_pool);
	strm.next_in = job->in->buf;
	strm.next_out = job->out->buf;

        /* compress each block, either flushing or finishing -- without a lens
           list (next is NULL) the entire input is treated as one block */
        next = job->lens == NULL ? NULL : job->lens->buf;
        left = job->in->len;
        job->out->len = 0;
        do {
            /* decode next block length from blocks list */
            len = next == NULL ? 128 : *next++;
            if (len < 128)                          /* 64..32831 */
                len = (len << 8) + (*next++) + 64;
            else if (len == 128)                    /* end of list */
                len = left;
            else if (len < 192)                     /* 1..63 */
                len &= 0x3f;
            else {                                  /* 32832..4227135 */
                len = ((len & 0x3f) << 16) + (*next++ << 8) + 32832U;
                len += *next++;
            }
            left -= len;

	    /* run MAXP2-sized amounts of input through deflate -- this
	       loop is needed for those cases where the unsigned type is
	       smaller than the size_t type, or when len is close to the
	       limit of the size_t type */
	    while (len > MAXP2) {
		strm.avail_in = MAXP2;
		deflate_engine(&strm, job->out, Z_NO_FLUSH);
		len -= MAXP2;
	    }

	    /* run the last piece through deflate -- end on a byte
	       boundary, using a sync marker if necessary, or finish the
	       deflate stream if this is the last block */
	    strm.avail_in = (unsigned)len;
	    if (left || job->more) {
#if ZLIB_VERNUM >= 0x1260
		deflate_engine(&strm, job->out, Z_BLOCK);

		/* add enough empty blocks to get to a byte boundary */
		(void)deflatePending(&strm, Z_NULL, &bits);
		if (bits & 1)
		    deflate_engine(&strm, job->out, Z_SYNC_FLUSH);
		else if (bits & 7) {
		    do {        /* add static empty blocks */
			bits = deflatePrime(&strm, 10, 2);
			assert(bits == Z_OK);
			(void)deflatePending(&strm, Z_NULL, &bits);
		    } while (bits & 7);
		    deflate_engine(&strm, job->out, Z_BLOCK);
		}
#else
		deflate_engine(&strm, job->out, Z_SYNC_FLUSH);
#endif
	    }
	    else
		deflate_engine(&strm, job->out, Z_FINISH);
        } while (left);
        if (job->lens != NULL) {
            drop_space(job->lens);
            job->lens = NULL;
        }

        /* reserve input buffer until check value has been calculated (the
           write thread drops its own use of the input as soon as it picks up
           the job) */
        use_space(job->in);

        /* insert write job in list in sorted order, alert write thread */
        possess(write_first);
        prior = &write_head;
        while ((here = *prior) != NULL) {
            if (here->seq > job->seq)
                break;
            prior = &(here->next);
        }
        job->next = here;
        *prior = job;
        twist(write_first, TO, write_head->seq);

        /* calculate the check value in parallel with writing, alert the write
           thread that the calculation is complete, and drop this usage of the
           input buffer -- the data is fed to CHECK in pieces of at most MAXP2
           bytes */
        len = job->in->len;
        next = job->in->buf;
        check = CHECK(0L, Z_NULL, 0);
        while (len > MAXP2) {
            check = CHECK(check, next, MAXP2);
            len -= MAXP2;
            next += MAXP2;
        }
        check = CHECK(check, next, (unsigned)len);
        drop_space(job->in);
        job->check = check;
        possess(job->calc);
        twist(job->calc, TO, 1);

        /* done with that one -- go find another job (the job itself is freed
           by the write thread) */
    }

    /* found job with seq == -1 -- free deflate memory and return to join */
    release(compress_have);
    (void)deflateEnd(&strm);
}
1237 
1238 /* collect the write jobs off of the list in sequence order and write out the
1239    compressed data until the last chunk is written -- also write the header and
1240    trailer and combine the individual check values of the input buffers */
write_thread(void * dummy)1241 local void write_thread(void *dummy)
1242 {
1243     long seq;                       /* next sequence number looking for */
1244     struct job *job;                /* job pulled and working on */
1245     size_t len;                     /* input length */
1246     int more;                       /* true if more chunks to write */
1247     unsigned long head;             /* header length */
1248     unsigned long ulen;             /* total uncompressed size (overflow ok) */
1249     unsigned long clen;             /* total compressed size (overflow ok) */
1250     unsigned long check;            /* check value of uncompressed data */
1251 
1252     (void)dummy;
1253 
1254     /* build and write header */
1255     head = put_header();
1256 
1257     /* process output of compress threads until end of input */
1258     ulen = clen = 0;
1259     check = CHECK(0L, Z_NULL, 0);
1260     seq = 0;
1261     do {
1262         /* get next write job in order */
1263         possess(write_first);
1264         wait_for(write_first, TO_BE, seq);
1265         job = write_head;
1266         write_head = job->next;
1267         twist(write_first, TO, write_head == NULL ? -1 : write_head->seq);
1268 
1269         /* update lengths, save uncompressed length for COMB */
1270         more = job->more;
1271         len = job->in->len;
1272         drop_space(job->in);
1273         ulen += (unsigned long)len;
1274         clen += (unsigned long)(job->out->len);
1275 
1276         /* write the compressed data and drop the output buffer */
1277         writen(g.outd, job->out->buf, job->out->len);
1278         drop_space(job->out);
1279 
1280         /* wait for check calculation to complete, then combine, once
1281            the compress thread is done with the input, release it */
1282         possess(job->calc);
1283         wait_for(job->calc, TO_BE, 1);
1284         release(job->calc);
1285         check = COMB(check, job->check, len);
1286 
1287         /* free the job */
1288         free_lock(job->calc);
1289         free(job);
1290 
1291         /* get the next buffer in sequence */
1292         seq++;
1293     } while (more);
1294 
1295     /* write trailer */
1296     put_trailer(ulen, clen, check, head);
1297 
1298     /* verify no more jobs, prepare for next use */
1299     possess(compress_have);
1300     assert(compress_head == NULL && peek_lock(compress_have) == 0);
1301     release(compress_have);
1302     possess(write_first);
1303     assert(write_head == NULL);
1304     twist(write_first, TO, -1);
1305 }
1306 
1307 /* compress ind to outd, using multiple threads for the compression and check
1308    value calculations and one other thread for writing the output -- compress
1309    threads will be launched and left running (waiting actually) to support
1310    subsequent calls of parallel_compress() */
parallel_compress(char * out_fname,unsigned char * overflow_buf,uint32_t do_append,uint32_t (* emitn)(uint32_t,unsigned char *))1311 void parallel_compress(char* out_fname, unsigned char* overflow_buf, uint32_t do_append, uint32_t(* emitn)(uint32_t, unsigned char*))
1312 {
1313     // overflow_buf must have size >= PIGZ_BLOCK_SIZE + maximum emission
1314 
1315     // if overflow_ct is nonzero, this points to the first uncompressed
1316     // character in overflow_buf
1317     unsigned char* read_ptr = NULL;
1318 
1319     uint32_t overflow_ct;
1320     long seq;                       /* sequence number */
1321     struct space *curr;             /* input data to compress */
1322     struct space *next;             /* input data that follows curr */
1323     struct space *dict;             /* dictionary for next compression */
1324     struct job *job;                /* job for compress, then write */
1325 
1326     int more;                       /* true if more input to read */
1327     size_t len;                     /* for various length computations */
1328     uint32_t cur_len;
1329 
1330     g.outf = out_fname;
1331     g.outd = open(g.outf, O_WRONLY | (do_append? O_APPEND : (O_CREAT | O_TRUNC)), 0644);
1332 
1333     /* if first time or after an option change, setup the job lists */
1334     setup_jobs();
1335 
1336     /* start write thread */
1337     writeth = launch(write_thread, NULL);
1338 
1339     /* read from input and start compress threads (write thread will pick up
1340      the output of the compress threads) */
1341     seq = 0;
1342     next = get_space(&in_pool);
1343     cur_len = emitn(0, overflow_buf);
1344     if (cur_len > PIGZ_BLOCK_SIZE) {
1345         memcpy(next->buf, overflow_buf, PIGZ_BLOCK_SIZE);
1346 	next->len = PIGZ_BLOCK_SIZE;
1347 	read_ptr = &(overflow_buf[PIGZ_BLOCK_SIZE]);
1348     } else {
1349 	memcpy(next->buf, overflow_buf, cur_len);
1350         next->len = cur_len;
1351     }
1352     overflow_ct = cur_len - next->len;
1353 
1354     dict = NULL;
1355     do {
1356         /* create a new job */
1357         job = (struct job*)malloc(sizeof(struct job));
1358         if (job == NULL)
1359             bail("not enough memory", "");
1360         job->calc = new_lock(0);
1361 
1362         /* update input spaces */
1363         curr = next;
1364 
1365         /* get more input if we don't already have some */
1366 	next = get_space(&in_pool);
1367 	if (overflow_ct >= PIGZ_BLOCK_SIZE) {
1368 	    // no need to call emitn(), since we still have >= 128K of text
1369 	    // from the previous call to compress
1370 	    memcpy(next->buf, read_ptr, PIGZ_BLOCK_SIZE);
1371 	    next->len = PIGZ_BLOCK_SIZE;
1372 	    read_ptr = &(read_ptr[PIGZ_BLOCK_SIZE]);
1373 	    overflow_ct -= PIGZ_BLOCK_SIZE;
1374 	} else {
1375 	    if (overflow_ct) {
1376 	        memcpy(overflow_buf, read_ptr, overflow_ct);
1377 	    }
1378 	    cur_len = emitn(overflow_ct, overflow_buf);
1379 	    if (cur_len > PIGZ_BLOCK_SIZE) {
1380 	        memcpy(next->buf, overflow_buf, PIGZ_BLOCK_SIZE);
1381 		next->len = PIGZ_BLOCK_SIZE;
1382 		read_ptr = &(overflow_buf[PIGZ_BLOCK_SIZE]);
1383 	    } else {
1384 	        memcpy(next->buf, overflow_buf, cur_len);
1385 		next->len = cur_len;
1386 	    }
1387 	    overflow_ct = cur_len - next->len;
1388 	}
1389 
1390         /* if rsyncable, generate block lengths and prepare curr for job to
1391            likely have less than size bytes (up to the last hash hit) */
1392         job->lens = NULL;
1393 
1394         /* compress curr->buf to curr->len -- compress thread will drop curr */
1395         job->in = curr;
1396 
1397         /* set job->more if there is more to compress after curr */
1398         more = next->len != 0;
1399         job->more = more;
1400 
1401         /* provide dictionary for this job, prepare dictionary for next job */
1402         job->out = dict;
1403         if (more) {
1404             if (curr->len >= DICT || job->out == NULL) {
1405                 dict = curr;
1406                 use_space(dict);
1407             }
1408             else {
1409                 dict = get_space(&dict_pool);
1410                 len = DICT - curr->len;
1411                 memcpy(dict->buf, job->out->buf + (job->out->len - len), len);
1412                 memcpy(dict->buf + len, curr->buf, curr->len);
1413                 dict->len = DICT;
1414             }
1415         }
1416 
1417         /* preparation of job is complete */
1418         job->seq = seq;
1419         if (++seq < 1)
1420             bail("input too long: ", "");
1421 
1422         /* start another compress thread if needed */
1423         if (cthreads < seq && cthreads < g.procs) {
1424             (void)launch(compress_thread, NULL);
1425             cthreads++;
1426         }
1427 
1428         /* put job at end of compress list, let all the compressors know */
1429         possess(compress_have);
1430         job->next = NULL;
1431         *compress_tail = job;
1432         compress_tail = &(job->next);
1433         twist(compress_have, BY, +1);
1434     } while (more);
1435     drop_space(next);
1436 
1437     /* wait for the write thread to complete (we leave the compress threads out
1438        there and waiting in case there is another stream to compress) */
1439     join(writeth);
1440     writeth = NULL;
1441     finish_jobs();
1442 }
1443 
1444 
1445 // about time to implement this without the awkward callback interface...
/*
 * Open out_fname for uncompressed output through a Pigz_state.
 *
 *   out_fname     path of the file to write
 *   overflow_buf  caller-owned staging buffer; only the pointer is stored
 *   do_append     nonzero: append to the file; zero: truncate it
 *   ps_ptr        state object to initialize (outd and overflow_buf are set)
 *
 * Returns 0 on success.  On failure prints a message to stderr and returns 2
 * (RET_OPEN_FAIL); ps_ptr->overflow_buf is left untouched in that case.
 */
int32_t pzwrite_init(char* out_fname, unsigned char* overflow_buf, uint32_t do_append, Pigz_state* ps_ptr) {
    // unbuffered, and doesn't need to support Windows
    // O_CREAT is requested in both modes: appending to a file that does not
    // exist yet should create it, exactly as fopen(..., "a") does in
    // write_uncompressed().  Without it, append mode fails with ENOENT.
    ps_ptr->outd = open(out_fname, O_WRONLY | O_CREAT | (do_append? O_APPEND : O_TRUNC), 0644);
    if (ps_ptr->outd == -1) {
        // terminate any partially printed status line before the error text
        putc_unlocked('\n', stdout);
        fflush(stdout);
        fprintf(stderr, "Error: Failed to open %s.\n", out_fname);
        return 2; // RET_OPEN_FAIL
    }
    ps_ptr->overflow_buf = overflow_buf;
    return 0;
}
1458 
/*
 * Open out_fname for compressed output and start the write thread.
 *
 *   out_fname     path of the file to write; the pointer is kept in g.outf
 *   overflow_buf  caller-owned staging buffer; only the pointer is stored
 *   do_append     nonzero: append to the file; zero: truncate it
 *   ps_ptr        state object to initialize
 *
 * ps_ptr->outd is set to -1 (the descriptor actually written to lives in
 * g.outd) and ps_ptr->next is cleared so that the first call to
 * force_compressed_pzwrite() performs its first-call setup.
 *
 * There is no error return; an open failure is reported through bail().
 */
void compressed_pzwrite_init(char* out_fname, unsigned char* overflow_buf, uint32_t do_append, Pigz_state* ps_ptr) {
    ps_ptr->outd = -1;
    g.outf = out_fname;
    // O_CREAT in both modes so that appending to a not-yet-existing file
    // creates it, as fopen(..., "a") would.
    g.outd = open(g.outf, O_WRONLY | O_CREAT | (do_append? O_APPEND : O_TRUNC), 0644);
    if (g.outd < 0) {
        // Do not start the write thread on an invalid descriptor: report the
        // real cause now instead of failing later on the first write.
        bail("open error on ", g.outf);
    }

    /* if first time or after an option change, setup the job lists */
    setup_jobs();

    /* start write thread */
    writeth = launch(write_thread, NULL);

    ps_ptr->overflow_buf = overflow_buf;
    ps_ptr->next = NULL;
}
1473 
/*
 * Initialize ps_ptr for either compressed (output_gz nonzero) or plain
 * (output_gz zero) output to out_fname.
 *
 * Returns whatever pzwrite_init() returns for plain output, and always 0
 * for compressed output (compressed_pzwrite_init() has no error return).
 */
int32_t flex_pzwrite_init(uint32_t output_gz, char* out_fname, unsigned char* overflow_buf, uint32_t do_append, Pigz_state* ps_ptr) {
    if (output_gz) {
        compressed_pzwrite_init(out_fname, overflow_buf, do_append, ps_ptr);
        return 0;
    }
    return pzwrite_init(out_fname, overflow_buf, do_append, ps_ptr);
}
1482 
/*
 * Write every byte staged in ps_ptr->overflow_buf, i.e. the range
 * [ps_ptr->overflow_buf, *writep_ptr), to the descriptor ps_ptr->outd.
 *
 *   ps_ptr      state initialized for uncompressed output
 *   writep_ptr  in: one past the last staged byte; out (on success): reset
 *               to the start of overflow_buf
 *   write_min   unused here; present so the signature parallels
 *               force_compressed_pzwrite()
 *
 * Returns 0 on success, 6 (RET_WRITE_FAIL) if write() fails or makes no
 * progress.  On failure *writep_ptr is left unchanged.
 */
int32_t force_pzwrite(Pigz_state* ps_ptr, char** writep_ptr, uint32_t write_min) {
    unsigned char* writep = (unsigned char*)(*writep_ptr);
    unsigned char* buf = ps_ptr->overflow_buf;
    uint32_t len = (uintptr_t)(writep - buf);
    ssize_t ret;
    (void)write_min;
    while (len) {
        // write() may accept fewer bytes than requested, so resume from the
        // cursor (buf), not from the start of overflow_buf; restarting at
        // the start would emit the already-written prefix again.
        ret = write(ps_ptr->outd, buf, len);
        if (ret < 1) {
            return 6; // RET_WRITE_FAIL
        }
        buf += ret;
        len -= ret;
    }
    *writep_ptr = (char*)(ps_ptr->overflow_buf);
    return 0;
}
1499 
/*
 * Hand the bytes staged in [ps_ptr->overflow_buf, *writep_ptr) to the
 * compression job list in pieces of at most PIGZ_BLOCK_SIZE bytes.
 *
 *   ps_ptr      compressed-output state; next, dict and seq are updated
 *   writep_ptr  in: one past the last staged byte; out: one past the bytes
 *               that were not consumed, which have been moved to the start
 *               of overflow_buf
 *   write_min   jobs keep being queued while at least this many staged
 *               bytes remain; 0 is passed when closing, which makes the
 *               loop run until the final (more == 0) job has been queued
 *
 * One block is always held back in ps_ptr->next: a job is only queued for a
 * block once it is known whether any data follows it (job->more).
 */
void force_compressed_pzwrite(Pigz_state* ps_ptr, char** writep_ptr, uint32_t write_min) {
    // Caller must not request a length-0 write until it's time to close the
    // file.
    unsigned char* writep = (unsigned char*)(*writep_ptr);
    unsigned char* readp = ps_ptr->overflow_buf;
    uint32_t cur_len = (uintptr_t)(writep - readp);

    struct space* curr;             /* input data to compress */
    struct job *job;                /* job for compress, then write */

    int more;                       /* true if more input to read */
    size_t len;                     /* for various length computations */
    if (!ps_ptr->next) {
        /* first call for this stream: reset the sequence number and load
           the first held-back block */
        ps_ptr->seq = 0;
        ps_ptr->next = get_space(&in_pool);
        if (cur_len > PIGZ_BLOCK_SIZE) {
	    memcpy(ps_ptr->next->buf, readp, PIGZ_BLOCK_SIZE);
            ps_ptr->next->len = PIGZ_BLOCK_SIZE;
            readp = &(readp[PIGZ_BLOCK_SIZE]);
	    cur_len -= PIGZ_BLOCK_SIZE;
	} else {
	    memcpy(ps_ptr->next->buf, readp, cur_len);
            ps_ptr->next->len = cur_len;
	    readp = writep;
	    cur_len = 0;
	}
        ps_ptr->dict = NULL;
        /* cur_len now counts the bytes left after the first block; unless
           closing (write_min == 0), return without queueing a job when no
           more than one further block is available */
	if ((cur_len <= PIGZ_BLOCK_SIZE) && write_min) {
	    // need more input to handle dict properly
	    if (cur_len) {
		memcpy(ps_ptr->overflow_buf, readp, cur_len);
	    }
	    *writep_ptr = (char*)(&(ps_ptr->overflow_buf[cur_len]));
	    return;
	}
    }

    do {
	// create a new job
	job = (struct job*)malloc(sizeof(struct job));
	if (job == NULL) {
	    bail("not enough memory", "");
	}
	job->calc = new_lock(0);
        /* the held-back block becomes this job's input; refill the
           held-back slot with up to PIGZ_BLOCK_SIZE of the staged bytes */
        curr = ps_ptr->next;
	ps_ptr->next = get_space(&in_pool);
        if (cur_len > PIGZ_BLOCK_SIZE) {
	    memcpy(ps_ptr->next->buf, readp, PIGZ_BLOCK_SIZE);
	    ps_ptr->next->len = PIGZ_BLOCK_SIZE;
            readp = &(readp[PIGZ_BLOCK_SIZE]);
	} else {
	    memcpy(ps_ptr->next->buf, readp, cur_len);
	    ps_ptr->next->len = cur_len;
	    readp = writep;
	}
	job->lens = NULL;
	job->in = curr;
	/* cur_len still holds the staged count from before the refill, so
	   this is equivalent to "the new held-back block is non-empty" */
	more = (cur_len != 0);
	job->more = more;
	/* this job receives the dictionary prepared on the previous pass */
        job->out = ps_ptr->dict;
	if (more) {
	    /* prepare the dictionary for the following job: curr itself when
	       it holds at least DICT bytes or there is no previous
	       dictionary, otherwise the tail of the previous dictionary
	       followed by all of curr, DICT bytes in total */
	    if (curr->len >= DICT || job->out == NULL) {
	        ps_ptr->dict = curr;
	        use_space(ps_ptr->dict);
	    } else {
	        ps_ptr->dict = get_space(&dict_pool);
                /* NOTE(review): assumes job->out->len >= len, i.e. that two
                   consecutive short blocks are never queued -- confirm
                   against callers */
                len = DICT - curr->len;
                memcpy(ps_ptr->dict->buf, job->out->buf + (job->out->len - len), len);
                memcpy(ps_ptr->dict->buf + len, curr->buf, curr->len);
                ps_ptr->dict->len = DICT;
	    }
	}
	job->seq = ps_ptr->seq;
	/* a sequence number that stops being positive after the increment
	   is treated as "input too long" */
        if (++(ps_ptr->seq) < 1) {
	    bail("input too long: ", "");
	}
	/* start another compress thread while there are fewer threads than
	   jobs queued so far and fewer than g.procs */
	if (cthreads < ps_ptr->seq && cthreads < g.procs) {
	    (void)launch(compress_thread, NULL);
            cthreads++;
	}
        /* append the job to the compress list while holding compress_have,
           then bump its count by one */
        possess(compress_have);
        job->next = NULL;
        *compress_tail = job;
        compress_tail = &(job->next);
	twist(compress_have, BY, +1);
	/* staged bytes not yet copied into a block */
	cur_len = (uintptr_t)(writep - readp);
    } while ((cur_len >= write_min) && more);
    /* move the unconsumed tail to the front of overflow_buf and report the
       new write position to the caller */
    if (cur_len) {
        memcpy(ps_ptr->overflow_buf, readp, cur_len);
    }
    *writep_ptr = (char*)(&(ps_ptr->overflow_buf[cur_len]));
}
1592 
/*
 * Append the sslen bytes at ss to the output stream behind ps_ptr.
 *
 * While the staged bytes plus the remaining input exceed PIGZ_BLOCK_SIZE,
 * the staging buffer is topped up to PIGZ_BLOCK_SIZE + 1 bytes and flushed
 * through force_pzwrite() or force_compressed_pzwrite(), depending on
 * is_uncompressed_pzwrite().  What is left is copied into the buffer,
 * *writep_ptr is advanced past it, and flex_pzwrite() is given the final
 * say.
 *
 * Returns the first nonzero code from force_pzwrite(), otherwise the result
 * of flex_pzwrite().
 */
int32_t flex_pzputs_std(Pigz_state* ps_ptr, char** writep_ptr, char* ss, uint32_t sslen) {
    unsigned char* dst = (unsigned char*)(*writep_ptr);
    unsigned char* src = (unsigned char*)ss;
    uint32_t staged = (uintptr_t)(dst - ps_ptr->overflow_buf);
    uint32_t remaining = sslen;

    while ((remaining + staged) > PIGZ_BLOCK_SIZE) {
        if (staged <= PIGZ_BLOCK_SIZE) {
            /* fill the buffer to exactly one byte past a full block */
            uint32_t fill = PIGZ_BLOCK_SIZE + 1 - staged;
            memcpy(dst, src, fill);
            dst += fill;
            src += fill;
            remaining -= fill;
        }
        if (is_uncompressed_pzwrite(ps_ptr)) {
            int32_t rc = force_pzwrite(ps_ptr, (char**)(&dst), PIGZ_BLOCK_SIZE + 1);
            if (rc) {
                return rc;
            }
        } else {
            force_compressed_pzwrite(ps_ptr, (char**)(&dst), PIGZ_BLOCK_SIZE + 1);
        }
        /* the flush repositions dst; recompute how much is still staged */
        staged = (uintptr_t)(dst - ps_ptr->overflow_buf);
    }
    memcpy(dst, src, remaining);
    *writep_ptr = (char*)(dst + remaining);
    return flex_pzwrite(ps_ptr, writep_ptr);
}
1621 
/*
 * Flush the bytes staged up to writep, close the uncompressed output
 * descriptor and clear ps_ptr->overflow_buf.  The descriptor is closed even
 * when the flush fails.
 *
 * Returns 0 if both the flush and the close succeeded, 1 otherwise.
 */
int32_t pzwrite_close_null(Pigz_state* ps_ptr, char* writep) {
    int32_t flush_rc = force_pzwrite(ps_ptr, &writep, 0);
    int32_t close_rc = close(ps_ptr->outd);

    ps_ptr->overflow_buf = NULL;
    if (flush_rc) {
        return 1;
    }
    return (close_rc != 0);
}
1628 
/*
 * Finish a compressed output stream: queue everything staged up to writep
 * (write_min of 0 makes force_compressed_pzwrite() run through the final
 * job), release the held-back input space, wait for the write thread, and
 * clear ps_ptr->overflow_buf.
 */
void compressed_pzwrite_close_null(Pigz_state* ps_ptr, char* writep) {
    char* staged_end = writep;

    force_compressed_pzwrite(ps_ptr, &staged_end, 0);
    drop_space(ps_ptr->next);

    /* only the write thread is joined; the compress threads are left
       waiting in case another stream is compressed later */
    join(writeth);
    writeth = NULL;
    finish_jobs();

    ps_ptr->overflow_buf = NULL;
}
1639 
/*
 * Close the stream behind ps_ptr, whichever kind it is.
 *
 * Returns the result of pzwrite_close_null() for uncompressed streams and 0
 * for compressed ones (compressed_pzwrite_close_null() has no error return).
 */
int32_t flex_pzwrite_close_null(Pigz_state* ps_ptr, char* writep) {
    if (is_uncompressed_pzwrite(ps_ptr)) {
        return pzwrite_close_null(ps_ptr, writep);
    }
    compressed_pzwrite_close_null(ps_ptr, writep);
    return 0;
}
1648 #endif
1649 
1650 /* catch termination signal */
cut_short(int sig)1651 local void cut_short(int sig)
1652 {
1653     (void)sig;
1654     if (g.outd != -1 && g.outf != NULL)
1655         unlink(g.outf);
1656     putc_unlocked('\n', stdout);
1657     _exit(1);
1658 }
1659 
1660 /* set option defaults */
pigz_init(uint32_t setprocs)1661 void pigz_init(uint32_t setprocs)
1662 {
1663     signal(SIGINT, cut_short);
1664     g.level = Z_DEFAULT_COMPRESSION;
1665 #ifdef NOTHREAD
1666     g.procs = 1;
1667 #else
1668     g.procs = setprocs;
1669     // 1023 threads here failed on the NIH test machine
1670     if (g.procs > 127) {
1671       g.procs = 127;
1672     }
1673 #endif
1674     yarn_prefix = g.prog;
1675     yarn_abort = cut_short;
1676     g.block = PIGZ_BLOCK_SIZE;            /* 128K */
1677     g.verbosity = 1;                /* normal message level */
1678 }
1679 #endif // _WIN32
1680 
1681 // provide identical interface for uncompressed writing, to simplify code that
1682 // can generate either compressed or uncompressed output
write_uncompressed(char * out_fname,unsigned char * overflow_buf,uint32_t do_append,uint32_t (* emitn)(uint32_t,unsigned char *))1683 int32_t write_uncompressed(char* out_fname, unsigned char* overflow_buf, uint32_t do_append, uint32_t(* emitn)(uint32_t, unsigned char*)) {
1684   uint32_t overflow_ct = 0;
1685   // if it's potentially worth compressing, it should be text, hence mode "w"
1686   // instead of "wb"
1687   // (er, that actually does the wrong thing on Windows.  Fixed in pzwrite.)
1688   FILE* outfile = fopen(out_fname, do_append? "a" : "w");
1689   unsigned char* write_ptr;
1690   uint32_t last_size;
1691   if (!outfile) {
1692     putc_unlocked('\n', stdout);
1693     fflush(stdout);
1694     fprintf(stderr, "Error: Failed to open %s.\n", out_fname);
1695     return 2; // RET_OPEN_FAIL
1696   }
1697   do {
1698     last_size = emitn(overflow_ct, overflow_buf);
1699     if (last_size > PIGZ_BLOCK_SIZE) {
1700       overflow_ct = last_size - PIGZ_BLOCK_SIZE;
1701       last_size = PIGZ_BLOCK_SIZE;
1702     } else {
1703       overflow_ct = 0;
1704     }
1705     if (last_size) {
1706       if (!fwrite(overflow_buf, last_size, 1, outfile)) {
1707 	putc_unlocked('\n', stdout);
1708 	fflush(stdout);
1709 	fputs("Error: File write failure.\n", stderr);
1710 	fclose(outfile);
1711 	return 6; // RET_WRITE_FAIL
1712       }
1713     }
1714     if (overflow_ct) {
1715       write_ptr = &(overflow_buf[PIGZ_BLOCK_SIZE]);
1716       while (overflow_ct > PIGZ_BLOCK_SIZE) {
1717 	if (!fwrite(write_ptr, PIGZ_BLOCK_SIZE, 1, outfile)) {
1718 	  putc_unlocked('\n', stdout);
1719 	  fflush(stdout);
1720 	  fputs("Error: File write failure.\n", stderr);
1721 	  fclose(outfile);
1722 	  return 6;
1723 	}
1724 	write_ptr = &(write_ptr[PIGZ_BLOCK_SIZE]);
1725 	overflow_ct -= PIGZ_BLOCK_SIZE;
1726       }
1727       memcpy(overflow_buf, write_ptr, overflow_ct);
1728     }
1729   } while (last_size);
1730   if (fclose(outfile)) {
1731     putc_unlocked('\n', stdout);
1732     fflush(stdout);
1733     fputs("Error: File write failure.\n", stderr);
1734     return 6;
1735   }
1736   return 0;
1737 }
1738