// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "parquet/column_writer.h"

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "arrow/array.h"
#include "arrow/buffer_builder.h"
#include "arrow/compute/api.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_stream_utils.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/compression.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle_encoding.h"
#include "parquet/column_page.h"
#include "parquet/encoding.h"
#include "parquet/encryption_internal.h"
#include "parquet/internal_file_encryptor.h"
#include "parquet/metadata.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/statistics.h"
#include "parquet/thrift_internal.h"
#include "parquet/types.h"

using arrow::Datum;
using arrow::Status;
using arrow::BitUtil::BitWriter;
using arrow::internal::checked_cast;
using arrow::util::RleEncoder;

namespace parquet {

namespace {

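// Returns base advanced by offset, or nullptr when base is null (e.g. when a caller
// passes no definition/repetition levels for a required, non-repeated column).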
inline const int16_t* AddIfNotNull(const int16_t* base, int64_t offset) {
  if (base != nullptr) {
    return base + offset;
  }
  return nullptr;
}

}  // namespace

LevelEncoder::LevelEncoder() {}
LevelEncoder::~LevelEncoder() {}

void LevelEncoder::Init(Encoding::type encoding, int16_t max_level,
                        int num_buffered_values, uint8_t* data, int data_size) {
  bit_width_ = BitUtil::Log2(max_level + 1);
  encoding_ = encoding;
  switch (encoding) {
    case Encoding::RLE: {
      rle_encoder_.reset(new RleEncoder(data, data_size, bit_width_));
      break;
    }
    case Encoding::BIT_PACKED: {
      int num_bytes =
          static_cast<int>(BitUtil::BytesForBits(num_buffered_values * bit_width_));
      bit_packed_encoder_.reset(new BitWriter(data, num_bytes));
      break;
    }
    default:
      throw ParquetException("Unknown encoding type for levels.");
  }
}

int LevelEncoder::MaxBufferSize(Encoding::type encoding, int16_t max_level,
                                int num_buffered_values) {
  int bit_width = BitUtil::Log2(max_level + 1);
  int num_bytes = 0;
  switch (encoding) {
    case Encoding::RLE: {
      // TODO: Due to the way we currently check if the buffer is full enough,
      // we need to have MinBufferSize as head room.
      num_bytes = RleEncoder::MaxBufferSize(bit_width, num_buffered_values) +
                  RleEncoder::MinBufferSize(bit_width);
      break;
    }
    case Encoding::BIT_PACKED: {
      num_bytes =
          static_cast<int>(BitUtil::BytesForBits(num_buffered_values * bit_width));
      break;
    }
    default:
      throw ParquetException("Unknown encoding type for levels.");
  }
  return num_bytes;
}

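// Encodes up to batch_size levels into the buffer provided to Init(). Returns the
// number of levels actually encoded, which may be smaller than batch_size if the
// destination buffer fills up.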
int LevelEncoder::Encode(int batch_size, const int16_t* levels) {
  int num_encoded = 0;
  if (!rle_encoder_ && !bit_packed_encoder_) {
    throw ParquetException("Level encoders are not initialized.");
  }

  if (encoding_ == Encoding::RLE) {
    for (int i = 0; i < batch_size; ++i) {
      if (!rle_encoder_->Put(*(levels + i))) {
        break;
      }
      ++num_encoded;
    }
    rle_encoder_->Flush();
    rle_length_ = rle_encoder_->len();
  } else {
    for (int i = 0; i < batch_size; ++i) {
      if (!bit_packed_encoder_->PutValue(*(levels + i), bit_width_)) {
        break;
      }
      ++num_encoded;
    }
    bit_packed_encoder_->Flush();
  }
  return num_encoded;
}

// ----------------------------------------------------------------------
// PageWriter implementation

// This subclass delimits pages appearing in a serialized stream, each preceded
// by a serialized Thrift format::PageHeader indicating the type of each page
// and the page metadata.
class SerializedPageWriter : public PageWriter {
 public:
  SerializedPageWriter(std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
                       int compression_level, ColumnChunkMetaDataBuilder* metadata,
                       int16_t row_group_ordinal, int16_t column_chunk_ordinal,
                       MemoryPool* pool = ::arrow::default_memory_pool(),
                       std::shared_ptr<Encryptor> meta_encryptor = nullptr,
                       std::shared_ptr<Encryptor> data_encryptor = nullptr)
      : sink_(std::move(sink)),
        metadata_(metadata),
        pool_(pool),
        num_values_(0),
        dictionary_page_offset_(0),
        data_page_offset_(0),
        total_uncompressed_size_(0),
        total_compressed_size_(0),
        page_ordinal_(0),
        row_group_ordinal_(row_group_ordinal),
        column_ordinal_(column_chunk_ordinal),
        meta_encryptor_(std::move(meta_encryptor)),
        data_encryptor_(std::move(data_encryptor)),
        encryption_buffer_(AllocateBuffer(pool, 0)) {
    if (data_encryptor_ != nullptr || meta_encryptor_ != nullptr) {
      InitEncryption();
    }
    compressor_ = GetCodec(codec, compression_level);
    thrift_serializer_.reset(new ThriftSerializer);
  }

  int64_t WriteDictionaryPage(const DictionaryPage& page) override {
    int64_t uncompressed_size = page.size();
    std::shared_ptr<Buffer> compressed_data;
    if (has_compressor()) {
      auto buffer = std::static_pointer_cast<ResizableBuffer>(
          AllocateBuffer(pool_, uncompressed_size));
      Compress(*(page.buffer().get()), buffer.get());
      compressed_data = std::static_pointer_cast<Buffer>(buffer);
    } else {
      compressed_data = page.buffer();
    }

    format::DictionaryPageHeader dict_page_header;
    dict_page_header.__set_num_values(page.num_values());
    dict_page_header.__set_encoding(ToThrift(page.encoding()));
    dict_page_header.__set_is_sorted(page.is_sorted());

    const uint8_t* output_data_buffer = compressed_data->data();
    int32_t output_data_len = static_cast<int32_t>(compressed_data->size());

    if (data_encryptor_.get()) {
      UpdateEncryption(encryption::kDictionaryPage);
      PARQUET_THROW_NOT_OK(encryption_buffer_->Resize(
          data_encryptor_->CiphertextSizeDelta() + output_data_len, false));
      output_data_len = data_encryptor_->Encrypt(compressed_data->data(), output_data_len,
                                                 encryption_buffer_->mutable_data());
      output_data_buffer = encryption_buffer_->data();
    }

    format::PageHeader page_header;
    page_header.__set_type(format::PageType::DICTIONARY_PAGE);
    page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
    page_header.__set_compressed_page_size(static_cast<int32_t>(output_data_len));
    page_header.__set_dictionary_page_header(dict_page_header);
    // TODO(PARQUET-594) crc checksum

    PARQUET_ASSIGN_OR_THROW(int64_t start_pos, sink_->Tell());
    if (dictionary_page_offset_ == 0) {
      dictionary_page_offset_ = start_pos;
    }

    if (meta_encryptor_) {
      UpdateEncryption(encryption::kDictionaryPageHeader);
    }
    int64_t header_size =
        thrift_serializer_->Serialize(&page_header, sink_.get(), meta_encryptor_);

    PARQUET_THROW_NOT_OK(sink_->Write(output_data_buffer, output_data_len));

    total_uncompressed_size_ += uncompressed_size + header_size;
    total_compressed_size_ += output_data_len + header_size;
    ++dict_encoding_stats_[page.encoding()];

    PARQUET_ASSIGN_OR_THROW(int64_t final_pos, sink_->Tell());
    return final_pos - start_pos;
  }

  void Close(bool has_dictionary, bool fallback) override {
    if (meta_encryptor_ != nullptr) {
      UpdateEncryption(encryption::kColumnMetaData);
    }
    // index_page_offset = -1 since they are not supported
    metadata_->Finish(num_values_, dictionary_page_offset_, -1, data_page_offset_,
                      total_compressed_size_, total_uncompressed_size_, has_dictionary,
                      fallback, dict_encoding_stats_, data_encoding_stats_,
                      meta_encryptor_);
    // Write metadata at end of column chunk
    metadata_->WriteTo(sink_.get());
  }

  /**
   * Compress a buffer.
   */
  void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override {
    DCHECK(compressor_ != nullptr);

    // Compress the data
    int64_t max_compressed_size =
        compressor_->MaxCompressedLen(src_buffer.size(), src_buffer.data());

    // Pass shrink_to_fit = false: the underlying buffer only keeps growing, and
    // resizing to a smaller size does not reallocate.
    PARQUET_THROW_NOT_OK(dest_buffer->Resize(max_compressed_size, false));

    PARQUET_ASSIGN_OR_THROW(
        int64_t compressed_size,
        compressor_->Compress(src_buffer.size(), src_buffer.data(), max_compressed_size,
                              dest_buffer->mutable_data()));
    PARQUET_THROW_NOT_OK(dest_buffer->Resize(compressed_size, false));
  }

  int64_t WriteDataPage(const DataPage& page) override {
    int64_t uncompressed_size = page.uncompressed_size();
    std::shared_ptr<Buffer> compressed_data = page.buffer();
    const uint8_t* output_data_buffer = compressed_data->data();
    int32_t output_data_len = static_cast<int32_t>(compressed_data->size());

    if (data_encryptor_.get()) {
      PARQUET_THROW_NOT_OK(encryption_buffer_->Resize(
          data_encryptor_->CiphertextSizeDelta() + output_data_len, false));
      UpdateEncryption(encryption::kDataPage);
      output_data_len = data_encryptor_->Encrypt(compressed_data->data(), output_data_len,
                                                 encryption_buffer_->mutable_data());
      output_data_buffer = encryption_buffer_->data();
    }

    format::PageHeader page_header;
    page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
    page_header.__set_compressed_page_size(static_cast<int32_t>(output_data_len));
    // TODO(PARQUET-594) crc checksum

    if (page.type() == PageType::DATA_PAGE) {
      const DataPageV1& v1_page = checked_cast<const DataPageV1&>(page);
      SetDataPageHeader(page_header, v1_page);
    } else if (page.type() == PageType::DATA_PAGE_V2) {
      const DataPageV2& v2_page = checked_cast<const DataPageV2&>(page);
      SetDataPageV2Header(page_header, v2_page);
    } else {
      throw ParquetException("Unexpected page type");
    }

    PARQUET_ASSIGN_OR_THROW(int64_t start_pos, sink_->Tell());
    if (page_ordinal_ == 0) {
      data_page_offset_ = start_pos;
    }

    if (meta_encryptor_) {
      UpdateEncryption(encryption::kDataPageHeader);
    }
    int64_t header_size =
        thrift_serializer_->Serialize(&page_header, sink_.get(), meta_encryptor_);
    PARQUET_THROW_NOT_OK(sink_->Write(output_data_buffer, output_data_len));

    total_uncompressed_size_ += uncompressed_size + header_size;
    total_compressed_size_ += output_data_len + header_size;
    num_values_ += page.num_values();
    ++data_encoding_stats_[page.encoding()];
    ++page_ordinal_;
    PARQUET_ASSIGN_OR_THROW(int64_t current_pos, sink_->Tell());
    return current_pos - start_pos;
  }

  void SetDataPageHeader(format::PageHeader& page_header, const DataPageV1& page) {
    format::DataPageHeader data_page_header;
    data_page_header.__set_num_values(page.num_values());
    data_page_header.__set_encoding(ToThrift(page.encoding()));
    data_page_header.__set_definition_level_encoding(
        ToThrift(page.definition_level_encoding()));
    data_page_header.__set_repetition_level_encoding(
        ToThrift(page.repetition_level_encoding()));
    data_page_header.__set_statistics(ToThrift(page.statistics()));

    page_header.__set_type(format::PageType::DATA_PAGE);
    page_header.__set_data_page_header(data_page_header);
  }

  void SetDataPageV2Header(format::PageHeader& page_header, const DataPageV2& page) {
    format::DataPageHeaderV2 data_page_header;
    data_page_header.__set_num_values(page.num_values());
    data_page_header.__set_num_nulls(page.num_nulls());
    data_page_header.__set_num_rows(page.num_rows());
    data_page_header.__set_encoding(ToThrift(page.encoding()));

    data_page_header.__set_definition_levels_byte_length(
        page.definition_levels_byte_length());
    data_page_header.__set_repetition_levels_byte_length(
        page.repetition_levels_byte_length());

    data_page_header.__set_is_compressed(page.is_compressed());
    data_page_header.__set_statistics(ToThrift(page.statistics()));

    page_header.__set_type(format::PageType::DATA_PAGE_V2);
    page_header.__set_data_page_header_v2(data_page_header);
  }

  bool has_compressor() override { return (compressor_ != nullptr); }

  int64_t num_values() { return num_values_; }

  int64_t dictionary_page_offset() { return dictionary_page_offset_; }

  int64_t data_page_offset() { return data_page_offset_; }

  int64_t total_compressed_size() { return total_compressed_size_; }

  int64_t total_uncompressed_size() { return total_uncompressed_size_; }

 private:
  // To allow UpdateEncryption on Close
  friend class BufferedPageWriter;

  void InitEncryption() {
    // Prepare the AAD for quick update later.
    if (data_encryptor_ != nullptr) {
      data_page_aad_ = encryption::CreateModuleAad(
          data_encryptor_->file_aad(), encryption::kDataPage, row_group_ordinal_,
          column_ordinal_, kNonPageOrdinal);
    }
    if (meta_encryptor_ != nullptr) {
      data_page_header_aad_ = encryption::CreateModuleAad(
          meta_encryptor_->file_aad(), encryption::kDataPageHeader, row_group_ordinal_,
          column_ordinal_, kNonPageOrdinal);
    }
  }

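  // Refreshes the AAD (additional authenticated data) on the appropriate encryptor for
  // the given module type. The data page and data page header AADs are precomputed in
  // InitEncryption() and only have their page ordinal updated here.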
  void UpdateEncryption(int8_t module_type) {
    switch (module_type) {
      case encryption::kColumnMetaData: {
        meta_encryptor_->UpdateAad(encryption::CreateModuleAad(
            meta_encryptor_->file_aad(), module_type, row_group_ordinal_, column_ordinal_,
            kNonPageOrdinal));
        break;
      }
      case encryption::kDataPage: {
        encryption::QuickUpdatePageAad(data_page_aad_, page_ordinal_);
        data_encryptor_->UpdateAad(data_page_aad_);
        break;
      }
      case encryption::kDataPageHeader: {
        encryption::QuickUpdatePageAad(data_page_header_aad_, page_ordinal_);
        meta_encryptor_->UpdateAad(data_page_header_aad_);
        break;
      }
      case encryption::kDictionaryPageHeader: {
        meta_encryptor_->UpdateAad(encryption::CreateModuleAad(
            meta_encryptor_->file_aad(), module_type, row_group_ordinal_, column_ordinal_,
            kNonPageOrdinal));
        break;
      }
      case encryption::kDictionaryPage: {
        data_encryptor_->UpdateAad(encryption::CreateModuleAad(
            data_encryptor_->file_aad(), module_type, row_group_ordinal_, column_ordinal_,
            kNonPageOrdinal));
        break;
      }
      default:
        throw ParquetException("Unknown module type in UpdateEncryption");
    }
  }

  std::shared_ptr<ArrowOutputStream> sink_;
  ColumnChunkMetaDataBuilder* metadata_;
  MemoryPool* pool_;
  int64_t num_values_;
  int64_t dictionary_page_offset_;
  int64_t data_page_offset_;
  int64_t total_uncompressed_size_;
  int64_t total_compressed_size_;
  int16_t page_ordinal_;
  int16_t row_group_ordinal_;
  int16_t column_ordinal_;

  std::unique_ptr<ThriftSerializer> thrift_serializer_;

  // Compression codec to use.
  std::unique_ptr<::arrow::util::Codec> compressor_;

  std::string data_page_aad_;
  std::string data_page_header_aad_;

  std::shared_ptr<Encryptor> meta_encryptor_;
  std::shared_ptr<Encryptor> data_encryptor_;

  std::shared_ptr<ResizableBuffer> encryption_buffer_;

  std::map<Encoding::type, int32_t> dict_encoding_stats_;
  std::map<Encoding::type, int32_t> data_encoding_stats_;
};

// This PageWriter implementation buffers all pages in memory and only writes them to
// the final sink on Close().
class BufferedPageWriter : public PageWriter {
 public:
  BufferedPageWriter(std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
                     int compression_level, ColumnChunkMetaDataBuilder* metadata,
                     int16_t row_group_ordinal, int16_t current_column_ordinal,
                     MemoryPool* pool = ::arrow::default_memory_pool(),
                     std::shared_ptr<Encryptor> meta_encryptor = nullptr,
                     std::shared_ptr<Encryptor> data_encryptor = nullptr)
      : final_sink_(std::move(sink)), metadata_(metadata), has_dictionary_pages_(false) {
    in_memory_sink_ = CreateOutputStream(pool);
    pager_ = std::unique_ptr<SerializedPageWriter>(
        new SerializedPageWriter(in_memory_sink_, codec, compression_level, metadata,
                                 row_group_ordinal, current_column_ordinal, pool,
                                 std::move(meta_encryptor), std::move(data_encryptor)));
  }

  int64_t WriteDictionaryPage(const DictionaryPage& page) override {
    has_dictionary_pages_ = true;
    return pager_->WriteDictionaryPage(page);
  }

  void Close(bool has_dictionary, bool fallback) override {
    if (pager_->meta_encryptor_ != nullptr) {
      pager_->UpdateEncryption(encryption::kColumnMetaData);
    }
    // index_page_offset = -1 since they are not supported
    PARQUET_ASSIGN_OR_THROW(int64_t final_position, final_sink_->Tell());
    // dictionary page offset should be 0 iff there are no dictionary pages
    auto dictionary_page_offset =
        has_dictionary_pages_ ? pager_->dictionary_page_offset() + final_position : 0;
    metadata_->Finish(pager_->num_values(), dictionary_page_offset, -1,
                      pager_->data_page_offset() + final_position,
                      pager_->total_compressed_size(), pager_->total_uncompressed_size(),
                      has_dictionary, fallback, pager_->dict_encoding_stats_,
                      pager_->data_encoding_stats_, pager_->meta_encryptor_);

    // Write metadata at end of column chunk
    metadata_->WriteTo(in_memory_sink_.get());

    // flush everything to the serialized sink
    PARQUET_ASSIGN_OR_THROW(auto buffer, in_memory_sink_->Finish());
    PARQUET_THROW_NOT_OK(final_sink_->Write(buffer));
  }

  int64_t WriteDataPage(const DataPage& page) override {
    return pager_->WriteDataPage(page);
  }

  void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override {
    pager_->Compress(src_buffer, dest_buffer);
  }

  bool has_compressor() override { return pager_->has_compressor(); }

 private:
  std::shared_ptr<ArrowOutputStream> final_sink_;
  ColumnChunkMetaDataBuilder* metadata_;
  std::shared_ptr<::arrow::io::BufferOutputStream> in_memory_sink_;
  std::unique_ptr<SerializedPageWriter> pager_;
  bool has_dictionary_pages_;
};

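// Returns a BufferedPageWriter when buffered_row_group is true (pages are held in
// memory until Close), otherwise a SerializedPageWriter that streams pages directly
// to the sink.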
std::unique_ptr<PageWriter> PageWriter::Open(
    std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
    int compression_level, ColumnChunkMetaDataBuilder* metadata,
    int16_t row_group_ordinal, int16_t column_chunk_ordinal, MemoryPool* pool,
    bool buffered_row_group, std::shared_ptr<Encryptor> meta_encryptor,
    std::shared_ptr<Encryptor> data_encryptor) {
  if (buffered_row_group) {
    return std::unique_ptr<PageWriter>(
        new BufferedPageWriter(std::move(sink), codec, compression_level, metadata,
                               row_group_ordinal, column_chunk_ordinal, pool,
                               std::move(meta_encryptor), std::move(data_encryptor)));
  } else {
    return std::unique_ptr<PageWriter>(
        new SerializedPageWriter(std::move(sink), codec, compression_level, metadata,
                                 row_group_ordinal, column_chunk_ordinal, pool,
                                 std::move(meta_encryptor), std::move(data_encryptor)));
  }
}

// ----------------------------------------------------------------------
// ColumnWriter

const std::shared_ptr<WriterProperties>& default_writer_properties() {
  static std::shared_ptr<WriterProperties> default_writer_properties =
      WriterProperties::Builder().build();
  return default_writer_properties;
}

class ColumnWriterImpl {
 public:
  ColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata,
                   std::unique_ptr<PageWriter> pager, const bool use_dictionary,
                   Encoding::type encoding, const WriterProperties* properties)
      : metadata_(metadata),
        descr_(metadata->descr()),
        pager_(std::move(pager)),
        has_dictionary_(use_dictionary),
        encoding_(encoding),
        properties_(properties),
        allocator_(properties->memory_pool()),
        num_buffered_values_(0),
        num_buffered_encoded_values_(0),
        rows_written_(0),
        total_bytes_written_(0),
        total_compressed_bytes_(0),
        closed_(false),
        fallback_(false),
        definition_levels_sink_(allocator_),
        repetition_levels_sink_(allocator_) {
    definition_levels_rle_ =
        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
    repetition_levels_rle_ =
        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
    uncompressed_data_ =
        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
    if (pager_->has_compressor()) {
      compressor_temp_buffer_ =
          std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
    }
  }

  virtual ~ColumnWriterImpl() = default;

  int64_t Close();

 protected:
  virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;

  // Serializes Dictionary Page if enabled
  virtual void WriteDictionaryPage() = 0;

  // Plain-encoded statistics of the current page
  virtual EncodedStatistics GetPageStatistics() = 0;

  // Plain-encoded statistics of the whole chunk
  virtual EncodedStatistics GetChunkStatistics() = 0;

  // Merges page statistics into chunk statistics, then resets the values
  virtual void ResetPageStatistics() = 0;

  // Adds Data Pages to an in memory buffer in dictionary encoding mode
  // Serializes the Data Pages in other encoding modes
  void AddDataPage();

  void BuildDataPageV1(int64_t definition_levels_rle_size,
                       int64_t repetition_levels_rle_size, int64_t uncompressed_size,
                       const std::shared_ptr<Buffer>& values);
  void BuildDataPageV2(int64_t definition_levels_rle_size,
                       int64_t repetition_levels_rle_size, int64_t uncompressed_size,
                       const std::shared_ptr<Buffer>& values);

  // Serializes Data Pages
  void WriteDataPage(const DataPage& page) {
    total_bytes_written_ += pager_->WriteDataPage(page);
  }

  // Write multiple definition levels
  void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
    DCHECK(!closed_);
    PARQUET_THROW_NOT_OK(
        definition_levels_sink_.Append(levels, sizeof(int16_t) * num_levels));
  }

  // Write multiple repetition levels
  void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels) {
    DCHECK(!closed_);
    PARQUET_THROW_NOT_OK(
        repetition_levels_sink_.Append(levels, sizeof(int16_t) * num_levels));
  }

  // RLE encode the src_buffer into dest_buffer and return the encoded size
  int64_t RleEncodeLevels(const void* src_buffer, ResizableBuffer* dest_buffer,
                          int16_t max_level, bool include_length_prefix = true);

  // Serialize the buffered Data Pages
  void FlushBufferedDataPages();

  ColumnChunkMetaDataBuilder* metadata_;
  const ColumnDescriptor* descr_;

  std::unique_ptr<PageWriter> pager_;

  bool has_dictionary_;
  Encoding::type encoding_;
  const WriterProperties* properties_;

  LevelEncoder level_encoder_;

  MemoryPool* allocator_;

  // The total number of values stored in the data page. This is the maximum of
  // the number of encoded definition levels or encoded values. For
  // non-repeated, required columns, this is equal to the number of encoded
  // values. For repeated or optional values, there may be fewer data values
  // than levels, and this tells you how many encoded levels there are in that
  // case.
  int64_t num_buffered_values_;

  // The total number of stored values. For repeated or optional values, this
  // number may be lower than num_buffered_values_.
  int64_t num_buffered_encoded_values_;

  // Total number of rows written with this ColumnWriter
  int rows_written_;

  // Records the total number of bytes written by the serializer
  int64_t total_bytes_written_;

  // Records the current number of compressed bytes in a column
  int64_t total_compressed_bytes_;

  // Flag to check if the Writer has been closed
  bool closed_;

  // Flag to infer if dictionary encoding has fallen back to PLAIN
  bool fallback_;

  ::arrow::BufferBuilder definition_levels_sink_;
  ::arrow::BufferBuilder repetition_levels_sink_;

  std::shared_ptr<ResizableBuffer> definition_levels_rle_;
  std::shared_ptr<ResizableBuffer> repetition_levels_rle_;

  std::shared_ptr<ResizableBuffer> uncompressed_data_;
  std::shared_ptr<ResizableBuffer> compressor_temp_buffer_;

  std::vector<std::unique_ptr<DataPage>> data_pages_;

 private:
  void InitSinks() {
    definition_levels_sink_.Rewind(0);
    repetition_levels_sink_.Rewind(0);
  }

  // Concatenate the encoded levels and values into one buffer
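  // The resulting layout matches the data page body: repetition levels first, then
  // definition levels, then the encoded values.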
  void ConcatenateBuffers(int64_t definition_levels_rle_size,
                          int64_t repetition_levels_rle_size,
                          const std::shared_ptr<Buffer>& values, uint8_t* combined) {
    memcpy(combined, repetition_levels_rle_->data(), repetition_levels_rle_size);
    combined += repetition_levels_rle_size;
    memcpy(combined, definition_levels_rle_->data(), definition_levels_rle_size);
    combined += definition_levels_rle_size;
    memcpy(combined, values->data(), values->size());
  }
};

// return the size of the encoded buffer
int64_t ColumnWriterImpl::RleEncodeLevels(const void* src_buffer,
                                          ResizableBuffer* dest_buffer, int16_t max_level,
                                          bool include_length_prefix) {
  // V1 DataPage includes the length of the RLE level as a prefix.
  int32_t prefix_size = include_length_prefix ? sizeof(int32_t) : 0;

  // TODO: This sizing only works due to some RLE specifics.
  int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
                                                 static_cast<int>(num_buffered_values_)) +
                     prefix_size;

  // Pass shrink_to_fit = false: the underlying buffer only keeps growing, and
  // resizing to a smaller size does not reallocate.
  PARQUET_THROW_NOT_OK(dest_buffer->Resize(rle_size, false));

  level_encoder_.Init(Encoding::RLE, max_level, static_cast<int>(num_buffered_values_),
                      dest_buffer->mutable_data() + prefix_size,
                      static_cast<int>(dest_buffer->size() - prefix_size));
  int encoded = level_encoder_.Encode(static_cast<int>(num_buffered_values_),
                                      reinterpret_cast<const int16_t*>(src_buffer));
  DCHECK_EQ(encoded, num_buffered_values_);

  if (include_length_prefix) {
    reinterpret_cast<int32_t*>(dest_buffer->mutable_data())[0] = level_encoder_.len();
  }

  return level_encoder_.len() + prefix_size;
}

void ColumnWriterImpl::AddDataPage() {
  int64_t definition_levels_rle_size = 0;
  int64_t repetition_levels_rle_size = 0;

  std::shared_ptr<Buffer> values = GetValuesBuffer();
  bool is_v1_data_page = properties_->data_page_version() == ParquetDataPageVersion::V1;

  if (descr_->max_definition_level() > 0) {
    definition_levels_rle_size = RleEncodeLevels(
        definition_levels_sink_.data(), definition_levels_rle_.get(),
        descr_->max_definition_level(), /*include_length_prefix=*/is_v1_data_page);
  }

  if (descr_->max_repetition_level() > 0) {
    repetition_levels_rle_size = RleEncodeLevels(
        repetition_levels_sink_.data(), repetition_levels_rle_.get(),
        descr_->max_repetition_level(), /*include_length_prefix=*/is_v1_data_page);
  }

  int64_t uncompressed_size =
      definition_levels_rle_size + repetition_levels_rle_size + values->size();

  if (is_v1_data_page) {
    BuildDataPageV1(definition_levels_rle_size, repetition_levels_rle_size,
                    uncompressed_size, values);
  } else {
    BuildDataPageV2(definition_levels_rle_size, repetition_levels_rle_size,
                    uncompressed_size, values);
  }

  // Re-initialize the sinks for next Page.
  InitSinks();
  num_buffered_values_ = 0;
  num_buffered_encoded_values_ = 0;
}

void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size,
                                       int64_t repetition_levels_rle_size,
                                       int64_t uncompressed_size,
                                       const std::shared_ptr<Buffer>& values) {
  // Pass shrink_to_fit = false: the underlying buffer only keeps growing, and
  // resizing to a smaller size does not reallocate.
  PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false));
  ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size, values,
                     uncompressed_data_->mutable_data());

  EncodedStatistics page_stats = GetPageStatistics();
  page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
  page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
  ResetPageStatistics();

  std::shared_ptr<Buffer> compressed_data;
  if (pager_->has_compressor()) {
    pager_->Compress(*(uncompressed_data_.get()), compressor_temp_buffer_.get());
    compressed_data = compressor_temp_buffer_;
  } else {
    compressed_data = uncompressed_data_;
  }

  // Write the page to OutputStream eagerly if there is no dictionary or
  // if dictionary encoding has fallen back to PLAIN
  if (has_dictionary_ && !fallback_) {  // Save pages until end of dictionary encoding
    PARQUET_ASSIGN_OR_THROW(
        auto compressed_data_copy,
        compressed_data->CopySlice(0, compressed_data->size(), allocator_));
    std::unique_ptr<DataPage> page_ptr(new DataPageV1(
        compressed_data_copy, static_cast<int32_t>(num_buffered_values_), encoding_,
        Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats));
    total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);

    data_pages_.push_back(std::move(page_ptr));
  } else {  // Eagerly write pages
    DataPageV1 page(compressed_data, static_cast<int32_t>(num_buffered_values_),
                    encoding_, Encoding::RLE, Encoding::RLE, uncompressed_size,
                    page_stats);
    WriteDataPage(page);
  }
}

void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size,
                                       int64_t repetition_levels_rle_size,
                                       int64_t uncompressed_size,
                                       const std::shared_ptr<Buffer>& values) {
  // Compress the values if needed. Repetition and definition levels are uncompressed in
  // V2.
  std::shared_ptr<Buffer> compressed_values;
  if (pager_->has_compressor()) {
    pager_->Compress(*values, compressor_temp_buffer_.get());
    compressed_values = compressor_temp_buffer_;
  } else {
    compressed_values = values;
  }

  // Concatenate uncompressed levels and the possibly compressed values
  int64_t combined_size =
      definition_levels_rle_size + repetition_levels_rle_size + compressed_values->size();
  std::shared_ptr<ResizableBuffer> combined = AllocateBuffer(allocator_, combined_size);

  ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size,
                     compressed_values, combined->mutable_data());

  EncodedStatistics page_stats = GetPageStatistics();
  page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
  page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
  ResetPageStatistics();

  int32_t num_values = static_cast<int32_t>(num_buffered_values_);
  int32_t null_count = static_cast<int32_t>(page_stats.null_count);
  int32_t def_levels_byte_length = static_cast<int32_t>(definition_levels_rle_size);
  int32_t rep_levels_byte_length = static_cast<int32_t>(repetition_levels_rle_size);

  // Write the page to OutputStream eagerly if there is no dictionary or
  // if dictionary encoding has fallen back to PLAIN
  if (has_dictionary_ && !fallback_) {  // Save pages until end of dictionary encoding
    PARQUET_ASSIGN_OR_THROW(auto data_copy,
                            combined->CopySlice(0, combined->size(), allocator_));
    std::unique_ptr<DataPage> page_ptr(new DataPageV2(
        data_copy, num_values, null_count, num_values, encoding_, def_levels_byte_length,
        rep_levels_byte_length, uncompressed_size, pager_->has_compressor()));
    total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
    data_pages_.push_back(std::move(page_ptr));
  } else {
    DataPageV2 page(combined, num_values, null_count, num_values, encoding_,
                    def_levels_byte_length, rep_levels_byte_length, uncompressed_size);
    WriteDataPage(page);
  }
}

int64_t ColumnWriterImpl::Close() {
  if (!closed_) {
    closed_ = true;
    if (has_dictionary_ && !fallback_) {
      WriteDictionaryPage();
    }

    FlushBufferedDataPages();

    EncodedStatistics chunk_statistics = GetChunkStatistics();
    chunk_statistics.ApplyStatSizeLimits(
        properties_->max_statistics_size(descr_->path()));
    chunk_statistics.set_is_signed(SortOrder::SIGNED == descr_->sort_order());

    // Write stats only if the column has at least one row written
    if (rows_written_ > 0 && chunk_statistics.is_set()) {
      metadata_->SetStatistics(chunk_statistics);
    }
    pager_->Close(has_dictionary_, fallback_);
  }

  return total_bytes_written_;
}

void ColumnWriterImpl::FlushBufferedDataPages() {
  // Write all outstanding data to a new page
  if (num_buffered_values_ > 0) {
    AddDataPage();
  }
  for (const auto& page_ptr : data_pages_) {
    WriteDataPage(*page_ptr);
  }
  data_pages_.clear();
  total_compressed_bytes_ = 0;
}

// ----------------------------------------------------------------------
// TypedColumnWriter

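// Invokes action(offset, batch_size) for each full batch of batch_size values in
// [0, total), followed by one call covering any remainder.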
template <typename Action>
inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
  int64_t num_batches = static_cast<int>(total / batch_size);
  for (int round = 0; round < num_batches; round++) {
    action(round * batch_size, batch_size);
  }
  // Write the remaining values
  if (total % batch_size > 0) {
    action(num_batches * batch_size, total % batch_size);
  }
}

bool DictionaryDirectWriteSupported(const ::arrow::Array& array) {
  DCHECK_EQ(array.type_id(), ::arrow::Type::DICTIONARY);
  const ::arrow::DictionaryType& dict_type =
      static_cast<const ::arrow::DictionaryType&>(*array.type());
  auto id = dict_type.value_type()->id();
  return id == ::arrow::Type::BINARY || id == ::arrow::Type::STRING;
}

Status ConvertDictionaryToDense(const ::arrow::Array& array, MemoryPool* pool,
                                std::shared_ptr<::arrow::Array>* out) {
  const ::arrow::DictionaryType& dict_type =
      static_cast<const ::arrow::DictionaryType&>(*array.type());

  // TODO(ARROW-1648): Remove this special handling once we require an Arrow
  // version that has this fixed.
  if (dict_type.value_type()->id() == ::arrow::Type::NA) {
    *out = std::make_shared<::arrow::NullArray>(array.length());
    return Status::OK();
  }

  ::arrow::compute::ExecContext ctx(pool);
  ARROW_ASSIGN_OR_RAISE(Datum cast_output,
                        ::arrow::compute::Cast(array.data(), dict_type.value_type(),
                                               ::arrow::compute::CastOptions(), &ctx));
  *out = cast_output.make_array();
  return Status::OK();
}

static inline bool IsDictionaryEncoding(Encoding::type encoding) {
  return encoding == Encoding::PLAIN_DICTIONARY;
}

template <typename DType>
class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<DType> {
 public:
  using T = typename DType::c_type;

  TypedColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata,
                        std::unique_ptr<PageWriter> pager, const bool use_dictionary,
                        Encoding::type encoding, const WriterProperties* properties)
      : ColumnWriterImpl(metadata, std::move(pager), use_dictionary, encoding,
                         properties) {
    current_encoder_ = MakeEncoder(DType::type_num, encoding, use_dictionary, descr_,
                                   properties->memory_pool());

    if (properties->statistics_enabled(descr_->path()) &&
        (SortOrder::UNKNOWN != descr_->sort_order())) {
      page_statistics_ = MakeStatistics<DType>(descr_, allocator_);
      chunk_statistics_ = MakeStatistics<DType>(descr_, allocator_);
    }
  }

  int64_t Close() override { return ColumnWriterImpl::Close(); }

  void WriteBatch(int64_t num_values, const int16_t* def_levels,
                  const int16_t* rep_levels, const T* values) override {
    // We check the DataPage size limit only after the values have been inserted, so a
    // single large write could push a page well past the limit. Processing the input in
    // chunks bounds this overshoot and ensures AddDataPage() is called at a reasonable
    // page size.
    int64_t value_offset = 0;
    auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
      int64_t values_to_write = WriteLevels(batch_size, AddIfNotNull(def_levels, offset),
                                            AddIfNotNull(rep_levels, offset));
      // PARQUET-780
      if (values_to_write > 0) {
        DCHECK_NE(nullptr, values);
      }
      WriteValues(values + value_offset, values_to_write, batch_size - values_to_write);
      CommitWriteAndCheckPageLimit(batch_size, values_to_write);
      value_offset += values_to_write;

      // Dictionary size checked separately from data page size since we
      // circumvent this check when writing ::arrow::DictionaryArray directly
      CheckDictionarySizeLimit();
    };
    DoInBatches(num_values, properties_->write_batch_size(), WriteChunk);
  }

  void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
                        const int16_t* rep_levels, const uint8_t* valid_bits,
                        int64_t valid_bits_offset, const T* values) override {
    // Like WriteBatch, but for spaced values
    int64_t value_offset = 0;
    auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
      int64_t batch_num_values = 0;
      int64_t batch_num_spaced_values = 0;
      WriteLevelsSpaced(batch_size, AddIfNotNull(def_levels, offset),
                        AddIfNotNull(rep_levels, offset), &batch_num_values,
                        &batch_num_spaced_values);
      WriteValuesSpaced(values + value_offset, batch_num_values, batch_num_spaced_values,
                        valid_bits, valid_bits_offset + value_offset);
      CommitWriteAndCheckPageLimit(batch_size, batch_num_spaced_values);
      value_offset += batch_num_spaced_values;

      // Dictionary size checked separately from data page size since we
      // circumvent this check when writing ::arrow::DictionaryArray directly
      CheckDictionarySizeLimit();
    };
    DoInBatches(num_values, properties_->write_batch_size(), WriteChunk);
  }

  Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
                    int64_t num_levels, const ::arrow::Array& array,
                    ArrowWriteContext* ctx) override {
    if (array.type()->id() == ::arrow::Type::DICTIONARY) {
      return WriteArrowDictionary(def_levels, rep_levels, num_levels, array, ctx);
    } else {
      return WriteArrowDense(def_levels, rep_levels, num_levels, array, ctx);
    }
  }

  int64_t EstimatedBufferedValueBytes() const override {
    return current_encoder_->EstimatedDataEncodedSize();
  }

 protected:
  std::shared_ptr<Buffer> GetValuesBuffer() override {
    return current_encoder_->FlushValues();
  }

  // Internal function to handle direct writing of ::arrow::DictionaryArray,
  // since the standard logic concerning dictionary size limits and fallback to
  // plain encoding is circumvented
  Status WriteArrowDictionary(const int16_t* def_levels, const int16_t* rep_levels,
                              int64_t num_levels, const ::arrow::Array& array,
                              ArrowWriteContext* context);

  Status WriteArrowDense(const int16_t* def_levels, const int16_t* rep_levels,
                         int64_t num_levels, const ::arrow::Array& array,
                         ArrowWriteContext* context);

  void WriteDictionaryPage() override {
    // We have to dynamic cast here because of TypedEncoder<Type> as
    // some compilers don't want to cast through virtual inheritance
    auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
    DCHECK(dict_encoder);
    std::shared_ptr<ResizableBuffer> buffer =
        AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
    dict_encoder->WriteDict(buffer->mutable_data());

    DictionaryPage page(buffer, dict_encoder->num_entries(),
                        properties_->dictionary_page_encoding());
    total_bytes_written_ += pager_->WriteDictionaryPage(page);
  }

  EncodedStatistics GetPageStatistics() override {
    EncodedStatistics result;
    if (page_statistics_) result = page_statistics_->Encode();
    return result;
  }

  EncodedStatistics GetChunkStatistics() override {
    EncodedStatistics result;
    if (chunk_statistics_) result = chunk_statistics_->Encode();
    return result;
  }

  void ResetPageStatistics() override {
    if (chunk_statistics_ != nullptr) {
      chunk_statistics_->Merge(*page_statistics_);
      page_statistics_->Reset();
    }
  }

  Type::type type() const override { return descr_->physical_type(); }

  const ColumnDescriptor* descr() const override { return descr_; }

  int64_t rows_written() const override { return rows_written_; }

  int64_t total_compressed_bytes() const override { return total_compressed_bytes_; }

  int64_t total_bytes_written() const override { return total_bytes_written_; }

  const WriterProperties* properties() override { return properties_; }

 private:
  using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
  using TypedStats = TypedStatistics<DType>;
  std::unique_ptr<Encoder> current_encoder_;
  std::shared_ptr<TypedStats> page_statistics_;
  std::shared_ptr<TypedStats> chunk_statistics_;

  // If writing a sequence of ::arrow::DictionaryArray to the writer, we keep the
  // dictionary passed to DictEncoder<T>::PutDictionary so we can check whether
  // subsequent array chunks use the same dictionary or require materialization
  // (in which case we fall back to the dense write path).
  std::shared_ptr<::arrow::Array> preserved_dictionary_;

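  // Buffers the definition/repetition levels for num_values values and returns the
  // number of leaf values (levels equal to max_definition_level) that will actually be
  // written. Also advances rows_written_ based on the repetition levels.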
  int64_t WriteLevels(int64_t num_values, const int16_t* def_levels,
                      const int16_t* rep_levels) {
    int64_t values_to_write = 0;
    // If the field is required and non-repeated, there are no definition levels
    if (descr_->max_definition_level() > 0) {
      for (int64_t i = 0; i < num_values; ++i) {
        if (def_levels[i] == descr_->max_definition_level()) {
          ++values_to_write;
        }
      }

      WriteDefinitionLevels(num_values, def_levels);
    } else {
      // Required field, write all values
      values_to_write = num_values;
    }

    // Not present for non-repeated fields
    if (descr_->max_repetition_level() > 0) {
      // A row could include more than one value
      // Count the occasions where we start a new row
      for (int64_t i = 0; i < num_values; ++i) {
        if (rep_levels[i] == 0) {
          rows_written_++;
        }
      }

      WriteRepetitionLevels(num_values, rep_levels);
    } else {
      // Each value is exactly one row
      rows_written_ += static_cast<int>(num_values);
    }
    return values_to_write;
  }

  void WriteLevelsSpaced(int64_t num_levels, const int16_t* def_levels,
                         const int16_t* rep_levels, int64_t* out_values_to_write,
                         int64_t* out_spaced_values_to_write) {
    int64_t values_to_write = 0;
    int64_t spaced_values_to_write = 0;
    // If the field is required and non-repeated, there are no definition levels
    if (descr_->max_definition_level() > 0) {
      // Minimal definition level for which spaced values are written
      int16_t min_spaced_def_level = descr_->max_definition_level();
      if (descr_->schema_node()->is_optional()) {
        min_spaced_def_level--;
      }
      for (int64_t i = 0; i < num_levels; ++i) {
        if (def_levels[i] == descr_->max_definition_level()) {
          ++values_to_write;
        }
        if (def_levels[i] >= min_spaced_def_level) {
          ++spaced_values_to_write;
        }
      }

      WriteDefinitionLevels(num_levels, def_levels);
    } else {
      // Required field, write all values
      values_to_write = num_levels;
      spaced_values_to_write = num_levels;
    }

    // Not present for non-repeated fields
    if (descr_->max_repetition_level() > 0) {
      // A row could include more than one value
      // Count the occasions where we start a new row
      for (int64_t i = 0; i < num_levels; ++i) {
        if (rep_levels[i] == 0) {
          rows_written_++;
        }
      }

      WriteRepetitionLevels(num_levels, rep_levels);
    } else {
      // Each value is exactly one row
      rows_written_ += static_cast<int>(num_levels);
    }

    *out_values_to_write = values_to_write;
    *out_spaced_values_to_write = spaced_values_to_write;
  }

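  // Accounts for the newly buffered levels and values, then cuts a new data page once
  // the current encoder's estimated size reaches the configured data page size.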
  void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values) {
    num_buffered_values_ += num_levels;
    num_buffered_encoded_values_ += num_values;

    if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
      AddDataPage();
    }
  }

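  // Writes the dictionary page and flushes the buffered data pages (dictionary
  // indices), then switches the value encoder to PLAIN for the rest of the column chunk.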
  void FallbackToPlainEncoding() {
    if (IsDictionaryEncoding(current_encoder_->encoding())) {
      WriteDictionaryPage();
      // Serialize the buffered Dictionary Indices
      FlushBufferedDataPages();
      fallback_ = true;
      // Only PLAIN encoding is supported for fallback in V1
      current_encoder_ = MakeEncoder(DType::type_num, Encoding::PLAIN, false, descr_,
                                     properties_->memory_pool());
      encoding_ = Encoding::PLAIN;
    }
  }

  // Checks if the Dictionary Page size limit is reached
  // If the limit is reached, the Dictionary and Data Pages are serialized
  // The encoding is switched to PLAIN
  //
  // Only one Dictionary Page is written.
  // Fallback to PLAIN if dictionary page limit is reached.
  void CheckDictionarySizeLimit() {
    if (!has_dictionary_ || fallback_) {
      // Either not using dictionary encoding, or we have already fallen back
      // to PLAIN encoding because the size threshold was reached
      return;
    }

1215     // We have to dynamic_cast here because some compilers do not allow
1216     // casting TypedEncoder<Type> through virtual inheritance
1217     auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
1218     if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) {
1219       FallbackToPlainEncoding();
1220     }
1221   }
1222 
1223   void WriteValues(const T* values, int64_t num_values, int64_t num_nulls) {
1224     dynamic_cast<ValueEncoderType*>(current_encoder_.get())
1225         ->Put(values, static_cast<int>(num_values));
1226     if (page_statistics_ != nullptr) {
1227       page_statistics_->Update(values, num_values, num_nulls);
1228     }
1229   }
1230 
1231   void WriteValuesSpaced(const T* values, int64_t num_values, int64_t num_spaced_values,
1232                          const uint8_t* valid_bits, int64_t valid_bits_offset) {
1233     if (descr_->schema_node()->is_optional()) {
1234       dynamic_cast<ValueEncoderType*>(current_encoder_.get())
1235           ->PutSpaced(values, static_cast<int>(num_spaced_values), valid_bits,
1236                       valid_bits_offset);
1237     } else {
1238       dynamic_cast<ValueEncoderType*>(current_encoder_.get())
1239           ->Put(values, static_cast<int>(num_values));
1240     }
1241     if (page_statistics_ != nullptr) {
1242       const int64_t num_nulls = num_spaced_values - num_values;
1243       page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, num_values,
1244                                      num_nulls);
1245     }
1246   }
1247 };
1248 
1249 template <typename DType>
1250 Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(const int16_t* def_levels,
1251                                                           const int16_t* rep_levels,
1252                                                           int64_t num_levels,
1253                                                           const ::arrow::Array& array,
1254                                                           ArrowWriteContext* ctx) {
1255   // If this is the first time writing a DictionaryArray, then there are
1256   // a few possible paths to take:
1257   //
1258   // - If dictionary encoding is not enabled, convert to densely
1259   //   encoded and call WriteArrow
1260   // - Dictionary encoding enabled
1261   //   - If this is the first time this is called, then we call
1262   //     PutDictionary into the encoder and then PutIndices on each
1263   //     chunk. We store the dictionary that was written in
1264   //     preserved_dictionary_ so that subsequent calls to this method
1265   //     can make sure the dictionary has not changed
1266   //   - On subsequent calls, we have to check whether the dictionary
1267   //     has changed. If it has, then we trigger the varying
1268   //     dictionary path and materialize each chunk and then call
1269   //     WriteArrow with that
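  // As a rough usage sketch (illustrative only; the real entry point is the
  // Arrow writer layer in parquet/arrow/writer.h), a dictionary-encoded column
  // typically reaches this method via something like:
  //
  //   std::shared_ptr<::arrow::Table> table = ...;  // dictionary(int32, utf8) column
  //   PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(
  //       *table, ::arrow::default_memory_pool(), outfile, /*chunk_size=*/64 * 1024));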
1270   auto WriteDense = [&] {
1271     std::shared_ptr<::arrow::Array> dense_array;
1272     RETURN_NOT_OK(
1273         ConvertDictionaryToDense(array, properties_->memory_pool(), &dense_array));
1274     return WriteArrowDense(def_levels, rep_levels, num_levels, *dense_array, ctx);
1275   };
1276 
1277   if (!IsDictionaryEncoding(current_encoder_->encoding()) ||
1278       !DictionaryDirectWriteSupported(array)) {
1279     // No longer dictionary-encoding for whatever reason, maybe we never were
1280     // or we decided to stop. Note that WriteArrow can be invoked multiple
1281     // times with both dense and dictionary-encoded versions of the same data
1282     // without a problem. Any dense data will be hashed to indices until the
1283     // dictionary page limit is reached, at which point everything (dictionary and
1284     // dense) will fall back to plain encoding
1285     return WriteDense();
1286   }
1287 
1288   auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
1289   const auto& data = checked_cast<const ::arrow::DictionaryArray&>(array);
1290   std::shared_ptr<::arrow::Array> dictionary = data.dictionary();
1291   std::shared_ptr<::arrow::Array> indices = data.indices();
1292 
1293   int64_t value_offset = 0;
1294   auto WriteIndicesChunk = [&](int64_t offset, int64_t batch_size) {
1295     int64_t batch_num_values = 0;
1296     int64_t batch_num_spaced_values = 0;
1297     WriteLevelsSpaced(batch_size, AddIfNotNull(def_levels, offset),
1298                       AddIfNotNull(rep_levels, offset), &batch_num_values,
1299                       &batch_num_spaced_values);
1300     dict_encoder->PutIndices(*indices->Slice(value_offset, batch_num_spaced_values));
1301     CommitWriteAndCheckPageLimit(batch_size, batch_num_values);
1302     value_offset += batch_num_spaced_values;
1303   };
1304 
1305   // Handle seeing dictionary for the first time
1306   if (!preserved_dictionary_) {
1307     // It's a new dictionary. Call PutDictionary and keep track of it
1308     PARQUET_CATCH_NOT_OK(dict_encoder->PutDictionary(*dictionary));
1309 
1310     // TODO(wesm): If some dictionary values are unobserved, then the
1311     // statistics will be inaccurate. Do we care enough to fix it?
1312     if (page_statistics_ != nullptr) {
1313       PARQUET_CATCH_NOT_OK(page_statistics_->Update(*dictionary));
1314     }
1315     preserved_dictionary_ = dictionary;
1316   } else if (!dictionary->Equals(*preserved_dictionary_)) {
1317     // Dictionary has changed
1318     PARQUET_CATCH_NOT_OK(FallbackToPlainEncoding());
1319     return WriteDense();
1320   }
1321 
1322   PARQUET_CATCH_NOT_OK(
1323       DoInBatches(num_levels, properties_->write_batch_size(), WriteIndicesChunk));
1324   return Status::OK();
1325 }
1326 
1327 // ----------------------------------------------------------------------
1328 // Direct Arrow write path
1329 
1330 template <typename ParquetType, typename ArrowType, typename Enable = void>
1331 struct SerializeFunctor {
1332   using ArrowCType = typename ArrowType::c_type;
1333   using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
1334   using ParquetCType = typename ParquetType::c_type;
1335   Status Serialize(const ArrayType& array, ArrowWriteContext*, ParquetCType* out) {
1336     const ArrowCType* input = array.raw_values();
1337     if (array.null_count() > 0) {
1338       for (int i = 0; i < array.length(); i++) {
1339         out[i] = static_cast<ParquetCType>(input[i]);
1340       }
1341     } else {
1342       std::copy(input, input + array.length(), out);
1343     }
1344     return Status::OK();
1345   }
1346 };
1347 
1348 template <typename ParquetType, typename ArrowType>
1349 Status WriteArrowSerialize(const ::arrow::Array& array, int64_t num_levels,
1350                            const int16_t* def_levels, const int16_t* rep_levels,
1351                            ArrowWriteContext* ctx,
1352                            TypedColumnWriter<ParquetType>* writer) {
1353   using ParquetCType = typename ParquetType::c_type;
1354   using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
1355 
1356   ParquetCType* buffer = nullptr;
1357   PARQUET_THROW_NOT_OK(ctx->GetScratchData<ParquetCType>(array.length(), &buffer));
1358 
1359   bool no_nulls =
1360       writer->descr()->schema_node()->is_required() || (array.null_count() == 0);
1361 
1362   SerializeFunctor<ParquetType, ArrowType> functor;
1363   RETURN_NOT_OK(functor.Serialize(checked_cast<const ArrayType&>(array), ctx, buffer));
1364 
1365   if (no_nulls) {
1366     PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, buffer));
1367   } else {
1368     PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
1369                                                   array.null_bitmap_data(),
1370                                                   array.offset(), buffer));
1371   }
1372   return Status::OK();
1373 }
1374 
1375 template <typename ParquetType>
1376 Status WriteArrowZeroCopy(const ::arrow::Array& array, int64_t num_levels,
1377                           const int16_t* def_levels, const int16_t* rep_levels,
1378                           ArrowWriteContext* ctx,
1379                           TypedColumnWriter<ParquetType>* writer) {
1380   using T = typename ParquetType::c_type;
1381   const auto& data = static_cast<const ::arrow::PrimitiveArray&>(array);
1382   const T* values = nullptr;
1383   // The values buffer may be null if the array is empty (ARROW-2744)
1384   if (data.values() != nullptr) {
1385     values = reinterpret_cast<const T*>(data.values()->data()) + data.offset();
1386   } else {
1387     DCHECK_EQ(data.length(), 0);
1388   }
1389   if (writer->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
1390     PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, values));
1391   } else {
1392     PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
1393                                                   data.null_bitmap_data(), data.offset(),
1394                                                   values));
1395   }
1396   return Status::OK();
1397 }
1398 
1399 #define WRITE_SERIALIZE_CASE(ArrowEnum, ArrowType, ParquetType)  \
1400   case ::arrow::Type::ArrowEnum:                                 \
1401     return WriteArrowSerialize<ParquetType, ::arrow::ArrowType>( \
1402         array, num_levels, def_levels, rep_levels, ctx, this);
1403 
1404 #define WRITE_ZERO_COPY_CASE(ArrowEnum, ArrowType, ParquetType)                       \
1405   case ::arrow::Type::ArrowEnum:                                                      \
1406     return WriteArrowZeroCopy<ParquetType>(array, num_levels, def_levels, rep_levels, \
1407                                            ctx, this);
1408 
1409 #define ARROW_UNSUPPORTED()                                          \
1410   std::stringstream ss;                                              \
1411   ss << "Arrow type " << array.type()->ToString()                    \
1412      << " cannot be written to Parquet type " << descr_->ToString(); \
1413   return Status::Invalid(ss.str());
1414 
1415 // ----------------------------------------------------------------------
1416 // Write Arrow to BooleanType
1417 
1418 template <>
1419 struct SerializeFunctor<BooleanType, ::arrow::BooleanType> {
1420   Status Serialize(const ::arrow::BooleanArray& data, ArrowWriteContext*, bool* out) {
1421     for (int i = 0; i < data.length(); i++) {
1422       *out++ = data.Value(i);
1423     }
1424     return Status::OK();
1425   }
1426 };
1427 
1428 template <>
1429 Status TypedColumnWriterImpl<BooleanType>::WriteArrowDense(const int16_t* def_levels,
1430                                                            const int16_t* rep_levels,
1431                                                            int64_t num_levels,
1432                                                            const ::arrow::Array& array,
1433                                                            ArrowWriteContext* ctx) {
1434   if (array.type_id() != ::arrow::Type::BOOL) {
1435     ARROW_UNSUPPORTED();
1436   }
1437   return WriteArrowSerialize<BooleanType, ::arrow::BooleanType>(
1438       array, num_levels, def_levels, rep_levels, ctx, this);
1439 }
1440 
1441 // ----------------------------------------------------------------------
1442 // Write Arrow types to INT32
1443 
1444 template <>
1445 struct SerializeFunctor<Int32Type, ::arrow::Date64Type> {
1446   Status Serialize(const ::arrow::Date64Array& array, ArrowWriteContext*, int32_t* out) {
1447     const int64_t* input = array.raw_values();
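    // Date64 stores milliseconds since the UNIX epoch, while the Parquet DATE
    // logical type (INT32) stores days, hence the division by 86400000 ms/day.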
1448     for (int i = 0; i < array.length(); i++) {
1449       *out++ = static_cast<int32_t>(*input++ / 86400000);
1450     }
1451     return Status::OK();
1452   }
1453 };
1454 
1455 template <>
1456 struct SerializeFunctor<Int32Type, ::arrow::Time32Type> {
1457   Status Serialize(const ::arrow::Time32Array& array, ArrowWriteContext*, int32_t* out) {
1458     const int32_t* input = array.raw_values();
1459     const auto& type = static_cast<const ::arrow::Time32Type&>(*array.type());
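    // The Parquet TIME annotation used for INT32 is millisecond-resolution, so
    // second-resolution Time32 values are scaled up by 1000 below, while
    // millisecond values are copied through unchanged.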
1460     if (type.unit() == ::arrow::TimeUnit::SECOND) {
1461       for (int i = 0; i < array.length(); i++) {
1462         out[i] = input[i] * 1000;
1463       }
1464     } else {
1465       std::copy(input, input + array.length(), out);
1466     }
1467     return Status::OK();
1468   }
1469 };
1470 
1471 template <>
1472 Status TypedColumnWriterImpl<Int32Type>::WriteArrowDense(const int16_t* def_levels,
1473                                                          const int16_t* rep_levels,
1474                                                          int64_t num_levels,
1475                                                          const ::arrow::Array& array,
1476                                                          ArrowWriteContext* ctx) {
1477   switch (array.type()->id()) {
1478     case ::arrow::Type::NA: {
1479       PARQUET_CATCH_NOT_OK(WriteBatch(num_levels, def_levels, rep_levels, nullptr));
1480     } break;
1481       WRITE_SERIALIZE_CASE(INT8, Int8Type, Int32Type)
1482       WRITE_SERIALIZE_CASE(UINT8, UInt8Type, Int32Type)
1483       WRITE_SERIALIZE_CASE(INT16, Int16Type, Int32Type)
1484       WRITE_SERIALIZE_CASE(UINT16, UInt16Type, Int32Type)
1485       WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int32Type)
1486       WRITE_ZERO_COPY_CASE(INT32, Int32Type, Int32Type)
1487       WRITE_ZERO_COPY_CASE(DATE32, Date32Type, Int32Type)
1488       WRITE_SERIALIZE_CASE(DATE64, Date64Type, Int32Type)
1489       WRITE_SERIALIZE_CASE(TIME32, Time32Type, Int32Type)
1490     default:
1491       ARROW_UNSUPPORTED()
1492   }
1493   return Status::OK();
1494 }
1495 
1496 // ----------------------------------------------------------------------
1497 // Write Arrow to Int64 and Int96
1498 
1499 #define INT96_CONVERT_LOOP(ConversionFunction) \
1500   for (int64_t i = 0; i < array.length(); i++) ConversionFunction(input[i], &out[i]);
1501 
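// For reference (based on the Impala/Hive INT96 convention): the
// internal::*ToImpalaTimestamp helpers pack each value as nanoseconds within
// the day in the first 8 bytes and the Julian day number in the last 4 bytes.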
1502 template <>
1503 struct SerializeFunctor<Int96Type, ::arrow::TimestampType> {
1504   Status Serialize(const ::arrow::TimestampArray& array, ArrowWriteContext*, Int96* out) {
1505     const int64_t* input = array.raw_values();
1506     const auto& type = static_cast<const ::arrow::TimestampType&>(*array.type());
1507     switch (type.unit()) {
1508       case ::arrow::TimeUnit::NANO:
1509         INT96_CONVERT_LOOP(internal::NanosecondsToImpalaTimestamp);
1510         break;
1511       case ::arrow::TimeUnit::MICRO:
1512         INT96_CONVERT_LOOP(internal::MicrosecondsToImpalaTimestamp);
1513         break;
1514       case ::arrow::TimeUnit::MILLI:
1515         INT96_CONVERT_LOOP(internal::MillisecondsToImpalaTimestamp);
1516         break;
1517       case ::arrow::TimeUnit::SECOND:
1518         INT96_CONVERT_LOOP(internal::SecondsToImpalaTimestamp);
1519         break;
1520     }
1521     return Status::OK();
1522   }
1523 };
1524 
1525 #define COERCE_DIVIDE -1
1526 #define COERCE_INVALID 0
1527 #define COERCE_MULTIPLY +1
1528 
1529 static std::pair<int, int64_t> kTimestampCoercionFactors[4][4] = {
1530     // from seconds ...
1531     {{COERCE_INVALID, 0},                      // ... to seconds
1532      {COERCE_MULTIPLY, 1000},                  // ... to millis
1533      {COERCE_MULTIPLY, 1000000},               // ... to micros
1534      {COERCE_MULTIPLY, INT64_C(1000000000)}},  // ... to nanos
1535     // from millis ...
1536     {{COERCE_INVALID, 0},
1537      {COERCE_MULTIPLY, 1},
1538      {COERCE_MULTIPLY, 1000},
1539      {COERCE_MULTIPLY, 1000000}},
1540     // from micros ...
1541     {{COERCE_INVALID, 0},
1542      {COERCE_DIVIDE, 1000},
1543      {COERCE_MULTIPLY, 1},
1544      {COERCE_MULTIPLY, 1000}},
1545     // from nanos ...
1546     {{COERCE_INVALID, 0},
1547      {COERCE_DIVIDE, 1000000},
1548      {COERCE_DIVIDE, 1000},
1549      {COERCE_MULTIPLY, 1}}};
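// Example: coercing milliseconds to microseconds looks up
// kTimestampCoercionFactors[::arrow::TimeUnit::MILLI][::arrow::TimeUnit::MICRO],
// i.e. {COERCE_MULTIPLY, 1000}; nanoseconds to milliseconds yields
// {COERCE_DIVIDE, 1000000}, which can fail below when truncation is disallowed.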
1550 
1551 template <>
1552 struct SerializeFunctor<Int64Type, ::arrow::TimestampType> {
1553   Status Serialize(const ::arrow::TimestampArray& array, ArrowWriteContext* ctx,
1554                    int64_t* out) {
1555     const auto& source_type = static_cast<const ::arrow::TimestampType&>(*array.type());
1556     auto source_unit = source_type.unit();
1557     const int64_t* values = array.raw_values();
1558 
1559     ::arrow::TimeUnit::type target_unit = ctx->properties->coerce_timestamps_unit();
1560     auto target_type = ::arrow::timestamp(target_unit);
1561     bool truncation_allowed = ctx->properties->truncated_timestamps_allowed();
1562 
1563     auto DivideBy = [&](const int64_t factor) {
1564       for (int64_t i = 0; i < array.length(); i++) {
1565         if (!truncation_allowed && array.IsValid(i) && (values[i] % factor != 0)) {
1566           return Status::Invalid("Casting from ", source_type.ToString(), " to ",
1567                                  target_type->ToString(),
1568                                  " would lose data: ", values[i]);
1569         }
1570         out[i] = values[i] / factor;
1571       }
1572       return Status::OK();
1573     };
1574 
1575     auto MultiplyBy = [&](const int64_t factor) {
1576       for (int64_t i = 0; i < array.length(); i++) {
1577         out[i] = values[i] * factor;
1578       }
1579       return Status::OK();
1580     };
1581 
1582     const auto& coercion = kTimestampCoercionFactors[static_cast<int>(source_unit)]
1583                                                     [static_cast<int>(target_unit)];
1584 
1585     // .first -> coercion operation; .second -> scale factor
1586     DCHECK_NE(coercion.first, COERCE_INVALID);
1587     return coercion.first == COERCE_DIVIDE ? DivideBy(coercion.second)
1588                                            : MultiplyBy(coercion.second);
1589   }
1590 };
1591 
1592 #undef COERCE_DIVIDE
1593 #undef COERCE_INVALID
1594 #undef COERCE_MULTIPLY
1595 
1596 Status WriteTimestamps(const ::arrow::Array& values, int64_t num_levels,
1597                        const int16_t* def_levels, const int16_t* rep_levels,
1598                        ArrowWriteContext* ctx, TypedColumnWriter<Int64Type>* writer) {
1599   const auto& source_type = static_cast<const ::arrow::TimestampType&>(*values.type());
1600 
1601   auto WriteCoerce = [&](const ArrowWriterProperties* properties) {
1602     ArrowWriteContext temp_ctx = *ctx;
1603     temp_ctx.properties = properties;
1604     return WriteArrowSerialize<Int64Type, ::arrow::TimestampType>(
1605         values, num_levels, def_levels, rep_levels, &temp_ctx, writer);
1606   };
1607 
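  // For reference (illustrative), callers opt into an explicit unit through
  // ArrowWriterProperties, e.g.:
  //
  //   auto arrow_props = (ArrowWriterProperties::Builder())
  //                          .coerce_timestamps(::arrow::TimeUnit::MILLI)
  //                          ->allow_truncated_timestamps()
  //                          ->build();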
1608   if (ctx->properties->coerce_timestamps_enabled()) {
1609     // The user explicitly requested coercion to a specific unit
1610     if (source_type.unit() == ctx->properties->coerce_timestamps_unit()) {
1611       // No data conversion necessary
1612       return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels,
1613                                            ctx, writer);
1614     } else {
1615       return WriteCoerce(ctx->properties);
1616     }
1617   } else if (writer->properties()->version() == ParquetVersion::PARQUET_1_0 &&
1618              source_type.unit() == ::arrow::TimeUnit::NANO) {
1619     // Absent superseding user instructions, when writing Parquet version 1.0 files,
1620     // timestamps in nanoseconds are coerced to microseconds
1621     std::shared_ptr<ArrowWriterProperties> properties =
1622         (ArrowWriterProperties::Builder())
1623             .coerce_timestamps(::arrow::TimeUnit::MICRO)
1624             ->disallow_truncated_timestamps()
1625             ->build();
1626     return WriteCoerce(properties.get());
1627   } else if (source_type.unit() == ::arrow::TimeUnit::SECOND) {
1628     // Absent superseding user instructions, timestamps in seconds are coerced to
1629     // milliseconds
1630     std::shared_ptr<ArrowWriterProperties> properties =
1631         (ArrowWriterProperties::Builder())
1632             .coerce_timestamps(::arrow::TimeUnit::MILLI)
1633             ->build();
1634     return WriteCoerce(properties.get());
1635   } else {
1636     // No data conversion necessary
1637     return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels, ctx,
1638                                          writer);
1639   }
1640 }
1641 
1642 template <>
1643 Status TypedColumnWriterImpl<Int64Type>::WriteArrowDense(const int16_t* def_levels,
1644                                                          const int16_t* rep_levels,
1645                                                          int64_t num_levels,
1646                                                          const ::arrow::Array& array,
1647                                                          ArrowWriteContext* ctx) {
1648   switch (array.type()->id()) {
1649     case ::arrow::Type::TIMESTAMP:
1650       return WriteTimestamps(array, num_levels, def_levels, rep_levels, ctx, this);
1651       WRITE_ZERO_COPY_CASE(INT64, Int64Type, Int64Type)
1652       WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int64Type)
1653       WRITE_SERIALIZE_CASE(UINT64, UInt64Type, Int64Type)
1654       WRITE_ZERO_COPY_CASE(TIME64, Time64Type, Int64Type)
1655     default:
1656       ARROW_UNSUPPORTED();
1657   }
1658 }
1659 
1660 template <>
1661 Status TypedColumnWriterImpl<Int96Type>::WriteArrowDense(const int16_t* def_levels,
1662                                                          const int16_t* rep_levels,
1663                                                          int64_t num_levels,
1664                                                          const ::arrow::Array& array,
1665                                                          ArrowWriteContext* ctx) {
1666   if (array.type_id() != ::arrow::Type::TIMESTAMP) {
1667     ARROW_UNSUPPORTED();
1668   }
1669   return WriteArrowSerialize<Int96Type, ::arrow::TimestampType>(
1670       array, num_levels, def_levels, rep_levels, ctx, this);
1671 }
1672 
1673 // ----------------------------------------------------------------------
1674 // Floating point types
1675 
1676 template <>
1677 Status TypedColumnWriterImpl<FloatType>::WriteArrowDense(const int16_t* def_levels,
1678                                                          const int16_t* rep_levels,
1679                                                          int64_t num_levels,
1680                                                          const ::arrow::Array& array,
1681                                                          ArrowWriteContext* ctx) {
1682   if (array.type_id() != ::arrow::Type::FLOAT) {
1683     ARROW_UNSUPPORTED();
1684   }
1685   return WriteArrowZeroCopy<FloatType>(array, num_levels, def_levels, rep_levels, ctx,
1686                                        this);
1687 }
1688 
1689 template <>
1690 Status TypedColumnWriterImpl<DoubleType>::WriteArrowDense(const int16_t* def_levels,
1691                                                           const int16_t* rep_levels,
1692                                                           int64_t num_levels,
1693                                                           const ::arrow::Array& array,
1694                                                           ArrowWriteContext* ctx) {
1695   if (array.type_id() != ::arrow::Type::DOUBLE) {
1696     ARROW_UNSUPPORTED();
1697   }
1698   return WriteArrowZeroCopy<DoubleType>(array, num_levels, def_levels, rep_levels, ctx,
1699                                         this);
1700 }
1701 
1702 // ----------------------------------------------------------------------
1703 // Write Arrow to BYTE_ARRAY
1704 
1705 template <>
1706 Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(const int16_t* def_levels,
1707                                                              const int16_t* rep_levels,
1708                                                              int64_t num_levels,
1709                                                              const ::arrow::Array& array,
1710                                                              ArrowWriteContext* ctx) {
1711   if (array.type()->id() != ::arrow::Type::BINARY &&
1712       array.type()->id() != ::arrow::Type::STRING) {
1713     ARROW_UNSUPPORTED();
1714   }
1715 
1716   int64_t value_offset = 0;
1717   auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
1718     int64_t batch_num_values = 0;
1719     int64_t batch_num_spaced_values = 0;
1720     WriteLevelsSpaced(batch_size, AddIfNotNull(def_levels, offset),
1721                       AddIfNotNull(rep_levels, offset), &batch_num_values,
1722                       &batch_num_spaced_values);
1723     std::shared_ptr<::arrow::Array> data_slice =
1724         array.Slice(value_offset, batch_num_spaced_values);
1725     current_encoder_->Put(*data_slice);
1726     if (page_statistics_ != nullptr) {
1727       page_statistics_->Update(*data_slice);
1728     }
1729     CommitWriteAndCheckPageLimit(batch_size, batch_num_values);
1730     CheckDictionarySizeLimit();
1731     value_offset += batch_num_spaced_values;
1732   };
1733 
1734   PARQUET_CATCH_NOT_OK(
1735       DoInBatches(num_levels, properties_->write_batch_size(), WriteChunk));
1736   return Status::OK();
1737 }
1738 
1739 // ----------------------------------------------------------------------
1740 // Write Arrow to FIXED_LEN_BYTE_ARRAY
1741 
1742 template <typename ParquetType, typename ArrowType>
1743 struct SerializeFunctor<
1744     ParquetType, ArrowType,
1745     ::arrow::enable_if_t<::arrow::is_fixed_size_binary_type<ArrowType>::value &&
1746                          !::arrow::is_decimal_type<ArrowType>::value>> {
1747   Status Serialize(const ::arrow::FixedSizeBinaryArray& array, ArrowWriteContext*,
1748                    FLBA* out) {
1749     if (array.null_count() == 0) {
1750       // No nulls, just copy the data
1751       // TODO(advancedxy): use a WriteBatch to avoid this copy
1752       for (int64_t i = 0; i < array.length(); i++) {
1753         out[i] = FixedLenByteArray(array.GetValue(i));
1754       }
1755     } else {
1756       for (int64_t i = 0; i < array.length(); i++) {
1757         if (array.IsValid(i)) {
1758           out[i] = FixedLenByteArray(array.GetValue(i));
1759         }
1760       }
1761     }
1762     return Status::OK();
1763   }
1764 };
1765 
1766 // ----------------------------------------------------------------------
1767 // Write Arrow to Decimal128
1768 
1769 using ::arrow::internal::checked_pointer_cast;
1770 
1771 // Requires a custom serializer because decimal128 values in Parquet are stored
1772 // in big-endian format. Thus, a temporary local buffer is required.
1773 template <typename ParquetType, typename ArrowType>
1774 struct SerializeFunctor<ParquetType, ArrowType, ::arrow::enable_if_decimal<ArrowType>> {
1775   Status Serialize(const ::arrow::Decimal128Array& array, ArrowWriteContext* ctx,
1776                    FLBA* out) {
1777     AllocateScratch(array, ctx);
1778     auto offset = Offset(array);
1779 
1780     if (array.null_count() == 0) {
1781       for (int64_t i = 0; i < array.length(); i++) {
1782         out[i] = FixDecimalEndianess(array.GetValue(i), offset);
1783       }
1784     } else {
1785       for (int64_t i = 0; i < array.length(); i++) {
1786         out[i] = array.IsValid(i) ? FixDecimalEndianess(array.GetValue(i), offset)
1787                                   : FixedLenByteArray();
1788       }
1789     }
1790 
1791     return Status::OK();
1792   }
1793 
1794   // Parquet decimals are stored as fixed-length (FLBA) values whose length is
1795   // proportional to the precision, whereas Arrow's Decimal128 values always
1796   // occupy 16 bytes. Thus the internal FLBA pointer must be adjusted by the
1797   // offset calculated here.
1798   int32_t Offset(const ::arrow::Decimal128Array& array) {
1799     auto decimal_type = checked_pointer_cast<::arrow::Decimal128Type>(array.type());
1800     return decimal_type->byte_width() - internal::DecimalSize(decimal_type->precision());
1801   }
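  // For example (illustrative): a decimal with precision 9 is stored as a 4-byte
  // FLBA in Parquet, so the returned offset is 16 - 4 = 12 and FixDecimalEndianess
  // points the FLBA at the last 4 bytes of the 16-byte big-endian scratch value.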
1802 
1803   void AllocateScratch(const ::arrow::Decimal128Array& array, ArrowWriteContext* ctx) {
1804     int64_t non_null_count = array.length() - array.null_count();
1805     int64_t size = non_null_count * 16;
1806     scratch_buffer = AllocateBuffer(ctx->memory_pool, size);
1807     scratch = reinterpret_cast<int64_t*>(scratch_buffer->mutable_data());
1808   }
1809 
1810   FixedLenByteArray FixDecimalEndianess(const uint8_t* in, int64_t offset) {
1811     const auto* u64_in = reinterpret_cast<const int64_t*>(in);
1812     auto out = reinterpret_cast<const uint8_t*>(scratch) + offset;
1813     *scratch++ = ::arrow::BitUtil::ToBigEndian(u64_in[1]);
1814     *scratch++ = ::arrow::BitUtil::ToBigEndian(u64_in[0]);
1815     return FixedLenByteArray(out);
1816   }
1817 
1818   std::shared_ptr<ResizableBuffer> scratch_buffer;
1819   int64_t* scratch;
1820 };
1821 
1822 template <>
1823 Status TypedColumnWriterImpl<FLBAType>::WriteArrowDense(const int16_t* def_levels,
1824                                                         const int16_t* rep_levels,
1825                                                         int64_t num_levels,
1826                                                         const ::arrow::Array& array,
1827                                                         ArrowWriteContext* ctx) {
1828   switch (array.type()->id()) {
1829     WRITE_SERIALIZE_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
1830     WRITE_SERIALIZE_CASE(DECIMAL, Decimal128Type, FLBAType)
1831     default:
1832       break;
1833   }
1834   return Status::OK();
1835 }
1836 
1837 // ----------------------------------------------------------------------
1838 // Dynamic column writer constructor
1839 
1840 std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata,
1841                                                  std::unique_ptr<PageWriter> pager,
1842                                                  const WriterProperties* properties) {
1843   const ColumnDescriptor* descr = metadata->descr();
1844   const bool use_dictionary = properties->dictionary_enabled(descr->path()) &&
1845                               descr->physical_type() != Type::BOOLEAN;
1846   Encoding::type encoding = properties->encoding(descr->path());
1847   if (use_dictionary) {
1848     encoding = properties->dictionary_index_encoding();
1849   }
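  // For example (illustrative), dictionary encoding can be turned off for a
  // single column through the writer properties, e.g.
  //   WriterProperties::Builder().disable_dictionary("my_column")->build();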
1850   switch (descr->physical_type()) {
1851     case Type::BOOLEAN:
1852       return std::make_shared<TypedColumnWriterImpl<BooleanType>>(
1853           metadata, std::move(pager), use_dictionary, encoding, properties);
1854     case Type::INT32:
1855       return std::make_shared<TypedColumnWriterImpl<Int32Type>>(
1856           metadata, std::move(pager), use_dictionary, encoding, properties);
1857     case Type::INT64:
1858       return std::make_shared<TypedColumnWriterImpl<Int64Type>>(
1859           metadata, std::move(pager), use_dictionary, encoding, properties);
1860     case Type::INT96:
1861       return std::make_shared<TypedColumnWriterImpl<Int96Type>>(
1862           metadata, std::move(pager), use_dictionary, encoding, properties);
1863     case Type::FLOAT:
1864       return std::make_shared<TypedColumnWriterImpl<FloatType>>(
1865           metadata, std::move(pager), use_dictionary, encoding, properties);
1866     case Type::DOUBLE:
1867       return std::make_shared<TypedColumnWriterImpl<DoubleType>>(
1868           metadata, std::move(pager), use_dictionary, encoding, properties);
1869     case Type::BYTE_ARRAY:
1870       return std::make_shared<TypedColumnWriterImpl<ByteArrayType>>(
1871           metadata, std::move(pager), use_dictionary, encoding, properties);
1872     case Type::FIXED_LEN_BYTE_ARRAY:
1873       return std::make_shared<TypedColumnWriterImpl<FLBAType>>(
1874           metadata, std::move(pager), use_dictionary, encoding, properties);
1875     default:
1876       ParquetException::NYI("type writer not implemented");
1877   }
1878   // Unreachable code, but suppress compiler warning
1879   return std::shared_ptr<ColumnWriter>(nullptr);
1880 }
1881 
1882 }  // namespace parquet
1883