1 /*
2 * File : pbzip2.cpp
3 *
4 * Title : Parallel BZIP2 (pbzip2)
5 *
6 * Author: Jeff Gilchrist (http://gilchrist.ca/jeff/)
7 * - Modified producer/consumer threading code from
8 * Andrae Muys <andrae@humbug.org.au.au>
9 * - uses libbzip2 by Julian Seward (http://sources.redhat.com/bzip2/)
10 * - Major contributions by Yavor Nikolov (http://javornikolov.wordpress.com)
11 */
12
13 #include "pbzip2.h"
14 #include "BZ2StreamScanner.h"
15 #include "ErrorContext.h"
16
17 #include <vector>
18 #include <algorithm>
19 #include <string>
20 #include <new>
21
22 extern "C"
23 {
24 #include <sys/stat.h>
25 #include <errno.h>
26 #include <fcntl.h>
27 #include <signal.h>
28 #include <stdio.h>
29 #include <stdarg.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <time.h>
33 #include <bzlib.h>
34 #include <limits.h>
35 }
36
37
//
// GLOBALS
//
static int producerDone = 0; // accessed via syncGet/SetProducerDone() under ProducerDoneMutex
static int terminateFlag = 0; // Abnormal premature termination (guarded by TerminateFlagMutex)
static int finishedFlag = 0; // Main thread work finished (about to exit); guarded by TerminateFlagMutex
static int unfinishedWorkCleaned = 0; // set to 1 once cleanupUnfinishedWork() has run
static int numCPU = 2;
static int IgnoreTrailingGarbageFlag = 0; // ignore trailing garbage on decompress flag
static int SIG_HANDLER_QUIT_SIGNAL = SIGUSR1; // signal used to stop SignalHandlerThread
#ifdef USE_STACKSIZE_CUSTOMIZATION
static int ChildThreadStackSize = 0; // -1 - don't modify stacksize; 0 - use minimum; > 0 - use specified
#ifndef PTHREAD_STACK_MIN
#define PTHREAD_STACK_MIN 4096
#endif
#endif // USE_STACKSIZE_CUSTOMIZATION
// "BZh9" stream header (0x42 0x5A 0x68 0x39) followed by what appears to be the
// bzip2 end-of-stream marker (0x177245385090) and a zero combined CRC -
// presumably a complete, empty bzip2 stream.
static unsigned char Bz2HeaderZero[] = {
	0x42, 0x5A, 0x68, 0x39, 0x17, 0x72, 0x45, 0x38, 0x50, 0x90, 0x00, 0x00, 0x00, 0x00 };
static OFF_T InFileSize;
static int NumBlocks = 0;
static int NumBlocksEstimated = 0;
static int NumBufferedBlocks = 0;
static size_t NumBufferedTailBlocks = 0;
static size_t NumBufferedBlocksMax = 0;
static int NextBlockToWrite; // read under OutMutex (see waitForPreviousBlock)
static int LastGoodBlock; // set only to terminate write prematurely (ignoring garbage); guarded by OutMutex
static int MinErrorBlock; // lowest so far block number which has errors (on decompress; could be trailing garbage); guarded by OutMutex
static size_t OutBufferPosToWrite; // = 0; // position in output buffer
static int Verbosity = 0;
static int QuietMode = 1;
static int OutputStdOut = 0;
static int ForceOverwrite = 0;
static int BWTblockSize = 9;
static int FileListCount = 0;
static std::vector <outBuff> OutputBuffer;
static queue *FifoQueue; // fifo queue (global var used on termination cleanup)
// Dynamically created synchronization objects; NULL until initialized elsewhere.
static pthread_mutex_t *OutMutex = NULL;
static pthread_mutex_t *ProducerDoneMutex = NULL;
static pthread_mutex_t ErrorHandlerMutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t TerminateFlagMutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t ProgressIndicatorsMutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t *notTooMuchNumBuffered;
static pthread_cond_t TerminateCond = PTHREAD_COND_INITIALIZER; // wakes terminatorThreadProc
static pthread_cond_t OutBufferHeadNotEmpty = PTHREAD_COND_INITIALIZER;
static pthread_cond_t ErrStateChangeCond = PTHREAD_COND_INITIALIZER; // signalled when LastGoodBlock/MinErrorBlock change
static pthread_attr_t ChildThreadAttributes;
static struct stat fileMetaData;
static const char *sigInFilename = NULL; // input file name used by cleanupUnfinishedWork()
static const char *sigOutFilename = NULL; // output file name used by cleanupUnfinishedWork()
static char BWTblockSizeChar = '9';
static sigset_t SignalMask; // signals blocked by initSignalMask() and awaited by signalHandlerProc()
static pthread_t SignalHandlerThread;
static pthread_t TerminatorThread;
91
// Synchronized accessors for shared flags and block-state counters.
inline int syncGetProducerDone();
inline void syncSetProducerDone(int newValue);
inline int syncGetTerminateFlag();
inline void syncSetTerminateFlag(int newValue);
inline void syncSetFinishedFlag(int newValue);
inline void syncSetLastGoodBlock(int newValue, int errBlock);
inline int syncGetLastGoodBlock();
// Termination / signal handling setup.
void cleanupUnfinishedWork();
void cleanupAndQuit(int exitCode);
int initSignalMask();
int setupSignalHandling();
int setupTerminator();

// pthread wrappers that terminate the application on failure.
inline void safe_mutex_lock(pthread_mutex_t *mutex);
inline void safe_mutex_unlock(pthread_mutex_t *mutex);
inline void safe_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
inline void safe_cond_signal(pthread_cond_t *cond);
int safe_cond_timed_wait(pthread_cond_t *cond, pthread_mutex_t *mutex, int seconds, const char *caller = "safe_cond_timed_wait");

template <typename FI1, typename FI2>
FI1 memstr(FI1 searchBuf, int searchBufSize, FI2 searchString, int searchStringSize);
int producer_decompress(int, OFF_T, queue *);
int directcompress(int, OFF_T, int, const char *);
int directdecompress(const char *, const char *);
int producer(int hInfile, int blockSize, queue *fifo);
int mutexesInit();
void mutexesDelete();
queue *queueInit(int);
void queueDelete(queue *);
void outputBufferInit(size_t size);
outBuff * outputBufferAdd(const outBuff & element, const char *caller);
outBuff * outputBufferSeqAddNext(outBuff * preveElement, outBuff * newElement);
inline size_t getOutputBufferPos(int blockNum);
int getFileMetaData(const char *);
int writeFileMetaData(const char *);
int testBZ2ErrorHandling(int, BZFILE *, int);
int testCompressedData(char *);
ssize_t bufread(int hf, char *buf, size_t bsize);
int detectCPUs(void);

// Decompression error / trailing-garbage handling.
inline bool isIgnoredTrailingGarbage();
int waitForPreviousBlock(int blockNumToWait, int errBlockNumber);
inline int getLastGoodBlockBeforeErr(int errBlockNumber, int outSequenceNumber);
inline int issueDecompressError(int bzret, const outBuff * fileData,
	int outSequenceNumber, const bz_stream & strm, const char * errmsg,
	int exitCode);
int decompressErrCheckSingle(int bzret, const outBuff * fileData,
	int outSequenceNumber, const bz_stream & strm, const char * errmsg,
	bool isTrailingGarbageErr);
int decompressErrCheck(int bzret, const outBuff * fileData,
	int outSequenceNumber, const bz_stream & strm);
inline bool hasTrailingGarbage(int bzret, const outBuff * fileData,
	const bz_stream & strm);
int producerDecompressCheckInterrupt(int hInfile, outBuff *& fileData, int lastBlock);

using pbzip2::ErrorContext;

/*
 * Pointers to functions used by plain C pthreads API require C calling
 * conventions.
 */
extern "C"
{
void* signalHandlerProc(void* arg);
void* terminatorThreadProc(void* arg);
void *consumer_decompress(void *);
void *fileWriter(void *);
void *consumer(void *);
}
161
162 /*
163 * Lock mutex or exit application immediately on error.
164 */
safe_mutex_lock(pthread_mutex_t * mutex)165 inline void safe_mutex_lock(pthread_mutex_t *mutex)
166 {
167 int ret = pthread_mutex_lock(mutex);
168 if (ret != 0)
169 {
170 fprintf(stderr, "pthread_mutex_lock error [%d]! Aborting immediately!\n", ret);
171 cleanupAndQuit(-5);
172 }
173 }
174
175 /*
176 * Unlock mutex or exit application immediately on error.
177 */
safe_mutex_unlock(pthread_mutex_t * mutex)178 inline void safe_mutex_unlock(pthread_mutex_t *mutex)
179 {
180 int ret = pthread_mutex_unlock(mutex);
181 if (ret != 0)
182 {
183 fprintf(stderr, "pthread_mutex_unlock error [%d]! Aborting immediately!\n", ret);
184 cleanupAndQuit(-6);
185 }
186 }
187
188 /*
189 * Call pthread_cond_signal - check return code and exit application immediately
190 * on error.
191 */
safe_cond_signal(pthread_cond_t * cond)192 inline void safe_cond_signal(pthread_cond_t *cond)
193 {
194 int ret = pthread_cond_signal(cond);
195 if (ret != 0)
196 {
197 fprintf(stderr, "pthread_cond_signal error [%d]! Aborting immediately!\n", ret);
198 cleanupAndQuit(-7);
199 }
200 }
201
202 /*
203 * Call pthread_cond_signal - check return code and exit application immediately
204 * on error.
205 */
safe_cond_broadcast(pthread_cond_t * cond)206 inline void safe_cond_broadcast(pthread_cond_t *cond)
207 {
208 int ret = pthread_cond_broadcast(cond);
209 if (ret != 0)
210 {
211 fprintf(stderr, "pthread_cond_broadcast error [%d]! Aborting immediately!\n", ret);
212 cleanupAndQuit(-7);
213 }
214 }
215
216 /*
217 * Unlock mutex or exit application immediately on error.
218 */
safe_cond_wait(pthread_cond_t * cond,pthread_mutex_t * mutex)219 inline void safe_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
220 {
221 int ret = pthread_cond_wait(cond, mutex);
222 if (ret != 0)
223 {
224 fprintf(stderr, "pthread_cond_wait error [%d]! Aborting immediately!\n", ret);
225 pthread_mutex_unlock(mutex);
226 cleanupAndQuit(-8);
227 }
228 }
229
230 /*
231 * Delegate to pthread_cond_timedwait. Check for errors and abort if
232 * any encountered. Return 0 on success and non-zero code on error
233 */
safe_cond_timed_wait(pthread_cond_t * cond,pthread_mutex_t * mutex,int seconds,const char * caller)234 int safe_cond_timed_wait(pthread_cond_t *cond, pthread_mutex_t *mutex, int seconds, const char *caller)
235 {
236 struct timespec waitTimer;
237 #ifndef WIN32
238 struct timeval tv;
239 struct timezone tz;
240 #else
241 SYSTEMTIME systemtime;
242 LARGE_INTEGER filetime;
243 #endif
244
245 #ifndef WIN32
246 gettimeofday(&tv, &tz);
247 waitTimer.tv_sec = tv.tv_sec + seconds;
248 waitTimer.tv_nsec = tv.tv_usec * 1000;
249 #else
250 GetSystemTime(&systemtime);
251 SystemTimeToFileTime(&systemtime, (FILETIME *)&filetime);
252 waitTimer.tv_sec = filetime.QuadPart / 10000000;
253 waitTimer.tv_nsec = filetime.QuadPart - ((LONGLONG)waitTimer.tv_sec * 10000000) * 10;
254 waitTimer.tv_sec += seconds;
255 #endif
256 #ifdef PBZIP_DEBUG
257 fprintf(stderr, "%s: waitTimer.tv_sec: %" PRIiMAX " waitTimer.tv_nsec: %" PRIiMAX "\n", caller,
258 (intmax_t)waitTimer.tv_sec, (intmax_t)waitTimer.tv_nsec);
259 #endif
260 int pret = pthread_cond_timedwait(cond, mutex, &waitTimer);
261 // we are not using a compatible pthreads library so abort
262 if ((pret != 0) && (pret != EINTR) && (pret != EBUSY) && (pret != ETIMEDOUT))
263 {
264 ErrorContext::getInstance()->saveError();
265 pthread_mutex_unlock(mutex);
266 handle_error(EF_EXIT, 1,
267 "pbzip2: *ERROR: %s: pthread_cond_timedwait() call invalid [pret=%d]. This machine\n"
268 " does not have compatible pthreads library. Aborting.\n", caller, pret);
269
270 cleanupAndQuit(-9);
271 }
272 #ifdef PBZIP_DEBUG
273 else if (pret != 0)
274 {
275 fprintf(stderr, "%s: pthread_cond_timedwait returned with non-fatal error [%d]\n", caller, pret);
276 }
277 #endif // PBZIP_DEBUG
278
279 return 0;
280 }
281
282 /*
283 * Delegate to write but keep writing until count bytes are written or
284 * error is encountered (on success all count bytes would be written)
285 */
do_write(int fd,const void * buf,size_t count)286 ssize_t do_write(int fd, const void *buf, size_t count)
287 {
288 ssize_t bytesRemaining = count;
289 ssize_t nbytes = 0;
290 const char *pbuf = (const char *)buf;
291 while ((bytesRemaining > 0) && ((nbytes = write(fd, pbuf, bytesRemaining)) > 0))
292 {
293 bytesRemaining -= nbytes;
294 pbuf += nbytes;
295 }
296
297 if (nbytes < 0)
298 {
299 ErrorContext::getInstance()->saveError();
300 return nbytes;
301 }
302
303 return (count - bytesRemaining);
304 }
305
306 /*
307 * Delegate to read but keep writing until count bytes are read or
308 * error is encountered (on success all count bytes would be read)
309 */
do_read(int fd,void * buf,size_t count)310 ssize_t do_read(int fd, void *buf, size_t count)
311 {
312 ssize_t bytesRemaining = count;
313 ssize_t nbytes = 0;
314 char *pbuf = (char *)buf;
315 while ((bytesRemaining > 0) && (nbytes = read(fd, pbuf, bytesRemaining)) > 0)
316 {
317 bytesRemaining -= nbytes;
318 pbuf += nbytes;
319 }
320
321 if (nbytes < 0)
322 {
323 ErrorContext::getInstance()->saveError();
324 return nbytes;
325 }
326
327 return (count - bytesRemaining);
328 }
329
330 /*
331 * Open output file with least required privileges
332 */
safe_open_output(const char * path)333 int safe_open_output(const char *path)
334 {
335 int ret = open(path, O_WRONLY | O_CREAT | O_EXCL | O_BINARY, FILE_MODE);
336 if (ret == -1)
337 {
338 ErrorContext::getInstance()->saveError();
339 }
340
341 return ret;
342 }
343
344 /*
345 * Based on bzip2.c code
346 */
safe_fopen_output(const char * path,const char * mode)347 FILE *safe_fopen_output(const char *path, const char *mode)
348 {
349 int fh = safe_open_output(path);
350 if (fh == -1)
351 {
352 return NULL;
353 }
354
355 FILE *fp = fdopen(fh, mode);
356 if (fp == NULL)
357 {
358 ErrorContext::getInstance()->saveError();
359 close(fh);
360 }
361
362 return fp;
363 }
364
365 /**
366 * Save the given file and save errno on failure.
367 *
368 * @param fd file to close
369 * @return -1 on failure or 0 on success.
370 */
do_close(int fd)371 inline int do_close(int fd)
372 {
373 int ret = close(fd);
374 if (ret == -1)
375 {
376 ErrorContext::getInstance()->saveError();
377 }
378
379 return ret;
380 }
381
do_fclose(FILE * file)382 inline int do_fclose(FILE *file)
383 {
384 int ret = fclose(file);
385 if ( ret == EOF )
386 {
387 ErrorContext::getInstance()->saveError();
388 }
389
390 return ret;
391 }
392
do_fflush(FILE * file)393 inline int do_fflush(FILE *file)
394 {
395 int ret = fflush(file);
396 if ( ret == EOF )
397 {
398 ErrorContext::getInstance()->saveError();
399 }
400
401 return ret;
402 }
403
404 /**
405 * Close the given file. In case of error - save errno and print error message.
406 *
407 * @param file file to close
408 * @param fileName name of file to print in case of failure
409 * @return fclose return code
410 */
verbose_fclose(FILE * file,const char * fileName)411 inline int verbose_fclose(FILE *file, const char *fileName)
412 {
413 int ret;
414 if ( (ret = fclose(file)) == EOF )
415 {
416 ErrorContext::syncPrintErrnoMsg(stderr, errno);
417 fprintf(stderr, "pbzip2: *ERROR: Failed to close file [%s]!\n", fileName);
418 }
419
420 return ret;
421 }
422
do_remove(const char * pathname)423 int do_remove(const char* pathname)
424 {
425 int ret = remove(pathname);
426 if (ret == -1)
427 {
428 ErrorContext::getInstance()->saveError();
429 }
430
431 return ret;
432 }
433
434 /**
435 * Check if a given file exists.
436 *
437 * @return true if file exists and false if it doesn't
438 */
check_file_exists(const char * filename)439 bool check_file_exists( const char * filename )
440 {
441 int hOutfile = open( filename, O_RDONLY | O_BINARY );
442
443 if ( hOutfile == -1 )
444 {
445 ErrorContext::getInstance()->saveError();
446 return false;
447 }
448 else
449 {
450 close( hOutfile );
451 return true;
452 }
453 }
454
455 /*
456 *********************************************************
457 Atomically get producerDone value.
458 */
syncGetProducerDone()459 inline int syncGetProducerDone()
460 {
461 int ret;
462 safe_mutex_lock(ProducerDoneMutex);
463 ret = producerDone;
464 safe_mutex_unlock(ProducerDoneMutex);
465
466 return ret;
467 }
468
469 /*
470 *********************************************************
471 Atomically set producerDone value.
472 */
syncSetProducerDone(int newValue)473 inline void syncSetProducerDone(int newValue)
474 {
475 safe_mutex_lock(ProducerDoneMutex);
476 producerDone = newValue;
477 safe_mutex_unlock(ProducerDoneMutex);
478 }
479
480 /*
481 * Atomic get terminateFlag
482 */
syncGetTerminateFlag()483 inline int syncGetTerminateFlag()
484 {
485 int ret;
486 safe_mutex_lock(&TerminateFlagMutex);
487 ret = terminateFlag;
488 safe_mutex_unlock(&TerminateFlagMutex);
489
490 return ret;
491 }
492
493 /*
494 * Atomically set termination flag and signal the related
495 * condition.
496 */
syncSetTerminateFlag(int newValue)497 inline void syncSetTerminateFlag(int newValue)
498 {
499 safe_mutex_lock(&TerminateFlagMutex);
500
501 terminateFlag = newValue;
502 if (terminateFlag != 0)
503 {
504 // wake up terminator thread
505 pthread_cond_signal(&TerminateCond);
506 safe_mutex_unlock(&TerminateFlagMutex);
507
508 // wake up all other possibly blocked on cond threads
509 safe_mutex_lock(OutMutex);
510 pthread_cond_broadcast(notTooMuchNumBuffered);
511 safe_mutex_unlock(OutMutex);
512 if (FifoQueue != NULL)
513 {
514 safe_mutex_lock(FifoQueue->mut);
515 pthread_cond_broadcast(FifoQueue->notFull);
516 pthread_cond_broadcast(FifoQueue->notEmpty);
517 safe_mutex_unlock(FifoQueue->mut);
518 }
519 }
520 else
521 {
522 safe_mutex_unlock(&TerminateFlagMutex);
523 }
524 }
525
526 /*
527 * Set finishedSucessFlag and signal the related condition.
528 */
syncSetFinishedFlag(int newValue)529 inline void syncSetFinishedFlag(int newValue)
530 {
531 safe_mutex_lock(&TerminateFlagMutex);
532
533 finishedFlag = newValue;
534 if (finishedFlag != 0)
535 {
536 pthread_cond_signal(&TerminateCond);
537 }
538
539 safe_mutex_unlock(&TerminateFlagMutex);
540 }
541
/**
 * Set last block which is maybe good (not guaranteed) and MinErrorBlock
 * (lowest block with errors encountered so far).
 * Only moving downwards has effect (attempts to raise the block numbers are ignored).
 * -1 means infinity.
 *
 * Both values are updated while holding OutMutex. If either update is taken,
 * waiters on ErrStateChangeCond and OutBufferHeadNotEmpty are signalled. All
 * waiters on notTooMuchNumBuffered and, when FifoQueue exists, on the queue's
 * notFull/notEmpty conditions are woken as well, so blocked threads re-check
 * the error state.
 *
 * NOTE(review): a newValue (or errBlock) of -1 while the stored value is
 * finite satisfies the "< current" test below. That resets the value to -1
 * (+infinity), i.e. raises it, contrary to the sentence above. Also,
 * "changed" is set when the stored value is already -1, even if the value
 * stays the same. Confirm whether any caller relies on this as a reset.
 *
 * @param newValue last block which is maybe good. -1 means +infinity.
 * @param errBlock block number which has errors
 */
inline void syncSetLastGoodBlock(int newValue, int errBlock)
{
	bool changed = false;

	safe_mutex_lock(OutMutex);
#ifdef PBZIP_DEBUG
	uintmax_t thid = (uintmax_t) pthread_self();
	fprintf(stderr, "(%" PRIuMAX ") syncSetLastGoodBlock: %d -> %d; MinErrorBlock: %d -> %d\n",
		thid, LastGoodBlock, newValue, MinErrorBlock, errBlock);
#endif

	if ( (LastGoodBlock == -1) || (newValue < LastGoodBlock) )
	{
		LastGoodBlock = newValue;
		changed = true;
	}

	if ( (MinErrorBlock == -1) || (errBlock < MinErrorBlock) )
	{
		MinErrorBlock = errBlock;
		changed = true;
	}

	if ( changed )
	{
		safe_cond_signal(&ErrStateChangeCond);
		safe_cond_signal(&OutBufferHeadNotEmpty);

		// wake up all other possibly blocked on cond threads
		pthread_cond_broadcast(notTooMuchNumBuffered);
		// OutMutex is released before FifoQueue->mut is taken, so the two
		// locks are never held at the same time here.
		safe_mutex_unlock(OutMutex);

		if (FifoQueue != NULL)
		{
			safe_mutex_lock(FifoQueue->mut);
			pthread_cond_broadcast(FifoQueue->notFull);
			pthread_cond_broadcast(FifoQueue->notEmpty);
			safe_mutex_unlock(FifoQueue->mut);
		}
	}
	else
	{
		safe_mutex_unlock(OutMutex);
	}
}
597
syncGetLastGoodBlock()598 inline int syncGetLastGoodBlock()
599 {
600 int ret;
601 safe_mutex_lock(OutMutex);
602 ret = LastGoodBlock;
603 safe_mutex_unlock(OutMutex);
604
605 return ret;
606 }
607
syncGetMinErrorBlock()608 inline int syncGetMinErrorBlock()
609 {
610 int ret;
611 safe_mutex_lock(OutMutex);
612 ret = MinErrorBlock;
613 safe_mutex_unlock(OutMutex);
614
615 return ret;
616 }
617
isIgnoredTrailingGarbage()618 inline bool isIgnoredTrailingGarbage()
619 {
620 return (IgnoreTrailingGarbageFlag != 0);
621 }
622
623 /*
624 *********************************************************
625 Print error message and optionally exit or abort
626 depending on exitFlag:
627 0 - don't quit;
628 1 - exit;
629 2 - abort.
630 On exit - exitCode status is used.
631 */
handle_error(ExitFlag exitFlag,int exitCode,const char * fmt,...)632 int handle_error(ExitFlag exitFlag, int exitCode, const char *fmt, ...)
633 {
634 va_list args;
635
636 va_start(args, fmt);
637 vfprintf(stderr, fmt, args);
638 ErrorContext::getInstance()->printErrorMessages(stderr);
639 fflush(stderr);
640 va_end(args);
641
642 if (exitFlag == EF_ABORT)
643 {
644 syncSetTerminateFlag(1);
645 abort();
646 }
647 if (exitFlag == EF_EXIT)
648 {
649 syncSetTerminateFlag(1);
650 }
651
652 return exitCode;
653 }
654
/**
 * Wait until it is known how the blocks before errBlockNumber turned out.
 * Each iteration checks the terminate flag, then (under OutMutex)
 * MinErrorBlock and NextBlockToWrite. If neither condition is met yet, it
 * sleeps on ErrStateChangeCond for at most 1 second and loops again.
 *
 * @param blockNumToWait only used in PBZIP_DEBUG diagnostics
 * @param errBlockNumber block in which the caller detected an error
 * @return -1 - terminate flag set (error)
 *          0 - prev block is OK (i.e. we're on the first error here):
 *              errBlockNumber <= NextBlockToWrite and no lower error block
 *          2 - lower block number already in error state
 *              (MinErrorBlock < errBlockNumber)
 */
int waitForPreviousBlock(int blockNumToWait, int errBlockNumber)
{
#ifdef PBZIP_DEBUG
	uintmax_t thid = (uintmax_t) pthread_self();
	safe_mutex_lock(OutMutex);
	fprintf( stderr, "(%" PRIuMAX ") waitForPreviousBlock enter: LastGoodBlock=%d"
		"; blockNumToWait=%d; NextBlockToWrite=%d; MinErrorBlock=%d; errBlockNumber=%d\n",
		thid,
		LastGoodBlock, blockNumToWait, NextBlockToWrite,
		MinErrorBlock, errBlockNumber );
	safe_mutex_unlock(OutMutex);
#endif

	for (;;)
	{
		// Abort the wait as soon as termination has been requested.
		if (syncGetTerminateFlag() != 0)
		{
#ifdef PBZIP_DEBUG
			fprintf(stderr, "(%" PRIuMAX ") waitForPreviousBlock terminated [%d]: blockNumToWait=%d\n",
				thid, -1, blockNumToWait );
#endif
			return -1;
		}

		safe_mutex_lock(OutMutex);

#ifdef PBZIP_DEBUG
		fprintf( stderr, "(%" PRIuMAX ") waitForPreviousBlock before check: LastGoodBlock=%d; blockNumToWait=%d; NextBlockToWrite=%d; MinErrorBlock=%d\n",
			thid, LastGoodBlock, blockNumToWait, NextBlockToWrite, MinErrorBlock );
#endif

		// This check should (min error block) be before next one (next block to write)
		if ( (MinErrorBlock != -1) && (MinErrorBlock < errBlockNumber) )
		{
#ifdef PBZIP_DEBUG
			fprintf( stderr, "(%" PRIuMAX ") waitForPreviousBlock exit [%d]: LastGoodBlock=%d; blockNumToWait=%d; NextBlockToWrite=%d; MinErrorBlock=%d\n",
				thid, 2, LastGoodBlock, blockNumToWait, NextBlockToWrite, MinErrorBlock );
#endif
			safe_mutex_unlock(OutMutex);
			return 2;
		}

		// The writer has reached this block, so every earlier block was OK.
		if (errBlockNumber <= NextBlockToWrite)
		{
#ifdef PBZIP_DEBUG
			fprintf( stderr, "(%" PRIuMAX ") waitForPreviousBlock exit [%d]: LastGoodBlock=%d; blockNumToWait=%d; NextBlockToWrite=%d; MinErrorBlock=%d\n",
				thid, 0, LastGoodBlock, blockNumToWait, NextBlockToWrite, MinErrorBlock );
#endif
			safe_mutex_unlock(OutMutex);
			return 0;
		}

#ifdef PBZIP_DEBUG
		fprintf( stderr, "(%" PRIuMAX ") waitForPreviousBlock to sleep: LastGoodBlock=%d; blockNumToWait=%d; NextBlockToWrite=%d; MinErrorBlock=%d\n",
			thid, LastGoodBlock, blockNumToWait, NextBlockToWrite, MinErrorBlock );
#endif

		// Bounded wait so the terminate flag is re-checked at least once a second.
		safe_cond_timed_wait(&ErrStateChangeCond, OutMutex, 1, "waitForPreviousBlock");

		safe_mutex_unlock(OutMutex);
	}
}
723
/**
 * Compute the last input block, not after errBlockNumber, which possibly
 * (not guaranteed) produced good output blocks.
 *
 * An outSequenceNumber of -1 is treated as an error just at the beginning of
 * a bzip2 stream, so the previous input block is returned. Any other sequence
 * number returns errBlockNumber itself.
 *
 * @param errBlockNumber block from input file
 * @param outSequenceNumber sequence in the output tail for the given block
 * @return the last possibly good input block; -1 if such doesn't exist.
 */
inline int getLastGoodBlockBeforeErr(int errBlockNumber, int outSequenceNumber)
{
	const bool errorAtStreamStart = (outSequenceNumber == -1);
	return errorAtStreamStart ? (errBlockNumber - 1) : errBlockNumber;
}
744
745 /**
746 * Helper function delegating to handle_error with the relevant error
747 * message
748 *
749 * @param bzret
750 * @param fileData
751 * @param outSequenceNumber
752 * @param strm
753 * @param errmsg
754 * @param exitCode
755 * @return exitCode is returned
756 */
issueDecompressError(int bzret,const outBuff * fileData,int outSequenceNumber,const bz_stream & strm,const char * errmsg,int exitCode)757 inline int issueDecompressError(int bzret, const outBuff * fileData,
758 int outSequenceNumber, const bz_stream & strm, const char * errmsg,
759 int exitCode)
760 {
761 #ifdef PBZIP_DEBUG
762 uintmax_t thid = (uintmax_t) pthread_self();
763 fprintf(stderr, "(%" PRIuMAX ") enter issueDecompressError: msg=%s; ret=%d; block=%d; seq=%d; isLastInSeq=%d; avail_in=%d\n",
764 thid,
765 errmsg, bzret, fileData->blockNumber,
766 outSequenceNumber, (int)fileData->isLastInSequence, strm.avail_in);
767 #endif
768
769 handle_error(EF_EXIT, exitCode,
770 "pbzip2: %s: ret=%d; block=%d; seq=%d; isLastInSeq=%d; avail_in=%d\n",
771 errmsg, bzret, fileData->blockNumber,
772 outSequenceNumber, (int)fileData->isLastInSequence, strm.avail_in);
773 return exitCode;
774 }
775
776 /**
777 * Handle an error condition which is either trailing garbage-like one or not.
778 *
779 *
780 * @param bzret
781 * @param fileData block from input file
782 * @param outSequenceNumber
783 * @param strm
784 * @param errmsg
785 * @param isTrailingGarbageErr
786 * @return ret < 0 - fatal error;
787 * 0 - OK (no error at all);
788 * 1 - first block of ignored trailing garbage;
789 * 2 - error already signalled for earlier block
790 */
decompressErrCheckSingle(int bzret,const outBuff * fileData,int outSequenceNumber,const bz_stream & strm,const char * errmsg,bool isTrailingGarbageErr)791 int decompressErrCheckSingle(int bzret, const outBuff * fileData,
792 int outSequenceNumber, const bz_stream & strm, const char * errmsg,
793 bool isTrailingGarbageErr)
794 {
795 int lastGoodBlock = getLastGoodBlockBeforeErr(fileData->blockNumber, outSequenceNumber);
796
797 #ifdef PBZIP_DEBUG
798 uintmax_t thid = (uintmax_t) pthread_self();
799 fprintf(stderr, "(%" PRIuMAX ") enter decompressErrCheckSingle: msg=%s; ret=%d; block=%d; seq=%d; isLastInSeq=%d; avail_in=%d; lastGoodBlock=%d\n",
800 thid,
801 errmsg, bzret, fileData->blockNumber,
802 outSequenceNumber, (int)fileData->isLastInSequence, strm.avail_in, lastGoodBlock);
803 #endif
804
805 if ( (lastGoodBlock == -1) || !isIgnoredTrailingGarbage() )
806 {
807 issueDecompressError(bzret, fileData, outSequenceNumber, strm, errmsg, -1);
808 return -1;
809 }
810 else
811 {
812 // Cut off larger block numbers
813 syncSetLastGoodBlock(lastGoodBlock, fileData->blockNumber);
814 // wait until the state of previous block is known
815 int prevState = waitForPreviousBlock(lastGoodBlock, fileData->blockNumber);
816
817 if (prevState == 0)
818 {
819 // we're the first error block
820
821 if (isTrailingGarbageErr)
822 {
823 // Trailing garbage detected and ignored - not a fatal warning
824 fprintf(stderr, "pbzip2: *WARNING: Trailing garbage after EOF ignored!\n");
825 return 1;
826 }
827 else
828 {
829 // the first error is not kind of trailing garbage -> fatal one
830 issueDecompressError(bzret, fileData, outSequenceNumber, strm, errmsg, -1);
831 return -1;
832 }
833 }
834 else if (prevState == 2)
835 {
836 // we're not the first error
837 return 2;
838 }
839 else // (prevState == -1)
840 {
841 // fatal state encountered
842 return -1;
843 }
844 }
845 }
846
847 /**
848 * Check if trailing garbage has been identified during the last decompression
849 * operation.
850 *
851 * @param bzret last bzip2 return code
852 * @param fileData should be initialized before calling this
853 * @param strm bzip2 library bz_stream
854 * @return true if trailing garbage has been detected. false otherwise
855 */
hasTrailingGarbage(int bzret,const outBuff * fileData,const bz_stream & strm)856 inline bool hasTrailingGarbage(int bzret, const outBuff * fileData, const bz_stream & strm)
857 {
858 return (bzret == BZ_STREAM_END) &&
859 ( (strm.avail_in != 0) || !fileData->isLastInSequence );
860 }
861
862 /**
863 *
864 * @param bzret
865 * @param fileData
866 * @param outSequenceNumber
867 * @param strm
868 * @return ret < 0 - fatal error;
869 * 0 - OK (no error at all);
870 * 1 - first block of ignored trailing garbage;
871 * 2 - error already signalled for earlier block
872 */
decompressErrCheck(int bzret,const outBuff * fileData,int outSequenceNumber,const bz_stream & strm)873 int decompressErrCheck(int bzret, const outBuff * fileData,
874 int outSequenceNumber, const bz_stream & strm)
875 {
876 if ( hasTrailingGarbage( bzret, fileData, strm ) )
877 {
878 // Potential trailing garbage
879 return decompressErrCheckSingle(bzret, fileData, outSequenceNumber, strm,
880 "*ERROR during BZ2_bzDecompress - trailing garbage", true);
881 }
882 else if ( (bzret != BZ_STREAM_END) && (bzret != BZ_OK) )
883 {
884 return decompressErrCheckSingle(bzret, fileData, outSequenceNumber, strm,
885 "*ERROR during BZ2_bzDecompress - failure exit code", false);
886 }
887 else if ( strm.avail_in != 0 )
888 {
889 return decompressErrCheckSingle(bzret, fileData, outSequenceNumber, strm,
890 "*ERROR unconsumed in after BZ2_bzDecompress loop", false);
891 }
892 else if ( (bzret != BZ_STREAM_END) && fileData->isLastInSequence )
893 {
894 return decompressErrCheckSingle(bzret, fileData, outSequenceNumber, strm,
895 "*ERROR on decompress - last in segment reached before BZ_STREAM_END",
896 false);
897 }
898
899 return 0;
900 }
901
902 /*
903 * Initialize and set thread signal mask
904 */
initSignalMask()905 int initSignalMask()
906 {
907 int ret = 0;
908 ret = sigemptyset(&SignalMask);
909
910 ret = sigaddset(&SignalMask, SIGINT) | ret;
911 ret = sigaddset(&SignalMask, SIGTERM) | ret;
912 ret = sigaddset(&SignalMask, SIGABRT) | ret;
913 ret = sigaddset(&SignalMask, SIG_HANDLER_QUIT_SIGNAL) | ret;
914 #ifndef WIN32
915 ret = sigaddset(&SignalMask, SIGHUP) | ret;
916 #endif
917
918 if (ret == 0)
919 {
920 ret = pthread_sigmask(SIG_BLOCK, &SignalMask, NULL);
921 }
922
923 return ret;
924 }
925
926 /*
927 * Initialize attributes for child threads.
928 *
929 */
initChildThreadAttributes()930 int initChildThreadAttributes()
931 {
932 int ret = pthread_attr_init(&ChildThreadAttributes);
933
934 if (ret < 0)
935 {
936 fprintf(stderr, "Can't initialize thread atrributes [err=%d]! Aborting...\n", ret);
937 exit(-1);
938 }
939
940 #ifdef USE_STACKSIZE_CUSTOMIZATION
941 if (ChildThreadStackSize > 0)
942 {
943 ret = pthread_attr_setstacksize(&ChildThreadAttributes, ChildThreadStackSize);
944
945 if (ret != 0)
946 {
947 fprintf(stderr, "Can't set thread stacksize [err=%d]! Countinue with default one.\n", ret);
948 }
949 }
950 #endif // USE_STACKSIZE_CUSTOMIZATION
951
952 return ret;
953 }
954
955 /*
956 * Setup and start signal handling.
957 */
setupSignalHandling()958 int setupSignalHandling()
959 {
960 int ret = initSignalMask();
961
962 if (ret == 0)
963 {
964 ret = pthread_create(&SignalHandlerThread, &ChildThreadAttributes, signalHandlerProc, NULL);
965 }
966
967 return ret;
968 }
969
970 /*
971 * Setup and start signal handling.
972 */
setupTerminator()973 int setupTerminator()
974 {
975 return pthread_create(&TerminatorThread, &ChildThreadAttributes, terminatorThreadProc, NULL );
976 }
977
978 /*
979 *********************************************************
980 * Clean unfinished work (after error).
981 * Deletes output file if such exists and if not using pipes.
982 */
cleanupUnfinishedWork()983 void cleanupUnfinishedWork()
984 {
985 if (unfinishedWorkCleaned != 0)
986 {
987 return;
988 }
989
990 struct stat statBuf;
991 int ret = 0;
992
993 #ifdef PBZIP_DEBUG
994 fprintf(stderr, " Infile: %s Outfile: %s\n", sigInFilename, sigOutFilename);
995 #endif
996
997 // only cleanup files if we did something with them
998 if ((sigInFilename == NULL) || (sigOutFilename == NULL) || (OutputStdOut == 1))
999 {
1000 unfinishedWorkCleaned = 1;
1001 return;
1002 }
1003
1004 if (QuietMode != 1)
1005 {
1006 fprintf(stderr, "Cleanup unfinished work [Outfile: %s]...\n", sigOutFilename);
1007 }
1008
1009 // check to see if input file still exists
1010 ret = stat(sigInFilename, &statBuf);
1011 if (ret == 0)
1012 {
1013 // only want to remove output file if input still exists
1014 if (QuietMode != 1)
1015 fprintf(stderr, "Deleting output file: %s, if it exists...\n", sigOutFilename);
1016 ret = remove(sigOutFilename);
1017 if (ret != 0)
1018 {
1019 ErrorContext::syncPrintErrnoMsg(stderr, errno);
1020 fprintf(stderr, "pbzip2: *WARNING: Deletion of output file (apparently) failed.\n");
1021 }
1022 else
1023 {
1024 fprintf(stderr, "pbzip2: *INFO: Deletion of output file succeeded.\n");
1025 sigOutFilename = NULL;
1026 }
1027 }
1028 else
1029 {
1030 fprintf(stderr, "pbzip2: *WARNING: Output file was not deleted since input file no longer exists.\n");
1031 fprintf(stderr, "pbzip2: *WARNING: Output file: %s, may be incomplete!\n", sigOutFilename);
1032 }
1033
1034 unfinishedWorkCleaned = 1;
1035 }
1036
1037 /*
1038 *********************************************************
1039 */
1040
1041 /*
1042 *********************************************************
1043 * Terminator thread procedure: looking at terminateFlag
1044 * and exit application when it's set.
1045 */
terminatorThreadProc(void * arg)1046 void* terminatorThreadProc(void* arg)
1047 {
1048 int ret = pthread_mutex_lock(&TerminateFlagMutex);
1049
1050 if (ret != 0)
1051 {
1052 ErrorContext::syncPrintErrnoMsg(stderr, errno);
1053 fprintf(stderr, "Terminator thread: pthread_mutex_lock error [%d]! Aborting...\n", ret);
1054 syncSetTerminateFlag(1);
1055 cleanupAndQuit(1);
1056 }
1057
1058 while ((finishedFlag == 0) && (terminateFlag == 0))
1059 {
1060 ret = pthread_cond_wait(&TerminateCond, &TerminateFlagMutex);
1061 }
1062
1063 // Successfull end
1064 if (finishedFlag != 0)
1065 {
1066 ret = pthread_mutex_unlock(&TerminateFlagMutex);
1067 return NULL;
1068 }
1069
1070 // Being here implies (terminateFlag != 0)
1071 ret = pthread_mutex_unlock(&TerminateFlagMutex);
1072
1073 fprintf(stderr, "Terminator thread: premature exit requested - quitting...\n");
1074 cleanupAndQuit(1);
1075
1076 return NULL; // never reachable
1077 }
1078
1079 /*
1080 *********************************************************
1081 * Signal handler thread function to hook cleanup on
1082 * certain signals.
1083 */
signalHandlerProc(void * arg)1084 void* signalHandlerProc(void* arg)
1085 {
1086 int signalCaught;
1087
1088 // wait for specified in mask signal
1089 int ret = sigwait(&SignalMask, &signalCaught);
1090
1091 if (ret != 0)
1092 {
1093 fprintf(stderr, "\n *signalHandlerProc - sigwait error: %d\n", ret);
1094 }
1095 else if (signalCaught == SIG_HANDLER_QUIT_SIGNAL)
1096 {
1097 return NULL;
1098 }
1099 else // ret == 0
1100 {
1101 fprintf(stderr, "\n *Control-C or similar caught [sig=%d], quitting...\n", signalCaught);
1102 // Delegating cleanup and termination to Terminator Thread
1103 syncSetTerminateFlag(1);
1104 }
1105
1106 return NULL;
1107 }
1108
1109 /*
1110 * Cleanup unfinished work (output file) and exit with the given exit code.
1111 * To be used to quite on error with non-zero exitCode.
1112 */
cleanupAndQuit(int exitCode)1113 void cleanupAndQuit(int exitCode)
1114 {
1115 // syncSetTerminateFlag(1);
1116
1117 int ret = pthread_mutex_lock(&ErrorHandlerMutex);
1118 if (ret != 0)
1119 {
1120 fprintf(stderr, "Cleanup Handler: Failed to lock ErrorHandlerMutex! May double cleanup...\n");
1121 }
1122 cleanupUnfinishedWork();
1123 pthread_mutex_unlock(&ErrorHandlerMutex);
1124
1125 exit(exitCode);
1126 }
1127
/*
 *********************************************************
 Searches the range [searchBuf, searchBuf + searchBufSize) for the
 sequence [searchString, searchString + searchStringSize).
 Returns an iterator/pointer to the start of the first occurrence,
 or NULL when the sequence does not occur.
 */
template <typename FI1, typename FI2>
FI1 memstr(FI1 searchBuf, int searchBufSize, FI2 searchString, int searchStringSize)
{
	const FI1 bufEnd = searchBuf + searchBufSize;
	const FI2 patternEnd = searchString + searchStringSize;
	const FI1 match = std::search(searchBuf, bufEnd, searchString, patternEnd);

	// std::search reports "not found" by returning the end of the haystack
	if (match == bufEnd)
	{
		return NULL;
	}
	return match;
}
1144
1145 /**
1146 * Check for interrupt conditions - report if any and perform the relevant
1147 * cleanup
1148 *
1149 * @param hInfile
1150 * @param fileData
1151 * @param lastBlock
1152 * @return 0 - not interrupted; 1 - interrupted (terminate flag or other error encountered)
1153 */
producerDecompressCheckInterrupt(int hInfile,outBuff * & fileData,int lastBlock)1154 int producerDecompressCheckInterrupt(int hInfile, outBuff *& fileData, int lastBlock)
1155 {
1156 bool isInterrupted = false;
1157
1158 if (syncGetLastGoodBlock() != -1)
1159 {
1160 isInterrupted = true;
1161
1162 #ifdef PBZIP_DEBUG
1163 fprintf (stderr, "producer_decompress: interrupt1 - LastGoodBlock set. "
1164 "Last produced=%d\n", lastBlock);
1165 #endif
1166 }
1167 if (syncGetTerminateFlag() != 0)
1168 {
1169 isInterrupted = true;
1170
1171 #ifdef PBZIP_DEBUG
1172 fprintf (stderr, "producer_decompress: interrupt2 - TerminateFlag set. "
1173 "Last produced=%d\n", lastBlock);
1174 #endif
1175 }
1176
1177 if (isInterrupted)
1178 {
1179 close(hInfile);
1180 disposeMemorySingle(fileData);
1181
1182 return 1;
1183 }
1184
1185 return 0;
1186 }
1187
1188 /*
1189 *********************************************************
1190 Function works in single pass. It's Splitting long
1191 streams into sequences of multiple segments.
1192 */
producer_decompress(int hInfile,OFF_T fileSize,queue * fifo)1193 int producer_decompress(int hInfile, OFF_T fileSize, queue *fifo)
1194 {
1195 safe_mutex_lock(&ProgressIndicatorsMutex);
1196 NumBlocks = 0;
1197 safe_mutex_unlock(&ProgressIndicatorsMutex);
1198
1199 pbzip2::BZ2StreamScanner bz2StreamScanner(hInfile);
1200
1201 // keep going until all the blocks are processed
1202 outBuff * fileData = bz2StreamScanner.getNextStream();
1203 while (!bz2StreamScanner.failed() && (fileData->bufSize > 0))
1204 {
1205 #ifdef PBZIP_DEBUG
1206 fprintf(stderr, " -> Bytes Read: %u bytes...\n", fileData->bufSize);
1207 #endif
1208
1209 if (producerDecompressCheckInterrupt(hInfile, fileData, NumBlocks) != 0)
1210 {
1211 safe_mutex_lock(fifo->mut);
1212 safe_cond_broadcast(fifo->notEmpty); // just in case
1213 safe_mutex_unlock(fifo->mut);
1214 syncSetProducerDone(1);
1215 return 0;
1216 }
1217
1218 if (QuietMode != 1)
1219 {
1220 // give warning to user if block is larger than 250 million bytes
1221 if (fileData->bufSize > 250000000)
1222 {
1223 fprintf(stderr, "pbzip2: *WARNING: Compressed block size is large [%" PRIuMAX " bytes].\n",
1224 (uintmax_t) fileData->bufSize);
1225 fprintf(stderr, " If program aborts, use regular BZIP2 to decompress.\n");
1226 }
1227 }
1228
1229 // add data to the decompression queue
1230 safe_mutex_lock(fifo->mut);
1231 while (fifo->full)
1232 {
1233 #ifdef PBZIP_DEBUG
1234 fprintf (stderr, "producer: queue FULL.\n");
1235 #endif
1236 safe_cond_wait (fifo->notFull, fifo->mut);
1237
1238 if (producerDecompressCheckInterrupt(hInfile, fileData, NumBlocks) != 0)
1239 {
1240 safe_cond_broadcast(fifo->notEmpty); // just in case
1241 syncSetProducerDone(1);
1242 safe_mutex_unlock(fifo->mut);
1243 return 0;
1244 }
1245 }
1246 #ifdef PBZIP_DEBUG
1247 fprintf(stderr, "producer: Buffer: %p Size: %" PRIuMAX " Block: %d\n", fileData->buf,
1248 (uintmax_t)fileData->bufSize, NumBlocks);
1249 #endif
1250
1251 fifo->add(fileData);
1252 safe_cond_signal (fifo->notEmpty);
1253
1254 safe_mutex_lock(&ProgressIndicatorsMutex);
1255 NumBlocks = fileData->blockNumber + 1;
1256 safe_mutex_unlock(&ProgressIndicatorsMutex);
1257
1258 safe_mutex_unlock(fifo->mut);
1259
1260 fileData = bz2StreamScanner.getNextStream();
1261 } // for
1262
1263 close(hInfile);
1264
1265 // last stream is always dummy one (either error or eof)
1266 delete fileData;
1267
1268
1269 if (bz2StreamScanner.failed())
1270 {
1271 handle_error(EF_EXIT, 1, "pbzip2: producer_decompress: *ERROR: when reading bzip2 input stream\n");
1272 return -1;
1273 }
1274 else if (!bz2StreamScanner.isBz2HeaderFound() || !bz2StreamScanner.eof())
1275 {
1276 handle_error(EF_EXIT, 1, "pbzip2: producer_decompress: *ERROR: input file is not a valid bzip2 stream\n");
1277 return -1;
1278 }
1279
1280 syncSetProducerDone(1);
1281 safe_mutex_lock(fifo->mut);
1282 safe_cond_broadcast(fifo->notEmpty); // just in case
1283 safe_mutex_unlock(fifo->mut);
1284
1285 #ifdef PBZIP_DEBUG
1286 fprintf(stderr, "producer: Done - exiting. Last Block: %d\n", NumBlocks);
1287 #endif
1288
1289 return 0;
1290 }
1291
1292 /**
1293 * Check for interrupt conditions - report if any and perform the relevant
1294 * cleanup
1295 *
1296 * @return 0 - not interrupted; 1 - interrupted (terminate flag or other error encountered)
1297 */
consumerDecompressCheckInterrupt(const outBuff * lastElement)1298 int consumerDecompressCheckInterrupt(const outBuff * lastElement)
1299 {
1300 bool isInterrupted = false;
1301
1302 #ifdef PBZIP_DEBUG
1303 uintmax_t thid = (uintmax_t) pthread_self();
1304 #endif
1305
1306 if (syncGetTerminateFlag() != 0)
1307 {
1308 isInterrupted = true;
1309
1310 #ifdef PBZIP_DEBUG
1311 fprintf (stderr, "(%" PRIuMAX ") consumer_decompress: interrupt1 - TerminateFlag set.\n", thid);
1312 #endif
1313 }
1314 int minErrBlock = syncGetMinErrorBlock();
1315 if ( (minErrBlock != -1) &&
1316 ( (lastElement == NULL)
1317 || (lastElement->blockNumber >= minErrBlock)
1318 || lastElement->isLastInSequence ) )
1319 {
1320 isInterrupted = true;
1321
1322 #ifdef PBZIP_DEBUG
1323 fprintf (stderr, "(%" PRIuMAX ") consumer_decompress: terminating1 - LastGoodBlock set [%d].\n", thid, syncGetLastGoodBlock());
1324 #endif
1325 }
1326
1327 if (isInterrupted)
1328 {
1329 return 1;
1330 }
1331
1332 return 0;
1333 }
1334
1335 /*
1336 *********************************************************
1337 */
consumer_decompress(void * q)1338 void *consumer_decompress(void *q)
1339 {
1340 queue *fifo = (queue *)q;
1341
1342 outBuff *fileData = NULL;
1343 outBuff *lastFileData = NULL;
1344 char *DecompressedData = NULL;
1345 unsigned int outSize = 0;
1346 outBuff * prevOutBlockInSequence = NULL;
1347 int outSequenceNumber = -1; // sequence number in multi-part output blocks
1348 unsigned int processedIn = 0;
1349 int errState = 0;
1350
1351 bz_stream strm;
1352 strm.bzalloc = NULL;
1353 strm.bzfree = NULL;
1354 strm.opaque = NULL;
1355
1356 for (;;)
1357 {
1358 safe_mutex_lock(fifo->mut);
1359 for (;;)
1360 {
1361 if (consumerDecompressCheckInterrupt(fileData) != 0)
1362 {
1363 safe_mutex_unlock(fifo->mut);
1364 return (NULL);
1365 }
1366
1367 if (!fifo->empty && (fifo->remove(fileData) == 1))
1368 {
1369 // block retreived - break the loop and continue further
1370 break;
1371 }
1372
1373 #ifdef PBZIP_DEBUG
1374 fprintf (stderr, "consumer: queue EMPTY.\n");
1375 #endif
1376
1377 if (fifo->empty && ((syncGetProducerDone() == 1) || (syncGetTerminateFlag() != 0)))
1378 {
1379 // finished - either OK or terminated forcibly
1380 pthread_mutex_unlock(fifo->mut);
1381 // BZ2_bzDecompressEnd( &strm );
1382
1383 if ((syncGetTerminateFlag() == 0) && (outSequenceNumber != -1))
1384 {
1385 handle_error(EF_EXIT, -1, "pbzip2: *ERROR on decompress - "
1386 "premature end of archive stream (block=%d; seq=%d; outseq=%d)!\n",
1387 lastFileData->blockNumber,
1388 lastFileData->sequenceNumber,
1389 outSequenceNumber);
1390 }
1391 #ifdef PBZIP_DEBUG
1392 else
1393 {
1394 fprintf (stderr, "consumer: exiting2\n");
1395 }
1396 #endif
1397
1398 disposeMemorySingle( lastFileData );
1399
1400 return (NULL);
1401 }
1402
1403 #ifdef PBZIP_DEBUG
1404 safe_cond_timed_wait(fifo->notEmpty, fifo->mut, 1, "consumer");
1405 #else
1406 safe_cond_wait(fifo->notEmpty, fifo->mut);
1407 #endif
1408 }
1409
1410 #ifdef PBZIP_DEBUG
1411 fprintf(stderr, "consumer: FileData: %p\n", fileData);
1412 fprintf(stderr, "consumer: Buffer: %p Size: %u Block: %d\n",
1413 fileData->buf, (unsigned)fileData->bufSize, fileData->blockNumber);
1414 #endif
1415
1416 safe_cond_signal(fifo->notFull);
1417 safe_mutex_unlock(fifo->mut);
1418
1419 if (lastFileData != NULL)
1420 {
1421 delete lastFileData;
1422 }
1423 lastFileData = fileData;
1424
1425 #ifdef PBZIP_DEBUG
1426 fprintf (stderr, "consumer: recieved %d.\n", fileData->blockNumber);
1427 #endif
1428
1429 outSize = 900000;
1430
1431 int bzret = BZ_OK;
1432
1433 if (fileData->sequenceNumber < 2)
1434 {
1435 // start of new stream from in queue (0 -> single block; 1 - mutli)
1436 bzret = BZ2_bzDecompressInit(&strm, Verbosity, 0);
1437 if (bzret != BZ_OK)
1438 {
1439 handle_error(EF_EXIT, -1, "pbzip2: *ERROR during BZ2_bzDecompressInit: %d\n", bzret);
1440 return (NULL);
1441 }
1442 }
1443
1444 strm.avail_in = fileData->bufSize;
1445 strm.next_in = fileData->buf;
1446 while ((bzret == BZ_OK) && (strm.avail_in != 0))
1447 {
1448 #ifdef PBZIP_DEBUG
1449 fprintf(stderr, "decompress: block=%d; seq=%d; prev=%llx; avail_in=%u; avail_out=%u\n",
1450 fileData->blockNumber, outSequenceNumber,
1451 (unsigned long long) prevOutBlockInSequence,
1452 strm.avail_in, strm.avail_out);
1453 #endif
1454
1455 if (DecompressedData == NULL)
1456 {
1457 // allocate memory for decompressed data (start with default 900k block size)
1458 DecompressedData = new(std::nothrow) char[outSize];
1459 // make sure memory was allocated properly
1460
1461 if (DecompressedData == NULL)
1462 {
1463 handle_error(EF_EXIT, -1,
1464 " *ERROR: Could not allocate memory (DecompressedData)! Aborting...\n");
1465 return (NULL);
1466 }
1467
1468 processedIn = 0;
1469
1470 strm.avail_out = outSize;
1471 strm.next_out = DecompressedData;
1472 }
1473
1474 unsigned int availIn = strm.avail_in;
1475 bzret = BZ2_bzDecompress(&strm);
1476 processedIn += (availIn - strm.avail_in);
1477
1478 #ifdef PBZIP_DEBUG
1479 fprintf(stderr, "decompress: BZ2_bzDecompress=%d; block=%d; seq=%d; prev=%llx; avail_in=%u; avail_out=%u\n",
1480 bzret,
1481 fileData->blockNumber, outSequenceNumber,
1482 (unsigned long long) prevOutBlockInSequence,
1483 strm.avail_in, strm.avail_out);
1484 #endif
1485
1486 // issue out block if out buffer is full or stream end is detected
1487 if ( ((bzret == BZ_OK) && strm.avail_out == 0) || (bzret == BZ_STREAM_END) )
1488 {
1489 outBuff * addret = NULL;
1490 unsigned int len = outSize - strm.avail_out;
1491 bool isLast = (bzret == BZ_STREAM_END);
1492
1493 if ( hasTrailingGarbage( bzret, fileData, strm ) )
1494 {
1495 // trailing garbage detected
1496 syncSetLastGoodBlock(fileData->blockNumber, fileData->blockNumber);
1497 }
1498
1499 if (outSequenceNumber>0)
1500 {
1501 ++outSequenceNumber;
1502
1503 outBuff * nextOutBlock = new(std::nothrow) outBuff(
1504 DecompressedData, len, fileData->blockNumber,
1505 outSequenceNumber, processedIn, isLast, NULL);
1506
1507 if (nextOutBlock == NULL)
1508 {
1509 BZ2_bzDecompressEnd( &strm );
1510 handle_error(EF_EXIT, -1,
1511 " *ERROR: Could not allocate memory (nextOutBlock)! Aborting...\n");
1512 return (NULL);
1513 }
1514
1515 addret = outputBufferSeqAddNext(prevOutBlockInSequence, nextOutBlock);
1516 #ifdef PBZIP_DEBUG
1517 fprintf(stderr, "decompress: outputBufferSeqAddNext->%llx; block=%d; seq=%d; prev=%llx\n",
1518 (unsigned long long)addret,
1519 fileData->blockNumber, outSequenceNumber,
1520 (unsigned long long) prevOutBlockInSequence);
1521 #endif
1522 }
1523 else // sequenceNumber = 0
1524 {
1525 outSequenceNumber = (bzret == BZ_OK) ? 1 : 0;
1526 addret = outputBufferAdd(outBuff(
1527 DecompressedData, len,
1528 fileData->blockNumber,
1529 outSequenceNumber, processedIn, isLast, NULL), "consumer_decompress");
1530
1531 #ifdef PBZIP_DEBUG
1532 fprintf(stderr, "decompress: outputBufferAdd->%llx; block=%d; seq=%d; prev=%llx\n",
1533 (unsigned long long)addret,
1534 fileData->blockNumber, outSequenceNumber,
1535 (unsigned long long) prevOutBlockInSequence);
1536 #endif
1537 }
1538
1539 if (addret == NULL)
1540 {
1541 // error encountered
1542 BZ2_bzDecompressEnd( &strm );
1543 return (NULL);
1544 }
1545
1546 prevOutBlockInSequence = addret;
1547 DecompressedData = NULL;
1548 }
1549 }
1550
1551 /*
1552 * < 0 - fatal error;
1553 * 0 - OK (no error at all);
1554 * 1 - first block of ignored trailing garbage;
1555 * 2 - error already signalled for earlier block
1556 */
1557 errState = decompressErrCheck(bzret, fileData, outSequenceNumber, strm);
1558
1559 if (bzret == BZ_STREAM_END)
1560 {
1561 bzret = BZ2_bzDecompressEnd(&strm);
1562 if ( (bzret != BZ_OK) && ((errState == 0) || (errState == 1)) )
1563 {
1564 handle_error(EF_EXIT, -1, "pbzip2: *ERROR during BZ2_bzDecompressEnd: %d\n", bzret);
1565 return (NULL);
1566 }
1567
1568 outSequenceNumber = -1;
1569 prevOutBlockInSequence = NULL;
1570 }
1571
1572 #ifdef PBZIP_DEBUG
1573 fprintf(stderr, "\n Compressed Block Size: %u\n", (unsigned)fileData->bufSize);
1574 fprintf(stderr, " Original Block Size: %u\n", outSize);
1575 #endif
1576
1577 disposeMemory(fileData->buf);
1578
1579 #ifdef PBZIP_DEBUG
1580 fprintf(stderr, " OutputBuffer[%d].buf = %p\n", fileData->blockNumber, DecompressedData);
1581 fprintf(stderr, " OutputBuffer[%d].bufSize = %u\n", fileData->blockNumber, outSize);
1582 fflush(stderr);
1583 #endif
1584
1585 if (errState != 0)
1586 {
1587 #ifdef PBZIP_DEBUG
1588 fprintf (stderr, "consumer: exiting prematurely: errState=%d\n", errState);
1589 #endif
1590
1591 return (NULL);
1592 }
1593 } // for
1594
1595 #ifdef PBZIP_DEBUG
1596 fprintf (stderr, "consumer: exiting\n");
1597 #endif
1598 return (NULL);
1599 }
1600
1601 /*
1602 *********************************************************
1603 */
fileWriter(void * outname)1604 void *fileWriter(void *outname)
1605 {
1606 char *OutFilename;
1607 OFF_T CompressedSize = 0;
1608 int percentComplete = 0;
1609 int hOutfile = STDOUT_FILENO; // default to stdout
1610 int currBlock = 0;
1611 size_t outBufferPos = 0;
1612 int ret = -1;
1613 OFF_T bytesProcessed = 0;
1614
1615 OutFilename = (char *) outname;
1616 outBuff * prevBlockInSequence = NULL;
1617
1618 #ifdef PBZIP_DEBUG
1619 fprintf(stderr, "fileWriter function started\n");
1620 #endif
1621
1622 // write to file instead of stdout
1623 if (OutputStdOut == 0)
1624 {
1625 hOutfile = safe_open_output(OutFilename);
1626 // check to see if file creation was successful
1627 if (hOutfile == -1)
1628 {
1629 handle_error(EF_EXIT, -1,
1630 "pbzip2: *ERROR: Could not create output file [%s]!\n", OutFilename);
1631 return (NULL);
1632 }
1633 }
1634
1635 while (true)
1636 {
1637 #ifdef PBZIP_DEBUG
1638 int lastseq = 0;
1639 if (prevBlockInSequence != NULL)
1640 {
1641 lastseq = prevBlockInSequence->sequenceNumber;
1642 }
1643 #endif
1644
1645 // Order is important. We don't need sync on NumBlocks when producer
1646 // is done.
1647 if ((syncGetProducerDone() == 1) && (currBlock >= NumBlocks) && (prevBlockInSequence == NULL))
1648 {
1649 #ifdef PBZIP_DEBUG
1650 fprintf(stderr, "fileWriter [b:%d:%d]: done - quit loop.\n", currBlock, lastseq);
1651 #endif
1652 // We're done
1653 break;
1654 }
1655
1656 if (syncGetTerminateFlag() != 0)
1657 {
1658 #ifdef PBZIP_DEBUG
1659 fprintf (stderr, "fileWriter [b:%d]: terminating1 - terminateFlag set\n", currBlock);
1660 #endif
1661 break;
1662 }
1663
1664 safe_mutex_lock(OutMutex);
1665 #ifdef PBZIP_DEBUG
1666 outBuff * lastnext = (prevBlockInSequence != NULL) ? prevBlockInSequence->next : NULL;
1667 fprintf(stderr, "fileWriter: Block: %d Size: %" PRIuMAX " Next File Block: %d"
1668 ", outBufferPos: %" PRIuMAX ", NumBlocks: %d, producerDone: %d, lastseq=%d"
1669 ", prev=%p, next=%p\n",
1670 currBlock, (uintmax_t)NumBufferedBlocksMax, NextBlockToWrite,
1671 (uintmax_t)outBufferPos, NumBlocks, syncGetProducerDone(), lastseq,
1672 prevBlockInSequence,
1673 lastnext);
1674 #endif
1675
1676 if ( (LastGoodBlock != -1) && (NextBlockToWrite > LastGoodBlock) )
1677 {
1678 #ifdef PBZIP_DEBUG
1679 fprintf (stderr, "fileWriter [b:%d]: quit - LastGoodBlock=%d\n",
1680 currBlock, LastGoodBlock);
1681 #endif
1682 safe_mutex_unlock(OutMutex);
1683
1684 break;
1685 }
1686
1687 if ((OutputBuffer[outBufferPos].buf == NULL) &&
1688 ((prevBlockInSequence == NULL) || (prevBlockInSequence->next == NULL)))
1689 {
1690 safe_cond_timed_wait(&OutBufferHeadNotEmpty, OutMutex, 1, "fileWriter");
1691 safe_mutex_unlock(OutMutex);
1692 // sleep a little so we don't go into a tight loop using up all the CPU
1693 // usleep(50000);
1694 continue;
1695 }
1696 else
1697 {
1698 safe_mutex_unlock(OutMutex);
1699 }
1700
1701 outBuff * outBlock;
1702 if (prevBlockInSequence != NULL)
1703 {
1704 outBlock = prevBlockInSequence->next;
1705 }
1706 else
1707 {
1708 outBlock = &OutputBuffer[outBufferPos];
1709 }
1710
1711 #ifdef PBZIP_DEBUG
1712 fprintf(stderr, "fileWriter: Buffer: %p Size: %u Block: %d, Seq: %d, isLast: %d\n",
1713 OutputBuffer[outBufferPos].buf, OutputBuffer[outBufferPos].bufSize, currBlock,
1714 outBlock->sequenceNumber, (int)outBlock->isLastInSequence);
1715 #endif
1716
1717 // write data to the output file
1718 ret = do_write(hOutfile, outBlock->buf, outBlock->bufSize);
1719
1720 #ifdef PBZIP_DEBUG
1721 fprintf(stderr, "\n -> Total Bytes Written[%d:%d]: %d bytes...\n", currBlock, outBlock->sequenceNumber, ret);
1722 #endif
1723
1724 if (ret < 0)
1725 {
1726 if (OutputStdOut == 0)
1727 close(hOutfile);
1728
1729 handle_error(EF_EXIT, -1,
1730 "pbzip2: *ERROR: Could not write %d bytes to file [ret=%d]! Aborting...\n",
1731 outBlock->bufSize, ret);
1732 return (NULL);
1733 }
1734 CompressedSize += ret;
1735
1736 bytesProcessed += outBlock->inSize;
1737 delete [] outBlock->buf;
1738 outBlock->buf = NULL;
1739 outBlock->bufSize = 0;
1740
1741 if (outBlock->isLastInSequence)
1742 {
1743 if (++outBufferPos == NumBufferedBlocksMax)
1744 {
1745 outBufferPos = 0;
1746 }
1747 ++currBlock;
1748 }
1749
1750 safe_mutex_lock(OutMutex);
1751
1752 if (outBlock->isLastInSequence)
1753 {
1754 ++NextBlockToWrite;
1755 OutBufferPosToWrite = outBufferPos;
1756 }
1757 if (outBlock->sequenceNumber > 1)
1758 {
1759 --NumBufferedTailBlocks;
1760 }
1761 // --NumBufferedBlocks; // to be removed
1762 safe_cond_broadcast(notTooMuchNumBuffered);
1763 safe_cond_broadcast(&ErrStateChangeCond);
1764 safe_mutex_unlock(OutMutex);
1765
1766 if (outBlock->sequenceNumber > 2)
1767 {
1768 delete prevBlockInSequence;
1769 }
1770
1771 if (outBlock->isLastInSequence)
1772 {
1773 prevBlockInSequence = NULL;
1774 if (outBlock->sequenceNumber > 1)
1775 {
1776 delete outBlock;
1777 }
1778 }
1779 else
1780 {
1781 prevBlockInSequence = outBlock;
1782 }
1783
1784 if (QuietMode != 1)
1785 {
1786 // print current completion status
1787 int percentCompleteOld = percentComplete;
1788 if (InFileSize > 0)
1789 {
1790 percentComplete = (100.0 * (double)bytesProcessed / (double)InFileSize);
1791 }
1792
1793 #ifdef PBZIP_DEBUG
1794 fprintf(stderr, "Completed: %d%% NextBlockToWrite: %d/%" PRIuMAX " \r", percentComplete, NextBlockToWrite, (uintmax_t)NumBufferedBlocksMax);
1795 fflush(stderr);
1796 #else
1797 if (percentComplete != percentCompleteOld)
1798 {
1799 fprintf(stderr, "Completed: %d%% \r", percentComplete);
1800 fflush(stderr);
1801 }
1802 #endif
1803 }
1804 } // while
1805
1806 if (currBlock == 0)
1807 {
1808 // zero-size file needs special handling
1809 ret = do_write(hOutfile, Bz2HeaderZero, sizeof(Bz2HeaderZero) );
1810
1811 if (ret < 0)
1812 {
1813 handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not write to file! Aborting...\n");
1814 return (NULL);
1815 }
1816 }
1817
1818
1819 if (OutputStdOut == 0)
1820 {
1821 ret = close(hOutfile);
1822 if (ret == -1)
1823 {
1824 ErrorContext::getInstance()->saveError();
1825 handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not close output file! Aborting...\n");
1826 return (NULL);
1827 }
1828 }
1829
1830 if (QuietMode != 1)
1831 {
1832 fprintf(stderr, " Output Size: %" PRIuMAX " bytes\n", (uintmax_t)CompressedSize);
1833 }
1834
1835 #ifdef PBZIP_DEBUG
1836 fprintf(stderr, "fileWriter exit\n");
1837 fflush(stderr);
1838 #endif
1839
1840 // wake up all other possibly blocked on cond threads
1841 if (FifoQueue != NULL)
1842 {
1843 safe_mutex_lock(FifoQueue->mut);
1844 safe_cond_broadcast(FifoQueue->notEmpty); // important
1845 safe_cond_broadcast(FifoQueue->notFull); // not really needed
1846 safe_mutex_unlock(FifoQueue->mut);
1847 }
1848 safe_mutex_lock(OutMutex);
1849 safe_cond_broadcast(notTooMuchNumBuffered); // not really needed
1850 safe_mutex_unlock(OutMutex);
1851
1852 if (QuietMode != 1)
1853 {
1854 // print current completion status
1855 percentComplete = 100;
1856
1857 #ifdef PBZIP_DEBUG
1858 fprintf(stderr, "Completed: %d%% NextBlockToWrite: %d/%" PRIuMAX " \r", percentComplete, NextBlockToWrite, (uintmax_t)NumBufferedBlocksMax);
1859 fflush(stderr);
1860 #else
1861
1862 fprintf(stderr, "Completed: %d%% \r", percentComplete);
1863 fflush(stderr);
1864 #endif
1865 }
1866
1867 return (NULL);
1868 }
1869
1870 /*
1871 *********************************************************
1872 */
directcompress(int hInfile,OFF_T fileSize,int blockSize,const char * OutFilename)1873 int directcompress(int hInfile, OFF_T fileSize, int blockSize, const char *OutFilename)
1874 {
1875 char *FileData = NULL;
1876 char *CompressedData = NULL;
1877 OFF_T CompressedSize = 0;
1878 OFF_T bytesLeft = 0;
1879 OFF_T inSize = 0;
1880 unsigned int outSize = 0;
1881 int percentComplete = 0;
1882 int hOutfile = STDOUT_FILENO; // default to stdout
1883 int currBlock = 0;
1884 int rret = 0;
1885 int ret = 0;
1886
1887 bytesLeft = fileSize;
1888
1889 // write to file instead of stdout
1890 if (OutputStdOut == 0)
1891 {
1892 hOutfile = safe_open_output(OutFilename);
1893 // check to see if file creation was successful
1894 if (hOutfile == -1)
1895 {
1896 handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not create output file [%s]!\n", OutFilename);
1897 return -1;
1898 }
1899 }
1900 #ifdef WIN32
1901 else
1902 {
1903 setmode(fileno(stdout), O_BINARY);
1904 }
1905 #endif
1906
1907 // keep going until all the file is processed
1908 while (bytesLeft > 0)
1909 {
1910 if (syncGetTerminateFlag() != 0)
1911 {
1912 close(hInfile);
1913 if (OutputStdOut == 0)
1914 close(hOutfile);
1915
1916 fprintf (stderr, "directcompress: terminating - terminateFlag set\n");
1917
1918 return -1;
1919 }
1920
1921 //
1922 // READ DATA
1923 //
1924
1925 // set buffer size
1926 if (bytesLeft > blockSize)
1927 inSize = blockSize;
1928 else
1929 inSize = bytesLeft;
1930
1931 #ifdef PBZIP_DEBUG
1932 fprintf(stderr, " -> Bytes To Read: %" PRIuMAX " bytes...\n", (uintmax_t)inSize);
1933 #endif
1934
1935 // allocate memory to read in file
1936 FileData = NULL;
1937 FileData = new(std::nothrow) char[inSize];
1938 // make sure memory was allocated properly
1939 if (FileData == NULL)
1940 {
1941 close(hInfile);
1942 if (OutputStdOut == 0)
1943 close(hOutfile);
1944
1945 handle_error(EF_EXIT, -1,
1946 "pbzip2: *ERROR: Could not allocate memory (FileData)! Aborting...\n");
1947 return -1;
1948 }
1949
1950 // read file data
1951 rret = do_read(hInfile, (char *) FileData, inSize);
1952 #ifdef PBZIP_DEBUG
1953 fprintf(stderr, " -> Total Bytes Read: %d bytes...\n\n", rret);
1954 #endif
1955 if (rret == 0)
1956 {
1957 if (FileData != NULL)
1958 delete [] FileData;
1959 break;
1960 }
1961 else if (rret < 0)
1962 {
1963 close(hInfile);
1964 if (FileData != NULL)
1965 delete [] FileData;
1966 if (OutputStdOut == 0)
1967 close(hOutfile);
1968
1969 handle_error(EF_EXIT, -1,
1970 "pbzip2: *ERROR: Could not read from file! Aborting...\n");
1971 return -1;
1972 }
1973
1974 // set bytes left after read
1975 bytesLeft -= rret;
1976
1977 //
1978 // COMPRESS DATA
1979 //
1980
1981 outSize = (int) ((inSize*1.01)+600);
1982 // allocate memory for compressed data
1983 CompressedData = new(std::nothrow) char[outSize];
1984 // make sure memory was allocated properly
1985 if (CompressedData == NULL)
1986 {
1987 close(hInfile);
1988 if (FileData != NULL)
1989 delete [] FileData;
1990
1991 handle_error(EF_EXIT, -1,
1992 "pbzip2: *ERROR: Could not allocate memory (CompressedData)! Aborting...\n");
1993 return -1;
1994 }
1995
1996 // compress the memory buffer (blocksize=9*100k, verbose=0, worklevel=30)
1997 ret = BZ2_bzBuffToBuffCompress(CompressedData, &outSize, FileData, inSize, BWTblockSize, Verbosity, 30);
1998 if (ret != BZ_OK)
1999 {
2000 close(hInfile);
2001 if (FileData != NULL)
2002 delete [] FileData;
2003
2004 handle_error(EF_EXIT, -1, "pbzip2: *ERROR during compression: %d! Aborting...\n", ret);
2005 return -1;
2006 }
2007
2008 #ifdef PBZIP_DEBUG
2009 fprintf(stderr, "\n Original Block Size: %" PRIuMAX "\n", (uintmax_t)inSize);
2010 fprintf(stderr, " Compressed Block Size: %u\n", outSize);
2011 #endif
2012
2013 //
2014 // WRITE DATA
2015 //
2016
2017 // write data to the output file
2018 ret = do_write(hOutfile, CompressedData, outSize);
2019
2020 #ifdef PBZIP_DEBUG
2021 fprintf(stderr, "\n -> Total Bytes Written[%d]: %d bytes...\n", currBlock, ret);
2022 #endif
2023 if (ret <= 0)
2024 {
2025 close(hInfile);
2026 if (FileData != NULL)
2027 delete [] FileData;
2028 if (CompressedData != NULL)
2029 delete [] CompressedData;
2030 if (OutputStdOut == 0)
2031 close(hOutfile);
2032
2033 handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not write to file! Aborting...\n");
2034 return -1;
2035 }
2036
2037 CompressedSize += ret;
2038
2039 currBlock++;
2040 // print current completion status
2041 int percentCompleteOld = percentComplete;
2042 percentComplete = 100 * currBlock / NumBlocksEstimated;
2043 if (QuietMode != 1)
2044 {
2045 if (percentComplete != percentCompleteOld)
2046 {
2047 fprintf(stderr, "Completed: %d%% \r", percentComplete);
2048 fflush(stderr);
2049 }
2050 }
2051
2052 // clean up memory
2053 if (FileData != NULL)
2054 {
2055 delete [] FileData;
2056 FileData = NULL;
2057 }
2058 if (CompressedData != NULL)
2059 {
2060 delete [] CompressedData;
2061 CompressedData = NULL;
2062 }
2063
2064 // check to make sure all the data we expected was read in
2065 if (rret != inSize)
2066 inSize = rret;
2067 } // while
2068
2069 close(hInfile);
2070
2071 if (OutputStdOut == 0)
2072 close(hOutfile);
2073 if (QuietMode != 1)
2074 {
2075 fprintf(stderr, " Output Size: %" PRIuMAX " bytes\n", (uintmax_t)CompressedSize);
2076 }
2077
2078 syncSetProducerDone(1); // Not really needed for direct version
2079 return 0;
2080 }
2081
/*
 * Flushes `out` (if any), then closes `in` and `out`, in that order.
 * The process-wide stdin/stdout streams are never closed, and NULL
 * streams are ignored.
 */
void close_streams(FILE *out, FILE *in)
{
	const bool mustCloseIn = (in != NULL) && (in != stdin);
	const bool mustCloseOut = (out != NULL) && (out != stdout);

	if (out != NULL)
		fflush(out);

	if (mustCloseIn)
		fclose(in);

	if (mustCloseOut)
		fclose(out);
}
2096
2097 /*
2098 *********************************************************
2099 */
directdecompress(const char * InFilename,const char * OutFilename)2100 int directdecompress(const char *InFilename, const char *OutFilename)
2101 {
2102 FILE *stream = NULL;
2103 FILE *zStream = NULL;
2104 BZFILE* bzf = NULL;
2105 unsigned char obuf[5000];
2106 unsigned char unused[BZ_MAX_UNUSED];
2107 unsigned char *unusedTmp;
2108 int bzerr, nread, streamNo;
2109 int nUnused;
2110 int ret = 0;
2111 int i;
2112
2113 nUnused = 0;
2114 streamNo = 0;
2115
2116 // see if we are using stdin or not
2117 if (strcmp(InFilename, "-") != 0)
2118 {
2119 // open the file for reading
2120 zStream = fopen(InFilename, "rb");
2121 if (zStream == NULL)
2122 {
2123 handle_error(EF_EXIT, -1,
2124 "pbzip2: *ERROR: Could not open input file [%s]! Aborting...\n", InFilename);
2125 return -1;
2126 }
2127 }
2128 else
2129 {
2130 #ifdef WIN32
2131 setmode(fileno(stdin), O_BINARY);
2132 #endif
2133 zStream = stdin;
2134 }
2135
2136 // check file stream for errors
2137 if (ferror(zStream))
2138 {
2139 close_streams(stream, zStream);
2140 handle_error(EF_EXIT, -1,
2141 "pbzip2: *ERROR: Problem with input stream of file [%s]! Aborting...\n", InFilename);
2142 return -1;
2143 }
2144
2145 // see if we are outputting to stdout
2146 if (OutputStdOut == 0)
2147 {
2148 stream = safe_fopen_output(OutFilename, "wb");
2149 if (stream == NULL)
2150 {
2151 handle_error(EF_EXIT, -1,
2152 "pbzip2: *ERROR: Could not open output file [%s]! Aborting...\n", OutFilename);
2153 return -1;
2154 }
2155 }
2156 else
2157 {
2158 #ifdef WIN32
2159 setmode(fileno(stdout), O_BINARY);
2160 #endif
2161 stream = stdout;
2162 }
2163
2164 // check file stream for errors
2165 if (ferror(stream))
2166 {
2167 close_streams(stream, zStream);
2168 handle_error(EF_EXIT, -1,
2169 "pbzip2: *ERROR: Problem with output stream of file [%s]! Aborting...\n", InFilename);
2170 return -1;
2171 }
2172
2173 // loop until end of file
2174 while(true)
2175 {
2176 if (syncGetTerminateFlag() != 0)
2177 {
2178 fprintf (stderr, "directdecompress: terminating1 - terminateFlag set\n");
2179 close_streams(stream, zStream);
2180 return -1;
2181 }
2182
2183 bzf = BZ2_bzReadOpen(&bzerr, zStream, Verbosity, 0, unused, nUnused);
2184 if (bzf == NULL || bzerr != BZ_OK)
2185 {
2186 ret = testBZ2ErrorHandling(bzerr, bzf, streamNo);
2187 close_streams(stream, zStream);
2188
2189 if (ret != 0)
2190 {
2191 syncSetTerminateFlag(1);
2192 }
2193
2194 return ret;
2195 }
2196
2197 streamNo++;
2198
2199 while (bzerr == BZ_OK)
2200 {
2201 if (syncGetTerminateFlag() != 0)
2202 {
2203 fprintf (stderr, "directdecompress: terminating2 - terminateFlag set\n");
2204 close_streams(stream, zStream);
2205 return -1;
2206 }
2207
2208 nread = BZ2_bzRead(&bzerr, bzf, obuf, sizeof(obuf));
2209 if (bzerr == BZ_DATA_ERROR_MAGIC)
2210 {
2211 // try alternate way of reading data
2212 if (ForceOverwrite == 1)
2213 {
2214 rewind(zStream);
2215 while (true)
2216 {
2217 int c = fgetc(zStream);
2218 if (c == EOF)
2219 break;
2220 ungetc(c,zStream);
2221
2222 nread = fread(obuf, sizeof(unsigned char), sizeof(obuf), zStream );
2223 if (ferror(zStream))
2224 {
2225 ret = testBZ2ErrorHandling(bzerr, bzf, streamNo);
2226 close_streams(stream, zStream);
2227
2228 if (ret != 0)
2229 {
2230 syncSetTerminateFlag(1);
2231 }
2232
2233 return ret;
2234 }
2235 if (nread > 0)
2236 (void) fwrite (obuf, sizeof(unsigned char), nread, stream);
2237 if (ferror(stream))
2238 {
2239 ret = testBZ2ErrorHandling(bzerr, bzf, streamNo);
2240 close_streams(stream, zStream);
2241
2242 if (ret != 0)
2243 {
2244 syncSetTerminateFlag(1);
2245 }
2246
2247 return ret;
2248 }
2249 }
2250 goto closeok;
2251 }
2252 }
2253 if ((bzerr == BZ_OK || bzerr == BZ_STREAM_END) && nread > 0)
2254 (void) fwrite(obuf, sizeof(unsigned char), nread, stream );
2255 if (ferror(stream))
2256 {
2257 ret = testBZ2ErrorHandling(bzerr, bzf, streamNo);
2258 close_streams(stream, zStream);
2259
2260 if (ret != 0)
2261 {
2262 syncSetTerminateFlag(1);
2263 }
2264 return ret;
2265 }
2266 }
2267 if (bzerr != BZ_STREAM_END)
2268 {
2269 ret = testBZ2ErrorHandling(bzerr, bzf, streamNo);
2270 close_streams(stream, zStream);
2271
2272 if (ret != 0)
2273 {
2274 syncSetTerminateFlag(1);
2275 }
2276 return ret;
2277 }
2278
2279 BZ2_bzReadGetUnused(&bzerr, bzf, (void**)(&unusedTmp), &nUnused);
2280 if (bzerr != BZ_OK)
2281 {
2282 handle_error(EF_EXIT, 3, "pbzip2: *ERROR: Unexpected error [bzerr=%d]. Aborting!\n", bzerr);
2283 return 3;
2284 }
2285
2286 for (i = 0; i < nUnused; i++)
2287 unused[i] = unusedTmp[i];
2288
2289 BZ2_bzReadClose(&bzerr, bzf);
2290 if (bzerr != BZ_OK)
2291 {
2292 handle_error(EF_EXIT, 3, "pbzip2: *ERROR: Unexpected error [bzerr=%d]. Aborting!\n", bzerr);
2293 return 3;
2294 }
2295
2296 // check to see if we are at the end of the file
2297 if (nUnused == 0)
2298 {
2299 int c = fgetc(zStream);
2300 if (c == EOF)
2301 break;
2302 ungetc(c, zStream);
2303 }
2304 }
2305
2306 closeok:
2307 // check file stream for errors
2308 if (ferror(zStream))
2309 {
2310 if (zStream != stdin)
2311 fclose(zStream);
2312 if (stream != stdout)
2313 fclose(stream);
2314
2315 handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Problem with input stream of file [%s]! Skipping...\n", InFilename);
2316
2317 return -1;
2318 }
2319 // close file
2320 ret = do_fclose(zStream);
2321 if (ret == EOF)
2322 {
2323 handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Problem closing file [%s]! Skipping...\n", InFilename);
2324 return -1;
2325 }
2326
2327 // check file stream for errors
2328 if (ferror(stream))
2329 {
2330 if (stream != stdout)
2331 fclose(stream);
2332 handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Problem with output stream of file [%s]! Skipping...\n", InFilename);
2333
2334 return -1;
2335 }
2336 ret = do_fflush(stream);
2337 if (ret != 0)
2338 {
2339 if (stream != stdout)
2340 fclose(stream);
2341 handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Problem with output stream of file [%s]! Skipping...\n", InFilename);
2342 return -1;
2343 }
2344 if (stream != stdout)
2345 {
2346 ret = do_fclose(stream);
2347 if (ret == EOF)
2348 {
2349 handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Problem closing file [%s]! Skipping...\n", OutFilename);
2350 return -1;
2351 }
2352 }
2353
2354 syncSetProducerDone(1); // Not really needed for direct version.
2355 return 0;
2356 }
2357
/*
 * Simulate an unconditional read(), reading in data to fill the
 * bsize-sized buffer if it can, even if it means calling read() multiple
 * times. This is needed since pipes and other "special" streams
 * sometimes don't allow reading of arbitrary sized buffers.
 *
 * @param hf    file descriptor to read from
 * @param buf   destination buffer, at least bsize bytes long
 * @param bsize number of bytes wanted
 * @return number of bytes stored in buf: bsize unless EOF was hit first,
 *         0 at immediate EOF (or when bsize is 0), or -1 if read() failed
 *         (errno is left as set by read()).
 */
ssize_t bufread(int hf, char *buf, size_t bsize)
{
	size_t bufr = 0;

	while (bufr < bsize)
	{
		// ssize_t/size_t (not int) so large requests are neither truncated
		// nor misreported.
		ssize_t ret = read(hf, buf + bufr, bsize - bufr);

		if (ret < 0)
		{
			// A signal interrupted the call before data arrived: retry
			// instead of reporting a spurious read failure.
			if (errno == EINTR)
				continue;
			return ret;
		}
		if (ret == 0)
			break; // EOF: hand back whatever was collected so far

		bufr += (size_t)ret;
	}

	return (ssize_t)bufr;
}
2386
2387 /*
2388 *********************************************************
2389 */
producer(int hInfile,int blockSize,queue * fifo)2390 int producer(int hInfile, int blockSize, queue *fifo)
2391 {
2392 char *FileData = NULL;
2393 size_t inSize = 0;
2394 // int blockNum = 0;
2395 int ret = 0;
2396 // int pret = -1;
2397
2398 // We will now totally ignore the fileSize and read the data as it
2399 // comes in. Aside from allowing us to process arbitrary streams, it's
2400 // also the *right thing to do* in unix environments where data may
2401 // be appended to the file as it's processed (e.g. log files).
2402
2403 safe_mutex_lock(&ProgressIndicatorsMutex);
2404 NumBlocks = 0;
2405 safe_mutex_unlock(&ProgressIndicatorsMutex);
2406
2407 // keep going until all the file is processed
2408 while (1)
2409 {
2410 if (syncGetTerminateFlag() != 0)
2411 {
2412 close(hInfile);
2413 return -1;
2414 }
2415
2416 // set buffer size
2417 inSize = blockSize;
2418
2419 #ifdef PBZIP_DEBUG
2420 fprintf(stderr, " -> Bytes To Read: %" PRIuMAX " bytes...\n", (uintmax_t)inSize);
2421 #endif
2422
2423 // allocate memory to read in file
2424 FileData = NULL;
2425 FileData = new(std::nothrow) char[inSize];
2426 // make sure memory was allocated properly
2427 if (FileData == NULL)
2428 {
2429 close(hInfile);
2430 handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not allocate memory (FileData)! Aborting...\n");
2431 return -1;
2432 }
2433
2434 // read file data
2435 ret = bufread(hInfile, (char *) FileData, inSize);
2436 #ifdef PBZIP_DEBUG
2437 fprintf(stderr, " -> Total Bytes Read: %d bytes...\n\n", ret);
2438 #endif
2439 if (ret == 0)
2440 {
2441 // finished reading.
2442 if (FileData != NULL)
2443 delete [] FileData;
2444 break;
2445 }
2446 else if (ret < 0)
2447 {
2448 close(hInfile);
2449 if (FileData != NULL)
2450 delete [] FileData;
2451
2452 handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not read from file! Aborting...\n");
2453 return -1;
2454 }
2455
2456 // check to make sure all the data we expected was read in
2457 if ((size_t)ret != inSize)
2458 inSize = ret;
2459
2460 #ifdef PBZIP_DEBUG
2461 fprintf(stderr, "producer: Going into fifo-mut lock (NumBlocks: %d)\n", NumBlocks);
2462 #endif
2463
2464 // add data to the compression queue
2465 safe_mutex_lock(fifo->mut);
2466 while (fifo->full)
2467 {
2468 #ifdef PBZIP_DEBUG
2469 fprintf (stderr, "producer: queue FULL.\n");
2470 #endif
2471 safe_cond_wait(fifo->notFull, fifo->mut);
2472
2473 if (syncGetTerminateFlag() != 0)
2474 {
2475 pthread_mutex_unlock(fifo->mut);
2476 close(hInfile);
2477 return -1;
2478 }
2479 }
2480 #ifdef PBZIP_DEBUG
2481 fprintf(stderr, "producer: Buffer: %p Size: %" PRIuMAX " Block: %d\n", FileData, (uintmax_t)inSize, NumBlocks);
2482 #endif
2483
2484 outBuff * queueElement = new(std::nothrow) outBuff(FileData, inSize, NumBlocks, 0);
2485 // make sure memory was allocated properly
2486 if (queueElement == NULL)
2487 {
2488 close(hInfile);
2489 handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not allocate memory (queueElement)! Aborting...\n");
2490 return -1;
2491 }
2492
2493 fifo->add(queueElement);
2494 safe_cond_signal(fifo->notEmpty);
2495
2496 safe_mutex_lock(&ProgressIndicatorsMutex);
2497 ++NumBlocks;
2498 safe_mutex_unlock(&ProgressIndicatorsMutex);
2499
2500 safe_mutex_unlock(fifo->mut);
2501 } // while
2502
2503 close(hInfile);
2504
2505 syncSetProducerDone(1);
2506 safe_mutex_lock(fifo->mut);
2507 safe_cond_broadcast(fifo->notEmpty); // just in case
2508 safe_mutex_unlock(fifo->mut);
2509
2510 #ifdef PBZIP_DEBUG
2511 fprintf(stderr, "producer: Done - exiting. Num Blocks: %d\n", NumBlocks);
2512 #endif
2513
2514 return 0;
2515 }
2516
2517 /*
2518 *********************************************************
2519 */
consumer(void * q)2520 void *consumer (void *q)
2521 {
2522 queue *fifo;
2523 // char *FileData = NULL;
2524 outBuff *fileData = NULL;
2525 char *CompressedData = NULL;
2526 // unsigned int inSize = 0;
2527 unsigned int outSize = 0;
2528 // int blockNum = -1;
2529 int ret = -1;
2530
2531 fifo = (queue *)q;
2532
2533 for (;;)
2534 {
2535 if (syncGetTerminateFlag() != 0)
2536 {
2537 #ifdef PBZIP_DEBUG
2538 fprintf (stderr, "consumer: terminating1 - terminateFlag set\n");
2539 #endif
2540 return (NULL);
2541 }
2542
2543 safe_mutex_lock(fifo->mut);
2544 for (;;)
2545 {
2546 if (!fifo->empty && (fifo->remove(fileData) == 1))
2547 {
2548 // block retreived - break the loop and continue further
2549 break;
2550 }
2551
2552 #ifdef PBZIP_DEBUG
2553 fprintf (stderr, "consumer: queue EMPTY.\n");
2554 #endif
2555
2556 if (fifo->empty && ((syncGetProducerDone() == 1) || (syncGetTerminateFlag() != 0)))
2557 {
2558 safe_mutex_unlock(fifo->mut);
2559 #ifdef PBZIP_DEBUG
2560 fprintf (stderr, "consumer: exiting2\n");
2561 #endif
2562 return (NULL);
2563 }
2564
2565 #ifdef PBZIP_DEBUG
2566 safe_cond_timed_wait(fifo->notEmpty, fifo->mut, 1, "consumer");
2567 #else
2568 safe_cond_wait(fifo->notEmpty, fifo->mut);
2569 #endif
2570 }
2571
2572 #ifdef PBZIP_DEBUG
2573 fprintf(stderr, "consumer: Buffer: %p Size: %u Block: %d\n",
2574 fileData->buf, (unsigned)fileData->bufSize, fileData->blockNumber);
2575 #endif
2576
2577 safe_cond_signal(fifo->notFull);
2578 safe_mutex_unlock(fifo->mut);
2579 #ifdef PBZIP_DEBUG
2580 fprintf(stderr, "consumer: received %d.\n", fileData->blockNumber);
2581 #endif
2582
2583 outSize = (unsigned int) (((fileData->bufSize)*1.01)+600);
2584 // allocate memory for compressed data
2585 CompressedData = new(std::nothrow) char[outSize];
2586 // make sure memory was allocated properly
2587 if (CompressedData == NULL)
2588 {
2589 handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not allocate memory (CompressedData)! Aborting...\n");
2590 return (NULL);
2591 }
2592
2593 // compress the memory buffer (blocksize=9*100k, verbose=0, worklevel=30)
2594 ret = BZ2_bzBuffToBuffCompress(CompressedData, &outSize,
2595 fileData->buf, fileData->bufSize, BWTblockSize, Verbosity, 30);
2596 if (ret != BZ_OK)
2597 {
2598 handle_error(EF_EXIT, -1, "pbzip2: *ERROR during compression: %d! Aborting...\n", ret);
2599 return (NULL);
2600 }
2601
2602 #ifdef PBZIP_DEBUG
2603 fprintf(stderr, "\n Original Block Size: %u\n", (unsigned)fileData->bufSize);
2604 fprintf(stderr, " Compressed Block Size: %u\n", outSize);
2605 #endif
2606
2607 disposeMemory(fileData->buf);
2608
2609 // store data to be written in output bin
2610 outBuff outBlock = outBuff(CompressedData, outSize, fileData->blockNumber, 0, fileData->bufSize);
2611 if (outputBufferAdd(outBlock, "consumer") == NULL)
2612 {
2613 return (NULL);
2614 }
2615
2616 delete fileData;
2617 fileData = NULL;
2618 } // for
2619
2620 #ifdef PBZIP_DEBUG
2621 fprintf (stderr, "consumer: exiting\n");
2622 #endif
2623 return (NULL);
2624 }
2625
2626 /*
2627 *********************************************************
2628 */
mutexesInit()2629 int mutexesInit()
2630 {
2631 // initialize mutexes
2632 OutMutex = new(std::nothrow) pthread_mutex_t;
2633 // make sure memory was allocated properly
2634 if (OutMutex == NULL)
2635 {
2636 fprintf(stderr, "pbzip2: *ERROR: Could not allocate memory (OutMutex)! Aborting...\n");
2637 return 1;
2638 }
2639 pthread_mutex_init(OutMutex, NULL);
2640
2641 ProducerDoneMutex = new(std::nothrow) pthread_mutex_t;
2642 // make sure memory was allocated properly
2643 if (ProducerDoneMutex == NULL)
2644 {
2645 fprintf(stderr, "pbzip2: *ERROR: Could not allocate memory (ProducerDoneMutex)! Aborting...\n");
2646 return 1;
2647 }
2648 pthread_mutex_init(ProducerDoneMutex, NULL);
2649
2650 return 0;
2651 }
2652
2653 /*
2654 *********************************************************
2655 */
mutexesDelete()2656 void mutexesDelete()
2657 {
2658 if (OutMutex != NULL)
2659 {
2660 pthread_mutex_destroy(OutMutex);
2661 delete OutMutex;
2662 OutMutex = NULL;
2663 }
2664
2665 if (ProducerDoneMutex != NULL)
2666 {
2667 pthread_mutex_destroy(ProducerDoneMutex);
2668 delete ProducerDoneMutex;
2669 ProducerDoneMutex = NULL;
2670 }
2671 }
2672
2673 /*
2674 *********************************************************
2675 */
queueInit(int queueSize)2676 queue *queueInit(int queueSize)
2677 {
2678 queue *q;
2679
2680 q = new(std::nothrow) queue;
2681 if (q == NULL)
2682 return NULL;
2683
2684 q->qData = new(std::nothrow) queue::ElementTypePtr[queueSize];
2685
2686 if (q->qData == NULL)
2687 return NULL;
2688
2689 q->size = queueSize;
2690
2691 q->clear();
2692
2693 q->mut = NULL;
2694 q->mut = new(std::nothrow) pthread_mutex_t;
2695 if (q->mut == NULL)
2696 return NULL;
2697 pthread_mutex_init(q->mut, NULL);
2698
2699 q->notFull = NULL;
2700 q->notFull = new(std::nothrow) pthread_cond_t;
2701 if (q->notFull == NULL)
2702 return NULL;
2703 pthread_cond_init(q->notFull, NULL);
2704
2705 q->notEmpty = NULL;
2706 q->notEmpty = new(std::nothrow) pthread_cond_t;
2707 if (q->notEmpty == NULL)
2708 return NULL;
2709 pthread_cond_init(q->notEmpty, NULL);
2710
2711 q->consumers = NULL;
2712 q->consumers = new(std::nothrow) pthread_t[queueSize];
2713 if (q->consumers == NULL)
2714 return NULL;
2715
2716 notTooMuchNumBuffered = NULL;
2717 notTooMuchNumBuffered = new(std::nothrow) pthread_cond_t;
2718 if (notTooMuchNumBuffered == NULL)
2719 return NULL;
2720 pthread_cond_init(notTooMuchNumBuffered, NULL);
2721
2722 return (q);
2723 }
2724
2725
2726 /*
2727 *********************************************************
2728 */
queueDelete(queue * q)2729 void queueDelete (queue *q)
2730 {
2731 if (q == NULL)
2732 return;
2733
2734 if (q->mut != NULL)
2735 {
2736 pthread_mutex_destroy(q->mut);
2737 delete q->mut;
2738 q->mut = NULL;
2739 }
2740
2741 if (q->notFull != NULL)
2742 {
2743 pthread_cond_destroy(q->notFull);
2744 delete q->notFull;
2745 q->notFull = NULL;
2746 }
2747
2748 if (q->notEmpty != NULL)
2749 {
2750 pthread_cond_destroy(q->notEmpty);
2751 delete q->notEmpty;
2752 q->notEmpty = NULL;
2753 }
2754
2755 delete [] q->consumers;
2756 delete [] q->qData;
2757
2758 delete q;
2759 q = NULL;
2760
2761 if (notTooMuchNumBuffered != NULL)
2762 {
2763 pthread_cond_destroy(notTooMuchNumBuffered);
2764 delete notTooMuchNumBuffered;
2765 notTooMuchNumBuffered = NULL;
2766 }
2767
2768 return;
2769 }
2770
2771
2772 /**
2773 * Initialize output buffer contents with empty (NULL, 0) blocks
2774 *
2775 * @param size new size of buffer
2776 *
2777 */
outputBufferInit(size_t size)2778 void outputBufferInit(size_t size)
2779 {
2780 safe_mutex_lock(OutMutex);
2781
2782 NextBlockToWrite = 0;
2783 OutBufferPosToWrite = 0;
2784 NumBufferedBlocks = 0;
2785 NumBufferedTailBlocks = 0;
2786
2787 outBuff emptyElement;
2788 emptyElement.buf = NULL;
2789 emptyElement.bufSize = 0;
2790
2791 // Resize and fill-in with empty elements
2792 OutputBuffer.assign(size, emptyElement);
2793
2794 // unlikely to get here since more likely exception will be thrown
2795 if (OutputBuffer.size() != size)
2796 {
2797 fprintf(stderr, "pbzip2: *ERROR: Could not initialize (OutputBuffer); size=%" PRIuMAX "! Aborting...\n", (uintmax_t)size);
2798 safe_mutex_unlock(OutMutex);
2799 exit(1);
2800 }
2801
2802 safe_mutex_unlock(OutMutex);
2803 }
2804
2805 /**
2806 * Get output buffer index corresponding to the given absolute blockNumber
2807 * (buffer is used in circular mode)
2808 *
2809 * @param blockNum - absolute block number to translate
2810 * @return 0-based Output Buffer index where blockNum data should go
2811 */
getOutputBufferPos(int blockNum)2812 inline size_t getOutputBufferPos(int blockNum)
2813 {
2814 // calculate output buffer position (used in circular mode)
2815 size_t outBuffPos = OutBufferPosToWrite + blockNum - NextBlockToWrite;
2816
2817 if (outBuffPos >= NumBufferedBlocksMax)
2818 {
2819 outBuffPos -= NumBufferedBlocksMax;
2820 }
2821
2822 return outBuffPos;
2823 }
2824
2825 /**
2826 * Add next element to the given out buffer tail.
2827 *
2828 */
outputBufferSeqAddNext(outBuff * preveElement,outBuff * newElement)2829 outBuff * outputBufferSeqAddNext(outBuff * preveElement, outBuff * newElement)
2830 {
2831 safe_mutex_lock(OutMutex);
2832
2833 while ((NumBufferedTailBlocks >= NumBufferedBlocksMax) &&
2834 (preveElement->buf != NULL))
2835 {
2836 if (syncGetTerminateFlag() != 0)
2837 {
2838 #ifdef PBZIP_DEBUG
2839 fprintf (stderr, "%s: terminating2 - terminateFlag set\n", "consumer");
2840 #endif
2841 pthread_mutex_unlock(OutMutex);
2842 return NULL;
2843 }
2844
2845 if ( (LastGoodBlock != -1) && (LastGoodBlock < newElement->blockNumber) )
2846 {
2847 #ifdef PBZIP_DEBUG
2848 fprintf (stderr, "%s: terminating3 - LastGoodBlock set\n", "consumer");
2849 #endif
2850 pthread_mutex_unlock(OutMutex);
2851 return NULL;
2852 }
2853
2854 #ifdef PBZIP_DEBUG
2855 fprintf (stderr, "%s/outputBufferSeqAddNext: Throttling from FileWriter backlog: %d\n", "consumer", NumBufferedBlocks);
2856 #endif
2857 safe_cond_wait(notTooMuchNumBuffered, OutMutex);
2858 }
2859
2860 preveElement->next = newElement;
2861
2862 ++NumBufferedTailBlocks;
2863
2864 // size_t outBufPos = getOutputBufferPos(newElement->blockNumber);
2865 if (preveElement->buf == NULL)
2866 {
2867 // fileWriter has already consumed the previous block. Let it know
2868 // for that one early
2869 safe_cond_signal(&OutBufferHeadNotEmpty);
2870 }
2871
2872 safe_mutex_unlock(OutMutex);
2873
2874 return newElement;
2875 }
2876
2877 /**
2878 * Store an item in OutputBuffer out bin. Synchronization is embedded to protect
2879 * from simultaneous access.
2880 *
2881 * @param elem - output buffer element to add
2882 * @param caller - used for debug purposes (caller function name)
2883 *
2884 * @return pointer to added element on success; NULL - on error
2885 */
outputBufferAdd(const outBuff & element,const char * caller)2886 outBuff * outputBufferAdd(const outBuff & element, const char *caller)
2887 {
2888 safe_mutex_lock(OutMutex);
2889
2890 // wait while blockNum is out of range
2891 // [NextBlockToWrite, NextBlockToWrite + NumBufferedBlocksMax)
2892 int dist = element.blockNumber - NumBufferedBlocksMax;
2893 while (dist >= NextBlockToWrite)
2894 {
2895 if (syncGetTerminateFlag() != 0)
2896 {
2897 #ifdef PBZIP_DEBUG
2898 fprintf (stderr, "%s/outputBufferAdd: terminating2 - terminateFlag set\n", caller);
2899 #endif
2900 pthread_mutex_unlock(OutMutex);
2901 return NULL;
2902 }
2903
2904 if ( (LastGoodBlock != -1) && (LastGoodBlock < element.blockNumber) )
2905 {
2906 #ifdef PBZIP_DEBUG
2907 fprintf (stderr, "%s: terminating3 - LastGoodBlock set\n", "consumer");
2908 #endif
2909 pthread_mutex_unlock(OutMutex);
2910 return NULL;
2911 }
2912
2913 #ifdef PBZIP_DEBUG
2914 fprintf (stderr, "%s: Throttling from FileWriter backlog: %d\n", caller, NumBufferedBlocks);
2915 #endif
2916 safe_cond_wait(notTooMuchNumBuffered, OutMutex);
2917 }
2918
2919 // calculate output buffer position (used in circular mode)
2920 size_t outBuffPos = getOutputBufferPos(element.blockNumber);
2921
2922 OutputBuffer[outBuffPos] = element;
2923 ++NumBufferedBlocks;
2924
2925 if (NextBlockToWrite == element.blockNumber)
2926 {
2927 safe_cond_signal(&OutBufferHeadNotEmpty);
2928 }
2929
2930 safe_mutex_unlock(OutMutex);
2931
2932 return &(OutputBuffer[outBuffPos]);
2933 }
2934
2935 /*
2936 *********************************************************
2937 Much of the code in this function is taken from bzip2.c
2938 */
testBZ2ErrorHandling(int bzerr,BZFILE * bzf,int streamNo)2939 int testBZ2ErrorHandling(int bzerr, BZFILE* bzf, int streamNo)
2940 {
2941 int bzerr_dummy;
2942
2943 BZ2_bzReadClose(&bzerr_dummy, bzf);
2944 switch (bzerr)
2945 {
2946 case BZ_CONFIG_ERROR:
2947 fprintf(stderr, "pbzip2: *ERROR: Integers are not the right size for libbzip2. Aborting!\n");
2948 exit(3);
2949 break;
2950 case BZ_IO_ERROR:
2951 fprintf(stderr, "pbzip2: *ERROR: Integers are not the right size for libbzip2. Aborting!\n");
2952 return 1;
2953 break;
2954 case BZ_DATA_ERROR:
2955 fprintf(stderr, "pbzip2: *ERROR: Data integrity (CRC) error in data! Skipping...\n");
2956 return -1;
2957 break;
2958 case BZ_MEM_ERROR:
2959 fprintf(stderr, "pbzip2: *ERROR: Could NOT allocate enough memory. Aborting!\n");
2960 return 1;
2961 break;
2962 case BZ_UNEXPECTED_EOF:
2963 fprintf(stderr, "pbzip2: *ERROR: File ends unexpectedly! Skipping...\n");
2964 return -1;
2965 break;
2966 case BZ_DATA_ERROR_MAGIC:
2967 if (streamNo == 1)
2968 {
2969 fprintf(stderr, "pbzip2: *ERROR: Bad magic number (file not created by bzip2)! Skipping...\n");
2970 return -1;
2971 }
2972 else
2973 {
2974 fprintf(stderr, "pbzip2: *WARNING: Trailing garbage after EOF ignored!\n");
2975 return 0;
2976 }
2977 default:
2978 fprintf(stderr, "pbzip2: *ERROR: Unexpected error. Aborting!\n");
2979 exit(3);
2980 }
2981
2982 return 0;
2983 }
2984
2985 /*
2986 *********************************************************
2987 Much of the code in this function is taken from bzip2.c
2988 */
testCompressedData(char * fileName)2989 int testCompressedData(char *fileName)
2990 {
2991 FILE *zStream = NULL;
2992 int ret = 0;
2993
2994 BZFILE* bzf = NULL;
2995 unsigned char obuf[5000];
2996 unsigned char unused[BZ_MAX_UNUSED];
2997 unsigned char *unusedTmp;
2998 int bzerr, nread, streamNo;
2999 int nUnused;
3000 int i;
3001
3002 nUnused = 0;
3003 streamNo = 0;
3004
3005 // see if we are using stdin or not
3006 if (strcmp(fileName, "-") != 0)
3007 {
3008 // open the file for reading
3009 zStream = fopen(fileName, "rb");
3010 if (zStream == NULL)
3011 {
3012 ErrorContext::getInstance()->saveError();
3013 handle_error(EF_NOQUIT, -1, "pbzip2: *ERROR: Could not open input file [%s]! Skipping...\n", fileName);
3014 return -1;
3015 }
3016 }
3017 else
3018 {
3019 zStream = stdin;
3020 }
3021
3022 // check file stream for errors
3023 if (ferror(zStream))
3024 {
3025
3026 handle_error(EF_NOQUIT, -1, "pbzip2: *ERROR: Problem with stream of file [%s]! Skipping...\n", fileName);
3027 if (zStream != stdin)
3028 verbose_fclose(zStream, fileName);
3029 return -1;
3030 }
3031
3032 // loop until end of file
3033 while(true)
3034 {
3035 bzf = BZ2_bzReadOpen(&bzerr, zStream, Verbosity, 0, unused, nUnused);
3036 if (bzf == NULL || bzerr != BZ_OK)
3037 {
3038 ret = testBZ2ErrorHandling(bzerr, bzf, streamNo);
3039 if (zStream != stdin)
3040 verbose_fclose(zStream, fileName);
3041 return ret;
3042 }
3043
3044 streamNo++;
3045
3046 while (bzerr == BZ_OK)
3047 {
3048 nread = BZ2_bzRead(&bzerr, bzf, obuf, sizeof(obuf));
3049 if (bzerr == BZ_DATA_ERROR_MAGIC)
3050 {
3051 ret = testBZ2ErrorHandling(bzerr, bzf, streamNo);
3052 if (zStream != stdin)
3053 verbose_fclose(zStream, fileName);
3054 return ret;
3055 }
3056 }
3057 if (bzerr != BZ_STREAM_END)
3058 {
3059 ret = testBZ2ErrorHandling(bzerr, bzf, streamNo);
3060 if (zStream != stdin)
3061 verbose_fclose(zStream, fileName);
3062 return ret;
3063 }
3064
3065 BZ2_bzReadGetUnused(&bzerr, bzf, (void**)(&unusedTmp), &nUnused);
3066 if (bzerr != BZ_OK)
3067 {
3068 fprintf(stderr, "pbzip2: *ERROR: Unexpected error. Aborting!\n");
3069 exit(3);
3070 }
3071
3072 for (i = 0; i < nUnused; i++)
3073 unused[i] = unusedTmp[i];
3074
3075 BZ2_bzReadClose(&bzerr, bzf);
3076 if (bzerr != BZ_OK)
3077 {
3078 fprintf(stderr, "pbzip2: *ERROR: Unexpected error. Aborting!\n");
3079 exit(3);
3080 }
3081
3082 // check to see if we are at the end of the file
3083 if (nUnused == 0)
3084 {
3085 int c = fgetc(zStream);
3086 if (c == EOF)
3087 break;
3088 else
3089 ungetc(c, zStream);
3090 }
3091 }
3092
3093 // check file stream for errors
3094 if (ferror(zStream))
3095 {
3096 ErrorContext::getInstance()->saveError();
3097 handle_error(EF_NOQUIT, -1, "pbzip2: *ERROR: Problem with stream of file [%s]! Skipping...\n", fileName);
3098 if (zStream != stdin)
3099 verbose_fclose(zStream, fileName);
3100 return -1;
3101 }
3102
3103 // close file
3104 ret = verbose_fclose(zStream, fileName);
3105 if (ret == EOF)
3106 {
3107 fprintf(stderr, "pbzip2: *ERROR: Problem closing file [%s]! Skipping...\n", fileName);
3108 return -1;
3109 }
3110
3111 return 0;
3112 }
3113
3114 /*
3115 *********************************************************
3116 */
getFileMetaData(const char * fileName)3117 int getFileMetaData(const char *fileName)
3118 {
3119 // get the file meta data and store it in the global structure
3120 return stat(fileName, &fileMetaData);
3121 }
3122
3123 /*
3124 *********************************************************
3125 */
writeFileMetaData(const char * fileName)3126 int writeFileMetaData(const char *fileName)
3127 {
3128 int ret = 0;
3129 #ifndef WIN32
3130 struct utimbuf uTimBuf;
3131 #else
3132 _utimbuf uTimBuf;
3133 #endif
3134
3135 // store file times in structure
3136 uTimBuf.actime = fileMetaData.st_atime;
3137 uTimBuf.modtime = fileMetaData.st_mtime;
3138
3139 // update file with stored file permissions
3140 ret = chmod(fileName, fileMetaData.st_mode);
3141 if (ret != 0)
3142 {
3143 ErrorContext::getInstance()->saveError();
3144 return ret;
3145 }
3146
3147 // update file with stored file access and modification times
3148 ret = utime(fileName, &uTimBuf);
3149 if (ret != 0)
3150 {
3151 ErrorContext::getInstance()->saveError();
3152 return ret;
3153 }
3154
3155 // update file with stored file ownership (if access allows)
3156 #ifndef WIN32
3157 ret = chown(fileName, fileMetaData.st_uid, fileMetaData.st_gid);
3158 // following may happen on some Linux filesystems (i.e. NTFS)
3159 // extra error messages do no harm
3160 if (ret != 0)
3161 {
3162 ErrorContext::getInstance()->saveError();
3163 if (geteuid() == 0)
3164 return ret;
3165 }
3166 #endif
3167
3168 return 0;
3169 }
3170
/*
 *********************************************************
 * Return the number of online CPUs, using sysctl(HW_NCPU) on Apple,
 * sysconf(_SC_NPROCESSORS_ONLN) where available, or GetSystemInfo on
 * WIN32. Falls back to 1 when no method is compiled in, detection
 * fails, or it yields a value below 1.
 */
int detectCPUs()
{
	int cpuCount = 1; // used when no detection mechanism is available

#if defined(__APPLE__)
	int mib[2] = { CTL_HW, HW_NCPU };
	size_t len = sizeof(cpuCount);
	if (sysctl(mib, 2, &cpuCount, &len, NULL, 0) < 0 || len != sizeof(cpuCount))
		cpuCount = 1;
#elif defined(_SC_NPROCESSORS_ONLN)
	cpuCount = sysconf(_SC_NPROCESSORS_ONLN);
#elif defined(WIN32)
	SYSTEM_INFO si;
	GetSystemInfo(&si);
	cpuCount = si.dwNumberOfProcessors;
#endif

	return (cpuCount < 1) ? 1 : cpuCount;
}
3203
3204
/*
 *********************************************************
 * Print the program name/version and credits to stderr.
 */
void banner()
{
	fputs("Parallel BZIP2 v1.1.13 [Dec 18, 2015]\n"
	      "By: Jeff Gilchrist [http://compression.ca]\n"
	      "Major contributions: Yavor Nikolov [http://javornikolov.wordpress.com]\n"
	      "Uses libbzip2 by Julian Seward\n",
	      stderr);
}
3217
3218 /*
3219 *********************************************************
3220 */
usage(char * progname,const char * reason)3221 void usage(char* progname, const char *reason)
3222 {
3223 banner();
3224
3225 if (strncmp(reason, "HELP", 4) == 0)
3226 fprintf(stderr, "\n");
3227 else
3228 fprintf(stderr, "\nInvalid command line: %s. Aborting...\n\n", reason);
3229
3230 #ifndef PBZIP_NO_LOADAVG
3231 fprintf(stderr, "Usage: %s [-1 .. -9] [-b#cdfhklm#p#qrS#tVz] <filename> <filename2> <filenameN>\n", progname);
3232 #else
3233 fprintf(stderr, "Usage: %s [-1 .. -9] [-b#cdfhkm#p#qrS#tVz] <filename> <filename2> <filenameN>\n", progname);
3234 #endif // PBZIP_NO_LOADAVG
3235 fprintf(stderr, " -1 .. -9 set BWT block size to 100k .. 900k (default 900k)\n");
3236 fprintf(stderr, " -b# Block size in 100k steps (default 9 = 900k)\n");
3237 fprintf(stderr, " -c,--stdout Output to standard out (stdout)\n");
3238 fprintf(stderr, " -d,--decompress Decompress file\n");
3239 fprintf(stderr, " -f,--force Overwrite existing output file\n");
3240 fprintf(stderr, " -h,--help Print this help message\n");
3241 fprintf(stderr, " -k,--keep Keep input file, don't delete\n");
3242 #ifndef PBZIP_NO_LOADAVG
3243 fprintf(stderr, " -l,--loadavg Load average determines max number processors to use\n");
3244 #endif // PBZIP_NO_LOADAVG
3245 fprintf(stderr, " -m# Maximum memory usage in 1MB steps (default 100 = 100MB)\n");
3246 fprintf(stderr, " -p# Number of processors to use (default");
3247 #if defined(_SC_NPROCESSORS_ONLN) || defined(__APPLE__)
3248 fprintf(stderr, ": autodetect [%d])\n", detectCPUs());
3249 #else
3250 fprintf(stderr, " 2)\n");
3251 #endif // _SC_NPROCESSORS_ONLN || __APPLE__
3252 fprintf(stderr, " -q,--quiet Quiet mode (default)\n");
3253 fprintf(stderr, " -r,--read Read entire input file into RAM and split between processors\n");
3254 #ifdef USE_STACKSIZE_CUSTOMIZATION
3255 fprintf(stderr, " -S# Child thread stack size in 1KB steps (default stack size if unspecified)\n");
3256 #endif // USE_STACKSIZE_CUSTOMIZATION
3257 fprintf(stderr, " -t,--test Test compressed file integrity\n");
3258 fprintf(stderr, " -v,--verbose Verbose mode\n");
3259 fprintf(stderr, " -V,--version Display version info for pbzip2 then exit\n");
3260 fprintf(stderr, " -z,--compress Compress file (default)\n");
3261 fprintf(stderr, " --ignore-trailing-garbage=# Ignore trailing garbage flag (1 - ignored; 0 - forbidden)\n");
3262 fprintf(stderr, "\n");
3263 fprintf(stderr, "If no file names are given, pbzip2 compresses or decompresses from standard input to standard output.\n");
3264 fprintf(stderr, "\n");
3265 fprintf(stderr, "Example: pbzip2 -b15vk myfile.tar\n");
3266 fprintf(stderr, "Example: pbzip2 -p4 -r -5 myfile.tar second*.txt\n");
3267 fprintf(stderr, "Example: tar cf myfile.tar.bz2 --use-compress-prog=pbzip2 dir_to_compress/\n");
3268 fprintf(stderr, "Example: pbzip2 -d -m500 myfile.tar.bz2\n");
3269 fprintf(stderr, "Example: pbzip2 -dc myfile.tar.bz2 | tar x\n");
3270 fprintf(stderr, "Example: pbzip2 -c < myfile.txt > myfile.txt.bz2 \n");
3271 fprintf(stderr, "\n");
3272 exit(-1);
3273 }
3274
3275 /*
3276 *********************************************************
3277 */
main(int argc,char * argv[])3278 int main(int argc, char* argv[])
3279 {
3280 queue *fifo;
3281 pthread_t output;
3282 char **FileList = NULL;
3283 char *InFilename = NULL;
3284 bool hasInFile = false;
3285 char *progName = NULL;
3286 char *progNamePos = NULL;
3287 char bz2Header[] = {"BZh91AY&SY"}; // using 900k block size
3288 std::string outFilename; // [2048];
3289 char cmdLineTemp[2048];
3290 unsigned char tmpBuff[50];
3291 char stdinFile[2] = {"-"};
3292 struct timeval tvStartTime;
3293 struct timeval tvStopTime;
3294 #ifndef WIN32
3295 struct timezone tz;
3296 double loadAverage = 0.0;
3297 double loadAvgArray[3];
3298 int useLoadAverage = 0;
3299 int numCPUtotal = 0;
3300 int numCPUidle = 0;
3301 #else
3302 SYSTEMTIME systemtime;
3303 LARGE_INTEGER filetime;
3304 LARGE_INTEGER fileSize_temp;
3305 HANDLE hInfile_temp;
3306 #endif
3307 double timeCalc = 0.0;
3308 double timeStart = 0.0;
3309 double timeStop = 0.0;
3310 int cmdLineTempCount = 0;
3311 int readEntireFile = 0;
3312 int zeroByteFile = 0;
3313 int hInfile = -1;
3314 int hOutfile = -1;
3315 int numBlocks = 0;
3316 int blockSize = 9*100000;
3317 int maxMemory = 100000000;
3318 int maxMemorySwitch = 0;
3319 int decompress = 0;
3320 int compress = 0;
3321 int testFile = 0;
3322 int errLevel = 0;
3323 int noThreads = 0;
3324 int keep = 0;
3325 int force = 0;
3326 int ret = 0;
3327 int fileLoop;
3328 size_t i, j, k;
3329 bool switchedMtToSt = false; // switched from multi- to single-thread
3330
3331 // Initialize error context
3332 if (ErrorContext::getInstance() == NULL)
3333 {
3334 return 1;
3335 }
3336
3337 // get current time for benchmark reference
3338 #ifndef WIN32
3339 gettimeofday(&tvStartTime, &tz);
3340 #else
3341 GetSystemTime(&systemtime);
3342 SystemTimeToFileTime(&systemtime, (FILETIME *)&filetime);
3343 tvStartTime.tv_sec = filetime.QuadPart / 10000000;
3344 tvStartTime.tv_usec = (filetime.QuadPart - (LONGLONG)tvStartTime.tv_sec * 10000000) / 10;
3345 #endif
3346
3347 // check to see if we are likely being called from TAR
3348 if (argc < 2)
3349 {
3350 OutputStdOut = 1;
3351 keep = 1;
3352 }
3353
3354 // get program name to determine if decompress mode should be used
3355 progName = argv[0];
3356 for (progNamePos = argv[0]; progNamePos[0] != '\0'; progNamePos++)
3357 {
3358 if (progNamePos[0] == PATH_SEP)
3359 progName = progNamePos + 1;
3360 }
3361 if ((strstr(progName, "unzip") != 0) || (strstr(progName, "UNZIP") != 0))
3362 {
3363 decompress = 1;
3364 }
3365 if ((strstr(progName, "zcat") != 0) || (strstr(progName, "ZCAT") != 0))
3366 {
3367 decompress = OutputStdOut = keep = 1;
3368 }
3369
3370 #ifdef IGNORE_TRAILING_GARBAGE
3371 // default behavior is hard-coded (still dynamically changeable)
3372 IgnoreTrailingGarbageFlag = IGNORE_TRAILING_GARBAGE;
3373 #else
3374 // default depends on program name
3375 if ((strcmp(progName, "bzip2") == 0) || (strcmp(progName, "BZIP2") == 0) ||
3376 (strcmp(progName, "bunzip2") == 0) || (strcmp(progName, "BUNZIP2") == 0) ||
3377 (strcmp(progName, "bzcat") == 0) || (strcmp(progName, "BZCAT") == 0))
3378 {
3379 // Favour traditional non-parallel bzip2 behavior
3380 IgnoreTrailingGarbageFlag = 1;
3381 }
3382 #endif // IGNORE_TRAILING_GARBAGE
3383
3384 FileListCount = 0;
3385 FileList = new(std::nothrow) char *[argc];
3386 if (FileList == NULL)
3387 {
3388 fprintf(stderr, "pbzip2: *ERROR: Not enough memory! Aborting...\n");
3389 return 1;
3390 }
3391 // set default max memory usage to 100MB
3392 maxMemory = 100000000;
3393 NumBufferedBlocksMax = 0;
3394
3395 numCPU = detectCPUs();
3396
3397 #ifndef WIN32
3398 numCPUtotal = numCPU;
3399 #endif
3400
3401 // parse command line switches
3402 for (i=1; (int)i < argc; i++)
3403 {
3404 if (argv[i][0] == '-')
3405 {
3406 if (argv[i][1] == '\0')
3407 {
3408 // support "-" as a filename
3409 FileList[FileListCount] = argv[i];
3410 FileListCount++;
3411 continue;
3412 }
3413 else if (argv[i][1] == '-')
3414 {
3415 // get command line options with "--"
3416 if (strcmp(argv[i], "--best") == 0)
3417 {
3418 BWTblockSize = 9;
3419 }
3420 else if (strcmp(argv[i], "--decompress") == 0)
3421 {
3422 decompress = 1;
3423 }
3424 else if (strcmp(argv[i], "--compress") == 0)
3425 {
3426 compress = 1;
3427 }
3428 else if (strcmp(argv[i], "--fast") == 0)
3429 {
3430 BWTblockSize = 1;
3431 }
3432 else if (strcmp(argv[i], "--force") == 0)
3433 {
3434 force = 1; ForceOverwrite = 1;
3435 }
3436 else if (strcmp(argv[i], "--help") == 0)
3437 {
3438 usage(argv[0], "HELP");
3439 }
3440 else if (strcmp(argv[i], "--keep") == 0)
3441 {
3442 keep = 1;
3443 }
3444 else if (strcmp(argv[i], "--license") == 0)
3445 {
3446 usage(argv[0], "HELP");
3447 }
3448 #ifndef PBZIP_NO_LOADAVG
3449 else if (strcmp(argv[i], "--loadavg") == 0)
3450 {
3451 useLoadAverage = 1;
3452 }
3453 #endif
3454 else if (strcmp(argv[i], "--quiet") == 0)
3455 {
3456 QuietMode = 1;
3457 }
3458 else if (strcmp(argv[i], "--read") == 0)
3459 {
3460 readEntireFile = 1;
3461 }
3462 else if (strcmp(argv[i], "--stdout") == 0)
3463 {
3464 OutputStdOut = 1; keep = 1;
3465 }
3466 else if (strcmp(argv[i], "--test") == 0)
3467 {
3468 testFile = 1;
3469 }
3470 else if (strcmp(argv[i], "--verbose") == 0)
3471 {
3472 QuietMode = 0;
3473 }
3474 else if (strcmp(argv[i], "--version") == 0)
3475 {
3476 banner(); exit(0);
3477 }
3478 else if (strcmp(argv[i], "--ignore-trailing-garbage") == 0 )
3479 {
3480 IgnoreTrailingGarbageFlag = 1;
3481 }
3482 else if (strcmp(argv[i], "--ignore-trailing-garbage=1") == 0 )
3483 {
3484 IgnoreTrailingGarbageFlag = 1;
3485 }
3486 else if (strcmp(argv[i], "--ignore-trailing-garbage=0") == 0 )
3487 {
3488 IgnoreTrailingGarbageFlag = 0;
3489 }
3490
3491 continue;
3492 }
3493 #ifdef PBZIP_DEBUG
3494 fprintf(stderr, "argv[%u]: %s Len: %d\n", (unsigned)i, argv[i], (int)strlen(argv[i]));
3495 #endif
3496 // get command line options with single "-"
3497 // check for multiple switches grouped together
3498 for (j=1; argv[i][j] != '\0'; j++)
3499 {
3500 switch (argv[i][j])
3501 {
3502 case 'p': k = j+1; cmdLineTempCount = 0; strcpy(cmdLineTemp, "2");
3503 while (argv[i][k] != '\0' && k < sizeof(cmdLineTemp))
3504 {
3505 // no more numbers, finish
3506 if ((argv[i][k] < '0') || (argv[i][k] > '9'))
3507 break;
3508 k++;
3509 cmdLineTempCount++;
3510 }
3511 if (cmdLineTempCount == 0)
3512 usage(argv[0], "Cannot parse -p argument");
3513 strncpy(cmdLineTemp, argv[i]+j+1, cmdLineTempCount);
3514 cmdLineTemp[cmdLineTempCount] = '\0';
3515 numCPU = atoi(cmdLineTemp);
3516 if (numCPU > 4096)
3517 {
3518 fprintf(stderr,"pbzip2: *ERROR: Maximal number of supported processors is 4096! Aborting...\n");
3519 return 1;
3520 }
3521 else if (numCPU < 1)
3522 {
3523 fprintf(stderr,"pbzip2: *ERROR: Minimum number of supported processors is 1! Aborting...\n");
3524 return 1;
3525 }
3526 j += cmdLineTempCount;
3527 #ifdef PBZIP_DEBUG
3528 fprintf(stderr, "-p%d\n", numCPU);
3529 #endif
3530 break;
3531 case 'b': k = j+1; cmdLineTempCount = 0; strcpy(cmdLineTemp, "9"); blockSize = 900000;
3532 while (argv[i][k] != '\0' && k < sizeof(cmdLineTemp))
3533 {
3534 // no more numbers, finish
3535 if ((argv[i][k] < '0') || (argv[i][k] > '9'))
3536 break;
3537 k++;
3538 cmdLineTempCount++;
3539 }
3540 if (cmdLineTempCount == 0)
3541 usage(argv[0], "Cannot parse file block size");
3542 strncpy(cmdLineTemp, argv[i]+j+1, cmdLineTempCount);
3543 cmdLineTemp[cmdLineTempCount] = '\0';
3544 blockSize = atoi(cmdLineTemp)*100000;
3545 if ((blockSize < 100000) || (blockSize > 1000000000))
3546 {
3547 fprintf(stderr,"pbzip2: *ERROR: File block size Min: 100k and Max: 1000000k! Aborting...\n");
3548 return 1;
3549 }
3550 j += cmdLineTempCount;
3551 #ifdef PBZIP_DEBUG
3552 fprintf(stderr, "-b%d\n", blockSize);
3553 #endif
3554 break;
3555 case 'm': k = j+1; cmdLineTempCount = 0; strcpy(cmdLineTemp, "1"); maxMemory = 1000000;
3556 while (argv[i][k] != '\0' && k < sizeof(cmdLineTemp))
3557 {
3558 // no more numbers, finish
3559 if ((argv[i][k] < '0') || (argv[i][k] > '9'))
3560 break;
3561 k++;
3562 cmdLineTempCount++;
3563 }
3564 if (cmdLineTempCount == 0)
3565 usage(argv[0], "Cannot parse -m argument");
3566 strncpy(cmdLineTemp, argv[i]+j+1, cmdLineTempCount);
3567 cmdLineTemp[cmdLineTempCount] = '\0';
3568 maxMemory = atoi(cmdLineTemp)*1000000;
3569 if ((maxMemory < 1000000) || (maxMemory > 2000000000))
3570 {
3571 fprintf(stderr,"pbzip2: *ERROR: Memory usage size Min: 1MB and Max: 2000MB! Aborting...\n");
3572 return 1;
3573 }
3574 maxMemorySwitch = 1;
3575 j += cmdLineTempCount;
3576 #ifdef PBZIP_DEBUG
3577 fprintf(stderr, "-m%d\n", maxMemory);
3578 #endif
3579 break;
3580 #ifdef USE_STACKSIZE_CUSTOMIZATION
3581 case 'S': k = j+1; cmdLineTempCount = 0; strcpy(cmdLineTemp, "0"); ChildThreadStackSize = -1;
3582 while (argv[i][k] != '\0' && k < sizeof(cmdLineTemp))
3583 {
3584 // no more numbers, finish
3585 if ((argv[i][k] < '0') || (argv[i][k] > '9'))
3586 break;
3587 k++;
3588 cmdLineTempCount++;
3589 }
3590 if (cmdLineTempCount == 0)
3591 usage(argv[0], "Cannot parse -S argument");
3592 strncpy(cmdLineTemp, argv[i]+j+1, cmdLineTempCount);
3593 cmdLineTemp[cmdLineTempCount] = '\0';
3594 ChildThreadStackSize = atoi(cmdLineTemp)*1024;
3595 if (ChildThreadStackSize < 0)
3596 {
3597 fprintf(stderr,"pbzip2: *ERROR: Parsing -S: invalid stack size specified [%d]! Ignoring...\n",
3598 ChildThreadStackSize);
3599 }
3600 else if (ChildThreadStackSize < PTHREAD_STACK_MIN)
3601 {
3602 fprintf(stderr,"pbzip2: *WARNING: Stack size %d bytes less than minumum - adjusting to %d bytes.\n",
3603 ChildThreadStackSize, PTHREAD_STACK_MIN);
3604 ChildThreadStackSize = PTHREAD_STACK_MIN;
3605 }
3606 j += cmdLineTempCount;
3607 #ifdef PBZIP_DEBUG
3608 fprintf(stderr, "-S%d\n", ChildThreadStackSize);
3609 #endif
3610 break;
3611 #endif // USE_STACKSIZE_CUSTOMIZATION
3612 case 'd': decompress = 1; break;
3613 case 'c': OutputStdOut = 1; keep = 1; break;
3614 case 'f': force = 1; ForceOverwrite = 1; break;
3615 case 'h': usage(argv[0], "HELP"); break;
3616 case 'k': keep = 1; break;
3617 #ifndef PBZIP_NO_LOADAVG
3618 case 'l': useLoadAverage = 1; break;
3619 #endif
3620 case 'L': banner(); exit(0); break;
3621 case 'q': QuietMode = 1; break;
3622 case 'r': readEntireFile = 1; break;
3623 case 't': testFile = 1; break;
3624 case 'v': QuietMode = 0; break;
3625 case 'V': banner(); exit(0); break;
3626 case 'z': compress = 1; break;
3627 case '1': BWTblockSize = 1; break;
3628 case '2': BWTblockSize = 2; break;
3629 case '3': BWTblockSize = 3; break;
3630 case '4': BWTblockSize = 4; break;
3631 case '5': BWTblockSize = 5; break;
3632 case '6': BWTblockSize = 6; break;
3633 case '7': BWTblockSize = 7; break;
3634 case '8': BWTblockSize = 8; break;
3635 case '9': BWTblockSize = 9; break;
3636 }
3637 }
3638 }
3639 else
3640 {
3641 // add filename to list for processing FileListCount
3642 FileList[FileListCount] = argv[i];
3643 FileListCount++;
3644 }
3645 } /* for */
3646
3647 Bz2HeaderZero[3] = '0' + BWTblockSize;
3648 bz2Header[3] = Bz2HeaderZero[3];
3649
3650 // check to make sure we are not trying to compress and decompress at same time
3651 if ((compress == 1) && (decompress == 1))
3652 {
3653 fprintf(stderr,"pbzip2: *ERROR: Can't compress and uncompress data at same time. Aborting!\n");
3654 fprintf(stderr,"pbzip2: For help type: %s -h\n", argv[0]);
3655 return 1;
3656 }
3657
3658 if (FileListCount == 0)
3659 {
3660 if (testFile == 1)
3661 {
3662 #ifndef WIN32
3663 if (isatty(fileno(stdin)))
3664 #else
3665 if (_isatty(_fileno(stdin)))
3666 #endif
3667 {
3668 fprintf(stderr,"pbzip2: *ERROR: Won't read compressed data from terminal. Aborting!\n");
3669 fprintf(stderr,"pbzip2: For help type: %s -h\n", argv[0]);
3670 return 1;
3671 }
3672 // expecting data from stdin
3673 FileList[FileListCount] = stdinFile;
3674 FileListCount++;
3675 }
3676 else if (decompress == 1)
3677 {
3678 #ifndef WIN32
3679 if (isatty(fileno(stdin)))
3680 #else
3681 if (_isatty(_fileno(stdin)))
3682 #endif
3683 {
3684 fprintf(stderr,"pbzip2: *ERROR: Won't read compressed data from terminal. Aborting!\n");
3685 fprintf(stderr,"pbzip2: For help type: %s -h\n", argv[0]);
3686 return 1;
3687 }
3688 // expecting data from stdin via TAR
3689 OutputStdOut = 1;
3690 keep = 1;
3691 FileList[FileListCount] = stdinFile;
3692 FileListCount++;
3693 }
3694 else
3695 {
3696 if (OutputStdOut == 0)
3697 {
3698 // probably trying to input data from stdin
3699 if (QuietMode != 1)
3700 fprintf(stderr,"pbzip2: Assuming input data coming from stdin...\n\n");
3701
3702 OutputStdOut = 1;
3703 keep = 1;
3704 }
3705
3706 #ifndef WIN32
3707 if (isatty(fileno(stdout)))
3708 #else
3709 if (_isatty(_fileno(stdout)))
3710 #endif
3711 {
3712 fprintf(stderr,"pbzip2: *ERROR: Won't write compressed data to terminal. Aborting!\n");
3713 fprintf(stderr,"pbzip2: For help type: %s -h\n", argv[0]);
3714 return 1;
3715 }
3716 // expecting data from stdin
3717 FileList[FileListCount] = stdinFile;
3718 FileListCount++;
3719 }
3720 }
3721
3722 if (QuietMode != 1)
3723 {
3724 // display program banner
3725 banner();
3726
3727 // do sanity check to make sure integers are the size we expect
3728 #ifdef PBZIP_DEBUG
3729 fprintf(stderr, "off_t size: %u uint size: %u\n", (unsigned)sizeof(OFF_T), (unsigned)sizeof(unsigned int));
3730 #endif
3731 if (sizeof(OFF_T) <= 4)
3732 {
3733 fprintf(stderr, "\npbzip2: *WARNING: off_t variable size only %u bits!\n", (unsigned)(sizeof(OFF_T)*CHAR_BIT));
3734 if (decompress == 1)
3735 fprintf(stderr, " You will only able to uncompress files smaller than 2GB in size.\n\n");
3736 else
3737 fprintf(stderr, " You will only able to compress files smaller than 2GB in size.\n\n");
3738 }
3739 }
3740
3741 // Calculate number of processors to use based on load average if requested
3742 #ifndef PBZIP_NO_LOADAVG
3743 if (useLoadAverage == 1)
3744 {
3745 // get current load average
3746 ret = getloadavg(loadAvgArray, 3);
3747 if (ret != 3)
3748 {
3749 loadAverage = 0.0;
3750 useLoadAverage = 0;
3751 if (QuietMode != 1)
3752 fprintf(stderr, "pbzip2: *WARNING: Could not get load average! Using requested processors...\n");
3753 }
3754 else
3755 {
3756 #ifdef PBZIP_DEBUG
3757 fprintf(stderr, "Load Avg1: %f Avg5: %f Avg15: %f\n", loadAvgArray[0], loadAvgArray[1], loadAvgArray[2]);
3758 #endif
3759 // use 1 min load average to adjust number of processors used
3760 loadAverage = loadAvgArray[0]; // use [1] for 5 min average and [2] for 15 min average
3761 // total number processors minus load average rounded up
3762 numCPUidle = numCPUtotal - (int)(loadAverage + 0.5);
3763 // if user asked for a specific # processors and they are idle, use all requested
3764 // otherwise give them whatever idle processors are available
3765 if (numCPUidle < numCPU)
3766 numCPU = numCPUidle;
3767 if (numCPU < 1)
3768 numCPU = 1;
3769 }
3770 }
3771 #endif
3772
3773 // Initialize child threads attributes
3774 initChildThreadAttributes();
3775
3776 // setup signal handling (should be before creating any child thread)
3777 sigInFilename = NULL;
3778 sigOutFilename = NULL;
3779 ret = setupSignalHandling();
3780 if (ret != 0)
3781 {
3782 fprintf(stderr, "pbzip2: *ERROR: Can't setup signal handling [%d]. Aborting!\n", ret);
3783 return 1;
3784 }
3785
3786 // Create and start terminator thread.
3787 ret = setupTerminator();
3788 if (ret != 0)
3789 {
3790 fprintf(stderr, "pbzip2: *ERROR: Can't setup terminator thread [%d]. Aborting!\n", ret);
3791 return 1;
3792 }
3793
3794 if (numCPU < 1)
3795 numCPU = 1;
3796
3797 // display global settings
3798 if (QuietMode != 1)
3799 {
3800 if (testFile != 1)
3801 {
3802 fprintf(stderr, "\n # CPUs: %d\n", numCPU);
3803 #ifndef PBZIP_NO_LOADAVG
3804 if (useLoadAverage == 1)
3805 fprintf(stderr, " Load Average: %.2f\n", loadAverage);
3806 #endif
3807 if (decompress != 1)
3808 {
3809 fprintf(stderr, " BWT Block Size: %d00 KB\n", BWTblockSize);
3810 if (blockSize < 100000)
3811 fprintf(stderr, "File Block Size: %d bytes\n", blockSize);
3812 else
3813 fprintf(stderr, "File Block Size: %d KB\n", blockSize/1000);
3814 }
3815 fprintf(stderr, " Maximum Memory: %d MB\n", maxMemory/1000000);
3816 #ifdef USE_STACKSIZE_CUSTOMIZATION
3817 if (ChildThreadStackSize > 0)
3818 fprintf(stderr, " Stack Size: %d KB\n", ChildThreadStackSize/1024);
3819 #endif
3820
3821 if (decompress == 1)
3822 {
3823 fprintf(stderr, " Ignore Trailing Garbage: %s\n",
3824 (IgnoreTrailingGarbageFlag == 1) ? "on" : "off" );
3825 }
3826 }
3827 fprintf(stderr, "-------------------------------------------\n");
3828 }
3829
3830 int mutexesInitRet = mutexesInit();
3831 if ( mutexesInitRet != 0 )
3832 {
3833 return mutexesInitRet;
3834 }
3835
3836 // create queue
3837 fifo = FifoQueue = queueInit(numCPU);
3838 if (fifo == NULL)
3839 {
3840 fprintf (stderr, "pbzip2: *ERROR: Queue Init failed. Aborting...\n");
3841 return 1;
3842 }
3843
3844 // process all files
3845 for (fileLoop=0; fileLoop < FileListCount; fileLoop++)
3846 {
3847 producerDone = 0;
3848 InFileSize = 0;
3849 NumBlocks = 0;
3850 switchedMtToSt = false;
3851 int errLevelCurrentFile = 0;
3852
3853 ErrorContext::getInstance()->reset();
3854
3855 // set input filename
3856 InFilename = FileList[fileLoop];
3857 hasInFile = (strcmp(InFilename, "-") != 0);
3858
3859 // test file for errors if requested
3860 if (testFile != 0)
3861 {
3862 if (QuietMode != 1)
3863 {
3864 fprintf(stderr, " File #: %d of %d\n", fileLoop+1, FileListCount);
3865 if (hasInFile)
3866 fprintf(stderr, " Testing: %s\n", InFilename);
3867 else
3868 fprintf(stderr, " Testing: <stdin>\n");
3869 }
3870 ret = testCompressedData(InFilename);
3871 if (ret > 0)
3872 return ret;
3873 else if (ret == 0)
3874 {
3875 if (QuietMode != 1)
3876 fprintf(stderr, " Test: OK\n");
3877 }
3878 else
3879 errLevel = 2;
3880
3881 if (QuietMode != 1)
3882 fprintf(stderr, "-------------------------------------------\n");
3883 continue;
3884 }
3885
3886 // set ouput filename
3887 outFilename = std::string(FileList[fileLoop]);
3888 if ((decompress == 1) && hasInFile)
3889 {
3890 // check if input file is a valid .bz2 compressed file
3891 hInfile = open(InFilename, O_RDONLY | O_BINARY);
3892 // check to see if file exists before processing
3893 if (hInfile == -1)
3894 {
3895 ErrorContext::printErrnoMsg(stderr, errno);
3896 fprintf(stderr, "pbzip2: *ERROR: File [%s] NOT found! Skipping...\n", InFilename);
3897 fprintf(stderr, "-------------------------------------------\n");
3898 errLevel = 1;
3899 continue;
3900 }
3901 memset(tmpBuff, 0, sizeof(tmpBuff));
3902 size_t size = do_read(hInfile, tmpBuff, strlen(bz2Header)+1);
3903 do_close(hInfile);
3904 if ((size == (size_t)(-1)) || (size < strlen(bz2Header)+1))
3905 {
3906 ErrorContext::getInstance()->printErrorMessages(stderr);
3907 fprintf(stderr, "pbzip2: *ERROR: File [%s] is NOT a valid bzip2! Skipping...\n", InFilename);
3908 fprintf(stderr, "-------------------------------------------\n");
3909 errLevel = 1;
3910 continue;
3911 }
3912 else
3913 {
3914 // make sure start of file has valid bzip2 header
3915 if (memstr(tmpBuff, 4, bz2Header, 3) == NULL)
3916 {
3917 fprintf(stderr, "pbzip2: *ERROR: File [%s] is NOT a valid bzip2! Skipping...\n", InFilename);
3918 fprintf(stderr, "-------------------------------------------\n");
3919 errLevel = 1;
3920 continue;
3921 }
3922 // skip 4th char which differs depending on BWT block size used
3923 if (memstr(tmpBuff+4, size-4, bz2Header+4, strlen(bz2Header)-4) == NULL)
3924 {
3925 // check to see if this is a special 0 byte file
3926 if (memstr(tmpBuff+4, size-4, Bz2HeaderZero+4, strlen(bz2Header)-4) == NULL)
3927 {
3928 fprintf(stderr, "pbzip2: *ERROR: File [%s] is NOT a valid bzip2! Skipping...\n", InFilename);
3929 fprintf(stderr, "-------------------------------------------\n");
3930 errLevel = 1;
3931 continue;
3932 }
3933 #ifdef PBZIP_DEBUG
3934 fprintf(stderr, "** ZERO byte compressed file detected\n");
3935 #endif
3936 }
3937 // set block size for decompression
3938 if ((tmpBuff[3] >= '1') && (tmpBuff[3] <= '9'))
3939 BWTblockSizeChar = tmpBuff[3];
3940 else
3941 {
3942 fprintf(stderr, "pbzip2: *ERROR: File [%s] is NOT a valid bzip2! Skipping...\n", InFilename);
3943 fprintf(stderr, "-------------------------------------------\n");
3944 errLevel = 1;
3945 continue;
3946 }
3947 }
3948
3949 // check if filename ends with .bz2
3950 std::string bz2Tail(".bz2");
3951 std::string tbz2Tail(".tbz2");
3952 if ( ends_with_icase(outFilename, bz2Tail) )
3953 {
3954 // remove .bz2 extension
3955 outFilename.resize( outFilename.size() - bz2Tail.size() );
3956 }
3957 else if ( ends_with_icase(outFilename, tbz2Tail) )
3958 {
3959 outFilename.resize( outFilename.size() - tbz2Tail.size() );
3960 outFilename += ".tar";
3961 }
3962 else
3963 {
3964 // add .out extension so we don't overwrite original file
3965 outFilename += ".out";
3966 }
3967 } // decompress == 1
3968 else
3969 {
3970 // check input file to make sure its not already a .bz2 file
3971 std::string bz2Tail(".bz2");
3972 if ( ends_with_icase(std::string(InFilename), bz2Tail) )
3973 {
3974 fprintf(stderr, "pbzip2: *ERROR: Input file [%s] already has a .bz2 extension! Skipping...\n", InFilename);
3975 fprintf(stderr, "-------------------------------------------\n");
3976 errLevel = 1;
3977 continue;
3978 }
3979 outFilename += bz2Tail;
3980 }
3981
3982 // setup signal handling filenames
3983 safe_mutex_lock(&ErrorHandlerMutex);
3984 sigInFilename = InFilename;
3985 sigOutFilename = outFilename.c_str();
3986 safe_mutex_unlock(&ErrorHandlerMutex);
3987
3988 if (hasInFile)
3989 {
3990 struct stat statbuf;
3991 // read file for compression
3992 hInfile = open(InFilename, O_RDONLY | O_BINARY);
3993 // check to see if file exists before processing
3994 if (hInfile == -1)
3995 {
3996 fprintf(stderr, "pbzip2: *ERROR: File [%s] NOT found! Skipping...\n", InFilename);
3997 fprintf(stderr, "-------------------------------------------\n");
3998 errLevel = 1;
3999 continue;
4000 }
4001
4002 // get some information about the file
4003 fstat(hInfile, &statbuf);
4004 // check to make input is not a directory
4005 if (S_ISDIR(statbuf.st_mode))
4006 {
4007 fprintf(stderr, "pbzip2: *ERROR: File [%s] is a directory! Skipping...\n", InFilename);
4008 fprintf(stderr, "-------------------------------------------\n");
4009 errLevel = 1;
4010 continue;
4011 }
4012 // check to make sure input is a regular file
4013 if (!S_ISREG(statbuf.st_mode))
4014 {
4015 fprintf(stderr, "pbzip2: *ERROR: File [%s] is not a regular file! Skipping...\n", InFilename);
4016 fprintf(stderr, "-------------------------------------------\n");
4017 errLevel = 1;
4018 continue;
4019 }
4020 // get size of file
4021 #ifndef WIN32
4022 InFileSize = statbuf.st_size;
4023 #else
4024 fileSize_temp.LowPart = GetFileSize((HANDLE)_get_osfhandle(hInfile), (unsigned long *)&fileSize_temp.HighPart);
4025 InFileSize = fileSize_temp.QuadPart;
4026 #endif
4027 // don't process a 0 byte file
4028 if (InFileSize == 0)
4029 {
4030 if (decompress == 1)
4031 {
4032 fprintf(stderr, "pbzip2: *ERROR: File is of size 0 [%s]! Skipping...\n", InFilename);
4033 fprintf(stderr, "-------------------------------------------\n");
4034 errLevel = 1;
4035 continue;
4036 }
4037
4038 // make sure we handle zero byte files specially
4039 zeroByteFile = 1;
4040 }
4041 else
4042 {
4043 zeroByteFile = 0;
4044 }
4045
4046 // get file meta data to write to output file
4047 if (getFileMetaData(InFilename) != 0)
4048 {
4049 fprintf(stderr, "pbzip2: *ERROR: Could not get file meta data from [%s]! Skipping...\n", InFilename);
4050 fprintf(stderr, "-------------------------------------------\n");
4051 errLevel = 1;
4052 continue;
4053 }
4054 }
4055 else
4056 {
4057 hInfile = STDIN_FILENO; // stdin
4058 InFileSize = -1; // fake it
4059 }
4060
4061 // check to see if output file exists
4062 if ((OutputStdOut == 0) && check_file_exists(outFilename.c_str()))
4063 {
4064 if (force != 1)
4065 {
4066 fprintf(stderr, "pbzip2: *ERROR: Output file [%s] already exists! Use -f to overwrite...\n", outFilename.c_str());
4067 fprintf(stderr, "-------------------------------------------\n");
4068 errLevel = 1;
4069 continue;
4070 }
4071 else
4072 {
4073 remove(outFilename.c_str());
4074 }
4075 }
4076
4077 if (readEntireFile == 1)
4078 {
4079 if (hInfile == STDIN_FILENO)
4080 {
4081 if (QuietMode != 1)
4082 fprintf(stderr, " *Warning: Ignoring -r switch since input is stdin.\n");
4083 }
4084 else
4085 {
4086 // determine block size to try and spread data equally over # CPUs
4087 blockSize = InFileSize / numCPU;
4088 }
4089 }
4090
4091 // display per file settings
4092 if (QuietMode != 1)
4093 {
4094 fprintf(stderr, " File #: %d of %d\n", fileLoop+1, FileListCount);
4095 fprintf(stderr, " Input Name: %s\n", hInfile != STDIN_FILENO ? InFilename : "<stdin>");
4096
4097 if (OutputStdOut == 0)
4098 fprintf(stderr, " Output Name: %s\n\n", outFilename.c_str());
4099 else
4100 fprintf(stderr, " Output Name: <stdout>\n\n");
4101
4102 if (decompress == 1)
4103 fprintf(stderr, " BWT Block Size: %c00k\n", BWTblockSizeChar);
4104 if (hasInFile)
4105 fprintf(stderr, " Input Size: %" PRIuMAX " bytes\n", (uintmax_t)InFileSize);
4106 }
4107
4108 if (decompress == 1)
4109 {
4110 numBlocks = 0;
4111 // Do not use threads if we only have 1 CPU or small files
4112 if ((numCPU == 1) || (InFileSize < 1000000))
4113 noThreads = 1;
4114 else
4115 noThreads = 0;
4116
4117 // Enable threads method for uncompressing from stdin
4118 if ((numCPU > 1) && !hasInFile)
4119 noThreads = 0;
4120 } // if (decompress == 1)
4121 else
4122 {
4123 if (InFileSize > 0)
4124 {
4125 // calculate the # of blocks of data
4126 numBlocks = (InFileSize + blockSize - 1) / blockSize;
4127 // Do not use threads for small files where we only have 1 block to process
4128 // or if we only have 1 CPU
4129 if ((numBlocks == 1) || (numCPU == 1))
4130 noThreads = 1;
4131 else
4132 noThreads = 0;
4133 }
4134 else
4135 {
4136 // Simulate a "big" number of buffers. Will need to resize it later
4137 numBlocks = 10000;
4138 }
4139
4140 // write special compressed data for special 0 byte input file case
4141 if (zeroByteFile == 1)
4142 {
4143 hOutfile = STDOUT_FILENO;
4144 // write to file instead of stdout
4145 if (OutputStdOut == 0)
4146 {
4147 hOutfile = safe_open_output(outFilename.c_str());
4148 // check to see if file creation was successful
4149 if (hOutfile == -1)
4150 {
4151 handle_error(EF_EXIT, 1,
4152 "pbzip2: *ERROR: Could not create output file [%s]!\n", outFilename.c_str());
4153 errLevelCurrentFile = errLevel = 1;
4154 break;
4155 }
4156 }
4157 // write data to the output file
4158 ret = do_write(hOutfile, Bz2HeaderZero, sizeof(Bz2HeaderZero));
4159 int close_ret = 0;
4160 if (OutputStdOut == 0)
4161 {
4162 close_ret = do_close(hOutfile);
4163 // write store file meta data to output file
4164 if (writeFileMetaData(outFilename.c_str()) != 0)
4165 {
4166 handle_error(EF_NOQUIT, -1,
4167 "pbzip2: *ERROR: Could not write file meta data to [%s]!\n", outFilename.c_str());
4168 }
4169 }
4170 if ( (ret != sizeof(Bz2HeaderZero)) || (close_ret == -1) )
4171 {
4172 handle_error(EF_EXIT, 1,
4173 "pbzip2: *ERROR: Could not write to file [%s]! Aborting...\n", outFilename.c_str());
4174 fprintf(stderr, "-------------------------------------------\n");
4175 errLevelCurrentFile = errLevel = 1;
4176 break;
4177 }
4178 if (QuietMode != 1)
4179 {
4180 fprintf(stderr, " Output Size: %u bytes\n", (unsigned)sizeof(Bz2HeaderZero));
4181 fprintf(stderr, "-------------------------------------------\n");
4182 }
4183 // remove input file unless requested not to by user or error occurred
4184 if ( (keep != 1) && (errLevelCurrentFile == 0) )
4185 {
4186 struct stat statbuf;
4187 // only remove input file if output file exists
4188 bool removeFlag =
4189 (OutputStdOut != 0) ||
4190 (stat(outFilename.c_str(), &statbuf) == 0);
4191
4192 if (removeFlag)
4193 {
4194 if (do_remove(InFilename) == -1)
4195 {
4196 handle_error(EF_NOQUIT, 1, "Can't remove input file [%s]!", InFilename);
4197 }
4198 }
4199 }
4200 continue;
4201 } // if (zeroByteFile == 1)
4202 } // else (decompress == 1)
4203 #ifdef PBZIP_DEBUG
4204 fprintf(stderr, "# Blocks: %d\n", numBlocks);
4205 #endif
4206 // set global variable
4207 NumBlocksEstimated = numBlocks;
4208 // Calculate maximum number of buffered blocks to use
4209 NumBufferedBlocksMax = maxMemory / blockSize;
4210 // Subtract blocks for number of extra buffers in producer and fileWriter (~ numCPU for each)
4211 if ((int)NumBufferedBlocksMax - (numCPU * 2) < 1)
4212 NumBufferedBlocksMax = 1;
4213 else
4214 NumBufferedBlocksMax = NumBufferedBlocksMax - (numCPU * 2);
4215 #ifdef PBZIP_DEBUG
4216 fprintf(stderr, "pbzip2: maxMemory: %d blockSize: %d\n", maxMemory, blockSize);
4217 fprintf(stderr, "pbzip2: NumBufferedBlocksMax: %" PRIuMAX "\n", (uintmax_t)NumBufferedBlocksMax);
4218 #endif
4219 // check to see if our max buffered blocks is less than numCPU, if yes increase maxMemory
4220 // to support numCPU requested unless -m switch given by user
4221 if (NumBufferedBlocksMax < (size_t)numCPU)
4222 {
4223 if (maxMemorySwitch == 0)
4224 {
4225 NumBufferedBlocksMax = numCPU;
4226 if (QuietMode != 1)
4227 fprintf(stderr, "*Warning* Max memory limit increased to %" PRIuMAX " MB to support %d CPUs\n", (uintmax_t)((NumBufferedBlocksMax + (numCPU * 2)) * blockSize)/1000000, numCPU);
4228 }
4229 else
4230 {
4231 if (QuietMode != 1)
4232 fprintf(stderr, "*Warning* CPU usage and performance may be suboptimal due to max memory limit.\n");
4233 }
4234 }
4235
4236 LastGoodBlock = -1;
4237 MinErrorBlock = -1;
4238
4239 // create output buffer
4240 outputBufferInit(NumBufferedBlocksMax);
4241
4242 if (decompress == 1)
4243 {
4244 // use multi-threaded code
4245 if (noThreads == 0)
4246 {
4247 // do decompression
4248 if (QuietMode != 1)
4249 fprintf(stderr, "Decompressing data...\n");
4250 for (i=0; (int)i < numCPU; i++)
4251 {
4252 ret = pthread_create(&fifo->consumers[i], &ChildThreadAttributes, consumer_decompress, fifo);
4253 if (ret != 0)
4254 {
4255 ErrorContext::getInstance()->saveError();
4256 handle_error(EF_EXIT, 1, "pbzip2: *ERROR: Not enough resources to create consumer thread #%u (code = %d) Aborting...\n", i, ret);
4257 ret = pthread_join(TerminatorThread, NULL);
4258 return 1;
4259 }
4260 }
4261
4262 ret = pthread_create(&output, &ChildThreadAttributes, fileWriter, (void*)outFilename.c_str());
4263 if (ret != 0)
4264 {
4265 handle_error(EF_EXIT, 1,
4266 "pbzip2: *ERROR: Not enough resources to create fileWriter thread (code = %d) Aborting...\n", ret);
4267 ret = pthread_join(TerminatorThread, NULL);
4268 return 1;
4269 }
4270
4271 // start reading in data for decompression
4272 ret = producer_decompress(hInfile, InFileSize, fifo);
4273 if (ret == -99)
4274 {
4275 // only 1 block detected, use single threaded code to decompress
4276 noThreads = 1;
4277
4278 switchedMtToSt = true;
4279
4280 // wait for fileWriter thread to exit
4281 if (pthread_join(output, NULL) != 0)
4282 {
4283 ErrorContext::getInstance()->saveError();
4284 handle_error(EF_EXIT, 1,
4285 "pbzip2: *ERROR: Error joining fileWriter thread (code = %d) Aborting...\n", ret);
4286 errLevelCurrentFile = errLevel = 1;
4287 ret = pthread_join(TerminatorThread, NULL);
4288 return 1;
4289 }
4290 }
4291 else if (ret != 0)
4292 {
4293 errLevelCurrentFile = errLevel = 1;
4294 }
4295 }
4296
4297 // use single threaded code
4298 if ((noThreads == 1) && (errLevelCurrentFile == 0))
4299 {
4300 if (QuietMode != 1)
4301 fprintf(stderr, "Decompressing data (no threads)...\n");
4302
4303 if (hInfile > 0)
4304 close(hInfile);
4305 ret = directdecompress(InFilename, outFilename.c_str());
4306 if (ret != 0)
4307 {
4308 errLevelCurrentFile = errLevel = 1;
4309 }
4310 }
4311 } // if (decompress == 1)
4312 else
4313 {
4314 // do compression code
4315
4316 // use multi-threaded code
4317 if (noThreads == 0)
4318 {
4319 if (QuietMode != 1)
4320 fprintf(stderr, "Compressing data...\n");
4321
4322 for (i=0; (int)i < numCPU; i++)
4323 {
4324 ret = pthread_create(&fifo->consumers[i], &ChildThreadAttributes, consumer, fifo);
4325 if (ret != 0)
4326 {
4327 ErrorContext::getInstance()->saveError();
4328 handle_error(EF_EXIT, 1,
4329 "pbzip2: *ERROR: Not enough resources to create consumer thread #%u (code = %d) Aborting...\n", i, ret);
4330 pthread_join(TerminatorThread, NULL);
4331 return 1;
4332 }
4333 }
4334
4335 ret = pthread_create(&output, &ChildThreadAttributes, fileWriter, (void*)outFilename.c_str());
4336 if (ret != 0)
4337 {
4338 handle_error(EF_EXIT, 1,
4339 "pbzip2: *ERROR: Not enough resources to create fileWriter thread (code = %d) Aborting...\n", ret);
4340 pthread_join(TerminatorThread, NULL);
4341 return 1;
4342 }
4343
4344 // start reading in data for compression
4345 ret = producer(hInfile, blockSize, fifo);
4346 if (ret != 0)
4347 errLevelCurrentFile = errLevel = 1;
4348 }
4349 else
4350 {
4351 // do not use threads for compression
4352 if (QuietMode != 1)
4353 fprintf(stderr, "Compressing data (no threads)...\n");
4354
4355 ret = directcompress(hInfile, InFileSize, blockSize, outFilename.c_str());
4356 if (ret != 0)
4357 errLevelCurrentFile = errLevel = 1;
4358 }
4359 } // else
4360
4361 if (noThreads == 0)
4362 {
4363 // wait for fileWriter thread to exit
4364 ret = pthread_join(output, NULL);
4365 if (ret != 0)
4366 {
4367 ErrorContext::printErrnoMsg(stderr, errno);
4368 errLevelCurrentFile = errLevel = 1;
4369 }
4370 }
4371
4372 if ((noThreads == 0) || switchedMtToSt )
4373 {
4374 // wait for consumer threads to exit
4375 for (i = 0; (int)i < numCPU; i++)
4376 {
4377 ret = pthread_join(fifo->consumers[i], NULL);
4378 if (ret != 0)
4379 {
4380 ErrorContext::printErrnoMsg(stderr, errno);
4381 errLevelCurrentFile = errLevel = 1;
4382 }
4383 }
4384 }
4385
4386 if (syncGetTerminateFlag() != 0)
4387 {
4388 errLevelCurrentFile = errLevel = 1;
4389 }
4390
4391 if (OutputStdOut == 0)
4392 {
4393 // write store file meta data to output file
4394 if (writeFileMetaData(outFilename.c_str()) != 0)
4395 {
4396 handle_error(EF_NOQUIT, -1,
4397 "pbzip2: *ERROR: Could not write file meta data to [%s]!\n", outFilename.c_str());
4398 }
4399 }
4400
4401 // remove input file unless requested not to by user or error occurred
4402 if ( (keep != 1) && (errLevelCurrentFile == 0) )
4403 {
4404 struct stat statbuf;
4405 // only remove input file if output file exists
4406 bool removeFlag =
4407 (OutputStdOut != 0) ||
4408 (stat(outFilename.c_str(), &statbuf) == 0);
4409
4410 if (removeFlag)
4411 {
4412 if (do_remove(InFilename) == -1)
4413 {
4414 handle_error(EF_NOQUIT, 1, "Can't remove input file [%s]!", InFilename);
4415 }
4416 }
4417 }
4418
4419 // reclaim memory
4420 OutputBuffer.clear();
4421 fifo->clear();
4422
4423 if ( (errLevelCurrentFile == 0) && (syncGetTerminateFlag() == 0) )
4424 {
4425 // finished processing file (mutex since accessed by cleanup procedure)
4426 safe_mutex_lock(&ErrorHandlerMutex);
4427 sigInFilename = NULL;
4428 sigOutFilename = NULL;
4429 safe_mutex_unlock(&ErrorHandlerMutex);
4430 }
4431
4432 if (errLevelCurrentFile == 1)
4433 {
4434 syncSetTerminateFlag(1);
4435 break;
4436 }
4437
4438 if (QuietMode != 1)
4439 fprintf(stderr, "-------------------------------------------\n");
4440 } /* for */
4441
4442 // Explicit close on stdout if we've been writing there, after all input has been processed
4443 if (OutputStdOut == 1)
4444 {
4445 ret = close(STDOUT_FILENO);
4446 if (ret == -1)
4447 {
4448 ErrorContext::getInstance()->saveError();
4449 handle_error(EF_EXIT, 1, "pbzip2: *ERROR: Failed to close STDOUT! Aborting...\n");
4450 exit(1);
4451 }
4452 }
4453
4454 // Terminate signal handler thread sending SIGQUIT signal
4455 ret = pthread_kill(SignalHandlerThread, SIG_HANDLER_QUIT_SIGNAL);
4456 if (ret != 0)
4457 {
4458 fprintf(stderr, "Couldn't signal signal QUIT to SignalHandlerThread [%d]. Quitting prematurely!\n", ret);
4459 exit(errLevel);
4460 }
4461 else
4462 {
4463 ret = pthread_join(SignalHandlerThread, NULL);
4464 if (ret != 0)
4465 {
4466 fprintf(stderr, "Error on join of SignalHandlerThread [%d]\n", ret);
4467 }
4468 }
4469
4470 if (syncGetTerminateFlag() == 0)
4471 {
4472 syncSetFinishedFlag(1);
4473 }
4474
4475 ret = pthread_join(TerminatorThread, NULL);
4476 if (ret != 0)
4477 {
4478 fprintf(stderr, "Error on join of TerminatorThread [%d]\n", ret);
4479 }
4480
4481 // reclaim memory
4482 queueDelete(fifo);
4483 mutexesDelete();
4484 disposeMemory(FileList);
4485
4486 // get current time for end of benchmark
4487 #ifndef WIN32
4488 gettimeofday(&tvStopTime, &tz);
4489 #else
4490 GetSystemTime(&systemtime);
4491 SystemTimeToFileTime(&systemtime, (FILETIME *)&filetime);
4492 tvStopTime.tv_sec = filetime.QuadPart / 10000000;
4493 tvStopTime.tv_usec = (filetime.QuadPart - (LONGLONG)tvStopTime.tv_sec * 10000000) / 10;
4494 #endif
4495
4496 #ifdef PBZIP_DEBUG
4497 fprintf(stderr, "\n Start Time: %ld + %ld\n", tvStartTime.tv_sec, tvStartTime.tv_usec);
4498 fprintf(stderr, " Stop Time : %ld + %ld\n", tvStopTime.tv_sec, tvStopTime.tv_usec);
4499 #endif
4500
4501 // convert time structure to real numbers
4502 timeStart = (double)tvStartTime.tv_sec + ((double)tvStartTime.tv_usec / 1000000);
4503 timeStop = (double)tvStopTime.tv_sec + ((double)tvStopTime.tv_usec / 1000000);
4504 timeCalc = timeStop - timeStart;
4505 if (QuietMode != 1)
4506 fprintf(stderr, "\n Wall Clock: %f seconds\n", timeCalc);
4507
4508 exit(errLevel);
4509 }
4510