1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include <utility>
19 #include <vector>
20 
21 #include <gtest/gtest.h>
22 
23 #include "arrow/io/buffered.h"
24 #include "arrow/testing/gtest_util.h"
25 #include "arrow/util/bit_util.h"
26 #include "arrow/util/bitmap_builders.h"
27 
28 #include "parquet/column_reader.h"
29 #include "parquet/column_writer.h"
30 #include "parquet/file_writer.h"
31 #include "parquet/metadata.h"
32 #include "parquet/platform.h"
33 #include "parquet/properties.h"
34 #include "parquet/statistics.h"
35 #include "parquet/test_util.h"
36 #include "parquet/thrift_internal.h"
37 #include "parquet/types.h"
38 
39 namespace BitUtil = arrow::BitUtil;
40 
41 namespace parquet {
42 
43 using schema::GroupNode;
44 using schema::NodePtr;
45 using schema::PrimitiveNode;
46 
47 namespace test {
48 
49 // The default size used in most tests.
50 const int SMALL_SIZE = 100;
51 #ifdef PARQUET_VALGRIND
52 // Larger size to test some corner cases, only used in some specific cases.
53 const int LARGE_SIZE = 10000;
54 // Very large size to test dictionary fallback.
55 const int VERY_LARGE_SIZE = 40000;
56 // Reduced dictionary page size to use for testing dictionary fallback with valgrind
57 const int64_t DICTIONARY_PAGE_SIZE = 1024;
58 #else
59 // Larger size to test some corner cases, only used in some specific cases.
60 const int LARGE_SIZE = 100000;
61 // Very large size to test dictionary fallback.
62 const int VERY_LARGE_SIZE = 400000;
63 // Dictionary page size to use for testing dictionary fallback
64 const int64_t DICTIONARY_PAGE_SIZE = 1024 * 1024;
65 #endif
66 
67 template <typename TestType>
68 class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
69  public:
SetUp()70   void SetUp() {
71     this->SetupValuesOut(SMALL_SIZE);
72     writer_properties_ = default_writer_properties();
73     definition_levels_out_.resize(SMALL_SIZE);
74     repetition_levels_out_.resize(SMALL_SIZE);
75 
76     this->SetUpSchema(Repetition::REQUIRED);
77 
78     descr_ = this->schema_.Column(0);
79   }
80 
type_num()81   Type::type type_num() { return TestType::type_num; }
82 
BuildReader(int64_t num_rows,Compression::type compression=Compression::UNCOMPRESSED)83   void BuildReader(int64_t num_rows,
84                    Compression::type compression = Compression::UNCOMPRESSED) {
85     ASSERT_OK_AND_ASSIGN(auto buffer, sink_->Finish());
86     auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
87     std::unique_ptr<PageReader> page_reader =
88         PageReader::Open(std::move(source), num_rows, compression);
89     reader_ = std::static_pointer_cast<TypedColumnReader<TestType>>(
90         ColumnReader::Make(this->descr_, std::move(page_reader)));
91   }
92 
BuildWriter(int64_t output_size=SMALL_SIZE,const ColumnProperties & column_properties=ColumnProperties (),const ParquetVersion::type version=ParquetVersion::PARQUET_1_0)93   std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter(
94       int64_t output_size = SMALL_SIZE,
95       const ColumnProperties& column_properties = ColumnProperties(),
96       const ParquetVersion::type version = ParquetVersion::PARQUET_1_0) {
97     sink_ = CreateOutputStream();
98     WriterProperties::Builder wp_builder;
99     wp_builder.version(version);
100     if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY ||
101         column_properties.encoding() == Encoding::RLE_DICTIONARY) {
102       wp_builder.enable_dictionary();
103       wp_builder.dictionary_pagesize_limit(DICTIONARY_PAGE_SIZE);
104     } else {
105       wp_builder.disable_dictionary();
106       wp_builder.encoding(column_properties.encoding());
107     }
108     wp_builder.max_statistics_size(column_properties.max_statistics_size());
109     writer_properties_ = wp_builder.build();
110 
111     metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_, this->descr_);
112     std::unique_ptr<PageWriter> pager =
113         PageWriter::Open(sink_, column_properties.compression(),
114                          Codec::UseDefaultCompressionLevel(), metadata_.get());
115     std::shared_ptr<ColumnWriter> writer =
116         ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get());
117     return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer);
118   }
119 
ReadColumn(Compression::type compression=Compression::UNCOMPRESSED)120   void ReadColumn(Compression::type compression = Compression::UNCOMPRESSED) {
121     BuildReader(static_cast<int64_t>(this->values_out_.size()), compression);
122     reader_->ReadBatch(static_cast<int>(this->values_out_.size()),
123                        definition_levels_out_.data(), repetition_levels_out_.data(),
124                        this->values_out_ptr_, &values_read_);
125     this->SyncValuesOut();
126   }
127 
128   void ReadColumnFully(Compression::type compression = Compression::UNCOMPRESSED);
129 
TestRequiredWithEncoding(Encoding::type encoding)130   void TestRequiredWithEncoding(Encoding::type encoding) {
131     return TestRequiredWithSettings(encoding, Compression::UNCOMPRESSED, false, false);
132   }
133 
TestRequiredWithSettings(Encoding::type encoding,Compression::type compression,bool enable_dictionary,bool enable_statistics,int64_t num_rows=SMALL_SIZE,int compression_level=Codec::UseDefaultCompressionLevel ())134   void TestRequiredWithSettings(
135       Encoding::type encoding, Compression::type compression, bool enable_dictionary,
136       bool enable_statistics, int64_t num_rows = SMALL_SIZE,
137       int compression_level = Codec::UseDefaultCompressionLevel()) {
138     this->GenerateData(num_rows);
139 
140     this->WriteRequiredWithSettings(encoding, compression, enable_dictionary,
141                                     enable_statistics, compression_level, num_rows);
142     ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows));
143 
144     this->WriteRequiredWithSettingsSpaced(encoding, compression, enable_dictionary,
145                                           enable_statistics, num_rows, compression_level);
146     ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows));
147   }
148 
TestDictionaryFallbackEncoding(ParquetVersion::type version)149   void TestDictionaryFallbackEncoding(ParquetVersion::type version) {
150     this->GenerateData(VERY_LARGE_SIZE);
151     ColumnProperties column_properties;
152     column_properties.set_dictionary_enabled(true);
153 
154     if (version == ParquetVersion::PARQUET_1_0) {
155       column_properties.set_encoding(Encoding::PLAIN_DICTIONARY);
156     } else {
157       column_properties.set_encoding(Encoding::RLE_DICTIONARY);
158     }
159 
160     auto writer = this->BuildWriter(VERY_LARGE_SIZE, column_properties, version);
161 
162     writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
163     writer->Close();
164 
165     // Read all rows so we are sure that also the non-dictionary pages are read correctly
166     this->SetupValuesOut(VERY_LARGE_SIZE);
167     this->ReadColumnFully();
168     ASSERT_EQ(VERY_LARGE_SIZE, this->values_read_);
169     this->values_.resize(VERY_LARGE_SIZE);
170     ASSERT_EQ(this->values_, this->values_out_);
171     std::vector<Encoding::type> encodings = this->metadata_encodings();
172 
173     if (this->type_num() == Type::BOOLEAN) {
174       // Dictionary encoding is not allowed for boolean type
175       // There are 2 encodings (PLAIN, RLE) in a non dictionary encoding case
176       std::vector<Encoding::type> expected({Encoding::PLAIN, Encoding::RLE});
177       ASSERT_EQ(encodings, expected);
178     } else if (version == ParquetVersion::PARQUET_1_0) {
179       // There are 4 encodings (PLAIN_DICTIONARY, PLAIN, RLE, PLAIN) in a fallback case
180       // for version 1.0
181       std::vector<Encoding::type> expected(
182           {Encoding::PLAIN_DICTIONARY, Encoding::PLAIN, Encoding::RLE, Encoding::PLAIN});
183       ASSERT_EQ(encodings, expected);
184     } else {
185       // There are 4 encodings (RLE_DICTIONARY, PLAIN, RLE, PLAIN) in a fallback case for
186       // version 2.0
187       std::vector<Encoding::type> expected(
188           {Encoding::RLE_DICTIONARY, Encoding::PLAIN, Encoding::RLE, Encoding::PLAIN});
189       ASSERT_EQ(encodings, expected);
190     }
191 
192     std::vector<parquet::PageEncodingStats> encoding_stats =
193         this->metadata_encoding_stats();
194     if (this->type_num() == Type::BOOLEAN) {
195       ASSERT_EQ(encoding_stats[0].encoding, Encoding::PLAIN);
196       ASSERT_EQ(encoding_stats[0].page_type, PageType::DATA_PAGE);
197     } else if (version == ParquetVersion::PARQUET_1_0) {
198       std::vector<Encoding::type> expected(
199           {Encoding::PLAIN_DICTIONARY, Encoding::PLAIN, Encoding::PLAIN_DICTIONARY});
200       ASSERT_EQ(encoding_stats[0].encoding, expected[0]);
201       ASSERT_EQ(encoding_stats[0].page_type, PageType::DICTIONARY_PAGE);
202       for (size_t i = 1; i < encoding_stats.size(); i++) {
203         ASSERT_EQ(encoding_stats[i].encoding, expected[i]);
204         ASSERT_EQ(encoding_stats[i].page_type, PageType::DATA_PAGE);
205       }
206     } else {
207       std::vector<Encoding::type> expected(
208           {Encoding::PLAIN, Encoding::PLAIN, Encoding::RLE_DICTIONARY});
209       ASSERT_EQ(encoding_stats[0].encoding, expected[0]);
210       ASSERT_EQ(encoding_stats[0].page_type, PageType::DICTIONARY_PAGE);
211       for (size_t i = 1; i < encoding_stats.size(); i++) {
212         ASSERT_EQ(encoding_stats[i].encoding, expected[i]);
213         ASSERT_EQ(encoding_stats[i].page_type, PageType::DATA_PAGE);
214       }
215     }
216   }
217 
WriteRequiredWithSettings(Encoding::type encoding,Compression::type compression,bool enable_dictionary,bool enable_statistics,int compression_level,int64_t num_rows)218   void WriteRequiredWithSettings(Encoding::type encoding, Compression::type compression,
219                                  bool enable_dictionary, bool enable_statistics,
220                                  int compression_level, int64_t num_rows) {
221     ColumnProperties column_properties(encoding, compression, enable_dictionary,
222                                        enable_statistics);
223     column_properties.set_compression_level(compression_level);
224     std::shared_ptr<TypedColumnWriter<TestType>> writer =
225         this->BuildWriter(num_rows, column_properties);
226     writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
227     // The behaviour should be independent from the number of Close() calls
228     writer->Close();
229     writer->Close();
230   }
231 
WriteRequiredWithSettingsSpaced(Encoding::type encoding,Compression::type compression,bool enable_dictionary,bool enable_statistics,int64_t num_rows,int compression_level)232   void WriteRequiredWithSettingsSpaced(Encoding::type encoding,
233                                        Compression::type compression,
234                                        bool enable_dictionary, bool enable_statistics,
235                                        int64_t num_rows, int compression_level) {
236     std::vector<uint8_t> valid_bits(
237         BitUtil::BytesForBits(static_cast<uint32_t>(this->values_.size())) + 1, 255);
238     ColumnProperties column_properties(encoding, compression, enable_dictionary,
239                                        enable_statistics);
240     column_properties.set_compression_level(compression_level);
241     std::shared_ptr<TypedColumnWriter<TestType>> writer =
242         this->BuildWriter(num_rows, column_properties);
243     writer->WriteBatchSpaced(this->values_.size(), nullptr, nullptr, valid_bits.data(), 0,
244                              this->values_ptr_);
245     // The behaviour should be independent from the number of Close() calls
246     writer->Close();
247     writer->Close();
248   }
249 
ReadAndCompare(Compression::type compression,int64_t num_rows)250   void ReadAndCompare(Compression::type compression, int64_t num_rows) {
251     this->SetupValuesOut(num_rows);
252     this->ReadColumnFully(compression);
253     auto comparator = MakeComparator<TestType>(this->descr_);
254     for (size_t i = 0; i < this->values_.size(); i++) {
255       if (comparator->Compare(this->values_[i], this->values_out_[i]) ||
256           comparator->Compare(this->values_out_[i], this->values_[i])) {
257         std::cout << "Failed at " << i << std::endl;
258       }
259       ASSERT_FALSE(comparator->Compare(this->values_[i], this->values_out_[i]));
260       ASSERT_FALSE(comparator->Compare(this->values_out_[i], this->values_[i]));
261     }
262     ASSERT_EQ(this->values_, this->values_out_);
263   }
264 
metadata_num_values()265   int64_t metadata_num_values() {
266     // Metadata accessor must be created lazily.
267     // This is because the ColumnChunkMetaData semantics dictate the metadata object is
268     // complete (no changes to the metadata buffer can be made after instantiation)
269     auto metadata_accessor =
270         ColumnChunkMetaData::Make(metadata_->contents(), this->descr_);
271     return metadata_accessor->num_values();
272   }
273 
metadata_is_stats_set()274   bool metadata_is_stats_set() {
275     // Metadata accessor must be created lazily.
276     // This is because the ColumnChunkMetaData semantics dictate the metadata object is
277     // complete (no changes to the metadata buffer can be made after instantiation)
278     ApplicationVersion app_version(this->writer_properties_->created_by());
279     auto metadata_accessor =
280         ColumnChunkMetaData::Make(metadata_->contents(), this->descr_, &app_version);
281     return metadata_accessor->is_stats_set();
282   }
283 
metadata_stats_has_min_max()284   std::pair<bool, bool> metadata_stats_has_min_max() {
285     // Metadata accessor must be created lazily.
286     // This is because the ColumnChunkMetaData semantics dictate the metadata object is
287     // complete (no changes to the metadata buffer can be made after instantiation)
288     ApplicationVersion app_version(this->writer_properties_->created_by());
289     auto metadata_accessor =
290         ColumnChunkMetaData::Make(metadata_->contents(), this->descr_, &app_version);
291     auto encoded_stats = metadata_accessor->statistics()->Encode();
292     return {encoded_stats.has_min, encoded_stats.has_max};
293   }
294 
metadata_encodings()295   std::vector<Encoding::type> metadata_encodings() {
296     // Metadata accessor must be created lazily.
297     // This is because the ColumnChunkMetaData semantics dictate the metadata object is
298     // complete (no changes to the metadata buffer can be made after instantiation)
299     auto metadata_accessor =
300         ColumnChunkMetaData::Make(metadata_->contents(), this->descr_);
301     return metadata_accessor->encodings();
302   }
303 
metadata_encoding_stats()304   std::vector<parquet::PageEncodingStats> metadata_encoding_stats() {
305     // Metadata accessor must be created lazily.
306     // This is because the ColumnChunkMetaData semantics dictate the metadata object is
307     // complete (no changes to the metadata buffer can be made after instantiation)
308     auto metadata_accessor =
309         ColumnChunkMetaData::Make(metadata_->contents(), this->descr_);
310     return metadata_accessor->encoding_stats();
311   }
312 
313  protected:
314   int64_t values_read_;
315   // Keep the reader alive as for ByteArray the lifetime of the ByteArray
316   // content is bound to the reader.
317   std::shared_ptr<TypedColumnReader<TestType>> reader_;
318 
319   std::vector<int16_t> definition_levels_out_;
320   std::vector<int16_t> repetition_levels_out_;
321 
322   const ColumnDescriptor* descr_;
323 
324  private:
325   std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
326   std::shared_ptr<::arrow::io::BufferOutputStream> sink_;
327   std::shared_ptr<WriterProperties> writer_properties_;
328   std::vector<std::vector<uint8_t>> data_buffer_;
329 };
330 
331 template <typename TestType>
ReadColumnFully(Compression::type compression)332 void TestPrimitiveWriter<TestType>::ReadColumnFully(Compression::type compression) {
333   int64_t total_values = static_cast<int64_t>(this->values_out_.size());
334   BuildReader(total_values, compression);
335   values_read_ = 0;
336   while (values_read_ < total_values) {
337     int64_t values_read_recently = 0;
338     reader_->ReadBatch(
339         static_cast<int>(this->values_out_.size()) - static_cast<int>(values_read_),
340         definition_levels_out_.data() + values_read_,
341         repetition_levels_out_.data() + values_read_,
342         this->values_out_ptr_ + values_read_, &values_read_recently);
343     values_read_ += values_read_recently;
344   }
345   this->SyncValuesOut();
346 }
347 
348 template <>
ReadAndCompare(Compression::type compression,int64_t num_rows)349 void TestPrimitiveWriter<Int96Type>::ReadAndCompare(Compression::type compression,
350                                                     int64_t num_rows) {
351   this->SetupValuesOut(num_rows);
352   this->ReadColumnFully(compression);
353 
354   auto comparator = MakeComparator<Int96Type>(Type::INT96, SortOrder::SIGNED);
355   for (size_t i = 0; i < this->values_.size(); i++) {
356     if (comparator->Compare(this->values_[i], this->values_out_[i]) ||
357         comparator->Compare(this->values_out_[i], this->values_[i])) {
358       std::cout << "Failed at " << i << std::endl;
359     }
360     ASSERT_FALSE(comparator->Compare(this->values_[i], this->values_out_[i]));
361     ASSERT_FALSE(comparator->Compare(this->values_out_[i], this->values_[i]));
362   }
363   ASSERT_EQ(this->values_, this->values_out_);
364 }
365 
366 template <>
ReadColumnFully(Compression::type compression)367 void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compression) {
368   int64_t total_values = static_cast<int64_t>(this->values_out_.size());
369   BuildReader(total_values, compression);
370   this->data_buffer_.clear();
371 
372   values_read_ = 0;
373   while (values_read_ < total_values) {
374     int64_t values_read_recently = 0;
375     reader_->ReadBatch(
376         static_cast<int>(this->values_out_.size()) - static_cast<int>(values_read_),
377         definition_levels_out_.data() + values_read_,
378         repetition_levels_out_.data() + values_read_,
379         this->values_out_ptr_ + values_read_, &values_read_recently);
380 
381     // Copy contents of the pointers
382     std::vector<uint8_t> data(values_read_recently * this->descr_->type_length());
383     uint8_t* data_ptr = data.data();
384     for (int64_t i = 0; i < values_read_recently; i++) {
385       memcpy(data_ptr + this->descr_->type_length() * i,
386              this->values_out_[i + values_read_].ptr, this->descr_->type_length());
387       this->values_out_[i + values_read_].ptr =
388           data_ptr + this->descr_->type_length() * i;
389     }
390     data_buffer_.emplace_back(std::move(data));
391 
392     values_read_ += values_read_recently;
393   }
394   this->SyncValuesOut();
395 }
396 
397 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
398                          BooleanType, ByteArrayType, FLBAType>
399     TestTypes;
400 
401 TYPED_TEST_SUITE(TestPrimitiveWriter, TestTypes);
402 
403 using TestNullValuesWriter = TestPrimitiveWriter<Int32Type>;
404 
TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
  // PLAIN-encoded, uncompressed round trip of a required column.
  const Encoding::type plain = Encoding::PLAIN;
  this->TestRequiredWithEncoding(plain);
}
408 
TYPED_TEST(TestPrimitiveWriter, RequiredDictionary) {
  // A dictionary encoding makes BuildWriter switch dictionary mode on.
  const Encoding::type dictionary = Encoding::PLAIN_DICTIONARY;
  this->TestRequiredWithEncoding(dictionary);
}
412 
413 /*
414 TYPED_TEST(TestPrimitiveWriter, RequiredRLE) {
415   this->TestRequiredWithEncoding(Encoding::RLE);
416 }
417 
418 TYPED_TEST(TestPrimitiveWriter, RequiredBitPacked) {
419   this->TestRequiredWithEncoding(Encoding::BIT_PACKED);
420 }
421 
422 TYPED_TEST(TestPrimitiveWriter, RequiredDeltaBinaryPacked) {
423   this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
424 }
425 
426 TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) {
427   this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
428 }
429 
430 TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) {
431   this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
432 }
433 
434 TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) {
435   this->TestRequiredWithEncoding(Encoding::RLE_DICTIONARY);
436 }
437 */
438 
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStats) {
  // PLAIN, uncompressed, statistics flag set, LARGE_SIZE rows.
  constexpr bool kUseDictionary = false;
  constexpr bool kUseStatistics = true;
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::UNCOMPRESSED,
                                 kUseDictionary, kUseStatistics, LARGE_SIZE);
}
443 
#ifdef ARROW_WITH_SNAPPY
// Snappy round trips over LARGE_SIZE rows, without and with the statistics flag.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithSnappyCompression) {
  constexpr bool kUseDictionary = false;
  constexpr bool kUseStatistics = false;
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY, kUseDictionary,
                                 kUseStatistics, LARGE_SIZE);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndSnappyCompression) {
  constexpr bool kUseDictionary = false;
  constexpr bool kUseStatistics = true;
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY, kUseDictionary,
                                 kUseStatistics, LARGE_SIZE);
}
#endif  // ARROW_WITH_SNAPPY
455 
#ifdef ARROW_WITH_BROTLI
// Brotli round trips over LARGE_SIZE rows: default level, explicit level, stats flag.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithBrotliCompression) {
  constexpr bool kUseDictionary = false;
  constexpr bool kUseStatistics = false;
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::BROTLI, kUseDictionary,
                                 kUseStatistics, LARGE_SIZE);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithBrotliCompressionAndLevel) {
  constexpr bool kUseDictionary = false;
  constexpr bool kUseStatistics = false;
  // NOTE(review): BuildWriter passes the default level to PageWriter::Open, so
  // this level may never reach the codec. TODO confirm.
  constexpr int kCompressionLevel = 10;
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::BROTLI, kUseDictionary,
                                 kUseStatistics, LARGE_SIZE, kCompressionLevel);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndBrotliCompression) {
  constexpr bool kUseDictionary = false;
  constexpr bool kUseStatistics = true;
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::BROTLI, kUseDictionary,
                                 kUseStatistics, LARGE_SIZE);
}

#endif  // ARROW_WITH_BROTLI
473 
#ifdef ARROW_WITH_GZIP
// Gzip round trips over LARGE_SIZE rows: default level, explicit level, stats flag.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompression) {
  constexpr bool kUseDictionary = false;
  constexpr bool kUseStatistics = false;
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, kUseDictionary,
                                 kUseStatistics, LARGE_SIZE);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompressionAndLevel) {
  constexpr bool kUseDictionary = false;
  constexpr bool kUseStatistics = false;
  // NOTE(review): BuildWriter passes the default level to PageWriter::Open, so
  // this level may never reach the codec. TODO confirm.
  constexpr int kCompressionLevel = 10;
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, kUseDictionary,
                                 kUseStatistics, LARGE_SIZE, kCompressionLevel);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndGzipCompression) {
  constexpr bool kUseDictionary = false;
  constexpr bool kUseStatistics = true;
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, kUseDictionary,
                                 kUseStatistics, LARGE_SIZE);
}
#endif  // ARROW_WITH_GZIP
490 
#ifdef ARROW_WITH_LZ4
// LZ4 round trips over LARGE_SIZE rows, without and with the statistics flag.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithLz4Compression) {
  constexpr bool kUseDictionary = false;
  constexpr bool kUseStatistics = false;
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, kUseDictionary,
                                 kUseStatistics, LARGE_SIZE);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndLz4Compression) {
  constexpr bool kUseDictionary = false;
  constexpr bool kUseStatistics = true;
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, kUseDictionary,
                                 kUseStatistics, LARGE_SIZE);
}
#endif  // ARROW_WITH_LZ4
502 
#ifdef ARROW_WITH_ZSTD
// Zstd round trips over LARGE_SIZE rows: default level, explicit level, stats flag.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithZstdCompression) {
  constexpr bool kUseDictionary = false;
  constexpr bool kUseStatistics = false;
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, kUseDictionary,
                                 kUseStatistics, LARGE_SIZE);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithZstdCompressionAndLevel) {
  constexpr bool kUseDictionary = false;
  constexpr bool kUseStatistics = false;
  // NOTE(review): BuildWriter passes the default level to PageWriter::Open, so
  // this level may never reach the codec. TODO confirm.
  constexpr int kCompressionLevel = 6;
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, kUseDictionary,
                                 kUseStatistics, LARGE_SIZE, kCompressionLevel);
}

TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndZstdCompression) {
  constexpr bool kUseDictionary = false;
  constexpr bool kUseStatistics = true;
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, kUseDictionary,
                                 kUseStatistics, LARGE_SIZE);
}
#endif  // ARROW_WITH_ZSTD
519 
TYPED_TEST(TestPrimitiveWriter, Optional) {
  // Optional and non-repeated, with definition levels
  // but no repetition levels
  this->SetUpSchema(Repetition::OPTIONAL);

  this->GenerateData(SMALL_SIZE);
  std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
  // A single null at index 1.
  definition_levels[1] = 0;

  auto writer = this->BuildWriter();
  writer->WriteBatch(this->values_.size(), definition_levels.data(), nullptr,
                     this->values_ptr_);
  writer->Close();

  // PARQUET-703: the metadata value count includes the null entry.
  ASSERT_EQ(SMALL_SIZE, this->metadata_num_values());

  // Derived from SMALL_SIZE so the test keeps working if the size changes.
  const int expected_non_null = SMALL_SIZE - 1;
  this->ReadColumn();
  ASSERT_EQ(expected_non_null, this->values_read_);
  this->values_out_.resize(expected_non_null);
  this->values_.resize(expected_non_null);
  ASSERT_EQ(this->values_, this->values_out_);
}
543 
TYPED_TEST(TestPrimitiveWriter, OptionalSpaced) {
  // Optional and non-repeated, with definition levels
  // but no repetition levels
  this->SetUpSchema(Repetition::OPTIONAL);

  this->GenerateData(SMALL_SIZE);
  std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
  std::vector<uint8_t> valid_bits(::arrow::BitUtil::BytesForBits(SMALL_SIZE), 255);

  // Two nulls: the last slot and index 1, marked in both levels and bitmap.
  definition_levels[SMALL_SIZE - 1] = 0;
  ::arrow::BitUtil::ClearBit(valid_bits.data(), SMALL_SIZE - 1);
  definition_levels[1] = 0;
  ::arrow::BitUtil::ClearBit(valid_bits.data(), 1);

  auto writer = this->BuildWriter();
  writer->WriteBatchSpaced(this->values_.size(), definition_levels.data(), nullptr,
                           valid_bits.data(), 0, this->values_ptr_);
  writer->Close();

  // PARQUET-703: the metadata value count includes the null entries.
  ASSERT_EQ(SMALL_SIZE, this->metadata_num_values());

  const int expected_non_null = SMALL_SIZE - 2;
  this->ReadColumn();
  ASSERT_EQ(expected_non_null, this->values_read_);
  this->values_out_.resize(expected_non_null);
  // Drop the trailing null, then the null at index 1.
  this->values_.resize(SMALL_SIZE - 1);
  this->values_.erase(this->values_.begin() + 1);
  ASSERT_EQ(this->values_, this->values_out_);
}
573 
TYPED_TEST(TestPrimitiveWriter, Repeated) {
  // Repeated column: both definition and repetition levels are written.
  this->SetUpSchema(Repetition::REPEATED);
  this->GenerateData(SMALL_SIZE);

  // Repetition level 0 everywhere; index 1 has definition level 0 (no value).
  std::vector<int16_t> def_levels(SMALL_SIZE, 1);
  def_levels[1] = 0;
  const std::vector<int16_t> rep_levels(SMALL_SIZE, 0);

  auto writer = this->BuildWriter();
  writer->WriteBatch(this->values_.size(), def_levels.data(), rep_levels.data(),
                     this->values_ptr_);
  writer->Close();

  this->ReadColumn();
  const int expected_count = SMALL_SIZE - 1;
  ASSERT_EQ(expected_count, this->values_read_);
  this->values_out_.resize(expected_count);
  this->values_.resize(expected_count);
  ASSERT_EQ(this->values_, this->values_out_);
}
594 
TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) {
  // Required, non-repeated column: no definition or repetition levels needed.
  this->GenerateData(LARGE_SIZE);

  auto writer = this->BuildWriter(LARGE_SIZE);
  writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
  writer->Close();

  // values_out_ is sized to SMALL_SIZE in SetUp, so ReadColumn pulls back only
  // the leading SMALL_SIZE values -- enough to show the chunk is readable.
  this->ReadColumn();
  ASSERT_EQ(SMALL_SIZE, this->values_read_);
  this->values_.resize(SMALL_SIZE);
  ASSERT_EQ(this->values_, this->values_out_);
}
609 
610 // Test cases for dictionary fallback encoding
TYPED_TEST(TestPrimitiveWriter,DictionaryFallbackVersion1_0)611 TYPED_TEST(TestPrimitiveWriter, DictionaryFallbackVersion1_0) {
612   this->TestDictionaryFallbackEncoding(ParquetVersion::PARQUET_1_0);
613 }
614 
TYPED_TEST(TestPrimitiveWriter,DictionaryFallbackVersion2_0)615 TYPED_TEST(TestPrimitiveWriter, DictionaryFallbackVersion2_0) {
616   this->TestDictionaryFallbackEncoding(ParquetVersion::PARQUET_2_0);
617 }
618 
TEST(TestWriter, NullValuesBuffer) {
  // Writes one null list entry through WriteBatchSpaced with a null values
  // pointer into a LIST<INT32> column, then closes the file. The test passes if
  // neither step crashes or throws.
  std::shared_ptr<::arrow::io::BufferOutputStream> sink = CreateOutputStream();

  const auto item_node = schema::PrimitiveNode::Make(
      "item", Repetition::REQUIRED, LogicalType::Int(32, true), Type::INT32);
  const auto list_node =
      schema::GroupNode::Make("list", Repetition::REPEATED, {item_node});
  const auto column_node = schema::GroupNode::Make(
      "array_of_ints_column", Repetition::OPTIONAL, {list_node}, LogicalType::List());
  const auto schema_node =
      schema::GroupNode::Make("schema", Repetition::REQUIRED, {column_node});

  auto file_writer = ParquetFileWriter::Open(
      sink, std::dynamic_pointer_cast<schema::GroupNode>(schema_node));
  auto group_writer = file_writer->AppendRowGroup();
  auto column_writer = group_writer->NextColumn();
  auto typed_writer = dynamic_cast<Int32Writer*>(column_writer);
  // Fail cleanly rather than dereference a null pointer if the cast fails.
  ASSERT_NE(nullptr, typed_writer);

  const int64_t num_values = 1;
  const int16_t def_levels[] = {0};
  const int16_t rep_levels[] = {0};
  const uint8_t valid_bits[] = {0};
  const int64_t valid_bits_offset = 0;
  const int32_t* values = nullptr;

  typed_writer->WriteBatchSpaced(num_values, def_levels, rep_levels, valid_bits,
                                 valid_bits_offset, values);

  // Close explicitly so errors while finishing the file surface in the test.
  file_writer->Close();
}
647 
648 // PARQUET-719
649 // Test case for NULL values
TEST_F(TestNullValuesWriter,OptionalNullValueChunk)650 TEST_F(TestNullValuesWriter, OptionalNullValueChunk) {
651   this->SetUpSchema(Repetition::OPTIONAL);
652 
653   this->GenerateData(LARGE_SIZE);
654 
655   std::vector<int16_t> definition_levels(LARGE_SIZE, 0);
656   std::vector<int16_t> repetition_levels(LARGE_SIZE, 0);
657 
658   auto writer = this->BuildWriter(LARGE_SIZE);
659   // All values being written are NULL
660   writer->WriteBatch(this->values_.size(), definition_levels.data(),
661                      repetition_levels.data(), nullptr);
662   writer->Close();
663 
664   // Just read the first SMALL_SIZE rows to ensure we could read it back in
665   this->ReadColumn();
666   ASSERT_EQ(0, this->values_read_);
667 }
668 
669 // PARQUET-764
670 // Correct bitpacking for boolean write at non-byte boundaries
671 using TestBooleanValuesWriter = TestPrimitiveWriter<BooleanType>;
TEST_F(TestBooleanValuesWriter,AlternateBooleanValues)672 TEST_F(TestBooleanValuesWriter, AlternateBooleanValues) {
673   this->SetUpSchema(Repetition::REQUIRED);
674   auto writer = this->BuildWriter();
675   for (int i = 0; i < SMALL_SIZE; i++) {
676     bool value = (i % 2 == 0) ? true : false;
677     writer->WriteBatch(1, nullptr, nullptr, &value);
678   }
679   writer->Close();
680   this->ReadColumn();
681   for (int i = 0; i < SMALL_SIZE; i++) {
682     ASSERT_EQ((i % 2 == 0) ? true : false, this->values_out_[i]) << i;
683   }
684 }
685 
686 // PARQUET-979
687 // Prevent writing large MIN, MAX stats
688 using TestByteArrayValuesWriter = TestPrimitiveWriter<ByteArrayType>;
TEST_F(TestByteArrayValuesWriter,OmitStats)689 TEST_F(TestByteArrayValuesWriter, OmitStats) {
690   int min_len = 1024 * 4;
691   int max_len = 1024 * 8;
692   this->SetUpSchema(Repetition::REQUIRED);
693   auto writer = this->BuildWriter();
694 
695   values_.resize(SMALL_SIZE);
696   InitWideByteArrayValues(SMALL_SIZE, this->values_, this->buffer_, min_len, max_len);
697   writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_.data());
698   writer->Close();
699 
700   auto has_min_max = this->metadata_stats_has_min_max();
701   ASSERT_FALSE(has_min_max.first);
702   ASSERT_FALSE(has_min_max.second);
703 }
704 
705 // PARQUET-1405
706 // Prevent writing large stats in the DataPageHeader
TEST_F(TestByteArrayValuesWriter, OmitDataPageStats) {
  // Write a single 10,000,000-byte value with statistics disabled and check
  // that the column can still be read back.
  //
  // The length is an integer constant rather than static_cast<int>(std::pow(10, 7)):
  // that avoids a floating-point round trip for an integer value and the
  // dependency on <cmath>, which this file does not include.
  constexpr int kValueLen = 10 * 1000 * 1000;

  this->SetUpSchema(Repetition::REQUIRED);
  ColumnProperties column_properties;
  column_properties.set_statistics_enabled(false);
  auto writer = this->BuildWriter(SMALL_SIZE, column_properties);

  this->values_.resize(1);
  InitWideByteArrayValues(1, this->values_, this->buffer_, kValueLen, kValueLen);
  writer->WriteBatch(1, nullptr, nullptr, this->values_.data());
  writer->Close();

  ASSERT_NO_THROW(this->ReadColumn());
}
722 
TEST_F(TestByteArrayValuesWriter, LimitStats) {
  // Same wide values as OmitStats, but max_statistics_size is raised to the
  // max_len used for generation; statistics are then expected to be set.
  const int shortest_len = 1024 * 4;
  const int longest_len = 1024 * 8;

  this->SetUpSchema(Repetition::REQUIRED);
  ColumnProperties column_props;
  column_props.set_max_statistics_size(static_cast<size_t>(longest_len));
  auto writer = this->BuildWriter(SMALL_SIZE, column_props);

  this->values_.resize(SMALL_SIZE);
  InitWideByteArrayValues(SMALL_SIZE, this->values_, this->buffer_, shortest_len,
                          longest_len);
  writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_.data());
  writer->Close();

  ASSERT_TRUE(this->metadata_is_stats_set());
}
738 
TEST_F(TestByteArrayValuesWriter, CheckDefaultStats) {
  // Ordinary generated data written with default properties: the chunk
  // metadata should report statistics as set.
  this->SetUpSchema(Repetition::REQUIRED);
  auto byte_array_writer = this->BuildWriter();
  this->GenerateData(SMALL_SIZE);

  byte_array_writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_ptr_);
  byte_array_writer->Close();

  ASSERT_TRUE(this->metadata_is_stats_set());
}
749 
TEST(TestColumnWriter, RepeatedListsUpdateSpacedBug) {
  // In ARROW-3930 we discovered a bug when writing from Arrow when we had data
  // that looks like this:
  //
  // [null, [0, 1, null, 2, 3, 4, null]]

  // Create schema
  NodePtr item = schema::Int32("item");  // optional item
  NodePtr list(GroupNode::Make("b", Repetition::REPEATED, {item}, ConvertedType::LIST));
  NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list}));  // optional list
  std::vector<NodePtr> fields = {bag};
  NodePtr root = GroupNode::Make("schema", Repetition::REPEATED, fields);

  SchemaDescriptor schema;
  schema.Init(root);

  auto sink = CreateOutputStream();
  auto props = WriterProperties::Builder().build();

  auto metadata = ColumnChunkMetaDataBuilder::Make(props, schema.Column(0));
  std::unique_ptr<PageWriter> pager =
      PageWriter::Open(sink, Compression::UNCOMPRESSED,
                       Codec::UseDefaultCompressionLevel(), metadata.get());
  std::shared_ptr<ColumnWriter> writer =
      ColumnWriter::Make(metadata.get(), std::move(pager), props.get());
  auto typed_writer = std::static_pointer_cast<TypedColumnWriter<Int32Type>>(writer);

  std::vector<int16_t> def_levels = {1, 3, 3, 2, 3, 3, 3, 2, 3, 3, 3, 2, 3, 3};
  std::vector<int16_t> rep_levels = {0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
  std::vector<int32_t> values = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};

  // Write the values into uninitialized memory: only the first values.size()
  // entries of the 64-byte buffer are filled. Byte counts are derived from the
  // vectors instead of hard-coded so they cannot drift from the test data.
  constexpr int64_t kValuesBufferSize = 64;
  const size_t values_nbytes = values.size() * sizeof(int32_t);
  ASSERT_LE(values_nbytes, static_cast<size_t>(kValuesBufferSize));
  ASSERT_OK_AND_ASSIGN(auto values_buffer, ::arrow::AllocateBuffer(kValuesBufferSize));
  memcpy(values_buffer->mutable_data(), values.data(), values_nbytes);
  auto values_data = reinterpret_cast<const int32_t*>(values_buffer->data());

  // One validity bit per value slot (0 marks a null item).
  std::shared_ptr<Buffer> valid_bits;
  ASSERT_OK_AND_ASSIGN(valid_bits, ::arrow::internal::BytesToBits(
                                       {1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1}));

  // valgrind will warn about out of bounds access into def_levels_data
  typed_writer->WriteBatchSpaced(static_cast<int64_t>(def_levels.size()),
                                 def_levels.data(), rep_levels.data(),
                                 valid_bits->data(), 0, values_data);
  writer->Close();
}
795 
// Appends test levels to `input_levels` (existing contents are kept).
//
// For every repeat factor r in [min_repeat_factor, max_repeat_factor], appends
// each level of the form 2^k - 1 (0, 1, 3, 7, ...) that does not exceed
// `max_level`, each one repeated 2^r times.
//
// The candidate level is tracked as int64_t. An int16_t counter would wrap past
// 32767 and never exceed max_level, so max_level >= 32767 would loop forever.
// Levels above 32767 cannot be represented in the int16_t output, so callers
// should keep max_level within int16_t range.
void GenerateLevels(int min_repeat_factor, int max_repeat_factor, int max_level,
                    std::vector<int16_t>& input_levels) {
  for (int repeat = min_repeat_factor; repeat <= max_repeat_factor; repeat++) {
    // repeat count doubles with every repeat factor
    const int repeat_count = 1 << repeat;
    // 0 -> 1 -> 3 -> 7 -> ...: each step widens the level by one bit
    for (int64_t level = 0; level <= max_level; level = 2 * level + 1) {
      input_levels.insert(input_levels.end(), static_cast<size_t>(repeat_count),
                          static_cast<int16_t>(level));
    }
  }
}
814 
EncodeLevels(Encoding::type encoding,int16_t max_level,int num_levels,const int16_t * input_levels,std::vector<uint8_t> & bytes)815 void EncodeLevels(Encoding::type encoding, int16_t max_level, int num_levels,
816                   const int16_t* input_levels, std::vector<uint8_t>& bytes) {
817   LevelEncoder encoder;
818   int levels_count = 0;
819   bytes.resize(2 * num_levels);
820   ASSERT_EQ(2 * num_levels, static_cast<int>(bytes.size()));
821   // encode levels
822   if (encoding == Encoding::RLE) {
823     // leave space to write the rle length value
824     encoder.Init(encoding, max_level, num_levels, bytes.data() + sizeof(int32_t),
825                  static_cast<int>(bytes.size()));
826 
827     levels_count = encoder.Encode(num_levels, input_levels);
828     (reinterpret_cast<int32_t*>(bytes.data()))[0] = encoder.len();
829   } else {
830     encoder.Init(encoding, max_level, num_levels, bytes.data(),
831                  static_cast<int>(bytes.size()));
832     levels_count = encoder.Encode(num_levels, input_levels);
833   }
834   ASSERT_EQ(num_levels, levels_count);
835 }
836 
VerifyDecodingLevels(Encoding::type encoding,int16_t max_level,std::vector<int16_t> & input_levels,std::vector<uint8_t> & bytes)837 void VerifyDecodingLevels(Encoding::type encoding, int16_t max_level,
838                           std::vector<int16_t>& input_levels,
839                           std::vector<uint8_t>& bytes) {
840   LevelDecoder decoder;
841   int levels_count = 0;
842   std::vector<int16_t> output_levels;
843   int num_levels = static_cast<int>(input_levels.size());
844 
845   output_levels.resize(num_levels);
846   ASSERT_EQ(num_levels, static_cast<int>(output_levels.size()));
847 
848   // Decode levels and test with multiple decode calls
849   decoder.SetData(encoding, max_level, num_levels, bytes.data(),
850                   static_cast<int32_t>(bytes.size()));
851   int decode_count = 4;
852   int num_inner_levels = num_levels / decode_count;
853   // Try multiple decoding on a single SetData call
854   for (int ct = 0; ct < decode_count; ct++) {
855     int offset = ct * num_inner_levels;
856     levels_count = decoder.Decode(num_inner_levels, output_levels.data());
857     ASSERT_EQ(num_inner_levels, levels_count);
858     for (int i = 0; i < num_inner_levels; i++) {
859       EXPECT_EQ(input_levels[i + offset], output_levels[i]);
860     }
861   }
862   // check the remaining levels
863   int num_levels_completed = decode_count * (num_levels / decode_count);
864   int num_remaining_levels = num_levels - num_levels_completed;
865   if (num_remaining_levels > 0) {
866     levels_count = decoder.Decode(num_remaining_levels, output_levels.data());
867     ASSERT_EQ(num_remaining_levels, levels_count);
868     for (int i = 0; i < num_remaining_levels; i++) {
869       EXPECT_EQ(input_levels[i + num_levels_completed], output_levels[i]);
870     }
871   }
872   // Test zero Decode values
873   ASSERT_EQ(0, decoder.Decode(1, output_levels.data()));
874 }
875 
VerifyDecodingMultipleSetData(Encoding::type encoding,int16_t max_level,std::vector<int16_t> & input_levels,std::vector<std::vector<uint8_t>> & bytes)876 void VerifyDecodingMultipleSetData(Encoding::type encoding, int16_t max_level,
877                                    std::vector<int16_t>& input_levels,
878                                    std::vector<std::vector<uint8_t>>& bytes) {
879   LevelDecoder decoder;
880   int levels_count = 0;
881   std::vector<int16_t> output_levels;
882 
883   // Decode levels and test with multiple SetData calls
884   int setdata_count = static_cast<int>(bytes.size());
885   int num_levels = static_cast<int>(input_levels.size()) / setdata_count;
886   output_levels.resize(num_levels);
887   // Try multiple SetData
888   for (int ct = 0; ct < setdata_count; ct++) {
889     int offset = ct * num_levels;
890     ASSERT_EQ(num_levels, static_cast<int>(output_levels.size()));
891     decoder.SetData(encoding, max_level, num_levels, bytes[ct].data(),
892                     static_cast<int32_t>(bytes[ct].size()));
893     levels_count = decoder.Decode(num_levels, output_levels.data());
894     ASSERT_EQ(num_levels, levels_count);
895     for (int i = 0; i < num_levels; i++) {
896       EXPECT_EQ(input_levels[i + offset], output_levels[i]);
897     }
898   }
899 }
900 
901 // Test levels with maximum bit-width from 1 to 8
902 // increase the repetition count for each iteration by a factor of 2
TEST(TestLevels, TestLevelsDecodeMultipleBitWidth) {
  const int max_repeat_factor = 7;  // up to 2^7 = 128 repeats per level
  const int max_bit_width = 8;
  std::vector<int16_t> input_levels;
  std::vector<uint8_t> bytes;

  for (Encoding::type encoding : {Encoding::RLE, Encoding::BIT_PACKED}) {
    // BIT_PACKED requires a sequence of at least 8 (2^3) values per level.
    const int min_repeat_factor = (encoding == Encoding::BIT_PACKED) ? 3 : 0;

    for (int bit_width = 1; bit_width <= max_bit_width; ++bit_width) {
      // Largest level representable in `bit_width` bits.
      const auto max_level = static_cast<int16_t>((1 << bit_width) - 1);

      GenerateLevels(min_repeat_factor, max_repeat_factor, max_level, input_levels);
      ASSERT_NO_FATAL_FAILURE(EncodeLevels(encoding, max_level,
                                           static_cast<int>(input_levels.size()),
                                           input_levels.data(), bytes));
      ASSERT_NO_FATAL_FAILURE(
          VerifyDecodingLevels(encoding, max_level, input_levels, bytes));

      // Start the next bit width from an empty level list.
      input_levels.clear();
    }
  }
}
931 
932 // Test multiple decoder SetData calls
TEST(TestLevels, TestLevelsDecodeMultipleSetData) {
  // Split one generated level sequence into `setdata_factor` equal slices,
  // encode each slice into its own buffer, then decode the buffers with one
  // SetData call apiece.
  const int min_repeat_factor = 3;
  const int max_repeat_factor = 7;  // 128
  const int bit_width = 8;
  const int16_t max_level = static_cast<int16_t>((1 << bit_width) - 1);

  std::vector<int16_t> input_levels;
  GenerateLevels(min_repeat_factor, max_repeat_factor, max_level, input_levels);
  const int num_levels = static_cast<int>(input_levels.size());
  const int setdata_factor = 8;
  const int split_level_size = num_levels / setdata_factor;
  std::vector<std::vector<uint8_t>> bytes(setdata_factor);

  for (Encoding::type encoding : {Encoding::RLE, Encoding::BIT_PACKED}) {
    for (int rf = 0; rf < setdata_factor; rf++) {
      const int offset = rf * split_level_size;
      // input_levels.data() is already an int16_t*. There is no cast, so a
      // change of element type would fail to compile instead of reading
      // reinterpreted memory.
      ASSERT_NO_FATAL_FAILURE(EncodeLevels(encoding, max_level, split_level_size,
                                           input_levels.data() + offset, bytes[rf]));
    }
    ASSERT_NO_FATAL_FAILURE(
        VerifyDecodingMultipleSetData(encoding, max_level, input_levels, bytes));
  }
}
960 
TEST(TestLevelEncoder, MinimumBufferSize) {
  // PARQUET-676, PARQUET-698
  const int kNumToEncode = 1024;

  // Level 0 at every index divisible by 9, level 1 everywhere else.
  std::vector<int16_t> levels;
  levels.reserve(kNumToEncode);
  for (int i = 0; i < kNumToEncode; ++i) {
    levels.push_back(i % 9 == 0 ? 0 : 1);
  }

  // A buffer of exactly LevelEncoder::MaxBufferSize bytes must be enough to
  // encode all levels.
  const int buffer_size = LevelEncoder::MaxBufferSize(Encoding::RLE, 1, kNumToEncode);
  std::vector<uint8_t> output(buffer_size);

  LevelEncoder encoder;
  encoder.Init(Encoding::RLE, 1, kNumToEncode, output.data(),
               static_cast<int>(output.size()));
  const int encoded = encoder.Encode(kNumToEncode, levels.data());
  ASSERT_EQ(kNumToEncode, encoded);
}
984 
TEST(TestLevelEncoder, MinimumBufferSize2) {
  // PARQUET-708
  // Worst case for bit_width=2 is an alternation of
  //   LiteralRun(size=8), RepeatedRun(size=8), LiteralRun(size=8), ...
  const int kNumToEncode = 1024;

  // Period of 16: seven 0s then nine 1s, i.e. the literal run 00000001
  // followed by eight 1s.
  std::vector<int16_t> levels;
  levels.reserve(kNumToEncode);
  for (int i = 0; i < kNumToEncode; ++i) {
    levels.push_back((i % 16) < 7 ? 0 : 1);
  }

  // The loop variable is passed in Init's max-level position, so this sweeps
  // max levels 1..8 rather than bit widths 1..8.
  for (int16_t max_level = 1; max_level <= 8; ++max_level) {
    std::vector<uint8_t> output(
        LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, kNumToEncode));

    LevelEncoder encoder;
    encoder.Init(Encoding::RLE, max_level, kNumToEncode, output.data(),
                 static_cast<int>(output.size()));
    const int encoded = encoder.Encode(kNumToEncode, levels.data());
    ASSERT_EQ(kNumToEncode, encoded);
  }
}
1017 
1018 }  // namespace test
1019 }  // namespace parquet
1020