1 /***********************************************************************************************************************************
2 LZ4 Compress
3 
4 Developed against version r131 using the documentation in https://github.com/lz4/lz4/blob/r131/lib/lz4frame.h.
5 ***********************************************************************************************************************************/
6 #include "build.auto.h"
7 
8 #ifdef HAVE_LIBLZ4
9 
10 #include <stdio.h>
11 #include <lz4frame.h>
12 #include <string.h>
13 
14 #include "common/compress/lz4/common.h"
15 #include "common/compress/lz4/compress.h"
16 #include "common/debug.h"
17 #include "common/io/filter/filter.h"
18 #include "common/log.h"
19 #include "common/memContext.h"
20 #include "common/type/object.h"
21 
22 /***********************************************************************************************************************************
23 Older versions of lz4 do not define the max header size.  This seems to be the max for any version.
24 ***********************************************************************************************************************************/
25 #ifndef LZ4F_HEADER_SIZE_MAX
26     #define LZ4F_HEADER_SIZE_MAX                                    19
27 #endif
28 
29 /***********************************************************************************************************************************
30 Filter type constant
31 ***********************************************************************************************************************************/
32 STRING_EXTERN(LZ4_COMPRESS_FILTER_TYPE_STR,                         LZ4_COMPRESS_FILTER_TYPE);
33 
34 /***********************************************************************************************************************************
35 Object type
36 ***********************************************************************************************************************************/
37 typedef struct Lz4Compress
38 {
39     MemContext *memContext;                                         // Context to store data
40     LZ4F_compressionContext_t context;                              // LZ4 compression context
41     LZ4F_preferences_t prefs;                                       // Preferences -- just compress level set
42     IoFilter *filter;                                               // Filter interface
43 
44     Buffer *buffer;                                                 // For when the output buffer can't accept all compressed data
45     bool first;                                                     // Is this the first call to process?
46     bool inputSame;                                                 // Is the same input required on the next process call?
47     bool flushing;                                                  // Is input complete and flushing in progress?
48 } Lz4Compress;
49 
50 /***********************************************************************************************************************************
51 Render as string for logging
52 ***********************************************************************************************************************************/
53 static String *
lz4CompressToLog(const Lz4Compress * this)54 lz4CompressToLog(const Lz4Compress *this)
55 {
56     return strNewFmt(
57         "{level: %d, first: %s, inputSame: %s, flushing: %s}", this->prefs.compressionLevel,
58         cvtBoolToConstZ(this->first), cvtBoolToConstZ(this->inputSame), cvtBoolToConstZ(this->flushing));
59 }
60 
61 #define FUNCTION_LOG_LZ4_COMPRESS_TYPE                                                                                             \
62     Lz4Compress *
63 #define FUNCTION_LOG_LZ4_COMPRESS_FORMAT(value, buffer, bufferSize)                                                                \
64     FUNCTION_LOG_STRING_OBJECT_FORMAT(value, lz4CompressToLog, buffer, bufferSize)
65 
66 /***********************************************************************************************************************************
67 Free compression context
68 ***********************************************************************************************************************************/
69 static void
lz4CompressFreeResource(THIS_VOID)70 lz4CompressFreeResource(THIS_VOID)
71 {
72     THIS(Lz4Compress);
73 
74     FUNCTION_LOG_BEGIN(logLevelTrace);
75         FUNCTION_LOG_PARAM(LZ4_COMPRESS, this);
76     FUNCTION_LOG_END();
77 
78     ASSERT(this != NULL);
79 
80     LZ4F_freeCompressionContext(this->context);
81 
82     FUNCTION_LOG_RETURN_VOID();
83 }
84 
85 /***********************************************************************************************************************************
86 Compress data
87 ***********************************************************************************************************************************/
88 // Helper to return a buffer where output will be written.  If there is enough space in the provided output buffer then use it,
89 // otherwise allocate an internal buffer to hold the compressed data.  Once we start using the internal buffer we'll need to
90 // continue using it until it is completely flushed.
91 static Buffer *
lz4CompressBuffer(Lz4Compress * this,size_t required,Buffer * output)92 lz4CompressBuffer(Lz4Compress *this, size_t required, Buffer *output)
93 {
94     FUNCTION_TEST_BEGIN();
95         FUNCTION_TEST_PARAM(LZ4_COMPRESS, this);
96         FUNCTION_TEST_PARAM(SIZE, required);
97         FUNCTION_TEST_PARAM(BUFFER, output);
98     FUNCTION_TEST_END();
99 
100     Buffer *result = output;
101 
102     // Is an internal buffer required to hold the compressed data?
103     if (!bufEmpty(this->buffer) || required >= bufRemains(output))
104     {
105         // Reallocate buffer if it is not large enough
106         if (required >= bufRemains(this->buffer))
107             bufResize(this->buffer, bufUsed(this->buffer) + required);
108 
109         result = this->buffer;
110     }
111 
112     FUNCTION_TEST_RETURN(result);
113 }
114 
115 // Helper to flush output data to compressed buffer
116 static void
lz4CompressFlush(Lz4Compress * this,Buffer * output,Buffer * compressed)117 lz4CompressFlush(Lz4Compress *this, Buffer *output, Buffer *compressed)
118 {
119     FUNCTION_TEST_BEGIN();
120         FUNCTION_TEST_PARAM(LZ4_COMPRESS, this);
121         FUNCTION_TEST_PARAM(BUFFER, output);
122         FUNCTION_TEST_PARAM(BUFFER, compressed);
123     FUNCTION_TEST_END();
124 
125     // If the compressed buffer can hold all the output
126     if (bufRemains(compressed) >= bufUsed(output))
127     {
128         bufCat(compressed, output);
129         bufUsedZero(output);
130 
131         this->inputSame = false;
132     }
133     // Else flush as much as possible and set inputSame to flush more once the compressed buffer has been emptied
134     else
135     {
136         size_t catSize = bufRemains(compressed);
137         bufCatSub(compressed, output, 0, catSize);
138 
139         memmove(bufPtr(output), bufPtr(output) + catSize, bufUsed(output) - catSize);
140         bufUsedSet(output, bufUsed(output) - catSize);
141 
142         this->inputSame = true;
143     }
144 
145     FUNCTION_TEST_RETURN_VOID();
146 }
147 
148 static void
lz4CompressProcess(THIS_VOID,const Buffer * uncompressed,Buffer * compressed)149 lz4CompressProcess(THIS_VOID, const Buffer *uncompressed, Buffer *compressed)
150 {
151     THIS(Lz4Compress);
152 
153     FUNCTION_LOG_BEGIN(logLevelTrace);
154         FUNCTION_LOG_PARAM(LZ4_COMPRESS, this);
155         FUNCTION_LOG_PARAM(BUFFER, uncompressed);
156         FUNCTION_LOG_PARAM(BUFFER, compressed);
157     FUNCTION_LOG_END();
158 
159     ASSERT(this != NULL);
160     ASSERT(!(this->flushing && !this->inputSame));
161     ASSERT(this->context != NULL);
162     ASSERT(compressed != NULL);
163     ASSERT(!this->flushing || uncompressed == NULL);
164 
165     // Flush overflow output to the compressed buffer
166     if (this->inputSame)
167     {
168         lz4CompressFlush(this, this->buffer, compressed);
169     }
170     else
171     {
172         Buffer *output = NULL;
173 
174         // If first call to process then begin compression
175         if (this->first)
176         {
177             output = lz4CompressBuffer(this, LZ4F_HEADER_SIZE_MAX, compressed);
178             bufUsedInc(
179                 output, lz4Error(LZ4F_compressBegin(this->context, bufRemainsPtr(output), bufRemains(output), &this->prefs)));
180 
181             this->first = false;
182         }
183 
184         // Normal processing call
185         if (uncompressed != NULL)
186         {
187             output = lz4CompressBuffer(this, lz4Error(LZ4F_compressBound(bufUsed(uncompressed), &this->prefs)), compressed);
188 
189             bufUsedInc(
190                 output,
191                 lz4Error(
192                     LZ4F_compressUpdate(
193                         this->context, bufRemainsPtr(output), bufRemains(output), bufPtrConst(uncompressed), bufUsed(uncompressed),
194                         NULL)));
195         }
196         // Else flush remaining output
197         else
198         {
199             // Pass 1 as src size to help allocate enough space for the final flush. This is required for some versions that don't
200             // allocate enough memory unless autoFlush is enabled. Other versions fail if autoFlush is only enabled before the final
201             // flush. This will hopefully work across all versions even if it does allocate a larger buffer than needed.
202             output = lz4CompressBuffer(this, lz4Error(LZ4F_compressBound(1, &this->prefs)), compressed);
203             bufUsedInc(output, lz4Error(LZ4F_compressEnd(this->context, bufRemainsPtr(output), bufRemains(output), NULL)));
204 
205             this->flushing = true;
206         }
207 
208         // If the output buffer was allocated locally it will need to be flushed to the compressed buffer
209         if (output != compressed)
210             lz4CompressFlush(this, output, compressed);
211     }
212 
213     FUNCTION_LOG_RETURN_VOID();
214 }
215 
216 /***********************************************************************************************************************************
217 Is compress done?
218 ***********************************************************************************************************************************/
219 static bool
lz4CompressDone(const THIS_VOID)220 lz4CompressDone(const THIS_VOID)
221 {
222     THIS(const Lz4Compress);
223 
224     FUNCTION_TEST_BEGIN();
225         FUNCTION_TEST_PARAM(LZ4_COMPRESS, this);
226     FUNCTION_TEST_END();
227 
228     ASSERT(this != NULL);
229 
230     FUNCTION_TEST_RETURN(this->flushing && !this->inputSame);
231 }
232 
233 /***********************************************************************************************************************************
234 Is the same input required on the next process call?
235 ***********************************************************************************************************************************/
236 static bool
lz4CompressInputSame(const THIS_VOID)237 lz4CompressInputSame(const THIS_VOID)
238 {
239     THIS(const Lz4Compress);
240 
241     FUNCTION_TEST_BEGIN();
242         FUNCTION_TEST_PARAM(LZ4_COMPRESS, this);
243     FUNCTION_TEST_END();
244 
245     ASSERT(this != NULL);
246 
247     FUNCTION_TEST_RETURN(this->inputSame);
248 }
249 
250 /**********************************************************************************************************************************/
251 IoFilter *
lz4CompressNew(int level)252 lz4CompressNew(int level)
253 {
254     FUNCTION_LOG_BEGIN(logLevelTrace);
255         FUNCTION_LOG_PARAM(INT, level);
256     FUNCTION_LOG_END();
257 
258     ASSERT(level >= 0);
259 
260     IoFilter *this = NULL;
261 
262     MEM_CONTEXT_NEW_BEGIN("Lz4Compress")
263     {
264         Lz4Compress *driver = memNew(sizeof(Lz4Compress));
265 
266         *driver = (Lz4Compress)
267         {
268             .memContext = MEM_CONTEXT_NEW(),
269             .prefs = {.compressionLevel = level, .frameInfo = {.contentChecksumFlag = LZ4F_contentChecksumEnabled}},
270             .first = true,
271             .buffer = bufNew(0),
272         };
273 
274         // Create lz4 context
275         lz4Error(LZ4F_createCompressionContext(&driver->context, LZ4F_VERSION));
276 
277         // Set callback to ensure lz4 context is freed
278         memContextCallbackSet(driver->memContext, lz4CompressFreeResource, driver);
279 
280         // Create param list
281         VariantList *paramList = varLstNew();
282         varLstAdd(paramList, varNewInt(level));
283 
284         // Create filter interface
285         this = ioFilterNewP(
286             LZ4_COMPRESS_FILTER_TYPE_STR, driver, paramList, .done = lz4CompressDone, .inOut = lz4CompressProcess,
287             .inputSame = lz4CompressInputSame);
288     }
289     MEM_CONTEXT_NEW_END();
290 
291     FUNCTION_LOG_RETURN(IO_FILTER, this);
292 }
293 
294 #endif // HAVE_LIBLZ4
295