1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "parquet/column_reader.h"
19 
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <exception>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
30 
31 #include "arrow/array.h"
32 #include "arrow/builder.h"
33 #include "arrow/table.h"
34 #include "arrow/type.h"
35 #include "arrow/util/bit_stream_utils.h"
36 #include "arrow/util/checked_cast.h"
37 #include "arrow/util/compression.h"
38 #include "arrow/util/int_util.h"
39 #include "arrow/util/logging.h"
40 #include "arrow/util/rle_encoding.h"
41 #include "parquet/column_page.h"
42 #include "parquet/encoding.h"
43 #include "parquet/encryption_internal.h"
44 #include "parquet/internal_file_decryptor.h"
45 #include "parquet/properties.h"
46 #include "parquet/statistics.h"
47 #include "parquet/thrift_internal.h"  // IWYU pragma: keep
48 
49 using arrow::MemoryPool;
50 using arrow::internal::checked_cast;
51 
52 namespace parquet {
53 
LevelDecoder()54 LevelDecoder::LevelDecoder() : num_values_remaining_(0) {}
55 
~LevelDecoder()56 LevelDecoder::~LevelDecoder() {}
57 
SetData(Encoding::type encoding,int16_t max_level,int num_buffered_values,const uint8_t * data,int32_t data_size)58 int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
59                           int num_buffered_values, const uint8_t* data,
60                           int32_t data_size) {
61   int32_t num_bytes = 0;
62   encoding_ = encoding;
63   num_values_remaining_ = num_buffered_values;
64   bit_width_ = BitUtil::Log2(max_level + 1);
65   switch (encoding) {
66     case Encoding::RLE: {
67       if (data_size < 4) {
68         throw ParquetException("Received invalid levels (corrupt data page?)");
69       }
70       num_bytes = ::arrow::util::SafeLoadAs<int32_t>(data);
71       if (num_bytes < 0 || num_bytes > data_size - 4) {
72         throw ParquetException("Received invalid number of bytes (corrupt data page?)");
73       }
74       const uint8_t* decoder_data = data + 4;
75       if (!rle_decoder_) {
76         rle_decoder_.reset(
77             new ::arrow::util::RleDecoder(decoder_data, num_bytes, bit_width_));
78       } else {
79         rle_decoder_->Reset(decoder_data, num_bytes, bit_width_);
80       }
81       return 4 + num_bytes;
82     }
83     case Encoding::BIT_PACKED: {
84       num_bytes =
85           static_cast<int32_t>(BitUtil::BytesForBits(num_buffered_values * bit_width_));
86       if (num_bytes < 0 || num_bytes > data_size - 4) {
87         throw ParquetException("Received invalid number of bytes (corrupt data page?)");
88       }
89       if (!bit_packed_decoder_) {
90         bit_packed_decoder_.reset(new ::arrow::BitUtil::BitReader(data, num_bytes));
91       } else {
92         bit_packed_decoder_->Reset(data, num_bytes);
93       }
94       return num_bytes;
95     }
96     default:
97       throw ParquetException("Unknown encoding type for levels.");
98   }
99   return -1;
100 }
101 
SetDataV2(int32_t num_bytes,int16_t max_level,int num_buffered_values,const uint8_t * data)102 void LevelDecoder::SetDataV2(int32_t num_bytes, int16_t max_level,
103                              int num_buffered_values, const uint8_t* data) {
104   // Repetition and definition levels always uses RLE encoding
105   // in the DataPageV2 format.
106   if (num_bytes < 0) {
107     throw ParquetException("Invalid page header (corrupt data page?)");
108   }
109   encoding_ = Encoding::RLE;
110   num_values_remaining_ = num_buffered_values;
111   bit_width_ = BitUtil::Log2(max_level + 1);
112 
113   if (!rle_decoder_) {
114     rle_decoder_.reset(new ::arrow::util::RleDecoder(data, num_bytes, bit_width_));
115   } else {
116     rle_decoder_->Reset(data, num_bytes, bit_width_);
117   }
118 }
119 
Decode(int batch_size,int16_t * levels)120 int LevelDecoder::Decode(int batch_size, int16_t* levels) {
121   int num_decoded = 0;
122 
123   int num_values = std::min(num_values_remaining_, batch_size);
124   if (encoding_ == Encoding::RLE) {
125     num_decoded = rle_decoder_->GetBatch(levels, num_values);
126   } else {
127     num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values);
128   }
129   num_values_remaining_ -= num_decoded;
130   return num_decoded;
131 }
132 
default_reader_properties()133 ReaderProperties default_reader_properties() {
134   static ReaderProperties default_reader_properties;
135   return default_reader_properties;
136 }
137 
138 // Extracts encoded statistics from V1 and V2 data page headers
139 template <typename H>
ExtractStatsFromHeader(const H & header)140 EncodedStatistics ExtractStatsFromHeader(const H& header) {
141   EncodedStatistics page_statistics;
142   if (!header.__isset.statistics) {
143     return page_statistics;
144   }
145   const format::Statistics& stats = header.statistics;
146   if (stats.__isset.max) {
147     page_statistics.set_max(stats.max);
148   }
149   if (stats.__isset.min) {
150     page_statistics.set_min(stats.min);
151   }
152   if (stats.__isset.null_count) {
153     page_statistics.set_null_count(stats.null_count);
154   }
155   if (stats.__isset.distinct_count) {
156     page_statistics.set_distinct_count(stats.distinct_count);
157   }
158   return page_statistics;
159 }
160 
// ----------------------------------------------------------------------
// SerializedPageReader deserializes Thrift metadata and pages that have been
// assembled in a serialized stream for storing in a Parquet file
164 
165 // This subclass delimits pages appearing in a serialized stream, each preceded
166 // by a serialized Thrift format::PageHeader indicating the type of each page
167 // and the page metadata.
168 class SerializedPageReader : public PageReader {
169  public:
SerializedPageReader(std::shared_ptr<ArrowInputStream> stream,int64_t total_num_rows,Compression::type codec,::arrow::MemoryPool * pool,const CryptoContext * crypto_ctx)170   SerializedPageReader(std::shared_ptr<ArrowInputStream> stream, int64_t total_num_rows,
171                        Compression::type codec, ::arrow::MemoryPool* pool,
172                        const CryptoContext* crypto_ctx)
173       : stream_(std::move(stream)),
174         decompression_buffer_(AllocateBuffer(pool, 0)),
175         page_ordinal_(0),
176         seen_num_rows_(0),
177         total_num_rows_(total_num_rows),
178         decryption_buffer_(AllocateBuffer(pool, 0)) {
179     if (crypto_ctx != nullptr) {
180       crypto_ctx_ = *crypto_ctx;
181       InitDecryption();
182     }
183     max_page_header_size_ = kDefaultMaxPageHeaderSize;
184     decompressor_ = GetCodec(codec);
185   }
186 
187   // Implement the PageReader interface
188   std::shared_ptr<Page> NextPage() override;
189 
set_max_page_header_size(uint32_t size)190   void set_max_page_header_size(uint32_t size) override { max_page_header_size_ = size; }
191 
192  private:
193   void UpdateDecryption(const std::shared_ptr<Decryptor>& decryptor, int8_t module_type,
194                         const std::string& page_aad);
195 
196   void InitDecryption();
197 
198   std::shared_ptr<Buffer> DecompressPage(int compressed_len, int uncompressed_len,
199                                          const uint8_t* page_buffer);
200 
201   std::shared_ptr<ArrowInputStream> stream_;
202 
203   format::PageHeader current_page_header_;
204   std::shared_ptr<Page> current_page_;
205 
206   // Compression codec to use.
207   std::unique_ptr<::arrow::util::Codec> decompressor_;
208   std::shared_ptr<ResizableBuffer> decompression_buffer_;
209 
210   // The fields below are used for calculation of AAD (additional authenticated data)
211   // suffix which is part of the Parquet Modular Encryption.
212   // The AAD suffix for a parquet module is built internally by
213   // concatenating different parts some of which include
214   // the row group ordinal, column ordinal and page ordinal.
215   // Please refer to the encryption specification for more details:
216   // https://github.com/apache/parquet-format/blob/encryption/Encryption.md#44-additional-authenticated-data
217 
218   // The ordinal fields in the context below are used for AAD suffix calculation.
219   CryptoContext crypto_ctx_;
220   int16_t page_ordinal_;  // page ordinal does not count the dictionary page
221 
222   // Maximum allowed page size
223   uint32_t max_page_header_size_;
224 
225   // Number of rows read in data pages so far
226   int64_t seen_num_rows_;
227 
228   // Number of rows in all the data pages
229   int64_t total_num_rows_;
230 
231   // data_page_aad_ and data_page_header_aad_ contain the AAD for data page and data page
232   // header in a single column respectively.
233   // While calculating AAD for different pages in a single column the pages AAD is
234   // updated by only the page ordinal.
235   std::string data_page_aad_;
236   std::string data_page_header_aad_;
237   // Encryption
238   std::shared_ptr<ResizableBuffer> decryption_buffer_;
239 };
240 
InitDecryption()241 void SerializedPageReader::InitDecryption() {
242   // Prepare the AAD for quick update later.
243   if (crypto_ctx_.data_decryptor != nullptr) {
244     DCHECK(!crypto_ctx_.data_decryptor->file_aad().empty());
245     data_page_aad_ = encryption::CreateModuleAad(
246         crypto_ctx_.data_decryptor->file_aad(), encryption::kDataPage,
247         crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal, kNonPageOrdinal);
248   }
249   if (crypto_ctx_.meta_decryptor != nullptr) {
250     DCHECK(!crypto_ctx_.meta_decryptor->file_aad().empty());
251     data_page_header_aad_ = encryption::CreateModuleAad(
252         crypto_ctx_.meta_decryptor->file_aad(), encryption::kDataPageHeader,
253         crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal, kNonPageOrdinal);
254   }
255 }
256 
UpdateDecryption(const std::shared_ptr<Decryptor> & decryptor,int8_t module_type,const std::string & page_aad)257 void SerializedPageReader::UpdateDecryption(const std::shared_ptr<Decryptor>& decryptor,
258                                             int8_t module_type,
259                                             const std::string& page_aad) {
260   DCHECK(decryptor != nullptr);
261   if (crypto_ctx_.start_decrypt_with_dictionary_page) {
262     std::string aad = encryption::CreateModuleAad(
263         decryptor->file_aad(), module_type, crypto_ctx_.row_group_ordinal,
264         crypto_ctx_.column_ordinal, kNonPageOrdinal);
265     decryptor->UpdateAad(aad);
266   } else {
267     encryption::QuickUpdatePageAad(page_aad, page_ordinal_);
268     decryptor->UpdateAad(page_aad);
269   }
270 }
271 
NextPage()272 std::shared_ptr<Page> SerializedPageReader::NextPage() {
273   // Loop here because there may be unhandled page types that we skip until
274   // finding a page that we do know what to do with
275 
276   while (seen_num_rows_ < total_num_rows_) {
277     uint32_t header_size = 0;
278     uint32_t allowed_page_size = kDefaultPageHeaderSize;
279 
280     // Page headers can be very large because of page statistics
281     // We try to deserialize a larger buffer progressively
282     // until a maximum allowed header limit
283     while (true) {
284       PARQUET_ASSIGN_OR_THROW(auto view, stream_->Peek(allowed_page_size));
285       if (view.size() == 0) {
286         return std::shared_ptr<Page>(nullptr);
287       }
288 
289       // This gets used, then set by DeserializeThriftMsg
290       header_size = static_cast<uint32_t>(view.size());
291       try {
292         if (crypto_ctx_.meta_decryptor != nullptr) {
293           UpdateDecryption(crypto_ctx_.meta_decryptor, encryption::kDictionaryPageHeader,
294                            data_page_header_aad_);
295         }
296         DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(view.data()), &header_size,
297                              &current_page_header_, crypto_ctx_.meta_decryptor);
298         break;
299       } catch (std::exception& e) {
300         // Failed to deserialize. Double the allowed page header size and try again
301         std::stringstream ss;
302         ss << e.what();
303         allowed_page_size *= 2;
304         if (allowed_page_size > max_page_header_size_) {
305           ss << "Deserializing page header failed.\n";
306           throw ParquetException(ss.str());
307         }
308       }
309     }
310     // Advance the stream offset
311     PARQUET_THROW_NOT_OK(stream_->Advance(header_size));
312 
313     int compressed_len = current_page_header_.compressed_page_size;
314     int uncompressed_len = current_page_header_.uncompressed_page_size;
315     if (crypto_ctx_.data_decryptor != nullptr) {
316       UpdateDecryption(crypto_ctx_.data_decryptor, encryption::kDictionaryPage,
317                        data_page_aad_);
318     }
319     // Read the compressed data page.
320     PARQUET_ASSIGN_OR_THROW(auto page_buffer, stream_->Read(compressed_len));
321     if (page_buffer->size() != compressed_len) {
322       std::stringstream ss;
323       ss << "Page was smaller (" << page_buffer->size() << ") than expected ("
324          << compressed_len << ")";
325       ParquetException::EofException(ss.str());
326     }
327 
328     // Decrypt it if we need to
329     if (crypto_ctx_.data_decryptor != nullptr) {
330       PARQUET_THROW_NOT_OK(decryption_buffer_->Resize(
331           compressed_len - crypto_ctx_.data_decryptor->CiphertextSizeDelta(), false));
332       compressed_len = crypto_ctx_.data_decryptor->Decrypt(
333           page_buffer->data(), compressed_len, decryption_buffer_->mutable_data());
334 
335       page_buffer = decryption_buffer_;
336     }
337     // Uncompress it if we need to
338     if (decompressor_ != nullptr) {
339       page_buffer = DecompressPage(compressed_len, uncompressed_len, page_buffer->data());
340     }
341 
342     const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
343 
344     if (page_type == PageType::DICTIONARY_PAGE) {
345       crypto_ctx_.start_decrypt_with_dictionary_page = false;
346       const format::DictionaryPageHeader& dict_header =
347           current_page_header_.dictionary_page_header;
348 
349       bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false;
350       if (dict_header.num_values < 0) {
351         throw ParquetException("Invalid page header (negative number of values)");
352       }
353 
354       return std::make_shared<DictionaryPage>(page_buffer, dict_header.num_values,
355                                               LoadEnumSafe(&dict_header.encoding),
356                                               is_sorted);
357     } else if (page_type == PageType::DATA_PAGE) {
358       ++page_ordinal_;
359       const format::DataPageHeader& header = current_page_header_.data_page_header;
360 
361       if (header.num_values < 0) {
362         throw ParquetException("Invalid page header (negative number of values)");
363       }
364       EncodedStatistics page_statistics = ExtractStatsFromHeader(header);
365       seen_num_rows_ += header.num_values;
366 
367       return std::make_shared<DataPageV1>(page_buffer, header.num_values,
368                                           LoadEnumSafe(&header.encoding),
369                                           LoadEnumSafe(&header.definition_level_encoding),
370                                           LoadEnumSafe(&header.repetition_level_encoding),
371                                           uncompressed_len, page_statistics);
372     } else if (page_type == PageType::DATA_PAGE_V2) {
373       ++page_ordinal_;
374       const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
375 
376       if (header.num_values < 0) {
377         throw ParquetException("Invalid page header (negative number of values)");
378       }
379       if (header.definition_levels_byte_length < 0 ||
380           header.repetition_levels_byte_length < 0) {
381         throw ParquetException("Invalid page header (negative levels byte length)");
382       }
383       bool is_compressed = header.__isset.is_compressed ? header.is_compressed : false;
384       EncodedStatistics page_statistics = ExtractStatsFromHeader(header);
385       seen_num_rows_ += header.num_values;
386 
387       return std::make_shared<DataPageV2>(
388           page_buffer, header.num_values, header.num_nulls, header.num_rows,
389           LoadEnumSafe(&header.encoding), header.definition_levels_byte_length,
390           header.repetition_levels_byte_length, uncompressed_len, is_compressed,
391           page_statistics);
392     } else {
393       // We don't know what this page type is. We're allowed to skip non-data
394       // pages.
395       continue;
396     }
397   }
398   return std::shared_ptr<Page>(nullptr);
399 }
400 
DecompressPage(int compressed_len,int uncompressed_len,const uint8_t * page_buffer)401 std::shared_ptr<Buffer> SerializedPageReader::DecompressPage(int compressed_len,
402                                                              int uncompressed_len,
403                                                              const uint8_t* page_buffer) {
404   // Grow the uncompressed buffer if we need to.
405   if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) {
406     PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false));
407   }
408 
409   if (current_page_header_.type != format::PageType::DATA_PAGE_V2) {
410     PARQUET_THROW_NOT_OK(
411         decompressor_->Decompress(compressed_len, page_buffer, uncompressed_len,
412                                   decompression_buffer_->mutable_data()));
413   } else {
414     // The levels are not compressed in V2 format
415     const auto& header = current_page_header_.data_page_header_v2;
416     int32_t levels_length =
417         header.repetition_levels_byte_length + header.definition_levels_byte_length;
418     uint8_t* decompressed = decompression_buffer_->mutable_data();
419     memcpy(decompressed, page_buffer, levels_length);
420     decompressed += levels_length;
421     const uint8_t* compressed_values = page_buffer + levels_length;
422 
423     // Decompress the values
424     PARQUET_THROW_NOT_OK(
425         decompressor_->Decompress(compressed_len - levels_length, compressed_values,
426                                   uncompressed_len - levels_length, decompressed));
427   }
428 
429   return decompression_buffer_;
430 }
431 
Open(std::shared_ptr<ArrowInputStream> stream,int64_t total_num_rows,Compression::type codec,::arrow::MemoryPool * pool,const CryptoContext * ctx)432 std::unique_ptr<PageReader> PageReader::Open(std::shared_ptr<ArrowInputStream> stream,
433                                              int64_t total_num_rows,
434                                              Compression::type codec,
435                                              ::arrow::MemoryPool* pool,
436                                              const CryptoContext* ctx) {
437   return std::unique_ptr<PageReader>(
438       new SerializedPageReader(std::move(stream), total_num_rows, codec, pool, ctx));
439 }
440 
441 // ----------------------------------------------------------------------
442 // Impl base class for TypedColumnReader and RecordReader
443 
444 // PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
445 // encoding.
IsDictionaryIndexEncoding(const Encoding::type & e)446 static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
447   return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
448 }
449 
450 template <typename DType>
451 class ColumnReaderImplBase {
452  public:
453   using T = typename DType::c_type;
454 
ColumnReaderImplBase(const ColumnDescriptor * descr,::arrow::MemoryPool * pool)455   ColumnReaderImplBase(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
456       : descr_(descr),
457         max_def_level_(descr->max_definition_level()),
458         max_rep_level_(descr->max_repetition_level()),
459         num_buffered_values_(0),
460         num_decoded_values_(0),
461         pool_(pool),
462         current_decoder_(nullptr),
463         current_encoding_(Encoding::UNKNOWN) {}
464 
465   virtual ~ColumnReaderImplBase() = default;
466 
467  protected:
468   // Read up to batch_size values from the current data page into the
469   // pre-allocated memory T*
470   //
471   // @returns: the number of values read into the out buffer
ReadValues(int64_t batch_size,T * out)472   int64_t ReadValues(int64_t batch_size, T* out) {
473     int64_t num_decoded = current_decoder_->Decode(out, static_cast<int>(batch_size));
474     return num_decoded;
475   }
476 
477   // Read up to batch_size values from the current data page into the
478   // pre-allocated memory T*, leaving spaces for null entries according
479   // to the def_levels.
480   //
481   // @returns: the number of values read into the out buffer
ReadValuesSpaced(int64_t batch_size,T * out,int64_t null_count,uint8_t * valid_bits,int64_t valid_bits_offset)482   int64_t ReadValuesSpaced(int64_t batch_size, T* out, int64_t null_count,
483                            uint8_t* valid_bits, int64_t valid_bits_offset) {
484     return current_decoder_->DecodeSpaced(out, static_cast<int>(batch_size),
485                                           static_cast<int>(null_count), valid_bits,
486                                           valid_bits_offset);
487   }
488 
489   // Read multiple definition levels into preallocated memory
490   //
491   // Returns the number of decoded definition levels
ReadDefinitionLevels(int64_t batch_size,int16_t * levels)492   int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) {
493     if (max_def_level_ == 0) {
494       return 0;
495     }
496     return definition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
497   }
498 
HasNextInternal()499   bool HasNextInternal() {
500     // Either there is no data page available yet, or the data page has been
501     // exhausted
502     if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) {
503       if (!ReadNewPage() || num_buffered_values_ == 0) {
504         return false;
505       }
506     }
507     return true;
508   }
509 
510   // Read multiple repetition levels into preallocated memory
511   // Returns the number of decoded repetition levels
ReadRepetitionLevels(int64_t batch_size,int16_t * levels)512   int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels) {
513     if (max_rep_level_ == 0) {
514       return 0;
515     }
516     return repetition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
517   }
518 
519   // Advance to the next data page
ReadNewPage()520   bool ReadNewPage() {
521     // Loop until we find the next data page.
522     while (true) {
523       current_page_ = pager_->NextPage();
524       if (!current_page_) {
525         // EOS
526         return false;
527       }
528 
529       if (current_page_->type() == PageType::DICTIONARY_PAGE) {
530         ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
531         continue;
532       } else if (current_page_->type() == PageType::DATA_PAGE) {
533         const auto page = std::static_pointer_cast<DataPageV1>(current_page_);
534         const int64_t levels_byte_size = InitializeLevelDecoders(
535             *page, page->repetition_level_encoding(), page->definition_level_encoding());
536         InitializeDataDecoder(*page, levels_byte_size);
537         return true;
538       } else if (current_page_->type() == PageType::DATA_PAGE_V2) {
539         const auto page = std::static_pointer_cast<DataPageV2>(current_page_);
540         int64_t levels_byte_size = InitializeLevelDecodersV2(*page);
541         InitializeDataDecoder(*page, levels_byte_size);
542         return true;
543       } else {
544         // We don't know what this page type is. We're allowed to skip non-data
545         // pages.
546         continue;
547       }
548     }
549     return true;
550   }
551 
ConfigureDictionary(const DictionaryPage * page)552   void ConfigureDictionary(const DictionaryPage* page) {
553     int encoding = static_cast<int>(page->encoding());
554     if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
555         page->encoding() == Encoding::PLAIN) {
556       encoding = static_cast<int>(Encoding::RLE_DICTIONARY);
557     }
558 
559     auto it = decoders_.find(encoding);
560     if (it != decoders_.end()) {
561       throw ParquetException("Column cannot have more than one dictionary.");
562     }
563 
564     if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
565         page->encoding() == Encoding::PLAIN) {
566       auto dictionary = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
567       dictionary->SetData(page->num_values(), page->data(), page->size());
568 
569       // The dictionary is fully decoded during DictionaryDecoder::Init, so the
570       // DictionaryPage buffer is no longer required after this step
571       //
572       // TODO(wesm): investigate whether this all-or-nothing decoding of the
573       // dictionary makes sense and whether performance can be improved
574 
575       std::unique_ptr<DictDecoder<DType>> decoder = MakeDictDecoder<DType>(descr_, pool_);
576       decoder->SetDict(dictionary.get());
577       decoders_[encoding] =
578           std::unique_ptr<DecoderType>(dynamic_cast<DecoderType*>(decoder.release()));
579     } else {
580       ParquetException::NYI("only plain dictionary encoding has been implemented");
581     }
582 
583     new_dictionary_ = true;
584     current_decoder_ = decoders_[encoding].get();
585     DCHECK(current_decoder_);
586   }
587 
588   // Initialize repetition and definition level decoders on the next data page.
589 
590   // If the data page includes repetition and definition levels, we
591   // initialize the level decoders and return the number of encoded level bytes.
592   // The return value helps determine the number of bytes in the encoded data.
InitializeLevelDecoders(const DataPage & page,Encoding::type repetition_level_encoding,Encoding::type definition_level_encoding)593   int64_t InitializeLevelDecoders(const DataPage& page,
594                                   Encoding::type repetition_level_encoding,
595                                   Encoding::type definition_level_encoding) {
596     // Read a data page.
597     num_buffered_values_ = page.num_values();
598 
599     // Have not decoded any values from the data page yet
600     num_decoded_values_ = 0;
601 
602     const uint8_t* buffer = page.data();
603     int32_t levels_byte_size = 0;
604     int32_t max_size = page.size();
605 
606     // Data page Layout: Repetition Levels - Definition Levels - encoded values.
607     // Levels are encoded as rle or bit-packed.
608     // Init repetition levels
609     if (max_rep_level_ > 0) {
610       int32_t rep_levels_bytes = repetition_level_decoder_.SetData(
611           repetition_level_encoding, max_rep_level_,
612           static_cast<int>(num_buffered_values_), buffer, max_size);
613       buffer += rep_levels_bytes;
614       levels_byte_size += rep_levels_bytes;
615       max_size -= rep_levels_bytes;
616     }
617     // TODO figure a way to set max_def_level_ to 0
618     // if the initial value is invalid
619 
620     // Init definition levels
621     if (max_def_level_ > 0) {
622       int32_t def_levels_bytes = definition_level_decoder_.SetData(
623           definition_level_encoding, max_def_level_,
624           static_cast<int>(num_buffered_values_), buffer, max_size);
625       levels_byte_size += def_levels_bytes;
626       max_size -= def_levels_bytes;
627     }
628 
629     return levels_byte_size;
630   }
631 
InitializeLevelDecodersV2(const DataPageV2 & page)632   int64_t InitializeLevelDecodersV2(const DataPageV2& page) {
633     // Read a data page.
634     num_buffered_values_ = page.num_values();
635 
636     // Have not decoded any values from the data page yet
637     num_decoded_values_ = 0;
638     const uint8_t* buffer = page.data();
639 
640     const int64_t total_levels_length =
641         static_cast<int64_t>(page.repetition_levels_byte_length()) +
642         page.definition_levels_byte_length();
643 
644     if (total_levels_length > page.size()) {
645       throw ParquetException("Data page too small for levels (corrupt header?)");
646     }
647 
648     if (max_rep_level_ > 0) {
649       repetition_level_decoder_.SetDataV2(page.repetition_levels_byte_length(),
650                                           max_rep_level_,
651                                           static_cast<int>(num_buffered_values_), buffer);
652       buffer += page.repetition_levels_byte_length();
653     }
654 
655     if (max_def_level_ > 0) {
656       definition_level_decoder_.SetDataV2(page.definition_levels_byte_length(),
657                                           max_def_level_,
658                                           static_cast<int>(num_buffered_values_), buffer);
659     }
660 
661     return total_levels_length;
662   }
663 
664   // Get a decoder object for this page or create a new decoder if this is the
665   // first page with this encoding.
  void InitializeDataDecoder(const DataPage& page, int64_t levels_byte_size) {
    // Values begin immediately after the encoded repetition/definition levels.
    const uint8_t* buffer = page.data() + levels_byte_size;
    const int64_t data_size = page.size() - levels_byte_size;

    if (data_size < 0) {
      throw ParquetException("Page smaller than size of encoded levels");
    }

    Encoding::type encoding = page.encoding();

    // Normalize all dictionary-index encodings (e.g. PLAIN_DICTIONARY) to a
    // single key so they share one decoder in decoders_.
    if (IsDictionaryIndexEncoding(encoding)) {
      encoding = Encoding::RLE_DICTIONARY;
    }

    auto it = decoders_.find(static_cast<int>(encoding));
    if (it != decoders_.end()) {
      DCHECK(it->second.get() != nullptr);
      if (encoding == Encoding::RLE_DICTIONARY) {
        // NOTE: this inspects current_decoder_ as it was *before* the
        // reassignment below; a dictionary-encoded data page is only valid if
        // a dictionary page already installed an RLE_DICTIONARY decoder.
        DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
      }
      current_decoder_ = it->second.get();
    } else {
      switch (encoding) {
        case Encoding::PLAIN: {
          auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
          current_decoder_ = decoder.get();
          decoders_[static_cast<int>(encoding)] = std::move(decoder);
          break;
        }
        case Encoding::BYTE_STREAM_SPLIT: {
          auto decoder = MakeTypedDecoder<DType>(Encoding::BYTE_STREAM_SPLIT, descr_);
          current_decoder_ = decoder.get();
          decoders_[static_cast<int>(encoding)] = std::move(decoder);
          break;
        }
        case Encoding::RLE_DICTIONARY:
          // Reaching here means no dictionary page preceded this data page.
          throw ParquetException("Dictionary page must be before data page.");

        case Encoding::DELTA_BINARY_PACKED:
        case Encoding::DELTA_LENGTH_BYTE_ARRAY:
        case Encoding::DELTA_BYTE_ARRAY:
          ParquetException::NYI("Unsupported encoding");

        default:
          throw ParquetException("Unknown encoding type.");
      }
    }
    current_encoding_ = encoding;
    current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
                              static_cast<int>(data_size));
  }
717 
  // Schema descriptor (physical type, max levels, path) for this column.
  const ColumnDescriptor* descr_;
  // Maximum definition level; 0 means the column is required (no def levels).
  const int16_t max_def_level_;
  // Maximum repetition level; 0 means the column is non-repeated.
  const int16_t max_rep_level_;

  // Source of data/dictionary pages for this column chunk.
  std::unique_ptr<PageReader> pager_;
  std::shared_ptr<Page> current_page_;

  // Not set if full schema for this field has no optional or repeated elements
  LevelDecoder definition_level_decoder_;

  // Not set for flat schemas.
  LevelDecoder repetition_level_decoder_;

  // The total number of values stored in the data page. This is the maximum of
  // the number of encoded definition levels or encoded values. For
  // non-repeated, required columns, this is equal to the number of encoded
  // values. For repeated or optional values, there may be fewer data values
  // than levels, and this tells you how many encoded levels there are in that
  // case.
  int64_t num_buffered_values_;

  // The number of values from the current data page that have been decoded
  // into memory
  int64_t num_decoded_values_;

  ::arrow::MemoryPool* pool_;

  using DecoderType = TypedDecoder<DType>;
  // Decoder for the current page; non-owning pointer into decoders_.
  DecoderType* current_decoder_;
  Encoding::type current_encoding_;

  /// Flag to signal when a new dictionary has been set, for the benefit of
  /// DictionaryRecordReader
  bool new_dictionary_;

  // Map of encoding type to the respective decoder object. For example, a
  // column chunk's data pages may include both dictionary-encoded and
  // plain-encoded data.
  std::unordered_map<int, std::unique_ptr<DecoderType>> decoders_;

  // Advances the count of values consumed from the current page.
  void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; }
759 };
760 
761 // ----------------------------------------------------------------------
762 // TypedColumnReader implementations
763 
// Concrete per-physical-type column reader: combines the public
// TypedColumnReader<DType> interface with the shared page-decoding machinery
// in ColumnReaderImplBase<DType>.
template <typename DType>
class TypedColumnReaderImpl : public TypedColumnReader<DType>,
                              public ColumnReaderImplBase<DType> {
 public:
  using T = typename DType::c_type;

  TypedColumnReaderImpl(const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager,
                        ::arrow::MemoryPool* pool)
      : ColumnReaderImplBase<DType>(descr, pool) {
    this->pager_ = std::move(pager);
  }

  // True while undecoded values remain; advances pages as needed.
  bool HasNext() override { return this->HasNextInternal(); }

  int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
                    T* values, int64_t* values_read) override;

  int64_t ReadBatchSpaced(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
                          T* values, uint8_t* valid_bits, int64_t valid_bits_offset,
                          int64_t* levels_read, int64_t* values_read,
                          int64_t* null_count) override;

  int64_t Skip(int64_t num_rows_to_skip) override;

  Type::type type() const override { return this->descr_->physical_type(); }

  const ColumnDescriptor* descr() const override { return this->descr_; }
};
792 
793 template <typename DType>
ReadBatch(int64_t batch_size,int16_t * def_levels,int16_t * rep_levels,T * values,int64_t * values_read)794 int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def_levels,
795                                                 int16_t* rep_levels, T* values,
796                                                 int64_t* values_read) {
797   // HasNext invokes ReadNewPage
798   if (!HasNext()) {
799     *values_read = 0;
800     return 0;
801   }
802 
803   // TODO(wesm): keep reading data pages until batch_size is reached, or the
804   // row group is finished
805   batch_size =
806       std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_);
807 
808   int64_t num_def_levels = 0;
809   int64_t num_rep_levels = 0;
810 
811   int64_t values_to_read = 0;
812 
813   // If the field is required and non-repeated, there are no definition levels
814   if (this->max_def_level_ > 0 && def_levels) {
815     num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);
816     // TODO(wesm): this tallying of values-to-decode can be performed with better
817     // cache-efficiency if fused with the level decoding.
818     for (int64_t i = 0; i < num_def_levels; ++i) {
819       if (def_levels[i] == this->max_def_level_) {
820         ++values_to_read;
821       }
822     }
823   } else {
824     // Required field, read all values
825     values_to_read = batch_size;
826   }
827 
828   // Not present for non-repeated fields
829   if (this->max_rep_level_ > 0 && rep_levels) {
830     num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels);
831     if (def_levels && num_def_levels != num_rep_levels) {
832       throw ParquetException("Number of decoded rep / def levels did not match");
833     }
834   }
835 
836   *values_read = this->ReadValues(values_to_read, values);
837   int64_t total_values = std::max(num_def_levels, *values_read);
838   this->ConsumeBufferedValues(total_values);
839 
840   return total_values;
841 }
842 
// Like ReadBatch, but leaves gaps in `values` for nulls ("spaced" layout) and
// fills `valid_bits` so the result can back an Arrow array directly.
template <typename DType>
int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced(
    int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values,
    uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read,
    int64_t* values_read, int64_t* null_count_out) {
  // HasNext invokes ReadNewPage
  if (!HasNext()) {
    *levels_read = 0;
    *values_read = 0;
    *null_count_out = 0;
    return 0;
  }

  int64_t total_values;
  // TODO(wesm): keep reading data pages until batch_size is reached, or the
  // row group is finished
  batch_size =
      std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_);

  // If the field is required and non-repeated, there are no definition levels
  if (this->max_def_level_ > 0) {
    int64_t num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);

    // Not present for non-repeated fields
    if (this->max_rep_level_ > 0) {
      int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels);
      if (num_def_levels != num_rep_levels) {
        throw ParquetException("Number of decoded rep / def levels did not match");
      }
    }

    // When false, nulls cannot occur at the leaf so values can be decoded
    // densely and every decoded slot is valid.
    const bool has_spaced_values = internal::HasSpacedValues(this->descr_);

    int64_t null_count = 0;
    if (!has_spaced_values) {
      // Count slots defined all the way to the leaf; only those have
      // physical values encoded in the page.
      int values_to_read = 0;
      for (int64_t i = 0; i < num_def_levels; ++i) {
        if (def_levels[i] == this->max_def_level_) {
          ++values_to_read;
        }
      }
      total_values = this->ReadValues(values_to_read, values);
      // Dense read: mark every decoded slot valid.
      for (int64_t i = 0; i < total_values; i++) {
        ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);
      }
      *values_read = total_values;
    } else {
      // Convert definition levels into a validity bitmap, then decode with
      // gaps left in `values` for the nulls.
      internal::DefinitionLevelsToBitmap(def_levels, num_def_levels, this->max_def_level_,
                                         this->max_rep_level_, values_read, &null_count,
                                         valid_bits, valid_bits_offset);
      total_values =
          this->ReadValuesSpaced(*values_read, values, static_cast<int>(null_count),
                                 valid_bits, valid_bits_offset);
    }
    *levels_read = num_def_levels;
    *null_count_out = null_count;

  } else {
    // Required field, read all values
    total_values = this->ReadValues(batch_size, values);
    for (int64_t i = 0; i < total_values; i++) {
      ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);
    }
    *null_count_out = 0;
    *levels_read = total_values;
  }

  this->ConsumeBufferedValues(*levels_read);
  return total_values;
}
913 
914 template <typename DType>
Skip(int64_t num_rows_to_skip)915 int64_t TypedColumnReaderImpl<DType>::Skip(int64_t num_rows_to_skip) {
916   int64_t rows_to_skip = num_rows_to_skip;
917   while (HasNext() && rows_to_skip > 0) {
918     // If the number of rows to skip is more than the number of undecoded values, skip the
919     // Page.
920     if (rows_to_skip > (this->num_buffered_values_ - this->num_decoded_values_)) {
921       rows_to_skip -= this->num_buffered_values_ - this->num_decoded_values_;
922       this->num_decoded_values_ = this->num_buffered_values_;
923     } else {
924       // We need to read this Page
925       // Jump to the right offset in the Page
926       int64_t batch_size = 1024;  // ReadBatch with a smaller memory footprint
927       int64_t values_read = 0;
928 
929       // This will be enough scratch space to accommodate 16-bit levels or any
930       // value type
931       std::shared_ptr<ResizableBuffer> scratch = AllocateBuffer(
932           this->pool_, batch_size * type_traits<DType::type_num>::value_byte_size);
933 
934       do {
935         batch_size = std::min(batch_size, rows_to_skip);
936         values_read =
937             ReadBatch(static_cast<int>(batch_size),
938                       reinterpret_cast<int16_t*>(scratch->mutable_data()),
939                       reinterpret_cast<int16_t*>(scratch->mutable_data()),
940                       reinterpret_cast<T*>(scratch->mutable_data()), &values_read);
941         rows_to_skip -= values_read;
942       } while (values_read > 0 && rows_to_skip > 0);
943     }
944   }
945   return num_rows_to_skip - rows_to_skip;
946 }
947 
948 // ----------------------------------------------------------------------
949 // Dynamic column reader constructor
950 
Make(const ColumnDescriptor * descr,std::unique_ptr<PageReader> pager,MemoryPool * pool)951 std::shared_ptr<ColumnReader> ColumnReader::Make(const ColumnDescriptor* descr,
952                                                  std::unique_ptr<PageReader> pager,
953                                                  MemoryPool* pool) {
954   switch (descr->physical_type()) {
955     case Type::BOOLEAN:
956       return std::make_shared<TypedColumnReaderImpl<BooleanType>>(descr, std::move(pager),
957                                                                   pool);
958     case Type::INT32:
959       return std::make_shared<TypedColumnReaderImpl<Int32Type>>(descr, std::move(pager),
960                                                                 pool);
961     case Type::INT64:
962       return std::make_shared<TypedColumnReaderImpl<Int64Type>>(descr, std::move(pager),
963                                                                 pool);
964     case Type::INT96:
965       return std::make_shared<TypedColumnReaderImpl<Int96Type>>(descr, std::move(pager),
966                                                                 pool);
967     case Type::FLOAT:
968       return std::make_shared<TypedColumnReaderImpl<FloatType>>(descr, std::move(pager),
969                                                                 pool);
970     case Type::DOUBLE:
971       return std::make_shared<TypedColumnReaderImpl<DoubleType>>(descr, std::move(pager),
972                                                                  pool);
973     case Type::BYTE_ARRAY:
974       return std::make_shared<TypedColumnReaderImpl<ByteArrayType>>(
975           descr, std::move(pager), pool);
976     case Type::FIXED_LEN_BYTE_ARRAY:
977       return std::make_shared<TypedColumnReaderImpl<FLBAType>>(descr, std::move(pager),
978                                                                pool);
979     default:
980       ParquetException::NYI("type reader not implemented");
981   }
982   // Unreachable code, but suppress compiler warning
983   return std::shared_ptr<ColumnReader>(nullptr);
984 }
985 
986 // ----------------------------------------------------------------------
987 // RecordReader
988 
989 namespace internal {
990 
// The minimum number of repetition/definition levels to decode at a time, for
// better vectorized performance when doing many smaller record reads
// (used by TypedRecordReader::ReadRecords below).
constexpr int64_t kMinLevelBatchSize = 1024;
994 
// Record-oriented reader: decodes rep/def levels in bulk, delimits logical
// records (rep level 0 marks a record start), and accumulates values, levels
// and a validity bitmap into growable buffers owned by this object.
template <typename DType>
class TypedRecordReader : public ColumnReaderImplBase<DType>,
                          virtual public RecordReader {
 public:
  using T = typename DType::c_type;
  using BASE = ColumnReaderImplBase<DType>;
  // Initializes all bookkeeping counters and allocates the (initially empty)
  // value, validity and level buffers from `pool`.
  TypedRecordReader(const ColumnDescriptor* descr, MemoryPool* pool) : BASE(descr, pool) {
    nullable_values_ = internal::HasSpacedValues(descr);
    at_record_start_ = true;
    records_read_ = 0;
    values_written_ = 0;
    values_capacity_ = 0;
    null_count_ = 0;
    levels_written_ = 0;
    levels_position_ = 0;
    levels_capacity_ = 0;
    // BYTE_ARRAY subclasses accumulate into builder objects, not values_.
    uses_values_ = !(descr->physical_type() == Type::BYTE_ARRAY);

    if (uses_values_) {
      values_ = AllocateBuffer(pool);
    }
    valid_bits_ = AllocateBuffer(pool);
    def_levels_ = AllocateBuffer(pool);
    rep_levels_ = AllocateBuffer(pool);
    Reset();
  }

  // Number of values in the current data page not yet decoded.
  int64_t available_values_current_page() const {
    return this->num_buffered_values_ - this->num_decoded_values_;
  }

  // Compute the values capacity in bytes for the given number of elements
  int64_t bytes_for_values(int64_t nitems) const {
    int64_t type_size = GetTypeByteSize(this->descr_->physical_type());
    if (::arrow::internal::HasMultiplyOverflow(nitems, type_size)) {
      throw ParquetException("Total size of items too large");
    }
    return nitems * type_size;
  }

  // Reads up to `num_records` logical records; returns the number delimited.
  int64_t ReadRecords(int64_t num_records) override {
    // Delimit records, then read values at the end
    int64_t records_read = 0;

    // First consume any levels left over from a previous call.
    if (levels_position_ < levels_written_) {
      records_read += ReadRecordData(num_records);
    }

    int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records);

    // If we are in the middle of a record, we continue until reaching the
    // desired number of records or the end of the current record if we've found
    // enough records
    while (!at_record_start_ || records_read < num_records) {
      // Is there more data to read in this row group?
      if (!this->HasNextInternal()) {
        if (!at_record_start_) {
          // We ended the row group while inside a record that we haven't seen
          // the end of yet. So increment the record count for the last record in
          // the row group
          ++records_read;
          at_record_start_ = true;
        }
        break;
      }

      /// We perform multiple batch reads until we either exhaust the row group
      /// or observe the desired number of records
      int64_t batch_size = std::min(level_batch_size, available_values_current_page());

      // No more data in column
      if (batch_size == 0) {
        break;
      }

      if (this->max_def_level_ > 0) {
        ReserveLevels(batch_size);

        int16_t* def_levels = this->def_levels() + levels_written_;
        int16_t* rep_levels = this->rep_levels() + levels_written_;

        // Not present for non-repeated fields
        int64_t levels_read = 0;
        if (this->max_rep_level_ > 0) {
          levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
          if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
            throw ParquetException("Number of decoded rep / def levels did not match");
          }
        } else if (this->max_def_level_ > 0) {
          levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
        }

        // Exhausted column chunk
        if (levels_read == 0) {
          break;
        }

        levels_written_ += levels_read;
        records_read += ReadRecordData(num_records - records_read);
      } else {
        // No repetition or definition levels
        batch_size = std::min(num_records - records_read, batch_size);
        records_read += ReadRecordData(batch_size);
      }
    }

    return records_read;
  }

  // We may outwardly have the appearance of having exhausted a column chunk
  // when in fact we are in the middle of processing the last batch
  bool has_values_to_process() const { return levels_position_ < levels_written_; }

  // Transfers ownership of the accumulated values buffer to the caller,
  // trimmed to the bytes actually written; a fresh buffer replaces it.
  std::shared_ptr<ResizableBuffer> ReleaseValues() override {
    if (uses_values_) {
      auto result = values_;
      PARQUET_THROW_NOT_OK(result->Resize(bytes_for_values(values_written_), true));
      values_ = AllocateBuffer(this->pool_);
      return result;
    } else {
      // BYTE_ARRAY readers store values in builders, not in values_.
      return nullptr;
    }
  }

  // Transfers ownership of the validity bitmap (nullable columns only).
  std::shared_ptr<ResizableBuffer> ReleaseIsValid() override {
    if (nullable_values_) {
      auto result = valid_bits_;
      PARQUET_THROW_NOT_OK(result->Resize(BitUtil::BytesForBits(values_written_), true));
      valid_bits_ = AllocateBuffer(this->pool_);
      return result;
    } else {
      return nullptr;
    }
  }

  // Process written repetition/definition levels to reach the end of
  // records. Process no more levels than necessary to delimit the indicated
  // number of logical records. Updates internal state of RecordReader
  //
  // \return Number of records delimited
  int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) {
    int64_t values_to_read = 0;
    int64_t records_read = 0;

    const int16_t* def_levels = this->def_levels() + levels_position_;
    const int16_t* rep_levels = this->rep_levels() + levels_position_;

    // Only meaningful for repeated columns, where rep level 0 marks the
    // start of a new record.
    DCHECK_GT(this->max_rep_level_, 0);

    // Count logical records and number of values to read
    while (levels_position_ < levels_written_) {
      if (*rep_levels++ == 0) {
        // If at_record_start_ is true, we are seeing the start of a record
        // for the second time, such as after repeated calls to
        // DelimitRecords. In this case we must continue until we find
        // another record start or exhausting the ColumnChunk
        if (!at_record_start_) {
          // We've reached the end of a record; increment the record count.
          ++records_read;
          if (records_read == num_records) {
            // We've found the number of records we were looking for. Set
            // at_record_start_ to true and break
            at_record_start_ = true;
            break;
          }
        }
      }

      // We have decided to consume the level at this position; therefore we
      // must advance until we find another record boundary
      at_record_start_ = false;

      if (*def_levels++ == this->max_def_level_) {
        ++values_to_read;
      }
      ++levels_position_;
    }
    *values_seen = values_to_read;
    return records_read;
  }

  void Reserve(int64_t capacity) override {
    ReserveLevels(capacity);
    ReserveValues(capacity);
  }

  // Returns a (power-of-two) capacity large enough for size + extra_size,
  // validating against overflow and absurd sizes from corrupt files.
  int64_t UpdateCapacity(int64_t capacity, int64_t size, int64_t extra_size) {
    if (extra_size < 0) {
      throw ParquetException("Negative size (corrupt file?)");
    }
    if (::arrow::internal::HasAdditionOverflow(size, extra_size)) {
      throw ParquetException("Allocation size too large (corrupt file?)");
    }
    const int64_t target_size = size + extra_size;
    if (target_size >= (1LL << 62)) {
      throw ParquetException("Allocation size too large (corrupt file?)");
    }
    if (capacity >= target_size) {
      return capacity;
    }
    return BitUtil::NextPower2(target_size);
  }

  // Grows the def (and, if repeated, rep) level buffers to hold
  // `extra_levels` more entries.
  void ReserveLevels(int64_t extra_levels) {
    if (this->max_def_level_ > 0) {
      const int64_t new_levels_capacity =
          UpdateCapacity(levels_capacity_, levels_written_, extra_levels);
      if (new_levels_capacity > levels_capacity_) {
        constexpr auto kItemSize = static_cast<int64_t>(sizeof(int16_t));
        if (::arrow::internal::HasMultiplyOverflow(new_levels_capacity, kItemSize)) {
          throw ParquetException("Allocation size too large (corrupt file?)");
        }
        PARQUET_THROW_NOT_OK(def_levels_->Resize(new_levels_capacity * kItemSize, false));
        if (this->max_rep_level_ > 0) {
          PARQUET_THROW_NOT_OK(
              rep_levels_->Resize(new_levels_capacity * kItemSize, false));
        }
        levels_capacity_ = new_levels_capacity;
      }
    }
  }

  // Grows the values buffer (and validity bitmap, if nullable) to hold
  // `extra_values` more entries.
  void ReserveValues(int64_t extra_values) {
    const int64_t new_values_capacity =
        UpdateCapacity(values_capacity_, values_written_, extra_values);
    if (new_values_capacity > values_capacity_) {
      // XXX(wesm): A hack to avoid memory allocation when reading directly
      // into builder classes
      if (uses_values_) {
        PARQUET_THROW_NOT_OK(
            values_->Resize(bytes_for_values(new_values_capacity), false));
      }
      values_capacity_ = new_values_capacity;
    }
    if (nullable_values_) {
      int64_t valid_bytes_new = BitUtil::BytesForBits(values_capacity_);
      if (valid_bits_->size() < valid_bytes_new) {
        int64_t valid_bytes_old = BitUtil::BytesForBits(values_written_);
        PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false));

        // Avoid valgrind warnings
        memset(valid_bits_->mutable_data() + valid_bytes_old, 0,
               valid_bytes_new - valid_bytes_old);
      }
    }
  }

  // Discards consumed values/levels; any levels not yet consumed are shifted
  // to the front of their buffers for the next read.
  void Reset() override {
    ResetValues();

    if (levels_written_ > 0) {
      const int64_t levels_remaining = levels_written_ - levels_position_;
      // Shift remaining levels to beginning of buffer and trim to only the number
      // of decoded levels remaining
      int16_t* def_data = def_levels();
      int16_t* rep_data = rep_levels();

      std::copy(def_data + levels_position_, def_data + levels_written_, def_data);
      PARQUET_THROW_NOT_OK(
          def_levels_->Resize(levels_remaining * sizeof(int16_t), false));

      if (this->max_rep_level_ > 0) {
        std::copy(rep_data + levels_position_, rep_data + levels_written_, rep_data);
        PARQUET_THROW_NOT_OK(
            rep_levels_->Resize(levels_remaining * sizeof(int16_t), false));
      }

      levels_written_ -= levels_position_;
      levels_position_ = 0;
      levels_capacity_ = levels_remaining;
    }

    records_read_ = 0;

    // Call Finish on the binary builders to reset them
  }

  // Installs a new page source (e.g. for the next row group).
  void SetPageReader(std::unique_ptr<PageReader> reader) override {
    at_record_start_ = true;
    this->pager_ = std::move(reader);
    ResetDecoders();
  }

  bool HasMoreData() const override { return this->pager_ != nullptr; }

  // Dictionary decoders must be reset when advancing row groups
  void ResetDecoders() { this->decoders_.clear(); }

  // Decodes `values_with_nulls` slots (including null gaps) into the values
  // buffer, guided by the validity bitmap at the current write position.
  virtual void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) {
    uint8_t* valid_bits = valid_bits_->mutable_data();
    const int64_t valid_bits_offset = values_written_;

    int64_t num_decoded = this->current_decoder_->DecodeSpaced(
        ValuesHead<T>(), static_cast<int>(values_with_nulls),
        static_cast<int>(null_count), valid_bits, valid_bits_offset);
    DCHECK_EQ(num_decoded, values_with_nulls);
  }

  // Decodes a dense (no-nulls) run of values into the values buffer.
  virtual void ReadValuesDense(int64_t values_to_read) {
    int64_t num_decoded =
        this->current_decoder_->Decode(ValuesHead<T>(), static_cast<int>(values_to_read));
    DCHECK_EQ(num_decoded, values_to_read);
  }

  // Return number of logical records read
  int64_t ReadRecordData(int64_t num_records) {
    // Conservative upper bound
    const int64_t possible_num_values =
        std::max(num_records, levels_written_ - levels_position_);
    ReserveValues(possible_num_values);

    const int64_t start_levels_position = levels_position_;

    int64_t values_to_read = 0;
    int64_t records_read = 0;
    if (this->max_rep_level_ > 0) {
      records_read = DelimitRecords(num_records, &values_to_read);
    } else if (this->max_def_level_ > 0) {
      // No repetition levels, skip delimiting logic. Each level represents a
      // null or not null entry
      records_read = std::min(levels_written_ - levels_position_, num_records);

      // This is advanced by DelimitRecords, which we skipped
      levels_position_ += records_read;
    } else {
      records_read = values_to_read = num_records;
    }

    int64_t null_count = 0;
    if (nullable_values_) {
      int64_t values_with_nulls = 0;
      internal::DefinitionLevelsToBitmap(
          def_levels() + start_levels_position, levels_position_ - start_levels_position,
          this->max_def_level_, this->max_rep_level_, &values_with_nulls, &null_count,
          valid_bits_->mutable_data(), values_written_);
      values_to_read = values_with_nulls - null_count;
      DCHECK_GE(values_to_read, 0);
      ReadValuesSpaced(values_with_nulls, null_count);
    } else {
      DCHECK_GE(values_to_read, 0);
      ReadValuesDense(values_to_read);
    }
    if (this->max_def_level_ > 0) {
      // Optional, repeated, or some mix thereof
      this->ConsumeBufferedValues(levels_position_ - start_levels_position);
    } else {
      // Flat, non-repeated
      this->ConsumeBufferedValues(values_to_read);
    }
    // Total values, including null spaces, if any
    values_written_ += values_to_read + null_count;
    null_count_ += null_count;

    return records_read;
  }

  // Dumps decoded levels and values to stdout for debugging.
  void DebugPrintState() override {
    const int16_t* def_levels = this->def_levels();
    const int16_t* rep_levels = this->rep_levels();
    const int64_t total_levels_read = levels_position_;

    const T* vals = reinterpret_cast<const T*>(this->values());

    std::cout << "def levels: ";
    for (int64_t i = 0; i < total_levels_read; ++i) {
      std::cout << def_levels[i] << " ";
    }
    std::cout << std::endl;

    std::cout << "rep levels: ";
    for (int64_t i = 0; i < total_levels_read; ++i) {
      std::cout << rep_levels[i] << " ";
    }
    std::cout << std::endl;

    std::cout << "values: ";
    for (int64_t i = 0; i < this->values_written(); ++i) {
      std::cout << vals[i] << " ";
    }
    std::cout << std::endl;
  }

  // Logically empties the value and validity buffers without releasing
  // their allocations.
  void ResetValues() {
    if (values_written_ > 0) {
      // Resize to 0, but do not shrink to fit
      if (uses_values_) {
        PARQUET_THROW_NOT_OK(values_->Resize(0, false));
      }
      PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false));
      values_written_ = 0;
      values_capacity_ = 0;
      null_count_ = 0;
    }
  }

 protected:
  // Pointer to the next free slot in the values buffer.
  template <typename T>
  T* ValuesHead() {
    return reinterpret_cast<T*>(values_->mutable_data()) + values_written_;
  }
};
1396 
1397 class FLBARecordReader : public TypedRecordReader<FLBAType>,
1398                          virtual public BinaryRecordReader {
1399  public:
FLBARecordReader(const ColumnDescriptor * descr,::arrow::MemoryPool * pool)1400   FLBARecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
1401       : TypedRecordReader<FLBAType>(descr, pool), builder_(nullptr) {
1402     DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY);
1403     int byte_width = descr_->type_length();
1404     std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width);
1405     builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, this->pool_));
1406   }
1407 
GetBuilderChunks()1408   ::arrow::ArrayVector GetBuilderChunks() override {
1409     std::shared_ptr<::arrow::Array> chunk;
1410     PARQUET_THROW_NOT_OK(builder_->Finish(&chunk));
1411     return ::arrow::ArrayVector({chunk});
1412   }
1413 
ReadValuesDense(int64_t values_to_read)1414   void ReadValuesDense(int64_t values_to_read) override {
1415     auto values = ValuesHead<FLBA>();
1416     int64_t num_decoded =
1417         this->current_decoder_->Decode(values, static_cast<int>(values_to_read));
1418     DCHECK_EQ(num_decoded, values_to_read);
1419 
1420     for (int64_t i = 0; i < num_decoded; i++) {
1421       PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
1422     }
1423     ResetValues();
1424   }
1425 
ReadValuesSpaced(int64_t values_to_read,int64_t null_count)1426   void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
1427     uint8_t* valid_bits = valid_bits_->mutable_data();
1428     const int64_t valid_bits_offset = values_written_;
1429     auto values = ValuesHead<FLBA>();
1430 
1431     int64_t num_decoded = this->current_decoder_->DecodeSpaced(
1432         values, static_cast<int>(values_to_read), static_cast<int>(null_count),
1433         valid_bits, valid_bits_offset);
1434     DCHECK_EQ(num_decoded, values_to_read);
1435 
1436     for (int64_t i = 0; i < num_decoded; i++) {
1437       if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) {
1438         PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
1439       } else {
1440         PARQUET_THROW_NOT_OK(builder_->AppendNull());
1441       }
1442     }
1443     ResetValues();
1444   }
1445 
1446  private:
1447   std::unique_ptr<::arrow::FixedSizeBinaryBuilder> builder_;
1448 };
1449 
1450 class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
1451                                      virtual public BinaryRecordReader {
1452  public:
ByteArrayChunkedRecordReader(const ColumnDescriptor * descr,::arrow::MemoryPool * pool)1453   ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
1454       : TypedRecordReader<ByteArrayType>(descr, pool) {
1455     DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
1456     accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool));
1457   }
1458 
GetBuilderChunks()1459   ::arrow::ArrayVector GetBuilderChunks() override {
1460     ::arrow::ArrayVector result = accumulator_.chunks;
1461     if (result.size() == 0 || accumulator_.builder->length() > 0) {
1462       std::shared_ptr<::arrow::Array> last_chunk;
1463       PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk));
1464       result.push_back(std::move(last_chunk));
1465     }
1466     accumulator_.chunks = {};
1467     return result;
1468   }
1469 
ReadValuesDense(int64_t values_to_read)1470   void ReadValuesDense(int64_t values_to_read) override {
1471     int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(
1472         static_cast<int>(values_to_read), &accumulator_);
1473     DCHECK_EQ(num_decoded, values_to_read);
1474     ResetValues();
1475   }
1476 
ReadValuesSpaced(int64_t values_to_read,int64_t null_count)1477   void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
1478     int64_t num_decoded = this->current_decoder_->DecodeArrow(
1479         static_cast<int>(values_to_read), static_cast<int>(null_count),
1480         valid_bits_->mutable_data(), values_written_, &accumulator_);
1481     DCHECK_EQ(num_decoded, values_to_read - null_count);
1482     ResetValues();
1483   }
1484 
1485  private:
1486   // Helper data structure for accumulating builder chunks
1487   typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;
1488 };
1489 
// Record reader for BYTE_ARRAY columns that preserves the file's dictionary
// encoding, accumulating dictionary indices into a DictionaryBuilder and
// yielding dictionary-typed chunks instead of dense binary arrays.
class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>,
                                        virtual public DictionaryRecordReader {
 public:
  ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr,
                                  ::arrow::MemoryPool* pool)
      : TypedRecordReader<ByteArrayType>(descr, pool), builder_(pool) {
    // Ask the decoder machinery to surface dictionary indices rather than
    // expanding them to values.
    this->read_dictionary_ = true;
  }

  // Finish any pending builder contents and return every accumulated chunk
  // as one ChunkedArray of the builder's dictionary type.
  std::shared_ptr<::arrow::ChunkedArray> GetResult() override {
    FlushBuilder();
    std::vector<std::shared_ptr<::arrow::Array>> result;
    std::swap(result, result_chunks_);
    return std::make_shared<::arrow::ChunkedArray>(std::move(result), builder_.type());
  }

  // If the builder holds any values, finish it into a chunk appended to
  // result_chunks_ and reset the builder state.
  void FlushBuilder() {
    if (builder_.length() > 0) {
      std::shared_ptr<::arrow::Array> chunk;
      PARQUET_THROW_NOT_OK(builder_.Finish(&chunk));
      result_chunks_.emplace_back(std::move(chunk));

      // Also clears the dictionary memo table
      builder_.Reset();
    }
  }

  // On encountering a new dictionary page, flush chunks built against the
  // previous dictionary, then seed the builder with the new dictionary
  // values so subsequent indices resolve against it.
  void MaybeWriteNewDictionary() {
    if (this->new_dictionary_) {
      /// If there is a new dictionary, we may need to flush the builder, then
      /// insert the new dictionary values
      FlushBuilder();
      builder_.ResetFull();
      auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_);
      decoder->InsertDictionary(&builder_);
      this->new_dictionary_ = false;
    }
  }

  // Decode a dense (null-free) run of values. Dictionary-encoded pages feed
  // indices directly into the builder; other encodings append raw values.
  void ReadValuesDense(int64_t values_to_read) override {
    int64_t num_decoded = 0;
    if (current_encoding_ == Encoding::RLE_DICTIONARY) {
      MaybeWriteNewDictionary();
      auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_);
      num_decoded = decoder->DecodeIndices(static_cast<int>(values_to_read), &builder_);
    } else {
      num_decoded = this->current_decoder_->DecodeArrowNonNull(
          static_cast<int>(values_to_read), &builder_);

      /// Flush values since they have been copied into the builder
      ResetValues();
    }
    DCHECK_EQ(num_decoded, values_to_read);
  }

  // Spaced counterpart of ReadValuesDense: null_count of the values_to_read
  // slots are null per the validity bitmap starting at values_written_.
  void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
    int64_t num_decoded = 0;
    if (current_encoding_ == Encoding::RLE_DICTIONARY) {
      MaybeWriteNewDictionary();
      auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_);
      num_decoded = decoder->DecodeIndicesSpaced(
          static_cast<int>(values_to_read), static_cast<int>(null_count),
          valid_bits_->mutable_data(), values_written_, &builder_);
    } else {
      num_decoded = this->current_decoder_->DecodeArrow(
          static_cast<int>(values_to_read), static_cast<int>(null_count),
          valid_bits_->mutable_data(), values_written_, &builder_);

      /// Flush values since they have been copied into the builder
      ResetValues();
    }
    // Decoders report only non-null decoded values.
    DCHECK_EQ(num_decoded, values_to_read - null_count);
  }

 private:
  using BinaryDictDecoder = DictDecoder<ByteArrayType>;

  ::arrow::BinaryDictionary32Builder builder_;
  std::vector<std::shared_ptr<::arrow::Array>> result_chunks_;
};
1570 
// TODO(wesm): Implement these to some satisfaction
// Debug-state printing is not implemented for these physical types; the
// explicit specializations below intentionally do nothing.
template <>
void TypedRecordReader<Int96Type>::DebugPrintState() {}

template <>
void TypedRecordReader<ByteArrayType>::DebugPrintState() {}

template <>
void TypedRecordReader<FLBAType>::DebugPrintState() {}
1580 
MakeByteArrayRecordReader(const ColumnDescriptor * descr,::arrow::MemoryPool * pool,bool read_dictionary)1581 std::shared_ptr<RecordReader> MakeByteArrayRecordReader(const ColumnDescriptor* descr,
1582                                                         ::arrow::MemoryPool* pool,
1583                                                         bool read_dictionary) {
1584   if (read_dictionary) {
1585     return std::make_shared<ByteArrayDictionaryRecordReader>(descr, pool);
1586   } else {
1587     return std::make_shared<ByteArrayChunkedRecordReader>(descr, pool);
1588   }
1589 }
1590 
Make(const ColumnDescriptor * descr,MemoryPool * pool,const bool read_dictionary)1591 std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr,
1592                                                  MemoryPool* pool,
1593                                                  const bool read_dictionary) {
1594   switch (descr->physical_type()) {
1595     case Type::BOOLEAN:
1596       return std::make_shared<TypedRecordReader<BooleanType>>(descr, pool);
1597     case Type::INT32:
1598       return std::make_shared<TypedRecordReader<Int32Type>>(descr, pool);
1599     case Type::INT64:
1600       return std::make_shared<TypedRecordReader<Int64Type>>(descr, pool);
1601     case Type::INT96:
1602       return std::make_shared<TypedRecordReader<Int96Type>>(descr, pool);
1603     case Type::FLOAT:
1604       return std::make_shared<TypedRecordReader<FloatType>>(descr, pool);
1605     case Type::DOUBLE:
1606       return std::make_shared<TypedRecordReader<DoubleType>>(descr, pool);
1607     case Type::BYTE_ARRAY:
1608       return MakeByteArrayRecordReader(descr, pool, read_dictionary);
1609     case Type::FIXED_LEN_BYTE_ARRAY:
1610       return std::make_shared<FLBARecordReader>(descr, pool);
1611     default: {
1612       // PARQUET-1481: This can occur if the file is corrupt
1613       std::stringstream ss;
1614       ss << "Invalid physical column type: " << static_cast<int>(descr->physical_type());
1615       throw ParquetException(ss.str());
1616     }
1617   }
1618   // Unreachable code, but suppress compiler warning
1619   return nullptr;
1620 }
1621 
1622 }  // namespace internal
1623 }  // namespace parquet
1624