1 /***********************************************************************************************************************************
2 ZST Compress
3 ***********************************************************************************************************************************/
4 #include "build.auto.h"
5 
6 #ifdef HAVE_LIBZST
7 
8 #include <zstd.h>
9 
10 #include "common/compress/zst/common.h"
11 #include "common/compress/zst/compress.h"
12 #include "common/debug.h"
13 #include "common/io/filter/filter.h"
14 #include "common/log.h"
15 #include "common/memContext.h"
16 #include "common/type/object.h"
17 
18 /***********************************************************************************************************************************
19 Filter type constant
20 ***********************************************************************************************************************************/
21 STRING_EXTERN(ZST_COMPRESS_FILTER_TYPE_STR,                         ZST_COMPRESS_FILTER_TYPE);
22 
23 /***********************************************************************************************************************************
24 Object type
25 ***********************************************************************************************************************************/
26 typedef struct ZstCompress
27 {
28     MemContext *memContext;                                         // Context to store data
29     ZSTD_CStream *context;                                          // Compression context
30     int level;                                                      // Compression level
31     IoFilter *filter;                                               // Filter interface
32 
33     bool inputSame;                                                 // Is the same input required on the next process call?
34     size_t inputOffset;                                             // Current offset in input buffer
35     bool flushing;                                                  // Is input complete and flushing in progress?
36 } ZstCompress;
37 
38 /***********************************************************************************************************************************
39 Render as string for logging
40 ***********************************************************************************************************************************/
41 static String *
zstCompressToLog(const ZstCompress * this)42 zstCompressToLog(const ZstCompress *this)
43 {
44     return strNewFmt(
45         "{level: %d, inputSame: %s, inputOffset: %zu, flushing: %s}", this->level, cvtBoolToConstZ(this->inputSame),
46         this->inputOffset, cvtBoolToConstZ(this->flushing));
47 }
48 
49 #define FUNCTION_LOG_ZST_COMPRESS_TYPE                                                                                             \
50     ZstCompress *
51 #define FUNCTION_LOG_ZST_COMPRESS_FORMAT(value, buffer, bufferSize)                                                                \
52     FUNCTION_LOG_STRING_OBJECT_FORMAT(value, zstCompressToLog, buffer, bufferSize)
53 
54 /***********************************************************************************************************************************
55 Free compression context
56 ***********************************************************************************************************************************/
57 static void
zstCompressFreeResource(THIS_VOID)58 zstCompressFreeResource(THIS_VOID)
59 {
60     THIS(ZstCompress);
61 
62     FUNCTION_LOG_BEGIN(logLevelTrace);
63         FUNCTION_LOG_PARAM(ZST_COMPRESS, this);
64     FUNCTION_LOG_END();
65 
66     ASSERT(this != NULL);
67 
68     ZSTD_freeCStream(this->context);
69 
70     FUNCTION_LOG_RETURN_VOID();
71 }
72 
73 /***********************************************************************************************************************************
74 Compress data
75 ***********************************************************************************************************************************/
76 static void
zstCompressProcess(THIS_VOID,const Buffer * uncompressed,Buffer * compressed)77 zstCompressProcess(THIS_VOID, const Buffer *uncompressed, Buffer *compressed)
78 {
79     THIS(ZstCompress);
80 
81     FUNCTION_LOG_BEGIN(logLevelTrace);
82         FUNCTION_LOG_PARAM(ZST_COMPRESS, this);
83         FUNCTION_LOG_PARAM(BUFFER, uncompressed);
84         FUNCTION_LOG_PARAM(BUFFER, compressed);
85     FUNCTION_LOG_END();
86 
87     ASSERT(this != NULL);
88     ASSERT(!(this->flushing && !this->inputSame));
89     ASSERT(this->context != NULL);
90     ASSERT(compressed != NULL);
91     ASSERT(!this->flushing || uncompressed == NULL);
92 
93     // Initialize output buffer
94     ZSTD_outBuffer out = {.dst = bufRemainsPtr(compressed), .size = bufRemains(compressed)};
95 
96     // If input is NULL then start flushing
97     if (uncompressed == NULL)
98     {
99         this->flushing = true;
100         this->inputSame = zstError(ZSTD_endStream(this->context, &out)) != 0;
101     }
102     // Else still have input data
103     else
104     {
105         // Initialize input buffer
106         ZSTD_inBuffer in =
107         {
108             .src = bufPtrConst(uncompressed) + this->inputOffset,
109             .size = bufUsed(uncompressed) - this->inputOffset,
110         };
111 
112         // Perform compression
113         zstError(ZSTD_compressStream(this->context, &out, &in));
114 
115         // If the input buffer was not entirely consumed then set inputSame and store the offset where processing will restart
116         if (in.pos < in.size)
117         {
118             // Output buffer should be completely full
119             ASSERT(out.pos == out.size);
120 
121             this->inputSame = true;
122             this->inputOffset += in.pos;
123         }
124         // Else ready for more input
125         else
126         {
127             this->inputSame = false;
128             this->inputOffset = 0;
129         }
130     }
131 
132     bufUsedInc(compressed, out.pos);
133 
134     FUNCTION_LOG_RETURN_VOID();
135 }
136 
137 /***********************************************************************************************************************************
138 Is compress done?
139 ***********************************************************************************************************************************/
140 static bool
zstCompressDone(const THIS_VOID)141 zstCompressDone(const THIS_VOID)
142 {
143     THIS(const ZstCompress);
144 
145     FUNCTION_TEST_BEGIN();
146         FUNCTION_TEST_PARAM(ZST_COMPRESS, this);
147     FUNCTION_TEST_END();
148 
149     ASSERT(this != NULL);
150 
151     FUNCTION_TEST_RETURN(this->flushing && !this->inputSame);
152 }
153 
154 /***********************************************************************************************************************************
155 Is the same input required on the next process call?
156 ***********************************************************************************************************************************/
157 static bool
zstCompressInputSame(const THIS_VOID)158 zstCompressInputSame(const THIS_VOID)
159 {
160     THIS(const ZstCompress);
161 
162     FUNCTION_TEST_BEGIN();
163         FUNCTION_TEST_PARAM(ZST_COMPRESS, this);
164     FUNCTION_TEST_END();
165 
166     ASSERT(this != NULL);
167 
168     FUNCTION_TEST_RETURN(this->inputSame);
169 }
170 
171 /**********************************************************************************************************************************/
172 IoFilter *
zstCompressNew(int level)173 zstCompressNew(int level)
174 {
175     FUNCTION_LOG_BEGIN(logLevelTrace);
176         FUNCTION_LOG_PARAM(INT, level);
177     FUNCTION_LOG_END();
178 
179     ASSERT(level >= 0);
180 
181     IoFilter *this = NULL;
182 
183     MEM_CONTEXT_NEW_BEGIN("ZstCompress")
184     {
185         ZstCompress *driver = memNew(sizeof(ZstCompress));
186 
187         *driver = (ZstCompress)
188         {
189             .memContext = MEM_CONTEXT_NEW(),
190             .context = ZSTD_createCStream(),
191             .level = level,
192         };
193 
194         // Set callback to ensure zst context is freed
195         memContextCallbackSet(driver->memContext, zstCompressFreeResource, driver);
196 
197         // Initialize context
198         zstError(ZSTD_initCStream(driver->context, driver->level));
199 
200         // Create param list
201         VariantList *paramList = varLstNew();
202         varLstAdd(paramList, varNewInt(level));
203 
204         // Create filter interface
205         this = ioFilterNewP(
206             ZST_COMPRESS_FILTER_TYPE_STR, driver, paramList, .done = zstCompressDone, .inOut = zstCompressProcess,
207             .inputSame = zstCompressInputSame);
208     }
209     MEM_CONTEXT_NEW_END();
210 
211     FUNCTION_LOG_RETURN(IO_FILTER, this);
212 }
213 
214 #endif // HAVE_LIBZST
215