1 /***********************************************************************************************************************************
2 ZST Compress
3 ***********************************************************************************************************************************/
4 #include "build.auto.h"
5
6 #ifdef HAVE_LIBZST
7
8 #include <zstd.h>
9
10 #include "common/compress/zst/common.h"
11 #include "common/compress/zst/compress.h"
12 #include "common/debug.h"
13 #include "common/io/filter/filter.h"
14 #include "common/log.h"
15 #include "common/memContext.h"
16 #include "common/type/object.h"
17
18 /***********************************************************************************************************************************
19 Filter type constant
20 ***********************************************************************************************************************************/
21 STRING_EXTERN(ZST_COMPRESS_FILTER_TYPE_STR, ZST_COMPRESS_FILTER_TYPE);
22
23 /***********************************************************************************************************************************
24 Object type
25 ***********************************************************************************************************************************/
26 typedef struct ZstCompress
27 {
28 MemContext *memContext; // Context to store data
29 ZSTD_CStream *context; // Compression context
30 int level; // Compression level
31 IoFilter *filter; // Filter interface
32
33 bool inputSame; // Is the same input required on the next process call?
34 size_t inputOffset; // Current offset in input buffer
35 bool flushing; // Is input complete and flushing in progress?
36 } ZstCompress;
37
38 /***********************************************************************************************************************************
39 Render as string for logging
40 ***********************************************************************************************************************************/
41 static String *
zstCompressToLog(const ZstCompress * this)42 zstCompressToLog(const ZstCompress *this)
43 {
44 return strNewFmt(
45 "{level: %d, inputSame: %s, inputOffset: %zu, flushing: %s}", this->level, cvtBoolToConstZ(this->inputSame),
46 this->inputOffset, cvtBoolToConstZ(this->flushing));
47 }
48
// Logging helpers for the ZST_COMPRESS parameter type: the C type of the value and a formatter that renders it via zstCompressToLog()
#define FUNCTION_LOG_ZST_COMPRESS_TYPE ZstCompress *
#define FUNCTION_LOG_ZST_COMPRESS_FORMAT(value, buffer, bufferSize)                                                                \
    FUNCTION_LOG_STRING_OBJECT_FORMAT(value, zstCompressToLog, buffer, bufferSize)
53
54 /***********************************************************************************************************************************
55 Free compression context
56 ***********************************************************************************************************************************/
57 static void
zstCompressFreeResource(THIS_VOID)58 zstCompressFreeResource(THIS_VOID)
59 {
60 THIS(ZstCompress);
61
62 FUNCTION_LOG_BEGIN(logLevelTrace);
63 FUNCTION_LOG_PARAM(ZST_COMPRESS, this);
64 FUNCTION_LOG_END();
65
66 ASSERT(this != NULL);
67
68 ZSTD_freeCStream(this->context);
69
70 FUNCTION_LOG_RETURN_VOID();
71 }
72
73 /***********************************************************************************************************************************
74 Compress data
75 ***********************************************************************************************************************************/
76 static void
zstCompressProcess(THIS_VOID,const Buffer * uncompressed,Buffer * compressed)77 zstCompressProcess(THIS_VOID, const Buffer *uncompressed, Buffer *compressed)
78 {
79 THIS(ZstCompress);
80
81 FUNCTION_LOG_BEGIN(logLevelTrace);
82 FUNCTION_LOG_PARAM(ZST_COMPRESS, this);
83 FUNCTION_LOG_PARAM(BUFFER, uncompressed);
84 FUNCTION_LOG_PARAM(BUFFER, compressed);
85 FUNCTION_LOG_END();
86
87 ASSERT(this != NULL);
88 ASSERT(!(this->flushing && !this->inputSame));
89 ASSERT(this->context != NULL);
90 ASSERT(compressed != NULL);
91 ASSERT(!this->flushing || uncompressed == NULL);
92
93 // Initialize output buffer
94 ZSTD_outBuffer out = {.dst = bufRemainsPtr(compressed), .size = bufRemains(compressed)};
95
96 // If input is NULL then start flushing
97 if (uncompressed == NULL)
98 {
99 this->flushing = true;
100 this->inputSame = zstError(ZSTD_endStream(this->context, &out)) != 0;
101 }
102 // Else still have input data
103 else
104 {
105 // Initialize input buffer
106 ZSTD_inBuffer in =
107 {
108 .src = bufPtrConst(uncompressed) + this->inputOffset,
109 .size = bufUsed(uncompressed) - this->inputOffset,
110 };
111
112 // Perform compression
113 zstError(ZSTD_compressStream(this->context, &out, &in));
114
115 // If the input buffer was not entirely consumed then set inputSame and store the offset where processing will restart
116 if (in.pos < in.size)
117 {
118 // Output buffer should be completely full
119 ASSERT(out.pos == out.size);
120
121 this->inputSame = true;
122 this->inputOffset += in.pos;
123 }
124 // Else ready for more input
125 else
126 {
127 this->inputSame = false;
128 this->inputOffset = 0;
129 }
130 }
131
132 bufUsedInc(compressed, out.pos);
133
134 FUNCTION_LOG_RETURN_VOID();
135 }
136
137 /***********************************************************************************************************************************
138 Is compress done?
139 ***********************************************************************************************************************************/
140 static bool
zstCompressDone(const THIS_VOID)141 zstCompressDone(const THIS_VOID)
142 {
143 THIS(const ZstCompress);
144
145 FUNCTION_TEST_BEGIN();
146 FUNCTION_TEST_PARAM(ZST_COMPRESS, this);
147 FUNCTION_TEST_END();
148
149 ASSERT(this != NULL);
150
151 FUNCTION_TEST_RETURN(this->flushing && !this->inputSame);
152 }
153
154 /***********************************************************************************************************************************
155 Is the same input required on the next process call?
156 ***********************************************************************************************************************************/
157 static bool
zstCompressInputSame(const THIS_VOID)158 zstCompressInputSame(const THIS_VOID)
159 {
160 THIS(const ZstCompress);
161
162 FUNCTION_TEST_BEGIN();
163 FUNCTION_TEST_PARAM(ZST_COMPRESS, this);
164 FUNCTION_TEST_END();
165
166 ASSERT(this != NULL);
167
168 FUNCTION_TEST_RETURN(this->inputSame);
169 }
170
171 /**********************************************************************************************************************************/
172 IoFilter *
zstCompressNew(int level)173 zstCompressNew(int level)
174 {
175 FUNCTION_LOG_BEGIN(logLevelTrace);
176 FUNCTION_LOG_PARAM(INT, level);
177 FUNCTION_LOG_END();
178
179 ASSERT(level >= 0);
180
181 IoFilter *this = NULL;
182
183 MEM_CONTEXT_NEW_BEGIN("ZstCompress")
184 {
185 ZstCompress *driver = memNew(sizeof(ZstCompress));
186
187 *driver = (ZstCompress)
188 {
189 .memContext = MEM_CONTEXT_NEW(),
190 .context = ZSTD_createCStream(),
191 .level = level,
192 };
193
194 // Set callback to ensure zst context is freed
195 memContextCallbackSet(driver->memContext, zstCompressFreeResource, driver);
196
197 // Initialize context
198 zstError(ZSTD_initCStream(driver->context, driver->level));
199
200 // Create param list
201 VariantList *paramList = varLstNew();
202 varLstAdd(paramList, varNewInt(level));
203
204 // Create filter interface
205 this = ioFilterNewP(
206 ZST_COMPRESS_FILTER_TYPE_STR, driver, paramList, .done = zstCompressDone, .inOut = zstCompressProcess,
207 .inputSame = zstCompressInputSame);
208 }
209 MEM_CONTEXT_NEW_END();
210
211 FUNCTION_LOG_RETURN(IO_FILTER, this);
212 }
213
214 #endif // HAVE_LIBZST
215