1 /*
2  * Copyright (c) 2017-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under both the BSD-style license (found in the
6  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7  * in the COPYING file in the root directory of this source tree).
8  */
9 
10 #include <stdio.h>      /* fprintf */
11 #include <stdlib.h>     /* malloc, free */
12 #include <pthread.h>    /* pthread functions */
13 #include <string.h>     /* memset */
14 #include "zstd_internal.h"
15 #include "util.h"
16 #include "timefn.h"     /* UTIL_time_t, UTIL_getTime, UTIL_getSpanTimeMicro */
17 
/* Message helpers: DISPLAY writes to stderr, PRINT writes to stdout. */
#define DISPLAY(...) fprintf(stderr, __VA_ARGS__)
#define PRINT(...) fprintf(stdout, __VA_ARGS__)
/*
 * Emit a message through DISPLAY only when g_displayLevel is at least `l`.
 * Wrapped in do { } while (0) so that `DEBUG(...);` is exactly one statement
 * and stays correct inside an unbraced if/else; `l` is parenthesized so that
 * an expression argument is compared as a whole.
 */
#define DEBUG(l, ...) do { if (g_displayLevel >= (l)) { DISPLAY(__VA_ARGS__); } } while (0)
/*
 * Size of one input chunk (4 MB). The parentheses matter: `<<` binds more
 * loosely than `+`, `-`, `*` and `/`, so an unparenthesized `4 << 20` silently
 * changes value when the macro is used inside a larger expression.
 */
#define FILE_CHUNK_SIZE (4 << 20)
#define MAX_NUM_JOBS 2
#define stdinmark  "/*stdin*\\"
#define stdoutmark "/*stdout*\\"
#define MAX_PATH 256
#define DEFAULT_DISPLAY_LEVEL 1
#define DEFAULT_COMPRESSION_LEVEL 6
#define MAX_COMPRESSION_LEVEL_CHANGE 2
/* number of unchanged-level adaptations after which the next change is treated as a spike */
#define CONVERGENCE_LOWER_BOUND 5
/* number of adaptations during which the level may not be decreased again */
#define CLEVEL_DECREASE_COOLDOWN 5
/* completion below this value => change the level by 2 */
#define CHANGE_BY_TWO_THRESHOLD 0.1
/* completion below this value (but not below the one above) => change the level by 1 */
#define CHANGE_BY_ONE_THRESHOLD 0.65
33 
34 #ifndef DEBUG_MODE
35 static int g_displayLevel = DEFAULT_DISPLAY_LEVEL;
36 #else
37 static int g_displayLevel = DEBUG_MODE;
38 #endif
39 
40 static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL;
41 static UTIL_time_t g_startTime;
42 static size_t g_streamedSize = 0;
43 static unsigned g_useProgressBar = 1;
44 static unsigned g_forceCompressionLevel = 0;
45 static unsigned g_minCLevel = 1;
46 static unsigned g_maxCLevel;
47 
/* A block of memory described by its address, its used length and its allocated length. */
typedef struct {
    void*  start;      /* first byte of the block */
    size_t size;       /* bytes currently in use */
    size_t capacity;   /* capacity recorded for the block */
} buffer_t;
53 
54 typedef struct {
55     size_t filled;
56     buffer_t buffer;
57 } inBuff_t;
58 
59 typedef struct {
60     buffer_t src;
61     buffer_t dst;
62     unsigned jobID;
63     unsigned lastJobPlusOne;
64     size_t compressedSize;
65     size_t dictSize;
66 } jobDescription;
67 
/* pthread mutex paired with a flag remembering whether it was initialized successfully. */
typedef struct {
    pthread_mutex_t pMutex;    /* the underlying mutex */
    int             noError;   /* non-zero once pthread_mutex_init() has succeeded */
} mutex_t;
72 
/* pthread condition variable paired with a flag remembering whether it was initialized successfully. */
typedef struct {
    pthread_cond_t pCond;     /* the underlying condition variable */
    int            noError;   /* non-zero once pthread_cond_init() has succeeded */
} cond_t;
77 
78 typedef struct {
79     unsigned compressionLevel;
80     unsigned numJobs;
81     unsigned nextJobID;
82     unsigned threadError;
83 
84     /*
85      * JobIDs for the next jobs to be created, compressed, and written
86      */
87     unsigned jobReadyID;
88     unsigned jobCompressedID;
89     unsigned jobWriteID;
90     unsigned allJobsCompleted;
91 
92     /*
93      * counter for how many jobs in a row the compression level has not changed
94      * if the counter becomes >= CONVERGENCE_LOWER_BOUND, the next time the
95      * compression level tries to change (by non-zero amount) resets the counter
96      * to 1 and does not apply the change
97      */
98     unsigned convergenceCounter;
99 
100     /*
101      * cooldown counter in order to prevent rapid successive decreases in compression level
102      * whenever compression level is decreased, cooldown is set to CLEVEL_DECREASE_COOLDOWN
103      * whenever adaptCompressionLevel() is called and cooldown != 0, it is decremented
104      * as long as cooldown != 0, the compression level cannot be decreased
105      */
106     unsigned cooldown;
107 
108     /*
109      * XWaitYCompletion
110      * Range from 0.0 to 1.0
111      * if the value is not 1.0, then this implies that thread X waited on thread Y to finish
112      * and thread Y was XWaitYCompletion finished at the time of the wait (i.e. compressWaitWriteCompletion=0.5
113      * implies that the compression thread waited on the write thread and it was only 50% finished writing a job)
114      */
115     double createWaitCompressionCompletion;
116     double compressWaitCreateCompletion;
117     double compressWaitWriteCompletion;
118     double writeWaitCompressionCompletion;
119 
120     /*
121      * Completion values
122      * Range from 0.0 to 1.0
123      * Jobs are divided into mini-chunks in order to measure completion
124      * these values are updated each time a thread finishes its operation on the
125      * mini-chunk (i.e. finishes writing out, compressing, etc. this mini-chunk).
126      */
127     double compressionCompletion;
128     double writeCompletion;
129     double createCompletion;
130 
131     mutex_t jobCompressed_mutex;
132     cond_t jobCompressed_cond;
133     mutex_t jobReady_mutex;
134     cond_t jobReady_cond;
135     mutex_t allJobsCompleted_mutex;
136     cond_t allJobsCompleted_cond;
137     mutex_t jobWrite_mutex;
138     cond_t jobWrite_cond;
139     mutex_t compressionCompletion_mutex;
140     mutex_t createCompletion_mutex;
141     mutex_t writeCompletion_mutex;
142     mutex_t compressionLevel_mutex;
143     size_t lastDictSize;
144     inBuff_t input;
145     jobDescription* jobs;
146     ZSTD_CCtx* cctx;
147 } adaptCCtx;
148 
149 typedef struct {
150     adaptCCtx* ctx;
151     FILE* dstFile;
152 } outputThreadArg;
153 
154 typedef struct {
155     FILE* srcFile;
156     adaptCCtx* ctx;
157     outputThreadArg* otArg;
158 } fcResources;
159 
/*
 * Frees the source and destination buffers of every job slot in ctx->jobs.
 * The jobs array itself is not freed here.
 */
static void freeCompressionJobs(adaptCCtx* ctx)
{
    unsigned slot = 0;
    while (slot < ctx->numJobs) {
        jobDescription* const entry = &ctx->jobs[slot];
        free(entry->dst.start);
        free(entry->src.start);
        slot++;
    }
}
169 
/*
 * Destroys the wrapped mutex if it was initialized successfully.
 * @return the pthread_mutex_destroy() result, or 0 when there was nothing to destroy.
 */
static int destroyMutex(mutex_t* mutex)
{
    /* never initialized (or initialization failed): nothing to tear down */
    if (!mutex->noError) return 0;
    return pthread_mutex_destroy(&mutex->pMutex);
}
178 
/*
 * Destroys the wrapped condition variable if it was initialized successfully.
 * @return the pthread_cond_destroy() result, or 0 when there was nothing to destroy.
 */
static int destroyCond(cond_t* cond)
{
    /* never initialized (or initialization failed): nothing to tear down */
    if (!cond->noError) return 0;
    return pthread_cond_destroy(&cond->pCond);
}
187 
/*
 * Releases everything owned by ctx and then ctx itself: the mutexes and
 * condition variables, the zstd context, the input buffer and the job slots.
 * A NULL ctx is accepted and is a no-op.
 * @return 0 on success, otherwise the bitwise OR of all failure codes encountered.
 */
static int freeCCtx(adaptCCtx* ctx)
{
    int status = 0;
    if (ctx == NULL) return 0;

    {   /* the synchronization objects are independent, so tear them down by kind */
        mutex_t* const mutexes[] = {
            &ctx->jobCompressed_mutex,
            &ctx->jobReady_mutex,
            &ctx->allJobsCompleted_mutex,
            &ctx->jobWrite_mutex,
            &ctx->compressionCompletion_mutex,
            &ctx->createCompletion_mutex,
            &ctx->writeCompletion_mutex,
            &ctx->compressionLevel_mutex
        };
        cond_t* const conds[] = {
            &ctx->jobCompressed_cond,
            &ctx->jobReady_cond,
            &ctx->allJobsCompleted_cond,
            &ctx->jobWrite_cond
        };
        size_t n;
        for (n = 0; n < sizeof(mutexes) / sizeof(mutexes[0]); n++) {
            status |= destroyMutex(mutexes[n]);
        }
        for (n = 0; n < sizeof(conds) / sizeof(conds[0]); n++) {
            status |= destroyCond(conds[n]);
        }
    }

    status |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx));
    free(ctx->input.buffer.start);
    if (ctx->jobs != NULL) {
        /* the per-job buffers must go before the array that holds them */
        freeCompressionJobs(ctx);
        free(ctx->jobs);
    }
    free(ctx);
    return status;
}
215 
/*
 * Initializes the wrapped mutex with default attributes and records in
 * `noError` whether that succeeded.
 * @return the pthread_mutex_init() result (0 on success).
 */
static int initMutex(mutex_t* mutex)
{
    int const status = pthread_mutex_init(&mutex->pMutex, NULL);
    mutex->noError = (status == 0);
    return status;
}
222 
/*
 * Initializes the wrapped condition variable with default attributes and
 * records in `noError` whether that succeeded.
 * @return the pthread_cond_init() result (0 on success).
 */
static int initCond(cond_t* cond)
{
    int const status = pthread_cond_init(&cond->pCond, NULL);
    cond->noError = (status == 0);
    return status;
}
229 
/*
 * Initializes a zero-filled context: synchronization objects, counters,
 * the job slots with their buffers, the zstd context and the input buffer.
 *
 * @param ctx      context to initialize; expected to be zero-filled (see createCCtx())
 * @param numJobs  number of job slots to allocate
 * @return 0 on success, non-zero on failure. On failure the context is left
 *         partially initialized; freeCCtx() releases whatever was set up.
 */
static int initCCtx(adaptCCtx* ctx, unsigned numJobs)
{
    ctx->compressionLevel = g_compressionLevel;
    {
        int pthreadError = 0;
        pthreadError |= initMutex(&ctx->jobCompressed_mutex);
        pthreadError |= initCond(&ctx->jobCompressed_cond);
        pthreadError |= initMutex(&ctx->jobReady_mutex);
        pthreadError |= initCond(&ctx->jobReady_cond);
        pthreadError |= initMutex(&ctx->allJobsCompleted_mutex);
        pthreadError |= initCond(&ctx->allJobsCompleted_cond);
        pthreadError |= initMutex(&ctx->jobWrite_mutex);
        pthreadError |= initCond(&ctx->jobWrite_cond);
        pthreadError |= initMutex(&ctx->compressionCompletion_mutex);
        pthreadError |= initMutex(&ctx->createCompletion_mutex);
        pthreadError |= initMutex(&ctx->writeCompletion_mutex);
        pthreadError |= initMutex(&ctx->compressionLevel_mutex);
        if (pthreadError) return pthreadError;
    }
    ctx->numJobs = numJobs;
    ctx->jobReadyID = 0;
    ctx->jobCompressedID = 0;
    ctx->jobWriteID = 0;
    ctx->lastDictSize = 0;

    /* 1 means "did not have to wait" / "nothing pending" for every measurement */
    ctx->createWaitCompressionCompletion = 1;
    ctx->compressWaitCreateCompletion = 1;
    ctx->compressWaitWriteCompletion = 1;
    ctx->writeWaitCompressionCompletion = 1;
    ctx->createCompletion = 1;
    ctx->writeCompletion = 1;
    ctx->compressionCompletion = 1;
    ctx->convergenceCounter = 0;
    ctx->cooldown = 0;

    /* calloc(count, size) checks the count*size product for overflow itself */
    ctx->jobs = calloc(numJobs, sizeof(*ctx->jobs));

    if (!ctx->jobs) {
        DISPLAY("Error: could not allocate space for jobs during context creation\n");
        return 1;
    }

    /* initializing jobs */
    {
        unsigned jobNum;
        for (jobNum=0; jobNum<numJobs; jobNum++) {
            jobDescription* job = &ctx->jobs[jobNum];
            /* source buffers hold a dictionary prefix plus one chunk of data */
            job->src.start = malloc(2 * FILE_CHUNK_SIZE);
            job->dst.start = malloc(ZSTD_compressBound(FILE_CHUNK_SIZE));
            job->lastJobPlusOne = 0;
            if (!job->src.start || !job->dst.start) {
                DISPLAY("Could not allocate buffers for jobs\n");
                return 1;
            }
            /* NOTE(review): recorded capacity is one chunk although two were allocated -- confirm intent */
            job->src.capacity = FILE_CHUNK_SIZE;
            job->dst.capacity = ZSTD_compressBound(FILE_CHUNK_SIZE);
        }
    }

    ctx->nextJobID = 0;
    ctx->threadError = 0;
    ctx->allJobsCompleted = 0;

    ctx->cctx = ZSTD_createCCtx();
    if (!ctx->cctx) {
        DISPLAY("Error: could not allocate ZSTD_CCtx\n");
        return 1;
    }

    ctx->input.filled = 0;
    ctx->input.buffer.capacity = 2 * FILE_CHUNK_SIZE;

    ctx->input.buffer.start = malloc(ctx->input.buffer.capacity);
    if (!ctx->input.buffer.start) {
        DISPLAY("Error: could not allocate input buffer\n");
        return 1;
    }
    return 0;
}
310 
createCCtx(unsigned numJobs)311 static adaptCCtx* createCCtx(unsigned numJobs)
312 {
313 
314     adaptCCtx* const ctx = calloc(1, sizeof(adaptCCtx));
315     if (ctx == NULL) {
316         DISPLAY("Error: could not allocate space for context\n");
317         return NULL;
318     }
319     {
320         int const error = initCCtx(ctx, numJobs);
321         if (error) {
322             freeCCtx(ctx);
323             return NULL;
324         }
325         return ctx;
326     }
327 }
328 
/*
 * Flags an error in the shared context and wakes every thread that may be
 * blocked on one of the pipeline's condition variables, so that their wait
 * loops (which all test ctx->threadError) can terminate.
 *
 * Each condition variable is signalled while holding its own mutex, and that
 * same mutex is released afterwards.
 */
static void signalErrorToThreads(adaptCCtx* ctx)
{
    ctx->threadError = 1;
    pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
    pthread_cond_signal(&ctx->jobReady_cond.pCond);
    pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);

    pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
    pthread_cond_broadcast(&ctx->jobCompressed_cond.pCond);
    /* must release the mutex taken just above: unlocking jobReady_mutex here
     * would unlock a mutex this thread does not hold and would leave
     * jobCompressed_mutex locked forever */
    pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);

    pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
    pthread_cond_signal(&ctx->jobWrite_cond.pCond);
    pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);

    pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
    pthread_cond_signal(&ctx->allJobsCompleted_cond.pCond);
    pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
}
348 
/*
 * Blocks the caller until the output thread has marked all jobs as completed
 * or an error has been flagged in the context. A NULL ctx returns immediately.
 */
static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
{
    if (ctx == NULL) return;

    pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
    for ( ; ; ) {
        if (ctx->allJobsCompleted != 0 || ctx->threadError) break;
        pthread_cond_wait(&ctx->allJobsCompleted_cond.pCond, &ctx->allJobsCompleted_mutex.pMutex);
    }
    pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
}
358 
359 /* map completion percentages to values for changing compression level */
convertCompletionToChange(double completion)360 static unsigned convertCompletionToChange(double completion)
361 {
362     if (completion < CHANGE_BY_TWO_THRESHOLD) {
363         return 2;
364     }
365     else if (completion < CHANGE_BY_ONE_THRESHOLD) {
366         return 1;
367     }
368     else {
369         return 0;
370     }
371 }
372 
373 /*
374  * Compression level is changed depending on which part of the compression process is lagging
375  * Currently, three theads exist for job creation, compression, and file writing respectively.
376  * adaptCompressionLevel() increments or decrements compression level based on which of the threads is lagging
377  * job creation or file writing lag => increased compression level
378  * compression thread lag           => decreased compression level
379  * detecting which thread is lagging is done by keeping track of how many calls each thread makes to pthread_cond_wait
380  */
adaptCompressionLevel(adaptCCtx * ctx)381 static void adaptCompressionLevel(adaptCCtx* ctx)
382 {
383     double createWaitCompressionCompletion;
384     double compressWaitCreateCompletion;
385     double compressWaitWriteCompletion;
386     double writeWaitCompressionCompletion;
387     double const threshold = 0.00001;
388     unsigned prevCompressionLevel;
389 
390     pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
391     prevCompressionLevel = ctx->compressionLevel;
392     pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
393 
394 
395     if (g_forceCompressionLevel) {
396         pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
397         ctx->compressionLevel = g_compressionLevel;
398         pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
399         return;
400     }
401 
402 
403     DEBUG(2, "adapting compression level %u\n", prevCompressionLevel);
404 
405     /* read and reset completion measurements */
406     pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
407     DEBUG(2, "createWaitCompressionCompletion %f\n", ctx->createWaitCompressionCompletion);
408     DEBUG(2, "writeWaitCompressionCompletion %f\n", ctx->writeWaitCompressionCompletion);
409     createWaitCompressionCompletion = ctx->createWaitCompressionCompletion;
410     writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion;
411     pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
412 
413     pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
414     DEBUG(2, "compressWaitWriteCompletion %f\n", ctx->compressWaitWriteCompletion);
415     compressWaitWriteCompletion = ctx->compressWaitWriteCompletion;
416     pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
417 
418     pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
419     DEBUG(2, "compressWaitCreateCompletion %f\n", ctx->compressWaitCreateCompletion);
420     compressWaitCreateCompletion = ctx->compressWaitCreateCompletion;
421     pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
422     DEBUG(2, "convergence counter: %u\n", ctx->convergenceCounter);
423 
424     assert(g_minCLevel <= prevCompressionLevel && g_maxCLevel >= prevCompressionLevel);
425 
426     /* adaptation logic */
427     if (ctx->cooldown) ctx->cooldown--;
428 
429     if ((1-createWaitCompressionCompletion > threshold || 1-writeWaitCompressionCompletion > threshold) && ctx->cooldown == 0) {
430         /* create or write waiting on compression */
431         /* use whichever one waited less because it was slower */
432         double const completion = MAX(createWaitCompressionCompletion, writeWaitCompressionCompletion);
433         unsigned const change = convertCompletionToChange(completion);
434         unsigned const boundChange = MIN(change, prevCompressionLevel - g_minCLevel);
435         if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) {
436             /* reset convergence counter, might have been a spike */
437             ctx->convergenceCounter = 0;
438             DEBUG(2, "convergence counter reset, no change applied\n");
439         }
440         else if (boundChange != 0) {
441             pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
442             ctx->compressionLevel -= boundChange;
443             pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
444             ctx->cooldown = CLEVEL_DECREASE_COOLDOWN;
445             ctx->convergenceCounter = 1;
446 
447             DEBUG(2, "create or write threads waiting on compression, tried to decrease compression level by %u\n\n", boundChange);
448         }
449     }
450     else if (1-compressWaitWriteCompletion > threshold || 1-compressWaitCreateCompletion > threshold) {
451         /* compress waiting on write */
452         double const completion = MIN(compressWaitWriteCompletion, compressWaitCreateCompletion);
453         unsigned const change = convertCompletionToChange(completion);
454         unsigned const boundChange = MIN(change, g_maxCLevel - prevCompressionLevel);
455         if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) {
456             /* reset convergence counter, might have been a spike */
457             ctx->convergenceCounter = 0;
458             DEBUG(2, "convergence counter reset, no change applied\n");
459         }
460         else if (boundChange != 0) {
461             pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
462             ctx->compressionLevel += boundChange;
463             pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
464             ctx->cooldown = 0;
465             ctx->convergenceCounter = 1;
466 
467             DEBUG(2, "compress waiting on write or create, tried to increase compression level by %u\n\n", boundChange);
468         }
469 
470     }
471 
472     pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
473     if (ctx->compressionLevel == prevCompressionLevel) {
474         ctx->convergenceCounter++;
475     }
476     pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
477 }
478 
getUseableDictSize(unsigned compressionLevel)479 static size_t getUseableDictSize(unsigned compressionLevel)
480 {
481     ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0);
482     unsigned const overlapLog = compressionLevel >= (unsigned)ZSTD_maxCLevel() ? 0 : 3;
483     size_t const overlapSize = 1 << (params.cParams.windowLog - overlapLog);
484     return overlapSize;
485 }
486 
compressionThread(void * arg)487 static void* compressionThread(void* arg)
488 {
489     adaptCCtx* const ctx = (adaptCCtx*)arg;
490     unsigned currJob = 0;
491     for ( ; ; ) {
492         unsigned const currJobIndex = currJob % ctx->numJobs;
493         jobDescription* const job = &ctx->jobs[currJobIndex];
494         DEBUG(2, "starting compression for job %u\n", currJob);
495 
496         {
497             /* check if compression thread will have to wait */
498             unsigned willWaitForCreate = 0;
499             unsigned willWaitForWrite = 0;
500 
501             pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
502             if (currJob + 1 > ctx->jobReadyID) willWaitForCreate = 1;
503             pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
504 
505             pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
506             if (currJob - ctx->jobWriteID >= ctx->numJobs) willWaitForWrite = 1;
507             pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
508 
509 
510             pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
511             if (willWaitForCreate) {
512                 DEBUG(2, "compression will wait for create on job %u\n", currJob);
513                 ctx->compressWaitCreateCompletion = ctx->createCompletion;
514                 DEBUG(2, "create completion %f\n", ctx->compressWaitCreateCompletion);
515 
516             }
517             else {
518                 ctx->compressWaitCreateCompletion = 1;
519             }
520             pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
521 
522             pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
523             if (willWaitForWrite) {
524                 DEBUG(2, "compression will wait for write on job %u\n", currJob);
525                 ctx->compressWaitWriteCompletion = ctx->writeCompletion;
526                 DEBUG(2, "write completion %f\n", ctx->compressWaitWriteCompletion);
527             }
528             else {
529                 ctx->compressWaitWriteCompletion = 1;
530             }
531             pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
532 
533         }
534 
535         /* wait until job is ready */
536         pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
537         while (currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
538             pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
539         }
540         pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
541 
542         /* wait until job previously in this space is written */
543         pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
544         while (currJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
545             pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
546         }
547         pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
548         /* reset compression completion */
549         pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
550         ctx->compressionCompletion = 0;
551         pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
552 
553         /* adapt compression level */
554         if (currJob) adaptCompressionLevel(ctx);
555 
556         pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
557         DEBUG(2, "job %u compressed with level %u\n", currJob, ctx->compressionLevel);
558         pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
559 
560         /* compress the data */
561         {
562             size_t const compressionBlockSize = ZSTD_BLOCKSIZE_MAX; /* 128 KB */
563             unsigned cLevel;
564             unsigned blockNum = 0;
565             size_t remaining = job->src.size;
566             size_t srcPos = 0;
567             size_t dstPos = 0;
568 
569             pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
570             cLevel = ctx->compressionLevel;
571             pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
572 
573             /* reset compressed size */
574             job->compressedSize = 0;
575             DEBUG(2, "calling ZSTD_compressBegin()\n");
576             /* begin compression */
577             {
578                 size_t const useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize);
579                 ZSTD_parameters params = ZSTD_getParams(cLevel, 0, useDictSize);
580                 params.cParams.windowLog = 23;
581                 {
582                     size_t const initError = ZSTD_compressBegin_advanced(ctx->cctx, job->src.start + job->dictSize - useDictSize, useDictSize, params, 0);
583                     size_t const windowSizeError = ZSTD_CCtx_setParameter(ctx->cctx, ZSTD_c_forceMaxWindow, 1);
584                     if (ZSTD_isError(initError) || ZSTD_isError(windowSizeError)) {
585                         DISPLAY("Error: something went wrong while starting compression\n");
586                         signalErrorToThreads(ctx);
587                         return arg;
588                     }
589                 }
590             }
591             DEBUG(2, "finished with ZSTD_compressBegin()\n");
592 
593             do {
594                 size_t const actualBlockSize = MIN(remaining, compressionBlockSize);
595 
596                 /* continue compression */
597                 if (currJob != 0 || blockNum != 0) { /* not first block of first job flush/overwrite the frame header */
598                     size_t const hSize = ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, 0);
599                     if (ZSTD_isError(hSize)) {
600                         DISPLAY("Error: something went wrong while continuing compression\n");
601                         job->compressedSize = hSize;
602                         signalErrorToThreads(ctx);
603                         return arg;
604                     }
605                     ZSTD_invalidateRepCodes(ctx->cctx);
606                 }
607                 {
608                     size_t const ret = (job->lastJobPlusOne == currJob + 1 && remaining == actualBlockSize) ?
609                                             ZSTD_compressEnd     (ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize) :
610                                             ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize);
611                     if (ZSTD_isError(ret)) {
612                         DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(ret));
613                         signalErrorToThreads(ctx);
614                         return arg;
615                     }
616                     job->compressedSize += ret;
617                     remaining -= actualBlockSize;
618                     srcPos += actualBlockSize;
619                     dstPos += ret;
620                     blockNum++;
621 
622                     /* update completion */
623                     pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
624                     ctx->compressionCompletion = 1 - (double)remaining/job->src.size;
625                     pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
626                 }
627             } while (remaining != 0);
628             job->dst.size = job->compressedSize;
629         }
630         pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
631         ctx->jobCompressedID++;
632         pthread_cond_broadcast(&ctx->jobCompressed_cond.pCond);
633         pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
634         if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) {
635             /* finished compressing all jobs */
636             break;
637         }
638         DEBUG(2, "finished compressing job %u\n", currJob);
639         currJob++;
640     }
641     return arg;
642 }
643 
displayProgress(unsigned cLevel,unsigned last)644 static void displayProgress(unsigned cLevel, unsigned last)
645 {
646     UTIL_time_t currTime = UTIL_getTime();
647     if (!g_useProgressBar) return;
648     {   double const timeElapsed = (double)(UTIL_getSpanTimeMicro(g_startTime, currTime) / 1000.0);
649         double const sizeMB = (double)g_streamedSize / (1 << 20);
650         double const avgCompRate = sizeMB * 1000 / timeElapsed;
651         fprintf(stderr, "\r| Comp. Level: %2u | Time Elapsed: %7.2f s | Data Size: %7.1f MB | Avg Comp. Rate: %6.2f MB/s |", cLevel, timeElapsed/1000.0, sizeMB, avgCompRate);
652         if (last) {
653             fprintf(stderr, "\n");
654         } else {
655             fflush(stderr);
656     }   }
657 }
658 
outputThread(void * arg)659 static void* outputThread(void* arg)
660 {
661     outputThreadArg* const otArg = (outputThreadArg*)arg;
662     adaptCCtx* const ctx = otArg->ctx;
663     FILE* const dstFile = otArg->dstFile;
664 
665     unsigned currJob = 0;
666     for ( ; ; ) {
667         unsigned const currJobIndex = currJob % ctx->numJobs;
668         jobDescription* const job = &ctx->jobs[currJobIndex];
669         unsigned willWaitForCompress = 0;
670         DEBUG(2, "starting write for job %u\n", currJob);
671 
672         pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
673         if (currJob + 1 > ctx->jobCompressedID) willWaitForCompress = 1;
674         pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
675 
676 
677         pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
678         if (willWaitForCompress) {
679             /* write thread is waiting on compression thread */
680             ctx->writeWaitCompressionCompletion = ctx->compressionCompletion;
681             DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion);
682         }
683         else {
684             ctx->writeWaitCompressionCompletion = 1;
685         }
686         pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
687 
688         pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
689         while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
690             pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
691         }
692         pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
693 
694         /* reset write completion */
695         pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
696         ctx->writeCompletion = 0;
697         pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
698 
699         {
700             size_t const compressedSize = job->compressedSize;
701             size_t remaining = compressedSize;
702             if (ZSTD_isError(compressedSize)) {
703                 DISPLAY("Error: an error occurred during compression\n");
704                 signalErrorToThreads(ctx);
705                 return arg;
706             }
707             {
708                 size_t const blockSize = MAX(compressedSize >> 7, 1 << 10);
709                 size_t pos = 0;
710                 for ( ; ; ) {
711                     size_t const writeSize = MIN(remaining, blockSize);
712                     size_t const ret = fwrite(job->dst.start + pos, 1, writeSize, dstFile);
713                     if (ret != writeSize) break;
714                     pos += ret;
715                     remaining -= ret;
716 
717                     /* update completion variable for writing */
718                     pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
719                     ctx->writeCompletion = 1 - (double)remaining/compressedSize;
720                     pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
721 
722                     if (remaining == 0) break;
723                 }
724                 if (pos != compressedSize) {
725                     DISPLAY("Error: an error occurred during file write operation\n");
726                     signalErrorToThreads(ctx);
727                     return arg;
728                 }
729             }
730         }
731         {
732             unsigned cLevel;
733             pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
734             cLevel = ctx->compressionLevel;
735             pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
736             displayProgress(cLevel, job->lastJobPlusOne == currJob + 1);
737         }
738         pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
739         ctx->jobWriteID++;
740         pthread_cond_signal(&ctx->jobWrite_cond.pCond);
741         pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
742 
743         if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) {
744             /* finished with all jobs */
745             pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
746             ctx->allJobsCompleted = 1;
747             pthread_cond_signal(&ctx->allJobsCompleted_cond.pCond);
748             pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
749             break;
750         }
751         DEBUG(2, "finished writing job %u\n", currJob);
752         currJob++;
753 
754     }
755     return arg;
756 }
757 
/*! createCompressionJob() :
 *  Turns the data currently held in ctx->input into the next compression job
 *  and signals that the job is ready.
 *  The job slot used is ctx->jobs[ctx->nextJobID % ctx->numJobs].
 *  @param ctx     : shared context; its input buffer is swapped with the
 *                   job's source buffer
 *  @param srcSize : number of new bytes read for this job
 *  @param last    : non-zero when this is the final job; in that case
 *                   job->lastJobPlusOne is set to the job's ID + 1
 *  @return : always 0 (there is no failure path in this function) */
static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
{
    unsigned const nextJob = ctx->nextJobID;
    unsigned const nextJobIndex = nextJob % ctx->numJobs;
    jobDescription* const job = &ctx->jobs[nextJobIndex];

    job->src.size = srcSize;
    job->jobID = nextJob;
    if (last) job->lastJobPlusOne = nextJob + 1;
    {
        /* swap buffers : the job takes the filled input buffer,
         * the input side takes over the job's previous source buffer */
        void* const copy = job->src.start;
        job->src.start = ctx->input.buffer.start;
        ctx->input.buffer.start = copy;
    }
    job->dictSize = ctx->lastDictSize;

    ctx->nextJobID++;
    /* if not on the last job, reuse data as dictionary in next job */
    if (!last) {
        size_t const oldDictSize = ctx->lastDictSize;
        /* the new data sits right after the previous dictionary bytes;
         * go through char* since arithmetic on void* is not standard C */
        const char* const newData = (const char*)job->src.start + oldDictSize;
        memcpy(ctx->input.buffer.start, newData, srcSize);
        ctx->lastDictSize = srcSize;
        ctx->input.filled = srcSize;
    }

    /* signal job ready */
    pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
    ctx->jobReadyID++;
    pthread_cond_signal(&ctx->jobReady_cond.pCond);
    pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);

    return 0;
}
793 
/*! performCompression() :
 *  Starts the output and compression threads (both detached), then reads
 *  srcFile chunk by chunk on the calling thread, creating one compression
 *  job per chunk until end of file is reached.
 *  @param ctx     : shared context used by all threads
 *  @param srcFile : stream to read the input from
 *  @param otArg   : argument handed to outputThread
 *  @return : 0 when all jobs were created, non-zero on error
 *            (NULL argument, thread creation/detach failure, read failure,
 *            or job creation failure) */
static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadArg* otArg)
{
    /* early error check to exit */
    if (!ctx || !srcFile || !otArg) {
        return 1;
    }

    /* create output thread */
    {
        pthread_t out;
        if (pthread_create(&out, NULL, &outputThread, otArg)) {
            DISPLAY("Error: could not create output thread\n");
            signalErrorToThreads(ctx);
            return 1;
        }
        else if (pthread_detach(out)) {
            DISPLAY("Error: could not detach output thread\n");
            signalErrorToThreads(ctx);
            return 1;
        }
    }

    /* create compression thread */
    {
        pthread_t compression;
        if (pthread_create(&compression, NULL, &compressionThread, ctx)) {
            DISPLAY("Error: could not create compression thread\n");
            signalErrorToThreads(ctx);
            return 1;
        }
        else if (pthread_detach(compression)) {
            DISPLAY("Error: could not detach compression thread\n");
            signalErrorToThreads(ctx);
            return 1;
        }
    }
    {
        unsigned currJob = 0;
        /* creating jobs */
        for ( ; ; ) {
            size_t pos = 0;
            size_t const readBlockSize = 1 << 15;
            size_t remaining = FILE_CHUNK_SIZE;
            unsigned const nextJob = ctx->nextJobID;
            unsigned willWaitForCompress = 0;
            DEBUG(2, "starting creation of job %u\n", currJob);

            /* will this thread have to wait for a job slot to be freed ? */
            pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
            if (nextJob - ctx->jobCompressedID >= ctx->numJobs) willWaitForCompress = 1;
            pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);

            pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
            if (willWaitForCompress) {
                /* creation thread is waiting, take measurement of completion */
                ctx->createWaitCompressionCompletion = ctx->compressionCompletion;
                DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion);
            }
            else {
                ctx->createWaitCompressionCompletion = 1;
            }
            pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);

            /* wait until the job has been compressed */
            pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
            while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) {
                pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
            }
            pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
            /* NOTE(review): the wait above also ends when ctx->threadError is set,
             * yet job creation carries on and this function can still return 0 --
             * confirm whether an error return is wanted here. */

            /* reset create completion */
            pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
            ctx->createCompletion = 0;
            pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);

            /* fill the input buffer, after the bytes it already holds,
             * until the chunk is complete or the file ends */
            while (remaining != 0 && !feof(srcFile)) {
                /* never request more than what is left of the chunk,
                 * so that `remaining` cannot wrap around below */
                size_t const toRead = (remaining < readBlockSize) ? remaining : readBlockSize;
                /* arithmetic on void* is not standard C : go through char* */
                char* const readPtr = (char*)ctx->input.buffer.start + ctx->input.filled + pos;
                size_t const ret = fread(readPtr, 1, toRead, srcFile);
                if (ret != toRead && !feof(srcFile)) {
                    /* error could not read correct number of bytes */
                    DISPLAY("Error: problem occurred during read from src file\n");
                    signalErrorToThreads(ctx);
                    return 1;
                }
                pos += ret;
                remaining -= ret;
                pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
                ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
                pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
            }
            /* the loop only ends with a complete chunk or at end of file,
             * so no further read check is needed here */
            g_streamedSize += pos;
            /* reading was fine, now create the compression job */
            {
                int const last = feof(srcFile);
                int const error = createCompressionJob(ctx, pos, last);
                if (error != 0) {
                    signalErrorToThreads(ctx);
                    return error;
                }
            }
            DEBUG(2, "finished creating job %u\n", currJob);
            currJob++;
            if (feof(srcFile)) {
                break;
            }
        }
    }
    /* success -- created all jobs */
    return 0;
}
907 
createFileCompressionResources(const char * const srcFilename,const char * const dstFilenameOrNull)908 static fcResources createFileCompressionResources(const char* const srcFilename, const char* const dstFilenameOrNull)
909 {
910     fcResources fcr;
911     unsigned const stdinUsed = !strcmp(srcFilename, stdinmark);
912     FILE* const srcFile = stdinUsed ? stdin : fopen(srcFilename, "rb");
913     const char* const outFilenameIntermediate = (stdinUsed && !dstFilenameOrNull) ? stdoutmark : dstFilenameOrNull;
914     const char* outFilename = outFilenameIntermediate;
915     char fileAndSuffix[MAX_PATH];
916     size_t const numJobs = MAX_NUM_JOBS;
917 
918     memset(&fcr, 0, sizeof(fcr));
919 
920     if (!outFilenameIntermediate) {
921         if (snprintf(fileAndSuffix, MAX_PATH, "%s.zst", srcFilename) + 1 > MAX_PATH) {
922             DISPLAY("Error: output filename is too long\n");
923             return fcr;
924         }
925         outFilename = fileAndSuffix;
926     }
927 
928     {
929         unsigned const stdoutUsed = !strcmp(outFilename, stdoutmark);
930         FILE* const dstFile = stdoutUsed ? stdout : fopen(outFilename, "wb");
931         fcr.otArg = malloc(sizeof(outputThreadArg));
932         if (!fcr.otArg) {
933             DISPLAY("Error: could not allocate space for output thread argument\n");
934             return fcr;
935         }
936         fcr.otArg->dstFile = dstFile;
937     }
938     /* checking for errors */
939     if (!fcr.otArg->dstFile || !srcFile) {
940         DISPLAY("Error: some file(s) could not be opened\n");
941         return fcr;
942     }
943 
944     /* creating context */
945     fcr.ctx = createCCtx(numJobs);
946     fcr.otArg->ctx = fcr.ctx;
947     fcr.srcFile = srcFile;
948     return fcr;
949 }
950 
/*! freeFileCompressionResources() :
 *  Waits for all jobs to complete, then releases everything held by `fcr` :
 *  the source stream, the compression context, the destination stream
 *  (unless it is stdout) and the output thread argument.
 *  Fields that are NULL are skipped, so a partially filled `fcr` is accepted.
 *  @return : 0 on success, non-zero if any fclose() or freeCCtx() call
 *            reported an error (results are OR-ed together) */
static int freeFileCompressionResources(fcResources* fcr)
{
    int ret = 0;
    /* NOTE(review): called even when fcr->ctx is NULL, as before --
     * presumably waitUntilAllJobsCompleted() tolerates NULL; confirm */
    waitUntilAllJobsCompleted(fcr->ctx);
    ret |= (fcr->srcFile != NULL) ? fclose(fcr->srcFile) : 0;
    ret |= (fcr->ctx != NULL) ? freeCCtx(fcr->ctx) : 0;
    if (fcr->otArg) {
        FILE* const dstFile = fcr->otArg->dstFile;
        /* dstFile is NULL when the destination could not be opened :
         * fclose(NULL) must not be called */
        if (dstFile != NULL && dstFile != stdout) ret |= fclose(dstFile);
        free(fcr->otArg);
        /* no need to freeCCtx() on otArg->ctx because it should be the same context */
    }
    /* forget released resources to guard against accidental reuse */
    fcr->srcFile = NULL;
    fcr->ctx = NULL;
    fcr->otArg = NULL;
    return ret;
}
964 
compressFilename(const char * const srcFilename,const char * const dstFilenameOrNull)965 static int compressFilename(const char* const srcFilename, const char* const dstFilenameOrNull)
966 {
967     int ret = 0;
968     fcResources fcr = createFileCompressionResources(srcFilename, dstFilenameOrNull);
969     g_streamedSize = 0;
970     ret |= performCompression(fcr.ctx, fcr.srcFile, fcr.otArg);
971     ret |= freeFileCompressionResources(&fcr);
972     return ret;
973 }
974 
compressFilenames(const char ** filenameTable,unsigned numFiles,unsigned forceStdout)975 static int compressFilenames(const char** filenameTable, unsigned numFiles, unsigned forceStdout)
976 {
977     int ret = 0;
978     unsigned fileNum;
979     for (fileNum=0; fileNum<numFiles; fileNum++) {
980         const char* filename = filenameTable[fileNum];
981         if (!forceStdout) {
982             ret |= compressFilename(filename, NULL);
983         }
984         else {
985             ret |= compressFilename(filename, stdoutmark);
986         }
987 
988     }
989     return ret;
990 }
991 
/*! readU32FromChar() :
    Parses the decimal digits found at `*stringPtr` into an unsigned value.
    An optional size suffix is accepted right after the digits :
    K, KB, KiB multiply by 1024 ; M, MB, MiB multiply by 1024*1024.
    `*stringPtr` is advanced to the first character that was not consumed.
    @return : the parsed value (0 when there are no leading digits)
    Note : the result wraps around if the value does not fit in an unsigned */
static unsigned readU32FromChar(const char** stringPtr)
{
    const char* cursor = *stringPtr;
    unsigned value = 0;

    /* decimal digits */
    for ( ; (*cursor >= '0') && (*cursor <= '9'); cursor++) {
        value = value * 10 + (unsigned)(*cursor - '0');
    }

    /* optional K / M multiplier, followed by optional 'i' and 'B' */
    if ((*cursor == 'K') || (*cursor == 'M')) {
        value <<= (*cursor == 'M') ? 20 : 10;
        cursor++;
        if (*cursor == 'i') cursor++;
        if (*cursor == 'B') cursor++;
    }

    *stringPtr = cursor;
    return value;
}
1011 
help(const char * progPath)1012 static void help(const char* progPath)
1013 {
1014     PRINT("Usage:\n");
1015     PRINT("  %s [options] [file(s)]\n", progPath);
1016     PRINT("\n");
1017     PRINT("Options:\n");
1018     PRINT("  -oFILE : specify the output file name\n");
1019     PRINT("  -i#    : provide initial compression level -- default %d, must be in the range [L, U] where L and U are bound values (see below for defaults)\n", DEFAULT_COMPRESSION_LEVEL);
1020     PRINT("  -h     : display help/information\n");
1021     PRINT("  -f     : force the compression level to stay constant\n");
1022     PRINT("  -c     : force write to stdout\n");
1023     PRINT("  -p     : hide progress bar\n");
1024     PRINT("  -q     : quiet mode -- do not show progress bar or other information\n");
1025     PRINT("  -l#    : provide lower bound for compression level -- default 1\n");
1026     PRINT("  -u#    : provide upper bound for compression level -- default %u\n", ZSTD_maxCLevel());
1027 }
1028 /* return 0 if successful, else return error */
main(int argCount,const char * argv[])1029 int main(int argCount, const char* argv[])
1030 {
1031     const char* outFilename = NULL;
1032     const char** filenameTable = (const char**)malloc(argCount*sizeof(const char*));
1033     unsigned filenameIdx = 0;
1034     unsigned forceStdout = 0;
1035     unsigned providedInitialCLevel = 0;
1036     int ret = 0;
1037     int argNum;
1038     filenameTable[0] = stdinmark;
1039     g_maxCLevel = ZSTD_maxCLevel();
1040 
1041     if (filenameTable == NULL) {
1042         DISPLAY("Error: could not allocate sapce for filename table.\n");
1043         return 1;
1044     }
1045 
1046     for (argNum=1; argNum<argCount; argNum++) {
1047         const char* argument = argv[argNum];
1048 
1049         /* output filename designated with "-o" */
1050         if (argument[0]=='-' && strlen(argument) > 1) {
1051             switch (argument[1]) {
1052                 case 'o':
1053                     argument += 2;
1054                     outFilename = argument;
1055                     break;
1056                 case 'i':
1057                     argument += 2;
1058                     g_compressionLevel = readU32FromChar(&argument);
1059                     providedInitialCLevel = 1;
1060                     break;
1061                 case 'h':
1062                     help(argv[0]);
1063                     goto _main_exit;
1064                 case 'p':
1065                     g_useProgressBar = 0;
1066                     break;
1067                 case 'c':
1068                     forceStdout = 1;
1069                     outFilename = stdoutmark;
1070                     break;
1071                 case 'f':
1072                     g_forceCompressionLevel = 1;
1073                     break;
1074                 case 'q':
1075                     g_useProgressBar = 0;
1076                     g_displayLevel = 0;
1077                     break;
1078                 case 'l':
1079                     argument += 2;
1080                     g_minCLevel = readU32FromChar(&argument);
1081                     break;
1082                 case 'u':
1083                     argument += 2;
1084                     g_maxCLevel = readU32FromChar(&argument);
1085                     break;
1086                 default:
1087                     DISPLAY("Error: invalid argument provided\n");
1088                     ret = 1;
1089                     goto _main_exit;
1090             }
1091             continue;
1092         }
1093 
1094         /* regular files to be compressed */
1095         filenameTable[filenameIdx++] = argument;
1096     }
1097 
1098     /* check initial, max, and min compression levels */
1099     {
1100         unsigned const minMaxInconsistent = g_minCLevel > g_maxCLevel;
1101         unsigned const initialNotInRange = g_minCLevel > g_compressionLevel || g_maxCLevel < g_compressionLevel;
1102         if (minMaxInconsistent || (initialNotInRange && providedInitialCLevel)) {
1103             DISPLAY("Error: provided compression level parameters are invalid\n");
1104             ret = 1;
1105             goto _main_exit;
1106         }
1107         else if (initialNotInRange) {
1108             g_compressionLevel = g_minCLevel;
1109         }
1110     }
1111 
1112     /* error checking with number of files */
1113     if (filenameIdx > 1 && (outFilename != NULL && strcmp(outFilename, stdoutmark))) {
1114         DISPLAY("Error: multiple input files provided, cannot use specified output file\n");
1115         ret = 1;
1116         goto _main_exit;
1117     }
1118 
1119     /* compress files */
1120     if (filenameIdx <= 1) {
1121         ret |= compressFilename(filenameTable[0], outFilename);
1122     }
1123     else {
1124         ret |= compressFilenames(filenameTable, filenameIdx, forceStdout);
1125     }
1126 _main_exit:
1127     free(filenameTable);
1128     return ret;
1129 }
1130