1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
#include <cmath>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <memory>
#include <utility>
#include <vector>

#include <gtest/gtest.h>

#include "arrow/io/buffered.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_builders.h"

#include "parquet/column_reader.h"
#include "parquet/column_writer.h"
#include "parquet/file_writer.h"
#include "parquet/metadata.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/statistics.h"
#include "parquet/test_util.h"
#include "parquet/thrift_internal.h"
#include "parquet/types.h"
38
39 namespace BitUtil = arrow::BitUtil;
40
41 namespace parquet {
42
43 using schema::GroupNode;
44 using schema::NodePtr;
45 using schema::PrimitiveNode;
46
47 namespace test {
48
// Row count used by the majority of the tests below.
constexpr int SMALL_SIZE = 100;
#ifdef PARQUET_VALGRIND
// Larger size for corner cases; kept small so valgrind runs stay fast.
constexpr int LARGE_SIZE = 10000;
// Large enough to trigger dictionary fallback under valgrind.
constexpr int VERY_LARGE_SIZE = 40000;
// Reduced dictionary page size so fallback triggers quickly under valgrind.
constexpr int64_t DICTIONARY_PAGE_SIZE = 1024;
#else
// Larger size to exercise corner cases, only used in some specific tests.
constexpr int LARGE_SIZE = 100000;
// Large enough to trigger dictionary fallback.
constexpr int VERY_LARGE_SIZE = 400000;
// Dictionary page size to use for testing dictionary fallback.
constexpr int64_t DICTIONARY_PAGE_SIZE = 1024 * 1024;
#endif
66
// Typed test fixture for round-tripping a single Parquet column through
// TypedColumnWriter and TypedColumnReader over an in-memory buffer.
template <typename TestType>
class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
 public:
  // Sizes the output buffers for SMALL_SIZE rows and installs a REQUIRED
  // single-column schema of the tested type; individual tests may call
  // SetUpSchema() again with a different repetition.
  // NOTE(review): presumably overrides a virtual SetUp() in the base —
  // consider adding `override` after confirming the base declares it.
  void SetUp() {
    this->SetupValuesOut(SMALL_SIZE);
    writer_properties_ = default_writer_properties();
    definition_levels_out_.resize(SMALL_SIZE);
    repetition_levels_out_.resize(SMALL_SIZE);

    this->SetUpSchema(Repetition::REQUIRED);

    descr_ = this->schema_.Column(0);
  }

  // Physical Parquet type exercised by this instantiation.
  Type::type type_num() { return TestType::type_num; }

  // Finishes the in-memory sink and opens a typed column reader over the
  // written bytes.  `num_rows` is the total row count recorded for the chunk.
  void BuildReader(int64_t num_rows,
                   Compression::type compression = Compression::UNCOMPRESSED) {
    ASSERT_OK_AND_ASSIGN(auto buffer, sink_->Finish());
    auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
    std::unique_ptr<PageReader> page_reader =
        PageReader::Open(std::move(source), num_rows, compression);
    reader_ = std::static_pointer_cast<TypedColumnReader<TestType>>(
        ColumnReader::Make(this->descr_, std::move(page_reader)));
  }

  // Creates a fresh sink + metadata builder + page writer chain and returns
  // a typed column writer.  Dictionary encodings enable the dictionary with
  // DICTIONARY_PAGE_SIZE as the page limit; any other encoding disables the
  // dictionary and sets the requested encoding directly.
  // NOTE(review): `output_size` is not referenced in the body — confirm
  // whether it is intentionally unused.
  std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter(
      int64_t output_size = SMALL_SIZE,
      const ColumnProperties& column_properties = ColumnProperties(),
      const ParquetVersion::type version = ParquetVersion::PARQUET_1_0) {
    sink_ = CreateOutputStream();
    WriterProperties::Builder wp_builder;
    wp_builder.version(version);
    if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY ||
        column_properties.encoding() == Encoding::RLE_DICTIONARY) {
      wp_builder.enable_dictionary();
      wp_builder.dictionary_pagesize_limit(DICTIONARY_PAGE_SIZE);
    } else {
      wp_builder.disable_dictionary();
      wp_builder.encoding(column_properties.encoding());
    }
    wp_builder.max_statistics_size(column_properties.max_statistics_size());
    writer_properties_ = wp_builder.build();

    metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_, this->descr_);
    std::unique_ptr<PageWriter> pager =
        PageWriter::Open(sink_, column_properties.compression(),
                         Codec::UseDefaultCompressionLevel(), metadata_.get());
    std::shared_ptr<ColumnWriter> writer =
        ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get());
    return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer);
  }

  // Reads the whole column back in a single ReadBatch call.
  void ReadColumn(Compression::type compression = Compression::UNCOMPRESSED) {
    BuildReader(static_cast<int64_t>(this->values_out_.size()), compression);
    reader_->ReadBatch(static_cast<int>(this->values_out_.size()),
                       definition_levels_out_.data(), repetition_levels_out_.data(),
                       this->values_out_ptr_, &values_read_);
    this->SyncValuesOut();
  }

  // Reads the whole column, looping over ReadBatch calls until every value
  // is consumed.  Defined out of line below; FLBA has a specialization that
  // deep-copies the returned byte pointers.
  void ReadColumnFully(Compression::type compression = Compression::UNCOMPRESSED);

  // Convenience wrapper: uncompressed round trip with no dictionary/stats.
  void TestRequiredWithEncoding(Encoding::type encoding) {
    return TestRequiredWithSettings(encoding, Compression::UNCOMPRESSED, false, false);
  }

  // Full round trip: writes num_rows required values first densely
  // (WriteBatch) and then spaced (WriteBatchSpaced), comparing the values
  // read back after each write.
  void TestRequiredWithSettings(
      Encoding::type encoding, Compression::type compression, bool enable_dictionary,
      bool enable_statistics, int64_t num_rows = SMALL_SIZE,
      int compression_level = Codec::UseDefaultCompressionLevel()) {
    this->GenerateData(num_rows);

    this->WriteRequiredWithSettings(encoding, compression, enable_dictionary,
                                    enable_statistics, compression_level, num_rows);
    ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows));

    this->WriteRequiredWithSettingsSpaced(encoding, compression, enable_dictionary,
                                          enable_statistics, num_rows, compression_level);
    ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows));
  }

  // Writes enough values (VERY_LARGE_SIZE) to exceed the dictionary page
  // limit so the writer falls back to plain encoding mid-chunk, then checks
  // the read-back values and the encodings/encoding stats recorded in the
  // column metadata for the given format version.
  void TestDictionaryFallbackEncoding(ParquetVersion::type version) {
    this->GenerateData(VERY_LARGE_SIZE);
    ColumnProperties column_properties;
    column_properties.set_dictionary_enabled(true);

    if (version == ParquetVersion::PARQUET_1_0) {
      column_properties.set_encoding(Encoding::PLAIN_DICTIONARY);
    } else {
      column_properties.set_encoding(Encoding::RLE_DICTIONARY);
    }

    auto writer = this->BuildWriter(VERY_LARGE_SIZE, column_properties, version);

    writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
    writer->Close();

    // Read all rows so we are sure that also the non-dictionary pages are
    // read correctly.
    this->SetupValuesOut(VERY_LARGE_SIZE);
    this->ReadColumnFully();
    ASSERT_EQ(VERY_LARGE_SIZE, this->values_read_);
    this->values_.resize(VERY_LARGE_SIZE);
    ASSERT_EQ(this->values_, this->values_out_);
    std::vector<Encoding::type> encodings = this->metadata_encodings();

    if (this->type_num() == Type::BOOLEAN) {
      // Dictionary encoding is not allowed for boolean type.
      // There are 2 encodings (PLAIN, RLE) in a non-dictionary encoding case.
      std::vector<Encoding::type> expected({Encoding::PLAIN, Encoding::RLE});
      ASSERT_EQ(encodings, expected);
    } else if (version == ParquetVersion::PARQUET_1_0) {
      // There are 4 encodings (PLAIN_DICTIONARY, PLAIN, RLE, PLAIN) in a
      // fallback case for version 1.0.
      std::vector<Encoding::type> expected(
          {Encoding::PLAIN_DICTIONARY, Encoding::PLAIN, Encoding::RLE, Encoding::PLAIN});
      ASSERT_EQ(encodings, expected);
    } else {
      // There are 4 encodings (RLE_DICTIONARY, PLAIN, RLE, PLAIN) in a
      // fallback case for version 2.0.
      std::vector<Encoding::type> expected(
          {Encoding::RLE_DICTIONARY, Encoding::PLAIN, Encoding::RLE, Encoding::PLAIN});
      ASSERT_EQ(encodings, expected);
    }

    std::vector<parquet::PageEncodingStats> encoding_stats =
        this->metadata_encoding_stats();
    if (this->type_num() == Type::BOOLEAN) {
      ASSERT_EQ(encoding_stats[0].encoding, Encoding::PLAIN);
      ASSERT_EQ(encoding_stats[0].page_type, PageType::DATA_PAGE);
    } else if (version == ParquetVersion::PARQUET_1_0) {
      // Dictionary page first, then the data pages per recorded encoding.
      std::vector<Encoding::type> expected(
          {Encoding::PLAIN_DICTIONARY, Encoding::PLAIN, Encoding::PLAIN_DICTIONARY});
      ASSERT_EQ(encoding_stats[0].encoding, expected[0]);
      ASSERT_EQ(encoding_stats[0].page_type, PageType::DICTIONARY_PAGE);
      for (size_t i = 1; i < encoding_stats.size(); i++) {
        ASSERT_EQ(encoding_stats[i].encoding, expected[i]);
        ASSERT_EQ(encoding_stats[i].page_type, PageType::DATA_PAGE);
      }
    } else {
      std::vector<Encoding::type> expected(
          {Encoding::PLAIN, Encoding::PLAIN, Encoding::RLE_DICTIONARY});
      ASSERT_EQ(encoding_stats[0].encoding, expected[0]);
      ASSERT_EQ(encoding_stats[0].page_type, PageType::DICTIONARY_PAGE);
      for (size_t i = 1; i < encoding_stats.size(); i++) {
        ASSERT_EQ(encoding_stats[i].encoding, expected[i]);
        ASSERT_EQ(encoding_stats[i].page_type, PageType::DATA_PAGE);
      }
    }
  }

  // Dense write of all generated values with the given settings.
  void WriteRequiredWithSettings(Encoding::type encoding, Compression::type compression,
                                 bool enable_dictionary, bool enable_statistics,
                                 int compression_level, int64_t num_rows) {
    ColumnProperties column_properties(encoding, compression, enable_dictionary,
                                       enable_statistics);
    column_properties.set_compression_level(compression_level);
    std::shared_ptr<TypedColumnWriter<TestType>> writer =
        this->BuildWriter(num_rows, column_properties);
    writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
    // The behaviour should be independent from the number of Close() calls.
    writer->Close();
    writer->Close();
  }

  // Spaced write of all generated values with an all-valid bitmap (no nulls).
  void WriteRequiredWithSettingsSpaced(Encoding::type encoding,
                                       Compression::type compression,
                                       bool enable_dictionary, bool enable_statistics,
                                       int64_t num_rows, int compression_level) {
    // All bits set: every value is valid.
    std::vector<uint8_t> valid_bits(
        BitUtil::BytesForBits(static_cast<uint32_t>(this->values_.size())) + 1, 255);
    ColumnProperties column_properties(encoding, compression, enable_dictionary,
                                       enable_statistics);
    column_properties.set_compression_level(compression_level);
    std::shared_ptr<TypedColumnWriter<TestType>> writer =
        this->BuildWriter(num_rows, column_properties);
    writer->WriteBatchSpaced(this->values_.size(), nullptr, nullptr, valid_bits.data(), 0,
                             this->values_ptr_);
    // The behaviour should be independent from the number of Close() calls.
    writer->Close();
    writer->Close();
  }

  // Reads the column back and asserts values_out_ equals values_, using a
  // typed comparator in both directions to pinpoint the first mismatch.
  void ReadAndCompare(Compression::type compression, int64_t num_rows) {
    this->SetupValuesOut(num_rows);
    this->ReadColumnFully(compression);
    auto comparator = MakeComparator<TestType>(this->descr_);
    for (size_t i = 0; i < this->values_.size(); i++) {
      if (comparator->Compare(this->values_[i], this->values_out_[i]) ||
          comparator->Compare(this->values_out_[i], this->values_[i])) {
        std::cout << "Failed at " << i << std::endl;
      }
      ASSERT_FALSE(comparator->Compare(this->values_[i], this->values_out_[i]));
      ASSERT_FALSE(comparator->Compare(this->values_out_[i], this->values_[i]));
    }
    ASSERT_EQ(this->values_, this->values_out_);
  }

  // Total value count (including nulls) recorded in the chunk metadata.
  int64_t metadata_num_values() {
    // Metadata accessor must be created lazily.
    // This is because the ColumnChunkMetaData semantics dictate the metadata object is
    // complete (no changes to the metadata buffer can be made after instantiation)
    auto metadata_accessor =
        ColumnChunkMetaData::Make(metadata_->contents(), this->descr_);
    return metadata_accessor->num_values();
  }

  // Whether statistics were written for the chunk.
  bool metadata_is_stats_set() {
    // Metadata accessor must be created lazily.
    // This is because the ColumnChunkMetaData semantics dictate the metadata object is
    // complete (no changes to the metadata buffer can be made after instantiation)
    ApplicationVersion app_version(this->writer_properties_->created_by());
    auto metadata_accessor =
        ColumnChunkMetaData::Make(metadata_->contents(), this->descr_, &app_version);
    return metadata_accessor->is_stats_set();
  }

  // {has_min, has_max} flags of the encoded chunk statistics.
  std::pair<bool, bool> metadata_stats_has_min_max() {
    // Metadata accessor must be created lazily.
    // This is because the ColumnChunkMetaData semantics dictate the metadata object is
    // complete (no changes to the metadata buffer can be made after instantiation)
    ApplicationVersion app_version(this->writer_properties_->created_by());
    auto metadata_accessor =
        ColumnChunkMetaData::Make(metadata_->contents(), this->descr_, &app_version);
    auto encoded_stats = metadata_accessor->statistics()->Encode();
    return {encoded_stats.has_min, encoded_stats.has_max};
  }

  // Encodings recorded in the chunk metadata, in recorded order.
  std::vector<Encoding::type> metadata_encodings() {
    // Metadata accessor must be created lazily.
    // This is because the ColumnChunkMetaData semantics dictate the metadata object is
    // complete (no changes to the metadata buffer can be made after instantiation)
    auto metadata_accessor =
        ColumnChunkMetaData::Make(metadata_->contents(), this->descr_);
    return metadata_accessor->encodings();
  }

  // Per-page encoding statistics recorded in the chunk metadata.
  std::vector<parquet::PageEncodingStats> metadata_encoding_stats() {
    // Metadata accessor must be created lazily.
    // This is because the ColumnChunkMetaData semantics dictate the metadata object is
    // complete (no changes to the metadata buffer can be made after instantiation)
    auto metadata_accessor =
        ColumnChunkMetaData::Make(metadata_->contents(), this->descr_);
    return metadata_accessor->encoding_stats();
  }

 protected:
  int64_t values_read_;
  // Keep the reader alive as for ByteArray the lifetime of the ByteArray
  // content is bound to the reader.
  std::shared_ptr<TypedColumnReader<TestType>> reader_;

  std::vector<int16_t> definition_levels_out_;
  std::vector<int16_t> repetition_levels_out_;

  const ColumnDescriptor* descr_;

 private:
  std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
  std::shared_ptr<::arrow::io::BufferOutputStream> sink_;
  std::shared_ptr<WriterProperties> writer_properties_;
  // Backing storage for deep-copied FLBA bytes (see the FLBA specialization
  // of ReadColumnFully below).
  std::vector<std::vector<uint8_t>> data_buffer_;
};
330
331 template <typename TestType>
ReadColumnFully(Compression::type compression)332 void TestPrimitiveWriter<TestType>::ReadColumnFully(Compression::type compression) {
333 int64_t total_values = static_cast<int64_t>(this->values_out_.size());
334 BuildReader(total_values, compression);
335 values_read_ = 0;
336 while (values_read_ < total_values) {
337 int64_t values_read_recently = 0;
338 reader_->ReadBatch(
339 static_cast<int>(this->values_out_.size()) - static_cast<int>(values_read_),
340 definition_levels_out_.data() + values_read_,
341 repetition_levels_out_.data() + values_read_,
342 this->values_out_ptr_ + values_read_, &values_read_recently);
343 values_read_ += values_read_recently;
344 }
345 this->SyncValuesOut();
346 }
347
348 template <>
ReadAndCompare(Compression::type compression,int64_t num_rows)349 void TestPrimitiveWriter<Int96Type>::ReadAndCompare(Compression::type compression,
350 int64_t num_rows) {
351 this->SetupValuesOut(num_rows);
352 this->ReadColumnFully(compression);
353
354 auto comparator = MakeComparator<Int96Type>(Type::INT96, SortOrder::SIGNED);
355 for (size_t i = 0; i < this->values_.size(); i++) {
356 if (comparator->Compare(this->values_[i], this->values_out_[i]) ||
357 comparator->Compare(this->values_out_[i], this->values_[i])) {
358 std::cout << "Failed at " << i << std::endl;
359 }
360 ASSERT_FALSE(comparator->Compare(this->values_[i], this->values_out_[i]));
361 ASSERT_FALSE(comparator->Compare(this->values_out_[i], this->values_[i]));
362 }
363 ASSERT_EQ(this->values_, this->values_out_);
364 }
365
// Specialization for FIXED_LEN_BYTE_ARRAY: the FLBA values returned by the
// reader point into reader-owned buffers, so after each batch the pointed-to
// bytes are copied into data_buffer_ and the FLBA pointers are re-targeted
// at the copies, keeping values_out_ valid across subsequent ReadBatch calls.
template <>
void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compression) {
  int64_t total_values = static_cast<int64_t>(this->values_out_.size());
  BuildReader(total_values, compression);
  // Drop copies from any previous read of this column.
  this->data_buffer_.clear();

  values_read_ = 0;
  while (values_read_ < total_values) {
    int64_t values_read_recently = 0;
    reader_->ReadBatch(
        static_cast<int>(this->values_out_.size()) - static_cast<int>(values_read_),
        definition_levels_out_.data() + values_read_,
        repetition_levels_out_.data() + values_read_,
        this->values_out_ptr_ + values_read_, &values_read_recently);

    // Copy contents of the pointers: one contiguous buffer holds all FLBA
    // payloads of this batch, at fixed type_length() strides.
    std::vector<uint8_t> data(values_read_recently * this->descr_->type_length());
    uint8_t* data_ptr = data.data();
    for (int64_t i = 0; i < values_read_recently; i++) {
      memcpy(data_ptr + this->descr_->type_length() * i,
             this->values_out_[i + values_read_].ptr, this->descr_->type_length());
      // Re-point the FLBA at the stable copy instead of reader-owned memory.
      this->values_out_[i + values_read_].ptr =
          data_ptr + this->descr_->type_length() * i;
    }
    // data_buffer_ keeps the copies alive for the lifetime of the fixture.
    data_buffer_.emplace_back(std::move(data));

    values_read_ += values_read_recently;
  }
  this->SyncValuesOut();
}
396
397 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
398 BooleanType, ByteArrayType, FLBAType>
399 TestTypes;
400
401 TYPED_TEST_SUITE(TestPrimitiveWriter, TestTypes);
402
403 using TestNullValuesWriter = TestPrimitiveWriter<Int32Type>;
404
// Uncompressed round trip of SMALL_SIZE required values with PLAIN encoding.
TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
  this->TestRequiredWithEncoding(Encoding::PLAIN);
}

// Uncompressed round trip of SMALL_SIZE required values with dictionary
// encoding.
TYPED_TEST(TestPrimitiveWriter, RequiredDictionary) {
  this->TestRequiredWithEncoding(Encoding::PLAIN_DICTIONARY);
}
412
413 /*
414 TYPED_TEST(TestPrimitiveWriter, RequiredRLE) {
415 this->TestRequiredWithEncoding(Encoding::RLE);
416 }
417
418 TYPED_TEST(TestPrimitiveWriter, RequiredBitPacked) {
419 this->TestRequiredWithEncoding(Encoding::BIT_PACKED);
420 }
421
422 TYPED_TEST(TestPrimitiveWriter, RequiredDeltaBinaryPacked) {
423 this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
424 }
425
426 TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) {
427 this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
428 }
429
430 TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) {
431 this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
432 }
433
434 TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) {
435 this->TestRequiredWithEncoding(Encoding::RLE_DICTIONARY);
436 }
437 */
438
// LARGE_SIZE round trips with each optional codec (guarded by the build
// flags), with and without statistics, and with explicit compression levels
// where the codec supports them.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStats) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::UNCOMPRESSED, false, true,
                                 LARGE_SIZE);
}

#ifdef ARROW_WITH_SNAPPY
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithSnappyCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY, false, false,
                                 LARGE_SIZE);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndSnappyCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY, false, true,
                                 LARGE_SIZE);
}
#endif

#ifdef ARROW_WITH_BROTLI
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithBrotliCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::BROTLI, false, false,
                                 LARGE_SIZE);
}

// Brotli with an explicit (maximum) compression level of 10.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithBrotliCompressionAndLevel) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::BROTLI, false, false,
                                 LARGE_SIZE, 10);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndBrotliCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::BROTLI, false, true,
                                 LARGE_SIZE);
}

#endif

#ifdef ARROW_WITH_GZIP
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, false,
                                 LARGE_SIZE);
}

// Gzip with an explicit compression level of 10.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompressionAndLevel) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, false,
                                 LARGE_SIZE, 10);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndGzipCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, true,
                                 LARGE_SIZE);
}
#endif

#ifdef ARROW_WITH_LZ4
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithLz4Compression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, false,
                                 LARGE_SIZE);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndLz4Compression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, true,
                                 LARGE_SIZE);
}
#endif

#ifdef ARROW_WITH_ZSTD
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithZstdCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, false, false,
                                 LARGE_SIZE);
}

// Zstd with an explicit compression level of 6.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithZstdCompressionAndLevel) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, false, false,
                                 LARGE_SIZE, 6);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndZstdCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, false, true,
                                 LARGE_SIZE);
}
#endif
519
TYPED_TEST(TestPrimitiveWriter, Optional) {
  // Optional, non-repeated column: definition levels are written but no
  // repetition levels.  Row 1 is marked null.
  this->SetUpSchema(Repetition::OPTIONAL);

  this->GenerateData(SMALL_SIZE);
  std::vector<int16_t> def_levels(SMALL_SIZE, 1);
  def_levels[1] = 0;  // second row is null

  auto writer = this->BuildWriter();
  writer->WriteBatch(this->values_.size(), def_levels.data(), nullptr,
                     this->values_ptr_);
  writer->Close();

  // PARQUET-703: metadata num_values counts nulls as well.
  ASSERT_EQ(100, this->metadata_num_values());

  this->ReadColumn();
  ASSERT_EQ(99, this->values_read_);
  this->values_out_.resize(99);
  this->values_.resize(99);
  ASSERT_EQ(this->values_, this->values_out_);
}
543
TYPED_TEST(TestPrimitiveWriter, OptionalSpaced) {
  // Optional, non-repeated column written through the spaced API:
  // definition levels plus a validity bitmap, no repetition levels.
  this->SetUpSchema(Repetition::OPTIONAL);

  this->GenerateData(SMALL_SIZE);
  std::vector<int16_t> def_levels(SMALL_SIZE, 1);
  std::vector<uint8_t> valid_bits(::arrow::BitUtil::BytesForBits(SMALL_SIZE), 255);

  // Null out the last row and row 1 in both levels and bitmap.
  def_levels[SMALL_SIZE - 1] = 0;
  ::arrow::BitUtil::ClearBit(valid_bits.data(), SMALL_SIZE - 1);
  def_levels[1] = 0;
  ::arrow::BitUtil::ClearBit(valid_bits.data(), 1);

  auto writer = this->BuildWriter();
  writer->WriteBatchSpaced(this->values_.size(), def_levels.data(), nullptr,
                           valid_bits.data(), 0, this->values_ptr_);
  writer->Close();

  // PARQUET-703: metadata num_values counts nulls as well.
  ASSERT_EQ(100, this->metadata_num_values());

  this->ReadColumn();
  ASSERT_EQ(98, this->values_read_);
  this->values_out_.resize(98);
  // Drop the two nulled entries from the expectation: the trailing one via
  // resize, the one at index 1 via erase.
  this->values_.resize(99);
  this->values_.erase(this->values_.begin() + 1);
  ASSERT_EQ(this->values_, this->values_out_);
}
573
TYPED_TEST(TestPrimitiveWriter, Repeated) {
  // Repeated column: both definition and repetition levels are written.
  this->SetUpSchema(Repetition::REPEATED);

  this->GenerateData(SMALL_SIZE);
  std::vector<int16_t> def_levels(SMALL_SIZE, 1);
  def_levels[1] = 0;  // one entry without a value
  std::vector<int16_t> rep_levels(SMALL_SIZE, 0);

  auto writer = this->BuildWriter();
  writer->WriteBatch(this->values_.size(), def_levels.data(), rep_levels.data(),
                     this->values_ptr_);
  writer->Close();

  this->ReadColumn();
  ASSERT_EQ(SMALL_SIZE - 1, this->values_read_);
  this->values_out_.resize(SMALL_SIZE - 1);
  this->values_.resize(SMALL_SIZE - 1);
  ASSERT_EQ(this->values_, this->values_out_);
}
594
TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) {
  // Required, non-repeated: no definition or repetition levels.  Write a
  // LARGE_SIZE chunk, then read back only the first SMALL_SIZE rows to
  // confirm partial reads work.
  this->GenerateData(LARGE_SIZE);

  auto writer = this->BuildWriter(LARGE_SIZE);
  writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
  writer->Close();

  this->ReadColumn();
  ASSERT_EQ(SMALL_SIZE, this->values_read_);
  this->values_.resize(SMALL_SIZE);
  ASSERT_EQ(this->values_, this->values_out_);
}
609
// Test cases for dictionary fallback encoding (see
// TestDictionaryFallbackEncoding for the shared driver).
TYPED_TEST(TestPrimitiveWriter, DictionaryFallbackVersion1_0) {
  this->TestDictionaryFallbackEncoding(ParquetVersion::PARQUET_1_0);
}

TYPED_TEST(TestPrimitiveWriter, DictionaryFallbackVersion2_0) {
  this->TestDictionaryFallbackEncoding(ParquetVersion::PARQUET_2_0);
}
618
TEST(TestWriter, NullValuesBuffer) {
  // Writing a batch whose rows are all null must tolerate a null values
  // buffer (values == nullptr) without crashing.
  std::shared_ptr<::arrow::io::BufferOutputStream> sink = CreateOutputStream();

  // schema: optional group array_of_ints_column (LIST) {
  //           repeated group list { required int32 item }
  //         }
  const auto item_node = schema::PrimitiveNode::Make(
      "item", Repetition::REQUIRED, LogicalType::Int(32, true), Type::INT32);
  const auto list_node =
      schema::GroupNode::Make("list", Repetition::REPEATED, {item_node});
  const auto column_node = schema::GroupNode::Make(
      "array_of_ints_column", Repetition::OPTIONAL, {list_node}, LogicalType::List());
  const auto schema_node =
      schema::GroupNode::Make("schema", Repetition::REQUIRED, {column_node});

  auto file_writer = ParquetFileWriter::Open(
      sink, std::dynamic_pointer_cast<schema::GroupNode>(schema_node));
  auto group_writer = file_writer->AppendRowGroup();
  auto column_writer = group_writer->NextColumn();
  auto typed_writer = dynamic_cast<Int32Writer*>(column_writer);

  // A single all-null row: def level 0, cleared validity bit, no values.
  constexpr int64_t num_values = 1;
  const int16_t def_levels[] = {0};
  const int16_t rep_levels[] = {0};
  const uint8_t valid_bits[] = {0};
  constexpr int64_t valid_bits_offset = 0;
  const int32_t* values = nullptr;

  typed_writer->WriteBatchSpaced(num_values, def_levels, rep_levels, valid_bits,
                                 valid_bits_offset, values);
}
647
648 // PARQUET-719
649 // Test case for NULL values
TEST_F(TestNullValuesWriter,OptionalNullValueChunk)650 TEST_F(TestNullValuesWriter, OptionalNullValueChunk) {
651 this->SetUpSchema(Repetition::OPTIONAL);
652
653 this->GenerateData(LARGE_SIZE);
654
655 std::vector<int16_t> definition_levels(LARGE_SIZE, 0);
656 std::vector<int16_t> repetition_levels(LARGE_SIZE, 0);
657
658 auto writer = this->BuildWriter(LARGE_SIZE);
659 // All values being written are NULL
660 writer->WriteBatch(this->values_.size(), definition_levels.data(),
661 repetition_levels.data(), nullptr);
662 writer->Close();
663
664 // Just read the first SMALL_SIZE rows to ensure we could read it back in
665 this->ReadColumn();
666 ASSERT_EQ(0, this->values_read_);
667 }
668
669 // PARQUET-764
670 // Correct bitpacking for boolean write at non-byte boundaries
671 using TestBooleanValuesWriter = TestPrimitiveWriter<BooleanType>;
TEST_F(TestBooleanValuesWriter,AlternateBooleanValues)672 TEST_F(TestBooleanValuesWriter, AlternateBooleanValues) {
673 this->SetUpSchema(Repetition::REQUIRED);
674 auto writer = this->BuildWriter();
675 for (int i = 0; i < SMALL_SIZE; i++) {
676 bool value = (i % 2 == 0) ? true : false;
677 writer->WriteBatch(1, nullptr, nullptr, &value);
678 }
679 writer->Close();
680 this->ReadColumn();
681 for (int i = 0; i < SMALL_SIZE; i++) {
682 ASSERT_EQ((i % 2 == 0) ? true : false, this->values_out_[i]) << i;
683 }
684 }
685
686 // PARQUET-979
687 // Prevent writing large MIN, MAX stats
688 using TestByteArrayValuesWriter = TestPrimitiveWriter<ByteArrayType>;
TEST_F(TestByteArrayValuesWriter,OmitStats)689 TEST_F(TestByteArrayValuesWriter, OmitStats) {
690 int min_len = 1024 * 4;
691 int max_len = 1024 * 8;
692 this->SetUpSchema(Repetition::REQUIRED);
693 auto writer = this->BuildWriter();
694
695 values_.resize(SMALL_SIZE);
696 InitWideByteArrayValues(SMALL_SIZE, this->values_, this->buffer_, min_len, max_len);
697 writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_.data());
698 writer->Close();
699
700 auto has_min_max = this->metadata_stats_has_min_max();
701 ASSERT_FALSE(has_min_max.first);
702 ASSERT_FALSE(has_min_max.second);
703 }
704
705 // PARQUET-1405
706 // Prevent writing large stats in the DataPageHeader
TEST_F(TestByteArrayValuesWriter,OmitDataPageStats)707 TEST_F(TestByteArrayValuesWriter, OmitDataPageStats) {
708 int min_len = static_cast<int>(std::pow(10, 7));
709 int max_len = static_cast<int>(std::pow(10, 7));
710 this->SetUpSchema(Repetition::REQUIRED);
711 ColumnProperties column_properties;
712 column_properties.set_statistics_enabled(false);
713 auto writer = this->BuildWriter(SMALL_SIZE, column_properties);
714
715 values_.resize(1);
716 InitWideByteArrayValues(1, this->values_, this->buffer_, min_len, max_len);
717 writer->WriteBatch(1, nullptr, nullptr, this->values_.data());
718 writer->Close();
719
720 ASSERT_NO_THROW(this->ReadColumn());
721 }
722
TEST_F(TestByteArrayValuesWriter, LimitStats) {
  // Statistics whose encoded size fits within max_statistics_size must
  // still be written.
  const int min_len = 1024 * 4;
  const int max_len = 1024 * 8;
  this->SetUpSchema(Repetition::REQUIRED);
  ColumnProperties column_properties;
  column_properties.set_max_statistics_size(static_cast<size_t>(max_len));
  auto writer = this->BuildWriter(SMALL_SIZE, column_properties);

  values_.resize(SMALL_SIZE);
  InitWideByteArrayValues(SMALL_SIZE, this->values_, this->buffer_, min_len, max_len);
  writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_.data());
  writer->Close();

  ASSERT_TRUE(this->metadata_is_stats_set());
}
738
TEST_F(TestByteArrayValuesWriter,CheckDefaultStats)739 TEST_F(TestByteArrayValuesWriter, CheckDefaultStats) {
740 this->SetUpSchema(Repetition::REQUIRED);
741 auto writer = this->BuildWriter();
742 this->GenerateData(SMALL_SIZE);
743
744 writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_ptr_);
745 writer->Close();
746
747 ASSERT_TRUE(this->metadata_is_stats_set());
748 }
749
TEST(TestColumnWriter,RepeatedListsUpdateSpacedBug)750 TEST(TestColumnWriter, RepeatedListsUpdateSpacedBug) {
751 // In ARROW-3930 we discovered a bug when writing from Arrow when we had data
752 // that looks like this:
753 //
754 // [null, [0, 1, null, 2, 3, 4, null]]
755
756 // Create schema
757 NodePtr item = schema::Int32("item"); // optional item
758 NodePtr list(GroupNode::Make("b", Repetition::REPEATED, {item}, ConvertedType::LIST));
759 NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list})); // optional list
760 std::vector<NodePtr> fields = {bag};
761 NodePtr root = GroupNode::Make("schema", Repetition::REPEATED, fields);
762
763 SchemaDescriptor schema;
764 schema.Init(root);
765
766 auto sink = CreateOutputStream();
767 auto props = WriterProperties::Builder().build();
768
769 auto metadata = ColumnChunkMetaDataBuilder::Make(props, schema.Column(0));
770 std::unique_ptr<PageWriter> pager =
771 PageWriter::Open(sink, Compression::UNCOMPRESSED,
772 Codec::UseDefaultCompressionLevel(), metadata.get());
773 std::shared_ptr<ColumnWriter> writer =
774 ColumnWriter::Make(metadata.get(), std::move(pager), props.get());
775 auto typed_writer = std::static_pointer_cast<TypedColumnWriter<Int32Type>>(writer);
776
777 std::vector<int16_t> def_levels = {1, 3, 3, 2, 3, 3, 3, 2, 3, 3, 3, 2, 3, 3};
778 std::vector<int16_t> rep_levels = {0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
779 std::vector<int32_t> values = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
780
781 // Write the values into uninitialized memory
782 ASSERT_OK_AND_ASSIGN(auto values_buffer, ::arrow::AllocateBuffer(64));
783 memcpy(values_buffer->mutable_data(), values.data(), 13 * sizeof(int32_t));
784 auto values_data = reinterpret_cast<const int32_t*>(values_buffer->data());
785
786 std::shared_ptr<Buffer> valid_bits;
787 ASSERT_OK_AND_ASSIGN(valid_bits, ::arrow::internal::BytesToBits(
788 {1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1}));
789
790 // valgrind will warn about out of bounds access into def_levels_data
791 typed_writer->WriteBatchSpaced(14, def_levels.data(), rep_levels.data(),
792 valid_bits->data(), 0, values_data);
793 writer->Close();
794 }

// Fill input_levels with runs of level values. For every repeat factor in
// [min_repeat_factor, max_repeat_factor] the run length doubles (1 << factor),
// and within each factor the level values walk 0, 1, 3, 7, ... up to max_level.
void GenerateLevels(int min_repeat_factor, int max_repeat_factor, int max_level,
                    std::vector<int16_t>& input_levels) {
  for (int factor = min_repeat_factor; factor <= max_repeat_factor; factor++) {
    // Run length doubles with each factor.
    const int run_length = 1 << factor;
    int16_t level = 0;
    for (int width = 0; level <= max_level; width++) {
      // Append run_length copies of the current level value.
      input_levels.insert(input_levels.end(), run_length, level);
      // Next level: all-ones value of (width + 1) bits.
      level = static_cast<int16_t>((2 << width) - 1);
    }
  }
}
814
EncodeLevels(Encoding::type encoding,int16_t max_level,int num_levels,const int16_t * input_levels,std::vector<uint8_t> & bytes)815 void EncodeLevels(Encoding::type encoding, int16_t max_level, int num_levels,
816 const int16_t* input_levels, std::vector<uint8_t>& bytes) {
817 LevelEncoder encoder;
818 int levels_count = 0;
819 bytes.resize(2 * num_levels);
820 ASSERT_EQ(2 * num_levels, static_cast<int>(bytes.size()));
821 // encode levels
822 if (encoding == Encoding::RLE) {
823 // leave space to write the rle length value
824 encoder.Init(encoding, max_level, num_levels, bytes.data() + sizeof(int32_t),
825 static_cast<int>(bytes.size()));
826
827 levels_count = encoder.Encode(num_levels, input_levels);
828 (reinterpret_cast<int32_t*>(bytes.data()))[0] = encoder.len();
829 } else {
830 encoder.Init(encoding, max_level, num_levels, bytes.data(),
831 static_cast<int>(bytes.size()));
832 levels_count = encoder.Encode(num_levels, input_levels);
833 }
834 ASSERT_EQ(num_levels, levels_count);
835 }
836
VerifyDecodingLevels(Encoding::type encoding,int16_t max_level,std::vector<int16_t> & input_levels,std::vector<uint8_t> & bytes)837 void VerifyDecodingLevels(Encoding::type encoding, int16_t max_level,
838 std::vector<int16_t>& input_levels,
839 std::vector<uint8_t>& bytes) {
840 LevelDecoder decoder;
841 int levels_count = 0;
842 std::vector<int16_t> output_levels;
843 int num_levels = static_cast<int>(input_levels.size());
844
845 output_levels.resize(num_levels);
846 ASSERT_EQ(num_levels, static_cast<int>(output_levels.size()));
847
848 // Decode levels and test with multiple decode calls
849 decoder.SetData(encoding, max_level, num_levels, bytes.data(),
850 static_cast<int32_t>(bytes.size()));
851 int decode_count = 4;
852 int num_inner_levels = num_levels / decode_count;
853 // Try multiple decoding on a single SetData call
854 for (int ct = 0; ct < decode_count; ct++) {
855 int offset = ct * num_inner_levels;
856 levels_count = decoder.Decode(num_inner_levels, output_levels.data());
857 ASSERT_EQ(num_inner_levels, levels_count);
858 for (int i = 0; i < num_inner_levels; i++) {
859 EXPECT_EQ(input_levels[i + offset], output_levels[i]);
860 }
861 }
862 // check the remaining levels
863 int num_levels_completed = decode_count * (num_levels / decode_count);
864 int num_remaining_levels = num_levels - num_levels_completed;
865 if (num_remaining_levels > 0) {
866 levels_count = decoder.Decode(num_remaining_levels, output_levels.data());
867 ASSERT_EQ(num_remaining_levels, levels_count);
868 for (int i = 0; i < num_remaining_levels; i++) {
869 EXPECT_EQ(input_levels[i + num_levels_completed], output_levels[i]);
870 }
871 }
872 // Test zero Decode values
873 ASSERT_EQ(0, decoder.Decode(1, output_levels.data()));
874 }
875
VerifyDecodingMultipleSetData(Encoding::type encoding,int16_t max_level,std::vector<int16_t> & input_levels,std::vector<std::vector<uint8_t>> & bytes)876 void VerifyDecodingMultipleSetData(Encoding::type encoding, int16_t max_level,
877 std::vector<int16_t>& input_levels,
878 std::vector<std::vector<uint8_t>>& bytes) {
879 LevelDecoder decoder;
880 int levels_count = 0;
881 std::vector<int16_t> output_levels;
882
883 // Decode levels and test with multiple SetData calls
884 int setdata_count = static_cast<int>(bytes.size());
885 int num_levels = static_cast<int>(input_levels.size()) / setdata_count;
886 output_levels.resize(num_levels);
887 // Try multiple SetData
888 for (int ct = 0; ct < setdata_count; ct++) {
889 int offset = ct * num_levels;
890 ASSERT_EQ(num_levels, static_cast<int>(output_levels.size()));
891 decoder.SetData(encoding, max_level, num_levels, bytes[ct].data(),
892 static_cast<int32_t>(bytes[ct].size()));
893 levels_count = decoder.Decode(num_levels, output_levels.data());
894 ASSERT_EQ(num_levels, levels_count);
895 for (int i = 0; i < num_levels; i++) {
896 EXPECT_EQ(input_levels[i + offset], output_levels[i]);
897 }
898 }
899 }
900
901 // Test levels with maximum bit-width from 1 to 8
902 // increase the repetition count for each iteration by a factor of 2
TEST(TestLevels,TestLevelsDecodeMultipleBitWidth)903 TEST(TestLevels, TestLevelsDecodeMultipleBitWidth) {
904 int min_repeat_factor = 0;
905 int max_repeat_factor = 7; // 128
906 int max_bit_width = 8;
907 std::vector<int16_t> input_levels;
908 std::vector<uint8_t> bytes;
909 Encoding::type encodings[2] = {Encoding::RLE, Encoding::BIT_PACKED};
910
911 // for each encoding
912 for (int encode = 0; encode < 2; encode++) {
913 Encoding::type encoding = encodings[encode];
914 // BIT_PACKED requires a sequence of at least 8
915 if (encoding == Encoding::BIT_PACKED) min_repeat_factor = 3;
916 // for each maximum bit-width
917 for (int bit_width = 1; bit_width <= max_bit_width; bit_width++) {
918 // find the maximum level for the current bit_width
919 int16_t max_level = static_cast<int16_t>((1 << bit_width) - 1);
920 // Generate levels
921 GenerateLevels(min_repeat_factor, max_repeat_factor, max_level, input_levels);
922 ASSERT_NO_FATAL_FAILURE(EncodeLevels(encoding, max_level,
923 static_cast<int>(input_levels.size()),
924 input_levels.data(), bytes));
925 ASSERT_NO_FATAL_FAILURE(
926 VerifyDecodingLevels(encoding, max_level, input_levels, bytes));
927 input_levels.clear();
928 }
929 }
930 }
931
932 // Test multiple decoder SetData calls
TEST(TestLevels,TestLevelsDecodeMultipleSetData)933 TEST(TestLevels, TestLevelsDecodeMultipleSetData) {
934 int min_repeat_factor = 3;
935 int max_repeat_factor = 7; // 128
936 int bit_width = 8;
937 int16_t max_level = static_cast<int16_t>((1 << bit_width) - 1);
938 std::vector<int16_t> input_levels;
939 std::vector<std::vector<uint8_t>> bytes;
940 Encoding::type encodings[2] = {Encoding::RLE, Encoding::BIT_PACKED};
941 GenerateLevels(min_repeat_factor, max_repeat_factor, max_level, input_levels);
942 int num_levels = static_cast<int>(input_levels.size());
943 int setdata_factor = 8;
944 int split_level_size = num_levels / setdata_factor;
945 bytes.resize(setdata_factor);
946
947 // for each encoding
948 for (int encode = 0; encode < 2; encode++) {
949 Encoding::type encoding = encodings[encode];
950 for (int rf = 0; rf < setdata_factor; rf++) {
951 int offset = rf * split_level_size;
952 ASSERT_NO_FATAL_FAILURE(EncodeLevels(
953 encoding, max_level, split_level_size,
954 reinterpret_cast<int16_t*>(input_levels.data()) + offset, bytes[rf]));
955 }
956 ASSERT_NO_FATAL_FAILURE(
957 VerifyDecodingMultipleSetData(encoding, max_level, input_levels, bytes));
958 }
959 }
960
TEST(TestLevelEncoder,MinimumBufferSize)961 TEST(TestLevelEncoder, MinimumBufferSize) {
962 // PARQUET-676, PARQUET-698
963 const int kNumToEncode = 1024;
964
965 std::vector<int16_t> levels;
966 for (int i = 0; i < kNumToEncode; ++i) {
967 if (i % 9 == 0) {
968 levels.push_back(0);
969 } else {
970 levels.push_back(1);
971 }
972 }
973
974 std::vector<uint8_t> output(
975 LevelEncoder::MaxBufferSize(Encoding::RLE, 1, kNumToEncode));
976
977 LevelEncoder encoder;
978 encoder.Init(Encoding::RLE, 1, kNumToEncode, output.data(),
979 static_cast<int>(output.size()));
980 int encode_count = encoder.Encode(kNumToEncode, levels.data());
981
982 ASSERT_EQ(kNumToEncode, encode_count);
983 }
984
TEST(TestLevelEncoder,MinimumBufferSize2)985 TEST(TestLevelEncoder, MinimumBufferSize2) {
986 // PARQUET-708
987 // Test the worst case for bit_width=2 consisting of
988 // LiteralRun(size=8)
989 // RepeatedRun(size=8)
990 // LiteralRun(size=8)
991 // ...
992 const int kNumToEncode = 1024;
993
994 std::vector<int16_t> levels;
995 for (int i = 0; i < kNumToEncode; ++i) {
996 // This forces a literal run of 00000001
997 // followed by eight 1s
998 if ((i % 16) < 7) {
999 levels.push_back(0);
1000 } else {
1001 levels.push_back(1);
1002 }
1003 }
1004
1005 for (int16_t bit_width = 1; bit_width <= 8; bit_width++) {
1006 std::vector<uint8_t> output(
1007 LevelEncoder::MaxBufferSize(Encoding::RLE, bit_width, kNumToEncode));
1008
1009 LevelEncoder encoder;
1010 encoder.Init(Encoding::RLE, bit_width, kNumToEncode, output.data(),
1011 static_cast<int>(output.size()));
1012 int encode_count = encoder.Encode(kNumToEncode, levels.data());
1013
1014 ASSERT_EQ(kNumToEncode, encode_count);
1015 }
1016 }
1017
1018 } // namespace test
1019 } // namespace parquet
1020