1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/util/compression.h"
19
#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>
23
24 #include <lz4.h>
25 #include <lz4frame.h>
26
27 #include "arrow/result.h"
28 #include "arrow/status.h"
29 #include "arrow/util/bit_util.h"
30 #include "arrow/util/endian.h"
31 #include "arrow/util/logging.h"
32 #include "arrow/util/macros.h"
33 #include "arrow/util/ubsan.h"
34
35 #ifndef LZ4F_HEADER_SIZE_MAX
36 #define LZ4F_HEADER_SIZE_MAX 19
37 #endif
38
39 namespace arrow {
40 namespace util {
41
42 namespace {
43
LZ4Error(LZ4F_errorCode_t ret,const char * prefix_msg)44 static Status LZ4Error(LZ4F_errorCode_t ret, const char* prefix_msg) {
45 return Status::IOError(prefix_msg, LZ4F_getErrorName(ret));
46 }
47
DefaultPreferences()48 static LZ4F_preferences_t DefaultPreferences() {
49 LZ4F_preferences_t prefs;
50 memset(&prefs, 0, sizeof(prefs));
51 return prefs;
52 }
53
54 // ----------------------------------------------------------------------
55 // Lz4 frame decompressor implementation
56
57 class LZ4Decompressor : public Decompressor {
58 public:
LZ4Decompressor()59 LZ4Decompressor() {}
60
~LZ4Decompressor()61 ~LZ4Decompressor() override {
62 if (ctx_ != nullptr) {
63 ARROW_UNUSED(LZ4F_freeDecompressionContext(ctx_));
64 }
65 }
66
Init()67 Status Init() {
68 LZ4F_errorCode_t ret;
69 finished_ = false;
70
71 ret = LZ4F_createDecompressionContext(&ctx_, LZ4F_VERSION);
72 if (LZ4F_isError(ret)) {
73 return LZ4Error(ret, "LZ4 init failed: ");
74 } else {
75 return Status::OK();
76 }
77 }
78
Reset()79 Status Reset() override {
80 #if defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER >= 10800
81 // LZ4F_resetDecompressionContext appeared in 1.8.0
82 DCHECK_NE(ctx_, nullptr);
83 LZ4F_resetDecompressionContext(ctx_);
84 finished_ = false;
85 return Status::OK();
86 #else
87 if (ctx_ != nullptr) {
88 ARROW_UNUSED(LZ4F_freeDecompressionContext(ctx_));
89 }
90 return Init();
91 #endif
92 }
93
Decompress(int64_t input_len,const uint8_t * input,int64_t output_len,uint8_t * output)94 Result<DecompressResult> Decompress(int64_t input_len, const uint8_t* input,
95 int64_t output_len, uint8_t* output) override {
96 auto src = input;
97 auto dst = output;
98 auto src_size = static_cast<size_t>(input_len);
99 auto dst_capacity = static_cast<size_t>(output_len);
100 size_t ret;
101
102 ret =
103 LZ4F_decompress(ctx_, dst, &dst_capacity, src, &src_size, nullptr /* options */);
104 if (LZ4F_isError(ret)) {
105 return LZ4Error(ret, "LZ4 decompress failed: ");
106 }
107 finished_ = (ret == 0);
108 return DecompressResult{static_cast<int64_t>(src_size),
109 static_cast<int64_t>(dst_capacity),
110 (src_size == 0 && dst_capacity == 0)};
111 }
112
IsFinished()113 bool IsFinished() override { return finished_; }
114
115 protected:
116 LZ4F_decompressionContext_t ctx_ = nullptr;
117 bool finished_;
118 };
119
120 // ----------------------------------------------------------------------
121 // Lz4 frame compressor implementation
122
123 class LZ4Compressor : public Compressor {
124 public:
LZ4Compressor()125 LZ4Compressor() {}
126
~LZ4Compressor()127 ~LZ4Compressor() override {
128 if (ctx_ != nullptr) {
129 ARROW_UNUSED(LZ4F_freeCompressionContext(ctx_));
130 }
131 }
132
Init()133 Status Init() {
134 LZ4F_errorCode_t ret;
135 prefs_ = DefaultPreferences();
136 first_time_ = true;
137
138 ret = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION);
139 if (LZ4F_isError(ret)) {
140 return LZ4Error(ret, "LZ4 init failed: ");
141 } else {
142 return Status::OK();
143 }
144 }
145
146 #define BEGIN_COMPRESS(dst, dst_capacity, output_too_small) \
147 if (first_time_) { \
148 if (dst_capacity < LZ4F_HEADER_SIZE_MAX) { \
149 /* Output too small to write LZ4F header */ \
150 return (output_too_small); \
151 } \
152 ret = LZ4F_compressBegin(ctx_, dst, dst_capacity, &prefs_); \
153 if (LZ4F_isError(ret)) { \
154 return LZ4Error(ret, "LZ4 compress begin failed: "); \
155 } \
156 first_time_ = false; \
157 dst += ret; \
158 dst_capacity -= ret; \
159 bytes_written += static_cast<int64_t>(ret); \
160 }
161
Compress(int64_t input_len,const uint8_t * input,int64_t output_len,uint8_t * output)162 Result<CompressResult> Compress(int64_t input_len, const uint8_t* input,
163 int64_t output_len, uint8_t* output) override {
164 auto src = input;
165 auto dst = output;
166 auto src_size = static_cast<size_t>(input_len);
167 auto dst_capacity = static_cast<size_t>(output_len);
168 size_t ret;
169 int64_t bytes_written = 0;
170
171 BEGIN_COMPRESS(dst, dst_capacity, (CompressResult{0, 0}));
172
173 if (dst_capacity < LZ4F_compressBound(src_size, &prefs_)) {
174 // Output too small to compress into
175 return CompressResult{0, bytes_written};
176 }
177 ret = LZ4F_compressUpdate(ctx_, dst, dst_capacity, src, src_size,
178 nullptr /* options */);
179 if (LZ4F_isError(ret)) {
180 return LZ4Error(ret, "LZ4 compress update failed: ");
181 }
182 bytes_written += static_cast<int64_t>(ret);
183 DCHECK_LE(bytes_written, output_len);
184 return CompressResult{input_len, bytes_written};
185 }
186
Flush(int64_t output_len,uint8_t * output)187 Result<FlushResult> Flush(int64_t output_len, uint8_t* output) override {
188 auto dst = output;
189 auto dst_capacity = static_cast<size_t>(output_len);
190 size_t ret;
191 int64_t bytes_written = 0;
192
193 BEGIN_COMPRESS(dst, dst_capacity, (FlushResult{0, true}));
194
195 if (dst_capacity < LZ4F_compressBound(0, &prefs_)) {
196 // Output too small to flush into
197 return FlushResult{bytes_written, true};
198 }
199
200 ret = LZ4F_flush(ctx_, dst, dst_capacity, nullptr /* options */);
201 if (LZ4F_isError(ret)) {
202 return LZ4Error(ret, "LZ4 flush failed: ");
203 }
204 bytes_written += static_cast<int64_t>(ret);
205 DCHECK_LE(bytes_written, output_len);
206 return FlushResult{bytes_written, false};
207 }
208
End(int64_t output_len,uint8_t * output)209 Result<EndResult> End(int64_t output_len, uint8_t* output) override {
210 auto dst = output;
211 auto dst_capacity = static_cast<size_t>(output_len);
212 size_t ret;
213 int64_t bytes_written = 0;
214
215 BEGIN_COMPRESS(dst, dst_capacity, (EndResult{0, true}));
216
217 if (dst_capacity < LZ4F_compressBound(0, &prefs_)) {
218 // Output too small to end frame into
219 return EndResult{bytes_written, true};
220 }
221
222 ret = LZ4F_compressEnd(ctx_, dst, dst_capacity, nullptr /* options */);
223 if (LZ4F_isError(ret)) {
224 return LZ4Error(ret, "LZ4 end failed: ");
225 }
226 bytes_written += static_cast<int64_t>(ret);
227 DCHECK_LE(bytes_written, output_len);
228 return EndResult{bytes_written, false};
229 }
230
231 #undef BEGIN_COMPRESS
232
233 protected:
234 LZ4F_compressionContext_t ctx_ = nullptr;
235 LZ4F_preferences_t prefs_;
236 bool first_time_;
237 };
238
239 // ----------------------------------------------------------------------
240 // Lz4 frame codec implementation
241
242 class Lz4FrameCodec : public Codec {
243 public:
Lz4FrameCodec()244 Lz4FrameCodec() : prefs_(DefaultPreferences()) {}
245
MaxCompressedLen(int64_t input_len,const uint8_t * ARROW_ARG_UNUSED (input))246 int64_t MaxCompressedLen(int64_t input_len,
247 const uint8_t* ARROW_ARG_UNUSED(input)) override {
248 return static_cast<int64_t>(
249 LZ4F_compressFrameBound(static_cast<size_t>(input_len), &prefs_));
250 }
251
Compress(int64_t input_len,const uint8_t * input,int64_t output_buffer_len,uint8_t * output_buffer)252 Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
253 int64_t output_buffer_len, uint8_t* output_buffer) override {
254 auto output_len =
255 LZ4F_compressFrame(output_buffer, static_cast<size_t>(output_buffer_len), input,
256 static_cast<size_t>(input_len), &prefs_);
257 if (LZ4F_isError(output_len)) {
258 return LZ4Error(output_len, "Lz4 compression failure: ");
259 }
260 return static_cast<int64_t>(output_len);
261 }
262
Decompress(int64_t input_len,const uint8_t * input,int64_t output_buffer_len,uint8_t * output_buffer)263 Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
264 int64_t output_buffer_len, uint8_t* output_buffer) override {
265 ARROW_ASSIGN_OR_RAISE(auto decomp, MakeDecompressor());
266
267 int64_t total_bytes_written = 0;
268 while (!decomp->IsFinished() && input_len != 0) {
269 ARROW_ASSIGN_OR_RAISE(
270 auto res,
271 decomp->Decompress(input_len, input, output_buffer_len, output_buffer));
272 input += res.bytes_read;
273 input_len -= res.bytes_read;
274 output_buffer += res.bytes_written;
275 output_buffer_len -= res.bytes_written;
276 total_bytes_written += res.bytes_written;
277 if (res.need_more_output) {
278 return Status::IOError("Lz4 decompression buffer too small");
279 }
280 }
281 if (!decomp->IsFinished()) {
282 return Status::IOError("Lz4 compressed input contains less than one frame");
283 }
284 if (input_len != 0) {
285 return Status::IOError("Lz4 compressed input contains more than one frame");
286 }
287 return total_bytes_written;
288 }
289
MakeCompressor()290 Result<std::shared_ptr<Compressor>> MakeCompressor() override {
291 auto ptr = std::make_shared<LZ4Compressor>();
292 RETURN_NOT_OK(ptr->Init());
293 return ptr;
294 }
295
MakeDecompressor()296 Result<std::shared_ptr<Decompressor>> MakeDecompressor() override {
297 auto ptr = std::make_shared<LZ4Decompressor>();
298 RETURN_NOT_OK(ptr->Init());
299 return ptr;
300 }
301
compression_type() const302 Compression::type compression_type() const override { return Compression::LZ4_FRAME; }
303
304 protected:
305 const LZ4F_preferences_t prefs_;
306 };
307
308 // ----------------------------------------------------------------------
309 // Lz4 "raw" codec implementation
310
311 class Lz4Codec : public Codec {
312 public:
Decompress(int64_t input_len,const uint8_t * input,int64_t output_buffer_len,uint8_t * output_buffer)313 Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
314 int64_t output_buffer_len, uint8_t* output_buffer) override {
315 int64_t decompressed_size = LZ4_decompress_safe(
316 reinterpret_cast<const char*>(input), reinterpret_cast<char*>(output_buffer),
317 static_cast<int>(input_len), static_cast<int>(output_buffer_len));
318 if (decompressed_size < 0) {
319 return Status::IOError("Corrupt Lz4 compressed data.");
320 }
321 return decompressed_size;
322 }
323
MaxCompressedLen(int64_t input_len,const uint8_t * ARROW_ARG_UNUSED (input))324 int64_t MaxCompressedLen(int64_t input_len,
325 const uint8_t* ARROW_ARG_UNUSED(input)) override {
326 return LZ4_compressBound(static_cast<int>(input_len));
327 }
328
Compress(int64_t input_len,const uint8_t * input,int64_t output_buffer_len,uint8_t * output_buffer)329 Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
330 int64_t output_buffer_len, uint8_t* output_buffer) override {
331 int64_t output_len = LZ4_compress_default(
332 reinterpret_cast<const char*>(input), reinterpret_cast<char*>(output_buffer),
333 static_cast<int>(input_len), static_cast<int>(output_buffer_len));
334 if (output_len == 0) {
335 return Status::IOError("Lz4 compression failure.");
336 }
337 return output_len;
338 }
339
MakeCompressor()340 Result<std::shared_ptr<Compressor>> MakeCompressor() override {
341 return Status::NotImplemented(
342 "Streaming compression unsupported with LZ4 raw format. "
343 "Try using LZ4 frame format instead.");
344 }
345
MakeDecompressor()346 Result<std::shared_ptr<Decompressor>> MakeDecompressor() override {
347 return Status::NotImplemented(
348 "Streaming decompression unsupported with LZ4 raw format. "
349 "Try using LZ4 frame format instead.");
350 }
351
compression_type() const352 Compression::type compression_type() const override { return Compression::LZ4; }
353 };
354
355 // ----------------------------------------------------------------------
356 // Lz4 Hadoop "raw" codec implementation
357
358 class Lz4HadoopCodec : public Lz4Codec {
359 public:
Decompress(int64_t input_len,const uint8_t * input,int64_t output_buffer_len,uint8_t * output_buffer)360 Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
361 int64_t output_buffer_len, uint8_t* output_buffer) override {
362 const int64_t decompressed_size =
363 TryDecompressHadoop(input_len, input, output_buffer_len, output_buffer);
364 if (decompressed_size != kNotHadoop) {
365 return decompressed_size;
366 }
367 // Fall back on raw LZ4 codec (for files produces by earlier versions of Parquet C++)
368 return Lz4Codec::Decompress(input_len, input, output_buffer_len, output_buffer);
369 }
370
MaxCompressedLen(int64_t input_len,const uint8_t * ARROW_ARG_UNUSED (input))371 int64_t MaxCompressedLen(int64_t input_len,
372 const uint8_t* ARROW_ARG_UNUSED(input)) override {
373 return kPrefixLength + Lz4Codec::MaxCompressedLen(input_len, nullptr);
374 }
375
Compress(int64_t input_len,const uint8_t * input,int64_t output_buffer_len,uint8_t * output_buffer)376 Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
377 int64_t output_buffer_len, uint8_t* output_buffer) override {
378 if (output_buffer_len < kPrefixLength) {
379 return Status::Invalid("Output buffer too small for Lz4HadoopCodec compression");
380 }
381
382 ARROW_ASSIGN_OR_RAISE(
383 int64_t output_len,
384 Lz4Codec::Compress(input_len, input, output_buffer_len - kPrefixLength,
385 output_buffer + kPrefixLength));
386
387 // Prepend decompressed size in bytes and compressed size in bytes
388 // to be compatible with Hadoop Lz4Codec
389 const uint32_t decompressed_size =
390 BitUtil::ToBigEndian(static_cast<uint32_t>(input_len));
391 const uint32_t compressed_size =
392 BitUtil::ToBigEndian(static_cast<uint32_t>(output_len));
393 SafeStore(output_buffer, decompressed_size);
394 SafeStore(output_buffer + sizeof(uint32_t), compressed_size);
395
396 return kPrefixLength + output_len;
397 }
398
MakeCompressor()399 Result<std::shared_ptr<Compressor>> MakeCompressor() override {
400 return Status::NotImplemented(
401 "Streaming compression unsupported with LZ4 Hadoop raw format. "
402 "Try using LZ4 frame format instead.");
403 }
404
MakeDecompressor()405 Result<std::shared_ptr<Decompressor>> MakeDecompressor() override {
406 return Status::NotImplemented(
407 "Streaming decompression unsupported with LZ4 Hadoop raw format. "
408 "Try using LZ4 frame format instead.");
409 }
410
compression_type() const411 Compression::type compression_type() const override { return Compression::LZ4_HADOOP; }
412
413 protected:
414 // Offset starting at which page data can be read/written
415 static const int64_t kPrefixLength = sizeof(uint32_t) * 2;
416
417 static const int64_t kNotHadoop = -1;
418
TryDecompressHadoop(int64_t input_len,const uint8_t * input,int64_t output_buffer_len,uint8_t * output_buffer)419 int64_t TryDecompressHadoop(int64_t input_len, const uint8_t* input,
420 int64_t output_buffer_len, uint8_t* output_buffer) {
421 // Parquet files written with the Hadoop Lz4Codec use their own framing.
422 // The input buffer can contain an arbitrary number of "frames", each
423 // with the following structure:
424 // - bytes 0..3: big-endian uint32_t representing the frame decompressed size
425 // - bytes 4..7: big-endian uint32_t representing the frame compressed size
426 // - bytes 8...: frame compressed data
427 //
428 // The Hadoop Lz4Codec source code can be found here:
429 // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
430 int64_t total_decompressed_size = 0;
431
432 while (input_len >= kPrefixLength) {
433 const uint32_t expected_decompressed_size =
434 BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input));
435 const uint32_t expected_compressed_size =
436 BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input + sizeof(uint32_t)));
437 input += kPrefixLength;
438 input_len -= kPrefixLength;
439
440 if (input_len < expected_compressed_size) {
441 // Not enough bytes for Hadoop "frame"
442 return kNotHadoop;
443 }
444 if (output_buffer_len < expected_decompressed_size) {
445 // Not enough bytes to hold advertised output => probably not Hadoop
446 return kNotHadoop;
447 }
448 // Try decompressing and compare with expected decompressed length
449 auto maybe_decompressed_size = Lz4Codec::Decompress(
450 expected_compressed_size, input, output_buffer_len, output_buffer);
451 if (!maybe_decompressed_size.ok() ||
452 *maybe_decompressed_size != expected_decompressed_size) {
453 return kNotHadoop;
454 }
455 input += expected_compressed_size;
456 input_len -= expected_compressed_size;
457 output_buffer += expected_decompressed_size;
458 output_buffer_len -= expected_decompressed_size;
459 total_decompressed_size += expected_decompressed_size;
460 }
461
462 if (input_len == 0) {
463 return total_decompressed_size;
464 } else {
465 return kNotHadoop;
466 }
467 }
468 };
469
470 } // namespace
471
472 namespace internal {
473
MakeLz4FrameCodec()474 std::unique_ptr<Codec> MakeLz4FrameCodec() {
475 return std::unique_ptr<Codec>(new Lz4FrameCodec());
476 }
477
MakeLz4HadoopRawCodec()478 std::unique_ptr<Codec> MakeLz4HadoopRawCodec() {
479 return std::unique_ptr<Codec>(new Lz4HadoopCodec());
480 }
481
MakeLz4RawCodec()482 std::unique_ptr<Codec> MakeLz4RawCodec() {
483 return std::unique_ptr<Codec>(new Lz4Codec());
484 }
485
486 } // namespace internal
487
488 } // namespace util
489 } // namespace arrow
490