1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "parquet/column_writer.h"
19
20 #include <algorithm>
21 #include <cstdint>
22 #include <cstring>
23 #include <map>
24 #include <memory>
25 #include <string>
26 #include <utility>
27 #include <vector>
28
29 #include "arrow/array.h"
30 #include "arrow/buffer_builder.h"
31 #include "arrow/compute/api.h"
32 #include "arrow/status.h"
33 #include "arrow/type.h"
34 #include "arrow/type_traits.h"
35 #include "arrow/util/bit_stream_utils.h"
36 #include "arrow/util/checked_cast.h"
37 #include "arrow/util/compression.h"
38 #include "arrow/util/logging.h"
39 #include "arrow/util/rle_encoding.h"
40 #include "parquet/column_page.h"
41 #include "parquet/encoding.h"
42 #include "parquet/encryption_internal.h"
43 #include "parquet/internal_file_encryptor.h"
44 #include "parquet/metadata.h"
45 #include "parquet/platform.h"
46 #include "parquet/properties.h"
47 #include "parquet/schema.h"
48 #include "parquet/statistics.h"
49 #include "parquet/thrift_internal.h"
50 #include "parquet/types.h"
51
52 using arrow::Datum;
53 using arrow::Status;
54 using arrow::BitUtil::BitWriter;
55 using arrow::internal::checked_cast;
56 using arrow::util::RleEncoder;
57
58 namespace parquet {
59
60 namespace {
61
AddIfNotNull(const int16_t * base,int64_t offset)62 inline const int16_t* AddIfNotNull(const int16_t* base, int64_t offset) {
63 if (base != nullptr) {
64 return base + offset;
65 }
66 return nullptr;
67 }
68
69 } // namespace
70
LevelEncoder()71 LevelEncoder::LevelEncoder() {}
~LevelEncoder()72 LevelEncoder::~LevelEncoder() {}
73
Init(Encoding::type encoding,int16_t max_level,int num_buffered_values,uint8_t * data,int data_size)74 void LevelEncoder::Init(Encoding::type encoding, int16_t max_level,
75 int num_buffered_values, uint8_t* data, int data_size) {
76 bit_width_ = BitUtil::Log2(max_level + 1);
77 encoding_ = encoding;
78 switch (encoding) {
79 case Encoding::RLE: {
80 rle_encoder_.reset(new RleEncoder(data, data_size, bit_width_));
81 break;
82 }
83 case Encoding::BIT_PACKED: {
84 int num_bytes =
85 static_cast<int>(BitUtil::BytesForBits(num_buffered_values * bit_width_));
86 bit_packed_encoder_.reset(new BitWriter(data, num_bytes));
87 break;
88 }
89 default:
90 throw ParquetException("Unknown encoding type for levels.");
91 }
92 }
93
MaxBufferSize(Encoding::type encoding,int16_t max_level,int num_buffered_values)94 int LevelEncoder::MaxBufferSize(Encoding::type encoding, int16_t max_level,
95 int num_buffered_values) {
96 int bit_width = BitUtil::Log2(max_level + 1);
97 int num_bytes = 0;
98 switch (encoding) {
99 case Encoding::RLE: {
100 // TODO: Due to the way we currently check if the buffer is full enough,
101 // we need to have MinBufferSize as head room.
102 num_bytes = RleEncoder::MaxBufferSize(bit_width, num_buffered_values) +
103 RleEncoder::MinBufferSize(bit_width);
104 break;
105 }
106 case Encoding::BIT_PACKED: {
107 num_bytes =
108 static_cast<int>(BitUtil::BytesForBits(num_buffered_values * bit_width));
109 break;
110 }
111 default:
112 throw ParquetException("Unknown encoding type for levels.");
113 }
114 return num_bytes;
115 }
116
Encode(int batch_size,const int16_t * levels)117 int LevelEncoder::Encode(int batch_size, const int16_t* levels) {
118 int num_encoded = 0;
119 if (!rle_encoder_ && !bit_packed_encoder_) {
120 throw ParquetException("Level encoders are not initialized.");
121 }
122
123 if (encoding_ == Encoding::RLE) {
124 for (int i = 0; i < batch_size; ++i) {
125 if (!rle_encoder_->Put(*(levels + i))) {
126 break;
127 }
128 ++num_encoded;
129 }
130 rle_encoder_->Flush();
131 rle_length_ = rle_encoder_->len();
132 } else {
133 for (int i = 0; i < batch_size; ++i) {
134 if (!bit_packed_encoder_->PutValue(*(levels + i), bit_width_)) {
135 break;
136 }
137 ++num_encoded;
138 }
139 bit_packed_encoder_->Flush();
140 }
141 return num_encoded;
142 }
143
144 // ----------------------------------------------------------------------
145 // PageWriter implementation
146
147 // This subclass delimits pages appearing in a serialized stream, each preceded
148 // by a serialized Thrift format::PageHeader indicating the type of each page
149 // and the page metadata.
150 class SerializedPageWriter : public PageWriter {
151 public:
SerializedPageWriter(std::shared_ptr<ArrowOutputStream> sink,Compression::type codec,int compression_level,ColumnChunkMetaDataBuilder * metadata,int16_t row_group_ordinal,int16_t column_chunk_ordinal,MemoryPool * pool=::arrow::default_memory_pool (),std::shared_ptr<Encryptor> meta_encryptor=nullptr,std::shared_ptr<Encryptor> data_encryptor=nullptr)152 SerializedPageWriter(std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
153 int compression_level, ColumnChunkMetaDataBuilder* metadata,
154 int16_t row_group_ordinal, int16_t column_chunk_ordinal,
155 MemoryPool* pool = ::arrow::default_memory_pool(),
156 std::shared_ptr<Encryptor> meta_encryptor = nullptr,
157 std::shared_ptr<Encryptor> data_encryptor = nullptr)
158 : sink_(std::move(sink)),
159 metadata_(metadata),
160 pool_(pool),
161 num_values_(0),
162 dictionary_page_offset_(0),
163 data_page_offset_(0),
164 total_uncompressed_size_(0),
165 total_compressed_size_(0),
166 page_ordinal_(0),
167 row_group_ordinal_(row_group_ordinal),
168 column_ordinal_(column_chunk_ordinal),
169 meta_encryptor_(std::move(meta_encryptor)),
170 data_encryptor_(std::move(data_encryptor)),
171 encryption_buffer_(AllocateBuffer(pool, 0)) {
172 if (data_encryptor_ != nullptr || meta_encryptor_ != nullptr) {
173 InitEncryption();
174 }
175 compressor_ = GetCodec(codec, compression_level);
176 thrift_serializer_.reset(new ThriftSerializer);
177 }
178
WriteDictionaryPage(const DictionaryPage & page)179 int64_t WriteDictionaryPage(const DictionaryPage& page) override {
180 int64_t uncompressed_size = page.size();
181 std::shared_ptr<Buffer> compressed_data;
182 if (has_compressor()) {
183 auto buffer = std::static_pointer_cast<ResizableBuffer>(
184 AllocateBuffer(pool_, uncompressed_size));
185 Compress(*(page.buffer().get()), buffer.get());
186 compressed_data = std::static_pointer_cast<Buffer>(buffer);
187 } else {
188 compressed_data = page.buffer();
189 }
190
191 format::DictionaryPageHeader dict_page_header;
192 dict_page_header.__set_num_values(page.num_values());
193 dict_page_header.__set_encoding(ToThrift(page.encoding()));
194 dict_page_header.__set_is_sorted(page.is_sorted());
195
196 const uint8_t* output_data_buffer = compressed_data->data();
197 int32_t output_data_len = static_cast<int32_t>(compressed_data->size());
198
199 if (data_encryptor_.get()) {
200 UpdateEncryption(encryption::kDictionaryPage);
201 PARQUET_THROW_NOT_OK(encryption_buffer_->Resize(
202 data_encryptor_->CiphertextSizeDelta() + output_data_len, false));
203 output_data_len = data_encryptor_->Encrypt(compressed_data->data(), output_data_len,
204 encryption_buffer_->mutable_data());
205 output_data_buffer = encryption_buffer_->data();
206 }
207
208 format::PageHeader page_header;
209 page_header.__set_type(format::PageType::DICTIONARY_PAGE);
210 page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
211 page_header.__set_compressed_page_size(static_cast<int32_t>(output_data_len));
212 page_header.__set_dictionary_page_header(dict_page_header);
213 // TODO(PARQUET-594) crc checksum
214
215 PARQUET_ASSIGN_OR_THROW(int64_t start_pos, sink_->Tell());
216 if (dictionary_page_offset_ == 0) {
217 dictionary_page_offset_ = start_pos;
218 }
219
220 if (meta_encryptor_) {
221 UpdateEncryption(encryption::kDictionaryPageHeader);
222 }
223 int64_t header_size =
224 thrift_serializer_->Serialize(&page_header, sink_.get(), meta_encryptor_);
225
226 PARQUET_THROW_NOT_OK(sink_->Write(output_data_buffer, output_data_len));
227
228 total_uncompressed_size_ += uncompressed_size + header_size;
229 total_compressed_size_ += output_data_len + header_size;
230 ++dict_encoding_stats_[page.encoding()];
231
232 PARQUET_ASSIGN_OR_THROW(int64_t final_pos, sink_->Tell());
233 return final_pos - start_pos;
234 }
235
Close(bool has_dictionary,bool fallback)236 void Close(bool has_dictionary, bool fallback) override {
237 if (meta_encryptor_ != nullptr) {
238 UpdateEncryption(encryption::kColumnMetaData);
239 }
240 // index_page_offset = -1 since they are not supported
241 metadata_->Finish(num_values_, dictionary_page_offset_, -1, data_page_offset_,
242 total_compressed_size_, total_uncompressed_size_, has_dictionary,
243 fallback, dict_encoding_stats_, data_encoding_stats_,
244 meta_encryptor_);
245 // Write metadata at end of column chunk
246 metadata_->WriteTo(sink_.get());
247 }
248
249 /**
250 * Compress a buffer.
251 */
Compress(const Buffer & src_buffer,ResizableBuffer * dest_buffer)252 void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override {
253 DCHECK(compressor_ != nullptr);
254
255 // Compress the data
256 int64_t max_compressed_size =
257 compressor_->MaxCompressedLen(src_buffer.size(), src_buffer.data());
258
259 // Use Arrow::Buffer::shrink_to_fit = false
260 // underlying buffer only keeps growing. Resize to a smaller size does not reallocate.
261 PARQUET_THROW_NOT_OK(dest_buffer->Resize(max_compressed_size, false));
262
263 PARQUET_ASSIGN_OR_THROW(
264 int64_t compressed_size,
265 compressor_->Compress(src_buffer.size(), src_buffer.data(), max_compressed_size,
266 dest_buffer->mutable_data()));
267 PARQUET_THROW_NOT_OK(dest_buffer->Resize(compressed_size, false));
268 }
269
WriteDataPage(const DataPage & page)270 int64_t WriteDataPage(const DataPage& page) override {
271 int64_t uncompressed_size = page.uncompressed_size();
272 std::shared_ptr<Buffer> compressed_data = page.buffer();
273 const uint8_t* output_data_buffer = compressed_data->data();
274 int32_t output_data_len = static_cast<int32_t>(compressed_data->size());
275
276 if (data_encryptor_.get()) {
277 PARQUET_THROW_NOT_OK(encryption_buffer_->Resize(
278 data_encryptor_->CiphertextSizeDelta() + output_data_len, false));
279 UpdateEncryption(encryption::kDataPage);
280 output_data_len = data_encryptor_->Encrypt(compressed_data->data(), output_data_len,
281 encryption_buffer_->mutable_data());
282 output_data_buffer = encryption_buffer_->data();
283 }
284
285 format::PageHeader page_header;
286 page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
287 page_header.__set_compressed_page_size(static_cast<int32_t>(output_data_len));
288 // TODO(PARQUET-594) crc checksum
289
290 if (page.type() == PageType::DATA_PAGE) {
291 const DataPageV1& v1_page = checked_cast<const DataPageV1&>(page);
292 SetDataPageHeader(page_header, v1_page);
293 } else if (page.type() == PageType::DATA_PAGE_V2) {
294 const DataPageV2& v2_page = checked_cast<const DataPageV2&>(page);
295 SetDataPageV2Header(page_header, v2_page);
296 } else {
297 throw ParquetException("Unexpected page type");
298 }
299
300 PARQUET_ASSIGN_OR_THROW(int64_t start_pos, sink_->Tell());
301 if (page_ordinal_ == 0) {
302 data_page_offset_ = start_pos;
303 }
304
305 if (meta_encryptor_) {
306 UpdateEncryption(encryption::kDataPageHeader);
307 }
308 int64_t header_size =
309 thrift_serializer_->Serialize(&page_header, sink_.get(), meta_encryptor_);
310 PARQUET_THROW_NOT_OK(sink_->Write(output_data_buffer, output_data_len));
311
312 total_uncompressed_size_ += uncompressed_size + header_size;
313 total_compressed_size_ += output_data_len + header_size;
314 num_values_ += page.num_values();
315 ++data_encoding_stats_[page.encoding()];
316 ++page_ordinal_;
317 PARQUET_ASSIGN_OR_THROW(int64_t current_pos, sink_->Tell());
318 return current_pos - start_pos;
319 }
320
SetDataPageHeader(format::PageHeader & page_header,const DataPageV1 & page)321 void SetDataPageHeader(format::PageHeader& page_header, const DataPageV1& page) {
322 format::DataPageHeader data_page_header;
323 data_page_header.__set_num_values(page.num_values());
324 data_page_header.__set_encoding(ToThrift(page.encoding()));
325 data_page_header.__set_definition_level_encoding(
326 ToThrift(page.definition_level_encoding()));
327 data_page_header.__set_repetition_level_encoding(
328 ToThrift(page.repetition_level_encoding()));
329 data_page_header.__set_statistics(ToThrift(page.statistics()));
330
331 page_header.__set_type(format::PageType::DATA_PAGE);
332 page_header.__set_data_page_header(data_page_header);
333 }
334
SetDataPageV2Header(format::PageHeader & page_header,const DataPageV2 page)335 void SetDataPageV2Header(format::PageHeader& page_header, const DataPageV2 page) {
336 format::DataPageHeaderV2 data_page_header;
337 data_page_header.__set_num_values(page.num_values());
338 data_page_header.__set_num_nulls(page.num_nulls());
339 data_page_header.__set_num_rows(page.num_rows());
340 data_page_header.__set_encoding(ToThrift(page.encoding()));
341
342 data_page_header.__set_definition_levels_byte_length(
343 page.definition_levels_byte_length());
344 data_page_header.__set_repetition_levels_byte_length(
345 page.repetition_levels_byte_length());
346
347 data_page_header.__set_is_compressed(page.is_compressed());
348 data_page_header.__set_statistics(ToThrift(page.statistics()));
349
350 page_header.__set_type(format::PageType::DATA_PAGE_V2);
351 page_header.__set_data_page_header_v2(data_page_header);
352 }
353
has_compressor()354 bool has_compressor() override { return (compressor_ != nullptr); }
355
num_values()356 int64_t num_values() { return num_values_; }
357
dictionary_page_offset()358 int64_t dictionary_page_offset() { return dictionary_page_offset_; }
359
data_page_offset()360 int64_t data_page_offset() { return data_page_offset_; }
361
total_compressed_size()362 int64_t total_compressed_size() { return total_compressed_size_; }
363
total_uncompressed_size()364 int64_t total_uncompressed_size() { return total_uncompressed_size_; }
365
366 private:
367 // To allow UpdateEncryption on Close
368 friend class BufferedPageWriter;
369
InitEncryption()370 void InitEncryption() {
371 // Prepare the AAD for quick update later.
372 if (data_encryptor_ != nullptr) {
373 data_page_aad_ = encryption::CreateModuleAad(
374 data_encryptor_->file_aad(), encryption::kDataPage, row_group_ordinal_,
375 column_ordinal_, kNonPageOrdinal);
376 }
377 if (meta_encryptor_ != nullptr) {
378 data_page_header_aad_ = encryption::CreateModuleAad(
379 meta_encryptor_->file_aad(), encryption::kDataPageHeader, row_group_ordinal_,
380 column_ordinal_, kNonPageOrdinal);
381 }
382 }
383
UpdateEncryption(int8_t module_type)384 void UpdateEncryption(int8_t module_type) {
385 switch (module_type) {
386 case encryption::kColumnMetaData: {
387 meta_encryptor_->UpdateAad(encryption::CreateModuleAad(
388 meta_encryptor_->file_aad(), module_type, row_group_ordinal_, column_ordinal_,
389 kNonPageOrdinal));
390 break;
391 }
392 case encryption::kDataPage: {
393 encryption::QuickUpdatePageAad(data_page_aad_, page_ordinal_);
394 data_encryptor_->UpdateAad(data_page_aad_);
395 break;
396 }
397 case encryption::kDataPageHeader: {
398 encryption::QuickUpdatePageAad(data_page_header_aad_, page_ordinal_);
399 meta_encryptor_->UpdateAad(data_page_header_aad_);
400 break;
401 }
402 case encryption::kDictionaryPageHeader: {
403 meta_encryptor_->UpdateAad(encryption::CreateModuleAad(
404 meta_encryptor_->file_aad(), module_type, row_group_ordinal_, column_ordinal_,
405 kNonPageOrdinal));
406 break;
407 }
408 case encryption::kDictionaryPage: {
409 data_encryptor_->UpdateAad(encryption::CreateModuleAad(
410 data_encryptor_->file_aad(), module_type, row_group_ordinal_, column_ordinal_,
411 kNonPageOrdinal));
412 break;
413 }
414 default:
415 throw ParquetException("Unknown module type in UpdateEncryption");
416 }
417 }
418
419 std::shared_ptr<ArrowOutputStream> sink_;
420 ColumnChunkMetaDataBuilder* metadata_;
421 MemoryPool* pool_;
422 int64_t num_values_;
423 int64_t dictionary_page_offset_;
424 int64_t data_page_offset_;
425 int64_t total_uncompressed_size_;
426 int64_t total_compressed_size_;
427 int16_t page_ordinal_;
428 int16_t row_group_ordinal_;
429 int16_t column_ordinal_;
430
431 std::unique_ptr<ThriftSerializer> thrift_serializer_;
432
433 // Compression codec to use.
434 std::unique_ptr<::arrow::util::Codec> compressor_;
435
436 std::string data_page_aad_;
437 std::string data_page_header_aad_;
438
439 std::shared_ptr<Encryptor> meta_encryptor_;
440 std::shared_ptr<Encryptor> data_encryptor_;
441
442 std::shared_ptr<ResizableBuffer> encryption_buffer_;
443
444 std::map<Encoding::type, int32_t> dict_encoding_stats_;
445 std::map<Encoding::type, int32_t> data_encoding_stats_;
446 };
447
// This implementation of the PageWriter writes to the final sink on Close().
class BufferedPageWriter : public PageWriter {
 public:
  // All writes go through an internal SerializedPageWriter that targets an
  // in-memory stream; the buffered bytes are copied to `sink` only in Close().
  BufferedPageWriter(std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
                     int compression_level, ColumnChunkMetaDataBuilder* metadata,
                     int16_t row_group_ordinal, int16_t current_column_ordinal,
                     MemoryPool* pool = ::arrow::default_memory_pool(),
                     std::shared_ptr<Encryptor> meta_encryptor = nullptr,
                     std::shared_ptr<Encryptor> data_encryptor = nullptr)
      : final_sink_(std::move(sink)), metadata_(metadata), has_dictionary_pages_(false) {
    in_memory_sink_ = CreateOutputStream(pool);
    pager_ = std::unique_ptr<SerializedPageWriter>(
        new SerializedPageWriter(in_memory_sink_, codec, compression_level, metadata,
                                 row_group_ordinal, current_column_ordinal, pool,
                                 std::move(meta_encryptor), std::move(data_encryptor)));
  }

  int64_t WriteDictionaryPage(const DictionaryPage& page) override {
    // Remember that at least one dictionary page exists so Close() can decide
    // whether to report a dictionary page offset.
    has_dictionary_pages_ = true;
    return pager_->WriteDictionaryPage(page);
  }

  void Close(bool has_dictionary, bool fallback) override {
    if (pager_->meta_encryptor_ != nullptr) {
      pager_->UpdateEncryption(encryption::kColumnMetaData);
    }
    // index_page_offset = -1 since they are not supported
    PARQUET_ASSIGN_OR_THROW(int64_t final_position, final_sink_->Tell());
    // dictionary page offset should be 0 iff there are no dictionary pages
    auto dictionary_page_offset =
        has_dictionary_pages_ ? pager_->dictionary_page_offset() + final_position : 0;
    // The pager's offsets are relative to the in-memory stream; shift them by
    // the final sink's current position to get absolute file offsets.
    metadata_->Finish(pager_->num_values(), dictionary_page_offset, -1,
                      pager_->data_page_offset() + final_position,
                      pager_->total_compressed_size(), pager_->total_uncompressed_size(),
                      has_dictionary, fallback, pager_->dict_encoding_stats_,
                      pager_->data_encoding_stats_, pager_->meta_encryptor_);

    // Write metadata at end of column chunk
    metadata_->WriteTo(in_memory_sink_.get());

    // flush everything to the serialized sink
    PARQUET_ASSIGN_OR_THROW(auto buffer, in_memory_sink_->Finish());
    PARQUET_THROW_NOT_OK(final_sink_->Write(buffer));
  }

  int64_t WriteDataPage(const DataPage& page) override {
    return pager_->WriteDataPage(page);
  }

  void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override {
    pager_->Compress(src_buffer, dest_buffer);
  }

  bool has_compressor() override { return pager_->has_compressor(); }

 private:
  std::shared_ptr<ArrowOutputStream> final_sink_;
  ColumnChunkMetaDataBuilder* metadata_;
  std::shared_ptr<::arrow::io::BufferOutputStream> in_memory_sink_;
  // Does the actual page serialization, into in_memory_sink_.
  std::unique_ptr<SerializedPageWriter> pager_;
  bool has_dictionary_pages_;
};
510
Open(std::shared_ptr<ArrowOutputStream> sink,Compression::type codec,int compression_level,ColumnChunkMetaDataBuilder * metadata,int16_t row_group_ordinal,int16_t column_chunk_ordinal,MemoryPool * pool,bool buffered_row_group,std::shared_ptr<Encryptor> meta_encryptor,std::shared_ptr<Encryptor> data_encryptor)511 std::unique_ptr<PageWriter> PageWriter::Open(
512 std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
513 int compression_level, ColumnChunkMetaDataBuilder* metadata,
514 int16_t row_group_ordinal, int16_t column_chunk_ordinal, MemoryPool* pool,
515 bool buffered_row_group, std::shared_ptr<Encryptor> meta_encryptor,
516 std::shared_ptr<Encryptor> data_encryptor) {
517 if (buffered_row_group) {
518 return std::unique_ptr<PageWriter>(
519 new BufferedPageWriter(std::move(sink), codec, compression_level, metadata,
520 row_group_ordinal, column_chunk_ordinal, pool,
521 std::move(meta_encryptor), std::move(data_encryptor)));
522 } else {
523 return std::unique_ptr<PageWriter>(
524 new SerializedPageWriter(std::move(sink), codec, compression_level, metadata,
525 row_group_ordinal, column_chunk_ordinal, pool,
526 std::move(meta_encryptor), std::move(data_encryptor)));
527 }
528 }
529
530 // ----------------------------------------------------------------------
531 // ColumnWriter
532
default_writer_properties()533 const std::shared_ptr<WriterProperties>& default_writer_properties() {
534 static std::shared_ptr<WriterProperties> default_writer_properties =
535 WriterProperties::Builder().build();
536 return default_writer_properties;
537 }
538
// Shared base for typed column writers: buffers levels and values, turns them
// into data pages, and hands the pages to a PageWriter.
class ColumnWriterImpl {
 public:
  ColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata,
                   std::unique_ptr<PageWriter> pager, const bool use_dictionary,
                   Encoding::type encoding, const WriterProperties* properties)
      : metadata_(metadata),
        descr_(metadata->descr()),
        pager_(std::move(pager)),
        has_dictionary_(use_dictionary),
        encoding_(encoding),
        properties_(properties),
        allocator_(properties->memory_pool()),
        num_buffered_values_(0),
        num_buffered_encoded_values_(0),
        rows_written_(0),
        total_bytes_written_(0),
        total_compressed_bytes_(0),
        closed_(false),
        fallback_(false),
        definition_levels_sink_(allocator_),
        repetition_levels_sink_(allocator_) {
    // Scratch buffers start empty and grow on demand; Resize(.., false)
    // elsewhere relies on them never shrinking their allocation.
    definition_levels_rle_ =
        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
    repetition_levels_rle_ =
        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
    uncompressed_data_ =
        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
    if (pager_->has_compressor()) {
      compressor_temp_buffer_ =
          std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
    }
  }

  virtual ~ColumnWriterImpl() = default;

  // Flushes remaining pages, closes the pager and returns total bytes written.
  int64_t Close();

 protected:
  // Hands back the encoded values accumulated since the last page.
  virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;

  // Serializes Dictionary Page if enabled
  virtual void WriteDictionaryPage() = 0;

  // Plain-encoded statistics of the current page
  virtual EncodedStatistics GetPageStatistics() = 0;

  // Plain-encoded statistics of the whole chunk
  virtual EncodedStatistics GetChunkStatistics() = 0;

  // Merges page statistics into chunk statistics, then resets the values
  virtual void ResetPageStatistics() = 0;

  // Adds Data Pages to an in memory buffer in dictionary encoding mode
  // Serializes the Data Pages in other encoding modes
  void AddDataPage();

  void BuildDataPageV1(int64_t definition_levels_rle_size,
                       int64_t repetition_levels_rle_size, int64_t uncompressed_size,
                       const std::shared_ptr<Buffer>& values);
  void BuildDataPageV2(int64_t definition_levels_rle_size,
                       int64_t repetition_levels_rle_size, int64_t uncompressed_size,
                       const std::shared_ptr<Buffer>& values);

  // Serializes Data Pages
  void WriteDataPage(const DataPage& page) {
    total_bytes_written_ += pager_->WriteDataPage(page);
  }

  // Write multiple definition levels
  void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
    DCHECK(!closed_);
    PARQUET_THROW_NOT_OK(
        definition_levels_sink_.Append(levels, sizeof(int16_t) * num_levels));
  }

  // Write multiple repetition levels
  void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels) {
    DCHECK(!closed_);
    PARQUET_THROW_NOT_OK(
        repetition_levels_sink_.Append(levels, sizeof(int16_t) * num_levels));
  }

  // RLE encode the src_buffer into dest_buffer and return the encoded size
  int64_t RleEncodeLevels(const void* src_buffer, ResizableBuffer* dest_buffer,
                          int16_t max_level, bool include_length_prefix = true);

  // Serialize the buffered Data Pages
  void FlushBufferedDataPages();

  ColumnChunkMetaDataBuilder* metadata_;
  const ColumnDescriptor* descr_;

  std::unique_ptr<PageWriter> pager_;

  bool has_dictionary_;
  Encoding::type encoding_;
  const WriterProperties* properties_;

  LevelEncoder level_encoder_;

  MemoryPool* allocator_;

  // The total number of values stored in the data page. This is the maximum of
  // the number of encoded definition levels or encoded values. For
  // non-repeated, required columns, this is equal to the number of encoded
  // values. For repeated or optional values, there may be fewer data values
  // than levels, and this tells you how many encoded levels there are in that
  // case.
  int64_t num_buffered_values_;

  // The total number of stored values. For repeated or optional values, this
  // number may be lower than num_buffered_values_.
  int64_t num_buffered_encoded_values_;

  // Total number of rows written with this ColumnWriter
  int rows_written_;

  // Records the total number of bytes written by the serializer
  int64_t total_bytes_written_;

  // Records the current number of compressed bytes in a column
  int64_t total_compressed_bytes_;

  // Flag to check if the Writer has been closed
  bool closed_;

  // Flag to infer if dictionary encoding has fallen back to PLAIN
  bool fallback_;

  // Raw (unencoded) levels buffered since the last page was cut.
  ::arrow::BufferBuilder definition_levels_sink_;
  ::arrow::BufferBuilder repetition_levels_sink_;

  // RLE-encoded levels for the page currently being built.
  std::shared_ptr<ResizableBuffer> definition_levels_rle_;
  std::shared_ptr<ResizableBuffer> repetition_levels_rle_;

  std::shared_ptr<ResizableBuffer> uncompressed_data_;
  std::shared_ptr<ResizableBuffer> compressor_temp_buffer_;

  // Pages held back while dictionary encoding is in effect; flushed once the
  // dictionary page has been written (or on fallback).
  std::vector<std::unique_ptr<DataPage>> data_pages_;

 private:
  // Reset the level sinks for the next page (capacity is retained).
  void InitSinks() {
    definition_levels_sink_.Rewind(0);
    repetition_levels_sink_.Rewind(0);
  }

  // Concatenate the encoded levels and values into one buffer
  void ConcatenateBuffers(int64_t definition_levels_rle_size,
                          int64_t repetition_levels_rle_size,
                          const std::shared_ptr<Buffer>& values, uint8_t* combined) {
    // V1 page body layout: repetition levels, then definition levels, then values.
    memcpy(combined, repetition_levels_rle_->data(), repetition_levels_rle_size);
    combined += repetition_levels_rle_size;
    memcpy(combined, definition_levels_rle_->data(), definition_levels_rle_size);
    combined += definition_levels_rle_size;
    memcpy(combined, values->data(), values->size());
  }
};
696
697 // return the size of the encoded buffer
RleEncodeLevels(const void * src_buffer,ResizableBuffer * dest_buffer,int16_t max_level,bool include_length_prefix)698 int64_t ColumnWriterImpl::RleEncodeLevels(const void* src_buffer,
699 ResizableBuffer* dest_buffer, int16_t max_level,
700 bool include_length_prefix) {
701 // V1 DataPage includes the length of the RLE level as a prefix.
702 int32_t prefix_size = include_length_prefix ? sizeof(int32_t) : 0;
703
704 // TODO: This only works with due to some RLE specifics
705 int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
706 static_cast<int>(num_buffered_values_)) +
707 prefix_size;
708
709 // Use Arrow::Buffer::shrink_to_fit = false
710 // underlying buffer only keeps growing. Resize to a smaller size does not reallocate.
711 PARQUET_THROW_NOT_OK(dest_buffer->Resize(rle_size, false));
712
713 level_encoder_.Init(Encoding::RLE, max_level, static_cast<int>(num_buffered_values_),
714 dest_buffer->mutable_data() + prefix_size,
715 static_cast<int>(dest_buffer->size() - prefix_size));
716 int encoded = level_encoder_.Encode(static_cast<int>(num_buffered_values_),
717 reinterpret_cast<const int16_t*>(src_buffer));
718 DCHECK_EQ(encoded, num_buffered_values_);
719
720 if (include_length_prefix) {
721 reinterpret_cast<int32_t*>(dest_buffer->mutable_data())[0] = level_encoder_.len();
722 }
723
724 return level_encoder_.len() + prefix_size;
725 }
726
AddDataPage()727 void ColumnWriterImpl::AddDataPage() {
728 int64_t definition_levels_rle_size = 0;
729 int64_t repetition_levels_rle_size = 0;
730
731 std::shared_ptr<Buffer> values = GetValuesBuffer();
732 bool is_v1_data_page = properties_->data_page_version() == ParquetDataPageVersion::V1;
733
734 if (descr_->max_definition_level() > 0) {
735 definition_levels_rle_size = RleEncodeLevels(
736 definition_levels_sink_.data(), definition_levels_rle_.get(),
737 descr_->max_definition_level(), /*include_length_prefix=*/is_v1_data_page);
738 }
739
740 if (descr_->max_repetition_level() > 0) {
741 repetition_levels_rle_size = RleEncodeLevels(
742 repetition_levels_sink_.data(), repetition_levels_rle_.get(),
743 descr_->max_repetition_level(), /*include_length_prefix=*/is_v1_data_page);
744 }
745
746 int64_t uncompressed_size =
747 definition_levels_rle_size + repetition_levels_rle_size + values->size();
748
749 if (is_v1_data_page) {
750 BuildDataPageV1(definition_levels_rle_size, repetition_levels_rle_size,
751 uncompressed_size, values);
752 } else {
753 BuildDataPageV2(definition_levels_rle_size, repetition_levels_rle_size,
754 uncompressed_size, values);
755 }
756
757 // Re-initialize the sinks for next Page.
758 InitSinks();
759 num_buffered_values_ = 0;
760 num_buffered_encoded_values_ = 0;
761 }
762
BuildDataPageV1(int64_t definition_levels_rle_size,int64_t repetition_levels_rle_size,int64_t uncompressed_size,const std::shared_ptr<Buffer> & values)763 void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size,
764 int64_t repetition_levels_rle_size,
765 int64_t uncompressed_size,
766 const std::shared_ptr<Buffer>& values) {
767 // Use Arrow::Buffer::shrink_to_fit = false
768 // underlying buffer only keeps growing. Resize to a smaller size does not reallocate.
769 PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false));
770 ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size, values,
771 uncompressed_data_->mutable_data());
772
773 EncodedStatistics page_stats = GetPageStatistics();
774 page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
775 page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
776 ResetPageStatistics();
777
778 std::shared_ptr<Buffer> compressed_data;
779 if (pager_->has_compressor()) {
780 pager_->Compress(*(uncompressed_data_.get()), compressor_temp_buffer_.get());
781 compressed_data = compressor_temp_buffer_;
782 } else {
783 compressed_data = uncompressed_data_;
784 }
785
786 // Write the page to OutputStream eagerly if there is no dictionary or
787 // if dictionary encoding has fallen back to PLAIN
788 if (has_dictionary_ && !fallback_) { // Save pages until end of dictionary encoding
789 PARQUET_ASSIGN_OR_THROW(
790 auto compressed_data_copy,
791 compressed_data->CopySlice(0, compressed_data->size(), allocator_));
792 std::unique_ptr<DataPage> page_ptr(new DataPageV1(
793 compressed_data_copy, static_cast<int32_t>(num_buffered_values_), encoding_,
794 Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats));
795 total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
796
797 data_pages_.push_back(std::move(page_ptr));
798 } else { // Eagerly write pages
799 DataPageV1 page(compressed_data, static_cast<int32_t>(num_buffered_values_),
800 encoding_, Encoding::RLE, Encoding::RLE, uncompressed_size,
801 page_stats);
802 WriteDataPage(page);
803 }
804 }
805
BuildDataPageV2(int64_t definition_levels_rle_size,int64_t repetition_levels_rle_size,int64_t uncompressed_size,const std::shared_ptr<Buffer> & values)806 void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size,
807 int64_t repetition_levels_rle_size,
808 int64_t uncompressed_size,
809 const std::shared_ptr<Buffer>& values) {
810 // Compress the values if needed. Repetition and definition levels are uncompressed in
811 // V2.
812 std::shared_ptr<Buffer> compressed_values;
813 if (pager_->has_compressor()) {
814 pager_->Compress(*values, compressor_temp_buffer_.get());
815 compressed_values = compressor_temp_buffer_;
816 } else {
817 compressed_values = values;
818 }
819
820 // Concatenate uncompressed levels and the possibly compressed values
821 int64_t combined_size =
822 definition_levels_rle_size + repetition_levels_rle_size + compressed_values->size();
823 std::shared_ptr<ResizableBuffer> combined = AllocateBuffer(allocator_, combined_size);
824
825 ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size,
826 compressed_values, combined->mutable_data());
827
828 EncodedStatistics page_stats = GetPageStatistics();
829 page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
830 page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
831 ResetPageStatistics();
832
833 int32_t num_values = static_cast<int32_t>(num_buffered_values_);
834 int32_t null_count = static_cast<int32_t>(page_stats.null_count);
835 int32_t def_levels_byte_length = static_cast<int32_t>(definition_levels_rle_size);
836 int32_t rep_levels_byte_length = static_cast<int32_t>(repetition_levels_rle_size);
837
838 // Write the page to OutputStream eagerly if there is no dictionary or
839 // if dictionary encoding has fallen back to PLAIN
840 if (has_dictionary_ && !fallback_) { // Save pages until end of dictionary encoding
841 PARQUET_ASSIGN_OR_THROW(auto data_copy,
842 combined->CopySlice(0, combined->size(), allocator_));
843 std::unique_ptr<DataPage> page_ptr(new DataPageV2(
844 combined, num_values, null_count, num_values, encoding_, def_levels_byte_length,
845 rep_levels_byte_length, uncompressed_size, pager_->has_compressor()));
846 total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
847 data_pages_.push_back(std::move(page_ptr));
848 } else {
849 DataPageV2 page(combined, num_values, null_count, num_values, encoding_,
850 def_levels_byte_length, rep_levels_byte_length, uncompressed_size);
851 WriteDataPage(page);
852 }
853 }
854
Close()855 int64_t ColumnWriterImpl::Close() {
856 if (!closed_) {
857 closed_ = true;
858 if (has_dictionary_ && !fallback_) {
859 WriteDictionaryPage();
860 }
861
862 FlushBufferedDataPages();
863
864 EncodedStatistics chunk_statistics = GetChunkStatistics();
865 chunk_statistics.ApplyStatSizeLimits(
866 properties_->max_statistics_size(descr_->path()));
867 chunk_statistics.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
868
869 // Write stats only if the column has at least one row written
870 if (rows_written_ > 0 && chunk_statistics.is_set()) {
871 metadata_->SetStatistics(chunk_statistics);
872 }
873 pager_->Close(has_dictionary_, fallback_);
874 }
875
876 return total_bytes_written_;
877 }
878
FlushBufferedDataPages()879 void ColumnWriterImpl::FlushBufferedDataPages() {
880 // Write all outstanding data to a new page
881 if (num_buffered_values_ > 0) {
882 AddDataPage();
883 }
884 for (const auto& page_ptr : data_pages_) {
885 WriteDataPage(*page_ptr);
886 }
887 data_pages_.clear();
888 total_compressed_bytes_ = 0;
889 }
890
891 // ----------------------------------------------------------------------
892 // TypedColumnWriter
893
// Invokes action(offset, length) over [0, total) in consecutive chunks of
// batch_size, with a final shorter chunk for any remainder.
template <typename Action>
inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
  // Keep the quotient in 64 bits: the previous static_cast<int> here could
  // truncate the batch count for very large inputs.
  const int64_t num_batches = total / batch_size;
  for (int64_t round = 0; round < num_batches; round++) {
    action(round * batch_size, batch_size);
  }
  // Write the remaining values
  const int64_t remainder = total % batch_size;
  if (remainder > 0) {
    action(num_batches * batch_size, remainder);
  }
}
905
DictionaryDirectWriteSupported(const::arrow::Array & array)906 bool DictionaryDirectWriteSupported(const ::arrow::Array& array) {
907 DCHECK_EQ(array.type_id(), ::arrow::Type::DICTIONARY);
908 const ::arrow::DictionaryType& dict_type =
909 static_cast<const ::arrow::DictionaryType&>(*array.type());
910 auto id = dict_type.value_type()->id();
911 return id == ::arrow::Type::BINARY || id == ::arrow::Type::STRING;
912 }
913
// Materializes a DictionaryArray into a dense array of its value type (via the
// Arrow compute Cast kernel) so it can go through the regular write path.
// `pool` is used for the cast's allocations; the result is returned in *out.
Status ConvertDictionaryToDense(const ::arrow::Array& array, MemoryPool* pool,
                                std::shared_ptr<::arrow::Array>* out) {
  const ::arrow::DictionaryType& dict_type =
      static_cast<const ::arrow::DictionaryType&>(*array.type());

  // TODO(ARROW-1648): Remove this special handling once we require an Arrow
  // version that has this fixed.
  if (dict_type.value_type()->id() == ::arrow::Type::NA) {
    // dictionary(null) cannot be cast; emit an all-null array directly.
    *out = std::make_shared<::arrow::NullArray>(array.length());
    return Status::OK();
  }

  ::arrow::compute::ExecContext ctx(pool);
  ARROW_ASSIGN_OR_RAISE(Datum cast_output,
                        ::arrow::compute::Cast(array.data(), dict_type.value_type(),
                                               ::arrow::compute::CastOptions(), &ctx));
  *out = cast_output.make_array();
  return Status::OK();
}
933
// True when `encoding` means the encoder is emitting dictionary indices.
// Only PLAIN_DICTIONARY is checked here — it is the dictionary encoding this
// writer produces (see MakeEncoder usage in TypedColumnWriterImpl).
static inline bool IsDictionaryEncoding(Encoding::type encoding) {
  return encoding == Encoding::PLAIN_DICTIONARY;
}
937
// Typed implementation of the column writer. Owns the current value encoder
// (which may be a dictionary encoder that later falls back to PLAIN) and the
// optional per-page / per-chunk statistics accumulators.
template <typename DType>
class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<DType> {
 public:
  using T = typename DType::c_type;

  // Creates the initial encoder (dictionary-based when use_dictionary is true)
  // and, if statistics are enabled for this column and its sort order is
  // known, the page- and chunk-level statistics objects.
  TypedColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata,
                        std::unique_ptr<PageWriter> pager, const bool use_dictionary,
                        Encoding::type encoding, const WriterProperties* properties)
      : ColumnWriterImpl(metadata, std::move(pager), use_dictionary, encoding,
                         properties) {
    current_encoder_ = MakeEncoder(DType::type_num, encoding, use_dictionary, descr_,
                                   properties->memory_pool());

    if (properties->statistics_enabled(descr_->path()) &&
        (SortOrder::UNKNOWN != descr_->sort_order())) {
      page_statistics_ = MakeStatistics<DType>(descr_, allocator_);
      chunk_statistics_ = MakeStatistics<DType>(descr_, allocator_);
    }
  }

  int64_t Close() override { return ColumnWriterImpl::Close(); }

  // Writes levels + non-null values. `values` must hold one entry per level
  // whose definition level equals the column's max definition level.
  void WriteBatch(int64_t num_values, const int16_t* def_levels,
                  const int16_t* rep_levels, const T* values) override {
    // We check for DataPage limits only after we have inserted the values. If a user
    // writes a large number of values, the DataPage size can be much above the limit.
    // The purpose of this chunking is to bound this. Even if a user writes large number
    // of values, the chunking will ensure the AddDataPage() is called at a reasonable
    // pagesize limit
    int64_t value_offset = 0;
    auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
      // `offset` indexes the level arrays; `value_offset` tracks how many
      // non-null values have been consumed from `values` so far.
      int64_t values_to_write = WriteLevels(batch_size, AddIfNotNull(def_levels, offset),
                                            AddIfNotNull(rep_levels, offset));
      // PARQUET-780
      if (values_to_write > 0) {
        DCHECK_NE(nullptr, values);
      }
      WriteValues(values + value_offset, values_to_write, batch_size - values_to_write);
      CommitWriteAndCheckPageLimit(batch_size, values_to_write);
      value_offset += values_to_write;

      // Dictionary size checked separately from data page size since we
      // circumvent this check when writing ::arrow::DictionaryArray directly
      CheckDictionarySizeLimit();
    };
    DoInBatches(num_values, properties_->write_batch_size(), WriteChunk);
  }

  // Like WriteBatch, but `values` is "spaced": it has a slot for every level
  // (including nulls), with validity given by `valid_bits`.
  void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
                        const int16_t* rep_levels, const uint8_t* valid_bits,
                        int64_t valid_bits_offset, const T* values) override {
    // Like WriteBatch, but for spaced values
    int64_t value_offset = 0;
    auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
      int64_t batch_num_values = 0;
      int64_t batch_num_spaced_values = 0;
      WriteLevelsSpaced(batch_size, AddIfNotNull(def_levels, offset),
                        AddIfNotNull(rep_levels, offset), &batch_num_values,
                        &batch_num_spaced_values);
      WriteValuesSpaced(values + value_offset, batch_num_values, batch_num_spaced_values,
                        valid_bits, valid_bits_offset + value_offset);
      CommitWriteAndCheckPageLimit(batch_size, batch_num_spaced_values);
      value_offset += batch_num_spaced_values;

      // Dictionary size checked separately from data page size since we
      // circumvent this check when writing ::arrow::DictionaryArray directly
      CheckDictionarySizeLimit();
    };
    DoInBatches(num_values, properties_->write_batch_size(), WriteChunk);
  }

  // Entry point for the Arrow write path; dispatches dictionary-encoded arrays
  // to the direct-index path and everything else to the dense path.
  Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
                    int64_t num_levels, const ::arrow::Array& array,
                    ArrowWriteContext* ctx) override {
    if (array.type()->id() == ::arrow::Type::DICTIONARY) {
      return WriteArrowDictionary(def_levels, rep_levels, num_levels, array, ctx);
    } else {
      return WriteArrowDense(def_levels, rep_levels, num_levels, array, ctx);
    }
  }

  int64_t EstimatedBufferedValueBytes() const override {
    return current_encoder_->EstimatedDataEncodedSize();
  }

 protected:
  // Flushes the current encoder's accumulated values into a buffer; used by
  // AddDataPage when building a page body.
  std::shared_ptr<Buffer> GetValuesBuffer() override {
    return current_encoder_->FlushValues();
  }

  // Internal function to handle direct writing of ::arrow::DictionaryArray,
  // since the standard logic concerning dictionary size limits and fallback to
  // plain encoding is circumvented
  Status WriteArrowDictionary(const int16_t* def_levels, const int16_t* rep_levels,
                              int64_t num_levels, const ::arrow::Array& array,
                              ArrowWriteContext* context);

  Status WriteArrowDense(const int16_t* def_levels, const int16_t* rep_levels,
                         int64_t num_levels, const ::arrow::Array& array,
                         ArrowWriteContext* context);

  // Serializes the dictionary accumulated by the current encoder and writes it
  // through the pager. Only valid while the encoder is a DictEncoder.
  void WriteDictionaryPage() override {
    // We have to dynamic cast here because of TypedEncoder<Type> as
    // some compilers don't want to cast through virtual inheritance
    auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
    DCHECK(dict_encoder);
    std::shared_ptr<ResizableBuffer> buffer =
        AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
    dict_encoder->WriteDict(buffer->mutable_data());

    DictionaryPage page(buffer, dict_encoder->num_entries(),
                        properties_->dictionary_page_encoding());
    total_bytes_written_ += pager_->WriteDictionaryPage(page);
  }

  // Returns the (possibly empty) encoded statistics for the current page.
  EncodedStatistics GetPageStatistics() override {
    EncodedStatistics result;
    if (page_statistics_) result = page_statistics_->Encode();
    return result;
  }

  // Returns the (possibly empty) encoded statistics for the whole chunk.
  EncodedStatistics GetChunkStatistics() override {
    EncodedStatistics result;
    if (chunk_statistics_) result = chunk_statistics_->Encode();
    return result;
  }

  // Folds the finished page's statistics into the chunk statistics and clears
  // the page accumulator. page_statistics_ and chunk_statistics_ are created
  // together in the constructor, so checking one suffices.
  void ResetPageStatistics() override {
    if (chunk_statistics_ != nullptr) {
      chunk_statistics_->Merge(*page_statistics_);
      page_statistics_->Reset();
    }
  }

  Type::type type() const override { return descr_->physical_type(); }

  const ColumnDescriptor* descr() const override { return descr_; }

  int64_t rows_written() const override { return rows_written_; }

  int64_t total_compressed_bytes() const override { return total_compressed_bytes_; }

  int64_t total_bytes_written() const override { return total_bytes_written_; }

  const WriterProperties* properties() override { return properties_; }

 private:
  using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
  using TypedStats = TypedStatistics<DType>;
  // Current encoder; starts as a DictEncoder when dictionary encoding is
  // enabled and is replaced by a PLAIN encoder on fallback.
  std::unique_ptr<Encoder> current_encoder_;
  std::shared_ptr<TypedStats> page_statistics_;
  std::shared_ptr<TypedStats> chunk_statistics_;

  // If writing a sequence of ::arrow::DictionaryArray to the writer, we keep the
  // dictionary passed to DictEncoder<T>::PutDictionary so we can check
  // subsequent array chunks to see either if materialization is required (in
  // which case we call back to the dense write path)
  std::shared_ptr<::arrow::Array> preserved_dictionary_;

  // Encodes def/rep levels, updates rows_written_, and returns how many
  // non-null (max-definition-level) values the caller must supply.
  int64_t WriteLevels(int64_t num_values, const int16_t* def_levels,
                      const int16_t* rep_levels) {
    int64_t values_to_write = 0;
    // If the field is required and non-repeated, there are no definition levels
    if (descr_->max_definition_level() > 0) {
      for (int64_t i = 0; i < num_values; ++i) {
        if (def_levels[i] == descr_->max_definition_level()) {
          ++values_to_write;
        }
      }

      WriteDefinitionLevels(num_values, def_levels);
    } else {
      // Required field, write all values
      values_to_write = num_values;
    }

    // Not present for non-repeated fields
    if (descr_->max_repetition_level() > 0) {
      // A row could include more than one value
      // Count the occasions where we start a new row
      for (int64_t i = 0; i < num_values; ++i) {
        if (rep_levels[i] == 0) {
          rows_written_++;
        }
      }

      WriteRepetitionLevels(num_values, rep_levels);
    } else {
      // Each value is exactly one row
      rows_written_ += static_cast<int>(num_values);
    }
    return values_to_write;
  }

  // Spaced variant of WriteLevels: additionally reports how many "spaced"
  // slots (values plus nulls at the value nesting level) the batch covers.
  void WriteLevelsSpaced(int64_t num_levels, const int16_t* def_levels,
                         const int16_t* rep_levels, int64_t* out_values_to_write,
                         int64_t* out_spaced_values_to_write) {
    int64_t values_to_write = 0;
    int64_t spaced_values_to_write = 0;
    // If the field is required and non-repeated, there are no definition levels
    if (descr_->max_definition_level() > 0) {
      // Minimal definition level for which spaced values are written
      int16_t min_spaced_def_level = descr_->max_definition_level();
      if (descr_->schema_node()->is_optional()) {
        // For an optional leaf, a null at the leaf level still occupies a
        // spaced slot (hence max_definition_level - 1 qualifies).
        min_spaced_def_level--;
      }
      for (int64_t i = 0; i < num_levels; ++i) {
        if (def_levels[i] == descr_->max_definition_level()) {
          ++values_to_write;
        }
        if (def_levels[i] >= min_spaced_def_level) {
          ++spaced_values_to_write;
        }
      }

      WriteDefinitionLevels(num_levels, def_levels);
    } else {
      // Required field, write all values
      values_to_write = num_levels;
      spaced_values_to_write = num_levels;
    }

    // Not present for non-repeated fields
    if (descr_->max_repetition_level() > 0) {
      // A row could include more than one value
      // Count the occasions where we start a new row
      for (int64_t i = 0; i < num_levels; ++i) {
        if (rep_levels[i] == 0) {
          rows_written_++;
        }
      }

      WriteRepetitionLevels(num_levels, rep_levels);
    } else {
      // Each value is exactly one row
      rows_written_ += static_cast<int>(num_levels);
    }

    *out_values_to_write = values_to_write;
    *out_spaced_values_to_write = spaced_values_to_write;
  }

  // Records the batch in the buffered counters and closes out a page once the
  // encoder's estimated size reaches the configured page size.
  void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values) {
    num_buffered_values_ += num_levels;
    num_buffered_encoded_values_ += num_values;

    if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
      AddDataPage();
    }
  }

  // Flushes the dictionary page plus all buffered data pages, then swaps the
  // encoder for a PLAIN one. No-op if we are not currently dictionary-encoding.
  void FallbackToPlainEncoding() {
    if (IsDictionaryEncoding(current_encoder_->encoding())) {
      WriteDictionaryPage();
      // Serialize the buffered Dictionary Indices
      FlushBufferedDataPages();
      fallback_ = true;
      // Only PLAIN encoding is supported for fallback in V1
      current_encoder_ = MakeEncoder(DType::type_num, Encoding::PLAIN, false, descr_,
                                     properties_->memory_pool());
      encoding_ = Encoding::PLAIN;
    }
  }

  // Checks if the Dictionary Page size limit is reached
  // If the limit is reached, the Dictionary and Data Pages are serialized
  // The encoding is switched to PLAIN
  //
  // Only one Dictionary Page is written.
  // Fallback to PLAIN if dictionary page limit is reached.
  void CheckDictionarySizeLimit() {
    if (!has_dictionary_ || fallback_) {
      // Either not using dictionary encoding, or we have already fallen back
      // to PLAIN encoding because the size threshold was reached
      return;
    }

    // We have to dynamic cast here because TypedEncoder<Type> as some compilers
    // don't want to cast through virtual inheritance
    auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
    if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) {
      FallbackToPlainEncoding();
    }
  }

  // Feeds densely-packed values to the encoder and updates page statistics.
  // `num_nulls` is only used for the statistics null count.
  void WriteValues(const T* values, int64_t num_values, int64_t num_nulls) {
    dynamic_cast<ValueEncoderType*>(current_encoder_.get())
        ->Put(values, static_cast<int>(num_values));
    if (page_statistics_ != nullptr) {
      page_statistics_->Update(values, num_values, num_nulls);
    }
  }

  // Spaced counterpart of WriteValues. For optional fields the validity bitmap
  // selects which slots are real values; for required fields a dense Put is
  // used instead.
  void WriteValuesSpaced(const T* values, int64_t num_values, int64_t num_spaced_values,
                         const uint8_t* valid_bits, int64_t valid_bits_offset) {
    if (descr_->schema_node()->is_optional()) {
      dynamic_cast<ValueEncoderType*>(current_encoder_.get())
          ->PutSpaced(values, static_cast<int>(num_spaced_values), valid_bits,
                      valid_bits_offset);
    } else {
      dynamic_cast<ValueEncoderType*>(current_encoder_.get())
          ->Put(values, static_cast<int>(num_values));
    }
    if (page_statistics_ != nullptr) {
      const int64_t num_nulls = num_spaced_values - num_values;
      page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, num_values,
                                     num_nulls);
    }
  }
};
1248
template <typename DType>
Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(const int16_t* def_levels,
                                                          const int16_t* rep_levels,
                                                          int64_t num_levels,
                                                          const ::arrow::Array& array,
                                                          ArrowWriteContext* ctx) {
  // If this is the first time writing a DictionaryArray, then there's
  // a few possible paths to take:
  //
  // - If dictionary encoding is not enabled, convert to densely
  //   encoded and call WriteArrow
  // - Dictionary encoding enabled
  //   - If this is the first time this is called, then we call
  //     PutDictionary into the encoder and then PutIndices on each
  //     chunk. We store the dictionary that was written in
  //     preserved_dictionary_ so that subsequent calls to this method
  //     can make sure the dictionary has not changed
  //   - On subsequent calls, we have to check whether the dictionary
  //     has changed. If it has, then we trigger the varying
  //     dictionary path and materialize each chunk and then call
  //     WriteArrow with that
  // Fallback: materialize the dictionary into a dense array and go through the
  // regular dense Arrow write path.
  auto WriteDense = [&] {
    std::shared_ptr<::arrow::Array> dense_array;
    RETURN_NOT_OK(
        ConvertDictionaryToDense(array, properties_->memory_pool(), &dense_array));
    return WriteArrowDense(def_levels, rep_levels, num_levels, *dense_array, ctx);
  };

  if (!IsDictionaryEncoding(current_encoder_->encoding()) ||
      !DictionaryDirectWriteSupported(array)) {
    // No longer dictionary-encoding for whatever reason, maybe we never were
    // or we decided to stop. Note that WriteArrow can be invoked multiple
    // times with both dense and dictionary-encoded versions of the same data
    // without a problem. Any dense data will be hashed to indices until the
    // dictionary page limit is reached, at which everything (dictionary and
    // dense) will fall back to plain encoding
    return WriteDense();
  }

  auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
  const auto& data = checked_cast<const ::arrow::DictionaryArray&>(array);
  std::shared_ptr<::arrow::Array> dictionary = data.dictionary();
  std::shared_ptr<::arrow::Array> indices = data.indices();

  int64_t value_offset = 0;
  // Writes one batch of levels and the corresponding slice of the index array.
  auto WriteIndicesChunk = [&](int64_t offset, int64_t batch_size) {
    int64_t batch_num_values = 0;
    int64_t batch_num_spaced_values = 0;
    WriteLevelsSpaced(batch_size, AddIfNotNull(def_levels, offset),
                      AddIfNotNull(rep_levels, offset), &batch_num_values,
                      &batch_num_spaced_values);
    dict_encoder->PutIndices(*indices->Slice(value_offset, batch_num_spaced_values));
    CommitWriteAndCheckPageLimit(batch_size, batch_num_values);
    value_offset += batch_num_spaced_values;
  };

  // Handle seeing dictionary for the first time
  if (!preserved_dictionary_) {
    // It's a new dictionary. Call PutDictionary and keep track of it
    PARQUET_CATCH_NOT_OK(dict_encoder->PutDictionary(*dictionary));

    // TODO(wesm): If some dictionary values are unobserved, then the
    // statistics will be inaccurate. Do we care enough to fix it?
    if (page_statistics_ != nullptr) {
      PARQUET_CATCH_NOT_OK(page_statistics_->Update(*dictionary));
    }
    preserved_dictionary_ = dictionary;
  } else if (!dictionary->Equals(*preserved_dictionary_)) {
    // Dictionary has changed
    PARQUET_CATCH_NOT_OK(FallbackToPlainEncoding());
    return WriteDense();
  }

  PARQUET_CATCH_NOT_OK(
      DoInBatches(num_levels, properties_->write_batch_size(), WriteIndicesChunk));
  return Status::OK();
}
1326
1327 // ----------------------------------------------------------------------
1328 // Direct Arrow write path
1329
1330 template <typename ParquetType, typename ArrowType, typename Enable = void>
1331 struct SerializeFunctor {
1332 using ArrowCType = typename ArrowType::c_type;
1333 using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
1334 using ParquetCType = typename ParquetType::c_type;
Serializeparquet::SerializeFunctor1335 Status Serialize(const ArrayType& array, ArrowWriteContext*, ParquetCType* out) {
1336 const ArrowCType* input = array.raw_values();
1337 if (array.null_count() > 0) {
1338 for (int i = 0; i < array.length(); i++) {
1339 out[i] = static_cast<ParquetCType>(input[i]);
1340 }
1341 } else {
1342 std::copy(input, input + array.length(), out);
1343 }
1344 return Status::OK();
1345 }
1346 };
1347
// Converts `array` into a scratch buffer of ParquetType::c_type via
// SerializeFunctor, then writes it with WriteBatch (no nulls) or
// WriteBatchSpaced (with nulls). `num_levels`/`def_levels`/`rep_levels`
// describe the nesting of the values being written.
template <typename ParquetType, typename ArrowType>
Status WriteArrowSerialize(const ::arrow::Array& array, int64_t num_levels,
                           const int16_t* def_levels, const int16_t* rep_levels,
                           ArrowWriteContext* ctx,
                           TypedColumnWriter<ParquetType>* writer) {
  using ParquetCType = typename ParquetType::c_type;
  using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;

  // Scratch buffer is owned/reused by the write context.
  ParquetCType* buffer = nullptr;
  PARQUET_THROW_NOT_OK(ctx->GetScratchData<ParquetCType>(array.length(), &buffer));

  bool no_nulls =
      writer->descr()->schema_node()->is_required() || (array.null_count() == 0);

  SerializeFunctor<ParquetType, ArrowType> functor;
  RETURN_NOT_OK(functor.Serialize(checked_cast<const ArrayType&>(array), ctx, buffer));

  if (no_nulls) {
    PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, buffer));
  } else {
    PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
                                                  array.null_bitmap_data(),
                                                  array.offset(), buffer));
  }
  return Status::OK();
}
1374
// Writes an Arrow primitive array whose in-memory layout already matches the
// Parquet physical type — the value buffer is handed to the writer directly,
// with no conversion or copying.
template <typename ParquetType>
Status WriteArrowZeroCopy(const ::arrow::Array& array, int64_t num_levels,
                          const int16_t* def_levels, const int16_t* rep_levels,
                          ArrowWriteContext* ctx,
                          TypedColumnWriter<ParquetType>* writer) {
  using T = typename ParquetType::c_type;
  const auto& data = static_cast<const ::arrow::PrimitiveArray&>(array);
  const T* values = nullptr;
  // The values buffer may be null if the array is empty (ARROW-2744)
  if (data.values() != nullptr) {
    values = reinterpret_cast<const T*>(data.values()->data()) + data.offset();
  } else {
    DCHECK_EQ(data.length(), 0);
  }
  if (writer->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
    PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, values));
  } else {
    PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
                                                  data.null_bitmap_data(), data.offset(),
                                                  values));
  }
  return Status::OK();
}
1398
// Switch-case helper: convert the Arrow array element-by-element into a
// scratch buffer before writing (used when layouts differ).
#define WRITE_SERIALIZE_CASE(ArrowEnum, ArrowType, ParquetType) \
  case ::arrow::Type::ArrowEnum:                                \
    return WriteArrowSerialize<ParquetType, ::arrow::ArrowType>( \
        array, num_levels, def_levels, rep_levels, ctx, this);

// Switch-case helper: the Arrow buffer layout matches the Parquet physical
// type, so the value buffer is written directly without conversion.
#define WRITE_ZERO_COPY_CASE(ArrowEnum, ArrowType, ParquetType) \
  case ::arrow::Type::ArrowEnum: \
    return WriteArrowZeroCopy<ParquetType>(array, num_levels, def_levels, rep_levels, \
                                           ctx, this);

// Builds and returns a Status::Invalid describing an unsupported
// Arrow-to-Parquet type combination (expects `array` and `descr_` in scope).
#define ARROW_UNSUPPORTED() \
  std::stringstream ss; \
  ss << "Arrow type " << array.type()->ToString() \
     << " cannot be written to Parquet type " << descr_->ToString(); \
  return Status::Invalid(ss.str());
1414
1415 // ----------------------------------------------------------------------
1416 // Write Arrow to BooleanType
1417
1418 template <>
1419 struct SerializeFunctor<BooleanType, ::arrow::BooleanType> {
Serializeparquet::SerializeFunctor1420 Status Serialize(const ::arrow::BooleanArray& data, ArrowWriteContext*, bool* out) {
1421 for (int i = 0; i < data.length(); i++) {
1422 *out++ = data.Value(i);
1423 }
1424 return Status::OK();
1425 }
1426 };
1427
// Dense Arrow write path for BOOLEAN columns: only Arrow BOOL is accepted, and
// it always goes through the serialize (bit-unpacking) path.
template <>
Status TypedColumnWriterImpl<BooleanType>::WriteArrowDense(const int16_t* def_levels,
                                                           const int16_t* rep_levels,
                                                           int64_t num_levels,
                                                           const ::arrow::Array& array,
                                                           ArrowWriteContext* ctx) {
  if (array.type_id() != ::arrow::Type::BOOL) {
    ARROW_UNSUPPORTED();
  }
  return WriteArrowSerialize<BooleanType, ::arrow::BooleanType>(
      array, num_levels, def_levels, rep_levels, ctx, this);
}
1440
1441 // ----------------------------------------------------------------------
1442 // Write Arrow types to INT32
1443
1444 template <>
1445 struct SerializeFunctor<Int32Type, ::arrow::Date64Type> {
Serializeparquet::SerializeFunctor1446 Status Serialize(const ::arrow::Date64Array& array, ArrowWriteContext*, int32_t* out) {
1447 const int64_t* input = array.raw_values();
1448 for (int i = 0; i < array.length(); i++) {
1449 *out++ = static_cast<int32_t>(*input++ / 86400000);
1450 }
1451 return Status::OK();
1452 }
1453 };
1454
1455 template <>
1456 struct SerializeFunctor<Int32Type, ::arrow::Time32Type> {
Serializeparquet::SerializeFunctor1457 Status Serialize(const ::arrow::Time32Array& array, ArrowWriteContext*, int32_t* out) {
1458 const int32_t* input = array.raw_values();
1459 const auto& type = static_cast<const ::arrow::Time32Type&>(*array.type());
1460 if (type.unit() == ::arrow::TimeUnit::SECOND) {
1461 for (int i = 0; i < array.length(); i++) {
1462 out[i] = input[i] * 1000;
1463 }
1464 } else {
1465 std::copy(input, input + array.length(), out);
1466 }
1467 return Status::OK();
1468 }
1469 };
1470
// Dense Arrow write path for INT32 columns. Narrow integers and dates/times
// are widened or converted via the serialize path; INT32/DATE32 layouts match
// and are written zero-copy. NA writes only levels (all values null).
template <>
Status TypedColumnWriterImpl<Int32Type>::WriteArrowDense(const int16_t* def_levels,
                                                         const int16_t* rep_levels,
                                                         int64_t num_levels,
                                                         const ::arrow::Array& array,
                                                         ArrowWriteContext* ctx) {
  switch (array.type()->id()) {
    case ::arrow::Type::NA: {
      PARQUET_CATCH_NOT_OK(WriteBatch(num_levels, def_levels, rep_levels, nullptr));
    } break;
      WRITE_SERIALIZE_CASE(INT8, Int8Type, Int32Type)
      WRITE_SERIALIZE_CASE(UINT8, UInt8Type, Int32Type)
      WRITE_SERIALIZE_CASE(INT16, Int16Type, Int32Type)
      WRITE_SERIALIZE_CASE(UINT16, UInt16Type, Int32Type)
      WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int32Type)
      WRITE_ZERO_COPY_CASE(INT32, Int32Type, Int32Type)
      WRITE_ZERO_COPY_CASE(DATE32, Date32Type, Int32Type)
      WRITE_SERIALIZE_CASE(DATE64, Date64Type, Int32Type)
      WRITE_SERIALIZE_CASE(TIME32, Time32Type, Int32Type)
    default:
      ARROW_UNSUPPORTED()
  }
  return Status::OK();
}
1495
1496 // ----------------------------------------------------------------------
1497 // Write Arrow to Int64 and Int96
1498
1499 #define INT96_CONVERT_LOOP(ConversionFunction) \
1500 for (int64_t i = 0; i < array.length(); i++) ConversionFunction(input[i], &out[i]);
1501
1502 template <>
1503 struct SerializeFunctor<Int96Type, ::arrow::TimestampType> {
Serializeparquet::SerializeFunctor1504 Status Serialize(const ::arrow::TimestampArray& array, ArrowWriteContext*, Int96* out) {
1505 const int64_t* input = array.raw_values();
1506 const auto& type = static_cast<const ::arrow::TimestampType&>(*array.type());
1507 switch (type.unit()) {
1508 case ::arrow::TimeUnit::NANO:
1509 INT96_CONVERT_LOOP(internal::NanosecondsToImpalaTimestamp);
1510 break;
1511 case ::arrow::TimeUnit::MICRO:
1512 INT96_CONVERT_LOOP(internal::MicrosecondsToImpalaTimestamp);
1513 break;
1514 case ::arrow::TimeUnit::MILLI:
1515 INT96_CONVERT_LOOP(internal::MillisecondsToImpalaTimestamp);
1516 break;
1517 case ::arrow::TimeUnit::SECOND:
1518 INT96_CONVERT_LOOP(internal::SecondsToImpalaTimestamp);
1519 break;
1520 }
1521 return Status::OK();
1522 }
1523 };
1524
#define COERCE_DIVIDE -1
#define COERCE_INVALID 0
#define COERCE_MULTIPLY +1

// Coercion lookup indexed as [source TimeUnit][target TimeUnit] with
// SECOND=0, MILLI=1, MICRO=2, NANO=3. Each entry is
// {operation (COERCE_DIVIDE / COERCE_INVALID / COERCE_MULTIPLY), scale factor}.
// Coercing *to* seconds is marked invalid for every source unit.
// Declared const: this is read-only lookup data; the previous non-const
// declaration allowed accidental runtime mutation of a shared global.
static const std::pair<int, int64_t> kTimestampCoercionFactors[4][4] = {
    // from seconds ...
    {{COERCE_INVALID, 0},                      // ... to seconds
     {COERCE_MULTIPLY, 1000},                  // ... to millis
     {COERCE_MULTIPLY, 1000000},               // ... to micros
     {COERCE_MULTIPLY, INT64_C(1000000000)}},  // ... to nanos
    // from millis ...
    {{COERCE_INVALID, 0},
     {COERCE_MULTIPLY, 1},
     {COERCE_MULTIPLY, 1000},
     {COERCE_MULTIPLY, 1000000}},
    // from micros ...
    {{COERCE_INVALID, 0},
     {COERCE_DIVIDE, 1000},
     {COERCE_MULTIPLY, 1},
     {COERCE_MULTIPLY, 1000}},
    // from nanos ...
    {{COERCE_INVALID, 0},
     {COERCE_DIVIDE, 1000000},
     {COERCE_DIVIDE, 1000},
     {COERCE_MULTIPLY, 1}}};
1550
// Serialize Arrow timestamps to INT64, coercing from the array's unit to the
// unit requested in ctx->properties (coerce_timestamps_unit). The scale
// factor and direction come from kTimestampCoercionFactors above.
template <>
struct SerializeFunctor<Int64Type, ::arrow::TimestampType> {
  Status Serialize(const ::arrow::TimestampArray& array, ArrowWriteContext* ctx,
                   int64_t* out) {
    const auto& source_type = static_cast<const ::arrow::TimestampType&>(*array.type());
    auto source_unit = source_type.unit();
    const int64_t* values = array.raw_values();

    ::arrow::TimeUnit::type target_unit = ctx->properties->coerce_timestamps_unit();
    // target_type is only needed for the error message below.
    auto target_type = ::arrow::timestamp(target_unit);
    bool truncation_allowed = ctx->properties->truncated_timestamps_allowed();

    // Downscaling: fails on the first valid slot whose value is not an exact
    // multiple of the factor, unless truncation was explicitly allowed.
    // Null slots are still divided (their garbage values are never read) but
    // are exempt from the exactness check.
    auto DivideBy = [&](const int64_t factor) {
      for (int64_t i = 0; i < array.length(); i++) {
        if (!truncation_allowed && array.IsValid(i) && (values[i] % factor != 0)) {
          return Status::Invalid("Casting from ", source_type.ToString(), " to ",
                                 target_type->ToString(),
                                 " would lose data: ", values[i]);
        }
        out[i] = values[i] / factor;
      }
      return Status::OK();
    };

    // Upscaling: no overflow check is performed on the multiplication.
    auto MultiplyBy = [&](const int64_t factor) {
      for (int64_t i = 0; i < array.length(); i++) {
        out[i] = values[i] * factor;
      }
      return Status::OK();
    };

    const auto& coercion = kTimestampCoercionFactors[static_cast<int>(source_unit)]
                                                    [static_cast<int>(target_unit)];

    // .first -> coercion operation; .second -> scale factor
    // Invalid combinations (any coercion to seconds) are expected to be
    // filtered out by callers; this is only a debug-build check.
    DCHECK_NE(coercion.first, COERCE_INVALID);
    return coercion.first == COERCE_DIVIDE ? DivideBy(coercion.second)
                                           : MultiplyBy(coercion.second);
  }
};
1591
1592 #undef COERCE_DIVIDE
1593 #undef COERCE_INVALID
1594 #undef COERCE_MULTIPLY
1595
WriteTimestamps(const::arrow::Array & values,int64_t num_levels,const int16_t * def_levels,const int16_t * rep_levels,ArrowWriteContext * ctx,TypedColumnWriter<Int64Type> * writer)1596 Status WriteTimestamps(const ::arrow::Array& values, int64_t num_levels,
1597 const int16_t* def_levels, const int16_t* rep_levels,
1598 ArrowWriteContext* ctx, TypedColumnWriter<Int64Type>* writer) {
1599 const auto& source_type = static_cast<const ::arrow::TimestampType&>(*values.type());
1600
1601 auto WriteCoerce = [&](const ArrowWriterProperties* properties) {
1602 ArrowWriteContext temp_ctx = *ctx;
1603 temp_ctx.properties = properties;
1604 return WriteArrowSerialize<Int64Type, ::arrow::TimestampType>(
1605 values, num_levels, def_levels, rep_levels, &temp_ctx, writer);
1606 };
1607
1608 if (ctx->properties->coerce_timestamps_enabled()) {
1609 // User explicitly requested coercion to specific unit
1610 if (source_type.unit() == ctx->properties->coerce_timestamps_unit()) {
1611 // No data conversion necessary
1612 return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels,
1613 ctx, writer);
1614 } else {
1615 return WriteCoerce(ctx->properties);
1616 }
1617 } else if (writer->properties()->version() == ParquetVersion::PARQUET_1_0 &&
1618 source_type.unit() == ::arrow::TimeUnit::NANO) {
1619 // Absent superseding user instructions, when writing Parquet version 1.0 files,
1620 // timestamps in nanoseconds are coerced to microseconds
1621 std::shared_ptr<ArrowWriterProperties> properties =
1622 (ArrowWriterProperties::Builder())
1623 .coerce_timestamps(::arrow::TimeUnit::MICRO)
1624 ->disallow_truncated_timestamps()
1625 ->build();
1626 return WriteCoerce(properties.get());
1627 } else if (source_type.unit() == ::arrow::TimeUnit::SECOND) {
1628 // Absent superseding user instructions, timestamps in seconds are coerced to
1629 // milliseconds
1630 std::shared_ptr<ArrowWriterProperties> properties =
1631 (ArrowWriterProperties::Builder())
1632 .coerce_timestamps(::arrow::TimeUnit::MILLI)
1633 ->build();
1634 return WriteCoerce(properties.get());
1635 } else {
1636 // No data conversion necessary
1637 return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels, ctx,
1638 writer);
1639 }
1640 }
1641
// Write a dense Arrow array to an INT64 Parquet column. Timestamps get
// dedicated unit-coercion handling in WriteTimestamps; INT64/TIME64 are
// written zero-copy; unsigned integers are widened/reinterpreted via
// SerializeFunctor. Every case (and the default, via ARROW_UNSUPPORTED)
// returns directly — there is deliberately no statement after the switch.
template <>
Status TypedColumnWriterImpl<Int64Type>::WriteArrowDense(const int16_t* def_levels,
                                                         const int16_t* rep_levels,
                                                         int64_t num_levels,
                                                         const ::arrow::Array& array,
                                                         ArrowWriteContext* ctx) {
  switch (array.type()->id()) {
    case ::arrow::Type::TIMESTAMP:
      return WriteTimestamps(array, num_levels, def_levels, rep_levels, ctx, this);
    WRITE_ZERO_COPY_CASE(INT64, Int64Type, Int64Type)
    WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int64Type)
    WRITE_SERIALIZE_CASE(UINT64, UInt64Type, Int64Type)
    WRITE_ZERO_COPY_CASE(TIME64, Time64Type, Int64Type)
    default:
      ARROW_UNSUPPORTED();
  }
}
1659
1660 template <>
WriteArrowDense(const int16_t * def_levels,const int16_t * rep_levels,int64_t num_levels,const::arrow::Array & array,ArrowWriteContext * ctx)1661 Status TypedColumnWriterImpl<Int96Type>::WriteArrowDense(const int16_t* def_levels,
1662 const int16_t* rep_levels,
1663 int64_t num_levels,
1664 const ::arrow::Array& array,
1665 ArrowWriteContext* ctx) {
1666 if (array.type_id() != ::arrow::Type::TIMESTAMP) {
1667 ARROW_UNSUPPORTED();
1668 }
1669 return WriteArrowSerialize<Int96Type, ::arrow::TimestampType>(
1670 array, num_levels, def_levels, rep_levels, ctx, this);
1671 }
1672
1673 // ----------------------------------------------------------------------
1674 // Floating point types
1675
1676 template <>
WriteArrowDense(const int16_t * def_levels,const int16_t * rep_levels,int64_t num_levels,const::arrow::Array & array,ArrowWriteContext * ctx)1677 Status TypedColumnWriterImpl<FloatType>::WriteArrowDense(const int16_t* def_levels,
1678 const int16_t* rep_levels,
1679 int64_t num_levels,
1680 const ::arrow::Array& array,
1681 ArrowWriteContext* ctx) {
1682 if (array.type_id() != ::arrow::Type::FLOAT) {
1683 ARROW_UNSUPPORTED();
1684 }
1685 return WriteArrowZeroCopy<FloatType>(array, num_levels, def_levels, rep_levels, ctx,
1686 this);
1687 }
1688
1689 template <>
WriteArrowDense(const int16_t * def_levels,const int16_t * rep_levels,int64_t num_levels,const::arrow::Array & array,ArrowWriteContext * ctx)1690 Status TypedColumnWriterImpl<DoubleType>::WriteArrowDense(const int16_t* def_levels,
1691 const int16_t* rep_levels,
1692 int64_t num_levels,
1693 const ::arrow::Array& array,
1694 ArrowWriteContext* ctx) {
1695 if (array.type_id() != ::arrow::Type::DOUBLE) {
1696 ARROW_UNSUPPORTED();
1697 }
1698 return WriteArrowZeroCopy<DoubleType>(array, num_levels, def_levels, rep_levels, ctx,
1699 this);
1700 }
1701
1702 // ----------------------------------------------------------------------
1703 // Write Arrow to BYTE_ARRAY
1704
1705 template <>
WriteArrowDense(const int16_t * def_levels,const int16_t * rep_levels,int64_t num_levels,const::arrow::Array & array,ArrowWriteContext * ctx)1706 Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(const int16_t* def_levels,
1707 const int16_t* rep_levels,
1708 int64_t num_levels,
1709 const ::arrow::Array& array,
1710 ArrowWriteContext* ctx) {
1711 if (array.type()->id() != ::arrow::Type::BINARY &&
1712 array.type()->id() != ::arrow::Type::STRING) {
1713 ARROW_UNSUPPORTED();
1714 }
1715
1716 int64_t value_offset = 0;
1717 auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
1718 int64_t batch_num_values = 0;
1719 int64_t batch_num_spaced_values = 0;
1720 WriteLevelsSpaced(batch_size, AddIfNotNull(def_levels, offset),
1721 AddIfNotNull(rep_levels, offset), &batch_num_values,
1722 &batch_num_spaced_values);
1723 std::shared_ptr<::arrow::Array> data_slice =
1724 array.Slice(value_offset, batch_num_spaced_values);
1725 current_encoder_->Put(*data_slice);
1726 if (page_statistics_ != nullptr) {
1727 page_statistics_->Update(*data_slice);
1728 }
1729 CommitWriteAndCheckPageLimit(batch_size, batch_num_values);
1730 CheckDictionarySizeLimit();
1731 value_offset += batch_num_spaced_values;
1732 };
1733
1734 PARQUET_CATCH_NOT_OK(
1735 DoInBatches(num_levels, properties_->write_batch_size(), WriteChunk));
1736 return Status::OK();
1737 }
1738
1739 // ----------------------------------------------------------------------
1740 // Write Arrow to FIXED_LEN_BYTE_ARRAY
1741
1742 template <typename ParquetType, typename ArrowType>
1743 struct SerializeFunctor<
1744 ParquetType, ArrowType,
1745 ::arrow::enable_if_t<::arrow::is_fixed_size_binary_type<ArrowType>::value &&
1746 !::arrow::is_decimal_type<ArrowType>::value>> {
Serializeparquet::SerializeFunctor1747 Status Serialize(const ::arrow::FixedSizeBinaryArray& array, ArrowWriteContext*,
1748 FLBA* out) {
1749 if (array.null_count() == 0) {
1750 // no nulls, just dump the data
1751 // todo(advancedxy): use a writeBatch to avoid this step
1752 for (int64_t i = 0; i < array.length(); i++) {
1753 out[i] = FixedLenByteArray(array.GetValue(i));
1754 }
1755 } else {
1756 for (int64_t i = 0; i < array.length(); i++) {
1757 if (array.IsValid(i)) {
1758 out[i] = FixedLenByteArray(array.GetValue(i));
1759 }
1760 }
1761 }
1762 return Status::OK();
1763 }
1764 };
1765
1766 // ----------------------------------------------------------------------
1767 // Write Arrow to Decimal128
1768
1769 using ::arrow::internal::checked_pointer_cast;
1770
// Requires a custom serializer because decimal128 in parquet are in big-endian
// format. Thus, a temporary local buffer is required.
template <typename ParquetType, typename ArrowType>
struct SerializeFunctor<ParquetType, ArrowType, ::arrow::enable_if_decimal<ArrowType>> {
  // Serialize each Decimal128 value as a FLBA pointing into a scratch buffer
  // holding its big-endian representation. Null slots get a default
  // (null-pointer) FixedLenByteArray; their bytes are never read.
  Status Serialize(const ::arrow::Decimal128Array& array, ArrowWriteContext* ctx,
                   FLBA* out) {
    AllocateScratch(array, ctx);
    auto offset = Offset(array);

    if (array.null_count() == 0) {
      for (int64_t i = 0; i < array.length(); i++) {
        out[i] = FixDecimalEndianess(array.GetValue(i), offset);
      }
    } else {
      for (int64_t i = 0; i < array.length(); i++) {
        out[i] = array.IsValid(i) ? FixDecimalEndianess(array.GetValue(i), offset)
                                  : FixedLenByteArray();
      }
    }

    return Status::OK();
  }

  // Parquet's Decimal are stored with FixedLength values where the length is
  // proportional to the precision. Arrow's Decimal are always stored with 16
  // bytes. Thus the internal FLBA pointer must be adjusted by the offset calculated
  // here.
  int32_t Offset(const ::arrow::Decimal128Array& array) {
    auto decimal_type = checked_pointer_cast<::arrow::Decimal128Type>(array.type());
    return decimal_type->byte_width() - internal::DecimalSize(decimal_type->precision());
  }

  // Allocate 16 bytes of scratch per non-null value; nulls never call
  // FixDecimalEndianess, so they consume no scratch space.
  void AllocateScratch(const ::arrow::Decimal128Array& array, ArrowWriteContext* ctx) {
    int64_t non_null_count = array.length() - array.null_count();
    int64_t size = non_null_count * 16;
    scratch_buffer = AllocateBuffer(ctx->memory_pool, size);
    scratch = reinterpret_cast<int64_t*>(scratch_buffer->mutable_data());
  }

  // Copy one 16-byte decimal into the next scratch slot with its two 64-bit
  // words swapped and byte-reversed, producing a big-endian layout. Returns
  // a FLBA pointing `offset` bytes into the slot, so only the trailing
  // DecimalSize(precision) bytes are emitted. Advances `scratch` by one slot
  // as a side effect. NOTE(review): word order assumes a little-endian host
  // representation of Decimal128 — confirm if big-endian platforms matter.
  FixedLenByteArray FixDecimalEndianess(const uint8_t* in, int64_t offset) {
    const auto* u64_in = reinterpret_cast<const int64_t*>(in);
    auto out = reinterpret_cast<const uint8_t*>(scratch) + offset;
    *scratch++ = ::arrow::BitUtil::ToBigEndian(u64_in[1]);
    *scratch++ = ::arrow::BitUtil::ToBigEndian(u64_in[0]);
    return FixedLenByteArray(out);
  }

  // Owns the big-endian copies; must outlive the FLBAs handed to the writer.
  std::shared_ptr<ResizableBuffer> scratch_buffer;
  // Cursor into scratch_buffer, bumped by FixDecimalEndianess.
  int64_t* scratch;
};
1821
1822 template <>
WriteArrowDense(const int16_t * def_levels,const int16_t * rep_levels,int64_t num_levels,const::arrow::Array & array,ArrowWriteContext * ctx)1823 Status TypedColumnWriterImpl<FLBAType>::WriteArrowDense(const int16_t* def_levels,
1824 const int16_t* rep_levels,
1825 int64_t num_levels,
1826 const ::arrow::Array& array,
1827 ArrowWriteContext* ctx) {
1828 switch (array.type()->id()) {
1829 WRITE_SERIALIZE_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
1830 WRITE_SERIALIZE_CASE(DECIMAL, Decimal128Type, FLBAType)
1831 default:
1832 break;
1833 }
1834 return Status::OK();
1835 }
1836
1837 // ----------------------------------------------------------------------
1838 // Dynamic column writer constructor
1839
// Construct a TypedColumnWriterImpl matching the column's physical type.
// Dictionary encoding is applied only when enabled for this column's path
// and the physical type is not BOOLEAN (the condition below excludes it
// unconditionally).
std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata,
                                                 std::unique_ptr<PageWriter> pager,
                                                 const WriterProperties* properties) {
  const ColumnDescriptor* descr = metadata->descr();
  const bool use_dictionary = properties->dictionary_enabled(descr->path()) &&
                              descr->physical_type() != Type::BOOLEAN;
  Encoding::type encoding = properties->encoding(descr->path());
  if (use_dictionary) {
    // Dictionary-encoded columns write data pages with the dictionary index
    // encoding, overriding the column's configured value encoding.
    encoding = properties->dictionary_index_encoding();
  }
  switch (descr->physical_type()) {
    case Type::BOOLEAN:
      return std::make_shared<TypedColumnWriterImpl<BooleanType>>(
          metadata, std::move(pager), use_dictionary, encoding, properties);
    case Type::INT32:
      return std::make_shared<TypedColumnWriterImpl<Int32Type>>(
          metadata, std::move(pager), use_dictionary, encoding, properties);
    case Type::INT64:
      return std::make_shared<TypedColumnWriterImpl<Int64Type>>(
          metadata, std::move(pager), use_dictionary, encoding, properties);
    case Type::INT96:
      return std::make_shared<TypedColumnWriterImpl<Int96Type>>(
          metadata, std::move(pager), use_dictionary, encoding, properties);
    case Type::FLOAT:
      return std::make_shared<TypedColumnWriterImpl<FloatType>>(
          metadata, std::move(pager), use_dictionary, encoding, properties);
    case Type::DOUBLE:
      return std::make_shared<TypedColumnWriterImpl<DoubleType>>(
          metadata, std::move(pager), use_dictionary, encoding, properties);
    case Type::BYTE_ARRAY:
      return std::make_shared<TypedColumnWriterImpl<ByteArrayType>>(
          metadata, std::move(pager), use_dictionary, encoding, properties);
    case Type::FIXED_LEN_BYTE_ARRAY:
      return std::make_shared<TypedColumnWriterImpl<FLBAType>>(
          metadata, std::move(pager), use_dictionary, encoding, properties);
    default:
      // Throws ParquetException for any physical type not handled above.
      ParquetException::NYI("type reader not implemented");
  }
  // Unreachable code, but suppress compiler warning
  return std::shared_ptr<ColumnWriter>(nullptr);
}
1881
1882 } // namespace parquet
1883