1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "arrow/util/compression.h"
19 
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>
23 
24 #include <lz4.h>
25 #include <lz4frame.h>
26 
27 #include "arrow/result.h"
28 #include "arrow/status.h"
29 #include "arrow/util/bit_util.h"
30 #include "arrow/util/endian.h"
31 #include "arrow/util/logging.h"
32 #include "arrow/util/macros.h"
33 #include "arrow/util/ubsan.h"
34 
35 #ifndef LZ4F_HEADER_SIZE_MAX
36 #define LZ4F_HEADER_SIZE_MAX 19
37 #endif
38 
39 namespace arrow {
40 namespace util {
41 
42 namespace {
43 
LZ4Error(LZ4F_errorCode_t ret,const char * prefix_msg)44 static Status LZ4Error(LZ4F_errorCode_t ret, const char* prefix_msg) {
45   return Status::IOError(prefix_msg, LZ4F_getErrorName(ret));
46 }
47 
DefaultPreferences()48 static LZ4F_preferences_t DefaultPreferences() {
49   LZ4F_preferences_t prefs;
50   memset(&prefs, 0, sizeof(prefs));
51   return prefs;
52 }
53 
54 // ----------------------------------------------------------------------
55 // Lz4 frame decompressor implementation
56 
57 class LZ4Decompressor : public Decompressor {
58  public:
LZ4Decompressor()59   LZ4Decompressor() {}
60 
~LZ4Decompressor()61   ~LZ4Decompressor() override {
62     if (ctx_ != nullptr) {
63       ARROW_UNUSED(LZ4F_freeDecompressionContext(ctx_));
64     }
65   }
66 
Init()67   Status Init() {
68     LZ4F_errorCode_t ret;
69     finished_ = false;
70 
71     ret = LZ4F_createDecompressionContext(&ctx_, LZ4F_VERSION);
72     if (LZ4F_isError(ret)) {
73       return LZ4Error(ret, "LZ4 init failed: ");
74     } else {
75       return Status::OK();
76     }
77   }
78 
Reset()79   Status Reset() override {
80 #if defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER >= 10800
81     // LZ4F_resetDecompressionContext appeared in 1.8.0
82     DCHECK_NE(ctx_, nullptr);
83     LZ4F_resetDecompressionContext(ctx_);
84     finished_ = false;
85     return Status::OK();
86 #else
87     if (ctx_ != nullptr) {
88       ARROW_UNUSED(LZ4F_freeDecompressionContext(ctx_));
89     }
90     return Init();
91 #endif
92   }
93 
Decompress(int64_t input_len,const uint8_t * input,int64_t output_len,uint8_t * output)94   Result<DecompressResult> Decompress(int64_t input_len, const uint8_t* input,
95                                       int64_t output_len, uint8_t* output) override {
96     auto src = input;
97     auto dst = output;
98     auto src_size = static_cast<size_t>(input_len);
99     auto dst_capacity = static_cast<size_t>(output_len);
100     size_t ret;
101 
102     ret =
103         LZ4F_decompress(ctx_, dst, &dst_capacity, src, &src_size, nullptr /* options */);
104     if (LZ4F_isError(ret)) {
105       return LZ4Error(ret, "LZ4 decompress failed: ");
106     }
107     finished_ = (ret == 0);
108     return DecompressResult{static_cast<int64_t>(src_size),
109                             static_cast<int64_t>(dst_capacity),
110                             (src_size == 0 && dst_capacity == 0)};
111   }
112 
IsFinished()113   bool IsFinished() override { return finished_; }
114 
115  protected:
116   LZ4F_decompressionContext_t ctx_ = nullptr;
117   bool finished_;
118 };
119 
120 // ----------------------------------------------------------------------
121 // Lz4 frame compressor implementation
122 
123 class LZ4Compressor : public Compressor {
124  public:
LZ4Compressor()125   LZ4Compressor() {}
126 
~LZ4Compressor()127   ~LZ4Compressor() override {
128     if (ctx_ != nullptr) {
129       ARROW_UNUSED(LZ4F_freeCompressionContext(ctx_));
130     }
131   }
132 
Init()133   Status Init() {
134     LZ4F_errorCode_t ret;
135     prefs_ = DefaultPreferences();
136     first_time_ = true;
137 
138     ret = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION);
139     if (LZ4F_isError(ret)) {
140       return LZ4Error(ret, "LZ4 init failed: ");
141     } else {
142       return Status::OK();
143     }
144   }
145 
146 #define BEGIN_COMPRESS(dst, dst_capacity, output_too_small)     \
147   if (first_time_) {                                            \
148     if (dst_capacity < LZ4F_HEADER_SIZE_MAX) {                  \
149       /* Output too small to write LZ4F header */               \
150       return (output_too_small);                                \
151     }                                                           \
152     ret = LZ4F_compressBegin(ctx_, dst, dst_capacity, &prefs_); \
153     if (LZ4F_isError(ret)) {                                    \
154       return LZ4Error(ret, "LZ4 compress begin failed: ");      \
155     }                                                           \
156     first_time_ = false;                                        \
157     dst += ret;                                                 \
158     dst_capacity -= ret;                                        \
159     bytes_written += static_cast<int64_t>(ret);                 \
160   }
161 
Compress(int64_t input_len,const uint8_t * input,int64_t output_len,uint8_t * output)162   Result<CompressResult> Compress(int64_t input_len, const uint8_t* input,
163                                   int64_t output_len, uint8_t* output) override {
164     auto src = input;
165     auto dst = output;
166     auto src_size = static_cast<size_t>(input_len);
167     auto dst_capacity = static_cast<size_t>(output_len);
168     size_t ret;
169     int64_t bytes_written = 0;
170 
171     BEGIN_COMPRESS(dst, dst_capacity, (CompressResult{0, 0}));
172 
173     if (dst_capacity < LZ4F_compressBound(src_size, &prefs_)) {
174       // Output too small to compress into
175       return CompressResult{0, bytes_written};
176     }
177     ret = LZ4F_compressUpdate(ctx_, dst, dst_capacity, src, src_size,
178                               nullptr /* options */);
179     if (LZ4F_isError(ret)) {
180       return LZ4Error(ret, "LZ4 compress update failed: ");
181     }
182     bytes_written += static_cast<int64_t>(ret);
183     DCHECK_LE(bytes_written, output_len);
184     return CompressResult{input_len, bytes_written};
185   }
186 
Flush(int64_t output_len,uint8_t * output)187   Result<FlushResult> Flush(int64_t output_len, uint8_t* output) override {
188     auto dst = output;
189     auto dst_capacity = static_cast<size_t>(output_len);
190     size_t ret;
191     int64_t bytes_written = 0;
192 
193     BEGIN_COMPRESS(dst, dst_capacity, (FlushResult{0, true}));
194 
195     if (dst_capacity < LZ4F_compressBound(0, &prefs_)) {
196       // Output too small to flush into
197       return FlushResult{bytes_written, true};
198     }
199 
200     ret = LZ4F_flush(ctx_, dst, dst_capacity, nullptr /* options */);
201     if (LZ4F_isError(ret)) {
202       return LZ4Error(ret, "LZ4 flush failed: ");
203     }
204     bytes_written += static_cast<int64_t>(ret);
205     DCHECK_LE(bytes_written, output_len);
206     return FlushResult{bytes_written, false};
207   }
208 
End(int64_t output_len,uint8_t * output)209   Result<EndResult> End(int64_t output_len, uint8_t* output) override {
210     auto dst = output;
211     auto dst_capacity = static_cast<size_t>(output_len);
212     size_t ret;
213     int64_t bytes_written = 0;
214 
215     BEGIN_COMPRESS(dst, dst_capacity, (EndResult{0, true}));
216 
217     if (dst_capacity < LZ4F_compressBound(0, &prefs_)) {
218       // Output too small to end frame into
219       return EndResult{bytes_written, true};
220     }
221 
222     ret = LZ4F_compressEnd(ctx_, dst, dst_capacity, nullptr /* options */);
223     if (LZ4F_isError(ret)) {
224       return LZ4Error(ret, "LZ4 end failed: ");
225     }
226     bytes_written += static_cast<int64_t>(ret);
227     DCHECK_LE(bytes_written, output_len);
228     return EndResult{bytes_written, false};
229   }
230 
231 #undef BEGIN_COMPRESS
232 
233  protected:
234   LZ4F_compressionContext_t ctx_ = nullptr;
235   LZ4F_preferences_t prefs_;
236   bool first_time_;
237 };
238 
239 // ----------------------------------------------------------------------
240 // Lz4 frame codec implementation
241 
242 class Lz4FrameCodec : public Codec {
243  public:
Lz4FrameCodec()244   Lz4FrameCodec() : prefs_(DefaultPreferences()) {}
245 
MaxCompressedLen(int64_t input_len,const uint8_t * ARROW_ARG_UNUSED (input))246   int64_t MaxCompressedLen(int64_t input_len,
247                            const uint8_t* ARROW_ARG_UNUSED(input)) override {
248     return static_cast<int64_t>(
249         LZ4F_compressFrameBound(static_cast<size_t>(input_len), &prefs_));
250   }
251 
Compress(int64_t input_len,const uint8_t * input,int64_t output_buffer_len,uint8_t * output_buffer)252   Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
253                            int64_t output_buffer_len, uint8_t* output_buffer) override {
254     auto output_len =
255         LZ4F_compressFrame(output_buffer, static_cast<size_t>(output_buffer_len), input,
256                            static_cast<size_t>(input_len), &prefs_);
257     if (LZ4F_isError(output_len)) {
258       return LZ4Error(output_len, "Lz4 compression failure: ");
259     }
260     return static_cast<int64_t>(output_len);
261   }
262 
Decompress(int64_t input_len,const uint8_t * input,int64_t output_buffer_len,uint8_t * output_buffer)263   Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
264                              int64_t output_buffer_len, uint8_t* output_buffer) override {
265     ARROW_ASSIGN_OR_RAISE(auto decomp, MakeDecompressor());
266 
267     int64_t total_bytes_written = 0;
268     while (!decomp->IsFinished() && input_len != 0) {
269       ARROW_ASSIGN_OR_RAISE(
270           auto res,
271           decomp->Decompress(input_len, input, output_buffer_len, output_buffer));
272       input += res.bytes_read;
273       input_len -= res.bytes_read;
274       output_buffer += res.bytes_written;
275       output_buffer_len -= res.bytes_written;
276       total_bytes_written += res.bytes_written;
277       if (res.need_more_output) {
278         return Status::IOError("Lz4 decompression buffer too small");
279       }
280     }
281     if (!decomp->IsFinished()) {
282       return Status::IOError("Lz4 compressed input contains less than one frame");
283     }
284     if (input_len != 0) {
285       return Status::IOError("Lz4 compressed input contains more than one frame");
286     }
287     return total_bytes_written;
288   }
289 
MakeCompressor()290   Result<std::shared_ptr<Compressor>> MakeCompressor() override {
291     auto ptr = std::make_shared<LZ4Compressor>();
292     RETURN_NOT_OK(ptr->Init());
293     return ptr;
294   }
295 
MakeDecompressor()296   Result<std::shared_ptr<Decompressor>> MakeDecompressor() override {
297     auto ptr = std::make_shared<LZ4Decompressor>();
298     RETURN_NOT_OK(ptr->Init());
299     return ptr;
300   }
301 
compression_type() const302   Compression::type compression_type() const override { return Compression::LZ4_FRAME; }
303 
304  protected:
305   const LZ4F_preferences_t prefs_;
306 };
307 
308 // ----------------------------------------------------------------------
309 // Lz4 "raw" codec implementation
310 
311 class Lz4Codec : public Codec {
312  public:
Decompress(int64_t input_len,const uint8_t * input,int64_t output_buffer_len,uint8_t * output_buffer)313   Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
314                              int64_t output_buffer_len, uint8_t* output_buffer) override {
315     int64_t decompressed_size = LZ4_decompress_safe(
316         reinterpret_cast<const char*>(input), reinterpret_cast<char*>(output_buffer),
317         static_cast<int>(input_len), static_cast<int>(output_buffer_len));
318     if (decompressed_size < 0) {
319       return Status::IOError("Corrupt Lz4 compressed data.");
320     }
321     return decompressed_size;
322   }
323 
MaxCompressedLen(int64_t input_len,const uint8_t * ARROW_ARG_UNUSED (input))324   int64_t MaxCompressedLen(int64_t input_len,
325                            const uint8_t* ARROW_ARG_UNUSED(input)) override {
326     return LZ4_compressBound(static_cast<int>(input_len));
327   }
328 
Compress(int64_t input_len,const uint8_t * input,int64_t output_buffer_len,uint8_t * output_buffer)329   Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
330                            int64_t output_buffer_len, uint8_t* output_buffer) override {
331     int64_t output_len = LZ4_compress_default(
332         reinterpret_cast<const char*>(input), reinterpret_cast<char*>(output_buffer),
333         static_cast<int>(input_len), static_cast<int>(output_buffer_len));
334     if (output_len == 0) {
335       return Status::IOError("Lz4 compression failure.");
336     }
337     return output_len;
338   }
339 
MakeCompressor()340   Result<std::shared_ptr<Compressor>> MakeCompressor() override {
341     return Status::NotImplemented(
342         "Streaming compression unsupported with LZ4 raw format. "
343         "Try using LZ4 frame format instead.");
344   }
345 
MakeDecompressor()346   Result<std::shared_ptr<Decompressor>> MakeDecompressor() override {
347     return Status::NotImplemented(
348         "Streaming decompression unsupported with LZ4 raw format. "
349         "Try using LZ4 frame format instead.");
350   }
351 
compression_type() const352   Compression::type compression_type() const override { return Compression::LZ4; }
353 };
354 
355 // ----------------------------------------------------------------------
356 // Lz4 Hadoop "raw" codec implementation
357 
358 class Lz4HadoopCodec : public Lz4Codec {
359  public:
Decompress(int64_t input_len,const uint8_t * input,int64_t output_buffer_len,uint8_t * output_buffer)360   Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
361                              int64_t output_buffer_len, uint8_t* output_buffer) override {
362     const int64_t decompressed_size =
363         TryDecompressHadoop(input_len, input, output_buffer_len, output_buffer);
364     if (decompressed_size != kNotHadoop) {
365       return decompressed_size;
366     }
367     // Fall back on raw LZ4 codec (for files produces by earlier versions of Parquet C++)
368     return Lz4Codec::Decompress(input_len, input, output_buffer_len, output_buffer);
369   }
370 
MaxCompressedLen(int64_t input_len,const uint8_t * ARROW_ARG_UNUSED (input))371   int64_t MaxCompressedLen(int64_t input_len,
372                            const uint8_t* ARROW_ARG_UNUSED(input)) override {
373     return kPrefixLength + Lz4Codec::MaxCompressedLen(input_len, nullptr);
374   }
375 
Compress(int64_t input_len,const uint8_t * input,int64_t output_buffer_len,uint8_t * output_buffer)376   Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
377                            int64_t output_buffer_len, uint8_t* output_buffer) override {
378     if (output_buffer_len < kPrefixLength) {
379       return Status::Invalid("Output buffer too small for Lz4HadoopCodec compression");
380     }
381 
382     ARROW_ASSIGN_OR_RAISE(
383         int64_t output_len,
384         Lz4Codec::Compress(input_len, input, output_buffer_len - kPrefixLength,
385                            output_buffer + kPrefixLength));
386 
387     // Prepend decompressed size in bytes and compressed size in bytes
388     // to be compatible with Hadoop Lz4Codec
389     const uint32_t decompressed_size =
390         BitUtil::ToBigEndian(static_cast<uint32_t>(input_len));
391     const uint32_t compressed_size =
392         BitUtil::ToBigEndian(static_cast<uint32_t>(output_len));
393     SafeStore(output_buffer, decompressed_size);
394     SafeStore(output_buffer + sizeof(uint32_t), compressed_size);
395 
396     return kPrefixLength + output_len;
397   }
398 
MakeCompressor()399   Result<std::shared_ptr<Compressor>> MakeCompressor() override {
400     return Status::NotImplemented(
401         "Streaming compression unsupported with LZ4 Hadoop raw format. "
402         "Try using LZ4 frame format instead.");
403   }
404 
MakeDecompressor()405   Result<std::shared_ptr<Decompressor>> MakeDecompressor() override {
406     return Status::NotImplemented(
407         "Streaming decompression unsupported with LZ4 Hadoop raw format. "
408         "Try using LZ4 frame format instead.");
409   }
410 
compression_type() const411   Compression::type compression_type() const override { return Compression::LZ4_HADOOP; }
412 
413  protected:
414   // Offset starting at which page data can be read/written
415   static const int64_t kPrefixLength = sizeof(uint32_t) * 2;
416 
417   static const int64_t kNotHadoop = -1;
418 
TryDecompressHadoop(int64_t input_len,const uint8_t * input,int64_t output_buffer_len,uint8_t * output_buffer)419   int64_t TryDecompressHadoop(int64_t input_len, const uint8_t* input,
420                               int64_t output_buffer_len, uint8_t* output_buffer) {
421     // Parquet files written with the Hadoop Lz4Codec use their own framing.
422     // The input buffer can contain an arbitrary number of "frames", each
423     // with the following structure:
424     // - bytes 0..3: big-endian uint32_t representing the frame decompressed size
425     // - bytes 4..7: big-endian uint32_t representing the frame compressed size
426     // - bytes 8...: frame compressed data
427     //
428     // The Hadoop Lz4Codec source code can be found here:
429     // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
430     int64_t total_decompressed_size = 0;
431 
432     while (input_len >= kPrefixLength) {
433       const uint32_t expected_decompressed_size =
434           BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input));
435       const uint32_t expected_compressed_size =
436           BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input + sizeof(uint32_t)));
437       input += kPrefixLength;
438       input_len -= kPrefixLength;
439 
440       if (input_len < expected_compressed_size) {
441         // Not enough bytes for Hadoop "frame"
442         return kNotHadoop;
443       }
444       if (output_buffer_len < expected_decompressed_size) {
445         // Not enough bytes to hold advertised output => probably not Hadoop
446         return kNotHadoop;
447       }
448       // Try decompressing and compare with expected decompressed length
449       auto maybe_decompressed_size = Lz4Codec::Decompress(
450           expected_compressed_size, input, output_buffer_len, output_buffer);
451       if (!maybe_decompressed_size.ok() ||
452           *maybe_decompressed_size != expected_decompressed_size) {
453         return kNotHadoop;
454       }
455       input += expected_compressed_size;
456       input_len -= expected_compressed_size;
457       output_buffer += expected_decompressed_size;
458       output_buffer_len -= expected_decompressed_size;
459       total_decompressed_size += expected_decompressed_size;
460     }
461 
462     if (input_len == 0) {
463       return total_decompressed_size;
464     } else {
465       return kNotHadoop;
466     }
467   }
468 };
469 
470 }  // namespace
471 
472 namespace internal {
473 
MakeLz4FrameCodec()474 std::unique_ptr<Codec> MakeLz4FrameCodec() {
475   return std::unique_ptr<Codec>(new Lz4FrameCodec());
476 }
477 
MakeLz4HadoopRawCodec()478 std::unique_ptr<Codec> MakeLz4HadoopRawCodec() {
479   return std::unique_ptr<Codec>(new Lz4HadoopCodec());
480 }
481 
MakeLz4RawCodec()482 std::unique_ptr<Codec> MakeLz4RawCodec() {
483   return std::unique_ptr<Codec>(new Lz4Codec());
484 }
485 
486 }  // namespace internal
487 
488 }  // namespace util
489 }  // namespace arrow
490