1 #include "compression.hh"
2 #include "util.hh"
3 #include "finally.hh"
4 #include "logging.hh"
5 
6 #include <lzma.h>
7 #include <bzlib.h>
8 #include <cstdio>
9 #include <cstring>
10 
11 #include <brotli/decode.h>
12 #include <brotli/encode.h>
13 
14 #include <iostream>
15 
16 namespace nix {
17 
18 // Don't feed brotli too much at once.
19 struct ChunkedCompressionSink : CompressionSink
20 {
21     uint8_t outbuf[32 * 1024];
22 
writenix::ChunkedCompressionSink23     void write(const unsigned char * data, size_t len) override
24     {
25         const size_t CHUNK_SIZE = sizeof(outbuf) << 2;
26         while (len) {
27             size_t n = std::min(CHUNK_SIZE, len);
28             writeInternal(data, n);
29             data += n;
30             len -= n;
31         }
32     }
33 
34     virtual void writeInternal(const unsigned char * data, size_t len) = 0;
35 };
36 
37 struct NoneSink : CompressionSink
38 {
39     Sink & nextSink;
NoneSinknix::NoneSink40     NoneSink(Sink & nextSink) : nextSink(nextSink) { }
finishnix::NoneSink41     void finish() override { flush(); }
writenix::NoneSink42     void write(const unsigned char * data, size_t len) override { nextSink(data, len); }
43 };
44 
45 struct XzDecompressionSink : CompressionSink
46 {
47     Sink & nextSink;
48     uint8_t outbuf[BUFSIZ];
49     lzma_stream strm = LZMA_STREAM_INIT;
50     bool finished = false;
51 
XzDecompressionSinknix::XzDecompressionSink52     XzDecompressionSink(Sink & nextSink) : nextSink(nextSink)
53     {
54         lzma_ret ret = lzma_stream_decoder(
55             &strm, UINT64_MAX, LZMA_CONCATENATED);
56         if (ret != LZMA_OK)
57             throw CompressionError("unable to initialise lzma decoder");
58 
59         strm.next_out = outbuf;
60         strm.avail_out = sizeof(outbuf);
61     }
62 
~XzDecompressionSinknix::XzDecompressionSink63     ~XzDecompressionSink()
64     {
65         lzma_end(&strm);
66     }
67 
finishnix::XzDecompressionSink68     void finish() override
69     {
70         CompressionSink::flush();
71         write(nullptr, 0);
72     }
73 
writenix::XzDecompressionSink74     void write(const unsigned char * data, size_t len) override
75     {
76         strm.next_in = data;
77         strm.avail_in = len;
78 
79         while (!finished && (!data || strm.avail_in)) {
80             checkInterrupt();
81 
82             lzma_ret ret = lzma_code(&strm, data ? LZMA_RUN : LZMA_FINISH);
83             if (ret != LZMA_OK && ret != LZMA_STREAM_END)
84                 throw CompressionError("error %d while decompressing xz file", ret);
85 
86             finished = ret == LZMA_STREAM_END;
87 
88             if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) {
89                 nextSink(outbuf, sizeof(outbuf) - strm.avail_out);
90                 strm.next_out = outbuf;
91                 strm.avail_out = sizeof(outbuf);
92             }
93         }
94     }
95 };
96 
97 struct BzipDecompressionSink : ChunkedCompressionSink
98 {
99     Sink & nextSink;
100     bz_stream strm;
101     bool finished = false;
102 
BzipDecompressionSinknix::BzipDecompressionSink103     BzipDecompressionSink(Sink & nextSink) : nextSink(nextSink)
104     {
105         memset(&strm, 0, sizeof(strm));
106         int ret = BZ2_bzDecompressInit(&strm, 0, 0);
107         if (ret != BZ_OK)
108             throw CompressionError("unable to initialise bzip2 decoder");
109 
110         strm.next_out = (char *) outbuf;
111         strm.avail_out = sizeof(outbuf);
112     }
113 
~BzipDecompressionSinknix::BzipDecompressionSink114     ~BzipDecompressionSink()
115     {
116         BZ2_bzDecompressEnd(&strm);
117     }
118 
finishnix::BzipDecompressionSink119     void finish() override
120     {
121         flush();
122         write(nullptr, 0);
123     }
124 
writeInternalnix::BzipDecompressionSink125     void writeInternal(const unsigned char * data, size_t len) override
126     {
127         assert(len <= std::numeric_limits<decltype(strm.avail_in)>::max());
128 
129         strm.next_in = (char *) data;
130         strm.avail_in = len;
131 
132         while (strm.avail_in) {
133             checkInterrupt();
134 
135             int ret = BZ2_bzDecompress(&strm);
136             if (ret != BZ_OK && ret != BZ_STREAM_END)
137                 throw CompressionError("error while decompressing bzip2 file");
138 
139             finished = ret == BZ_STREAM_END;
140 
141             if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) {
142                 nextSink(outbuf, sizeof(outbuf) - strm.avail_out);
143                 strm.next_out = (char *) outbuf;
144                 strm.avail_out = sizeof(outbuf);
145             }
146         }
147     }
148 };
149 
150 struct BrotliDecompressionSink : ChunkedCompressionSink
151 {
152     Sink & nextSink;
153     BrotliDecoderState * state;
154     bool finished = false;
155 
BrotliDecompressionSinknix::BrotliDecompressionSink156     BrotliDecompressionSink(Sink & nextSink) : nextSink(nextSink)
157     {
158         state = BrotliDecoderCreateInstance(nullptr, nullptr, nullptr);
159         if (!state)
160             throw CompressionError("unable to initialize brotli decoder");
161     }
162 
~BrotliDecompressionSinknix::BrotliDecompressionSink163     ~BrotliDecompressionSink()
164     {
165         BrotliDecoderDestroyInstance(state);
166     }
167 
finishnix::BrotliDecompressionSink168     void finish() override
169     {
170         flush();
171         writeInternal(nullptr, 0);
172     }
173 
writeInternalnix::BrotliDecompressionSink174     void writeInternal(const unsigned char * data, size_t len) override
175     {
176         const uint8_t * next_in = data;
177         size_t avail_in = len;
178         uint8_t * next_out = outbuf;
179         size_t avail_out = sizeof(outbuf);
180 
181         while (!finished && (!data || avail_in)) {
182             checkInterrupt();
183 
184             if (!BrotliDecoderDecompressStream(state,
185                     &avail_in, &next_in,
186                     &avail_out, &next_out,
187                     nullptr))
188                 throw CompressionError("error while decompressing brotli file");
189 
190             if (avail_out < sizeof(outbuf) || avail_in == 0) {
191                 nextSink(outbuf, sizeof(outbuf) - avail_out);
192                 next_out = outbuf;
193                 avail_out = sizeof(outbuf);
194             }
195 
196             finished = BrotliDecoderIsFinished(state);
197         }
198     }
199 };
200 
decompress(const std::string & method,const std::string & in)201 ref<std::string> decompress(const std::string & method, const std::string & in)
202 {
203     StringSink ssink;
204     auto sink = makeDecompressionSink(method, ssink);
205     (*sink)(in);
206     sink->finish();
207     return ssink.s;
208 }
209 
makeDecompressionSink(const std::string & method,Sink & nextSink)210 ref<CompressionSink> makeDecompressionSink(const std::string & method, Sink & nextSink)
211 {
212     if (method == "none" || method == "")
213         return make_ref<NoneSink>(nextSink);
214     else if (method == "xz")
215         return make_ref<XzDecompressionSink>(nextSink);
216     else if (method == "bzip2")
217         return make_ref<BzipDecompressionSink>(nextSink);
218     else if (method == "br")
219         return make_ref<BrotliDecompressionSink>(nextSink);
220     else
221         throw UnknownCompressionMethod("unknown compression method '%s'", method);
222 }
223 
224 struct XzCompressionSink : CompressionSink
225 {
226     Sink & nextSink;
227     uint8_t outbuf[BUFSIZ];
228     lzma_stream strm = LZMA_STREAM_INIT;
229     bool finished = false;
230 
XzCompressionSinknix::XzCompressionSink231     XzCompressionSink(Sink & nextSink, bool parallel) : nextSink(nextSink)
232     {
233         lzma_ret ret;
234         bool done = false;
235 
236         if (parallel) {
237 #ifdef HAVE_LZMA_MT
238             lzma_mt mt_options = {};
239             mt_options.flags = 0;
240             mt_options.timeout = 300; // Using the same setting as the xz cmd line
241             mt_options.preset = LZMA_PRESET_DEFAULT;
242             mt_options.filters = NULL;
243             mt_options.check = LZMA_CHECK_CRC64;
244             mt_options.threads = lzma_cputhreads();
245             mt_options.block_size = 0;
246             if (mt_options.threads == 0)
247                 mt_options.threads = 1;
248             // FIXME: maybe use lzma_stream_encoder_mt_memusage() to control the
249             // number of threads.
250             ret = lzma_stream_encoder_mt(&strm, &mt_options);
251             done = true;
252 #else
253             printMsg(lvlError, "warning: parallel XZ compression requested but not supported, falling back to single-threaded compression");
254 #endif
255         }
256 
257         if (!done)
258             ret = lzma_easy_encoder(&strm, 6, LZMA_CHECK_CRC64);
259 
260         if (ret != LZMA_OK)
261             throw CompressionError("unable to initialise lzma encoder");
262 
263         // FIXME: apply the x86 BCJ filter?
264 
265         strm.next_out = outbuf;
266         strm.avail_out = sizeof(outbuf);
267     }
268 
~XzCompressionSinknix::XzCompressionSink269     ~XzCompressionSink()
270     {
271         lzma_end(&strm);
272     }
273 
finishnix::XzCompressionSink274     void finish() override
275     {
276         CompressionSink::flush();
277         write(nullptr, 0);
278     }
279 
writenix::XzCompressionSink280     void write(const unsigned char * data, size_t len) override
281     {
282         strm.next_in = data;
283         strm.avail_in = len;
284 
285         while (!finished && (!data || strm.avail_in)) {
286             checkInterrupt();
287 
288             lzma_ret ret = lzma_code(&strm, data ? LZMA_RUN : LZMA_FINISH);
289             if (ret != LZMA_OK && ret != LZMA_STREAM_END)
290                 throw CompressionError("error %d while compressing xz file", ret);
291 
292             finished = ret == LZMA_STREAM_END;
293 
294             if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) {
295                 nextSink(outbuf, sizeof(outbuf) - strm.avail_out);
296                 strm.next_out = outbuf;
297                 strm.avail_out = sizeof(outbuf);
298             }
299         }
300     }
301 };
302 
303 struct BzipCompressionSink : ChunkedCompressionSink
304 {
305     Sink & nextSink;
306     bz_stream strm;
307     bool finished = false;
308 
BzipCompressionSinknix::BzipCompressionSink309     BzipCompressionSink(Sink & nextSink) : nextSink(nextSink)
310     {
311         memset(&strm, 0, sizeof(strm));
312         int ret = BZ2_bzCompressInit(&strm, 9, 0, 30);
313         if (ret != BZ_OK)
314             throw CompressionError("unable to initialise bzip2 encoder");
315 
316         strm.next_out = (char *) outbuf;
317         strm.avail_out = sizeof(outbuf);
318     }
319 
~BzipCompressionSinknix::BzipCompressionSink320     ~BzipCompressionSink()
321     {
322         BZ2_bzCompressEnd(&strm);
323     }
324 
finishnix::BzipCompressionSink325     void finish() override
326     {
327         flush();
328         writeInternal(nullptr, 0);
329     }
330 
writeInternalnix::BzipCompressionSink331     void writeInternal(const unsigned char * data, size_t len) override
332     {
333         assert(len <= std::numeric_limits<decltype(strm.avail_in)>::max());
334 
335         strm.next_in = (char *) data;
336         strm.avail_in = len;
337 
338         while (!finished && (!data || strm.avail_in)) {
339             checkInterrupt();
340 
341             int ret = BZ2_bzCompress(&strm, data ? BZ_RUN : BZ_FINISH);
342             if (ret != BZ_RUN_OK && ret != BZ_FINISH_OK && ret != BZ_STREAM_END)
343                 throw CompressionError("error %d while compressing bzip2 file", ret);
344 
345             finished = ret == BZ_STREAM_END;
346 
347             if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) {
348                 nextSink(outbuf, sizeof(outbuf) - strm.avail_out);
349                 strm.next_out = (char *) outbuf;
350                 strm.avail_out = sizeof(outbuf);
351             }
352         }
353     }
354 };
355 
356 struct BrotliCompressionSink : ChunkedCompressionSink
357 {
358     Sink & nextSink;
359     uint8_t outbuf[BUFSIZ];
360     BrotliEncoderState *state;
361     bool finished = false;
362 
BrotliCompressionSinknix::BrotliCompressionSink363     BrotliCompressionSink(Sink & nextSink) : nextSink(nextSink)
364     {
365         state = BrotliEncoderCreateInstance(nullptr, nullptr, nullptr);
366         if (!state)
367             throw CompressionError("unable to initialise brotli encoder");
368     }
369 
~BrotliCompressionSinknix::BrotliCompressionSink370     ~BrotliCompressionSink()
371     {
372         BrotliEncoderDestroyInstance(state);
373     }
374 
finishnix::BrotliCompressionSink375     void finish() override
376     {
377         flush();
378         writeInternal(nullptr, 0);
379     }
380 
writeInternalnix::BrotliCompressionSink381     void writeInternal(const unsigned char * data, size_t len) override
382     {
383         const uint8_t * next_in = data;
384         size_t avail_in = len;
385         uint8_t * next_out = outbuf;
386         size_t avail_out = sizeof(outbuf);
387 
388         while (!finished && (!data || avail_in)) {
389             checkInterrupt();
390 
391             if (!BrotliEncoderCompressStream(state,
392                     data ? BROTLI_OPERATION_PROCESS : BROTLI_OPERATION_FINISH,
393                     &avail_in, &next_in,
394                     &avail_out, &next_out,
395                     nullptr))
396                 throw CompressionError("error while compressing brotli compression");
397 
398             if (avail_out < sizeof(outbuf) || avail_in == 0) {
399                 nextSink(outbuf, sizeof(outbuf) - avail_out);
400                 next_out = outbuf;
401                 avail_out = sizeof(outbuf);
402             }
403 
404             finished = BrotliEncoderIsFinished(state);
405         }
406     }
407 };
408 
makeCompressionSink(const std::string & method,Sink & nextSink,const bool parallel)409 ref<CompressionSink> makeCompressionSink(const std::string & method, Sink & nextSink, const bool parallel)
410 {
411     if (method == "none")
412         return make_ref<NoneSink>(nextSink);
413     else if (method == "xz")
414         return make_ref<XzCompressionSink>(nextSink, parallel);
415     else if (method == "bzip2")
416         return make_ref<BzipCompressionSink>(nextSink);
417     else if (method == "br")
418         return make_ref<BrotliCompressionSink>(nextSink);
419     else
420         throw UnknownCompressionMethod(format("unknown compression method '%s'") % method);
421 }
422 
compress(const std::string & method,const std::string & in,const bool parallel)423 ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel)
424 {
425     StringSink ssink;
426     auto sink = makeCompressionSink(method, ssink, parallel);
427     (*sink)(in);
428     sink->finish();
429     return ssink.s;
430 }
431 
432 }
433