1 /*
2  *  File  : pbzip2.cpp
3  *
4  *  Title : Parallel BZIP2 (pbzip2)
5  *
6  *  Author: Jeff Gilchrist (http://gilchrist.ca/jeff/)
7  *           - Modified producer/consumer threading code from
8  *             Andrae Muys <andrae@humbug.org.au.au>
9  *           - uses libbzip2 by Julian Seward (http://sources.redhat.com/bzip2/)
10  *           - Major contributions by Yavor Nikolov (http://javornikolov.wordpress.com)
11  */
12 
13 #include "pbzip2.h"
14 #include "BZ2StreamScanner.h"
15 #include "ErrorContext.h"
16 
17 #include <vector>
18 #include <algorithm>
19 #include <string>
20 #include <new>
21 
22 extern "C"
23 {
24 #include <sys/stat.h>
25 #include <errno.h>
26 #include <fcntl.h>
27 #include <signal.h>
28 #include <stdio.h>
29 #include <stdarg.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <time.h>
33 #include <bzlib.h>
34 #include <limits.h>
35 }
36 
37 
38 //
39 // GLOBALS
40 //
// Read/written under ProducerDoneMutex (see syncGetProducerDone/syncSetProducerDone)
static int producerDone = 0;
// terminateFlag and finishedFlag are read/written under TerminateFlagMutex
static int terminateFlag = 0; // Abnormal premature termination
static int finishedFlag = 0; // Main thread work finished (about to exit)
static int unfinishedWorkCleaned = 0; // set to 1 once cleanupUnfinishedWork() has run
static int numCPU = 2;
static int IgnoreTrailingGarbageFlag = 0; // ignore trailing garbage on decompress flag
static int SIG_HANDLER_QUIT_SIGNAL = SIGUSR1; // signal used to stop SignalHandlerThread
#ifdef USE_STACKSIZE_CUSTOMIZATION
static int ChildThreadStackSize = 0; // -1 - don't modify stacksize; 0 - use minimum; > 0 - use specified
#ifndef PTHREAD_STACK_MIN
	#define PTHREAD_STACK_MIN 4096
#endif
#endif // USE_STACKSIZE_CUSTOMIZATION
// Starts with the ASCII bytes "BZh9"; presumably a complete empty bzip2 stream
// (end-of-stream marker + zero CRC) - confirm against the bzip2 format.
static unsigned char Bz2HeaderZero[] = {
	0x42, 0x5A, 0x68, 0x39, 0x17, 0x72, 0x45, 0x38, 0x50, 0x90, 0x00, 0x00, 0x00, 0x00 };
static OFF_T InFileSize;
static int NumBlocks = 0;
static int NumBlocksEstimated = 0;
static int NumBufferedBlocks = 0;
static size_t NumBufferedTailBlocks = 0;
static size_t NumBufferedBlocksMax = 0;
static int NextBlockToWrite;
// LastGoodBlock and MinErrorBlock are accessed under OutMutex; -1 means "not set"
static int LastGoodBlock; // set only to terminate write prematurely (ignoring garbage)
static int MinErrorBlock; // lowest so far block number which has errors (on decompress; could be trailing garbage)
static size_t OutBufferPosToWrite; // = 0; // position in output buffer
static int Verbosity = 0;
static int QuietMode = 1;
static int OutputStdOut = 0;
static int ForceOverwrite = 0;
static int BWTblockSize = 9;
static int FileListCount = 0;
static std::vector <outBuff> OutputBuffer;
static queue *FifoQueue; // fifo queue (global var used on termination cleanup)
static pthread_mutex_t *OutMutex = NULL;
static pthread_mutex_t *ProducerDoneMutex = NULL;
static pthread_mutex_t ErrorHandlerMutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t TerminateFlagMutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t ProgressIndicatorsMutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t *notTooMuchNumBuffered;
// Signalled whenever terminateFlag or finishedFlag becomes non-zero
static pthread_cond_t TerminateCond = PTHREAD_COND_INITIALIZER;
static pthread_cond_t OutBufferHeadNotEmpty = PTHREAD_COND_INITIALIZER;
// Signalled when LastGoodBlock or MinErrorBlock is lowered (see syncSetLastGoodBlock)
static pthread_cond_t ErrStateChangeCond = PTHREAD_COND_INITIALIZER;
static pthread_attr_t ChildThreadAttributes;
static struct stat fileMetaData;
// Input/output file names consulted by cleanupUnfinishedWork()
static const char *sigInFilename = NULL;
static const char *sigOutFilename = NULL;
static char BWTblockSizeChar = '9';
static sigset_t SignalMask;
static pthread_t SignalHandlerThread;
static pthread_t TerminatorThread;
91 
// Mutex-protected accessors for the shared state flags and block markers
inline int syncGetProducerDone();
inline void syncSetProducerDone(int newValue);
inline int syncGetTerminateFlag();
inline void syncSetTerminateFlag(int newValue);
inline void syncSetFinishedFlag(int newValue);
inline void syncSetLastGoodBlock(int newValue, int errBlock);
inline int syncGetLastGoodBlock();
// Cleanup, signal handling and terminator thread setup
void cleanupUnfinishedWork();
void cleanupAndQuit(int exitCode);
int initSignalMask();
int setupSignalHandling();
int setupTerminator();

// pthread wrappers which check the return code of the underlying call
inline void safe_mutex_lock(pthread_mutex_t *mutex);
inline void safe_mutex_unlock(pthread_mutex_t *mutex);
inline void safe_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
inline void safe_cond_signal(pthread_cond_t *cond);
int safe_cond_timed_wait(pthread_cond_t *cond, pthread_mutex_t *mutex, int seconds, const char *caller = "safe_cond_timed_wait");

// Remaining helpers (most are defined further down in this file)
template <typename FI1, typename FI2>
FI1 memstr(FI1 searchBuf, int searchBufSize, FI2 searchString, int searchStringSize);
int producer_decompress(int, OFF_T, queue *);
int directcompress(int, OFF_T, int, const char *);
int directdecompress(const char *, const char *);
int producer(int hInfile, int blockSize, queue *fifo);
int mutexesInit();
void mutexesDelete();
queue *queueInit(int);
void queueDelete(queue *);
void outputBufferInit(size_t size);
outBuff * outputBufferAdd(const outBuff & element, const char *caller);
outBuff * outputBufferSeqAddNext(outBuff * preveElement, outBuff * newElement);
inline size_t getOutputBufferPos(int blockNum);
int getFileMetaData(const char *);
int writeFileMetaData(const char *);
int testBZ2ErrorHandling(int, BZFILE *, int);
int testCompressedData(char *);
ssize_t bufread(int hf, char *buf, size_t bsize);
int detectCPUs(void);

// Decompression error / trailing garbage handling
inline bool isIgnoredTrailingGarbage();
int waitForPreviousBlock(int blockNumToWait, int errBlockNumber);
inline int getLastGoodBlockBeforeErr(int errBlockNumber, int outSequenceNumber);
inline int issueDecompressError(int bzret, const outBuff * fileData,
	int outSequenceNumber, const bz_stream & strm, const char * errmsg,
	int exitCode);
int decompressErrCheckSingle(int bzret, const outBuff * fileData,
	int outSequenceNumber, const bz_stream & strm, const char * errmsg,
	bool isTrailingGarbageErr);
int decompressErrCheck(int bzret, const outBuff * fileData,
	int outSequenceNumber, const bz_stream & strm);
inline bool hasTrailingGarbage(int bzret, const outBuff * fileData,
	const bz_stream & strm);
int producerDecompressCheckInterrupt(int hInfile, outBuff *& fileData, int lastBlock);

using pbzip2::ErrorContext;

/*
 * Pointers to functions used by plain C pthreads API require C calling
 * conventions.
 */
extern "C"
{
// Thread entry points: each takes and returns void* as pthread_create expects
void* signalHandlerProc(void* arg);
void* terminatorThreadProc(void* arg);
void *consumer_decompress(void *);
void *fileWriter(void *);
void *consumer(void *);
}
161 
162 /*
163  * Lock mutex or exit application immediately on error.
164  */
safe_mutex_lock(pthread_mutex_t * mutex)165 inline void safe_mutex_lock(pthread_mutex_t *mutex)
166 {
167 	int ret = pthread_mutex_lock(mutex);
168 	if (ret != 0)
169 	{
170 		fprintf(stderr, "pthread_mutex_lock error [%d]! Aborting immediately!\n", ret);
171 		cleanupAndQuit(-5);
172 	}
173 }
174 
175 /*
176  * Unlock mutex or exit application immediately on error.
177  */
safe_mutex_unlock(pthread_mutex_t * mutex)178 inline void safe_mutex_unlock(pthread_mutex_t *mutex)
179 {
180 	int ret = pthread_mutex_unlock(mutex);
181 	if (ret != 0)
182 	{
183 		fprintf(stderr, "pthread_mutex_unlock error [%d]! Aborting immediately!\n", ret);
184 		cleanupAndQuit(-6);
185 	}
186 }
187 
188 /*
189  * Call pthread_cond_signal - check return code and exit application immediately
190  * on error.
191  */
safe_cond_signal(pthread_cond_t * cond)192 inline void safe_cond_signal(pthread_cond_t *cond)
193 {
194 	int ret = pthread_cond_signal(cond);
195 	if (ret != 0)
196 	{
197 		fprintf(stderr, "pthread_cond_signal error [%d]! Aborting immediately!\n", ret);
198 		cleanupAndQuit(-7);
199 	}
200 }
201 
202 /*
203  * Call pthread_cond_signal - check return code and exit application immediately
204  * on error.
205  */
safe_cond_broadcast(pthread_cond_t * cond)206 inline void safe_cond_broadcast(pthread_cond_t *cond)
207 {
208 	int ret = pthread_cond_broadcast(cond);
209 	if (ret != 0)
210 	{
211 		fprintf(stderr, "pthread_cond_broadcast error [%d]! Aborting immediately!\n", ret);
212 		cleanupAndQuit(-7);
213 	}
214 }
215 
216 /*
217  * Unlock mutex or exit application immediately on error.
218  */
safe_cond_wait(pthread_cond_t * cond,pthread_mutex_t * mutex)219 inline void safe_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
220 {
221 	int ret = pthread_cond_wait(cond, mutex);
222 	if (ret != 0)
223 	{
224 		fprintf(stderr, "pthread_cond_wait error [%d]! Aborting immediately!\n", ret);
225 		pthread_mutex_unlock(mutex);
226 		cleanupAndQuit(-8);
227 	}
228 }
229 
230 /*
231  * Delegate to pthread_cond_timedwait. Check for errors and abort if
232  * any encountered. Return 0 on success and non-zero code on error
233  */
safe_cond_timed_wait(pthread_cond_t * cond,pthread_mutex_t * mutex,int seconds,const char * caller)234 int safe_cond_timed_wait(pthread_cond_t *cond, pthread_mutex_t *mutex, int seconds, const char *caller)
235 {
236 	struct timespec waitTimer;
237 	#ifndef WIN32
238 	struct timeval tv;
239 	struct timezone tz;
240 	#else
241 	SYSTEMTIME systemtime;
242 	LARGE_INTEGER filetime;
243 	#endif
244 
245 	#ifndef WIN32
246 	gettimeofday(&tv, &tz);
247 	waitTimer.tv_sec = tv.tv_sec + seconds;
248 	waitTimer.tv_nsec = tv.tv_usec * 1000;
249 	#else
250 	GetSystemTime(&systemtime);
251 	SystemTimeToFileTime(&systemtime, (FILETIME *)&filetime);
252 	waitTimer.tv_sec = filetime.QuadPart / 10000000;
253 	waitTimer.tv_nsec = filetime.QuadPart - ((LONGLONG)waitTimer.tv_sec * 10000000) * 10;
254 	waitTimer.tv_sec += seconds;
255 	#endif
256 	#ifdef PBZIP_DEBUG
257 	fprintf(stderr, "%s:  waitTimer.tv_sec: %" PRIiMAX "  waitTimer.tv_nsec: %" PRIiMAX "\n", caller,
258 		(intmax_t)waitTimer.tv_sec, (intmax_t)waitTimer.tv_nsec);
259 	#endif
260 	int pret = pthread_cond_timedwait(cond, mutex, &waitTimer);
261 	// we are not using a compatible pthreads library so abort
262 	if ((pret != 0) && (pret != EINTR) && (pret != EBUSY) && (pret != ETIMEDOUT))
263 	{
264 		ErrorContext::getInstance()->saveError();
265 		pthread_mutex_unlock(mutex);
266 		handle_error(EF_EXIT, 1,
267 				"pbzip2: *ERROR: %s:  pthread_cond_timedwait() call invalid [pret=%d].  This machine\n"
268 				"         does not have compatible pthreads library.  Aborting.\n", caller, pret);
269 
270 		cleanupAndQuit(-9);
271 	}
272 	#ifdef PBZIP_DEBUG
273 	else if (pret != 0)
274 	{
275 		fprintf(stderr, "%s: pthread_cond_timedwait returned with non-fatal error [%d]\n", caller, pret);
276 	}
277 	#endif // PBZIP_DEBUG
278 
279 	return 0;
280 }
281 
282 /*
283  * Delegate to write but keep writing until count bytes are written or
284  * error is encountered (on success all count bytes would be written)
285  */
do_write(int fd,const void * buf,size_t count)286 ssize_t do_write(int fd, const void *buf, size_t count)
287 {
288 	ssize_t bytesRemaining = count;
289 	ssize_t nbytes = 0;
290 	const char *pbuf = (const char *)buf;
291 	while ((bytesRemaining > 0) && ((nbytes = write(fd, pbuf, bytesRemaining)) > 0))
292 	{
293 		bytesRemaining -= nbytes;
294 		pbuf += nbytes;
295 	}
296 
297 	if (nbytes < 0)
298 	{
299 		ErrorContext::getInstance()->saveError();
300 		return nbytes;
301 	}
302 
303 	return (count - bytesRemaining);
304 }
305 
306 /*
307  * Delegate to read but keep writing until count bytes are read or
308  * error is encountered (on success all count bytes would be read)
309  */
do_read(int fd,void * buf,size_t count)310 ssize_t do_read(int fd, void *buf, size_t count)
311 {
312 	ssize_t bytesRemaining = count;
313 	ssize_t nbytes = 0;
314 	char *pbuf = (char *)buf;
315 	while ((bytesRemaining > 0) && (nbytes = read(fd, pbuf, bytesRemaining)) > 0)
316 	{
317 		bytesRemaining -= nbytes;
318 		pbuf += nbytes;
319 	}
320 
321 	if (nbytes < 0)
322 	{
323 		ErrorContext::getInstance()->saveError();
324 		return nbytes;
325 	}
326 
327 	return (count - bytesRemaining);
328 }
329 
330 /*
331  * Open output file with least required privileges
332  */
safe_open_output(const char * path)333 int safe_open_output(const char *path)
334 {
335 	int ret = open(path, O_WRONLY | O_CREAT | O_EXCL | O_BINARY, FILE_MODE);
336 	if (ret == -1)
337 	{
338 		ErrorContext::getInstance()->saveError();
339 	}
340 
341 	return ret;
342 }
343 
344 /*
345  * Based on bzip2.c code
346  */
safe_fopen_output(const char * path,const char * mode)347 FILE *safe_fopen_output(const char *path, const char *mode)
348 {
349 	int fh = safe_open_output(path);
350 	if (fh == -1)
351 	{
352 		return NULL;
353 	}
354 
355 	FILE *fp = fdopen(fh, mode);
356 	if (fp == NULL)
357 	{
358 		ErrorContext::getInstance()->saveError();
359 		close(fh);
360 	}
361 
362 	return fp;
363 }
364 
365 /**
366  * Save the given file and save errno on failure.
367  *
368  * @param fd file to close
369  * @return -1 on failure or 0 on success.
370  */
do_close(int fd)371 inline int do_close(int fd)
372 {
373 	int ret = close(fd);
374 	if (ret == -1)
375 	{
376 		ErrorContext::getInstance()->saveError();
377 	}
378 
379 	return ret;
380 }
381 
do_fclose(FILE * file)382 inline int do_fclose(FILE *file)
383 {
384 	int ret = fclose(file);
385 	if ( ret == EOF )
386 	{
387 		ErrorContext::getInstance()->saveError();
388 	}
389 
390 	return ret;
391 }
392 
do_fflush(FILE * file)393 inline int do_fflush(FILE *file)
394 {
395 	int ret = fflush(file);
396 	if ( ret == EOF )
397 	{
398 		ErrorContext::getInstance()->saveError();
399 	}
400 
401 	return ret;
402 }
403 
404 /**
405  * Close the given file. In case of error - save errno and print error message.
406  *
407  * @param file file to close
408  * @param fileName name of file to print in case of failure
409  * @return fclose return code
410  */
verbose_fclose(FILE * file,const char * fileName)411 inline int verbose_fclose(FILE *file, const char *fileName)
412 {
413 	int ret;
414 	if ( (ret = fclose(file)) == EOF )
415 	{
416 		ErrorContext::syncPrintErrnoMsg(stderr, errno);
417 		fprintf(stderr, "pbzip2: *ERROR: Failed to close file [%s]!\n", fileName);
418 	}
419 
420 	return ret;
421 }
422 
do_remove(const char * pathname)423 int do_remove(const char* pathname)
424 {
425 	int ret = remove(pathname);
426 	if (ret == -1)
427 	{
428 		ErrorContext::getInstance()->saveError();
429 	}
430 
431 	return ret;
432 }
433 
434 /**
435  * Check if a given file exists.
436  *
437  * @return true if file exists and false if it doesn't
438  */
check_file_exists(const char * filename)439 bool check_file_exists( const char * filename )
440 {
441 	int hOutfile = open( filename, O_RDONLY | O_BINARY );
442 
443 	if ( hOutfile == -1 )
444 	{
445 		ErrorContext::getInstance()->saveError();
446 		return false;
447 	}
448 	else
449 	{
450 		close( hOutfile );
451 		return true;
452 	}
453 }
454 
455 /*
456  *********************************************************
457 	Atomically get producerDone value.
458 */
syncGetProducerDone()459 inline int syncGetProducerDone()
460 {
461 	int ret;
462 	safe_mutex_lock(ProducerDoneMutex);
463 	ret = producerDone;
464 	safe_mutex_unlock(ProducerDoneMutex);
465 
466 	return ret;
467 }
468 
469 /*
470  *********************************************************
471 	Atomically set producerDone value.
472 */
syncSetProducerDone(int newValue)473 inline void syncSetProducerDone(int newValue)
474 {
475 	safe_mutex_lock(ProducerDoneMutex);
476 	producerDone = newValue;
477 	safe_mutex_unlock(ProducerDoneMutex);
478 }
479 
480 /*
481  * Atomic get terminateFlag
482  */
syncGetTerminateFlag()483 inline int syncGetTerminateFlag()
484 {
485 	int ret;
486 	safe_mutex_lock(&TerminateFlagMutex);
487 	ret = terminateFlag;
488 	safe_mutex_unlock(&TerminateFlagMutex);
489 
490 	return ret;
491 }
492 
493 /*
494  * Atomically set termination flag and signal the related
495  * condition.
496  */
syncSetTerminateFlag(int newValue)497 inline void syncSetTerminateFlag(int newValue)
498 {
499 	safe_mutex_lock(&TerminateFlagMutex);
500 
501 	terminateFlag = newValue;
502 	if (terminateFlag != 0)
503 	{
504 		// wake up terminator thread
505 		pthread_cond_signal(&TerminateCond);
506 		safe_mutex_unlock(&TerminateFlagMutex);
507 
508 		// wake up all other possibly blocked on cond threads
509 		safe_mutex_lock(OutMutex);
510 		pthread_cond_broadcast(notTooMuchNumBuffered);
511 		safe_mutex_unlock(OutMutex);
512 		if (FifoQueue != NULL)
513 		{
514 			safe_mutex_lock(FifoQueue->mut);
515 			pthread_cond_broadcast(FifoQueue->notFull);
516 			pthread_cond_broadcast(FifoQueue->notEmpty);
517 			safe_mutex_unlock(FifoQueue->mut);
518 		}
519 	}
520 	else
521 	{
522 		safe_mutex_unlock(&TerminateFlagMutex);
523 	}
524 }
525 
526 /*
527  *  Set finishedSucessFlag and signal the related condition.
528  */
syncSetFinishedFlag(int newValue)529 inline void syncSetFinishedFlag(int newValue)
530 {
531 	safe_mutex_lock(&TerminateFlagMutex);
532 
533 	finishedFlag = newValue;
534 	if (finishedFlag != 0)
535 	{
536 		pthread_cond_signal(&TerminateCond);
537 	}
538 
539 	safe_mutex_unlock(&TerminateFlagMutex);
540 }
541 
542 /**
543  * Set last block which is maybe good (not guaranteed) and MinErrorBlock
544  * (lowest block with errors encountered so far).
545  * Only moving downwards has effect (attempts to raise the block numbers are ignored).
546  * -1 means infinity.
547  *
548  *
549  * @param newValue last block which is maybe good. -1 means +infinity.
550  * @param errBlock block number which has errors
551  */
syncSetLastGoodBlock(int newValue,int errBlock)552 inline void syncSetLastGoodBlock(int newValue, int errBlock)
553 {
554 	bool changed = false;
555 
556 	safe_mutex_lock(OutMutex);
557 	#ifdef PBZIP_DEBUG
558 	uintmax_t thid = (uintmax_t) pthread_self();
559 	fprintf(stderr, "(%" PRIuMAX ") syncSetLastGoodBlock: %d -> %d; MinErrorBlock: %d -> %d\n",
560 		 thid, LastGoodBlock, newValue, MinErrorBlock, errBlock);
561 	#endif
562 
563 	if ( (LastGoodBlock == -1) || (newValue < LastGoodBlock) )
564 	{
565 		LastGoodBlock = newValue;
566 		changed = true;
567 	}
568 
569 	if ( (MinErrorBlock == -1) || (errBlock < MinErrorBlock) )
570 	{
571 		MinErrorBlock = errBlock;
572 		changed = true;
573 	}
574 
575 	if ( changed )
576 	{
577 		safe_cond_signal(&ErrStateChangeCond);
578 		safe_cond_signal(&OutBufferHeadNotEmpty);
579 
580 		// wake up all other possibly blocked on cond threads
581 		pthread_cond_broadcast(notTooMuchNumBuffered);
582 		safe_mutex_unlock(OutMutex);
583 
584 		if (FifoQueue != NULL)
585 		{
586 			safe_mutex_lock(FifoQueue->mut);
587 			pthread_cond_broadcast(FifoQueue->notFull);
588 			pthread_cond_broadcast(FifoQueue->notEmpty);
589 			safe_mutex_unlock(FifoQueue->mut);
590 		}
591 	}
592 	else
593 	{
594 		safe_mutex_unlock(OutMutex);
595 	}
596 }
597 
syncGetLastGoodBlock()598 inline int syncGetLastGoodBlock()
599 {
600 	int ret;
601 	safe_mutex_lock(OutMutex);
602 	ret = LastGoodBlock;
603 	safe_mutex_unlock(OutMutex);
604 
605 	return ret;
606 }
607 
syncGetMinErrorBlock()608 inline int syncGetMinErrorBlock()
609 {
610 	int ret;
611 	safe_mutex_lock(OutMutex);
612 	ret = MinErrorBlock;
613 	safe_mutex_unlock(OutMutex);
614 
615 	return ret;
616 }
617 
isIgnoredTrailingGarbage()618 inline bool isIgnoredTrailingGarbage()
619 {
620 	return (IgnoreTrailingGarbageFlag != 0);
621 }
622 
623 /*
624  *********************************************************
625 	Print error message and optionally exit or abort
626     depending on exitFlag:
627      0 - don't quit;
628      1 - exit;
629      2 - abort.
630     On exit - exitCode status is used.
631 */
handle_error(ExitFlag exitFlag,int exitCode,const char * fmt,...)632 int handle_error(ExitFlag exitFlag, int exitCode, const char *fmt, ...)
633 {
634 	va_list args;
635 
636 	va_start(args, fmt);
637 	vfprintf(stderr, fmt, args);
638 	ErrorContext::getInstance()->printErrorMessages(stderr);
639 	fflush(stderr);
640 	va_end(args);
641 
642 	if (exitFlag == EF_ABORT)
643 	{
644 		syncSetTerminateFlag(1);
645 		abort();
646 	}
647 	if (exitFlag == EF_EXIT)
648 	{
649 		syncSetTerminateFlag(1);
650 	}
651 
652 	return exitCode;
653 }
654 
/**
 * Block until the state of the blocks preceding an erroneous block is known.
 * Loops, re-checking at least once a second, until one of the conditions
 * below holds.
 *
 * @param blockNumToWait used only in debug output here
 * @param errBlockNumber number of the block in which the caller hit an error
 * @return -1 - terminate flag set (error)
 *          0 - prev block is OK (i.e. we're on the first error here)
 *          2 - lower block number already in error state
 */
int waitForPreviousBlock(int blockNumToWait, int errBlockNumber)
{
	#ifdef PBZIP_DEBUG
	uintmax_t thid = (uintmax_t) pthread_self();
	safe_mutex_lock(OutMutex);
	fprintf( stderr, "(%" PRIuMAX ") waitForPreviousBlock enter: LastGoodBlock=%d"
		"; blockNumToWait=%d; NextBlockToWrite=%d; MinErrorBlock=%d; errBlockNumber=%d\n",
		thid,
		LastGoodBlock, blockNumToWait, NextBlockToWrite,
		MinErrorBlock, errBlockNumber );
	safe_mutex_unlock(OutMutex);
	#endif

	for (;;)
	{
		// The terminate flag has its own mutex, so it is checked before OutMutex is taken
		if (syncGetTerminateFlag() != 0)
		{
			#ifdef PBZIP_DEBUG
			fprintf(stderr, "(%" PRIuMAX ") waitForPreviousBlock terminated [%d]: blockNumToWait=%d\n",
				thid, -1, blockNumToWait );
			#endif
			return -1;
		}

		safe_mutex_lock(OutMutex);

		#ifdef PBZIP_DEBUG
		fprintf( stderr, "(%" PRIuMAX ") waitForPreviousBlock before check: LastGoodBlock=%d; blockNumToWait=%d; NextBlockToWrite=%d; MinErrorBlock=%d\n",
			thid, LastGoodBlock, blockNumToWait, NextBlockToWrite, MinErrorBlock );
		#endif

		// This check (min error block) must come before the next one (next block to write)
		if ( (MinErrorBlock != -1) && (MinErrorBlock < errBlockNumber) )
		{
			#ifdef PBZIP_DEBUG
			fprintf( stderr, "(%" PRIuMAX ") waitForPreviousBlock exit [%d]: LastGoodBlock=%d; blockNumToWait=%d; NextBlockToWrite=%d; MinErrorBlock=%d\n",
				thid, 2, LastGoodBlock, blockNumToWait, NextBlockToWrite, MinErrorBlock );
			#endif
			safe_mutex_unlock(OutMutex);
			return 2;
		}

		// The writer has reached (or passed) the error block and no lower error block was recorded
		if (errBlockNumber <= NextBlockToWrite)
		{
			#ifdef PBZIP_DEBUG
			fprintf( stderr, "(%" PRIuMAX ") waitForPreviousBlock exit [%d]: LastGoodBlock=%d; blockNumToWait=%d; NextBlockToWrite=%d; MinErrorBlock=%d\n",
				thid, 0, LastGoodBlock, blockNumToWait, NextBlockToWrite, MinErrorBlock );
			#endif
			safe_mutex_unlock(OutMutex);
			return 0;
		}

		#ifdef PBZIP_DEBUG
		fprintf( stderr, "(%" PRIuMAX ") waitForPreviousBlock to sleep: LastGoodBlock=%d; blockNumToWait=%d; NextBlockToWrite=%d; MinErrorBlock=%d\n",
			thid, LastGoodBlock, blockNumToWait, NextBlockToWrite, MinErrorBlock );
		#endif

		// Sleep for up to 1 second; the result is ignored since every condition is re-checked on the next iteration
		safe_cond_timed_wait(&ErrStateChangeCond, OutMutex, 1, "waitForPreviousBlock");

		safe_mutex_unlock(OutMutex);
	}
}
723 
/**
 * Pick the last input block which possibly (not guaranteed) resulted in good
 * output, given the block in which an error was hit.
 *
 * @param errBlockNumber block from input file
 * @param outSequenceNumber sequence in the output tail for the given block;
 *        -1 selects the block before errBlockNumber (presumably the error
 *        was hit before any output was produced for the block - confirm
 *        with callers)
 * @return errBlockNumber, or errBlockNumber - 1 when outSequenceNumber is -1
 *         (which yields -1 for block 0, i.e. no such block exists).
 */
inline int getLastGoodBlockBeforeErr(int errBlockNumber, int outSequenceNumber)
{
	const bool noSequenceYet = (outSequenceNumber == -1);

	return noSequenceYet ? (errBlockNumber - 1) : errBlockNumber;
}
744 
745 /**
746  * Helper function delegating to handle_error with the relevant error
747  * message
748  *
749  * @param bzret
750  * @param fileData
751  * @param outSequenceNumber
752  * @param strm
753  * @param errmsg
754  * @param exitCode
755  * @return exitCode is returned
756  */
issueDecompressError(int bzret,const outBuff * fileData,int outSequenceNumber,const bz_stream & strm,const char * errmsg,int exitCode)757 inline int issueDecompressError(int bzret, const outBuff * fileData,
758 	int outSequenceNumber, const bz_stream & strm, const char * errmsg,
759 	int exitCode)
760 {
761 	#ifdef PBZIP_DEBUG
762 	uintmax_t thid = (uintmax_t) pthread_self();
763 	fprintf(stderr, "(%" PRIuMAX ") enter issueDecompressError: msg=%s; ret=%d; block=%d; seq=%d; isLastInSeq=%d; avail_in=%d\n",
764 		thid,
765 		errmsg, bzret, fileData->blockNumber,
766 		outSequenceNumber, (int)fileData->isLastInSequence, strm.avail_in);
767 	#endif
768 
769 	handle_error(EF_EXIT, exitCode,
770 				"pbzip2: %s: ret=%d; block=%d; seq=%d; isLastInSeq=%d; avail_in=%d\n",
771 				errmsg, bzret, fileData->blockNumber,
772 				outSequenceNumber, (int)fileData->isLastInSequence, strm.avail_in);
773 	return exitCode;
774 }
775 
776 /**
777  * Handle an error condition which is either trailing garbage-like one or not.
778  *
779  *
780  * @param bzret
781  * @param fileData block from input file
782  * @param outSequenceNumber
783  * @param strm
784  * @param errmsg
785  * @param isTrailingGarbageErr
786  * @return ret < 0 - fatal error;
787  *               0 - OK (no error at all);
788  *               1 - first block of ignored trailing garbage;
789  *               2 - error already signalled for earlier block
790  */
decompressErrCheckSingle(int bzret,const outBuff * fileData,int outSequenceNumber,const bz_stream & strm,const char * errmsg,bool isTrailingGarbageErr)791 int decompressErrCheckSingle(int bzret, const outBuff * fileData,
792 	int outSequenceNumber, const bz_stream & strm, const char * errmsg,
793 	bool isTrailingGarbageErr)
794 {
795 	int lastGoodBlock = getLastGoodBlockBeforeErr(fileData->blockNumber, outSequenceNumber);
796 
797 	#ifdef PBZIP_DEBUG
798 	uintmax_t thid = (uintmax_t) pthread_self();
799 	fprintf(stderr, "(%" PRIuMAX ") enter decompressErrCheckSingle: msg=%s; ret=%d; block=%d; seq=%d; isLastInSeq=%d; avail_in=%d; lastGoodBlock=%d\n",
800 		thid,
801 		errmsg, bzret, fileData->blockNumber,
802 		outSequenceNumber, (int)fileData->isLastInSequence, strm.avail_in, lastGoodBlock);
803 	#endif
804 
805 	if ( (lastGoodBlock == -1) || !isIgnoredTrailingGarbage() )
806 	{
807 		issueDecompressError(bzret, fileData, outSequenceNumber, strm, errmsg, -1);
808 		return -1;
809 	}
810 	else
811 	{
812 		// Cut off larger block numbers
813 		syncSetLastGoodBlock(lastGoodBlock, fileData->blockNumber);
814 		// wait until the state of previous block is known
815 		int prevState = waitForPreviousBlock(lastGoodBlock, fileData->blockNumber);
816 
817 		if (prevState == 0)
818 		{
819 			// we're the first error block
820 
821 			if (isTrailingGarbageErr)
822 			{
823 				// Trailing garbage detected and ignored - not a fatal warning
824 				fprintf(stderr, "pbzip2: *WARNING: Trailing garbage after EOF ignored!\n");
825 				return 1;
826 			}
827 			else
828 			{
829 				// the first error is not kind of trailing garbage -> fatal one
830 				issueDecompressError(bzret, fileData, outSequenceNumber, strm, errmsg, -1);
831 				return -1;
832 			}
833 		}
834 		else if (prevState == 2)
835 		{
836 			// we're not the first error
837 			return 2;
838 		}
839 		else // (prevState == -1)
840 		{
841 			// fatal state encountered
842 			return -1;
843 		}
844 	}
845 }
846 
847 /**
848  * Check if trailing garbage has been identified during the last decompression
849  * operation.
850  *
851  * @param bzret last bzip2 return code
852  * @param fileData should be initialized before calling this
853  * @param strm bzip2 library bz_stream
854  * @return true if trailing garbage has been detected. false otherwise
855  */
hasTrailingGarbage(int bzret,const outBuff * fileData,const bz_stream & strm)856 inline bool hasTrailingGarbage(int bzret, const outBuff * fileData, const bz_stream & strm)
857 {
858 	return (bzret == BZ_STREAM_END) &&
859 		( (strm.avail_in != 0) || !fileData->isLastInSequence );
860 }
861 
862 /**
863  *
864  * @param bzret
865  * @param fileData
866  * @param outSequenceNumber
867  * @param strm
868  * @return ret < 0 - fatal error;
869  *               0 - OK (no error at all);
870  *               1 - first block of ignored trailing garbage;
871  *               2 - error already signalled for earlier block
872  */
decompressErrCheck(int bzret,const outBuff * fileData,int outSequenceNumber,const bz_stream & strm)873 int decompressErrCheck(int bzret, const outBuff * fileData,
874 	int outSequenceNumber, const bz_stream & strm)
875 {
876 	if ( hasTrailingGarbage( bzret, fileData, strm ) )
877 	{
878 		// Potential trailing garbage
879 		return decompressErrCheckSingle(bzret, fileData, outSequenceNumber, strm,
880 				"*ERROR during BZ2_bzDecompress - trailing garbage", true);
881 	}
882 	else if ( (bzret != BZ_STREAM_END) && (bzret != BZ_OK) )
883 	{
884 		return decompressErrCheckSingle(bzret, fileData, outSequenceNumber, strm,
885 				"*ERROR during BZ2_bzDecompress - failure exit code", false);
886 	}
887 	else if ( strm.avail_in != 0 )
888 	{
889 		return decompressErrCheckSingle(bzret, fileData, outSequenceNumber, strm,
890 				"*ERROR unconsumed in after BZ2_bzDecompress loop", false);
891 	}
892 	else if ( (bzret != BZ_STREAM_END) && fileData->isLastInSequence )
893 	{
894 		return decompressErrCheckSingle(bzret, fileData, outSequenceNumber, strm,
895 				"*ERROR on decompress - last in segment reached before BZ_STREAM_END",
896 				false);
897 	}
898 
899 	return 0;
900 }
901 
902 /*
903  * Initialize and set thread signal mask
904  */
initSignalMask()905 int initSignalMask()
906 {
907 	int ret = 0;
908 	ret = sigemptyset(&SignalMask);
909 
910 	ret = sigaddset(&SignalMask, SIGINT) | ret;
911 	ret = sigaddset(&SignalMask, SIGTERM) | ret;
912 	ret = sigaddset(&SignalMask, SIGABRT) | ret;
913 	ret = sigaddset(&SignalMask, SIG_HANDLER_QUIT_SIGNAL) | ret;
914 	#ifndef WIN32
915 	ret = sigaddset(&SignalMask, SIGHUP) | ret;
916 	#endif
917 
918 	if (ret == 0)
919 	{
920 		ret = pthread_sigmask(SIG_BLOCK, &SignalMask, NULL);
921 	}
922 
923 	return ret;
924 }
925 
926 /*
927  * Initialize attributes for child threads.
928  *
929  */
initChildThreadAttributes()930 int initChildThreadAttributes()
931 {
932 	int ret = pthread_attr_init(&ChildThreadAttributes);
933 
934 	if (ret < 0)
935 	{
936 		fprintf(stderr, "Can't initialize thread atrributes [err=%d]! Aborting...\n", ret);
937 		exit(-1);
938 	}
939 
940 	#ifdef USE_STACKSIZE_CUSTOMIZATION
941 	if (ChildThreadStackSize > 0)
942 	{
943 		ret = pthread_attr_setstacksize(&ChildThreadAttributes, ChildThreadStackSize);
944 
945 		if (ret != 0)
946 		{
947 			fprintf(stderr, "Can't set thread stacksize [err=%d]! Countinue with default one.\n", ret);
948 		}
949 	}
950 	#endif // USE_STACKSIZE_CUSTOMIZATION
951 
952 	return ret;
953 }
954 
955 /*
956  * Setup and start signal handling.
957  */
setupSignalHandling()958 int setupSignalHandling()
959 {
960 	int ret = initSignalMask();
961 
962 	if (ret == 0)
963 	{
964 		ret = pthread_create(&SignalHandlerThread, &ChildThreadAttributes, signalHandlerProc, NULL);
965 	}
966 
967 	return ret;
968 }
969 
970 /*
971  * Setup and start signal handling.
972  */
setupTerminator()973 int setupTerminator()
974 {
975 	return pthread_create(&TerminatorThread, &ChildThreadAttributes, terminatorThreadProc, NULL );
976 }
977 
978 /*
979  *********************************************************
980  * Clean unfinished work (after error).
981  * Deletes output file if such exists and if not using pipes.
982  */
cleanupUnfinishedWork()983 void cleanupUnfinishedWork()
984 {
985 	if (unfinishedWorkCleaned != 0)
986 	{
987 		return;
988 	}
989 
990 	struct stat statBuf;
991 	int ret = 0;
992 
993 	#ifdef PBZIP_DEBUG
994 	fprintf(stderr, " Infile: %s   Outfile: %s\n", sigInFilename, sigOutFilename);
995 	#endif
996 
997 	// only cleanup files if we did something with them
998 	if ((sigInFilename == NULL) || (sigOutFilename == NULL) || (OutputStdOut == 1))
999 	{
1000 		unfinishedWorkCleaned = 1;
1001 		return;
1002 	}
1003 
1004 	if (QuietMode != 1)
1005 	{
1006 		fprintf(stderr, "Cleanup unfinished work [Outfile: %s]...\n", sigOutFilename);
1007 	}
1008 
1009 	// check to see if input file still exists
1010 	ret = stat(sigInFilename, &statBuf);
1011 	if (ret == 0)
1012 	{
1013 		// only want to remove output file if input still exists
1014 		if (QuietMode != 1)
1015 			fprintf(stderr, "Deleting output file: %s, if it exists...\n", sigOutFilename);
1016 		ret = remove(sigOutFilename);
1017 		if (ret != 0)
1018 		{
1019 			ErrorContext::syncPrintErrnoMsg(stderr, errno);
1020 			fprintf(stderr, "pbzip2:  *WARNING: Deletion of output file (apparently) failed.\n");
1021 		}
1022 		else
1023 		{
1024 			fprintf(stderr, "pbzip2:  *INFO: Deletion of output file succeeded.\n");
1025 			sigOutFilename = NULL;
1026 		}
1027 	}
1028 	else
1029 	{
1030 		fprintf(stderr, "pbzip2:  *WARNING: Output file was not deleted since input file no longer exists.\n");
1031 		fprintf(stderr, "pbzip2:  *WARNING: Output file: %s, may be incomplete!\n", sigOutFilename);
1032 	}
1033 
1034 	unfinishedWorkCleaned = 1;
1035 }
1036 
1037 /*
1038  *********************************************************
1039  */
1040 
1041 /*
1042  *********************************************************
1043  * Terminator thread procedure: looking at terminateFlag
1044  * and exit application when it's set.
1045  */
terminatorThreadProc(void * arg)1046 void* terminatorThreadProc(void* arg)
1047 {
1048 	int ret = pthread_mutex_lock(&TerminateFlagMutex);
1049 
1050 	if (ret != 0)
1051 	{
1052 		ErrorContext::syncPrintErrnoMsg(stderr, errno);
1053 		fprintf(stderr, "Terminator thread: pthread_mutex_lock error [%d]! Aborting...\n", ret);
1054 		syncSetTerminateFlag(1);
1055 		cleanupAndQuit(1);
1056 	}
1057 
1058 	while ((finishedFlag == 0) && (terminateFlag == 0))
1059 	{
1060 		ret = pthread_cond_wait(&TerminateCond, &TerminateFlagMutex);
1061 	}
1062 
1063 	// Successfull end
1064 	if (finishedFlag != 0)
1065 	{
1066 		ret = pthread_mutex_unlock(&TerminateFlagMutex);
1067 		return NULL;
1068 	}
1069 
1070 	// Being here implies (terminateFlag != 0)
1071 	ret = pthread_mutex_unlock(&TerminateFlagMutex);
1072 
1073 	fprintf(stderr, "Terminator thread: premature exit requested - quitting...\n");
1074 	cleanupAndQuit(1);
1075 
1076 	return NULL; // never reachable
1077 }
1078 
1079 /*
1080  *********************************************************
1081  * Signal handler thread function to hook cleanup on
1082  * certain signals.
1083  */
signalHandlerProc(void * arg)1084 void* signalHandlerProc(void* arg)
1085 {
1086 	int signalCaught;
1087 
1088 	// wait for specified in mask signal
1089 	int ret = sigwait(&SignalMask, &signalCaught);
1090 
1091 	if (ret != 0)
1092 	{
1093 		fprintf(stderr, "\n *signalHandlerProc - sigwait error: %d\n", ret);
1094 	}
1095 	else if (signalCaught == SIG_HANDLER_QUIT_SIGNAL)
1096 	{
1097 		return NULL;
1098 	}
1099 	else // ret == 0
1100 	{
1101 		fprintf(stderr, "\n *Control-C or similar caught [sig=%d], quitting...\n", signalCaught);
1102 		// Delegating cleanup and termination to Terminator Thread
1103 		syncSetTerminateFlag(1);
1104 	}
1105 
1106 	return NULL;
1107 }
1108 
1109 /*
1110  * Cleanup unfinished work (output file) and exit with the given exit code.
1111  * To be used to quite on error with non-zero exitCode.
1112  */
cleanupAndQuit(int exitCode)1113 void cleanupAndQuit(int exitCode)
1114 {
1115 	// syncSetTerminateFlag(1);
1116 
1117 	int ret = pthread_mutex_lock(&ErrorHandlerMutex);
1118 	if (ret != 0)
1119 	{
1120 		fprintf(stderr, "Cleanup Handler: Failed to lock ErrorHandlerMutex! May double cleanup...\n");
1121 	}
1122 	cleanupUnfinishedWork();
1123 	pthread_mutex_unlock(&ErrorHandlerMutex);
1124 
1125 	exit(exitCode);
1126 }
1127 
/*
 *********************************************************
    Locate the first occurrence of the sequence
    searchString[0 .. searchStringSize) inside
    searchBuf[0 .. searchBufSize).

    Returns a pointer to the start of the match inside
    searchBuf, or NULL when there is no match.
*/
template <typename FI1, typename FI2>
FI1 memstr(FI1 searchBuf, int searchBufSize, FI2 searchString, int searchStringSize)
{
	const FI1 haystackEnd = searchBuf + searchBufSize;
	const FI2 needleEnd = searchString + searchStringSize;

	const FI1 match = std::search(searchBuf, haystackEnd, searchString, needleEnd);

	// std::search yields the end of the searched range when nothing matches
	if (match == haystackEnd)
	{
		return NULL;
	}

	return match;
}
1144 
1145 /**
1146  * Check for interrupt conditions - report if any and perform the relevant
1147  * cleanup
1148  *
1149  * @param hInfile
1150  * @param fileData
1151  * @param lastBlock
1152  * @return 0 - not interrupted; 1 - interrupted (terminate flag or other error encountered)
1153  */
producerDecompressCheckInterrupt(int hInfile,outBuff * & fileData,int lastBlock)1154 int producerDecompressCheckInterrupt(int hInfile, outBuff *& fileData, int lastBlock)
1155 {
1156 	bool isInterrupted = false;
1157 
1158 	if (syncGetLastGoodBlock() != -1)
1159 	{
1160 		isInterrupted = true;
1161 
1162 		#ifdef PBZIP_DEBUG
1163 		fprintf (stderr, "producer_decompress: interrupt1 - LastGoodBlock set. "
1164 			"Last produced=%d\n", lastBlock);
1165 		#endif
1166 	}
1167 	if (syncGetTerminateFlag() != 0)
1168 	{
1169 		isInterrupted = true;
1170 
1171 		#ifdef PBZIP_DEBUG
1172 		fprintf (stderr, "producer_decompress: interrupt2 - TerminateFlag set. "
1173 			"Last produced=%d\n", lastBlock);
1174 		#endif
1175 	}
1176 
1177 	if (isInterrupted)
1178 	{
1179 		close(hInfile);
1180 		disposeMemorySingle(fileData);
1181 
1182 		return 1;
1183 	}
1184 
1185 	return 0;
1186 }
1187 
/*
 *********************************************************
    Producer for parallel decompression. Function works in
    single pass: a BZ2StreamScanner reads hInfile and every
    stream segment it returns is appended to the fifo queue
    (long streams are split into sequences of multiple
    segments).

    hInfile  - input file descriptor; closed here (or by the
               interrupt check) before every return
    fileSize - not referenced by this function
    fifo     - queue that receives the scanned segments

    Returns 0 when all input was queued or when processing
    was interrupted; -1 after a read error or an invalid
    bzip2 stream was reported through handle_error.
 */
int producer_decompress(int hInfile, OFF_T fileSize, queue *fifo)
{
	// reset the produced-blocks counter
	safe_mutex_lock(&ProgressIndicatorsMutex);
	NumBlocks = 0;
	safe_mutex_unlock(&ProgressIndicatorsMutex);

	pbzip2::BZ2StreamScanner bz2StreamScanner(hInfile);

	// keep going until all the blocks are processed
	outBuff * fileData = bz2StreamScanner.getNextStream();
	while (!bz2StreamScanner.failed() && (fileData->bufSize > 0))
	{
		#ifdef PBZIP_DEBUG
		fprintf(stderr, " -> Bytes Read: %u bytes...\n", fileData->bufSize);
		#endif

		// the interrupt check closes hInfile and disposes fileData itself
		if (producerDecompressCheckInterrupt(hInfile, fileData, NumBlocks) != 0)
		{
			safe_mutex_lock(fifo->mut);
			safe_cond_broadcast(fifo->notEmpty); // just in case
			safe_mutex_unlock(fifo->mut);
			syncSetProducerDone(1);
			return 0;
		}

		if (QuietMode != 1)
		{
			// give warning to user if block is larger than 250 million bytes
			if (fileData->bufSize > 250000000)
			{
				fprintf(stderr, "pbzip2:  *WARNING: Compressed block size is large [%" PRIuMAX " bytes].\n",
						(uintmax_t) fileData->bufSize);
				fprintf(stderr, "          If program aborts, use regular BZIP2 to decompress.\n");
			}
		}

		// add data to the decompression queue
		safe_mutex_lock(fifo->mut);
		while (fifo->full)
		{
			#ifdef PBZIP_DEBUG
			fprintf (stderr, "producer: queue FULL.\n");
			#endif
			safe_cond_wait (fifo->notFull, fifo->mut);

			// re-check for interruption after every wake-up (fifo->mut is held here)
			if (producerDecompressCheckInterrupt(hInfile, fileData, NumBlocks) != 0)
			{
				safe_cond_broadcast(fifo->notEmpty); // just in case
				syncSetProducerDone(1);
				safe_mutex_unlock(fifo->mut);
				return 0;
			}
		}
		#ifdef PBZIP_DEBUG
		fprintf(stderr, "producer:  Buffer: %p  Size: %" PRIuMAX "   Block: %d\n", fileData->buf,
			(uintmax_t)fileData->bufSize, NumBlocks);
		#endif

		fifo->add(fileData);
		safe_cond_signal (fifo->notEmpty);

		// NumBlocks follows the block number of the segment just queued
		safe_mutex_lock(&ProgressIndicatorsMutex);
		NumBlocks = fileData->blockNumber + 1;
		safe_mutex_unlock(&ProgressIndicatorsMutex);

		safe_mutex_unlock(fifo->mut);

		fileData = bz2StreamScanner.getNextStream();
	} // while

	close(hInfile);

	// last stream is always dummy one (either error or eof)
	delete fileData;


	if (bz2StreamScanner.failed())
	{
		handle_error(EF_EXIT, 1, "pbzip2: producer_decompress: *ERROR: when reading bzip2 input stream\n");
		return -1;
	}
	else if (!bz2StreamScanner.isBz2HeaderFound() || !bz2StreamScanner.eof())
	{
		// no bzip2 header was seen, or scanning stopped before end of input
		handle_error(EF_EXIT, 1, "pbzip2: producer_decompress: *ERROR: input file is not a valid bzip2 stream\n");
		return -1;
	}

	// announce completion and wake any consumer waiting for more input
	syncSetProducerDone(1);
	safe_mutex_lock(fifo->mut);
	safe_cond_broadcast(fifo->notEmpty); // just in case
	safe_mutex_unlock(fifo->mut);

	#ifdef PBZIP_DEBUG
		fprintf(stderr, "producer:  Done - exiting. Last Block: %d\n", NumBlocks);
	#endif

	return 0;
}
1291 
1292 /**
1293  * Check for interrupt conditions - report if any and perform the relevant
1294  * cleanup
1295  *
1296  * @return 0 - not interrupted; 1 - interrupted (terminate flag or other error encountered)
1297  */
consumerDecompressCheckInterrupt(const outBuff * lastElement)1298 int consumerDecompressCheckInterrupt(const outBuff * lastElement)
1299 {
1300 	bool isInterrupted = false;
1301 
1302 	#ifdef PBZIP_DEBUG
1303 	uintmax_t thid = (uintmax_t) pthread_self();
1304 	#endif
1305 
1306 	if (syncGetTerminateFlag() != 0)
1307 	{
1308 		isInterrupted = true;
1309 
1310 		#ifdef PBZIP_DEBUG
1311 		fprintf (stderr, "(%" PRIuMAX ") consumer_decompress: interrupt1 - TerminateFlag set.\n", thid);
1312 		#endif
1313 	}
1314 	int minErrBlock = syncGetMinErrorBlock();
1315 	if ( (minErrBlock != -1) &&
1316 		( (lastElement == NULL)
1317 			|| (lastElement->blockNumber >= minErrBlock)
1318 			|| lastElement->isLastInSequence ) )
1319 	{
1320 		isInterrupted = true;
1321 
1322 		#ifdef PBZIP_DEBUG
1323 		fprintf (stderr, "(%" PRIuMAX ") consumer_decompress: terminating1 - LastGoodBlock set [%d].\n", thid, syncGetLastGoodBlock());
1324 		#endif
1325 	}
1326 
1327 	if (isInterrupted)
1328 	{
1329 		return 1;
1330 	}
1331 
1332 	return 0;
1333 }
1334 
/*
 *********************************************************
 * Decompression worker thread procedure.
 *
 * Repeatedly takes a compressed segment from the fifo queue, feeds it to
 * libbzip2 (BZ2_bzDecompress) and passes the decompressed data on through
 * outputBufferAdd / outputBufferSeqAddNext in chunks of at most 900000 bytes.
 *
 * q - the fifo queue (queue *) shared with the producer
 *
 * Returns NULL in all cases: when the queue is drained and the producer is
 * done, when an interrupt/terminate condition is detected, or after an error
 * (errors are reported through handle_error).
 */
void *consumer_decompress(void *q)
{
	queue *fifo = (queue *)q;

	outBuff *fileData = NULL;
	outBuff *lastFileData = NULL;
	char *DecompressedData = NULL;
	unsigned int outSize = 0;
	outBuff * prevOutBlockInSequence = NULL;
	int outSequenceNumber = -1; // sequence number in multi-part output blocks
	unsigned int processedIn = 0;
	int errState = 0;

	// NULL allocator hooks: libbzip2 falls back to its default allocation
	bz_stream strm;
	strm.bzalloc = NULL;
	strm.bzfree = NULL;
	strm.opaque = NULL;

	for (;;)
	{
		// --- obtain the next input segment (or find out that we are done) ---
		safe_mutex_lock(fifo->mut);
		for (;;)
		{
			if (consumerDecompressCheckInterrupt(fileData) != 0)
			{
				safe_mutex_unlock(fifo->mut);
				return (NULL);
			}

			if (!fifo->empty && (fifo->remove(fileData) == 1))
			{
				// block retreived - break the loop and continue further
				break;
			}

			#ifdef PBZIP_DEBUG
			fprintf (stderr, "consumer: queue EMPTY.\n");
			#endif

			if (fifo->empty && ((syncGetProducerDone() == 1) || (syncGetTerminateFlag() != 0)))
			{
				// finished - either OK or terminated forcibly
				pthread_mutex_unlock(fifo->mut);
				// BZ2_bzDecompressEnd( &strm );

				// outSequenceNumber != -1 means a stream was started but its
				// end was never reached
				if ((syncGetTerminateFlag() == 0) && (outSequenceNumber != -1))
				{
					handle_error(EF_EXIT, -1, "pbzip2: *ERROR on decompress - "
						"premature end of archive stream (block=%d; seq=%d; outseq=%d)!\n",
						lastFileData->blockNumber,
						lastFileData->sequenceNumber,
						outSequenceNumber);
				}
				#ifdef PBZIP_DEBUG
				else
				{
					fprintf (stderr, "consumer: exiting2\n");
				}
				#endif

				disposeMemorySingle( lastFileData );

				return (NULL);
			}

			// nothing available yet: wait until the producer signals new data
			#ifdef PBZIP_DEBUG
			safe_cond_timed_wait(fifo->notEmpty, fifo->mut, 1, "consumer");
			#else
			safe_cond_wait(fifo->notEmpty, fifo->mut);
			#endif
		}

		#ifdef PBZIP_DEBUG
		fprintf(stderr, "consumer:  FileData: %p\n", fileData);
		fprintf(stderr, "consumer:  Buffer: %p  Size: %u   Block: %d\n",
				fileData->buf, (unsigned)fileData->bufSize, fileData->blockNumber);
		#endif

		// a queue slot was freed - let the producer continue
		safe_cond_signal(fifo->notFull);
		safe_mutex_unlock(fifo->mut);

		// the previously processed segment descriptor is no longer needed
		if (lastFileData != NULL)
		{
			delete lastFileData;
		}
		lastFileData = fileData;

		#ifdef PBZIP_DEBUG
		fprintf (stderr, "consumer: recieved %d.\n", fileData->blockNumber);
		#endif

		// size of each output buffer allocated below
		outSize = 900000;

		int bzret = BZ_OK;

		if (fileData->sequenceNumber < 2)
		{
			// start of new stream from in queue (0 -> single block; 1 - mutli)
			bzret = BZ2_bzDecompressInit(&strm, Verbosity, 0);
			if (bzret != BZ_OK)
			{
				handle_error(EF_EXIT, -1, "pbzip2: *ERROR during BZ2_bzDecompressInit: %d\n", bzret);
				return (NULL);
			}
		}

		// --- decompress the segment, emitting output buffers as they fill ---
		strm.avail_in = fileData->bufSize;
		strm.next_in = fileData->buf;
		while ((bzret == BZ_OK) && (strm.avail_in != 0))
		{
			#ifdef PBZIP_DEBUG
			fprintf(stderr, "decompress: block=%d; seq=%d; prev=%llx; avail_in=%u; avail_out=%u\n",
				 fileData->blockNumber, outSequenceNumber,
				 (unsigned long long) prevOutBlockInSequence,
				 strm.avail_in, strm.avail_out);
			#endif

			// DecompressedData is NULL at start and after every emitted buffer
			if (DecompressedData == NULL)
			{
				// allocate memory for decompressed data (start with default 900k block size)
				DecompressedData = new(std::nothrow) char[outSize];
				// make sure memory was allocated properly

				if (DecompressedData == NULL)
				{
					handle_error(EF_EXIT, -1,
							" *ERROR: Could not allocate memory (DecompressedData)!  Aborting...\n");
					return (NULL);
				}

				// input bytes consumed for the current output buffer
				processedIn = 0;

				strm.avail_out = outSize;
				strm.next_out = DecompressedData;
			}

			unsigned int availIn = strm.avail_in;
			bzret = BZ2_bzDecompress(&strm);
			processedIn += (availIn - strm.avail_in);

			#ifdef PBZIP_DEBUG
			fprintf(stderr, "decompress: BZ2_bzDecompress=%d; block=%d; seq=%d; prev=%llx; avail_in=%u; avail_out=%u\n",
				 bzret,
				 fileData->blockNumber, outSequenceNumber,
				 (unsigned long long) prevOutBlockInSequence,
				 strm.avail_in, strm.avail_out);
			#endif

			// issue out block if out buffer is full or stream end is detected
			if ( ((bzret == BZ_OK) && strm.avail_out == 0) || (bzret == BZ_STREAM_END) )
			{
				outBuff * addret = NULL;
				unsigned int len = outSize - strm.avail_out; // bytes produced into this buffer
				bool isLast = (bzret == BZ_STREAM_END);

				if ( hasTrailingGarbage( bzret, fileData, strm ) )
				{
					// trailing garbage detected
					syncSetLastGoodBlock(fileData->blockNumber, fileData->blockNumber);
				}

				if (outSequenceNumber>0)
				{
					// continuation: chain a new block after the previous one
					++outSequenceNumber;

					outBuff * nextOutBlock = new(std::nothrow) outBuff(
						DecompressedData, len, fileData->blockNumber,
						outSequenceNumber, processedIn, isLast, NULL);

					if (nextOutBlock == NULL)
					{
						BZ2_bzDecompressEnd( &strm );
						handle_error(EF_EXIT, -1,
								" *ERROR: Could not allocate memory (nextOutBlock)!  Aborting...\n");
						return (NULL);
					}

					addret = outputBufferSeqAddNext(prevOutBlockInSequence, nextOutBlock);
					#ifdef PBZIP_DEBUG
					fprintf(stderr, "decompress: outputBufferSeqAddNext->%llx; block=%d; seq=%d; prev=%llx\n",
						(unsigned long long)addret,
						fileData->blockNumber, outSequenceNumber,
						(unsigned long long) prevOutBlockInSequence);
					#endif
				}
				else // outSequenceNumber <= 0: first output block of this stream
				{
					// 1 when more output follows (buffer full), 0 when the
					// whole stream fit into this single buffer
					outSequenceNumber = (bzret == BZ_OK) ? 1 : 0;
					addret = outputBufferAdd(outBuff(
						DecompressedData, len,
						fileData->blockNumber,
						outSequenceNumber, processedIn, isLast, NULL), "consumer_decompress");

					#ifdef PBZIP_DEBUG
					fprintf(stderr, "decompress: outputBufferAdd->%llx; block=%d; seq=%d; prev=%llx\n",
						(unsigned long long)addret,
						fileData->blockNumber, outSequenceNumber,
						(unsigned long long) prevOutBlockInSequence);
					#endif
				}

				if (addret == NULL)
				{
					// error encountered
					BZ2_bzDecompressEnd( &strm );
					return (NULL);
				}

				// the buffer was handed over; a fresh one is allocated on the
				// next loop iteration
				prevOutBlockInSequence = addret;
				DecompressedData = NULL;
			}
		}

		/*
		 * < 0 - fatal error;
		 *   0 - OK (no error at all);
		 *   1 - first block of ignored trailing garbage;
		 *   2 - error already signalled for earlier block
		 */
		errState = decompressErrCheck(bzret, fileData, outSequenceNumber, strm);

		if (bzret == BZ_STREAM_END)
		{
			bzret = BZ2_bzDecompressEnd(&strm);
			if ( (bzret != BZ_OK) && ((errState == 0) || (errState == 1)) )
			{
				handle_error(EF_EXIT, -1, "pbzip2: *ERROR during BZ2_bzDecompressEnd: %d\n", bzret);
				return (NULL);
			}

			// stream finished: reset the output sequence state
			outSequenceNumber = -1;
			prevOutBlockInSequence = NULL;
		}

		#ifdef PBZIP_DEBUG
		fprintf(stderr, "\n Compressed Block Size: %u\n", (unsigned)fileData->bufSize);
		fprintf(stderr, "   Original Block Size: %u\n", outSize);
		#endif

		// the compressed input buffer is no longer needed
		disposeMemory(fileData->buf);

		#ifdef PBZIP_DEBUG
		fprintf(stderr, " OutputBuffer[%d].buf = %p\n", fileData->blockNumber, DecompressedData);
		fprintf(stderr, " OutputBuffer[%d].bufSize = %u\n", fileData->blockNumber, outSize);
		fflush(stderr);
		#endif

		if (errState != 0)
		{
			#ifdef PBZIP_DEBUG
			fprintf (stderr, "consumer: exiting prematurely: errState=%d\n", errState);
			#endif

			return (NULL);
		}
	} // for

	// not reached: the loop above is only left through return statements
	#ifdef PBZIP_DEBUG
	fprintf (stderr, "consumer: exiting\n");
	#endif
	return (NULL);
}
1600 
/*
 *********************************************************
 * Output writer thread procedure.
 *
 * Walks the OutputBuffer slots sequentially (wrapping around at
 * NumBufferedBlocksMax), writes each available block - and the blocks chained
 * to it through ->next for multi-part sequences - to the output, releases the
 * written buffers and reports progress.
 *
 * outname - output file name (char *); used only when OutputStdOut == 0,
 *           otherwise data is written to STDOUT_FILENO
 *
 * Returns NULL in all cases; failures are reported through handle_error.
 */
void *fileWriter(void *outname)
{
	char *OutFilename;
	OFF_T CompressedSize = 0;
	int percentComplete = 0;
	int hOutfile = STDOUT_FILENO;  // default to stdout
	int currBlock = 0;
	size_t outBufferPos = 0;
	int ret = -1;
	OFF_T bytesProcessed = 0;

	OutFilename = (char *) outname;
	// non-NULL while in the middle of a multi-part sequence: the part written last
	outBuff * prevBlockInSequence = NULL;

	#ifdef PBZIP_DEBUG
	fprintf(stderr, "fileWriter function started\n");
	#endif

	// write to file instead of stdout
	if (OutputStdOut == 0)
	{
		hOutfile = safe_open_output(OutFilename);
		// check to see if file creation was successful
		if (hOutfile == -1)
		{
			handle_error(EF_EXIT, -1,
				"pbzip2: *ERROR: Could not create output file [%s]!\n", OutFilename);
			return (NULL);
		}
	}

	while (true)
	{
		#ifdef PBZIP_DEBUG
		int lastseq = 0;
		if (prevBlockInSequence != NULL)
		{
			lastseq = prevBlockInSequence->sequenceNumber;
		}
		#endif

		// Order is important. We don't need sync on NumBlocks when producer
		// is done.
		if ((syncGetProducerDone() == 1) && (currBlock >= NumBlocks) && (prevBlockInSequence == NULL))
		{
			#ifdef PBZIP_DEBUG
			fprintf(stderr, "fileWriter [b:%d:%d]: done - quit loop.\n", currBlock, lastseq);
			#endif
			// We're done
			break;
		}

		if (syncGetTerminateFlag() != 0)
		{
			#ifdef PBZIP_DEBUG
			fprintf (stderr, "fileWriter [b:%d]: terminating1 - terminateFlag set\n", currBlock);
			#endif
			break;
		}

		safe_mutex_lock(OutMutex);
		#ifdef PBZIP_DEBUG
		outBuff * lastnext = (prevBlockInSequence != NULL) ? prevBlockInSequence->next : NULL;
		fprintf(stderr, "fileWriter:  Block: %d Size: %" PRIuMAX " Next File Block: %d"
				", outBufferPos: %" PRIuMAX ", NumBlocks: %d, producerDone: %d, lastseq=%d"
				", prev=%p, next=%p\n",
				currBlock, (uintmax_t)NumBufferedBlocksMax, NextBlockToWrite,
				(uintmax_t)outBufferPos, NumBlocks, syncGetProducerDone(), lastseq,
				prevBlockInSequence,
				lastnext);
		#endif

		// stop once everything up to (and including) the last good block is written
		if ( (LastGoodBlock != -1) && (NextBlockToWrite > LastGoodBlock) )
		{
			#ifdef PBZIP_DEBUG
			fprintf (stderr, "fileWriter [b:%d]: quit - LastGoodBlock=%d\n",
					 currBlock, LastGoodBlock);
			#endif
			safe_mutex_unlock(OutMutex);

			break;
		}

		// nothing to write yet: the current slot is empty and there is no
		// pending next part of a sequence in progress
		if ((OutputBuffer[outBufferPos].buf == NULL) &&
			((prevBlockInSequence == NULL) || (prevBlockInSequence->next == NULL)))
		{
			// NOTE(review): the third argument looks like a timeout (presumably seconds) - confirm
			safe_cond_timed_wait(&OutBufferHeadNotEmpty, OutMutex, 1, "fileWriter");
			safe_mutex_unlock(OutMutex);
			// sleep a little so we don't go into a tight loop using up all the CPU
			// usleep(50000);
			continue;
		}
		else
		{
			safe_mutex_unlock(OutMutex);
		}

		// pick the block to write: next part of the running sequence, or the
		// head block stored in the current OutputBuffer slot
		outBuff * outBlock;
		if (prevBlockInSequence != NULL)
		{
			outBlock = prevBlockInSequence->next;
		}
		else
		{
			outBlock = &OutputBuffer[outBufferPos];
		}

		#ifdef PBZIP_DEBUG
		fprintf(stderr, "fileWriter:  Buffer: %p  Size: %u   Block: %d, Seq: %d, isLast: %d\n",
			OutputBuffer[outBufferPos].buf, OutputBuffer[outBufferPos].bufSize, currBlock,
			outBlock->sequenceNumber, (int)outBlock->isLastInSequence);
		#endif

		// write data to the output file
		ret = do_write(hOutfile, outBlock->buf, outBlock->bufSize);

		#ifdef PBZIP_DEBUG
		fprintf(stderr, "\n -> Total Bytes Written[%d:%d]: %d bytes...\n", currBlock, outBlock->sequenceNumber, ret);
		#endif

		if (ret < 0)
		{
			if (OutputStdOut == 0)
				close(hOutfile);

			handle_error(EF_EXIT, -1,
				"pbzip2: *ERROR: Could not write %d bytes to file [ret=%d]!  Aborting...\n",
				outBlock->bufSize, ret);
			return (NULL);
		}
		CompressedSize += ret;

		// progress is tracked by input bytes consumed; the data buffer is
		// released right after it was written
		bytesProcessed += outBlock->inSize;
		delete [] outBlock->buf;
		outBlock->buf = NULL;
		outBlock->bufSize = 0;

		// end of a block (single or last part): advance to the next slot, wrapping around
		if (outBlock->isLastInSequence)
		{
			if (++outBufferPos == NumBufferedBlocksMax)
			{
				outBufferPos = 0;
			}
			++currBlock;
		}

		// publish the new state to the other threads under OutMutex
		safe_mutex_lock(OutMutex);

		if (outBlock->isLastInSequence)
		{
			++NextBlockToWrite;
			OutBufferPosToWrite = outBufferPos;
		}
		if (outBlock->sequenceNumber > 1)
		{
			--NumBufferedTailBlocks;
		}
		// --NumBufferedBlocks; // to be removed
		safe_cond_broadcast(notTooMuchNumBuffered);
		safe_cond_broadcast(&ErrStateChangeCond);
		safe_mutex_unlock(OutMutex);

		// The previous part is deleted only when the current part is number 3
		// or higher, i.e. the previous part was itself a chained block. For
		// part number 2 the previous part is the head block, which (as
		// selected above) is an element of OutputBuffer and must not be deleted.
		if (outBlock->sequenceNumber > 2)
		{
			delete prevBlockInSequence;
		}

		if (outBlock->isLastInSequence)
		{
			prevBlockInSequence = NULL;
			// a last part with number > 1 is a chained block - release it as well
			if (outBlock->sequenceNumber > 1)
			{
				delete outBlock;
			}
		}
		else
		{
			prevBlockInSequence = outBlock;
		}

		if (QuietMode != 1)
		{
			// print current completion status
			int percentCompleteOld = percentComplete;
			if (InFileSize > 0)
			{
				percentComplete = (100.0 * (double)bytesProcessed / (double)InFileSize);
			}

			#ifdef PBZIP_DEBUG
			fprintf(stderr, "Completed: %d%%  NextBlockToWrite: %d/%" PRIuMAX "        \r", percentComplete, NextBlockToWrite, (uintmax_t)NumBufferedBlocksMax);
			fflush(stderr);
			#else
			if (percentComplete != percentCompleteOld)
			{
				fprintf(stderr, "Completed: %d%%             \r", percentComplete);
				fflush(stderr);
			}
			#endif
		}
	} // while

	// no block was written at all: emit the fixed Bz2HeaderZero bytes
	if (currBlock == 0)
	{
		// zero-size file needs special handling
		ret = do_write(hOutfile, Bz2HeaderZero, sizeof(Bz2HeaderZero) );

		if (ret < 0)
		{
			handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not write to file!  Aborting...\n");
			return (NULL);
		}
	}


	if (OutputStdOut == 0)
	{
		ret = close(hOutfile);
		if (ret == -1)
		{
			// remember the close() error before it is reported
			ErrorContext::getInstance()->saveError();
			handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not close output file!  Aborting...\n");
			return (NULL);
		}
	}

	if (QuietMode != 1)
	{
		fprintf(stderr, "    Output Size: %" PRIuMAX " bytes\n", (uintmax_t)CompressedSize);
	}

	#ifdef PBZIP_DEBUG
	fprintf(stderr, "fileWriter exit\n");
	fflush(stderr);
	#endif

	// wake up all other possibly blocked on cond threads
	if (FifoQueue != NULL)
	{
		safe_mutex_lock(FifoQueue->mut);
		safe_cond_broadcast(FifoQueue->notEmpty); // important
		safe_cond_broadcast(FifoQueue->notFull); // not really needed
		safe_mutex_unlock(FifoQueue->mut);
	}
	safe_mutex_lock(OutMutex);
	safe_cond_broadcast(notTooMuchNumBuffered); // not really needed
	safe_mutex_unlock(OutMutex);

	if (QuietMode != 1)
	{
		// print current completion status
		percentComplete = 100;

		#ifdef PBZIP_DEBUG
		fprintf(stderr, "Completed: %d%%  NextBlockToWrite: %d/%" PRIuMAX "        \r", percentComplete, NextBlockToWrite, (uintmax_t)NumBufferedBlocksMax);
		fflush(stderr);
		#else

			fprintf(stderr, "Completed: %d%%             \r", percentComplete);
			fflush(stderr);
		#endif
	}

	return (NULL);
}
1869 
1870 /*
1871  *********************************************************
1872  */
directcompress(int hInfile,OFF_T fileSize,int blockSize,const char * OutFilename)1873 int directcompress(int hInfile, OFF_T fileSize, int blockSize, const char *OutFilename)
1874 {
1875 	char *FileData = NULL;
1876 	char *CompressedData = NULL;
1877 	OFF_T CompressedSize = 0;
1878 	OFF_T bytesLeft = 0;
1879 	OFF_T inSize = 0;
1880 	unsigned int outSize = 0;
1881 	int percentComplete = 0;
1882 	int hOutfile = STDOUT_FILENO;  // default to stdout
1883 	int currBlock = 0;
1884 	int rret = 0;
1885 	int ret = 0;
1886 
1887 	bytesLeft = fileSize;
1888 
1889 	// write to file instead of stdout
1890 	if (OutputStdOut == 0)
1891 	{
1892 		hOutfile = safe_open_output(OutFilename);
1893 		// check to see if file creation was successful
1894 		if (hOutfile == -1)
1895 		{
1896 			handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not create output file [%s]!\n", OutFilename);
1897 			return -1;
1898 		}
1899 	}
1900     #ifdef WIN32
1901 	else
1902 	{
1903         setmode(fileno(stdout), O_BINARY);
1904     }
1905     #endif
1906 
1907 	// keep going until all the file is processed
1908 	while (bytesLeft > 0)
1909 	{
1910 		if (syncGetTerminateFlag() != 0)
1911 		{
1912 			close(hInfile);
1913 			if (OutputStdOut == 0)
1914 				close(hOutfile);
1915 
1916 			fprintf (stderr, "directcompress: terminating - terminateFlag set\n");
1917 
1918 			return -1;
1919 		}
1920 
1921 		//
1922 		// READ DATA
1923 		//
1924 
1925 		// set buffer size
1926 		if (bytesLeft > blockSize)
1927 			inSize = blockSize;
1928 		else
1929 			inSize = bytesLeft;
1930 
1931 		#ifdef PBZIP_DEBUG
1932 		fprintf(stderr, " -> Bytes To Read: %" PRIuMAX " bytes...\n", (uintmax_t)inSize);
1933 		#endif
1934 
1935 		// allocate memory to read in file
1936 		FileData = NULL;
1937 		FileData = new(std::nothrow) char[inSize];
1938 		// make sure memory was allocated properly
1939 		if (FileData == NULL)
1940 		{
1941 			close(hInfile);
1942 			if (OutputStdOut == 0)
1943 				close(hOutfile);
1944 
1945 			handle_error(EF_EXIT, -1,
1946 					 "pbzip2: *ERROR: Could not allocate memory (FileData)!  Aborting...\n");
1947 			return -1;
1948 		}
1949 
1950 		// read file data
1951 		rret = do_read(hInfile, (char *) FileData, inSize);
1952 		#ifdef PBZIP_DEBUG
1953 		fprintf(stderr, " -> Total Bytes Read: %d bytes...\n\n", rret);
1954 		#endif
1955 		if (rret == 0)
1956 		{
1957 			if (FileData != NULL)
1958 				delete [] FileData;
1959 			break;
1960 		}
1961 		else if (rret < 0)
1962 		{
1963 			close(hInfile);
1964 			if (FileData != NULL)
1965 				delete [] FileData;
1966 			if (OutputStdOut == 0)
1967 				close(hOutfile);
1968 
1969 			handle_error(EF_EXIT, -1,
1970 					"pbzip2: *ERROR: Could not read from file!  Aborting...\n");
1971 			return -1;
1972 		}
1973 
1974 		// set bytes left after read
1975 		bytesLeft -= rret;
1976 
1977 		//
1978 		// COMPRESS DATA
1979 		//
1980 
1981 		outSize = (int) ((inSize*1.01)+600);
1982 		// allocate memory for compressed data
1983 		CompressedData = new(std::nothrow) char[outSize];
1984 		// make sure memory was allocated properly
1985 		if (CompressedData == NULL)
1986 		{
1987 			close(hInfile);
1988 			if (FileData != NULL)
1989 				delete [] FileData;
1990 
1991 			handle_error(EF_EXIT, -1,
1992 					"pbzip2: *ERROR: Could not allocate memory (CompressedData)!  Aborting...\n");
1993 			return -1;
1994 		}
1995 
1996 		// compress the memory buffer (blocksize=9*100k, verbose=0, worklevel=30)
1997 		ret = BZ2_bzBuffToBuffCompress(CompressedData, &outSize, FileData, inSize, BWTblockSize, Verbosity, 30);
1998 		if (ret != BZ_OK)
1999 		{
2000 			close(hInfile);
2001 			if (FileData != NULL)
2002 				delete [] FileData;
2003 
2004 			handle_error(EF_EXIT, -1, "pbzip2: *ERROR during compression: %d!  Aborting...\n", ret);
2005 			return -1;
2006 		}
2007 
2008 		#ifdef PBZIP_DEBUG
2009 		fprintf(stderr, "\n   Original Block Size: %" PRIuMAX "\n", (uintmax_t)inSize);
2010 		fprintf(stderr, " Compressed Block Size: %u\n", outSize);
2011 		#endif
2012 
2013 		//
2014 		// WRITE DATA
2015 		//
2016 
2017 		// write data to the output file
2018 		ret = do_write(hOutfile, CompressedData, outSize);
2019 
2020 		#ifdef PBZIP_DEBUG
2021 		fprintf(stderr, "\n -> Total Bytes Written[%d]: %d bytes...\n", currBlock, ret);
2022 		#endif
2023 		if (ret <= 0)
2024 		{
2025 			close(hInfile);
2026 			if (FileData != NULL)
2027 				delete [] FileData;
2028 			if (CompressedData != NULL)
2029 				delete [] CompressedData;
2030 			if (OutputStdOut == 0)
2031 				close(hOutfile);
2032 
2033 			handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not write to file!  Aborting...\n");
2034 			return -1;
2035 		}
2036 
2037 		CompressedSize += ret;
2038 
2039 		currBlock++;
2040 		// print current completion status
2041 		int percentCompleteOld = percentComplete;
2042 		percentComplete = 100 * currBlock / NumBlocksEstimated;
2043 		if (QuietMode != 1)
2044 		{
2045 			if (percentComplete != percentCompleteOld)
2046 			{
2047 				fprintf(stderr, "Completed: %d%%             \r", percentComplete);
2048 				fflush(stderr);
2049 			}
2050 		}
2051 
2052 		// clean up memory
2053 		if (FileData != NULL)
2054 		{
2055 			delete [] FileData;
2056 			FileData = NULL;
2057 		}
2058 		if (CompressedData != NULL)
2059 		{
2060 			delete [] CompressedData;
2061 			CompressedData = NULL;
2062 		}
2063 
2064 		// check to make sure all the data we expected was read in
2065 		if (rret != inSize)
2066 			inSize = rret;
2067 	} // while
2068 
2069 	close(hInfile);
2070 
2071 	if (OutputStdOut == 0)
2072 		close(hOutfile);
2073 	if (QuietMode != 1)
2074 	{
2075 		fprintf(stderr, "    Output Size: %" PRIuMAX " bytes\n", (uintmax_t)CompressedSize);
2076 	}
2077 
2078 	syncSetProducerDone(1); // Not really needed for direct version
2079 	return 0;
2080 }
2081 
/*
 * Flush the output stream, then close the input and output streams.
 * NULL arguments are skipped, and the process-wide stdin/stdout are
 * never closed (stdout is still flushed).
 */
void close_streams(FILE *out, FILE *in)
{
	FILE *closable[2];
	int k;

	// input is closed first, output second; the standard streams are masked out
	closable[0] = (in == stdin) ? NULL : in;
	closable[1] = (out == stdout) ? NULL : out;

	// push pending output before anything gets closed
	if (out != NULL)
		fflush(out);

	for (k = 0; k < 2; ++k)
	{
		if (closable[k] != NULL)
			fclose(closable[k]);
	}
}
2096 
2097 /*
2098  *********************************************************
2099  */
directdecompress(const char * InFilename,const char * OutFilename)2100 int directdecompress(const char *InFilename, const char *OutFilename)
2101 {
2102 	FILE *stream = NULL;
2103 	FILE *zStream = NULL;
2104 	BZFILE* bzf = NULL;
2105 	unsigned char obuf[5000];
2106 	unsigned char unused[BZ_MAX_UNUSED];
2107 	unsigned char *unusedTmp;
2108 	int bzerr, nread, streamNo;
2109 	int nUnused;
2110 	int ret = 0;
2111 	int i;
2112 
2113 	nUnused = 0;
2114 	streamNo = 0;
2115 
2116 	// see if we are using stdin or not
2117 	if (strcmp(InFilename, "-") != 0)
2118 	{
2119 		// open the file for reading
2120 		zStream = fopen(InFilename, "rb");
2121 		if (zStream == NULL)
2122 		{
2123 			handle_error(EF_EXIT, -1,
2124 					"pbzip2: *ERROR: Could not open input file [%s]!  Aborting...\n", InFilename);
2125 			return -1;
2126 		}
2127 	}
2128 	else
2129 	{
2130 		#ifdef WIN32
2131         setmode(fileno(stdin), O_BINARY);
2132 		#endif
2133 		zStream = stdin;
2134     }
2135 
2136 	// check file stream for errors
2137 	if (ferror(zStream))
2138 	{
2139 		close_streams(stream, zStream);
2140 		handle_error(EF_EXIT, -1,
2141 				"pbzip2: *ERROR: Problem with input stream of file [%s]!  Aborting...\n", InFilename);
2142 		return -1;
2143 	}
2144 
2145 	// see if we are outputting to stdout
2146 	if (OutputStdOut == 0)
2147 	{
2148 		stream = safe_fopen_output(OutFilename, "wb");
2149 		if (stream == NULL)
2150 		{
2151 			handle_error(EF_EXIT, -1,
2152 					"pbzip2: *ERROR: Could not open output file [%s]!  Aborting...\n", OutFilename);
2153 			return -1;
2154 		}
2155 	}
2156 	else
2157 	{
2158         #ifdef WIN32
2159         setmode(fileno(stdout), O_BINARY);
2160         #endif
2161 		stream = stdout;
2162     }
2163 
2164 	// check file stream for errors
2165 	if (ferror(stream))
2166 	{
2167 		close_streams(stream, zStream);
2168 		handle_error(EF_EXIT, -1,
2169 				"pbzip2: *ERROR: Problem with output stream of file [%s]!  Aborting...\n", InFilename);
2170 		return -1;
2171 	}
2172 
2173 	// loop until end of file
2174 	while(true)
2175 	{
2176 		if (syncGetTerminateFlag() != 0)
2177 		{
2178 			fprintf (stderr, "directdecompress: terminating1 - terminateFlag set\n");
2179 			close_streams(stream, zStream);
2180 			return -1;
2181 		}
2182 
2183 		bzf = BZ2_bzReadOpen(&bzerr, zStream, Verbosity, 0, unused, nUnused);
2184 		if (bzf == NULL || bzerr != BZ_OK)
2185 		{
2186 			ret = testBZ2ErrorHandling(bzerr, bzf, streamNo);
2187 			close_streams(stream, zStream);
2188 
2189 			if (ret != 0)
2190 			{
2191 				syncSetTerminateFlag(1);
2192 			}
2193 
2194 			return ret;
2195 		}
2196 
2197 		streamNo++;
2198 
2199 		while (bzerr == BZ_OK)
2200 		{
2201 			if (syncGetTerminateFlag() != 0)
2202 			{
2203 				fprintf (stderr, "directdecompress: terminating2 - terminateFlag set\n");
2204 				close_streams(stream, zStream);
2205 				return -1;
2206 			}
2207 
2208 			nread = BZ2_bzRead(&bzerr, bzf, obuf, sizeof(obuf));
2209 			if (bzerr == BZ_DATA_ERROR_MAGIC)
2210 			{
2211 				// try alternate way of reading data
2212 				if (ForceOverwrite == 1)
2213 				{
2214 					rewind(zStream);
2215 					while (true)
2216 					{
2217 						int c = fgetc(zStream);
2218 						if (c == EOF)
2219 							break;
2220 						ungetc(c,zStream);
2221 
2222 						nread = fread(obuf, sizeof(unsigned char), sizeof(obuf), zStream );
2223 						if (ferror(zStream))
2224 						{
2225 							ret = testBZ2ErrorHandling(bzerr, bzf, streamNo);
2226 							close_streams(stream, zStream);
2227 
2228 							if (ret != 0)
2229 							{
2230 								syncSetTerminateFlag(1);
2231 							}
2232 
2233 							return ret;
2234 						}
2235 						if (nread > 0)
2236 							(void) fwrite (obuf, sizeof(unsigned char), nread, stream);
2237 						if (ferror(stream))
2238 						{
2239 							ret = testBZ2ErrorHandling(bzerr, bzf, streamNo);
2240 							close_streams(stream, zStream);
2241 
2242 							if (ret != 0)
2243 							{
2244 								syncSetTerminateFlag(1);
2245 							}
2246 
2247 							return ret;
2248 						}
2249 					}
2250 					goto closeok;
2251 				}
2252 			}
2253 			if ((bzerr == BZ_OK || bzerr == BZ_STREAM_END) && nread > 0)
2254 				(void) fwrite(obuf, sizeof(unsigned char), nread, stream );
2255 			if (ferror(stream))
2256 			{
2257 				ret = testBZ2ErrorHandling(bzerr, bzf, streamNo);
2258 				close_streams(stream, zStream);
2259 
2260 				if (ret != 0)
2261 				{
2262 					syncSetTerminateFlag(1);
2263 				}
2264 				return ret;
2265 			}
2266 		}
2267 		if (bzerr != BZ_STREAM_END)
2268 		{
2269 			ret = testBZ2ErrorHandling(bzerr, bzf, streamNo);
2270 			close_streams(stream, zStream);
2271 
2272 			if (ret != 0)
2273 			{
2274 				syncSetTerminateFlag(1);
2275 			}
2276 			return ret;
2277 		}
2278 
2279 		BZ2_bzReadGetUnused(&bzerr, bzf, (void**)(&unusedTmp), &nUnused);
2280 		if (bzerr != BZ_OK)
2281 		{
2282 			handle_error(EF_EXIT, 3, "pbzip2: *ERROR: Unexpected error [bzerr=%d]. Aborting!\n", bzerr);
2283 			return 3;
2284 		}
2285 
2286 		for (i = 0; i < nUnused; i++)
2287 			unused[i] = unusedTmp[i];
2288 
2289 		BZ2_bzReadClose(&bzerr, bzf);
2290 		if (bzerr != BZ_OK)
2291 		{
2292 			handle_error(EF_EXIT, 3, "pbzip2: *ERROR: Unexpected error [bzerr=%d]. Aborting!\n", bzerr);
2293 			return 3;
2294 		}
2295 
2296 		// check to see if we are at the end of the file
2297 		if (nUnused == 0)
2298 		{
2299 			int c = fgetc(zStream);
2300 			if (c == EOF)
2301 				break;
2302 			ungetc(c, zStream);
2303 		}
2304 	}
2305 
2306 closeok:
2307 	// check file stream for errors
2308 	if (ferror(zStream))
2309 	{
2310 		if (zStream != stdin)
2311 			fclose(zStream);
2312 		if (stream != stdout)
2313 			fclose(stream);
2314 
2315 		handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Problem with input stream of file [%s]!  Skipping...\n", InFilename);
2316 
2317 		return -1;
2318 	}
2319 	// close file
2320 	ret = do_fclose(zStream);
2321 	if (ret == EOF)
2322 	{
2323 		handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Problem closing file [%s]!  Skipping...\n", InFilename);
2324 		return -1;
2325 	}
2326 
2327 	// check file stream for errors
2328 	if (ferror(stream))
2329 	{
2330 		if (stream != stdout)
2331 			fclose(stream);
2332 		handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Problem with output stream of file [%s]!  Skipping...\n", InFilename);
2333 
2334 		return -1;
2335 	}
2336 	ret = do_fflush(stream);
2337 	if (ret != 0)
2338 	{
2339 		if (stream != stdout)
2340 			fclose(stream);
2341 		handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Problem with output stream of file [%s]!  Skipping...\n", InFilename);
2342 		return -1;
2343 	}
2344 	if (stream != stdout)
2345 	{
2346 		ret = do_fclose(stream);
2347 		if (ret == EOF)
2348 		{
2349 			handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Problem closing file [%s]!  Skipping...\n", OutFilename);
2350 			return -1;
2351 		}
2352 	}
2353 
2354 	syncSetProducerDone(1); // Not really needed for direct version.
2355 	return 0;
2356 }
2357 
2358 /*
2359  * Simulate an unconditional read(), reading in data to fill the
2360  * bsize-sized buffer if it can, even if it means calling read() multiple
2361  * times. This is needed since pipes and other "special" streams
2362  * sometimes don't allow reading of arbitrary sized buffers.
2363  */
ssize_t bufread(int hf, char *buf, size_t bsize)
{
	// Byte counts stay in size_t/ssize_t: narrowing them to int would
	// corrupt the request size and the result for buffers above INT_MAX.
	size_t filled = 0;

	for (;;)
	{
		ssize_t got = read(hf, buf + filled, bsize - filled);

		// read error: hand the failure straight back to the caller
		if (got < 0)
			return got;

		// end of input: report however much was collected (possibly 0)
		if (got == 0)
			return (ssize_t) filled;

		filled += (size_t) got;
		if (filled == bsize)
			return (ssize_t) bsize;
	}
}
2386 
2387 /*
2388  *********************************************************
2389  */
producer(int hInfile,int blockSize,queue * fifo)2390 int producer(int hInfile, int blockSize, queue *fifo)
2391 {
2392 	char *FileData = NULL;
2393 	size_t inSize = 0;
2394 	// int blockNum = 0;
2395 	int ret = 0;
2396 	// int pret = -1;
2397 
2398 	// We will now totally ignore the fileSize and read the data as it
2399 	// comes in. Aside from allowing us to process arbitrary streams, it's
2400 	// also the *right thing to do* in unix environments where data may
2401 	// be appended to the file as it's processed (e.g. log files).
2402 
2403 	safe_mutex_lock(&ProgressIndicatorsMutex);
2404 	NumBlocks = 0;
2405 	safe_mutex_unlock(&ProgressIndicatorsMutex);
2406 
2407 	// keep going until all the file is processed
2408 	while (1)
2409 	{
2410 		if (syncGetTerminateFlag() != 0)
2411 		{
2412 			close(hInfile);
2413 			return -1;
2414 		}
2415 
2416 		// set buffer size
2417 		inSize = blockSize;
2418 
2419 		#ifdef PBZIP_DEBUG
2420 		fprintf(stderr, " -> Bytes To Read: %" PRIuMAX " bytes...\n", (uintmax_t)inSize);
2421 		#endif
2422 
2423 		// allocate memory to read in file
2424 		FileData = NULL;
2425 		FileData = new(std::nothrow) char[inSize];
2426 		// make sure memory was allocated properly
2427 		if (FileData == NULL)
2428 		{
2429 			close(hInfile);
2430 			handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not allocate memory (FileData)!  Aborting...\n");
2431 			return -1;
2432 		}
2433 
2434 		// read file data
2435 		ret = bufread(hInfile, (char *) FileData, inSize);
2436 		#ifdef PBZIP_DEBUG
2437 		fprintf(stderr, " -> Total Bytes Read: %d bytes...\n\n", ret);
2438 		#endif
2439 		if (ret == 0)
2440 		{
2441 			// finished reading.
2442 			if (FileData != NULL)
2443 				delete [] FileData;
2444 			break;
2445 		}
2446 		else if (ret < 0)
2447 		{
2448 			close(hInfile);
2449 			if (FileData != NULL)
2450 				delete [] FileData;
2451 
2452 			handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not read from file!  Aborting...\n");
2453 			return -1;
2454 		}
2455 
2456 		// check to make sure all the data we expected was read in
2457 		if ((size_t)ret != inSize)
2458 			inSize = ret;
2459 
2460 		#ifdef PBZIP_DEBUG
2461 		fprintf(stderr, "producer:  Going into fifo-mut lock (NumBlocks: %d)\n", NumBlocks);
2462 		#endif
2463 
2464 		// add data to the compression queue
2465 		safe_mutex_lock(fifo->mut);
2466 		while (fifo->full)
2467 		{
2468 			#ifdef PBZIP_DEBUG
2469 			fprintf (stderr, "producer: queue FULL.\n");
2470 			#endif
2471 			safe_cond_wait(fifo->notFull, fifo->mut);
2472 
2473 			if (syncGetTerminateFlag() != 0)
2474 			{
2475 				pthread_mutex_unlock(fifo->mut);
2476 				close(hInfile);
2477 				return -1;
2478 			}
2479 		}
2480 		#ifdef PBZIP_DEBUG
2481 		fprintf(stderr, "producer:  Buffer: %p  Size: %" PRIuMAX "   Block: %d\n", FileData, (uintmax_t)inSize, NumBlocks);
2482 		#endif
2483 
2484 		outBuff * queueElement = new(std::nothrow) outBuff(FileData, inSize, NumBlocks, 0);
2485 		// make sure memory was allocated properly
2486 		if (queueElement == NULL)
2487 		{
2488 			close(hInfile);
2489 			handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not allocate memory (queueElement)!  Aborting...\n");
2490 			return -1;
2491 		}
2492 
2493 		fifo->add(queueElement);
2494 		safe_cond_signal(fifo->notEmpty);
2495 
2496 		safe_mutex_lock(&ProgressIndicatorsMutex);
2497 		++NumBlocks;
2498 		safe_mutex_unlock(&ProgressIndicatorsMutex);
2499 
2500 		safe_mutex_unlock(fifo->mut);
2501 	} // while
2502 
2503 	close(hInfile);
2504 
2505 	syncSetProducerDone(1);
2506 	safe_mutex_lock(fifo->mut);
2507 	safe_cond_broadcast(fifo->notEmpty); // just in case
2508 	safe_mutex_unlock(fifo->mut);
2509 
2510 	#ifdef PBZIP_DEBUG
2511 		fprintf(stderr, "producer:  Done - exiting. Num Blocks: %d\n", NumBlocks);
2512 	#endif
2513 
2514 	return 0;
2515 }
2516 
2517 /*
2518  *********************************************************
2519  */
consumer(void * q)2520 void *consumer (void *q)
2521 {
2522 	queue *fifo;
2523 	// char *FileData = NULL;
2524 	outBuff *fileData = NULL;
2525 	char *CompressedData = NULL;
2526 	// unsigned int inSize = 0;
2527 	unsigned int outSize = 0;
2528 	// int blockNum = -1;
2529 	int ret = -1;
2530 
2531 	fifo = (queue *)q;
2532 
2533 	for (;;)
2534 	{
2535 		if (syncGetTerminateFlag() != 0)
2536 		{
2537 			#ifdef PBZIP_DEBUG
2538 			fprintf (stderr, "consumer: terminating1 - terminateFlag set\n");
2539 			#endif
2540 			return (NULL);
2541 		}
2542 
2543 		safe_mutex_lock(fifo->mut);
2544 		for (;;)
2545 		{
2546 			if (!fifo->empty && (fifo->remove(fileData) == 1))
2547 			{
2548 				// block retreived - break the loop and continue further
2549 				break;
2550 			}
2551 
2552 			#ifdef PBZIP_DEBUG
2553 			fprintf (stderr, "consumer: queue EMPTY.\n");
2554 			#endif
2555 
2556 			if (fifo->empty && ((syncGetProducerDone() == 1) || (syncGetTerminateFlag() != 0)))
2557 			{
2558 				safe_mutex_unlock(fifo->mut);
2559 				#ifdef PBZIP_DEBUG
2560 				fprintf (stderr, "consumer: exiting2\n");
2561 				#endif
2562 				return (NULL);
2563 			}
2564 
2565 			#ifdef PBZIP_DEBUG
2566 			safe_cond_timed_wait(fifo->notEmpty, fifo->mut, 1, "consumer");
2567 			#else
2568 			safe_cond_wait(fifo->notEmpty, fifo->mut);
2569 			#endif
2570 		}
2571 
2572 		#ifdef PBZIP_DEBUG
2573 		fprintf(stderr, "consumer:  Buffer: %p  Size: %u   Block: %d\n",
2574 				fileData->buf, (unsigned)fileData->bufSize, fileData->blockNumber);
2575 		#endif
2576 
2577 		safe_cond_signal(fifo->notFull);
2578 		safe_mutex_unlock(fifo->mut);
2579 		#ifdef PBZIP_DEBUG
2580 		fprintf(stderr, "consumer: received %d.\n", fileData->blockNumber);
2581 		#endif
2582 
2583 		outSize = (unsigned int) (((fileData->bufSize)*1.01)+600);
2584 		// allocate memory for compressed data
2585 		CompressedData = new(std::nothrow) char[outSize];
2586 		// make sure memory was allocated properly
2587 		if (CompressedData == NULL)
2588 		{
2589 			handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not allocate memory (CompressedData)!  Aborting...\n");
2590 			return (NULL);
2591 		}
2592 
2593 		// compress the memory buffer (blocksize=9*100k, verbose=0, worklevel=30)
2594 		ret = BZ2_bzBuffToBuffCompress(CompressedData, &outSize,
2595 				fileData->buf, fileData->bufSize, BWTblockSize, Verbosity, 30);
2596 		if (ret != BZ_OK)
2597 		{
2598 			handle_error(EF_EXIT, -1, "pbzip2: *ERROR during compression: %d!  Aborting...\n", ret);
2599 			return (NULL);
2600 		}
2601 
2602 		#ifdef PBZIP_DEBUG
2603 		fprintf(stderr, "\n   Original Block Size: %u\n", (unsigned)fileData->bufSize);
2604 		fprintf(stderr, " Compressed Block Size: %u\n", outSize);
2605 		#endif
2606 
2607 		disposeMemory(fileData->buf);
2608 
2609 		// store data to be written in output bin
2610 		outBuff outBlock = outBuff(CompressedData, outSize, fileData->blockNumber, 0, fileData->bufSize);
2611 		if (outputBufferAdd(outBlock, "consumer") == NULL)
2612 		{
2613 			return (NULL);
2614 		}
2615 
2616 		delete fileData;
2617 		fileData = NULL;
2618 	} // for
2619 
2620 	#ifdef PBZIP_DEBUG
2621 	fprintf (stderr, "consumer: exiting\n");
2622 	#endif
2623 	return (NULL);
2624 }
2625 
2626 /*
2627  *********************************************************
2628  */
mutexesInit()2629 int mutexesInit()
2630 {
2631 	// initialize mutexes
2632 	OutMutex = new(std::nothrow) pthread_mutex_t;
2633 	// make sure memory was allocated properly
2634 	if (OutMutex == NULL)
2635 	{
2636 		fprintf(stderr, "pbzip2: *ERROR: Could not allocate memory (OutMutex)!  Aborting...\n");
2637 		return 1;
2638 	}
2639 	pthread_mutex_init(OutMutex, NULL);
2640 
2641 	ProducerDoneMutex = new(std::nothrow) pthread_mutex_t;
2642 	// make sure memory was allocated properly
2643 	if (ProducerDoneMutex == NULL)
2644 	{
2645 		fprintf(stderr, "pbzip2: *ERROR: Could not allocate memory (ProducerDoneMutex)!  Aborting...\n");
2646 		return 1;
2647 	}
2648 	pthread_mutex_init(ProducerDoneMutex, NULL);
2649 
2650 	return 0;
2651 }
2652 
2653 /*
2654  *********************************************************
2655  */
mutexesDelete()2656 void mutexesDelete()
2657 {
2658 	if (OutMutex != NULL)
2659 	{
2660 		pthread_mutex_destroy(OutMutex);
2661 		delete OutMutex;
2662 		OutMutex = NULL;
2663 	}
2664 
2665 	if (ProducerDoneMutex != NULL)
2666 	{
2667 		pthread_mutex_destroy(ProducerDoneMutex);
2668 		delete ProducerDoneMutex;
2669 		ProducerDoneMutex = NULL;
2670 	}
2671 }
2672 
2673 /*
2674  *********************************************************
2675  */
queueInit(int queueSize)2676 queue *queueInit(int queueSize)
2677 {
2678 	queue *q;
2679 
2680 	q = new(std::nothrow) queue;
2681 	if (q == NULL)
2682 		return NULL;
2683 
2684 	q->qData = new(std::nothrow) queue::ElementTypePtr[queueSize];
2685 
2686 	if (q->qData == NULL)
2687 		return NULL;
2688 
2689 	q->size = queueSize;
2690 
2691 	q->clear();
2692 
2693 	q->mut = NULL;
2694 	q->mut = new(std::nothrow) pthread_mutex_t;
2695 	if (q->mut == NULL)
2696 		return NULL;
2697 	pthread_mutex_init(q->mut, NULL);
2698 
2699 	q->notFull = NULL;
2700 	q->notFull = new(std::nothrow) pthread_cond_t;
2701 	if (q->notFull == NULL)
2702 		return NULL;
2703 	pthread_cond_init(q->notFull, NULL);
2704 
2705 	q->notEmpty = NULL;
2706 	q->notEmpty = new(std::nothrow) pthread_cond_t;
2707 	if (q->notEmpty == NULL)
2708 		return NULL;
2709 	pthread_cond_init(q->notEmpty, NULL);
2710 
2711 	q->consumers = NULL;
2712 	q->consumers = new(std::nothrow) pthread_t[queueSize];
2713 	if (q->consumers == NULL)
2714 		return NULL;
2715 
2716 	notTooMuchNumBuffered = NULL;
2717 	notTooMuchNumBuffered = new(std::nothrow) pthread_cond_t;
2718 	if (notTooMuchNumBuffered == NULL)
2719 		return NULL;
2720 	pthread_cond_init(notTooMuchNumBuffered, NULL);
2721 
2722 	return (q);
2723 }
2724 
2725 
2726 /*
2727  *********************************************************
2728  */
queueDelete(queue * q)2729 void queueDelete (queue *q)
2730 {
2731 	if (q == NULL)
2732 		return;
2733 
2734 	if (q->mut != NULL)
2735 	{
2736 		pthread_mutex_destroy(q->mut);
2737 		delete q->mut;
2738 		q->mut = NULL;
2739 	}
2740 
2741 	if (q->notFull != NULL)
2742 	{
2743 		pthread_cond_destroy(q->notFull);
2744 		delete q->notFull;
2745 		q->notFull = NULL;
2746 	}
2747 
2748 	if (q->notEmpty != NULL)
2749 	{
2750 		pthread_cond_destroy(q->notEmpty);
2751 		delete q->notEmpty;
2752 		q->notEmpty = NULL;
2753 	}
2754 
2755     delete [] q->consumers;
2756 	delete [] q->qData;
2757 
2758 	delete q;
2759 	q = NULL;
2760 
2761 	if (notTooMuchNumBuffered != NULL)
2762 	{
2763 		pthread_cond_destroy(notTooMuchNumBuffered);
2764 		delete notTooMuchNumBuffered;
2765 		notTooMuchNumBuffered = NULL;
2766 	}
2767 
2768 	return;
2769 }
2770 
2771 
2772 /**
2773  * Initialize output buffer contents with empty (NULL, 0) blocks
2774  *
2775  * @param size new size of buffer
2776  *
2777  */
outputBufferInit(size_t size)2778 void outputBufferInit(size_t size)
2779 {
2780 	safe_mutex_lock(OutMutex);
2781 
2782 	NextBlockToWrite = 0;
2783 	OutBufferPosToWrite = 0;
2784 	NumBufferedBlocks = 0;
2785 	NumBufferedTailBlocks = 0;
2786 
2787 	outBuff emptyElement;
2788 	emptyElement.buf = NULL;
2789 	emptyElement.bufSize = 0;
2790 
2791 	// Resize and fill-in with empty elements
2792 	OutputBuffer.assign(size, emptyElement);
2793 
2794 	// unlikely to get here since more likely exception will be thrown
2795 	if (OutputBuffer.size() != size)
2796 	{
2797 		fprintf(stderr, "pbzip2: *ERROR: Could not initialize (OutputBuffer); size=%" PRIuMAX "!  Aborting...\n", (uintmax_t)size);
2798 		safe_mutex_unlock(OutMutex);
2799 		exit(1);
2800 	}
2801 
2802 	safe_mutex_unlock(OutMutex);
2803 }
2804 
2805 /**
2806  * Get output buffer index corresponding to the given absolute blockNumber
2807  * (buffer is used in circular mode)
2808  *
2809  * @param blockNum - absolute block number to translate
2810  * @return 0-based Output Buffer index where blockNum data should go
2811  */
getOutputBufferPos(int blockNum)2812 inline size_t getOutputBufferPos(int blockNum)
2813 {
2814 	// calculate output buffer position (used in circular mode)
2815 	size_t outBuffPos = OutBufferPosToWrite + blockNum - NextBlockToWrite;
2816 
2817 	if (outBuffPos >= NumBufferedBlocksMax)
2818 	{
2819 		outBuffPos -= NumBufferedBlocksMax;
2820 	}
2821 
2822 	return outBuffPos;
2823 }
2824 
2825 /**
2826  * Add next element to the given out buffer tail.
2827  *
2828  */
outputBufferSeqAddNext(outBuff * preveElement,outBuff * newElement)2829 outBuff * outputBufferSeqAddNext(outBuff * preveElement, outBuff * newElement)
2830 {
2831 	safe_mutex_lock(OutMutex);
2832 
2833 	while ((NumBufferedTailBlocks >= NumBufferedBlocksMax) &&
2834 			(preveElement->buf != NULL))
2835 	{
2836 		if (syncGetTerminateFlag() != 0)
2837 		{
2838 			#ifdef PBZIP_DEBUG
2839 			fprintf (stderr, "%s: terminating2 - terminateFlag set\n", "consumer");
2840 			#endif
2841 			pthread_mutex_unlock(OutMutex);
2842 			return NULL;
2843 		}
2844 
2845 		if ( (LastGoodBlock != -1) && (LastGoodBlock < newElement->blockNumber) )
2846 		{
2847 			#ifdef PBZIP_DEBUG
2848 			fprintf (stderr, "%s: terminating3 - LastGoodBlock set\n", "consumer");
2849 			#endif
2850 			pthread_mutex_unlock(OutMutex);
2851 			return NULL;
2852 		}
2853 
2854 		#ifdef PBZIP_DEBUG
2855 		fprintf (stderr, "%s/outputBufferSeqAddNext: Throttling from FileWriter backlog: %d\n", "consumer", NumBufferedBlocks);
2856 		#endif
2857 		safe_cond_wait(notTooMuchNumBuffered, OutMutex);
2858 	}
2859 
2860 	preveElement->next = newElement;
2861 
2862 	++NumBufferedTailBlocks;
2863 
2864 	// size_t outBufPos = getOutputBufferPos(newElement->blockNumber);
2865 	if (preveElement->buf == NULL)
2866 	{
2867 		// fileWriter has already consumed the previous block. Let it know
2868 		// for that one early
2869 		safe_cond_signal(&OutBufferHeadNotEmpty);
2870 	}
2871 
2872 	safe_mutex_unlock(OutMutex);
2873 
2874 	return newElement;
2875 }
2876 
2877 /**
2878  * Store an item in OutputBuffer out bin. Synchronization is embedded to protect
2879  * from simultaneous access.
2880  *
2881  * @param elem - output buffer element to add
2882  * @param caller - used for debug purposes (caller function name)
2883  *
2884  * @return pointer to added element on success; NULL - on error
2885  */
outBuff * outputBufferAdd(const outBuff & element, const char *caller)
{
	safe_mutex_lock(OutMutex);

	// wait while blockNum is out of range
	// [NextBlockToWrite, NextBlockToWrite + NumBufferedBlocksMax)
	//
	// The subtraction is done in int on purpose: mixing the int block number
	// with the size_t capacity would wrap around in unsigned arithmetic and
	// then rely on an implementation-defined narrowing back to int.
	const int dist = element.blockNumber - static_cast<int>(NumBufferedBlocksMax);
	while (dist >= NextBlockToWrite)
	{
		if (syncGetTerminateFlag() != 0)
		{
			#ifdef PBZIP_DEBUG
			fprintf (stderr, "%s/outputBufferAdd: terminating2 - terminateFlag set\n", caller);
			#endif
			pthread_mutex_unlock(OutMutex);
			return NULL;
		}

		// blocks beyond LastGoodBlock are never written: stop waiting for room
		if ( (LastGoodBlock != -1) && (LastGoodBlock < element.blockNumber) )
		{
			#ifdef PBZIP_DEBUG
			fprintf (stderr, "%s/outputBufferAdd: terminating3 - LastGoodBlock set\n", caller);
			#endif
			pthread_mutex_unlock(OutMutex);
			return NULL;
		}

		#ifdef PBZIP_DEBUG
		fprintf (stderr, "%s: Throttling from FileWriter backlog: %d\n", caller, NumBufferedBlocks);
		#endif
		safe_cond_wait(notTooMuchNumBuffered, OutMutex);
	}

	// calculate output buffer position (used in circular mode)
	size_t outBuffPos = getOutputBufferPos(element.blockNumber);

	OutputBuffer[outBuffPos] = element;
	++NumBufferedBlocks;

	// the block at the write position has just arrived: signal the head-not-empty condition
	if (NextBlockToWrite == element.blockNumber)
	{
		safe_cond_signal(&OutBufferHeadNotEmpty);
	}

	safe_mutex_unlock(OutMutex);

	return &(OutputBuffer[outBuffPos]);
}
2934 
2935 /*
2936  *********************************************************
2937  Much of the code in this function is taken from bzip2.c
2938  */
testBZ2ErrorHandling(int bzerr,BZFILE * bzf,int streamNo)2939 int testBZ2ErrorHandling(int bzerr, BZFILE* bzf, int streamNo)
2940 {
2941 	int bzerr_dummy;
2942 
2943 	BZ2_bzReadClose(&bzerr_dummy, bzf);
2944 	switch (bzerr)
2945 	{
2946 		case BZ_CONFIG_ERROR:
2947 			fprintf(stderr, "pbzip2: *ERROR: Integers are not the right size for libbzip2. Aborting!\n");
2948 			exit(3);
2949 			break;
2950 		case BZ_IO_ERROR:
2951 			fprintf(stderr, "pbzip2: *ERROR: Integers are not the right size for libbzip2. Aborting!\n");
2952 			return 1;
2953 			break;
2954 		case BZ_DATA_ERROR:
2955 			fprintf(stderr, "pbzip2: *ERROR: Data integrity (CRC) error in data!  Skipping...\n");
2956 			return -1;
2957 			break;
2958 		case BZ_MEM_ERROR:
2959 			fprintf(stderr, "pbzip2: *ERROR: Could NOT allocate enough memory. Aborting!\n");
2960 			return 1;
2961 			break;
2962 		case BZ_UNEXPECTED_EOF:
2963 			fprintf(stderr, "pbzip2: *ERROR: File ends unexpectedly!  Skipping...\n");
2964 			return -1;
2965 			break;
2966 		case BZ_DATA_ERROR_MAGIC:
2967 			if (streamNo == 1)
2968 			{
2969 				fprintf(stderr, "pbzip2: *ERROR: Bad magic number (file not created by bzip2)!  Skipping...\n");
2970 				return -1;
2971 			}
2972 			else
2973 			{
2974 				fprintf(stderr, "pbzip2: *WARNING: Trailing garbage after EOF ignored!\n");
2975 				return 0;
2976 			}
2977 		default:
2978 			fprintf(stderr, "pbzip2: *ERROR: Unexpected error. Aborting!\n");
2979 			exit(3);
2980 	}
2981 
2982 	return 0;
2983 }
2984 
2985 /*
2986  *********************************************************
2987  Much of the code in this function is taken from bzip2.c
2988  */
testCompressedData(char * fileName)2989 int testCompressedData(char *fileName)
2990 {
2991 	FILE *zStream = NULL;
2992 	int ret = 0;
2993 
2994 	BZFILE* bzf = NULL;
2995 	unsigned char obuf[5000];
2996 	unsigned char unused[BZ_MAX_UNUSED];
2997 	unsigned char *unusedTmp;
2998 	int bzerr, nread, streamNo;
2999 	int nUnused;
3000 	int i;
3001 
3002 	nUnused = 0;
3003 	streamNo = 0;
3004 
3005 	// see if we are using stdin or not
3006 	if (strcmp(fileName, "-") != 0)
3007 	{
3008 		// open the file for reading
3009 		zStream = fopen(fileName, "rb");
3010 		if (zStream == NULL)
3011 		{
3012 			ErrorContext::getInstance()->saveError();
3013 			handle_error(EF_NOQUIT, -1, "pbzip2: *ERROR: Could not open input file [%s]!  Skipping...\n", fileName);
3014 			return -1;
3015 		}
3016 	}
3017 	else
3018 	{
3019 		zStream = stdin;
3020 	}
3021 
3022 	// check file stream for errors
3023 	if (ferror(zStream))
3024 	{
3025 
3026 		handle_error(EF_NOQUIT, -1, "pbzip2: *ERROR: Problem with stream of file [%s]!  Skipping...\n", fileName);
3027 		if (zStream != stdin)
3028 			verbose_fclose(zStream, fileName);
3029 		return -1;
3030 	}
3031 
3032 	// loop until end of file
3033 	while(true)
3034 	{
3035 		bzf = BZ2_bzReadOpen(&bzerr, zStream, Verbosity, 0, unused, nUnused);
3036 		if (bzf == NULL || bzerr != BZ_OK)
3037 		{
3038 			ret = testBZ2ErrorHandling(bzerr, bzf, streamNo);
3039 			if (zStream != stdin)
3040 				verbose_fclose(zStream, fileName);
3041 			return ret;
3042 		}
3043 
3044 		streamNo++;
3045 
3046 		while (bzerr == BZ_OK)
3047 		{
3048 			nread = BZ2_bzRead(&bzerr, bzf, obuf, sizeof(obuf));
3049 			if (bzerr == BZ_DATA_ERROR_MAGIC)
3050 			{
3051 				ret = testBZ2ErrorHandling(bzerr, bzf, streamNo);
3052 				if (zStream != stdin)
3053 					verbose_fclose(zStream, fileName);
3054 				return ret;
3055 			}
3056 		}
3057 		if (bzerr != BZ_STREAM_END)
3058 		{
3059 			ret = testBZ2ErrorHandling(bzerr, bzf, streamNo);
3060 			if (zStream != stdin)
3061 				verbose_fclose(zStream, fileName);
3062 			return ret;
3063 		}
3064 
3065 		BZ2_bzReadGetUnused(&bzerr, bzf, (void**)(&unusedTmp), &nUnused);
3066 		if (bzerr != BZ_OK)
3067 		{
3068 			fprintf(stderr, "pbzip2: *ERROR: Unexpected error. Aborting!\n");
3069 			exit(3);
3070 		}
3071 
3072 		for (i = 0; i < nUnused; i++)
3073 			unused[i] = unusedTmp[i];
3074 
3075 		BZ2_bzReadClose(&bzerr, bzf);
3076 		if (bzerr != BZ_OK)
3077 		{
3078 			fprintf(stderr, "pbzip2: *ERROR: Unexpected error. Aborting!\n");
3079 			exit(3);
3080 		}
3081 
3082 		// check to see if we are at the end of the file
3083 		if (nUnused == 0)
3084 		{
3085 			int c = fgetc(zStream);
3086 			if (c == EOF)
3087 				break;
3088 			else
3089 				ungetc(c, zStream);
3090 		}
3091 	}
3092 
3093 	// check file stream for errors
3094 	if (ferror(zStream))
3095 	{
3096 		ErrorContext::getInstance()->saveError();
3097 		handle_error(EF_NOQUIT, -1, "pbzip2: *ERROR: Problem with stream of file [%s]!  Skipping...\n", fileName);
3098 		if (zStream != stdin)
3099 			verbose_fclose(zStream, fileName);
3100 		return -1;
3101 	}
3102 
3103 	// close file
3104 	ret = verbose_fclose(zStream, fileName);
3105 	if (ret == EOF)
3106 	{
3107 		fprintf(stderr, "pbzip2: *ERROR: Problem closing file [%s]!  Skipping...\n", fileName);
3108 		return -1;
3109 	}
3110 
3111 	return 0;
3112 }
3113 
3114 /*
3115  *********************************************************
3116  */
getFileMetaData(const char * fileName)3117 int getFileMetaData(const char *fileName)
3118 {
3119 	// get the file meta data and store it in the global structure
3120 	return stat(fileName, &fileMetaData);
3121 }
3122 
3123 /*
3124  *********************************************************
3125  */
writeFileMetaData(const char * fileName)3126 int writeFileMetaData(const char *fileName)
3127 {
3128 	int ret = 0;
3129 	#ifndef WIN32
3130 	struct utimbuf uTimBuf;
3131     #else
3132 	_utimbuf uTimBuf;
3133     #endif
3134 
3135 	// store file times in structure
3136 	uTimBuf.actime = fileMetaData.st_atime;
3137 	uTimBuf.modtime = fileMetaData.st_mtime;
3138 
3139 	// update file with stored file permissions
3140 	ret = chmod(fileName, fileMetaData.st_mode);
3141 	if (ret != 0)
3142 	{
3143 		ErrorContext::getInstance()->saveError();
3144 		return ret;
3145 	}
3146 
3147 	// update file with stored file access and modification times
3148 	ret = utime(fileName, &uTimBuf);
3149 	if (ret != 0)
3150 	{
3151 		ErrorContext::getInstance()->saveError();
3152 		return ret;
3153 	}
3154 
3155 	// update file with stored file ownership (if access allows)
3156 	#ifndef WIN32
3157 	ret = chown(fileName, fileMetaData.st_uid, fileMetaData.st_gid);
3158 	// following may happen on some Linux filesystems (i.e. NTFS)
3159 	// extra error messages do no harm
3160 	if (ret != 0)
3161 	{
3162 		ErrorContext::getInstance()->saveError();
3163 		if (geteuid() == 0)
3164 			return ret;
3165 	}
3166 	#endif
3167 
3168 	return 0;
3169 }
3170 
3171 /*
3172  *********************************************************
3173  */
/*
 * Detect the number of processors using whichever method was compiled in
 * (sysctl on Apple, sysconf(_SC_NPROCESSORS_ONLN), or GetSystemInfo on
 * Windows). Returns at least 1, also when no method is available or the
 * detected value is below 1.
 */
int detectCPUs()
{
	// stays at 1 when no detection method is compiled in
	int ncpu = 1;

	#if defined(__APPLE__)
		int mib[2];
		size_t len = sizeof(ncpu);
		mib[0] = CTL_HW;
		mib[1] = HW_NCPU;
		if (sysctl(mib, 2, &ncpu, &len, 0, 0) < 0 || len != sizeof(ncpu))
			ncpu = 1;
	#elif defined(_SC_NPROCESSORS_ONLN)
		ncpu = sysconf(_SC_NPROCESSORS_ONLN);
	#elif defined(WIN32)
		SYSTEM_INFO si;
		GetSystemInfo(&si);
		ncpu = si.dwNumberOfProcessors;
	#endif

	// never report fewer than one processor
	return (ncpu >= 1) ? ncpu : 1;
}
3203 
3204 
3205 /*
3206  *********************************************************
3207  */
// Print the four-line program banner (version, authors, library credit)
// to stderr.
void banner()
{
	static const char *const bannerLines[] = {
		"Parallel BZIP2 v1.1.13 [Dec 18, 2015]\n",
		"By: Jeff Gilchrist [http://compression.ca]\n",
		"Major contributions: Yavor Nikolov [http://javornikolov.wordpress.com]\n",
		"Uses libbzip2 by Julian Seward\n"
	};
	const size_t lineCount = sizeof(bannerLines) / sizeof(bannerLines[0]);

	for (size_t idx = 0; idx < lineCount; ++idx)
		fputs(bannerLines[idx], stderr);
}
3217 
3218 /*
3219  *********************************************************
3220  */
usage(char * progname,const char * reason)3221 void usage(char* progname, const char *reason)
3222 {
3223 	banner();
3224 
3225 	if (strncmp(reason, "HELP", 4) == 0)
3226 		fprintf(stderr, "\n");
3227 	else
3228 		fprintf(stderr, "\nInvalid command line: %s.  Aborting...\n\n", reason);
3229 
3230 #ifndef PBZIP_NO_LOADAVG
3231 	fprintf(stderr, "Usage: %s [-1 .. -9] [-b#cdfhklm#p#qrS#tVz] <filename> <filename2> <filenameN>\n", progname);
3232 #else
3233 	fprintf(stderr, "Usage: %s [-1 .. -9] [-b#cdfhkm#p#qrS#tVz] <filename> <filename2> <filenameN>\n", progname);
3234 #endif // PBZIP_NO_LOADAVG
3235 	fprintf(stderr, " -1 .. -9        set BWT block size to 100k .. 900k (default 900k)\n");
3236 	fprintf(stderr, " -b#             Block size in 100k steps (default 9 = 900k)\n");
3237 	fprintf(stderr, " -c,--stdout     Output to standard out (stdout)\n");
3238 	fprintf(stderr, " -d,--decompress Decompress file\n");
3239 	fprintf(stderr, " -f,--force      Overwrite existing output file\n");
3240 	fprintf(stderr, " -h,--help       Print this help message\n");
3241 	fprintf(stderr, " -k,--keep       Keep input file, don't delete\n");
3242 #ifndef PBZIP_NO_LOADAVG
3243 	fprintf(stderr, " -l,--loadavg    Load average determines max number processors to use\n");
3244 #endif // PBZIP_NO_LOADAVG
3245 	fprintf(stderr, " -m#             Maximum memory usage in 1MB steps (default 100 = 100MB)\n");
3246 	fprintf(stderr, " -p#             Number of processors to use (default");
3247 #if defined(_SC_NPROCESSORS_ONLN) || defined(__APPLE__)
3248 	fprintf(stderr, ": autodetect [%d])\n", detectCPUs());
3249 #else
3250 	fprintf(stderr, " 2)\n");
3251 #endif // _SC_NPROCESSORS_ONLN || __APPLE__
3252 	fprintf(stderr, " -q,--quiet      Quiet mode (default)\n");
3253 	fprintf(stderr, " -r,--read       Read entire input file into RAM and split between processors\n");
3254 #ifdef USE_STACKSIZE_CUSTOMIZATION
3255 	fprintf(stderr, " -S#             Child thread stack size in 1KB steps (default stack size if unspecified)\n");
3256 #endif // USE_STACKSIZE_CUSTOMIZATION
3257 	fprintf(stderr, " -t,--test       Test compressed file integrity\n");
3258 	fprintf(stderr, " -v,--verbose    Verbose mode\n");
3259 	fprintf(stderr, " -V,--version    Display version info for pbzip2 then exit\n");
3260 	fprintf(stderr, " -z,--compress   Compress file (default)\n");
3261 	fprintf(stderr, " --ignore-trailing-garbage=# Ignore trailing garbage flag (1 - ignored; 0 - forbidden)\n");
3262 	fprintf(stderr, "\n");
3263 	fprintf(stderr, "If no file names are given, pbzip2 compresses or decompresses from standard input to standard output.\n");
3264 	fprintf(stderr, "\n");
3265 	fprintf(stderr, "Example: pbzip2 -b15vk myfile.tar\n");
3266 	fprintf(stderr, "Example: pbzip2 -p4 -r -5 myfile.tar second*.txt\n");
3267 	fprintf(stderr, "Example: tar cf myfile.tar.bz2 --use-compress-prog=pbzip2 dir_to_compress/\n");
3268 	fprintf(stderr, "Example: pbzip2 -d -m500 myfile.tar.bz2\n");
3269 	fprintf(stderr, "Example: pbzip2 -dc myfile.tar.bz2 | tar x\n");
3270 	fprintf(stderr, "Example: pbzip2 -c < myfile.txt > myfile.txt.bz2 \n");
3271 	fprintf(stderr, "\n");
3272 	exit(-1);
3273 }
3274 
3275 /*
3276  *********************************************************
3277  */
main(int argc,char * argv[])3278 int main(int argc, char* argv[])
3279 {
3280 	queue *fifo;
3281 	pthread_t output;
3282 	char **FileList = NULL;
3283 	char *InFilename = NULL;
3284 	bool hasInFile = false;
3285 	char *progName = NULL;
3286 	char *progNamePos = NULL;
3287 	char bz2Header[] = {"BZh91AY&SY"};  // using 900k block size
3288 	std::string outFilename; // [2048];
3289 	char cmdLineTemp[2048];
3290 	unsigned char tmpBuff[50];
3291 	char stdinFile[2] = {"-"};
3292 	struct timeval tvStartTime;
3293 	struct timeval tvStopTime;
3294 	#ifndef WIN32
3295 	struct timezone tz;
3296 	double loadAverage = 0.0;
3297 	double loadAvgArray[3];
3298 	int useLoadAverage = 0;
3299 	int numCPUtotal = 0;
3300 	int numCPUidle = 0;
3301 	#else
3302 	SYSTEMTIME systemtime;
3303 	LARGE_INTEGER filetime;
3304 	LARGE_INTEGER fileSize_temp;
3305 	HANDLE hInfile_temp;
3306 	#endif
3307 	double timeCalc = 0.0;
3308 	double timeStart = 0.0;
3309 	double timeStop = 0.0;
3310 	int cmdLineTempCount = 0;
3311 	int readEntireFile = 0;
3312 	int zeroByteFile = 0;
3313 	int hInfile = -1;
3314 	int hOutfile = -1;
3315 	int numBlocks = 0;
3316 	int blockSize = 9*100000;
3317 	int maxMemory = 100000000;
3318 	int maxMemorySwitch = 0;
3319 	int decompress = 0;
3320 	int compress = 0;
3321 	int testFile = 0;
3322 	int errLevel = 0;
3323 	int noThreads = 0;
3324 	int keep = 0;
3325 	int force = 0;
3326 	int ret = 0;
3327 	int fileLoop;
3328 	size_t i, j, k;
3329 	bool switchedMtToSt = false; // switched from multi- to single-thread
3330 
3331 	// Initialize error context
3332 	if (ErrorContext::getInstance() == NULL)
3333 	{
3334 		return 1;
3335 	}
3336 
3337 	// get current time for benchmark reference
3338 	#ifndef WIN32
3339 	gettimeofday(&tvStartTime, &tz);
3340 	#else
3341 	GetSystemTime(&systemtime);
3342 	SystemTimeToFileTime(&systemtime, (FILETIME *)&filetime);
3343 	tvStartTime.tv_sec = filetime.QuadPart / 10000000;
3344 	tvStartTime.tv_usec = (filetime.QuadPart - (LONGLONG)tvStartTime.tv_sec * 10000000) / 10;
3345 	#endif
3346 
3347 	// check to see if we are likely being called from TAR
3348 	if (argc < 2)
3349 	{
3350 		OutputStdOut = 1;
3351 		keep = 1;
3352 	}
3353 
3354 	// get program name to determine if decompress mode should be used
3355 	progName = argv[0];
3356 	for (progNamePos = argv[0]; progNamePos[0] != '\0'; progNamePos++)
3357 	{
3358 		if (progNamePos[0] == PATH_SEP)
3359 			progName = progNamePos + 1;
3360 	}
3361 	if ((strstr(progName, "unzip") != 0) || (strstr(progName, "UNZIP") != 0))
3362 	{
3363 		decompress = 1;
3364 	}
3365 	if ((strstr(progName, "zcat") != 0) || (strstr(progName, "ZCAT") != 0))
3366 	{
3367 		decompress = OutputStdOut = keep = 1;
3368 	}
3369 
3370 	#ifdef IGNORE_TRAILING_GARBAGE
3371 	// default behavior is hard-coded (still dynamically changeable)
3372 	IgnoreTrailingGarbageFlag = IGNORE_TRAILING_GARBAGE;
3373 	#else
3374 	// default depends on program name
3375 	if ((strcmp(progName, "bzip2") == 0) || (strcmp(progName, "BZIP2") == 0) ||
3376 		(strcmp(progName, "bunzip2") == 0) || (strcmp(progName, "BUNZIP2") == 0) ||
3377 		(strcmp(progName, "bzcat") == 0) || (strcmp(progName, "BZCAT") == 0))
3378 	{
3379 		// Favour traditional non-parallel bzip2 behavior
3380 		IgnoreTrailingGarbageFlag = 1;
3381 	}
3382 	#endif // IGNORE_TRAILING_GARBAGE
3383 
3384 	FileListCount = 0;
3385 	FileList = new(std::nothrow) char *[argc];
3386 	if (FileList == NULL)
3387 	{
3388 		fprintf(stderr, "pbzip2: *ERROR: Not enough memory!  Aborting...\n");
3389 		return 1;
3390 	}
3391 	// set default max memory usage to 100MB
3392 	maxMemory = 100000000;
3393 	NumBufferedBlocksMax = 0;
3394 
3395 	numCPU = detectCPUs();
3396 
3397 	#ifndef WIN32
3398 	numCPUtotal = numCPU;
3399 	#endif
3400 
3401 	// parse command line switches
3402 	for (i=1; (int)i < argc; i++)
3403 	{
3404 		if (argv[i][0] == '-')
3405 		{
3406 			if (argv[i][1] == '\0')
3407 			{
3408 				// support "-" as a filename
3409 				FileList[FileListCount] = argv[i];
3410 				FileListCount++;
3411 				continue;
3412 			}
3413 			else if (argv[i][1] == '-')
3414 			{
3415 				// get command line options with "--"
3416 				if (strcmp(argv[i], "--best") == 0)
3417 				{
3418 					BWTblockSize = 9;
3419 				}
3420 				else if (strcmp(argv[i], "--decompress") == 0)
3421 				{
3422 					decompress = 1;
3423 				}
3424 				else if (strcmp(argv[i], "--compress") == 0)
3425 				{
3426 					compress = 1;
3427 				}
3428 				else if (strcmp(argv[i], "--fast") == 0)
3429 				{
3430 					BWTblockSize = 1;
3431 				}
3432 				else if (strcmp(argv[i], "--force") == 0)
3433 				{
3434 					force = 1; ForceOverwrite = 1;
3435 				}
3436 				else if (strcmp(argv[i], "--help") == 0)
3437 				{
3438 					usage(argv[0], "HELP");
3439 				}
3440 				else if (strcmp(argv[i], "--keep") == 0)
3441 				{
3442 					keep = 1;
3443 				}
3444 				else if (strcmp(argv[i], "--license") == 0)
3445 				{
3446 					usage(argv[0], "HELP");
3447 				}
3448 				#ifndef PBZIP_NO_LOADAVG
3449 				else if (strcmp(argv[i], "--loadavg") == 0)
3450 				{
3451 					useLoadAverage = 1;
3452 				}
3453 				#endif
3454 				else if (strcmp(argv[i], "--quiet") == 0)
3455 				{
3456 					QuietMode = 1;
3457 				}
3458 				else if (strcmp(argv[i], "--read") == 0)
3459 				{
3460 					readEntireFile = 1;
3461 				}
3462 				else if (strcmp(argv[i], "--stdout") == 0)
3463 				{
3464 					OutputStdOut = 1; keep = 1;
3465 				}
3466 				else if (strcmp(argv[i], "--test") == 0)
3467 				{
3468 					testFile = 1;
3469 				}
3470 				else if (strcmp(argv[i], "--verbose") == 0)
3471 				{
3472 					QuietMode = 0;
3473 				}
3474 				else if (strcmp(argv[i], "--version") == 0)
3475 				{
3476 					banner(); exit(0);
3477 				}
3478 				else if (strcmp(argv[i], "--ignore-trailing-garbage") == 0 )
3479 				{
3480 					IgnoreTrailingGarbageFlag = 1;
3481 				}
3482 				else if (strcmp(argv[i], "--ignore-trailing-garbage=1") == 0 )
3483 				{
3484 					IgnoreTrailingGarbageFlag = 1;
3485 				}
3486 				else if (strcmp(argv[i], "--ignore-trailing-garbage=0") == 0 )
3487 				{
3488 					IgnoreTrailingGarbageFlag = 0;
3489 				}
3490 
3491 				continue;
3492 			}
3493 			#ifdef PBZIP_DEBUG
3494 			fprintf(stderr, "argv[%u]: %s   Len: %d\n", (unsigned)i, argv[i], (int)strlen(argv[i]));
3495 			#endif
3496 			// get command line options with single "-"
3497 			// check for multiple switches grouped together
3498 			for (j=1; argv[i][j] != '\0'; j++)
3499 			{
3500 				switch (argv[i][j])
3501 				{
3502 				case 'p': k = j+1; cmdLineTempCount = 0; strcpy(cmdLineTemp, "2");
3503 					while (argv[i][k] != '\0' && k < sizeof(cmdLineTemp))
3504 					{
3505 						// no more numbers, finish
3506 						if ((argv[i][k] < '0') || (argv[i][k] > '9'))
3507 							break;
3508 						k++;
3509 						cmdLineTempCount++;
3510 					}
3511 					if (cmdLineTempCount == 0)
3512 						usage(argv[0], "Cannot parse -p argument");
3513 					strncpy(cmdLineTemp, argv[i]+j+1, cmdLineTempCount);
3514 					cmdLineTemp[cmdLineTempCount] = '\0';
3515 					numCPU = atoi(cmdLineTemp);
3516 					if (numCPU > 4096)
3517 					{
3518 						fprintf(stderr,"pbzip2: *ERROR: Maximal number of supported processors is 4096!  Aborting...\n");
3519 						return 1;
3520 					}
3521 					else if (numCPU < 1)
3522 					{
3523 						fprintf(stderr,"pbzip2: *ERROR: Minimum number of supported processors is 1!  Aborting...\n");
3524 						return 1;
3525 					}
3526 					j += cmdLineTempCount;
3527 					#ifdef PBZIP_DEBUG
3528 					fprintf(stderr, "-p%d\n", numCPU);
3529 					#endif
3530 					break;
3531 				case 'b': k = j+1; cmdLineTempCount = 0; strcpy(cmdLineTemp, "9"); blockSize = 900000;
3532 					while (argv[i][k] != '\0' && k < sizeof(cmdLineTemp))
3533 					{
3534 						// no more numbers, finish
3535 						if ((argv[i][k] < '0') || (argv[i][k] > '9'))
3536 							break;
3537 						k++;
3538 						cmdLineTempCount++;
3539 					}
3540 					if (cmdLineTempCount == 0)
3541 						usage(argv[0], "Cannot parse file block size");
3542 					strncpy(cmdLineTemp, argv[i]+j+1, cmdLineTempCount);
3543 					cmdLineTemp[cmdLineTempCount] = '\0';
3544 					blockSize = atoi(cmdLineTemp)*100000;
3545 					if ((blockSize < 100000) || (blockSize > 1000000000))
3546 					{
3547 						fprintf(stderr,"pbzip2: *ERROR: File block size Min: 100k and Max: 1000000k!  Aborting...\n");
3548 						return 1;
3549 					}
3550 					j += cmdLineTempCount;
3551 					#ifdef PBZIP_DEBUG
3552 					fprintf(stderr, "-b%d\n", blockSize);
3553 					#endif
3554 					break;
3555 				case 'm': k = j+1; cmdLineTempCount = 0; strcpy(cmdLineTemp, "1"); maxMemory = 1000000;
3556 					while (argv[i][k] != '\0' && k < sizeof(cmdLineTemp))
3557 					{
3558 						// no more numbers, finish
3559 						if ((argv[i][k] < '0') || (argv[i][k] > '9'))
3560 							break;
3561 						k++;
3562 						cmdLineTempCount++;
3563 					}
3564 					if (cmdLineTempCount == 0)
3565 						usage(argv[0], "Cannot parse -m argument");
3566 					strncpy(cmdLineTemp, argv[i]+j+1, cmdLineTempCount);
3567 					cmdLineTemp[cmdLineTempCount] = '\0';
3568 					maxMemory = atoi(cmdLineTemp)*1000000;
3569 					if ((maxMemory < 1000000) || (maxMemory > 2000000000))
3570 					{
3571 						fprintf(stderr,"pbzip2: *ERROR: Memory usage size Min: 1MB and Max: 2000MB!  Aborting...\n");
3572 						return 1;
3573 					}
3574 					maxMemorySwitch = 1;
3575 					j += cmdLineTempCount;
3576 					#ifdef PBZIP_DEBUG
3577 					fprintf(stderr, "-m%d\n", maxMemory);
3578 					#endif
3579 					break;
3580 				#ifdef USE_STACKSIZE_CUSTOMIZATION
3581 				case 'S': k = j+1; cmdLineTempCount = 0; strcpy(cmdLineTemp, "0"); ChildThreadStackSize = -1;
3582 					while (argv[i][k] != '\0' && k < sizeof(cmdLineTemp))
3583 					{
3584 						// no more numbers, finish
3585 						if ((argv[i][k] < '0') || (argv[i][k] > '9'))
3586 							break;
3587 						k++;
3588 						cmdLineTempCount++;
3589 					}
3590 					if (cmdLineTempCount == 0)
3591 						usage(argv[0], "Cannot parse -S argument");
3592 					strncpy(cmdLineTemp, argv[i]+j+1, cmdLineTempCount);
3593 					cmdLineTemp[cmdLineTempCount] = '\0';
3594 					ChildThreadStackSize = atoi(cmdLineTemp)*1024;
3595 					if (ChildThreadStackSize < 0)
3596 					{
3597 						fprintf(stderr,"pbzip2: *ERROR: Parsing -S: invalid stack size specified [%d]!  Ignoring...\n",
3598 							 ChildThreadStackSize);
3599 					}
3600 					else if (ChildThreadStackSize < PTHREAD_STACK_MIN)
3601 					{
3602 						fprintf(stderr,"pbzip2: *WARNING: Stack size %d bytes less than minumum - adjusting to %d bytes.\n",
3603 							 ChildThreadStackSize, PTHREAD_STACK_MIN);
3604 						ChildThreadStackSize = PTHREAD_STACK_MIN;
3605 					}
3606 					j += cmdLineTempCount;
3607 					#ifdef PBZIP_DEBUG
3608 					fprintf(stderr, "-S%d\n", ChildThreadStackSize);
3609 					#endif
3610 					break;
3611 				#endif // USE_STACKSIZE_CUSTOMIZATION
3612 				case 'd': decompress = 1; break;
3613 				case 'c': OutputStdOut = 1; keep = 1; break;
3614 				case 'f': force = 1; ForceOverwrite = 1; break;
3615 				case 'h': usage(argv[0], "HELP"); break;
3616 				case 'k': keep = 1; break;
3617 				#ifndef PBZIP_NO_LOADAVG
3618 				case 'l': useLoadAverage = 1; break;
3619 				#endif
3620 				case 'L': banner(); exit(0); break;
3621 				case 'q': QuietMode = 1; break;
3622 				case 'r': readEntireFile = 1; break;
3623 				case 't': testFile = 1; break;
3624 				case 'v': QuietMode = 0; break;
3625 				case 'V': banner(); exit(0); break;
3626 				case 'z': compress = 1; break;
3627 				case '1': BWTblockSize = 1; break;
3628 				case '2': BWTblockSize = 2; break;
3629 				case '3': BWTblockSize = 3; break;
3630 				case '4': BWTblockSize = 4; break;
3631 				case '5': BWTblockSize = 5; break;
3632 				case '6': BWTblockSize = 6; break;
3633 				case '7': BWTblockSize = 7; break;
3634 				case '8': BWTblockSize = 8; break;
3635 				case '9': BWTblockSize = 9; break;
3636 				}
3637 			}
3638 		}
3639 		else
3640 		{
3641 			// add filename to list for processing FileListCount
3642 			FileList[FileListCount] = argv[i];
3643 			FileListCount++;
3644 		}
3645 	} /* for */
3646 
3647 	Bz2HeaderZero[3] = '0' + BWTblockSize;
3648 	bz2Header[3] = Bz2HeaderZero[3];
3649 
3650 	// check to make sure we are not trying to compress and decompress at same time
3651 	if ((compress == 1) && (decompress == 1))
3652 	{
3653 		fprintf(stderr,"pbzip2: *ERROR: Can't compress and uncompress data at same time.  Aborting!\n");
3654 		fprintf(stderr,"pbzip2: For help type: %s -h\n", argv[0]);
3655 		return 1;
3656 	}
3657 
3658 	if (FileListCount == 0)
3659 	{
3660 		if (testFile == 1)
3661 		{
3662 			#ifndef WIN32
3663 			if (isatty(fileno(stdin)))
3664 			#else
3665 			if (_isatty(_fileno(stdin)))
3666 			#endif
3667 			{
3668 					fprintf(stderr,"pbzip2: *ERROR: Won't read compressed data from terminal.  Aborting!\n");
3669 					fprintf(stderr,"pbzip2: For help type: %s -h\n", argv[0]);
3670 					return 1;
3671 			}
3672 			// expecting data from stdin
3673 			FileList[FileListCount] = stdinFile;
3674 			FileListCount++;
3675 		}
3676 		else if (decompress == 1)
3677 		{
3678 			#ifndef WIN32
3679 			if (isatty(fileno(stdin)))
3680 			#else
3681 			if (_isatty(_fileno(stdin)))
3682 			#endif
3683 			{
3684 				fprintf(stderr,"pbzip2: *ERROR: Won't read compressed data from terminal.  Aborting!\n");
3685 				fprintf(stderr,"pbzip2: For help type: %s -h\n", argv[0]);
3686 				return 1;
3687 			}
3688 			// expecting data from stdin via TAR
3689 			OutputStdOut = 1;
3690 			keep = 1;
3691 			FileList[FileListCount] = stdinFile;
3692 			FileListCount++;
3693 		}
3694 		else
3695 		{
3696 			if (OutputStdOut == 0)
3697 			{
3698 				// probably trying to input data from stdin
3699 				if (QuietMode != 1)
3700 					fprintf(stderr,"pbzip2: Assuming input data coming from stdin...\n\n");
3701 
3702 				OutputStdOut = 1;
3703 				keep = 1;
3704 			}
3705 
3706 			#ifndef WIN32
3707 			if (isatty(fileno(stdout)))
3708 			#else
3709 			if (_isatty(_fileno(stdout)))
3710 			#endif
3711 			{
3712 				fprintf(stderr,"pbzip2: *ERROR: Won't write compressed data to terminal.  Aborting!\n");
3713 				fprintf(stderr,"pbzip2: For help type: %s -h\n", argv[0]);
3714 				return 1;
3715 			}
3716 			// expecting data from stdin
3717 			FileList[FileListCount] = stdinFile;
3718 			FileListCount++;
3719 		}
3720 	}
3721 
3722 	if (QuietMode != 1)
3723 	{
3724 		// display program banner
3725 		banner();
3726 
3727 		// do sanity check to make sure integers are the size we expect
3728 		#ifdef PBZIP_DEBUG
3729 		fprintf(stderr, "off_t size: %u    uint size: %u\n", (unsigned)sizeof(OFF_T), (unsigned)sizeof(unsigned int));
3730 		#endif
3731 		if (sizeof(OFF_T) <= 4)
3732 		{
3733 			fprintf(stderr, "\npbzip2: *WARNING: off_t variable size only %u bits!\n", (unsigned)(sizeof(OFF_T)*CHAR_BIT));
3734 			if (decompress == 1)
3735 				fprintf(stderr, " You will only able to uncompress files smaller than 2GB in size.\n\n");
3736 			else
3737 				fprintf(stderr, " You will only able to compress files smaller than 2GB in size.\n\n");
3738 		}
3739 	}
3740 
3741 	// Calculate number of processors to use based on load average if requested
3742 	#ifndef PBZIP_NO_LOADAVG
3743 	if (useLoadAverage == 1)
3744 	{
3745 		// get current load average
3746 		ret = getloadavg(loadAvgArray, 3);
3747 		if (ret != 3)
3748 		{
3749 			loadAverage = 0.0;
3750 			useLoadAverage = 0;
3751 			if (QuietMode != 1)
3752 				fprintf(stderr, "pbzip2:  *WARNING: Could not get load average!  Using requested processors...\n");
3753 		}
3754 		else
3755 		{
3756 			#ifdef PBZIP_DEBUG
3757 			fprintf(stderr, "Load Avg1: %f  Avg5: %f  Avg15: %f\n", loadAvgArray[0], loadAvgArray[1], loadAvgArray[2]);
3758 			#endif
3759 			// use 1 min load average to adjust number of processors used
3760 			loadAverage = loadAvgArray[0]; // use [1] for 5 min average and [2] for 15 min average
3761 			// total number processors minus load average rounded up
3762 			numCPUidle = numCPUtotal - (int)(loadAverage + 0.5);
3763 			// if user asked for a specific # processors and they are idle, use all requested
3764 			// otherwise give them whatever idle processors are available
3765 			if (numCPUidle < numCPU)
3766 				numCPU = numCPUidle;
3767 			if (numCPU < 1)
3768 				numCPU = 1;
3769 		}
3770 	}
3771 	#endif
3772 
3773 	// Initialize child threads attributes
3774 	initChildThreadAttributes();
3775 
3776 	// setup signal handling (should be before creating any child thread)
3777 	sigInFilename = NULL;
3778 	sigOutFilename = NULL;
3779 	ret = setupSignalHandling();
3780 	if (ret != 0)
3781 	{
3782 		fprintf(stderr, "pbzip2: *ERROR: Can't setup signal handling [%d]. Aborting!\n", ret);
3783 		return 1;
3784 	}
3785 
3786 	// Create and start terminator thread.
3787 	ret = setupTerminator();
3788 	if (ret != 0)
3789 	{
3790 		fprintf(stderr, "pbzip2: *ERROR: Can't setup terminator thread [%d]. Aborting!\n", ret);
3791 		return 1;
3792 	}
3793 
3794 	if (numCPU < 1)
3795 		numCPU = 1;
3796 
3797 	// display global settings
3798 	if (QuietMode != 1)
3799 	{
3800 		if (testFile != 1)
3801 		{
3802 			fprintf(stderr, "\n         # CPUs: %d\n", numCPU);
3803 			#ifndef PBZIP_NO_LOADAVG
3804 			if (useLoadAverage == 1)
3805 				fprintf(stderr, "   Load Average: %.2f\n", loadAverage);
3806 			#endif
3807 			if (decompress != 1)
3808 			{
3809 				fprintf(stderr, " BWT Block Size: %d00 KB\n", BWTblockSize);
3810 				if (blockSize < 100000)
3811 					fprintf(stderr, "File Block Size: %d bytes\n", blockSize);
3812 				else
3813 					fprintf(stderr, "File Block Size: %d KB\n", blockSize/1000);
3814 			}
3815 			fprintf(stderr, " Maximum Memory: %d MB\n", maxMemory/1000000);
3816 			#ifdef USE_STACKSIZE_CUSTOMIZATION
3817 				if (ChildThreadStackSize > 0)
3818 					fprintf(stderr, "     Stack Size: %d KB\n", ChildThreadStackSize/1024);
3819 			#endif
3820 
3821 			if (decompress == 1)
3822 			{
3823 				fprintf(stderr, " Ignore Trailing Garbage: %s\n",
3824 					 (IgnoreTrailingGarbageFlag == 1) ? "on" : "off" );
3825 			}
3826 		}
3827 		fprintf(stderr, "-------------------------------------------\n");
3828 	}
3829 
3830 	int mutexesInitRet = mutexesInit();
3831 	if ( mutexesInitRet != 0 )
3832 	{
3833 		return mutexesInitRet;
3834 	}
3835 
3836 	// create queue
3837 	fifo = FifoQueue = queueInit(numCPU);
3838 	if (fifo == NULL)
3839 	{
3840 		fprintf (stderr, "pbzip2: *ERROR: Queue Init failed.  Aborting...\n");
3841 		return 1;
3842 	}
3843 
3844 	// process all files
3845 	for (fileLoop=0; fileLoop < FileListCount; fileLoop++)
3846 	{
3847 		producerDone = 0;
3848 		InFileSize = 0;
3849 		NumBlocks = 0;
3850 		switchedMtToSt = false;
3851 		int errLevelCurrentFile = 0;
3852 
3853 		ErrorContext::getInstance()->reset();
3854 
3855 		// set input filename
3856 		InFilename = FileList[fileLoop];
3857 		hasInFile = (strcmp(InFilename, "-") != 0);
3858 
3859 		// test file for errors if requested
3860 		if (testFile != 0)
3861 		{
3862 			if (QuietMode != 1)
3863 			{
3864 				fprintf(stderr, "      File #: %d of %d\n", fileLoop+1, FileListCount);
3865 				if (hasInFile)
3866 					fprintf(stderr, "     Testing: %s\n", InFilename);
3867 				else
3868 					fprintf(stderr, "     Testing: <stdin>\n");
3869 			}
3870 			ret = testCompressedData(InFilename);
3871 			if (ret > 0)
3872 				return ret;
3873 			else if (ret == 0)
3874 			{
3875 				if (QuietMode != 1)
3876 					fprintf(stderr, "        Test: OK\n");
3877 			}
3878 			else
3879 				errLevel = 2;
3880 
3881 			if (QuietMode != 1)
3882 				fprintf(stderr, "-------------------------------------------\n");
3883 			continue;
3884 		}
3885 
3886 		// set ouput filename
3887 		outFilename = std::string(FileList[fileLoop]);
3888 		if ((decompress == 1) && hasInFile)
3889 		{
3890 			// check if input file is a valid .bz2 compressed file
3891 			hInfile = open(InFilename, O_RDONLY | O_BINARY);
3892 			// check to see if file exists before processing
3893 			if (hInfile == -1)
3894 			{
3895 				ErrorContext::printErrnoMsg(stderr, errno);
3896 				fprintf(stderr, "pbzip2: *ERROR: File [%s] NOT found!  Skipping...\n", InFilename);
3897 				fprintf(stderr, "-------------------------------------------\n");
3898 				errLevel = 1;
3899 				continue;
3900 			}
3901 			memset(tmpBuff, 0, sizeof(tmpBuff));
3902 			size_t size = do_read(hInfile, tmpBuff, strlen(bz2Header)+1);
3903 			do_close(hInfile);
3904 			if ((size == (size_t)(-1)) || (size < strlen(bz2Header)+1))
3905 			{
3906 				ErrorContext::getInstance()->printErrorMessages(stderr);
3907 				fprintf(stderr, "pbzip2: *ERROR: File [%s] is NOT a valid bzip2!  Skipping...\n", InFilename);
3908 				fprintf(stderr, "-------------------------------------------\n");
3909 				errLevel = 1;
3910 				continue;
3911 			}
3912 			else
3913 			{
3914 				// make sure start of file has valid bzip2 header
3915 				if (memstr(tmpBuff, 4, bz2Header, 3) == NULL)
3916 				{
3917 					fprintf(stderr, "pbzip2: *ERROR: File [%s] is NOT a valid bzip2!  Skipping...\n", InFilename);
3918 					fprintf(stderr, "-------------------------------------------\n");
3919 					errLevel = 1;
3920 					continue;
3921 				}
3922 				// skip 4th char which differs depending on BWT block size used
3923 				if (memstr(tmpBuff+4, size-4, bz2Header+4, strlen(bz2Header)-4) == NULL)
3924 				{
3925 					// check to see if this is a special 0 byte file
3926 					if (memstr(tmpBuff+4, size-4, Bz2HeaderZero+4, strlen(bz2Header)-4) == NULL)
3927 					{
3928 						fprintf(stderr, "pbzip2: *ERROR: File [%s] is NOT a valid bzip2!  Skipping...\n", InFilename);
3929 						fprintf(stderr, "-------------------------------------------\n");
3930 						errLevel = 1;
3931 						continue;
3932 					}
3933 					#ifdef PBZIP_DEBUG
3934 					fprintf(stderr, "** ZERO byte compressed file detected\n");
3935 					#endif
3936 				}
3937 				// set block size for decompression
3938 				if ((tmpBuff[3] >= '1') && (tmpBuff[3] <= '9'))
3939 					BWTblockSizeChar = tmpBuff[3];
3940 				else
3941 				{
3942 					fprintf(stderr, "pbzip2: *ERROR: File [%s] is NOT a valid bzip2!  Skipping...\n", InFilename);
3943 					fprintf(stderr, "-------------------------------------------\n");
3944 					errLevel = 1;
3945 					continue;
3946 				}
3947 			}
3948 
3949 			// check if filename ends with .bz2
3950 			std::string bz2Tail(".bz2");
3951 			std::string tbz2Tail(".tbz2");
3952 			if ( ends_with_icase(outFilename, bz2Tail) )
3953 			{
3954 				// remove .bz2 extension
3955 				outFilename.resize( outFilename.size() - bz2Tail.size() );
3956 			}
3957 			else if ( ends_with_icase(outFilename, tbz2Tail) )
3958 			{
3959 				outFilename.resize( outFilename.size() - tbz2Tail.size() );
3960 				outFilename += ".tar";
3961 			}
3962 			else
3963 			{
3964 				// add .out extension so we don't overwrite original file
3965 				outFilename += ".out";
3966 			}
3967 		} // decompress == 1
3968 		else
3969 		{
3970 			// check input file to make sure its not already a .bz2 file
3971 			std::string bz2Tail(".bz2");
3972 			if ( ends_with_icase(std::string(InFilename), bz2Tail) )
3973 			{
3974 				fprintf(stderr, "pbzip2: *ERROR: Input file [%s] already has a .bz2 extension!  Skipping...\n", InFilename);
3975 				fprintf(stderr, "-------------------------------------------\n");
3976 				errLevel = 1;
3977 				continue;
3978 			}
3979 			outFilename += bz2Tail;
3980 		}
3981 
3982 		// setup signal handling filenames
3983 		safe_mutex_lock(&ErrorHandlerMutex);
3984 		sigInFilename = InFilename;
3985 		sigOutFilename = outFilename.c_str();
3986 		safe_mutex_unlock(&ErrorHandlerMutex);
3987 
3988 		if (hasInFile)
3989 		{
3990 			struct stat statbuf;
3991 			// read file for compression
3992 			hInfile = open(InFilename, O_RDONLY | O_BINARY);
3993 			// check to see if file exists before processing
3994 			if (hInfile == -1)
3995 			{
3996 				fprintf(stderr, "pbzip2: *ERROR: File [%s] NOT found!  Skipping...\n", InFilename);
3997 				fprintf(stderr, "-------------------------------------------\n");
3998 				errLevel = 1;
3999 				continue;
4000 			}
4001 
4002 			// get some information about the file
4003 			fstat(hInfile, &statbuf);
4004 			// check to make input is not a directory
4005 			if (S_ISDIR(statbuf.st_mode))
4006 			{
4007 				fprintf(stderr, "pbzip2: *ERROR: File [%s] is a directory!  Skipping...\n", InFilename);
4008 				fprintf(stderr, "-------------------------------------------\n");
4009 				errLevel = 1;
4010 				continue;
4011 			}
4012 			// check to make sure input is a regular file
4013 			if (!S_ISREG(statbuf.st_mode))
4014 			{
4015 				fprintf(stderr, "pbzip2: *ERROR: File [%s] is not a regular file!  Skipping...\n", InFilename);
4016 				fprintf(stderr, "-------------------------------------------\n");
4017 				errLevel = 1;
4018 				continue;
4019 			}
4020 			// get size of file
4021 			#ifndef WIN32
4022 			InFileSize = statbuf.st_size;
4023 			#else
4024 			fileSize_temp.LowPart = GetFileSize((HANDLE)_get_osfhandle(hInfile), (unsigned long *)&fileSize_temp.HighPart);
4025 			InFileSize = fileSize_temp.QuadPart;
4026 			#endif
4027 			// don't process a 0 byte file
4028 			if (InFileSize == 0)
4029 			{
4030 				if (decompress == 1)
4031 				{
4032 					fprintf(stderr, "pbzip2: *ERROR: File is of size 0 [%s]!  Skipping...\n", InFilename);
4033 					fprintf(stderr, "-------------------------------------------\n");
4034 					errLevel = 1;
4035 					continue;
4036 				}
4037 
4038 				// make sure we handle zero byte files specially
4039 				zeroByteFile = 1;
4040 			}
4041 			else
4042 			{
4043 				zeroByteFile = 0;
4044 			}
4045 
4046 			// get file meta data to write to output file
4047 			if (getFileMetaData(InFilename) != 0)
4048 			{
4049 				fprintf(stderr, "pbzip2: *ERROR: Could not get file meta data from [%s]!  Skipping...\n", InFilename);
4050 				fprintf(stderr, "-------------------------------------------\n");
4051 				errLevel = 1;
4052 				continue;
4053 			}
4054 		}
4055 		else
4056 		{
4057 			hInfile = STDIN_FILENO; // stdin
4058 			InFileSize = -1; // fake it
4059 		}
4060 
4061 		// check to see if output file exists
4062 		if ((OutputStdOut == 0) && check_file_exists(outFilename.c_str()))
4063 		{
4064 			if (force != 1)
4065 			{
4066 				fprintf(stderr, "pbzip2: *ERROR: Output file [%s] already exists!  Use -f to overwrite...\n", outFilename.c_str());
4067 				fprintf(stderr, "-------------------------------------------\n");
4068 				errLevel = 1;
4069 				continue;
4070 			}
4071 			else
4072 			{
4073 				remove(outFilename.c_str());
4074 			}
4075 		}
4076 
4077 		if (readEntireFile == 1)
4078 		{
4079 			if (hInfile == STDIN_FILENO)
4080 			{
4081 				if (QuietMode != 1)
4082 					fprintf(stderr, " *Warning: Ignoring -r switch since input is stdin.\n");
4083 			}
4084 			else
4085 			{
4086 				// determine block size to try and spread data equally over # CPUs
4087 				blockSize = InFileSize / numCPU;
4088 			}
4089 		}
4090 
4091 		// display per file settings
4092 		if (QuietMode != 1)
4093 		{
4094 			fprintf(stderr, "         File #: %d of %d\n", fileLoop+1, FileListCount);
4095 			fprintf(stderr, "     Input Name: %s\n", hInfile != STDIN_FILENO ? InFilename : "<stdin>");
4096 
4097 			if (OutputStdOut == 0)
4098 				fprintf(stderr, "    Output Name: %s\n\n", outFilename.c_str());
4099 			else
4100 				fprintf(stderr, "    Output Name: <stdout>\n\n");
4101 
4102 			if (decompress == 1)
4103 				fprintf(stderr, " BWT Block Size: %c00k\n", BWTblockSizeChar);
4104 			if (hasInFile)
4105 				fprintf(stderr, "     Input Size: %" PRIuMAX " bytes\n", (uintmax_t)InFileSize);
4106 		}
4107 
4108 		if (decompress == 1)
4109 		{
4110 			numBlocks = 0;
4111 			// Do not use threads if we only have 1 CPU or small files
4112 			if ((numCPU == 1) || (InFileSize < 1000000))
4113 				noThreads = 1;
4114 			else
4115 				noThreads = 0;
4116 
4117 			// Enable threads method for uncompressing from stdin
4118 			if ((numCPU > 1) && !hasInFile)
4119 				noThreads = 0;
4120 		} // if (decompress == 1)
4121 		else
4122 		{
4123 			if (InFileSize > 0)
4124 			{
4125 				// calculate the # of blocks of data
4126 				numBlocks = (InFileSize + blockSize - 1) / blockSize;
4127 				// Do not use threads for small files where we only have 1 block to process
4128 				// or if we only have 1 CPU
4129 				if ((numBlocks == 1) || (numCPU == 1))
4130 					noThreads = 1;
4131 				else
4132 					noThreads = 0;
4133 			}
4134 			else
4135 			{
4136 				// Simulate a "big" number of buffers. Will need to resize it later
4137 				numBlocks = 10000;
4138 			}
4139 
4140 			// write special compressed data for special 0 byte input file case
4141 			if (zeroByteFile == 1)
4142 			{
4143 				hOutfile = STDOUT_FILENO;
4144 				// write to file instead of stdout
4145 				if (OutputStdOut == 0)
4146 				{
4147 					hOutfile = safe_open_output(outFilename.c_str());
4148 					// check to see if file creation was successful
4149 					if (hOutfile == -1)
4150 					{
4151 						handle_error(EF_EXIT, 1,
4152 							"pbzip2: *ERROR: Could not create output file [%s]!\n", outFilename.c_str());
4153 						errLevelCurrentFile = errLevel = 1;
4154 						break;
4155 					}
4156 				}
4157 				// write data to the output file
4158 				ret = do_write(hOutfile, Bz2HeaderZero, sizeof(Bz2HeaderZero));
4159 				int close_ret = 0;
4160 				if (OutputStdOut == 0)
4161 				{
4162 					close_ret = do_close(hOutfile);
4163 					// write store file meta data to output file
4164 					if (writeFileMetaData(outFilename.c_str()) != 0)
4165 					{
4166 						handle_error(EF_NOQUIT, -1,
4167 							"pbzip2: *ERROR: Could not write file meta data to [%s]!\n", outFilename.c_str());
4168 					}
4169 				}
4170 				if ( (ret != sizeof(Bz2HeaderZero)) || (close_ret == -1) )
4171 				{
4172 					handle_error(EF_EXIT, 1,
4173 						"pbzip2: *ERROR: Could not write to file [%s]! Aborting...\n", outFilename.c_str());
4174 					fprintf(stderr, "-------------------------------------------\n");
4175 					errLevelCurrentFile = errLevel = 1;
4176 					break;
4177 				}
4178 				if (QuietMode != 1)
4179 				{
4180 					fprintf(stderr, "    Output Size: %u bytes\n", (unsigned)sizeof(Bz2HeaderZero));
4181 					fprintf(stderr, "-------------------------------------------\n");
4182 				}
4183 				// remove input file unless requested not to by user or error occurred
4184 				if ( (keep != 1) && (errLevelCurrentFile == 0) )
4185 				{
4186 					struct stat statbuf;
4187 					// only remove input file if output file exists
4188 					bool removeFlag =
4189 							(OutputStdOut != 0) ||
4190 							(stat(outFilename.c_str(), &statbuf) == 0);
4191 
4192 					if (removeFlag)
4193 					{
4194 						if (do_remove(InFilename) == -1)
4195 						{
4196 							handle_error(EF_NOQUIT, 1, "Can't remove input file [%s]!", InFilename);
4197 						}
4198 					}
4199 				}
4200 				continue;
4201 			} // if (zeroByteFile == 1)
4202 		} // else (decompress == 1)
4203 		#ifdef PBZIP_DEBUG
4204 		fprintf(stderr, "# Blocks: %d\n", numBlocks);
4205 		#endif
4206 		// set global variable
4207 		NumBlocksEstimated = numBlocks;
4208 		// Calculate maximum number of buffered blocks to use
4209 		NumBufferedBlocksMax = maxMemory / blockSize;
4210 		// Subtract blocks for number of extra buffers in producer and fileWriter (~ numCPU for each)
4211 		if ((int)NumBufferedBlocksMax - (numCPU * 2) < 1)
4212 			NumBufferedBlocksMax = 1;
4213 		else
4214 			NumBufferedBlocksMax = NumBufferedBlocksMax - (numCPU * 2);
4215 		#ifdef PBZIP_DEBUG
4216 		fprintf(stderr, "pbzip2: maxMemory: %d    blockSize: %d\n", maxMemory, blockSize);
4217 		fprintf(stderr, "pbzip2: NumBufferedBlocksMax: %" PRIuMAX "\n", (uintmax_t)NumBufferedBlocksMax);
4218 		#endif
4219 		// check to see if our max buffered blocks is less than numCPU, if yes increase maxMemory
4220 		// to support numCPU requested unless -m switch given by user
4221 		if (NumBufferedBlocksMax < (size_t)numCPU)
4222 		{
4223 			if (maxMemorySwitch == 0)
4224 			{
4225 				NumBufferedBlocksMax = numCPU;
4226 				if (QuietMode != 1)
4227 					fprintf(stderr, "*Warning* Max memory limit increased to %" PRIuMAX " MB to support %d CPUs\n", (uintmax_t)((NumBufferedBlocksMax + (numCPU * 2)) * blockSize)/1000000, numCPU);
4228 			}
4229 			else
4230 			{
4231 				if (QuietMode != 1)
4232 					fprintf(stderr, "*Warning* CPU usage and performance may be suboptimal due to max memory limit.\n");
4233 			}
4234 		}
4235 
4236 		LastGoodBlock = -1;
4237 		MinErrorBlock = -1;
4238 
4239 		// create output buffer
4240 		outputBufferInit(NumBufferedBlocksMax);
4241 
4242 		if (decompress == 1)
4243 		{
4244 			// use multi-threaded code
4245 			if (noThreads == 0)
4246 			{
4247 				// do decompression
4248 				if (QuietMode != 1)
4249 					fprintf(stderr, "Decompressing data...\n");
4250 				for (i=0; (int)i < numCPU; i++)
4251 				{
4252 					ret = pthread_create(&fifo->consumers[i], &ChildThreadAttributes, consumer_decompress, fifo);
4253 					if (ret != 0)
4254 					{
4255 						ErrorContext::getInstance()->saveError();
4256 						handle_error(EF_EXIT, 1, "pbzip2: *ERROR: Not enough resources to create consumer thread #%u (code = %d)  Aborting...\n", i, ret);
4257 						ret = pthread_join(TerminatorThread, NULL);
4258 						return 1;
4259 					}
4260 				}
4261 
4262 				ret = pthread_create(&output, &ChildThreadAttributes, fileWriter, (void*)outFilename.c_str());
4263 				if (ret != 0)
4264 				{
4265 					handle_error(EF_EXIT, 1,
4266 							"pbzip2: *ERROR: Not enough resources to create fileWriter thread (code = %d)  Aborting...\n", ret);
4267 					ret = pthread_join(TerminatorThread, NULL);
4268 					return 1;
4269 				}
4270 
4271 				// start reading in data for decompression
4272 				ret = producer_decompress(hInfile, InFileSize, fifo);
4273 				if (ret == -99)
4274 				{
4275 					// only 1 block detected, use single threaded code to decompress
4276 					noThreads = 1;
4277 
4278 					switchedMtToSt = true;
4279 
4280 					// wait for fileWriter thread to exit
4281 					if (pthread_join(output, NULL) != 0)
4282 					{
4283 						ErrorContext::getInstance()->saveError();
4284 						handle_error(EF_EXIT, 1,
4285 								"pbzip2: *ERROR: Error joining fileWriter thread (code = %d)  Aborting...\n", ret);
4286 						errLevelCurrentFile = errLevel = 1;
4287 						ret = pthread_join(TerminatorThread, NULL);
4288 						return 1;
4289 					}
4290 				}
4291 				else if (ret != 0)
4292 				{
4293 					errLevelCurrentFile = errLevel = 1;
4294 				}
4295 			}
4296 
4297 			// use single threaded code
4298 			if ((noThreads == 1) && (errLevelCurrentFile == 0))
4299 			{
4300 				if (QuietMode != 1)
4301 					fprintf(stderr, "Decompressing data (no threads)...\n");
4302 
4303 				if (hInfile > 0)
4304 					close(hInfile);
4305 				ret = directdecompress(InFilename, outFilename.c_str());
4306 				if (ret != 0)
4307 				{
4308 					errLevelCurrentFile = errLevel = 1;
4309 				}
4310 			}
4311 		} // if (decompress == 1)
4312 		else
4313 		{
4314 			// do compression code
4315 
4316 			// use multi-threaded code
4317 			if (noThreads == 0)
4318 			{
4319 				if (QuietMode != 1)
4320 					fprintf(stderr, "Compressing data...\n");
4321 
4322 				for (i=0; (int)i < numCPU; i++)
4323 				{
4324 					ret = pthread_create(&fifo->consumers[i], &ChildThreadAttributes, consumer, fifo);
4325 					if (ret != 0)
4326 					{
4327 						ErrorContext::getInstance()->saveError();
4328 						handle_error(EF_EXIT, 1,
4329 									 "pbzip2: *ERROR: Not enough resources to create consumer thread #%u (code = %d)  Aborting...\n", i, ret);
4330 						pthread_join(TerminatorThread, NULL);
4331 						return 1;
4332 					}
4333 				}
4334 
4335 				ret = pthread_create(&output, &ChildThreadAttributes, fileWriter, (void*)outFilename.c_str());
4336 				if (ret != 0)
4337 				{
4338 					handle_error(EF_EXIT, 1,
4339 							"pbzip2: *ERROR: Not enough resources to create fileWriter thread (code = %d)  Aborting...\n", ret);
4340 					pthread_join(TerminatorThread, NULL);
4341 					return 1;
4342 				}
4343 
4344 				// start reading in data for compression
4345 				ret = producer(hInfile, blockSize, fifo);
4346 				if (ret != 0)
4347 					errLevelCurrentFile = errLevel = 1;
4348 			}
4349 			else
4350 			{
4351 				// do not use threads for compression
4352 				if (QuietMode != 1)
4353 					fprintf(stderr, "Compressing data (no threads)...\n");
4354 
4355 				ret = directcompress(hInfile, InFileSize, blockSize, outFilename.c_str());
4356 				if (ret != 0)
4357 					errLevelCurrentFile = errLevel = 1;
4358 			}
4359 		} // else
4360 
4361 		if (noThreads == 0)
4362 		{
4363 			// wait for fileWriter thread to exit
4364 			ret = pthread_join(output, NULL);
4365 			if (ret != 0)
4366 			{
4367 				ErrorContext::printErrnoMsg(stderr, errno);
4368 				errLevelCurrentFile = errLevel = 1;
4369 			}
4370 		}
4371 
4372 		if ((noThreads == 0) || switchedMtToSt )
4373 		{
4374 			// wait for consumer threads to exit
4375 			for (i = 0; (int)i < numCPU; i++)
4376 			{
4377 				ret = pthread_join(fifo->consumers[i], NULL);
4378 				if (ret != 0)
4379 				{
4380 					ErrorContext::printErrnoMsg(stderr, errno);
4381 					errLevelCurrentFile = errLevel = 1;
4382 				}
4383 			}
4384 		}
4385 
4386 		if (syncGetTerminateFlag() != 0)
4387 		{
4388 			errLevelCurrentFile = errLevel = 1;
4389 		}
4390 
4391 		if (OutputStdOut == 0)
4392 		{
4393 			// write store file meta data to output file
4394 			if (writeFileMetaData(outFilename.c_str()) != 0)
4395 			{
4396 				handle_error(EF_NOQUIT, -1,
4397 					"pbzip2: *ERROR: Could not write file meta data to [%s]!\n", outFilename.c_str());
4398 			}
4399 		}
4400 
4401 		// remove input file unless requested not to by user or error occurred
4402 		if ( (keep != 1) && (errLevelCurrentFile == 0) )
4403 		{
4404 			struct stat statbuf;
4405 			// only remove input file if output file exists
4406 			bool removeFlag =
4407 					(OutputStdOut != 0) ||
4408 					(stat(outFilename.c_str(), &statbuf) == 0);
4409 
4410 			if (removeFlag)
4411 			{
4412 				if (do_remove(InFilename) == -1)
4413 				{
4414 					handle_error(EF_NOQUIT, 1, "Can't remove input file [%s]!", InFilename);
4415 				}
4416 			}
4417 		}
4418 
4419 		// reclaim memory
4420 		OutputBuffer.clear();
4421 		fifo->clear();
4422 
4423 		if ( (errLevelCurrentFile == 0) && (syncGetTerminateFlag() == 0) )
4424 		{
4425 			// finished processing file (mutex since accessed by cleanup procedure)
4426 			safe_mutex_lock(&ErrorHandlerMutex);
4427 			sigInFilename = NULL;
4428 			sigOutFilename = NULL;
4429 			safe_mutex_unlock(&ErrorHandlerMutex);
4430 		}
4431 
4432 		if (errLevelCurrentFile == 1)
4433 		{
4434 			syncSetTerminateFlag(1);
4435 			break;
4436 		}
4437 
4438 		if (QuietMode != 1)
4439 			fprintf(stderr, "-------------------------------------------\n");
4440 	} /* for */
4441 
4442 	// Explicit close on stdout if we've been writing there, after all input has been processed
4443 	if (OutputStdOut == 1)
4444 	{
4445 		ret = close(STDOUT_FILENO);
4446 		if (ret == -1)
4447 		{
4448 			ErrorContext::getInstance()->saveError();
4449 			handle_error(EF_EXIT, 1, "pbzip2: *ERROR: Failed to close STDOUT! Aborting...\n");
4450 			exit(1);
4451 		}
4452 	}
4453 
4454 	// Terminate signal handler thread sending SIGQUIT signal
4455 	ret = pthread_kill(SignalHandlerThread, SIG_HANDLER_QUIT_SIGNAL);
4456 	if (ret != 0)
4457 	{
4458 		fprintf(stderr, "Couldn't signal signal QUIT to SignalHandlerThread [%d]. Quitting prematurely!\n", ret);
4459 		exit(errLevel);
4460 	}
4461 	else
4462 	{
4463 		ret = pthread_join(SignalHandlerThread, NULL);
4464 		if (ret != 0)
4465 		{
4466 			fprintf(stderr, "Error on join of SignalHandlerThread [%d]\n", ret);
4467 		}
4468 	}
4469 
4470 	if (syncGetTerminateFlag() == 0)
4471 	{
4472 		syncSetFinishedFlag(1);
4473 	}
4474 
4475 	ret = pthread_join(TerminatorThread, NULL);
4476 	if (ret != 0)
4477 	{
4478 		fprintf(stderr, "Error on join of TerminatorThread [%d]\n", ret);
4479 	}
4480 
4481 	// reclaim memory
4482 	queueDelete(fifo);
4483 	mutexesDelete();
4484 	disposeMemory(FileList);
4485 
4486 	// get current time for end of benchmark
4487 	#ifndef WIN32
4488 	gettimeofday(&tvStopTime, &tz);
4489 	#else
4490 	GetSystemTime(&systemtime);
4491 	SystemTimeToFileTime(&systemtime, (FILETIME *)&filetime);
4492 	tvStopTime.tv_sec = filetime.QuadPart / 10000000;
4493 	tvStopTime.tv_usec = (filetime.QuadPart - (LONGLONG)tvStopTime.tv_sec * 10000000) / 10;
4494 	#endif
4495 
4496 	#ifdef PBZIP_DEBUG
4497 	fprintf(stderr, "\n Start Time: %ld + %ld\n", tvStartTime.tv_sec, tvStartTime.tv_usec);
4498 	fprintf(stderr, " Stop Time : %ld + %ld\n", tvStopTime.tv_sec, tvStopTime.tv_usec);
4499 	#endif
4500 
4501 	// convert time structure to real numbers
4502 	timeStart = (double)tvStartTime.tv_sec + ((double)tvStartTime.tv_usec / 1000000);
4503 	timeStop = (double)tvStopTime.tv_sec + ((double)tvStopTime.tv_usec / 1000000);
4504 	timeCalc = timeStop - timeStart;
4505 	if (QuietMode != 1)
4506 		fprintf(stderr, "\n     Wall Clock: %f seconds\n", timeCalc);
4507 
4508 	exit(errLevel);
4509 }
4510