// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
#include "parquet/column_reader.h"

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <exception>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "arrow/array.h"
#include "arrow/builder.h"
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/util/bit_stream_utils.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/compression.h"
#include "arrow/util/int_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle_encoding.h"
#include "parquet/column_page.h"
#include "parquet/encoding.h"
#include "parquet/encryption_internal.h"
#include "parquet/internal_file_decryptor.h"
#include "parquet/properties.h"
#include "parquet/statistics.h"
#include "parquet/thrift_internal.h"  // IWYU pragma: keep

using arrow::MemoryPool;
using arrow::internal::checked_cast;
51
52 namespace parquet {
53
LevelDecoder()54 LevelDecoder::LevelDecoder() : num_values_remaining_(0) {}
55
~LevelDecoder()56 LevelDecoder::~LevelDecoder() {}
57
SetData(Encoding::type encoding,int16_t max_level,int num_buffered_values,const uint8_t * data,int32_t data_size)58 int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
59 int num_buffered_values, const uint8_t* data,
60 int32_t data_size) {
61 int32_t num_bytes = 0;
62 encoding_ = encoding;
63 num_values_remaining_ = num_buffered_values;
64 bit_width_ = BitUtil::Log2(max_level + 1);
65 switch (encoding) {
66 case Encoding::RLE: {
67 if (data_size < 4) {
68 throw ParquetException("Received invalid levels (corrupt data page?)");
69 }
70 num_bytes = ::arrow::util::SafeLoadAs<int32_t>(data);
71 if (num_bytes < 0 || num_bytes > data_size - 4) {
72 throw ParquetException("Received invalid number of bytes (corrupt data page?)");
73 }
74 const uint8_t* decoder_data = data + 4;
75 if (!rle_decoder_) {
76 rle_decoder_.reset(
77 new ::arrow::util::RleDecoder(decoder_data, num_bytes, bit_width_));
78 } else {
79 rle_decoder_->Reset(decoder_data, num_bytes, bit_width_);
80 }
81 return 4 + num_bytes;
82 }
83 case Encoding::BIT_PACKED: {
84 num_bytes =
85 static_cast<int32_t>(BitUtil::BytesForBits(num_buffered_values * bit_width_));
86 if (num_bytes < 0 || num_bytes > data_size - 4) {
87 throw ParquetException("Received invalid number of bytes (corrupt data page?)");
88 }
89 if (!bit_packed_decoder_) {
90 bit_packed_decoder_.reset(new ::arrow::BitUtil::BitReader(data, num_bytes));
91 } else {
92 bit_packed_decoder_->Reset(data, num_bytes);
93 }
94 return num_bytes;
95 }
96 default:
97 throw ParquetException("Unknown encoding type for levels.");
98 }
99 return -1;
100 }
101
SetDataV2(int32_t num_bytes,int16_t max_level,int num_buffered_values,const uint8_t * data)102 void LevelDecoder::SetDataV2(int32_t num_bytes, int16_t max_level,
103 int num_buffered_values, const uint8_t* data) {
104 // Repetition and definition levels always uses RLE encoding
105 // in the DataPageV2 format.
106 if (num_bytes < 0) {
107 throw ParquetException("Invalid page header (corrupt data page?)");
108 }
109 encoding_ = Encoding::RLE;
110 num_values_remaining_ = num_buffered_values;
111 bit_width_ = BitUtil::Log2(max_level + 1);
112
113 if (!rle_decoder_) {
114 rle_decoder_.reset(new ::arrow::util::RleDecoder(data, num_bytes, bit_width_));
115 } else {
116 rle_decoder_->Reset(data, num_bytes, bit_width_);
117 }
118 }
119
Decode(int batch_size,int16_t * levels)120 int LevelDecoder::Decode(int batch_size, int16_t* levels) {
121 int num_decoded = 0;
122
123 int num_values = std::min(num_values_remaining_, batch_size);
124 if (encoding_ == Encoding::RLE) {
125 num_decoded = rle_decoder_->GetBatch(levels, num_values);
126 } else {
127 num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values);
128 }
129 num_values_remaining_ -= num_decoded;
130 return num_decoded;
131 }
132
default_reader_properties()133 ReaderProperties default_reader_properties() {
134 static ReaderProperties default_reader_properties;
135 return default_reader_properties;
136 }
137
138 // Extracts encoded statistics from V1 and V2 data page headers
139 template <typename H>
ExtractStatsFromHeader(const H & header)140 EncodedStatistics ExtractStatsFromHeader(const H& header) {
141 EncodedStatistics page_statistics;
142 if (!header.__isset.statistics) {
143 return page_statistics;
144 }
145 const format::Statistics& stats = header.statistics;
146 if (stats.__isset.max) {
147 page_statistics.set_max(stats.max);
148 }
149 if (stats.__isset.min) {
150 page_statistics.set_min(stats.min);
151 }
152 if (stats.__isset.null_count) {
153 page_statistics.set_null_count(stats.null_count);
154 }
155 if (stats.__isset.distinct_count) {
156 page_statistics.set_distinct_count(stats.distinct_count);
157 }
158 return page_statistics;
159 }
160
161 // ----------------------------------------------------------------------
162 // SerializedPageReader deserializes Thrift metadata and pages that have been
163 // assembled in a serialized stream for storing in a Parquet files
164
165 // This subclass delimits pages appearing in a serialized stream, each preceded
166 // by a serialized Thrift format::PageHeader indicating the type of each page
167 // and the page metadata.
168 class SerializedPageReader : public PageReader {
169 public:
SerializedPageReader(std::shared_ptr<ArrowInputStream> stream,int64_t total_num_rows,Compression::type codec,::arrow::MemoryPool * pool,const CryptoContext * crypto_ctx)170 SerializedPageReader(std::shared_ptr<ArrowInputStream> stream, int64_t total_num_rows,
171 Compression::type codec, ::arrow::MemoryPool* pool,
172 const CryptoContext* crypto_ctx)
173 : stream_(std::move(stream)),
174 decompression_buffer_(AllocateBuffer(pool, 0)),
175 page_ordinal_(0),
176 seen_num_rows_(0),
177 total_num_rows_(total_num_rows),
178 decryption_buffer_(AllocateBuffer(pool, 0)) {
179 if (crypto_ctx != nullptr) {
180 crypto_ctx_ = *crypto_ctx;
181 InitDecryption();
182 }
183 max_page_header_size_ = kDefaultMaxPageHeaderSize;
184 decompressor_ = GetCodec(codec);
185 }
186
187 // Implement the PageReader interface
188 std::shared_ptr<Page> NextPage() override;
189
set_max_page_header_size(uint32_t size)190 void set_max_page_header_size(uint32_t size) override { max_page_header_size_ = size; }
191
192 private:
193 void UpdateDecryption(const std::shared_ptr<Decryptor>& decryptor, int8_t module_type,
194 const std::string& page_aad);
195
196 void InitDecryption();
197
198 std::shared_ptr<Buffer> DecompressPage(int compressed_len, int uncompressed_len,
199 const uint8_t* page_buffer);
200
201 std::shared_ptr<ArrowInputStream> stream_;
202
203 format::PageHeader current_page_header_;
204 std::shared_ptr<Page> current_page_;
205
206 // Compression codec to use.
207 std::unique_ptr<::arrow::util::Codec> decompressor_;
208 std::shared_ptr<ResizableBuffer> decompression_buffer_;
209
210 // The fields below are used for calculation of AAD (additional authenticated data)
211 // suffix which is part of the Parquet Modular Encryption.
212 // The AAD suffix for a parquet module is built internally by
213 // concatenating different parts some of which include
214 // the row group ordinal, column ordinal and page ordinal.
215 // Please refer to the encryption specification for more details:
216 // https://github.com/apache/parquet-format/blob/encryption/Encryption.md#44-additional-authenticated-data
217
218 // The ordinal fields in the context below are used for AAD suffix calculation.
219 CryptoContext crypto_ctx_;
220 int16_t page_ordinal_; // page ordinal does not count the dictionary page
221
222 // Maximum allowed page size
223 uint32_t max_page_header_size_;
224
225 // Number of rows read in data pages so far
226 int64_t seen_num_rows_;
227
228 // Number of rows in all the data pages
229 int64_t total_num_rows_;
230
231 // data_page_aad_ and data_page_header_aad_ contain the AAD for data page and data page
232 // header in a single column respectively.
233 // While calculating AAD for different pages in a single column the pages AAD is
234 // updated by only the page ordinal.
235 std::string data_page_aad_;
236 std::string data_page_header_aad_;
237 // Encryption
238 std::shared_ptr<ResizableBuffer> decryption_buffer_;
239 };
240
InitDecryption()241 void SerializedPageReader::InitDecryption() {
242 // Prepare the AAD for quick update later.
243 if (crypto_ctx_.data_decryptor != nullptr) {
244 DCHECK(!crypto_ctx_.data_decryptor->file_aad().empty());
245 data_page_aad_ = encryption::CreateModuleAad(
246 crypto_ctx_.data_decryptor->file_aad(), encryption::kDataPage,
247 crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal, kNonPageOrdinal);
248 }
249 if (crypto_ctx_.meta_decryptor != nullptr) {
250 DCHECK(!crypto_ctx_.meta_decryptor->file_aad().empty());
251 data_page_header_aad_ = encryption::CreateModuleAad(
252 crypto_ctx_.meta_decryptor->file_aad(), encryption::kDataPageHeader,
253 crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal, kNonPageOrdinal);
254 }
255 }
256
UpdateDecryption(const std::shared_ptr<Decryptor> & decryptor,int8_t module_type,const std::string & page_aad)257 void SerializedPageReader::UpdateDecryption(const std::shared_ptr<Decryptor>& decryptor,
258 int8_t module_type,
259 const std::string& page_aad) {
260 DCHECK(decryptor != nullptr);
261 if (crypto_ctx_.start_decrypt_with_dictionary_page) {
262 std::string aad = encryption::CreateModuleAad(
263 decryptor->file_aad(), module_type, crypto_ctx_.row_group_ordinal,
264 crypto_ctx_.column_ordinal, kNonPageOrdinal);
265 decryptor->UpdateAad(aad);
266 } else {
267 encryption::QuickUpdatePageAad(page_aad, page_ordinal_);
268 decryptor->UpdateAad(page_aad);
269 }
270 }
271
NextPage()272 std::shared_ptr<Page> SerializedPageReader::NextPage() {
273 // Loop here because there may be unhandled page types that we skip until
274 // finding a page that we do know what to do with
275
276 while (seen_num_rows_ < total_num_rows_) {
277 uint32_t header_size = 0;
278 uint32_t allowed_page_size = kDefaultPageHeaderSize;
279
280 // Page headers can be very large because of page statistics
281 // We try to deserialize a larger buffer progressively
282 // until a maximum allowed header limit
283 while (true) {
284 PARQUET_ASSIGN_OR_THROW(auto view, stream_->Peek(allowed_page_size));
285 if (view.size() == 0) {
286 return std::shared_ptr<Page>(nullptr);
287 }
288
289 // This gets used, then set by DeserializeThriftMsg
290 header_size = static_cast<uint32_t>(view.size());
291 try {
292 if (crypto_ctx_.meta_decryptor != nullptr) {
293 UpdateDecryption(crypto_ctx_.meta_decryptor, encryption::kDictionaryPageHeader,
294 data_page_header_aad_);
295 }
296 DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(view.data()), &header_size,
297 ¤t_page_header_, crypto_ctx_.meta_decryptor);
298 break;
299 } catch (std::exception& e) {
300 // Failed to deserialize. Double the allowed page header size and try again
301 std::stringstream ss;
302 ss << e.what();
303 allowed_page_size *= 2;
304 if (allowed_page_size > max_page_header_size_) {
305 ss << "Deserializing page header failed.\n";
306 throw ParquetException(ss.str());
307 }
308 }
309 }
310 // Advance the stream offset
311 PARQUET_THROW_NOT_OK(stream_->Advance(header_size));
312
313 int compressed_len = current_page_header_.compressed_page_size;
314 int uncompressed_len = current_page_header_.uncompressed_page_size;
315 if (crypto_ctx_.data_decryptor != nullptr) {
316 UpdateDecryption(crypto_ctx_.data_decryptor, encryption::kDictionaryPage,
317 data_page_aad_);
318 }
319 // Read the compressed data page.
320 PARQUET_ASSIGN_OR_THROW(auto page_buffer, stream_->Read(compressed_len));
321 if (page_buffer->size() != compressed_len) {
322 std::stringstream ss;
323 ss << "Page was smaller (" << page_buffer->size() << ") than expected ("
324 << compressed_len << ")";
325 ParquetException::EofException(ss.str());
326 }
327
328 // Decrypt it if we need to
329 if (crypto_ctx_.data_decryptor != nullptr) {
330 PARQUET_THROW_NOT_OK(decryption_buffer_->Resize(
331 compressed_len - crypto_ctx_.data_decryptor->CiphertextSizeDelta(), false));
332 compressed_len = crypto_ctx_.data_decryptor->Decrypt(
333 page_buffer->data(), compressed_len, decryption_buffer_->mutable_data());
334
335 page_buffer = decryption_buffer_;
336 }
337 // Uncompress it if we need to
338 if (decompressor_ != nullptr) {
339 page_buffer = DecompressPage(compressed_len, uncompressed_len, page_buffer->data());
340 }
341
342 const PageType::type page_type = LoadEnumSafe(¤t_page_header_.type);
343
344 if (page_type == PageType::DICTIONARY_PAGE) {
345 crypto_ctx_.start_decrypt_with_dictionary_page = false;
346 const format::DictionaryPageHeader& dict_header =
347 current_page_header_.dictionary_page_header;
348
349 bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false;
350 if (dict_header.num_values < 0) {
351 throw ParquetException("Invalid page header (negative number of values)");
352 }
353
354 return std::make_shared<DictionaryPage>(page_buffer, dict_header.num_values,
355 LoadEnumSafe(&dict_header.encoding),
356 is_sorted);
357 } else if (page_type == PageType::DATA_PAGE) {
358 ++page_ordinal_;
359 const format::DataPageHeader& header = current_page_header_.data_page_header;
360
361 if (header.num_values < 0) {
362 throw ParquetException("Invalid page header (negative number of values)");
363 }
364 EncodedStatistics page_statistics = ExtractStatsFromHeader(header);
365 seen_num_rows_ += header.num_values;
366
367 return std::make_shared<DataPageV1>(page_buffer, header.num_values,
368 LoadEnumSafe(&header.encoding),
369 LoadEnumSafe(&header.definition_level_encoding),
370 LoadEnumSafe(&header.repetition_level_encoding),
371 uncompressed_len, page_statistics);
372 } else if (page_type == PageType::DATA_PAGE_V2) {
373 ++page_ordinal_;
374 const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
375
376 if (header.num_values < 0) {
377 throw ParquetException("Invalid page header (negative number of values)");
378 }
379 if (header.definition_levels_byte_length < 0 ||
380 header.repetition_levels_byte_length < 0) {
381 throw ParquetException("Invalid page header (negative levels byte length)");
382 }
383 bool is_compressed = header.__isset.is_compressed ? header.is_compressed : false;
384 EncodedStatistics page_statistics = ExtractStatsFromHeader(header);
385 seen_num_rows_ += header.num_values;
386
387 return std::make_shared<DataPageV2>(
388 page_buffer, header.num_values, header.num_nulls, header.num_rows,
389 LoadEnumSafe(&header.encoding), header.definition_levels_byte_length,
390 header.repetition_levels_byte_length, uncompressed_len, is_compressed,
391 page_statistics);
392 } else {
393 // We don't know what this page type is. We're allowed to skip non-data
394 // pages.
395 continue;
396 }
397 }
398 return std::shared_ptr<Page>(nullptr);
399 }
400
DecompressPage(int compressed_len,int uncompressed_len,const uint8_t * page_buffer)401 std::shared_ptr<Buffer> SerializedPageReader::DecompressPage(int compressed_len,
402 int uncompressed_len,
403 const uint8_t* page_buffer) {
404 // Grow the uncompressed buffer if we need to.
405 if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) {
406 PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false));
407 }
408
409 if (current_page_header_.type != format::PageType::DATA_PAGE_V2) {
410 PARQUET_THROW_NOT_OK(
411 decompressor_->Decompress(compressed_len, page_buffer, uncompressed_len,
412 decompression_buffer_->mutable_data()));
413 } else {
414 // The levels are not compressed in V2 format
415 const auto& header = current_page_header_.data_page_header_v2;
416 int32_t levels_length =
417 header.repetition_levels_byte_length + header.definition_levels_byte_length;
418 uint8_t* decompressed = decompression_buffer_->mutable_data();
419 memcpy(decompressed, page_buffer, levels_length);
420 decompressed += levels_length;
421 const uint8_t* compressed_values = page_buffer + levels_length;
422
423 // Decompress the values
424 PARQUET_THROW_NOT_OK(
425 decompressor_->Decompress(compressed_len - levels_length, compressed_values,
426 uncompressed_len - levels_length, decompressed));
427 }
428
429 return decompression_buffer_;
430 }
431
Open(std::shared_ptr<ArrowInputStream> stream,int64_t total_num_rows,Compression::type codec,::arrow::MemoryPool * pool,const CryptoContext * ctx)432 std::unique_ptr<PageReader> PageReader::Open(std::shared_ptr<ArrowInputStream> stream,
433 int64_t total_num_rows,
434 Compression::type codec,
435 ::arrow::MemoryPool* pool,
436 const CryptoContext* ctx) {
437 return std::unique_ptr<PageReader>(
438 new SerializedPageReader(std::move(stream), total_num_rows, codec, pool, ctx));
439 }
440
441 // ----------------------------------------------------------------------
442 // Impl base class for TypedColumnReader and RecordReader
443
444 // PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
445 // encoding.
IsDictionaryIndexEncoding(const Encoding::type & e)446 static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
447 return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
448 }
449
450 template <typename DType>
451 class ColumnReaderImplBase {
452 public:
453 using T = typename DType::c_type;
454
ColumnReaderImplBase(const ColumnDescriptor * descr,::arrow::MemoryPool * pool)455 ColumnReaderImplBase(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
456 : descr_(descr),
457 max_def_level_(descr->max_definition_level()),
458 max_rep_level_(descr->max_repetition_level()),
459 num_buffered_values_(0),
460 num_decoded_values_(0),
461 pool_(pool),
462 current_decoder_(nullptr),
463 current_encoding_(Encoding::UNKNOWN) {}
464
465 virtual ~ColumnReaderImplBase() = default;
466
467 protected:
468 // Read up to batch_size values from the current data page into the
469 // pre-allocated memory T*
470 //
471 // @returns: the number of values read into the out buffer
ReadValues(int64_t batch_size,T * out)472 int64_t ReadValues(int64_t batch_size, T* out) {
473 int64_t num_decoded = current_decoder_->Decode(out, static_cast<int>(batch_size));
474 return num_decoded;
475 }
476
477 // Read up to batch_size values from the current data page into the
478 // pre-allocated memory T*, leaving spaces for null entries according
479 // to the def_levels.
480 //
481 // @returns: the number of values read into the out buffer
ReadValuesSpaced(int64_t batch_size,T * out,int64_t null_count,uint8_t * valid_bits,int64_t valid_bits_offset)482 int64_t ReadValuesSpaced(int64_t batch_size, T* out, int64_t null_count,
483 uint8_t* valid_bits, int64_t valid_bits_offset) {
484 return current_decoder_->DecodeSpaced(out, static_cast<int>(batch_size),
485 static_cast<int>(null_count), valid_bits,
486 valid_bits_offset);
487 }
488
489 // Read multiple definition levels into preallocated memory
490 //
491 // Returns the number of decoded definition levels
ReadDefinitionLevels(int64_t batch_size,int16_t * levels)492 int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) {
493 if (max_def_level_ == 0) {
494 return 0;
495 }
496 return definition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
497 }
498
HasNextInternal()499 bool HasNextInternal() {
500 // Either there is no data page available yet, or the data page has been
501 // exhausted
502 if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) {
503 if (!ReadNewPage() || num_buffered_values_ == 0) {
504 return false;
505 }
506 }
507 return true;
508 }
509
510 // Read multiple repetition levels into preallocated memory
511 // Returns the number of decoded repetition levels
ReadRepetitionLevels(int64_t batch_size,int16_t * levels)512 int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels) {
513 if (max_rep_level_ == 0) {
514 return 0;
515 }
516 return repetition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
517 }
518
519 // Advance to the next data page
ReadNewPage()520 bool ReadNewPage() {
521 // Loop until we find the next data page.
522 while (true) {
523 current_page_ = pager_->NextPage();
524 if (!current_page_) {
525 // EOS
526 return false;
527 }
528
529 if (current_page_->type() == PageType::DICTIONARY_PAGE) {
530 ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
531 continue;
532 } else if (current_page_->type() == PageType::DATA_PAGE) {
533 const auto page = std::static_pointer_cast<DataPageV1>(current_page_);
534 const int64_t levels_byte_size = InitializeLevelDecoders(
535 *page, page->repetition_level_encoding(), page->definition_level_encoding());
536 InitializeDataDecoder(*page, levels_byte_size);
537 return true;
538 } else if (current_page_->type() == PageType::DATA_PAGE_V2) {
539 const auto page = std::static_pointer_cast<DataPageV2>(current_page_);
540 int64_t levels_byte_size = InitializeLevelDecodersV2(*page);
541 InitializeDataDecoder(*page, levels_byte_size);
542 return true;
543 } else {
544 // We don't know what this page type is. We're allowed to skip non-data
545 // pages.
546 continue;
547 }
548 }
549 return true;
550 }
551
ConfigureDictionary(const DictionaryPage * page)552 void ConfigureDictionary(const DictionaryPage* page) {
553 int encoding = static_cast<int>(page->encoding());
554 if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
555 page->encoding() == Encoding::PLAIN) {
556 encoding = static_cast<int>(Encoding::RLE_DICTIONARY);
557 }
558
559 auto it = decoders_.find(encoding);
560 if (it != decoders_.end()) {
561 throw ParquetException("Column cannot have more than one dictionary.");
562 }
563
564 if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
565 page->encoding() == Encoding::PLAIN) {
566 auto dictionary = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
567 dictionary->SetData(page->num_values(), page->data(), page->size());
568
569 // The dictionary is fully decoded during DictionaryDecoder::Init, so the
570 // DictionaryPage buffer is no longer required after this step
571 //
572 // TODO(wesm): investigate whether this all-or-nothing decoding of the
573 // dictionary makes sense and whether performance can be improved
574
575 std::unique_ptr<DictDecoder<DType>> decoder = MakeDictDecoder<DType>(descr_, pool_);
576 decoder->SetDict(dictionary.get());
577 decoders_[encoding] =
578 std::unique_ptr<DecoderType>(dynamic_cast<DecoderType*>(decoder.release()));
579 } else {
580 ParquetException::NYI("only plain dictionary encoding has been implemented");
581 }
582
583 new_dictionary_ = true;
584 current_decoder_ = decoders_[encoding].get();
585 DCHECK(current_decoder_);
586 }
587
588 // Initialize repetition and definition level decoders on the next data page.
589
590 // If the data page includes repetition and definition levels, we
591 // initialize the level decoders and return the number of encoded level bytes.
592 // The return value helps determine the number of bytes in the encoded data.
InitializeLevelDecoders(const DataPage & page,Encoding::type repetition_level_encoding,Encoding::type definition_level_encoding)593 int64_t InitializeLevelDecoders(const DataPage& page,
594 Encoding::type repetition_level_encoding,
595 Encoding::type definition_level_encoding) {
596 // Read a data page.
597 num_buffered_values_ = page.num_values();
598
599 // Have not decoded any values from the data page yet
600 num_decoded_values_ = 0;
601
602 const uint8_t* buffer = page.data();
603 int32_t levels_byte_size = 0;
604 int32_t max_size = page.size();
605
606 // Data page Layout: Repetition Levels - Definition Levels - encoded values.
607 // Levels are encoded as rle or bit-packed.
608 // Init repetition levels
609 if (max_rep_level_ > 0) {
610 int32_t rep_levels_bytes = repetition_level_decoder_.SetData(
611 repetition_level_encoding, max_rep_level_,
612 static_cast<int>(num_buffered_values_), buffer, max_size);
613 buffer += rep_levels_bytes;
614 levels_byte_size += rep_levels_bytes;
615 max_size -= rep_levels_bytes;
616 }
617 // TODO figure a way to set max_def_level_ to 0
618 // if the initial value is invalid
619
620 // Init definition levels
621 if (max_def_level_ > 0) {
622 int32_t def_levels_bytes = definition_level_decoder_.SetData(
623 definition_level_encoding, max_def_level_,
624 static_cast<int>(num_buffered_values_), buffer, max_size);
625 levels_byte_size += def_levels_bytes;
626 max_size -= def_levels_bytes;
627 }
628
629 return levels_byte_size;
630 }
631
InitializeLevelDecodersV2(const DataPageV2 & page)632 int64_t InitializeLevelDecodersV2(const DataPageV2& page) {
633 // Read a data page.
634 num_buffered_values_ = page.num_values();
635
636 // Have not decoded any values from the data page yet
637 num_decoded_values_ = 0;
638 const uint8_t* buffer = page.data();
639
640 const int64_t total_levels_length =
641 static_cast<int64_t>(page.repetition_levels_byte_length()) +
642 page.definition_levels_byte_length();
643
644 if (total_levels_length > page.size()) {
645 throw ParquetException("Data page too small for levels (corrupt header?)");
646 }
647
648 if (max_rep_level_ > 0) {
649 repetition_level_decoder_.SetDataV2(page.repetition_levels_byte_length(),
650 max_rep_level_,
651 static_cast<int>(num_buffered_values_), buffer);
652 buffer += page.repetition_levels_byte_length();
653 }
654
655 if (max_def_level_ > 0) {
656 definition_level_decoder_.SetDataV2(page.definition_levels_byte_length(),
657 max_def_level_,
658 static_cast<int>(num_buffered_values_), buffer);
659 }
660
661 return total_levels_length;
662 }
663
664 // Get a decoder object for this page or create a new decoder if this is the
665 // first page with this encoding.
InitializeDataDecoder(const DataPage & page,int64_t levels_byte_size)666 void InitializeDataDecoder(const DataPage& page, int64_t levels_byte_size) {
667 const uint8_t* buffer = page.data() + levels_byte_size;
668 const int64_t data_size = page.size() - levels_byte_size;
669
670 if (data_size < 0) {
671 throw ParquetException("Page smaller than size of encoded levels");
672 }
673
674 Encoding::type encoding = page.encoding();
675
676 if (IsDictionaryIndexEncoding(encoding)) {
677 encoding = Encoding::RLE_DICTIONARY;
678 }
679
680 auto it = decoders_.find(static_cast<int>(encoding));
681 if (it != decoders_.end()) {
682 DCHECK(it->second.get() != nullptr);
683 if (encoding == Encoding::RLE_DICTIONARY) {
684 DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
685 }
686 current_decoder_ = it->second.get();
687 } else {
688 switch (encoding) {
689 case Encoding::PLAIN: {
690 auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
691 current_decoder_ = decoder.get();
692 decoders_[static_cast<int>(encoding)] = std::move(decoder);
693 break;
694 }
695 case Encoding::BYTE_STREAM_SPLIT: {
696 auto decoder = MakeTypedDecoder<DType>(Encoding::BYTE_STREAM_SPLIT, descr_);
697 current_decoder_ = decoder.get();
698 decoders_[static_cast<int>(encoding)] = std::move(decoder);
699 break;
700 }
701 case Encoding::RLE_DICTIONARY:
702 throw ParquetException("Dictionary page must be before data page.");
703
704 case Encoding::DELTA_BINARY_PACKED:
705 case Encoding::DELTA_LENGTH_BYTE_ARRAY:
706 case Encoding::DELTA_BYTE_ARRAY:
707 ParquetException::NYI("Unsupported encoding");
708
709 default:
710 throw ParquetException("Unknown encoding type.");
711 }
712 }
713 current_encoding_ = encoding;
714 current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
715 static_cast<int>(data_size));
716 }
717
718 const ColumnDescriptor* descr_;
719 const int16_t max_def_level_;
720 const int16_t max_rep_level_;
721
722 std::unique_ptr<PageReader> pager_;
723 std::shared_ptr<Page> current_page_;
724
725 // Not set if full schema for this field has no optional or repeated elements
726 LevelDecoder definition_level_decoder_;
727
728 // Not set for flat schemas.
729 LevelDecoder repetition_level_decoder_;
730
731 // The total number of values stored in the data page. This is the maximum of
732 // the number of encoded definition levels or encoded values. For
733 // non-repeated, required columns, this is equal to the number of encoded
734 // values. For repeated or optional values, there may be fewer data values
735 // than levels, and this tells you how many encoded levels there are in that
736 // case.
737 int64_t num_buffered_values_;
738
739 // The number of values from the current data page that have been decoded
740 // into memory
741 int64_t num_decoded_values_;
742
743 ::arrow::MemoryPool* pool_;
744
745 using DecoderType = TypedDecoder<DType>;
746 DecoderType* current_decoder_;
747 Encoding::type current_encoding_;
748
749 /// Flag to signal when a new dictionary has been set, for the benefit of
750 /// DictionaryRecordReader
751 bool new_dictionary_;
752
753 // Map of encoding type to the respective decoder object. For example, a
754 // column chunk's data pages may include both dictionary-encoded and
755 // plain-encoded data.
756 std::unordered_map<int, std::unique_ptr<DecoderType>> decoders_;
757
ConsumeBufferedValues(int64_t num_values)758 void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; }
759 };
760
761 // ----------------------------------------------------------------------
762 // TypedColumnReader implementations
763
764 template <typename DType>
765 class TypedColumnReaderImpl : public TypedColumnReader<DType>,
766 public ColumnReaderImplBase<DType> {
767 public:
768 using T = typename DType::c_type;
769
TypedColumnReaderImpl(const ColumnDescriptor * descr,std::unique_ptr<PageReader> pager,::arrow::MemoryPool * pool)770 TypedColumnReaderImpl(const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager,
771 ::arrow::MemoryPool* pool)
772 : ColumnReaderImplBase<DType>(descr, pool) {
773 this->pager_ = std::move(pager);
774 }
775
HasNext()776 bool HasNext() override { return this->HasNextInternal(); }
777
778 int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
779 T* values, int64_t* values_read) override;
780
781 int64_t ReadBatchSpaced(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
782 T* values, uint8_t* valid_bits, int64_t valid_bits_offset,
783 int64_t* levels_read, int64_t* values_read,
784 int64_t* null_count) override;
785
786 int64_t Skip(int64_t num_rows_to_skip) override;
787
type() const788 Type::type type() const override { return this->descr_->physical_type(); }
789
descr() const790 const ColumnDescriptor* descr() const override { return this->descr_; }
791 };
792
793 template <typename DType>
ReadBatch(int64_t batch_size,int16_t * def_levels,int16_t * rep_levels,T * values,int64_t * values_read)794 int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def_levels,
795 int16_t* rep_levels, T* values,
796 int64_t* values_read) {
797 // HasNext invokes ReadNewPage
798 if (!HasNext()) {
799 *values_read = 0;
800 return 0;
801 }
802
803 // TODO(wesm): keep reading data pages until batch_size is reached, or the
804 // row group is finished
805 batch_size =
806 std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_);
807
808 int64_t num_def_levels = 0;
809 int64_t num_rep_levels = 0;
810
811 int64_t values_to_read = 0;
812
813 // If the field is required and non-repeated, there are no definition levels
814 if (this->max_def_level_ > 0 && def_levels) {
815 num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);
816 // TODO(wesm): this tallying of values-to-decode can be performed with better
817 // cache-efficiency if fused with the level decoding.
818 for (int64_t i = 0; i < num_def_levels; ++i) {
819 if (def_levels[i] == this->max_def_level_) {
820 ++values_to_read;
821 }
822 }
823 } else {
824 // Required field, read all values
825 values_to_read = batch_size;
826 }
827
828 // Not present for non-repeated fields
829 if (this->max_rep_level_ > 0 && rep_levels) {
830 num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels);
831 if (def_levels && num_def_levels != num_rep_levels) {
832 throw ParquetException("Number of decoded rep / def levels did not match");
833 }
834 }
835
836 *values_read = this->ReadValues(values_to_read, values);
837 int64_t total_values = std::max(num_def_levels, *values_read);
838 this->ConsumeBufferedValues(total_values);
839
840 return total_values;
841 }
842
// Like ReadBatch, but leaves "spaces" for null entries in `values` and fills
// `valid_bits` (starting at `valid_bits_offset`) so the output can be wrapped
// as an Arrow-style validity bitmap. Outputs: `*levels_read` levels consumed,
// `*values_read` slots populated (including null spaces when spaced), and
// `*null_count_out` nulls observed.
template <typename DType>
int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced(
    int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values,
    uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read,
    int64_t* values_read, int64_t* null_count_out) {
  // HasNext invokes ReadNewPage
  if (!HasNext()) {
    *levels_read = 0;
    *values_read = 0;
    *null_count_out = 0;
    return 0;
  }

  int64_t total_values;
  // TODO(wesm): keep reading data pages until batch_size is reached, or the
  // row group is finished
  batch_size =
      std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_);

  // If the field is required and non-repeated, there are no definition levels
  if (this->max_def_level_ > 0) {
    int64_t num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);

    // Not present for non-repeated fields
    if (this->max_rep_level_ > 0) {
      int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels);
      if (num_def_levels != num_rep_levels) {
        throw ParquetException("Number of decoded rep / def levels did not match");
      }
    }

    // Whether nulls can occur at this leaf (any optional/repeated ancestor).
    const bool has_spaced_values = internal::HasSpacedValues(this->descr_);

    int64_t null_count = 0;
    if (!has_spaced_values) {
      // No spacing needed: count the non-null entries, read them densely, and
      // mark every produced slot valid.
      int values_to_read = 0;
      for (int64_t i = 0; i < num_def_levels; ++i) {
        if (def_levels[i] == this->max_def_level_) {
          ++values_to_read;
        }
      }
      total_values = this->ReadValues(values_to_read, values);
      for (int64_t i = 0; i < total_values; i++) {
        ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);
      }
      *values_read = total_values;
    } else {
      // Convert def levels into a validity bitmap, then decode with gaps left
      // in `values` at the null positions.
      internal::DefinitionLevelsToBitmap(def_levels, num_def_levels, this->max_def_level_,
                                         this->max_rep_level_, values_read, &null_count,
                                         valid_bits, valid_bits_offset);
      total_values =
          this->ReadValuesSpaced(*values_read, values, static_cast<int>(null_count),
                                 valid_bits, valid_bits_offset);
    }
    *levels_read = num_def_levels;
    *null_count_out = null_count;

  } else {
    // Required field, read all values
    total_values = this->ReadValues(batch_size, values);
    for (int64_t i = 0; i < total_values; i++) {
      ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);
    }
    *null_count_out = 0;
    *levels_read = total_values;
  }

  this->ConsumeBufferedValues(*levels_read);
  return total_values;
}
913
914 template <typename DType>
Skip(int64_t num_rows_to_skip)915 int64_t TypedColumnReaderImpl<DType>::Skip(int64_t num_rows_to_skip) {
916 int64_t rows_to_skip = num_rows_to_skip;
917 while (HasNext() && rows_to_skip > 0) {
918 // If the number of rows to skip is more than the number of undecoded values, skip the
919 // Page.
920 if (rows_to_skip > (this->num_buffered_values_ - this->num_decoded_values_)) {
921 rows_to_skip -= this->num_buffered_values_ - this->num_decoded_values_;
922 this->num_decoded_values_ = this->num_buffered_values_;
923 } else {
924 // We need to read this Page
925 // Jump to the right offset in the Page
926 int64_t batch_size = 1024; // ReadBatch with a smaller memory footprint
927 int64_t values_read = 0;
928
929 // This will be enough scratch space to accommodate 16-bit levels or any
930 // value type
931 std::shared_ptr<ResizableBuffer> scratch = AllocateBuffer(
932 this->pool_, batch_size * type_traits<DType::type_num>::value_byte_size);
933
934 do {
935 batch_size = std::min(batch_size, rows_to_skip);
936 values_read =
937 ReadBatch(static_cast<int>(batch_size),
938 reinterpret_cast<int16_t*>(scratch->mutable_data()),
939 reinterpret_cast<int16_t*>(scratch->mutable_data()),
940 reinterpret_cast<T*>(scratch->mutable_data()), &values_read);
941 rows_to_skip -= values_read;
942 } while (values_read > 0 && rows_to_skip > 0);
943 }
944 }
945 return num_rows_to_skip - rows_to_skip;
946 }
947
948 // ----------------------------------------------------------------------
949 // Dynamic column reader constructor
950
Make(const ColumnDescriptor * descr,std::unique_ptr<PageReader> pager,MemoryPool * pool)951 std::shared_ptr<ColumnReader> ColumnReader::Make(const ColumnDescriptor* descr,
952 std::unique_ptr<PageReader> pager,
953 MemoryPool* pool) {
954 switch (descr->physical_type()) {
955 case Type::BOOLEAN:
956 return std::make_shared<TypedColumnReaderImpl<BooleanType>>(descr, std::move(pager),
957 pool);
958 case Type::INT32:
959 return std::make_shared<TypedColumnReaderImpl<Int32Type>>(descr, std::move(pager),
960 pool);
961 case Type::INT64:
962 return std::make_shared<TypedColumnReaderImpl<Int64Type>>(descr, std::move(pager),
963 pool);
964 case Type::INT96:
965 return std::make_shared<TypedColumnReaderImpl<Int96Type>>(descr, std::move(pager),
966 pool);
967 case Type::FLOAT:
968 return std::make_shared<TypedColumnReaderImpl<FloatType>>(descr, std::move(pager),
969 pool);
970 case Type::DOUBLE:
971 return std::make_shared<TypedColumnReaderImpl<DoubleType>>(descr, std::move(pager),
972 pool);
973 case Type::BYTE_ARRAY:
974 return std::make_shared<TypedColumnReaderImpl<ByteArrayType>>(
975 descr, std::move(pager), pool);
976 case Type::FIXED_LEN_BYTE_ARRAY:
977 return std::make_shared<TypedColumnReaderImpl<FLBAType>>(descr, std::move(pager),
978 pool);
979 default:
980 ParquetException::NYI("type reader not implemented");
981 }
982 // Unreachable code, but suppress compiler warning
983 return std::shared_ptr<ColumnReader>(nullptr);
984 }
985
986 // ----------------------------------------------------------------------
987 // RecordReader
988
989 namespace internal {
990
// The minimum number of repetition/definition levels to decode at a time, for
// better vectorized performance when doing many smaller record reads.
// ReadRecords batches level decoding at max(kMinLevelBatchSize, num_records).
constexpr int64_t kMinLevelBatchSize = 1024;
994
// Typed record reader: buffers repetition/definition levels and decoded
// values, and delimits logical (possibly repeated/nested) records across data
// page boundaries. State invariants: levels_position_ <= levels_written_ <=
// levels_capacity_; values_written_ counts populated value slots (including
// null spaces when nullable_values_).
template <typename DType>
class TypedRecordReader : public ColumnReaderImplBase<DType>,
                          virtual public RecordReader {
 public:
  using T = typename DType::c_type;
  using BASE = ColumnReaderImplBase<DType>;
  // Initializes counters and allocates empty buffers for values, validity
  // bits and def/rep levels from `pool`.
  TypedRecordReader(const ColumnDescriptor* descr, MemoryPool* pool) : BASE(descr, pool) {
    nullable_values_ = internal::HasSpacedValues(descr);
    at_record_start_ = true;
    records_read_ = 0;
    values_written_ = 0;
    values_capacity_ = 0;
    null_count_ = 0;
    levels_written_ = 0;
    levels_position_ = 0;
    levels_capacity_ = 0;
    // BYTE_ARRAY subclasses accumulate into Arrow builders rather than the
    // flat values_ buffer.
    uses_values_ = !(descr->physical_type() == Type::BYTE_ARRAY);

    if (uses_values_) {
      values_ = AllocateBuffer(pool);
    }
    valid_bits_ = AllocateBuffer(pool);
    def_levels_ = AllocateBuffer(pool);
    rep_levels_ = AllocateBuffer(pool);
    Reset();
  }

  // Number of values buffered in the current page but not yet decoded.
  int64_t available_values_current_page() const {
    return this->num_buffered_values_ - this->num_decoded_values_;
  }

  // Compute the values capacity in bytes for the given number of elements,
  // throwing on multiplication overflow (corrupt-file defense).
  int64_t bytes_for_values(int64_t nitems) const {
    int64_t type_size = GetTypeByteSize(this->descr_->physical_type());
    if (::arrow::internal::HasMultiplyOverflow(nitems, type_size)) {
      throw ParquetException("Total size of items too large");
    }
    return nitems * type_size;
  }

  // Reads up to num_records logical records, decoding levels in batches and
  // then reading the corresponding values. Returns the number of records read
  // (may be fewer if the row group is exhausted).
  int64_t ReadRecords(int64_t num_records) override {
    // Delimit records, then read values at the end
    int64_t records_read = 0;

    // First consume any levels already buffered from a previous call.
    if (levels_position_ < levels_written_) {
      records_read += ReadRecordData(num_records);
    }

    int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records);

    // If we are in the middle of a record, we continue until reaching the
    // desired number of records or the end of the current record if we've found
    // enough records
    while (!at_record_start_ || records_read < num_records) {
      // Is there more data to read in this row group?
      if (!this->HasNextInternal()) {
        if (!at_record_start_) {
          // We ended the row group while inside a record that we haven't seen
          // the end of yet. So increment the record count for the last record in
          // the row group
          ++records_read;
          at_record_start_ = true;
        }
        break;
      }

      /// We perform multiple batch reads until we either exhaust the row group
      /// or observe the desired number of records
      int64_t batch_size = std::min(level_batch_size, available_values_current_page());

      // No more data in column
      if (batch_size == 0) {
        break;
      }

      if (this->max_def_level_ > 0) {
        ReserveLevels(batch_size);

        int16_t* def_levels = this->def_levels() + levels_written_;
        int16_t* rep_levels = this->rep_levels() + levels_written_;

        // Not present for non-repeated fields
        int64_t levels_read = 0;
        if (this->max_rep_level_ > 0) {
          levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
          if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
            throw ParquetException("Number of decoded rep / def levels did not match");
          }
        } else if (this->max_def_level_ > 0) {
          levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
        }

        // Exhausted column chunk
        if (levels_read == 0) {
          break;
        }

        levels_written_ += levels_read;
        records_read += ReadRecordData(num_records - records_read);
      } else {
        // No repetition or definition levels
        batch_size = std::min(num_records - records_read, batch_size);
        records_read += ReadRecordData(batch_size);
      }
    }

    return records_read;
  }

  // We may outwardly have the appearance of having exhausted a column chunk
  // when in fact we are in the middle of processing the last batch
  bool has_values_to_process() const { return levels_position_ < levels_written_; }

  // Transfers ownership of the values buffer to the caller (trimmed to the
  // written size); returns nullptr for builder-backed (BYTE_ARRAY) readers.
  std::shared_ptr<ResizableBuffer> ReleaseValues() override {
    if (uses_values_) {
      auto result = values_;
      PARQUET_THROW_NOT_OK(result->Resize(bytes_for_values(values_written_), true));
      values_ = AllocateBuffer(this->pool_);
      return result;
    } else {
      return nullptr;
    }
  }

  // Transfers ownership of the validity bitmap (trimmed); nullptr when the
  // column cannot contain nulls.
  std::shared_ptr<ResizableBuffer> ReleaseIsValid() override {
    if (nullable_values_) {
      auto result = valid_bits_;
      PARQUET_THROW_NOT_OK(result->Resize(BitUtil::BytesForBits(values_written_), true));
      valid_bits_ = AllocateBuffer(this->pool_);
      return result;
    } else {
      return nullptr;
    }
  }

  // Process written repetition/definition levels to reach the end of
  // records. Process no more levels than necessary to delimit the indicated
  // number of logical records. Updates internal state of RecordReader
  //
  // \return Number of records delimited
  int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) {
    int64_t values_to_read = 0;
    int64_t records_read = 0;

    const int16_t* def_levels = this->def_levels() + levels_position_;
    const int16_t* rep_levels = this->rep_levels() + levels_position_;

    // Only meaningful for repeated columns (rep level 0 marks a record start).
    DCHECK_GT(this->max_rep_level_, 0);

    // Count logical records and number of values to read
    while (levels_position_ < levels_written_) {
      if (*rep_levels++ == 0) {
        // If at_record_start_ is true, we are seeing the start of a record
        // for the second time, such as after repeated calls to
        // DelimitRecords. In this case we must continue until we find
        // another record start or exhausting the ColumnChunk
        if (!at_record_start_) {
          // We've reached the end of a record; increment the record count.
          ++records_read;
          if (records_read == num_records) {
            // We've found the number of records we were looking for. Set
            // at_record_start_ to true and break
            at_record_start_ = true;
            break;
          }
        }
      }

      // We have decided to consume the level at this position; therefore we
      // must advance until we find another record boundary
      at_record_start_ = false;

      if (*def_levels++ == this->max_def_level_) {
        ++values_to_read;
      }
      ++levels_position_;
    }
    *values_seen = values_to_read;
    return records_read;
  }

  // Ensures capacity for `capacity` additional levels and values.
  void Reserve(int64_t capacity) override {
    ReserveLevels(capacity);
    ReserveValues(capacity);
  }

  // Returns the (possibly grown, power-of-two) capacity needed to hold
  // `size + extra_size` items; validates against overflow / absurd sizes
  // coming from corrupt files.
  int64_t UpdateCapacity(int64_t capacity, int64_t size, int64_t extra_size) {
    if (extra_size < 0) {
      throw ParquetException("Negative size (corrupt file?)");
    }
    if (::arrow::internal::HasAdditionOverflow(size, extra_size)) {
      throw ParquetException("Allocation size too large (corrupt file?)");
    }
    const int64_t target_size = size + extra_size;
    // Cap below 2^62 so the byte-size multiply below cannot overflow int64.
    if (target_size >= (1LL << 62)) {
      throw ParquetException("Allocation size too large (corrupt file?)");
    }
    if (capacity >= target_size) {
      return capacity;
    }
    return BitUtil::NextPower2(target_size);
  }

  // Grows the def (and, for repeated columns, rep) level buffers to fit
  // `extra_levels` more entries. No-op for columns without def levels.
  void ReserveLevels(int64_t extra_levels) {
    if (this->max_def_level_ > 0) {
      const int64_t new_levels_capacity =
          UpdateCapacity(levels_capacity_, levels_written_, extra_levels);
      if (new_levels_capacity > levels_capacity_) {
        constexpr auto kItemSize = static_cast<int64_t>(sizeof(int16_t));
        if (::arrow::internal::HasMultiplyOverflow(new_levels_capacity, kItemSize)) {
          throw ParquetException("Allocation size too large (corrupt file?)");
        }
        PARQUET_THROW_NOT_OK(def_levels_->Resize(new_levels_capacity * kItemSize, false));
        if (this->max_rep_level_ > 0) {
          PARQUET_THROW_NOT_OK(
              rep_levels_->Resize(new_levels_capacity * kItemSize, false));
        }
        levels_capacity_ = new_levels_capacity;
      }
    }
  }

  // Grows the values buffer (when used) and the validity bitmap (when the
  // column is nullable) for `extra_values` more entries.
  void ReserveValues(int64_t extra_values) {
    const int64_t new_values_capacity =
        UpdateCapacity(values_capacity_, values_written_, extra_values);
    if (new_values_capacity > values_capacity_) {
      // XXX(wesm): A hack to avoid memory allocation when reading directly
      // into builder classes
      if (uses_values_) {
        PARQUET_THROW_NOT_OK(
            values_->Resize(bytes_for_values(new_values_capacity), false));
      }
      values_capacity_ = new_values_capacity;
    }
    if (nullable_values_) {
      int64_t valid_bytes_new = BitUtil::BytesForBits(values_capacity_);
      if (valid_bits_->size() < valid_bytes_new) {
        int64_t valid_bytes_old = BitUtil::BytesForBits(values_written_);
        PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false));

        // Avoid valgrind warnings
        memset(valid_bits_->mutable_data() + valid_bytes_old, 0,
               valid_bytes_new - valid_bytes_old);
      }
    }
  }

  // Clears accumulated values and compacts any not-yet-consumed levels to the
  // front of the level buffers so subsequent reads append after them.
  void Reset() override {
    ResetValues();

    if (levels_written_ > 0) {
      const int64_t levels_remaining = levels_written_ - levels_position_;
      // Shift remaining levels to beginning of buffer and trim to only the number
      // of decoded levels remaining
      int16_t* def_data = def_levels();
      int16_t* rep_data = rep_levels();

      std::copy(def_data + levels_position_, def_data + levels_written_, def_data);
      PARQUET_THROW_NOT_OK(
          def_levels_->Resize(levels_remaining * sizeof(int16_t), false));

      if (this->max_rep_level_ > 0) {
        std::copy(rep_data + levels_position_, rep_data + levels_written_, rep_data);
        PARQUET_THROW_NOT_OK(
            rep_levels_->Resize(levels_remaining * sizeof(int16_t), false));
      }

      levels_written_ -= levels_position_;
      levels_position_ = 0;
      levels_capacity_ = levels_remaining;
    }

    records_read_ = 0;

    // Call Finish on the binary builders to reset them
  }

  // Switches to a new column chunk's page reader (e.g. next row group) and
  // clears cached dictionary decoders.
  void SetPageReader(std::unique_ptr<PageReader> reader) override {
    at_record_start_ = true;
    this->pager_ = std::move(reader);
    ResetDecoders();
  }

  bool HasMoreData() const override { return this->pager_ != nullptr; }

  // Dictionary decoders must be reset when advancing row groups
  void ResetDecoders() { this->decoders_.clear(); }

  // Decodes `values_with_nulls` slots (of which `null_count` are null) into
  // the values buffer, leaving gaps at null positions per valid_bits_.
  virtual void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) {
    uint8_t* valid_bits = valid_bits_->mutable_data();
    const int64_t valid_bits_offset = values_written_;

    int64_t num_decoded = this->current_decoder_->DecodeSpaced(
        ValuesHead<T>(), static_cast<int>(values_with_nulls),
        static_cast<int>(null_count), valid_bits, valid_bits_offset);
    DCHECK_EQ(num_decoded, values_with_nulls);
  }

  // Decodes `values_to_read` non-null values contiguously into the buffer.
  virtual void ReadValuesDense(int64_t values_to_read) {
    int64_t num_decoded =
        this->current_decoder_->Decode(ValuesHead<T>(), static_cast<int>(values_to_read));
    DCHECK_EQ(num_decoded, values_to_read);
  }

  // Return number of logical records read
  int64_t ReadRecordData(int64_t num_records) {
    // Conservative upper bound
    const int64_t possible_num_values =
        std::max(num_records, levels_written_ - levels_position_);
    ReserveValues(possible_num_values);

    const int64_t start_levels_position = levels_position_;

    int64_t values_to_read = 0;
    int64_t records_read = 0;
    if (this->max_rep_level_ > 0) {
      records_read = DelimitRecords(num_records, &values_to_read);
    } else if (this->max_def_level_ > 0) {
      // No repetition levels, skip delimiting logic. Each level represents a
      // null or not null entry
      records_read = std::min(levels_written_ - levels_position_, num_records);

      // This is advanced by DelimitRecords, which we skipped
      levels_position_ += records_read;
    } else {
      records_read = values_to_read = num_records;
    }

    int64_t null_count = 0;
    if (nullable_values_) {
      int64_t values_with_nulls = 0;
      internal::DefinitionLevelsToBitmap(
          def_levels() + start_levels_position, levels_position_ - start_levels_position,
          this->max_def_level_, this->max_rep_level_, &values_with_nulls, &null_count,
          valid_bits_->mutable_data(), values_written_);
      values_to_read = values_with_nulls - null_count;
      DCHECK_GE(values_to_read, 0);
      ReadValuesSpaced(values_with_nulls, null_count);
    } else {
      DCHECK_GE(values_to_read, 0);
      ReadValuesDense(values_to_read);
    }
    if (this->max_def_level_ > 0) {
      // Optional, repeated, or some mix thereof
      this->ConsumeBufferedValues(levels_position_ - start_levels_position);
    } else {
      // Flat, non-repeated
      this->ConsumeBufferedValues(values_to_read);
    }
    // Total values, including null spaces, if any
    values_written_ += values_to_read + null_count;
    null_count_ += null_count;

    return records_read;
  }

  // Prints buffered levels and values to stdout (debugging aid only).
  void DebugPrintState() override {
    const int16_t* def_levels = this->def_levels();
    const int16_t* rep_levels = this->rep_levels();
    const int64_t total_levels_read = levels_position_;

    const T* vals = reinterpret_cast<const T*>(this->values());

    std::cout << "def levels: ";
    for (int64_t i = 0; i < total_levels_read; ++i) {
      std::cout << def_levels[i] << " ";
    }
    std::cout << std::endl;

    std::cout << "rep levels: ";
    for (int64_t i = 0; i < total_levels_read; ++i) {
      std::cout << rep_levels[i] << " ";
    }
    std::cout << std::endl;

    std::cout << "values: ";
    for (int64_t i = 0; i < this->values_written(); ++i) {
      std::cout << vals[i] << " ";
    }
    std::cout << std::endl;
  }

  // Logically empties the values buffer and validity bitmap (capacity kept).
  void ResetValues() {
    if (values_written_ > 0) {
      // Resize to 0, but do not shrink to fit
      if (uses_values_) {
        PARQUET_THROW_NOT_OK(values_->Resize(0, false));
      }
      PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false));
      values_written_ = 0;
      values_capacity_ = 0;
      null_count_ = 0;
    }
  }

 protected:
  // Pointer to the next unwritten value slot. NOTE(review): the template
  // parameter T shadows the class-level alias `using T = DType::c_type`;
  // callers pass the storage type explicitly (e.g. ValuesHead<FLBA>()).
  template <typename T>
  T* ValuesHead() {
    return reinterpret_cast<T*>(values_->mutable_data()) + values_written_;
  }
};
1396
1397 class FLBARecordReader : public TypedRecordReader<FLBAType>,
1398 virtual public BinaryRecordReader {
1399 public:
FLBARecordReader(const ColumnDescriptor * descr,::arrow::MemoryPool * pool)1400 FLBARecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
1401 : TypedRecordReader<FLBAType>(descr, pool), builder_(nullptr) {
1402 DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY);
1403 int byte_width = descr_->type_length();
1404 std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width);
1405 builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, this->pool_));
1406 }
1407
GetBuilderChunks()1408 ::arrow::ArrayVector GetBuilderChunks() override {
1409 std::shared_ptr<::arrow::Array> chunk;
1410 PARQUET_THROW_NOT_OK(builder_->Finish(&chunk));
1411 return ::arrow::ArrayVector({chunk});
1412 }
1413
ReadValuesDense(int64_t values_to_read)1414 void ReadValuesDense(int64_t values_to_read) override {
1415 auto values = ValuesHead<FLBA>();
1416 int64_t num_decoded =
1417 this->current_decoder_->Decode(values, static_cast<int>(values_to_read));
1418 DCHECK_EQ(num_decoded, values_to_read);
1419
1420 for (int64_t i = 0; i < num_decoded; i++) {
1421 PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
1422 }
1423 ResetValues();
1424 }
1425
ReadValuesSpaced(int64_t values_to_read,int64_t null_count)1426 void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
1427 uint8_t* valid_bits = valid_bits_->mutable_data();
1428 const int64_t valid_bits_offset = values_written_;
1429 auto values = ValuesHead<FLBA>();
1430
1431 int64_t num_decoded = this->current_decoder_->DecodeSpaced(
1432 values, static_cast<int>(values_to_read), static_cast<int>(null_count),
1433 valid_bits, valid_bits_offset);
1434 DCHECK_EQ(num_decoded, values_to_read);
1435
1436 for (int64_t i = 0; i < num_decoded; i++) {
1437 if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) {
1438 PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
1439 } else {
1440 PARQUET_THROW_NOT_OK(builder_->AppendNull());
1441 }
1442 }
1443 ResetValues();
1444 }
1445
1446 private:
1447 std::unique_ptr<::arrow::FixedSizeBinaryBuilder> builder_;
1448 };
1449
1450 class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
1451 virtual public BinaryRecordReader {
1452 public:
ByteArrayChunkedRecordReader(const ColumnDescriptor * descr,::arrow::MemoryPool * pool)1453 ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
1454 : TypedRecordReader<ByteArrayType>(descr, pool) {
1455 DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
1456 accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool));
1457 }
1458
GetBuilderChunks()1459 ::arrow::ArrayVector GetBuilderChunks() override {
1460 ::arrow::ArrayVector result = accumulator_.chunks;
1461 if (result.size() == 0 || accumulator_.builder->length() > 0) {
1462 std::shared_ptr<::arrow::Array> last_chunk;
1463 PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk));
1464 result.push_back(std::move(last_chunk));
1465 }
1466 accumulator_.chunks = {};
1467 return result;
1468 }
1469
ReadValuesDense(int64_t values_to_read)1470 void ReadValuesDense(int64_t values_to_read) override {
1471 int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(
1472 static_cast<int>(values_to_read), &accumulator_);
1473 DCHECK_EQ(num_decoded, values_to_read);
1474 ResetValues();
1475 }
1476
ReadValuesSpaced(int64_t values_to_read,int64_t null_count)1477 void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
1478 int64_t num_decoded = this->current_decoder_->DecodeArrow(
1479 static_cast<int>(values_to_read), static_cast<int>(null_count),
1480 valid_bits_->mutable_data(), values_written_, &accumulator_);
1481 DCHECK_EQ(num_decoded, values_to_read - null_count);
1482 ResetValues();
1483 }
1484
1485 private:
1486 // Helper data structure for accumulating builder chunks
1487 typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;
1488 };
1489
// Record reader for BYTE_ARRAY columns that preserves dictionary encoding,
// producing a chunked dictionary array. Dictionary-encoded pages append
// indices directly; plain-encoded pages fall back to appending values, which
// may grow the builder's memo table. A new chunk is cut whenever a new
// dictionary is encountered.
class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>,
                                        virtual public DictionaryRecordReader {
 public:
  ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr,
                                  ::arrow::MemoryPool* pool)
      : TypedRecordReader<ByteArrayType>(descr, pool), builder_(pool) {
    this->read_dictionary_ = true;
  }

  // Finishes any pending builder data and returns all chunks read so far.
  std::shared_ptr<::arrow::ChunkedArray> GetResult() override {
    FlushBuilder();
    std::vector<std::shared_ptr<::arrow::Array>> result;
    std::swap(result, result_chunks_);
    return std::make_shared<::arrow::ChunkedArray>(std::move(result), builder_.type());
  }

  // Converts the builder's current contents into a finished chunk.
  void FlushBuilder() {
    if (builder_.length() > 0) {
      std::shared_ptr<::arrow::Array> chunk;
      PARQUET_THROW_NOT_OK(builder_.Finish(&chunk));
      result_chunks_.emplace_back(std::move(chunk));

      // Also clears the dictionary memo table
      builder_.Reset();
    }
  }

  // If the column chunk delivered a new dictionary page, flush the chunk
  // built against the old dictionary and seed the builder with the new one.
  void MaybeWriteNewDictionary() {
    if (this->new_dictionary_) {
      /// If there is a new dictionary, we may need to flush the builder, then
      /// insert the new dictionary values
      FlushBuilder();
      // ResetFull also discards the previous dictionary values, not just the
      // indices, before the new dictionary is inserted.
      builder_.ResetFull();
      auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_);
      decoder->InsertDictionary(&builder_);
      this->new_dictionary_ = false;
    }
  }

  // Decode non-null values; indices go straight to the builder when the page
  // is dictionary-encoded, otherwise values are appended (plain fallback).
  void ReadValuesDense(int64_t values_to_read) override {
    int64_t num_decoded = 0;
    if (current_encoding_ == Encoding::RLE_DICTIONARY) {
      MaybeWriteNewDictionary();
      auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_);
      num_decoded = decoder->DecodeIndices(static_cast<int>(values_to_read), &builder_);
    } else {
      num_decoded = this->current_decoder_->DecodeArrowNonNull(
          static_cast<int>(values_to_read), &builder_);

      /// Flush values since they have been copied into the builder
      ResetValues();
    }
    DCHECK_EQ(num_decoded, values_to_read);
  }

  // Spaced variant of ReadValuesDense: nulls are interleaved according to the
  // validity bitmap starting at values_written_.
  void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
    int64_t num_decoded = 0;
    if (current_encoding_ == Encoding::RLE_DICTIONARY) {
      MaybeWriteNewDictionary();
      auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_);
      num_decoded = decoder->DecodeIndicesSpaced(
          static_cast<int>(values_to_read), static_cast<int>(null_count),
          valid_bits_->mutable_data(), values_written_, &builder_);
    } else {
      num_decoded = this->current_decoder_->DecodeArrow(
          static_cast<int>(values_to_read), static_cast<int>(null_count),
          valid_bits_->mutable_data(), values_written_, &builder_);

      /// Flush values since they have been copied into the builder
      ResetValues();
    }
    DCHECK_EQ(num_decoded, values_to_read - null_count);
  }

 private:
  using BinaryDictDecoder = DictDecoder<ByteArrayType>;

  ::arrow::BinaryDictionary32Builder builder_;
  std::vector<std::shared_ptr<::arrow::Array>> result_chunks_;
};
1570
// TODO(wesm): Implement these to some satisfaction
// No-op specializations: the generic DebugPrintState streams values with
// operator<<, which is not useful (or not stored in values_) for these
// physical types.
template <>
void TypedRecordReader<Int96Type>::DebugPrintState() {}

template <>
void TypedRecordReader<ByteArrayType>::DebugPrintState() {}

template <>
void TypedRecordReader<FLBAType>::DebugPrintState() {}
1580
MakeByteArrayRecordReader(const ColumnDescriptor * descr,::arrow::MemoryPool * pool,bool read_dictionary)1581 std::shared_ptr<RecordReader> MakeByteArrayRecordReader(const ColumnDescriptor* descr,
1582 ::arrow::MemoryPool* pool,
1583 bool read_dictionary) {
1584 if (read_dictionary) {
1585 return std::make_shared<ByteArrayDictionaryRecordReader>(descr, pool);
1586 } else {
1587 return std::make_shared<ByteArrayChunkedRecordReader>(descr, pool);
1588 }
1589 }
1590
Make(const ColumnDescriptor * descr,MemoryPool * pool,const bool read_dictionary)1591 std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr,
1592 MemoryPool* pool,
1593 const bool read_dictionary) {
1594 switch (descr->physical_type()) {
1595 case Type::BOOLEAN:
1596 return std::make_shared<TypedRecordReader<BooleanType>>(descr, pool);
1597 case Type::INT32:
1598 return std::make_shared<TypedRecordReader<Int32Type>>(descr, pool);
1599 case Type::INT64:
1600 return std::make_shared<TypedRecordReader<Int64Type>>(descr, pool);
1601 case Type::INT96:
1602 return std::make_shared<TypedRecordReader<Int96Type>>(descr, pool);
1603 case Type::FLOAT:
1604 return std::make_shared<TypedRecordReader<FloatType>>(descr, pool);
1605 case Type::DOUBLE:
1606 return std::make_shared<TypedRecordReader<DoubleType>>(descr, pool);
1607 case Type::BYTE_ARRAY:
1608 return MakeByteArrayRecordReader(descr, pool, read_dictionary);
1609 case Type::FIXED_LEN_BYTE_ARRAY:
1610 return std::make_shared<FLBARecordReader>(descr, pool);
1611 default: {
1612 // PARQUET-1481: This can occur if the file is corrupt
1613 std::stringstream ss;
1614 ss << "Invalid physical column type: " << static_cast<int>(descr->physical_type());
1615 throw ParquetException(ss.str());
1616 }
1617 }
1618 // Unreachable code, but suppress compiler warning
1619 return nullptr;
1620 }
1621
1622 } // namespace internal
1623 } // namespace parquet
1624