// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "parquet/arrow/reader_internal.h"

#include <algorithm>
#include <climits>
#include <cstdint>
#include <cstring>
#include <memory>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <vector>

#include "arrow/array.h"
#include "arrow/builder.h"
#include "arrow/datum.h"
#include "arrow/extension_type.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/reader.h"
#include "arrow/ipc/writer.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/base64.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/int_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/string_view.h"
#include "arrow/util/ubsan.h"

#include "parquet/arrow/reader.h"
#include "parquet/column_reader.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/statistics.h"
#include "parquet/types.h"

using arrow::Array;
using arrow::BooleanArray;
using arrow::ChunkedArray;
using arrow::DataType;
using arrow::Datum;
using arrow::Field;
using arrow::Int32Array;
using arrow::ListArray;
using arrow::MemoryPool;
using arrow::ResizableBuffer;
using arrow::Status;
using arrow::StructArray;
using arrow::Table;
using arrow::TimestampArray;

using ::arrow::BitUtil::FromBigEndian;
using ::arrow::internal::checked_cast;
using ::arrow::internal::checked_pointer_cast;
using ::arrow::internal::SafeLeftShift;
using ::arrow::util::SafeLoadAs;

using parquet::internal::BinaryRecordReader;
using parquet::internal::DictionaryRecordReader;
using parquet::internal::RecordReader;
using parquet::schema::GroupNode;
using parquet::schema::Node;
using parquet::schema::PrimitiveNode;
using ParquetType = parquet::Type;

namespace parquet {
namespace arrow {

template <typename ArrowType>
using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;

// ----------------------------------------------------------------------
// Schema logic

static Status MakeArrowDecimal(const LogicalType& logical_type,
                               std::shared_ptr<DataType>* out) {
  const auto& decimal = checked_cast<const DecimalLogicalType&>(logical_type);
  return ::arrow::Decimal128Type::Make(decimal.precision(), decimal.scale()).Value(out);
}

static Status MakeArrowInt(const LogicalType& logical_type,
                           std::shared_ptr<DataType>* out) {
  const auto& integer = checked_cast<const IntLogicalType&>(logical_type);
  switch (integer.bit_width()) {
    case 8:
      *out = integer.is_signed() ? ::arrow::int8() : ::arrow::uint8();
      break;
    case 16:
      *out = integer.is_signed() ? ::arrow::int16() : ::arrow::uint16();
      break;
    case 32:
      *out = integer.is_signed() ? ::arrow::int32() : ::arrow::uint32();
      break;
    default:
      return Status::TypeError(logical_type.ToString(),
                               " can not annotate physical type Int32");
  }
  return Status::OK();
}

static Status MakeArrowInt64(const LogicalType& logical_type,
                             std::shared_ptr<DataType>* out) {
  const auto& integer = checked_cast<const IntLogicalType&>(logical_type);
  switch (integer.bit_width()) {
    case 64:
      *out = integer.is_signed() ? ::arrow::int64() : ::arrow::uint64();
      break;
    default:
      return Status::TypeError(logical_type.ToString(),
                               " can not annotate physical type Int64");
  }
  return Status::OK();
}

static Status MakeArrowTime32(const LogicalType& logical_type,
                              std::shared_ptr<DataType>* out) {
  const auto& time = checked_cast<const TimeLogicalType&>(logical_type);
  switch (time.time_unit()) {
    case LogicalType::TimeUnit::MILLIS:
      *out = ::arrow::time32(::arrow::TimeUnit::MILLI);
      break;
    default:
      return Status::TypeError(logical_type.ToString(),
                               " can not annotate physical type Time32");
  }
  return Status::OK();
}

static Status MakeArrowTime64(const LogicalType& logical_type,
                              std::shared_ptr<DataType>* out) {
  const auto& time = checked_cast<const TimeLogicalType&>(logical_type);
  switch (time.time_unit()) {
    case LogicalType::TimeUnit::MICROS:
      *out = ::arrow::time64(::arrow::TimeUnit::MICRO);
      break;
    case LogicalType::TimeUnit::NANOS:
      *out = ::arrow::time64(::arrow::TimeUnit::NANO);
      break;
    default:
      return Status::TypeError(logical_type.ToString(),
                               " can not annotate physical type Time64");
  }
  return Status::OK();
}

static Status MakeArrowTimestamp(const LogicalType& logical_type,
                                 std::shared_ptr<DataType>* out) {
  const auto& timestamp = checked_cast<const TimestampLogicalType&>(logical_type);
  const bool utc_normalized =
      timestamp.is_from_converted_type() ? false : timestamp.is_adjusted_to_utc();
  static const char* utc_timezone = "UTC";
  switch (timestamp.time_unit()) {
    case LogicalType::TimeUnit::MILLIS:
      *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::MILLI, utc_timezone)
                             : ::arrow::timestamp(::arrow::TimeUnit::MILLI));
      break;
    case LogicalType::TimeUnit::MICROS:
      *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::MICRO, utc_timezone)
                             : ::arrow::timestamp(::arrow::TimeUnit::MICRO));
      break;
    case LogicalType::TimeUnit::NANOS:
      *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::NANO, utc_timezone)
                             : ::arrow::timestamp(::arrow::TimeUnit::NANO));
      break;
    default:
      return Status::TypeError("Unrecognized time unit in timestamp logical_type: ",
                               logical_type.ToString());
  }
  return Status::OK();
}
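
// Note: timestamps whose logical type was only inferred from a legacy
// ConvertedType (TIMESTAMP_MILLIS / TIMESTAMP_MICROS) are treated as
// timezone-naive above regardless of the UTC flag; presumably this keeps files
// written by older writers, which stored naive values with those converted
// types, round-tripping without acquiring a "UTC" timezone.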

static Status FromByteArray(const LogicalType& logical_type,
                            std::shared_ptr<DataType>* out) {
  switch (logical_type.type()) {
    case LogicalType::Type::STRING:
      *out = ::arrow::utf8();
      break;
    case LogicalType::Type::DECIMAL:
      RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
      break;
    case LogicalType::Type::NONE:
    case LogicalType::Type::ENUM:
    case LogicalType::Type::JSON:
    case LogicalType::Type::BSON:
      *out = ::arrow::binary();
      break;
    default:
      return Status::NotImplemented("Unhandled logical type ",
                                    logical_type.ToString(), " for binary array");
  }
  return Status::OK();
}

static Status FromFLBA(const LogicalType& logical_type, int32_t physical_length,
                       std::shared_ptr<DataType>* out) {
  switch (logical_type.type()) {
    case LogicalType::Type::DECIMAL:
      RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
      break;
    case LogicalType::Type::NONE:
    case LogicalType::Type::INTERVAL:
    case LogicalType::Type::UUID:
      *out = ::arrow::fixed_size_binary(physical_length);
      break;
    default:
      return Status::NotImplemented("Unhandled logical type ",
                                    logical_type.ToString(),
                                    " for fixed-length binary array");
  }

  return Status::OK();
}

static Status FromInt32(const LogicalType& logical_type, std::shared_ptr<DataType>* out) {
  switch (logical_type.type()) {
    case LogicalType::Type::INT:
      RETURN_NOT_OK(MakeArrowInt(logical_type, out));
      break;
    case LogicalType::Type::DATE:
      *out = ::arrow::date32();
      break;
    case LogicalType::Type::TIME:
      RETURN_NOT_OK(MakeArrowTime32(logical_type, out));
      break;
    case LogicalType::Type::DECIMAL:
      RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
      break;
    case LogicalType::Type::NONE:
      *out = ::arrow::int32();
      break;
    default:
      return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(),
                                    " for INT32");
  }
  return Status::OK();
}

static Status FromInt64(const LogicalType& logical_type, std::shared_ptr<DataType>* out) {
  switch (logical_type.type()) {
    case LogicalType::Type::INT:
      RETURN_NOT_OK(MakeArrowInt64(logical_type, out));
      break;
    case LogicalType::Type::DECIMAL:
      RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
      break;
    case LogicalType::Type::TIMESTAMP:
      RETURN_NOT_OK(MakeArrowTimestamp(logical_type, out));
      break;
    case LogicalType::Type::TIME:
      RETURN_NOT_OK(MakeArrowTime64(logical_type, out));
      break;
    case LogicalType::Type::NONE:
      *out = ::arrow::int64();
      break;
    default:
      return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(),
                                    " for INT64");
  }
  return Status::OK();
}

Status GetPrimitiveType(const schema::PrimitiveNode& primitive,
                        std::shared_ptr<DataType>* out) {
  const std::shared_ptr<const LogicalType>& logical_type = primitive.logical_type();
  if (logical_type->is_invalid() || logical_type->is_null()) {
    *out = ::arrow::null();
    return Status::OK();
  }

  switch (primitive.physical_type()) {
    case ParquetType::BOOLEAN:
      *out = ::arrow::boolean();
      break;
    case ParquetType::INT32:
      RETURN_NOT_OK(FromInt32(*logical_type, out));
      break;
    case ParquetType::INT64:
      RETURN_NOT_OK(FromInt64(*logical_type, out));
      break;
    case ParquetType::INT96:
      *out = ::arrow::timestamp(::arrow::TimeUnit::NANO);
      break;
    case ParquetType::FLOAT:
      *out = ::arrow::float32();
      break;
    case ParquetType::DOUBLE:
      *out = ::arrow::float64();
      break;
    case ParquetType::BYTE_ARRAY:
      RETURN_NOT_OK(FromByteArray(*logical_type, out));
      break;
    case ParquetType::FIXED_LEN_BYTE_ARRAY:
      RETURN_NOT_OK(FromFLBA(*logical_type, primitive.type_length(), out));
      break;
    default: {
      // PARQUET-1565: This can occur if the file is corrupt
      return Status::IOError("Invalid physical column type: ",
                             TypeToString(primitive.physical_type()));
    }
  }
  return Status::OK();
}

struct SchemaTreeContext {
  SchemaManifest* manifest;
  ArrowReaderProperties properties;
  const SchemaDescriptor* schema;

  void LinkParent(const SchemaField* child, const SchemaField* parent) {
    manifest->child_to_parent[child] = parent;
  }

  void RecordLeaf(const SchemaField* leaf) {
    manifest->column_index_to_field[leaf->column_index] = leaf;
  }
};

bool IsDictionaryReadSupported(const DataType& type) {
  // Only supported currently for BYTE_ARRAY types
  return type.id() == ::arrow::Type::BINARY || type.id() == ::arrow::Type::STRING;
}

Status GetTypeForNode(int column_index, const schema::PrimitiveNode& primitive_node,
                      SchemaTreeContext* ctx, std::shared_ptr<DataType>* out) {
  std::shared_ptr<DataType> storage_type;
  RETURN_NOT_OK(GetPrimitiveType(primitive_node, &storage_type));
  if (ctx->properties.read_dictionary(column_index) &&
      IsDictionaryReadSupported(*storage_type)) {
    *out = ::arrow::dictionary(::arrow::int32(), storage_type);
  } else {
    *out = storage_type;
  }
  return Status::OK();
}
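
// For reference, a minimal sketch of how a caller might opt a column into
// dictionary reads so that GetTypeForNode above yields
// dictionary(int32(), utf8()/binary()) instead of the plain storage type
// (the column index here is illustrative; any BYTE_ARRAY column works):
//
//   ArrowReaderProperties props;
//   props.set_read_dictionary(/*column_index=*/0, true);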

Status NodeToSchemaField(const Node& node, int16_t max_def_level, int16_t max_rep_level,
                         SchemaTreeContext* ctx, const SchemaField* parent,
                         SchemaField* out);

Status GroupToSchemaField(const GroupNode& node, int16_t max_def_level,
                          int16_t max_rep_level, SchemaTreeContext* ctx,
                          const SchemaField* parent, SchemaField* out);

Status PopulateLeaf(int column_index, const std::shared_ptr<Field>& field,
                    int16_t max_def_level, int16_t max_rep_level, SchemaTreeContext* ctx,
                    const SchemaField* parent, SchemaField* out) {
  out->field = field;
  out->column_index = column_index;
  out->max_definition_level = max_def_level;
  out->max_repetition_level = max_rep_level;
  ctx->RecordLeaf(out);
  ctx->LinkParent(out, parent);
  return Status::OK();
}

// Special case mentioned in the format spec:
//   If the name is array or ends in _tuple, this should be a list of struct
//   even for single child elements.
bool HasStructListName(const GroupNode& node) {
  ::arrow::util::string_view name{node.name()};
  return name == "array" || name.ends_with("_tuple");
}

std::shared_ptr<::arrow::KeyValueMetadata> FieldIdMetadata(int field_id) {
  return ::arrow::key_value_metadata({"PARQUET:field_id"}, {std::to_string(field_id)});
}

Status GroupToStruct(const GroupNode& node, int16_t max_def_level, int16_t max_rep_level,
                     SchemaTreeContext* ctx, const SchemaField* parent,
                     SchemaField* out) {
  std::vector<std::shared_ptr<Field>> arrow_fields;
  out->children.resize(node.field_count());
  for (int i = 0; i < node.field_count(); i++) {
    RETURN_NOT_OK(NodeToSchemaField(*node.field(i), max_def_level, max_rep_level, ctx,
                                    out, &out->children[i]));
    arrow_fields.push_back(out->children[i].field);
  }
  auto struct_type = ::arrow::struct_(arrow_fields);
  out->field = ::arrow::field(node.name(), struct_type, node.is_optional(),
                              FieldIdMetadata(node.field_id()));
  out->max_definition_level = max_def_level;
  out->max_repetition_level = max_rep_level;
  return Status::OK();
}

Status ListToSchemaField(const GroupNode& group, int16_t max_def_level,
                         int16_t max_rep_level, SchemaTreeContext* ctx,
                         const SchemaField* parent, SchemaField* out) {
  if (group.field_count() != 1) {
    return Status::NotImplemented(
        "Only LIST-annotated groups with a single child can be handled.");
  }

  out->children.resize(1);
  SchemaField* child_field = &out->children[0];

  ctx->LinkParent(out, parent);
  ctx->LinkParent(child_field, out);

  const Node& list_node = *group.field(0);

  if (!list_node.is_repeated()) {
    return Status::NotImplemented(
        "Non-repeated nodes in a LIST-annotated group are not supported.");
  }

  ++max_def_level;
  ++max_rep_level;
  if (list_node.is_group()) {
    // Resolve 3-level encoding
    //
    // required/optional group name=whatever {
    //   repeated group name=list {
    //     required/optional TYPE item;
    //   }
    // }
    //
    // yields list<item: TYPE ?nullable> ?nullable
    //
    // We distinguish the special case where we have
    //
    // required/optional group name=whatever {
    //   repeated group name=array or $SOMETHING_tuple {
    //     required/optional TYPE item;
    //   }
    // }
    //
    // In this latter case, the inner type of the list should be a struct
    // rather than a primitive value
    //
    // yields list<item: struct<item: TYPE ?nullable> not null> ?nullable
    const auto& list_group = static_cast<const GroupNode&>(list_node);
    // Special case mentioned in the format spec:
    //   If the name is array or ends in _tuple, this should be a list of struct
    //   even for single child elements.
    if (list_group.field_count() == 1 && !HasStructListName(list_group)) {
      // List of primitive type
      RETURN_NOT_OK(NodeToSchemaField(*list_group.field(0), max_def_level, max_rep_level,
                                      ctx, out, child_field));
    } else {
      RETURN_NOT_OK(
          GroupToStruct(list_group, max_def_level, max_rep_level, ctx, out, child_field));
    }
  } else {
    // Two-level list encoding
    //
    // required/optional group LIST {
    //   repeated TYPE;
    // }
    const auto& primitive_node = static_cast<const PrimitiveNode&>(list_node);
    int column_index = ctx->schema->GetColumnIndex(primitive_node);
    std::shared_ptr<DataType> type;
    RETURN_NOT_OK(GetTypeForNode(column_index, primitive_node, ctx, &type));
    auto item_field = ::arrow::field(list_node.name(), type, /*nullable=*/false,
                                     FieldIdMetadata(list_node.field_id()));
    RETURN_NOT_OK(PopulateLeaf(column_index, item_field, max_def_level, max_rep_level,
                               ctx, out, child_field));
  }
  out->field = ::arrow::field(group.name(), ::arrow::list(child_field->field),
                              group.is_optional(), FieldIdMetadata(group.field_id()));
  out->max_definition_level = max_def_level;
  out->max_repetition_level = max_rep_level;
  return Status::OK();
}

Status GroupToSchemaField(const GroupNode& node, int16_t max_def_level,
                          int16_t max_rep_level, SchemaTreeContext* ctx,
                          const SchemaField* parent, SchemaField* out) {
  if (node.logical_type()->is_list()) {
    return ListToSchemaField(node, max_def_level, max_rep_level, ctx, parent, out);
  }
  std::shared_ptr<DataType> type;
  if (node.is_repeated()) {
    // Simple repeated struct
    //
    // repeated group $NAME {
    //   r/o TYPE[0] f0
    //   r/o TYPE[1] f1
    // }
    out->children.resize(1);
    RETURN_NOT_OK(
        GroupToStruct(node, max_def_level, max_rep_level, ctx, out, &out->children[0]));
    out->field = ::arrow::field(node.name(), ::arrow::list(out->children[0].field),
                                node.is_optional(), FieldIdMetadata(node.field_id()));
    out->max_definition_level = max_def_level;
    out->max_repetition_level = max_rep_level;
    return Status::OK();
  } else {
    return GroupToStruct(node, max_def_level, max_rep_level, ctx, parent, out);
  }
}

Status NodeToSchemaField(const Node& node, int16_t max_def_level, int16_t max_rep_level,
                         SchemaTreeContext* ctx, const SchemaField* parent,
                         SchemaField* out) {
  /// Workhorse function for converting a Parquet schema node to an Arrow
  /// type. Handles different conventions for nested data.
  if (node.is_optional()) {
    ++max_def_level;
  } else if (node.is_repeated()) {
    // Repeated fields add both a repetition and definition level. This is used
    // to distinguish between an empty list and a list with an item in it.
    ++max_rep_level;
    ++max_def_level;
  }

  ctx->LinkParent(out, parent);

  // Now, walk the schema and create a ColumnDescriptor for each leaf node
  if (node.is_group()) {
    // A nested field, but we don't know what kind yet
    return GroupToSchemaField(static_cast<const GroupNode&>(node), max_def_level,
                              max_rep_level, ctx, parent, out);
  } else {
    // Either a normal flat primitive type, or a list type encoded with 1-level
    // list encoding. Note that the 3-level encoding is the form recommended by
    // the parquet specification, but technically we can have either
    //
    // required/optional $TYPE $FIELD_NAME
    //
    // or
    //
    // repeated $TYPE $FIELD_NAME
    const auto& primitive_node = static_cast<const PrimitiveNode&>(node);
    int column_index = ctx->schema->GetColumnIndex(primitive_node);
    std::shared_ptr<DataType> type;
    RETURN_NOT_OK(GetTypeForNode(column_index, primitive_node, ctx, &type));
    if (node.is_repeated()) {
      // One-level list encoding, e.g.
      // a: repeated int32;
      out->children.resize(1);
      auto child_field = ::arrow::field(node.name(), type, /*nullable=*/false);
      RETURN_NOT_OK(PopulateLeaf(column_index, child_field, max_def_level, max_rep_level,
                                 ctx, out, &out->children[0]));

      out->field = ::arrow::field(node.name(), ::arrow::list(child_field),
                                  /*nullable=*/false, FieldIdMetadata(node.field_id()));
      // Is this right?
      out->max_definition_level = max_def_level;
      out->max_repetition_level = max_rep_level;
      return Status::OK();
    } else {
      // A normal (required/optional) primitive node
      return PopulateLeaf(column_index,
                          ::arrow::field(node.name(), type, node.is_optional(),
                                         FieldIdMetadata(node.field_id())),
                          max_def_level, max_rep_level, ctx, parent, out);
    }
  }
}

// Get the original Arrow schema, as serialized in the Parquet metadata
Status GetOriginSchema(const std::shared_ptr<const KeyValueMetadata>& metadata,
                       std::shared_ptr<const KeyValueMetadata>* clean_metadata,
                       std::shared_ptr<::arrow::Schema>* out) {
  if (metadata == nullptr) {
    *out = nullptr;
    *clean_metadata = nullptr;
    return Status::OK();
  }

  static const std::string kArrowSchemaKey = "ARROW:schema";
  int schema_index = metadata->FindKey(kArrowSchemaKey);
  if (schema_index == -1) {
    *out = nullptr;
    *clean_metadata = metadata;
    return Status::OK();
  }

  // The original Arrow schema was serialized using the store_schema option.
  // We deserialize it here and use it to inform read options such as
  // dictionary-encoded fields.
  auto decoded = ::arrow::util::base64_decode(metadata->value(schema_index));
  auto schema_buf = std::make_shared<Buffer>(decoded);

  ::arrow::ipc::DictionaryMemo dict_memo;
  ::arrow::io::BufferReader input(schema_buf);

  ARROW_ASSIGN_OR_RAISE(*out, ::arrow::ipc::ReadSchema(&input, &dict_memo));

  if (metadata->size() > 1) {
    // Copy the metadata without the schema key
    auto new_metadata = ::arrow::key_value_metadata({}, {});
    new_metadata->reserve(metadata->size() - 1);
    for (int64_t i = 0; i < metadata->size(); ++i) {
      if (i == schema_index) continue;
      new_metadata->Append(metadata->key(i), metadata->value(i));
    }
    *clean_metadata = new_metadata;
  } else {
    // No other keys, let metadata be null
    *clean_metadata = nullptr;
  }
  return Status::OK();
}

// Restore original Arrow field information that was serialized as Parquet metadata
// but that is not necessarily present in the field reconstituted from Parquet data
// (for example, Parquet timestamp types don't carry timezone information).
Status ApplyOriginalMetadata(std::shared_ptr<Field> field, const Field& origin_field,
                             std::shared_ptr<Field>* out) {
  auto origin_type = origin_field.type();
  if (field->type()->id() == ::arrow::Type::TIMESTAMP) {
    // Restore time zone, if any
    const auto& ts_type = static_cast<const ::arrow::TimestampType&>(*field->type());
    const auto& ts_origin_type = static_cast<const ::arrow::TimestampType&>(*origin_type);

    // If the unit is the same and the data is tz-aware, then set the original
    // time zone, since Parquet has no native storage for timezones
    if (ts_type.unit() == ts_origin_type.unit() && ts_type.timezone() == "UTC" &&
        ts_origin_type.timezone() != "") {
      field = field->WithType(origin_type);
    }
  }
  if (origin_type->id() == ::arrow::Type::DICTIONARY &&
      field->type()->id() != ::arrow::Type::DICTIONARY &&
      IsDictionaryReadSupported(*field->type())) {
    const auto& dict_origin_type =
        static_cast<const ::arrow::DictionaryType&>(*origin_type);
    field = field->WithType(
        ::arrow::dictionary(::arrow::int32(), field->type(), dict_origin_type.ordered()));
  }

  if (origin_type->id() == ::arrow::Type::EXTENSION) {
    // Restore extension type, if the storage type is as read from Parquet
    const auto& ex_type = checked_cast<const ::arrow::ExtensionType&>(*origin_type);
    if (ex_type.storage_type()->Equals(*field->type())) {
      field = field->WithType(origin_type);
    }
  }

  // Restore field metadata
  std::shared_ptr<const KeyValueMetadata> field_metadata = origin_field.metadata();
  if (field_metadata != nullptr) {
    if (field->metadata()) {
      // Prefer the metadata keys (like field_id) from the current metadata
      field_metadata = field_metadata->Merge(*field->metadata());
    }
    field = field->WithMetadata(field_metadata);
  }
  *out = field;
  return Status::OK();
}

Status SchemaManifest::Make(const SchemaDescriptor* schema,
                            const std::shared_ptr<const KeyValueMetadata>& metadata,
                            const ArrowReaderProperties& properties,
                            SchemaManifest* manifest) {
  std::shared_ptr<::arrow::Schema> origin_schema;
  RETURN_NOT_OK(
      GetOriginSchema(metadata, &manifest->schema_metadata, &manifest->origin_schema));

  SchemaTreeContext ctx;
  ctx.manifest = manifest;
  ctx.properties = properties;
  ctx.schema = schema;
  const GroupNode& schema_node = *schema->group_node();
  manifest->descr = schema;
  manifest->schema_fields.resize(schema_node.field_count());
  for (int i = 0; i < static_cast<int>(schema_node.field_count()); ++i) {
    SchemaField* out_field = &manifest->schema_fields[i];
    RETURN_NOT_OK(NodeToSchemaField(*schema_node.field(i), 0, 0, &ctx,
                                    /*parent=*/nullptr, out_field));

    // TODO(wesm): as follow up to ARROW-3246, we should really pass the origin
    // schema (if any) through all functions in the schema reconstruction, but
    // I'm being lazy and just setting dictionary fields at the top level for
    // now
    if (manifest->origin_schema == nullptr) {
      continue;
    }
    auto origin_field = manifest->origin_schema->field(i);
    RETURN_NOT_OK(
        ApplyOriginalMetadata(out_field->field, *origin_field, &out_field->field));
  }
  return Status::OK();
}
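
// After Make() succeeds, the manifest's column_index_to_field map points each
// Parquet leaf column at its SchemaField, and child_to_parent allows walking
// from any nested SchemaField back up to its top-level field (see RecordLeaf
// and LinkParent above).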

template <typename CType, typename StatisticsType>
Status MakeMinMaxScalar(const Statistics& statistics,
                        std::shared_ptr<::arrow::Scalar>* min,
                        std::shared_ptr<::arrow::Scalar>* max) {
  const auto& typed_statistics = checked_cast<const StatisticsType&>(statistics);
  *min = ::arrow::MakeScalar(static_cast<CType>(typed_statistics.min()));
  *max = ::arrow::MakeScalar(static_cast<CType>(typed_statistics.max()));
  return Status::OK();
}

template <typename StatisticsType>
Status MakeMinMaxIntegralScalar(const Statistics& statistics,
                                std::shared_ptr<::arrow::Scalar>* min,
                                std::shared_ptr<::arrow::Scalar>* max) {
  const auto column_desc = statistics.descr();
  const auto& logical_type = column_desc->logical_type();
  const auto& integer = checked_pointer_cast<const IntLogicalType>(logical_type);
  const bool is_signed = integer->is_signed();

  switch (integer->bit_width()) {
    case 8:
      return is_signed ? MakeMinMaxScalar<int8_t, StatisticsType>(statistics, min, max)
                       : MakeMinMaxScalar<uint8_t, StatisticsType>(statistics, min, max);
    case 16:
      return is_signed ? MakeMinMaxScalar<int16_t, StatisticsType>(statistics, min, max)
                       : MakeMinMaxScalar<uint16_t, StatisticsType>(statistics, min, max);
    case 32:
      return is_signed ? MakeMinMaxScalar<int32_t, StatisticsType>(statistics, min, max)
                       : MakeMinMaxScalar<uint32_t, StatisticsType>(statistics, min, max);
    case 64:
      return is_signed ? MakeMinMaxScalar<int64_t, StatisticsType>(statistics, min, max)
                       : MakeMinMaxScalar<uint64_t, StatisticsType>(statistics, min, max);
  }

  return Status::OK();
}

template <typename StatisticsType>
Status TypedIntegralStatisticsAsScalars(const Statistics& statistics,
                                        std::shared_ptr<::arrow::Scalar>* min,
                                        std::shared_ptr<::arrow::Scalar>* max) {
  auto column_desc = statistics.descr();
  auto logical_type = column_desc->logical_type();

  switch (logical_type->type()) {
    case LogicalType::Type::INT:
      return MakeMinMaxIntegralScalar<StatisticsType>(statistics, min, max);
    case LogicalType::Type::NONE:
      // Fallback to the physical type
      using CType = typename StatisticsType::T;
      return MakeMinMaxScalar<CType, StatisticsType>(statistics, min, max);
    default:
      return Status::NotImplemented("Cannot extract statistics for type ",
                                    logical_type->ToString());
  }

  return Status::OK();
}

Status StatisticsAsScalars(const Statistics& statistics,
                           std::shared_ptr<::arrow::Scalar>* min,
                           std::shared_ptr<::arrow::Scalar>* max) {
  if (!statistics.HasMinMax()) {
    return Status::Invalid("Statistics has no min max.");
  }

  auto column_desc = statistics.descr();
  if (column_desc == nullptr) {
    return Status::Invalid("Statistics carries no descriptor, can't infer arrow type.");
  }

  auto physical_type = column_desc->physical_type();

  switch (physical_type) {
    case Type::BOOLEAN:
      return MakeMinMaxScalar<bool, BoolStatistics>(statistics, min, max);
    case Type::FLOAT:
      return MakeMinMaxScalar<float, FloatStatistics>(statistics, min, max);
    case Type::DOUBLE:
      return MakeMinMaxScalar<double, DoubleStatistics>(statistics, min, max);
    case Type::INT32:
      return TypedIntegralStatisticsAsScalars<Int32Statistics>(statistics, min, max);
    case Type::INT64:
      return TypedIntegralStatisticsAsScalars<Int64Statistics>(statistics, min, max);
    default:
      return Status::NotImplemented(
          "Extracting statistics is unsupported for physical type ", physical_type);
  }

  return Status::OK();
}
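
// A minimal usage sketch (variable names are illustrative): given column-chunk
// metadata from the file footer, row-group min/max statistics can be surfaced
// as Arrow scalars like so:
//
//   std::shared_ptr<::arrow::Scalar> min, max;
//   RETURN_NOT_OK(StatisticsAsScalars(*column_chunk_metadata->statistics(),
//                                     &min, &max));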

// ----------------------------------------------------------------------
// Primitive types

template <typename ArrowType, typename ParquetType>
Status TransferInt(RecordReader* reader, MemoryPool* pool,
                   const std::shared_ptr<DataType>& type, Datum* out) {
  using ArrowCType = typename ArrowType::c_type;
  using ParquetCType = typename ParquetType::c_type;
  int64_t length = reader->values_written();
  ARROW_ASSIGN_OR_RAISE(auto data,
                        ::arrow::AllocateBuffer(length * sizeof(ArrowCType), pool));

  auto values = reinterpret_cast<const ParquetCType*>(reader->values());
  auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
  std::copy(values, values + length, out_ptr);
  *out = std::make_shared<ArrayType<ArrowType>>(
      type, length, std::move(data), reader->ReleaseIsValid(), reader->null_count());
  return Status::OK();
}

std::shared_ptr<Array> TransferZeroCopy(RecordReader* reader,
                                        const std::shared_ptr<DataType>& type) {
  std::vector<std::shared_ptr<Buffer>> buffers = {reader->ReleaseIsValid(),
                                                  reader->ReleaseValues()};
  auto data = std::make_shared<::arrow::ArrayData>(type, reader->values_written(),
                                                   buffers, reader->null_count());
  return ::arrow::MakeArray(data);
}
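
// TransferZeroCopy hands the RecordReader's validity and value buffers to the
// Arrow array without copying; it is only used for types whose Parquet
// physical layout already matches the Arrow layout (INT32, INT64, FLOAT,
// DOUBLE), as dispatched in TransferColumnData below.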

Status TransferBool(RecordReader* reader, MemoryPool* pool, Datum* out) {
  int64_t length = reader->values_written();

  const int64_t buffer_size = BitUtil::BytesForBits(length);
  ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(buffer_size, pool));

  // Transfer boolean values to packed bitmap
  auto values = reinterpret_cast<const bool*>(reader->values());
  uint8_t* data_ptr = data->mutable_data();
  memset(data_ptr, 0, buffer_size);

  for (int64_t i = 0; i < length; i++) {
    if (values[i]) {
      ::arrow::BitUtil::SetBit(data_ptr, i);
    }
  }

  *out = std::make_shared<BooleanArray>(length, std::move(data), reader->ReleaseIsValid(),
                                        reader->null_count());
  return Status::OK();
}

Status TransferInt96(RecordReader* reader, MemoryPool* pool,
                     const std::shared_ptr<DataType>& type, Datum* out) {
  int64_t length = reader->values_written();
  auto values = reinterpret_cast<const Int96*>(reader->values());
  ARROW_ASSIGN_OR_RAISE(auto data,
                        ::arrow::AllocateBuffer(length * sizeof(int64_t), pool));
  auto data_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
  for (int64_t i = 0; i < length; i++) {
    if (values[i].value[2] == 0) {
      // Happens for null entries: avoid triggering UBSAN as that Int96 timestamp
      // isn't representable as a 64-bit Unix timestamp.
      *data_ptr++ = 0;
    } else {
      *data_ptr++ = Int96GetNanoSeconds(values[i]);
    }
  }
  *out = std::make_shared<TimestampArray>(type, length, std::move(data),
                                          reader->ReleaseIsValid(), reader->null_count());
  return Status::OK();
}
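
// For context: a Parquet INT96 timestamp packs nanoseconds-within-day into its
// first eight bytes and a Julian day number into the last four;
// Int96GetNanoSeconds folds the two parts into nanoseconds since the Unix
// epoch, and a zero day word is treated above as a null slot.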

Status TransferDate64(RecordReader* reader, MemoryPool* pool,
                      const std::shared_ptr<DataType>& type, Datum* out) {
  int64_t length = reader->values_written();
  auto values = reinterpret_cast<const int32_t*>(reader->values());

  ARROW_ASSIGN_OR_RAISE(auto data,
                        ::arrow::AllocateBuffer(length * sizeof(int64_t), pool));
  auto out_ptr = reinterpret_cast<int64_t*>(data->mutable_data());

  for (int64_t i = 0; i < length; i++) {
    *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsPerDay;
  }

  *out = std::make_shared<::arrow::Date64Array>(
      type, length, std::move(data), reader->ReleaseIsValid(), reader->null_count());
  return Status::OK();
}

// ----------------------------------------------------------------------
// Binary, direct to dictionary-encoded

Status TransferDictionary(RecordReader* reader,
                          const std::shared_ptr<DataType>& logical_value_type,
                          std::shared_ptr<ChunkedArray>* out) {
  auto dict_reader = dynamic_cast<DictionaryRecordReader*>(reader);
  DCHECK(dict_reader);
  *out = dict_reader->GetResult();
  if (!logical_value_type->Equals(*(*out)->type())) {
    ARROW_ASSIGN_OR_RAISE(*out, (*out)->View(logical_value_type));
  }
  return Status::OK();
}

Status TransferBinary(RecordReader* reader,
                      const std::shared_ptr<DataType>& logical_value_type,
                      std::shared_ptr<ChunkedArray>* out) {
  if (reader->read_dictionary()) {
    return TransferDictionary(
        reader, ::arrow::dictionary(::arrow::int32(), logical_value_type), out);
  }
  auto binary_reader = dynamic_cast<BinaryRecordReader*>(reader);
  DCHECK(binary_reader);
  auto chunks = binary_reader->GetBuilderChunks();
  for (const auto& chunk : chunks) {
    if (!chunk->type()->Equals(*logical_value_type)) {
      ARROW_ASSIGN_OR_RAISE(*out, ChunkedArray(chunks).View(logical_value_type));
      return Status::OK();
    }
  }
  *out = std::make_shared<ChunkedArray>(chunks, logical_value_type);
  return Status::OK();
}

// ----------------------------------------------------------------------
// INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128

static uint64_t BytesToInteger(const uint8_t* bytes, int32_t start, int32_t stop) {
  const int32_t length = stop - start;

  DCHECK_GE(length, 0);
  DCHECK_LE(length, 8);

  switch (length) {
    case 0:
      return 0;
    case 1:
      return bytes[start];
    case 2:
      return FromBigEndian(SafeLoadAs<uint16_t>(bytes + start));
    case 3: {
      const uint64_t first_two_bytes = FromBigEndian(SafeLoadAs<uint16_t>(bytes + start));
      const uint64_t last_byte = bytes[stop - 1];
      return first_two_bytes << 8 | last_byte;
    }
    case 4:
      return FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
    case 5: {
      const uint64_t first_four_bytes =
          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
      const uint64_t last_byte = bytes[stop - 1];
      return first_four_bytes << 8 | last_byte;
    }
    case 6: {
      const uint64_t first_four_bytes =
          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
      const uint64_t last_two_bytes =
          FromBigEndian(SafeLoadAs<uint16_t>(bytes + start + 4));
      return first_four_bytes << 16 | last_two_bytes;
    }
    case 7: {
      const uint64_t first_four_bytes =
          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
      const uint64_t second_two_bytes =
          FromBigEndian(SafeLoadAs<uint16_t>(bytes + start + 4));
      const uint64_t last_byte = bytes[stop - 1];
      return first_four_bytes << 24 | second_two_bytes << 8 | last_byte;
    }
    case 8:
      return FromBigEndian(SafeLoadAs<uint64_t>(bytes + start));
    default: {
      DCHECK(false);
      return UINT64_MAX;
    }
  }
}

static constexpr int32_t kMinDecimalBytes = 1;
static constexpr int32_t kMaxDecimalBytes = 16;

/// \brief Convert a sequence of big-endian bytes to one int64_t (high bits) and one
/// uint64_t (low bits).
static void BytesToIntegerPair(const uint8_t* bytes, const int32_t length,
                               int64_t* out_high, uint64_t* out_low) {
  DCHECK_GE(length, kMinDecimalBytes);
  DCHECK_LE(length, kMaxDecimalBytes);

  // XXX This code is copied from Decimal::FromBigEndian

  int64_t high, low;

  // Bytes are coming in big-endian, so the first byte is the MSB and therefore holds the
  // sign bit.
  const bool is_negative = static_cast<int8_t>(bytes[0]) < 0;

  // 1. Extract the high bytes
  // Stop byte of the high bytes
  const int32_t high_bits_offset = std::max(0, length - 8);
  const auto high_bits = BytesToInteger(bytes, 0, high_bits_offset);

  if (high_bits_offset == 8) {
    // Avoid undefined shift by 64 below
    high = high_bits;
  } else {
    high = -1 * (is_negative && length < kMaxDecimalBytes);
    // Shift left enough bits to make room for the incoming int64_t
    high = SafeLeftShift(high, high_bits_offset * CHAR_BIT);
    // Preserve the upper bits by inplace OR-ing the int64_t
    high |= high_bits;
  }

  // 2. Extract the low bytes
  // Stop byte of the low bytes
  const int32_t low_bits_offset = std::min(length, 8);
  const auto low_bits = BytesToInteger(bytes, high_bits_offset, length);

  if (low_bits_offset == 8) {
    // Avoid undefined shift by 64 below
    low = low_bits;
  } else {
    // Sign extend the low bits if necessary
    low = -1 * (is_negative && length < 8);
    // Shift left enough bits to make room for the incoming int64_t
    low = SafeLeftShift(low, low_bits_offset * CHAR_BIT);
    // Preserve the upper bits by inplace OR-ing the int64_t
    low |= low_bits;
  }

  *out_high = high;
  *out_low = static_cast<uint64_t>(low);
}
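
// Worked example (illustrative): the 3-byte big-endian value {0xFF, 0xFE, 0xFF}
// encodes -257 in two's complement. high_bits_offset is 0, so the high word
// sign-extends to -1, while the low word becomes 0xFFFFFFFFFFFFFEFF;
// RawBytesToDecimalBytes below stores that (low, high) pair into the 16-byte
// Decimal128 slot.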

static inline void RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width,
                                          uint8_t* out_buf) {
  // view the first 8 bytes as an unsigned 64-bit integer
  auto low = reinterpret_cast<uint64_t*>(out_buf);

  // view the second 8 bytes as a signed 64-bit integer
  auto high = reinterpret_cast<int64_t*>(out_buf + sizeof(uint64_t));

  // Convert the fixed size binary array bytes into a Decimal128 compatible layout
  BytesToIntegerPair(value, byte_width, high, low);
}

template <typename T>
Status ConvertToDecimal128(const Array& array, const std::shared_ptr<DataType>&,
                           MemoryPool* pool, std::shared_ptr<Array>*) {
  return Status::NotImplemented("not implemented");
}

template <>
Status ConvertToDecimal128<FLBAType>(const Array& array,
                                     const std::shared_ptr<DataType>& type,
                                     MemoryPool* pool, std::shared_ptr<Array>* out) {
  const auto& fixed_size_binary_array =
      static_cast<const ::arrow::FixedSizeBinaryArray&>(array);

  // The byte width of each decimal value
  const int32_t type_length =
      static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();

  // number of elements in the entire array
  const int64_t length = fixed_size_binary_array.length();

  // Get the byte width of the values in the FixedSizeBinaryArray. Most of the time
  // this will be different from the decimal array width because we write the minimum
  // number of bytes necessary to represent a given precision
  const int32_t byte_width =
      static_cast<const ::arrow::FixedSizeBinaryType&>(*fixed_size_binary_array.type())
          .byte_width();
  if (byte_width < kMinDecimalBytes || byte_width > kMaxDecimalBytes) {
    return Status::Invalid("Invalid FIXED_LEN_BYTE_ARRAY length for Decimal128");
  }

  // allocate memory for the decimal array
  ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool));

  // raw bytes that we can write to
  uint8_t* out_ptr = data->mutable_data();

  // convert each FixedSizeBinary value to valid decimal bytes
  const int64_t null_count = fixed_size_binary_array.null_count();
  if (null_count > 0) {
    for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
      if (!fixed_size_binary_array.IsNull(i)) {
        RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr);
      }
    }
  } else {
    for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
      RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr);
    }
  }

  *out = std::make_shared<::arrow::Decimal128Array>(
      type, length, std::move(data), fixed_size_binary_array.null_bitmap(), null_count);

  return Status::OK();
}

template <>
Status ConvertToDecimal128<ByteArrayType>(const Array& array,
                                          const std::shared_ptr<DataType>& type,
                                          MemoryPool* pool, std::shared_ptr<Array>* out) {
  const auto& binary_array = static_cast<const ::arrow::BinaryArray&>(array);
  const int64_t length = binary_array.length();

  const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type);
  const int64_t type_length = decimal_type.byte_width();

  ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool));

  // raw bytes that we can write to
  uint8_t* out_ptr = data->mutable_data();

  const int64_t null_count = binary_array.null_count();

  // convert each BinaryArray value to valid decimal bytes
  for (int64_t i = 0; i < length; i++, out_ptr += type_length) {
    int32_t record_len = 0;
    const uint8_t* record_loc = binary_array.GetValue(i, &record_len);

    if (record_len < 0 || record_len > type_length) {
      return Status::Invalid("Invalid BYTE_ARRAY length for Decimal128");
    }

    auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);
    out_ptr_view[0] = 0;
    out_ptr_view[1] = 0;

    // Convert only the rows that are not null when the array has nulls;
    // otherwise convert every row.
    if ((null_count > 0 && !binary_array.IsNull(i)) || null_count <= 0) {
      if (record_len <= 0) {
        return Status::Invalid("Invalid BYTE_ARRAY length for Decimal128");
      }
      RawBytesToDecimalBytes(record_loc, record_len, out_ptr);
    }
  }

  *out = std::make_shared<::arrow::Decimal128Array>(
      type, length, std::move(data), binary_array.null_bitmap(), null_count);
  return Status::OK();
}

/// \brief Convert an Int32 or Int64 array into a Decimal128Array
/// The parquet spec allows systems to write decimals in int32 or int64 if the values
/// are small enough to fit in 4 or 8 bytes, respectively.
/// This function implements the conversion from int32 and int64 arrays to decimal arrays.
template <
    typename ParquetIntegerType,
    typename = ::arrow::enable_if_t<std::is_same<ParquetIntegerType, Int32Type>::value ||
                                    std::is_same<ParquetIntegerType, Int64Type>::value>>
static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
                                     const std::shared_ptr<DataType>& type, Datum* out) {
  DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);

  const int64_t length = reader->values_written();

  using ElementType = typename ParquetIntegerType::c_type;
  static_assert(std::is_same<ElementType, int32_t>::value ||
                    std::is_same<ElementType, int64_t>::value,
                "ElementType must be int32_t or int64_t");

  const auto values = reinterpret_cast<const ElementType*>(reader->values());

  const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type);
  const int64_t type_length = decimal_type.byte_width();

  ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool));
  uint8_t* out_ptr = data->mutable_data();

  using ::arrow::BitUtil::FromLittleEndian;

  for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
    // sign/zero extend int32_t values, otherwise a no-op
    const auto value = static_cast<int64_t>(values[i]);

    auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);

    // No-op on little endian machines, byteswap on big endian
    out_ptr_view[0] = FromLittleEndian(static_cast<uint64_t>(value));

    // no need to byteswap here because we're sign/zero extending exactly 8 bytes
    out_ptr_view[1] = static_cast<uint64_t>(value < 0 ? -1 : 0);
  }

  if (reader->nullable_values()) {
    std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
    *out = std::make_shared<::arrow::Decimal128Array>(type, length, std::move(data),
                                                      is_valid, reader->null_count());
  } else {
    *out = std::make_shared<::arrow::Decimal128Array>(type, length, std::move(data));
  }
  return Status::OK();
}
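
// Illustrative example: an INT32 decimal column with scale 2 storing -5 means
// -0.05; the loop above writes it as the 16-byte two's-complement pair
// low = 0xFFFFFFFFFFFFFFFB, high = 0xFFFFFFFFFFFFFFFF (little-endian words).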
1186 
1187 /// \brief Convert an arrow::BinaryArray to an arrow::Decimal128Array
1188 /// We do this by:
1189 /// 1. Creating an arrow::BinaryArray from the RecordReader's builder
1190 /// 2. Allocating a buffer for the arrow::Decimal128Array
1191 /// 3. Converting the big-endian bytes in each BinaryArray entry to two integers
1192 ///    representing the high and low bits of each decimal value.
1193 template <typename ParquetType>
TransferDecimal(RecordReader * reader,MemoryPool * pool,const std::shared_ptr<DataType> & type,Datum * out)1194 Status TransferDecimal(RecordReader* reader, MemoryPool* pool,
1195                        const std::shared_ptr<DataType>& type, Datum* out) {
1196   DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
1197 
1198   auto binary_reader = dynamic_cast<BinaryRecordReader*>(reader);
1199   DCHECK(binary_reader);
1200   ::arrow::ArrayVector chunks = binary_reader->GetBuilderChunks();
1201   for (size_t i = 0; i < chunks.size(); ++i) {
1202     std::shared_ptr<Array> chunk_as_decimal;
1203     RETURN_NOT_OK(
1204         ConvertToDecimal128<ParquetType>(*chunks[i], type, pool, &chunk_as_decimal));
1205     // Replace the chunk, which will hopefully also free memory as we go
1206     chunks[i] = chunk_as_decimal;
1207   }
1208   *out = std::make_shared<ChunkedArray>(chunks, type);
1209   return Status::OK();
1210 }

Status TransferExtension(RecordReader* reader, std::shared_ptr<DataType> value_type,
                         const ColumnDescriptor* descr, MemoryPool* pool, Datum* out) {
  std::shared_ptr<ChunkedArray> result;
  auto ext_type = std::static_pointer_cast<::arrow::ExtensionType>(value_type);
  auto storage_type = ext_type->storage_type();
  RETURN_NOT_OK(TransferColumnData(reader, storage_type, descr, pool, &result));

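  // For example (illustrative): for a UUID-like extension type whose storage type
  // is fixed_size_binary(16), the call above reads plain fixed_size_binary chunks;
  // the loop below re-tags each chunk's ArrayData with the extension type and
  // rebuilds it through ExtensionType::MakeArray, sharing the value buffers
  // rather than copying them.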
  ::arrow::ArrayVector out_chunks(result->num_chunks());
  for (int i = 0; i < result->num_chunks(); i++) {
    auto chunk = result->chunk(i);
    auto ext_data = chunk->data()->Copy();
    ext_data->type = ext_type;
    auto ext_result = ext_type->MakeArray(ext_data);
    out_chunks[i] = ext_result;
  }
  *out = std::make_shared<ChunkedArray>(out_chunks);
  return Status::OK();
}

#define TRANSFER_INT32(ENUM, ArrowType)                                              \
  case ::arrow::Type::ENUM: {                                                        \
    Status s = TransferInt<ArrowType, Int32Type>(reader, pool, value_type, &result); \
    RETURN_NOT_OK(s);                                                                \
  } break;

#define TRANSFER_INT64(ENUM, ArrowType)                                              \
  case ::arrow::Type::ENUM: {                                                        \
    Status s = TransferInt<ArrowType, Int64Type>(reader, pool, value_type, &result); \
    RETURN_NOT_OK(s);                                                                \
  } break;
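
// For reference, an invocation such as TRANSFER_INT32(UINT8, ::arrow::UInt8Type)
// below expands to a `case ::arrow::Type::UINT8:` block that calls
// TransferInt<::arrow::UInt8Type, Int32Type>(reader, pool, value_type, &result)
// and propagates any error through RETURN_NOT_OK.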

Status TransferColumnData(RecordReader* reader, std::shared_ptr<DataType> value_type,
                          const ColumnDescriptor* descr, MemoryPool* pool,
                          std::shared_ptr<ChunkedArray>* out) {
  Datum result;
  std::shared_ptr<ChunkedArray> chunked_result;
  switch (value_type->id()) {
    case ::arrow::Type::DICTIONARY: {
      RETURN_NOT_OK(TransferDictionary(reader, value_type, &chunked_result));
      result = chunked_result;
    } break;
    case ::arrow::Type::NA: {
      result = std::make_shared<::arrow::NullArray>(reader->values_written());
      break;
    }
    case ::arrow::Type::INT32:
    case ::arrow::Type::INT64:
    case ::arrow::Type::FLOAT:
    case ::arrow::Type::DOUBLE:
      result = TransferZeroCopy(reader, value_type);
      break;
    case ::arrow::Type::BOOL:
      RETURN_NOT_OK(TransferBool(reader, pool, &result));
      break;
      TRANSFER_INT32(UINT8, ::arrow::UInt8Type);
      TRANSFER_INT32(INT8, ::arrow::Int8Type);
      TRANSFER_INT32(UINT16, ::arrow::UInt16Type);
      TRANSFER_INT32(INT16, ::arrow::Int16Type);
      TRANSFER_INT32(UINT32, ::arrow::UInt32Type);
      TRANSFER_INT64(UINT64, ::arrow::UInt64Type);
      TRANSFER_INT32(DATE32, ::arrow::Date32Type);
      TRANSFER_INT32(TIME32, ::arrow::Time32Type);
      TRANSFER_INT64(TIME64, ::arrow::Time64Type);
    case ::arrow::Type::DATE64:
      RETURN_NOT_OK(TransferDate64(reader, pool, value_type, &result));
      break;
    case ::arrow::Type::FIXED_SIZE_BINARY:
    case ::arrow::Type::BINARY:
    case ::arrow::Type::STRING: {
      RETURN_NOT_OK(TransferBinary(reader, value_type, &chunked_result));
      result = chunked_result;
    } break;
    case ::arrow::Type::DECIMAL: {
      switch (descr->physical_type()) {
        case ::parquet::Type::INT32: {
          RETURN_NOT_OK(
              DecimalIntegerTransfer<Int32Type>(reader, pool, value_type, &result));
        } break;
        case ::parquet::Type::INT64: {
          RETURN_NOT_OK(
              DecimalIntegerTransfer<Int64Type>(reader, pool, value_type, &result));
        } break;
        case ::parquet::Type::BYTE_ARRAY: {
          RETURN_NOT_OK(
              TransferDecimal<ByteArrayType>(reader, pool, value_type, &result));
        } break;
        case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
          RETURN_NOT_OK(TransferDecimal<FLBAType>(reader, pool, value_type, &result));
        } break;
        default:
          return Status::Invalid(
              "Physical type for decimal must be int32, int64, byte array, or fixed "
              "length binary");
      }
    } break;
    case ::arrow::Type::TIMESTAMP: {
      const ::arrow::TimestampType& timestamp_type =
          static_cast<::arrow::TimestampType&>(*value_type);
      switch (timestamp_type.unit()) {
        case ::arrow::TimeUnit::MILLI:
        case ::arrow::TimeUnit::MICRO: {
          result = TransferZeroCopy(reader, value_type);
        } break;
        case ::arrow::TimeUnit::NANO: {
          if (descr->physical_type() == ::parquet::Type::INT96) {
            RETURN_NOT_OK(TransferInt96(reader, pool, value_type, &result));
          } else {
            result = TransferZeroCopy(reader, value_type);
          }
        } break;
        default:
          return Status::NotImplemented("TimeUnit not supported");
      }
    } break;
    case ::arrow::Type::EXTENSION: {
      RETURN_NOT_OK(TransferExtension(reader, value_type, descr, pool, &result));
    } break;
    default:
      return Status::NotImplemented("No support for reading columns of type ",
                                    value_type->ToString());
  }

  DCHECK_NE(result.kind(), Datum::NONE);

  if (result.kind() == Datum::ARRAY) {
    *out = std::make_shared<ChunkedArray>(result.make_array());
  } else if (result.kind() == Datum::CHUNKED_ARRAY) {
    *out = result.chunked_array();
  } else {
    DCHECK(false) << "Should be impossible";
  }

  return Status::OK();
}
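
// A minimal caller sketch for TransferColumnData (illustrative only; the
// `record_reader` and `column_descr` variables are assumed to come from the
// surrounding reader machinery):
//
//   std::shared_ptr<ChunkedArray> column;
//   RETURN_NOT_OK(TransferColumnData(record_reader.get(), ::arrow::int64(),
//                                    column_descr, ::arrow::default_memory_pool(),
//                                    &column));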

Status ReconstructNestedList(const std::shared_ptr<Array>& arr,
                             std::shared_ptr<Field> field, int16_t max_def_level,
                             int16_t max_rep_level, const int16_t* def_levels,
                             const int16_t* rep_levels, int64_t total_levels,
                             ::arrow::MemoryPool* pool, std::shared_ptr<Array>* out) {
  // Walk downwards to extract nullability
  std::vector<std::string> item_names;
  std::vector<bool> nullable;
  std::vector<std::shared_ptr<const ::arrow::KeyValueMetadata>> field_metadata;
  std::vector<std::shared_ptr<::arrow::Int32Builder>> offset_builders;
  std::vector<std::shared_ptr<::arrow::BooleanBuilder>> valid_bits_builders;
  nullable.push_back(field->nullable());
  while (field->type()->num_fields() > 0) {
    if (field->type()->num_fields() > 1) {
      return Status::NotImplemented("Fields with more than one child are not supported.");
    } else {
      if (field->type()->id() != ::arrow::Type::LIST) {
        return Status::NotImplemented("Currently only nesting with Lists is supported.");
      }
      field = field->type()->field(0);
    }
    item_names.push_back(field->name());
    offset_builders.emplace_back(
        std::make_shared<::arrow::Int32Builder>(::arrow::int32(), pool));
    valid_bits_builders.emplace_back(
        std::make_shared<::arrow::BooleanBuilder>(::arrow::boolean(), pool));
    nullable.push_back(field->nullable());
    field_metadata.push_back(field->metadata());
  }
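  // For example (illustrative): given a field of type list<list<int32>> with
  // Arrow's default child field name "item", the walk above descends two levels,
  // yielding item_names = {"item", "item"}, three nullability flags (outer list,
  // inner list, leaf item), and one offset/validity builder pair per list level.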

  int64_t list_depth = offset_builders.size();
  // The minimal definition level at which a level entry corresponds to a value
  // slot in the primitive values array.
  int16_t values_def_level = max_def_level;
  if (nullable[nullable.size() - 1]) {
    values_def_level--;
  }

  // The definition level, at each nesting depth, at which a list is declared
  // empty rather than null.
  std::vector<int16_t> empty_def_level(list_depth);
  int def_level = 0;
  for (int i = 0; i < list_depth; i++) {
    if (nullable[i]) {
      def_level++;
    }
    empty_def_level[i] = static_cast<int16_t>(def_level);
    def_level++;
  }
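  // Worked example (illustrative): for a single nullable list of nullable int32
  // values (max_def_level == 3, max_rep_level == 1), the above yields
  // empty_def_level[0] == 1 and values_def_level == 2, so definition level 0
  // marks a null list, 1 an empty list, and 2 or 3 a slot in the values array.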

  int32_t values_offset = 0;
  std::vector<int64_t> null_counts(list_depth, 0);
  for (int64_t i = 0; i < total_levels; i++) {
    int16_t rep_level = rep_levels[i];
    if (rep_level < max_rep_level) {
      for (int64_t j = rep_level; j < list_depth; j++) {
        if (j == (list_depth - 1)) {
          RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
        } else {
          RETURN_NOT_OK(offset_builders[j]->Append(
              static_cast<int32_t>(offset_builders[j + 1]->length())));
        }

        if (((empty_def_level[j] - 1) == def_levels[i]) && (nullable[j])) {
          RETURN_NOT_OK(valid_bits_builders[j]->Append(false));
          null_counts[j]++;
          break;
        } else {
          RETURN_NOT_OK(valid_bits_builders[j]->Append(true));
          if (empty_def_level[j] == def_levels[i]) {
            break;
          }
        }
      }
    }
    if (def_levels[i] >= values_def_level) {
      values_offset++;
    }
  }
  // Add the final offset to all lists
  for (int64_t j = 0; j < list_depth; j++) {
    if (j == (list_depth - 1)) {
      RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
    } else {
      RETURN_NOT_OK(offset_builders[j]->Append(
          static_cast<int32_t>(offset_builders[j + 1]->length())));
    }
  }
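  // Each offsets builder now holds one more entry than there are list slots at
  // its level, which is why list_lengths below subtracts one from the builder
  // length.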

  std::vector<std::shared_ptr<Buffer>> offsets;
  std::vector<std::shared_ptr<Buffer>> valid_bits;
  std::vector<int64_t> list_lengths;
  for (int64_t j = 0; j < list_depth; j++) {
    list_lengths.push_back(offset_builders[j]->length() - 1);
    std::shared_ptr<Array> array;
    RETURN_NOT_OK(offset_builders[j]->Finish(&array));
    offsets.emplace_back(std::static_pointer_cast<Int32Array>(array)->values());
    RETURN_NOT_OK(valid_bits_builders[j]->Finish(&array));
    valid_bits.emplace_back(std::static_pointer_cast<BooleanArray>(array)->values());
  }

  *out = arr;

  // TODO(wesm): Use passed-in field
  for (int64_t j = list_depth - 1; j >= 0; j--) {
    auto list_type = ::arrow::list(::arrow::field(item_names[j], (*out)->type(),
                                                  nullable[j + 1], field_metadata[j]));
    *out = std::make_shared<::arrow::ListArray>(list_type, list_lengths[j], offsets[j],
                                                *out, valid_bits[j], null_counts[j]);
  }
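  // The loop above wraps the flat values array from the innermost list level
  // outward, so after the last iteration *out is the outermost ListArray.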
  return Status::OK();
}

}  // namespace arrow
}  // namespace parquet