1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "parquet/arrow/reader_internal.h"
19
20 #include <algorithm>
21 #include <climits>
22 #include <cstdint>
23 #include <cstring>
24 #include <memory>
25 #include <string>
26 #include <type_traits>
27 #include <unordered_map>
28 #include <vector>
29
30 #include "arrow/array.h"
31 #include "arrow/builder.h"
32 #include "arrow/datum.h"
33 #include "arrow/extension_type.h"
34 #include "arrow/io/memory.h"
35 #include "arrow/ipc/reader.h"
36 #include "arrow/ipc/writer.h"
37 #include "arrow/status.h"
38 #include "arrow/table.h"
39 #include "arrow/type.h"
40 #include "arrow/type_traits.h"
41 #include "arrow/util/base64.h"
42 #include "arrow/util/checked_cast.h"
43 #include "arrow/util/int_util.h"
44 #include "arrow/util/logging.h"
45 #include "arrow/util/string_view.h"
46 #include "arrow/util/ubsan.h"
47
48 #include "parquet/arrow/reader.h"
49 #include "parquet/column_reader.h"
50 #include "parquet/platform.h"
51 #include "parquet/properties.h"
52 #include "parquet/schema.h"
53 #include "parquet/statistics.h"
54 #include "parquet/types.h"
55
56 using arrow::Array;
57 using arrow::BooleanArray;
58 using arrow::ChunkedArray;
59 using arrow::DataType;
60 using arrow::Datum;
61 using arrow::Field;
62 using arrow::Int32Array;
63 using arrow::ListArray;
64 using arrow::MemoryPool;
65 using arrow::ResizableBuffer;
66 using arrow::Status;
67 using arrow::StructArray;
68 using arrow::Table;
69 using arrow::TimestampArray;
70
71 using ::arrow::BitUtil::FromBigEndian;
72 using ::arrow::internal::checked_cast;
73 using ::arrow::internal::checked_pointer_cast;
74 using ::arrow::internal::SafeLeftShift;
75 using ::arrow::util::SafeLoadAs;
76
77 using parquet::internal::BinaryRecordReader;
78 using parquet::internal::DictionaryRecordReader;
79 using parquet::internal::RecordReader;
80 using parquet::schema::GroupNode;
81 using parquet::schema::Node;
82 using parquet::schema::PrimitiveNode;
83 using ParquetType = parquet::Type;
84
85 namespace parquet {
86 namespace arrow {
87
88 template <typename ArrowType>
89 using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
90
91 // ----------------------------------------------------------------------
92 // Schema logic
93
MakeArrowDecimal(const LogicalType & logical_type,std::shared_ptr<DataType> * out)94 static Status MakeArrowDecimal(const LogicalType& logical_type,
95 std::shared_ptr<DataType>* out) {
96 const auto& decimal = checked_cast<const DecimalLogicalType&>(logical_type);
97 return ::arrow::Decimal128Type::Make(decimal.precision(), decimal.scale()).Value(out);
98 }
99
MakeArrowInt(const LogicalType & logical_type,std::shared_ptr<DataType> * out)100 static Status MakeArrowInt(const LogicalType& logical_type,
101 std::shared_ptr<DataType>* out) {
102 const auto& integer = checked_cast<const IntLogicalType&>(logical_type);
103 switch (integer.bit_width()) {
104 case 8:
105 *out = integer.is_signed() ? ::arrow::int8() : ::arrow::uint8();
106 break;
107 case 16:
108 *out = integer.is_signed() ? ::arrow::int16() : ::arrow::uint16();
109 break;
110 case 32:
111 *out = integer.is_signed() ? ::arrow::int32() : ::arrow::uint32();
112 break;
113 default:
114 return Status::TypeError(logical_type.ToString(),
115 " can not annotate physical type Int32");
116 }
117 return Status::OK();
118 }
119
MakeArrowInt64(const LogicalType & logical_type,std::shared_ptr<DataType> * out)120 static Status MakeArrowInt64(const LogicalType& logical_type,
121 std::shared_ptr<DataType>* out) {
122 const auto& integer = checked_cast<const IntLogicalType&>(logical_type);
123 switch (integer.bit_width()) {
124 case 64:
125 *out = integer.is_signed() ? ::arrow::int64() : ::arrow::uint64();
126 break;
127 default:
128 return Status::TypeError(logical_type.ToString(),
129 " can not annotate physical type Int64");
130 }
131 return Status::OK();
132 }
133
MakeArrowTime32(const LogicalType & logical_type,std::shared_ptr<DataType> * out)134 static Status MakeArrowTime32(const LogicalType& logical_type,
135 std::shared_ptr<DataType>* out) {
136 const auto& time = checked_cast<const TimeLogicalType&>(logical_type);
137 switch (time.time_unit()) {
138 case LogicalType::TimeUnit::MILLIS:
139 *out = ::arrow::time32(::arrow::TimeUnit::MILLI);
140 break;
141 default:
142 return Status::TypeError(logical_type.ToString(),
143 " can not annotate physical type Time32");
144 }
145 return Status::OK();
146 }
147
MakeArrowTime64(const LogicalType & logical_type,std::shared_ptr<DataType> * out)148 static Status MakeArrowTime64(const LogicalType& logical_type,
149 std::shared_ptr<DataType>* out) {
150 const auto& time = checked_cast<const TimeLogicalType&>(logical_type);
151 switch (time.time_unit()) {
152 case LogicalType::TimeUnit::MICROS:
153 *out = ::arrow::time64(::arrow::TimeUnit::MICRO);
154 break;
155 case LogicalType::TimeUnit::NANOS:
156 *out = ::arrow::time64(::arrow::TimeUnit::NANO);
157 break;
158 default:
159 return Status::TypeError(logical_type.ToString(),
160 " can not annotate physical type Time64");
161 }
162 return Status::OK();
163 }
164
MakeArrowTimestamp(const LogicalType & logical_type,std::shared_ptr<DataType> * out)165 static Status MakeArrowTimestamp(const LogicalType& logical_type,
166 std::shared_ptr<DataType>* out) {
167 const auto& timestamp = checked_cast<const TimestampLogicalType&>(logical_type);
168 const bool utc_normalized =
169 timestamp.is_from_converted_type() ? false : timestamp.is_adjusted_to_utc();
170 static const char* utc_timezone = "UTC";
171 switch (timestamp.time_unit()) {
172 case LogicalType::TimeUnit::MILLIS:
173 *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::MILLI, utc_timezone)
174 : ::arrow::timestamp(::arrow::TimeUnit::MILLI));
175 break;
176 case LogicalType::TimeUnit::MICROS:
177 *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::MICRO, utc_timezone)
178 : ::arrow::timestamp(::arrow::TimeUnit::MICRO));
179 break;
180 case LogicalType::TimeUnit::NANOS:
181 *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::NANO, utc_timezone)
182 : ::arrow::timestamp(::arrow::TimeUnit::NANO));
183 break;
184 default:
185 return Status::TypeError("Unrecognized time unit in timestamp logical_type: ",
186 logical_type.ToString());
187 }
188 return Status::OK();
189 }
190
FromByteArray(const LogicalType & logical_type,std::shared_ptr<DataType> * out)191 static Status FromByteArray(const LogicalType& logical_type,
192 std::shared_ptr<DataType>* out) {
193 switch (logical_type.type()) {
194 case LogicalType::Type::STRING:
195 *out = ::arrow::utf8();
196 break;
197 case LogicalType::Type::DECIMAL:
198 RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
199 break;
200 case LogicalType::Type::NONE:
201 case LogicalType::Type::ENUM:
202 case LogicalType::Type::JSON:
203 case LogicalType::Type::BSON:
204 *out = ::arrow::binary();
205 break;
206 default:
207 return Status::NotImplemented("Unhandled logical logical_type ",
208 logical_type.ToString(), " for binary array");
209 }
210 return Status::OK();
211 }
212
FromFLBA(const LogicalType & logical_type,int32_t physical_length,std::shared_ptr<DataType> * out)213 static Status FromFLBA(const LogicalType& logical_type, int32_t physical_length,
214 std::shared_ptr<DataType>* out) {
215 switch (logical_type.type()) {
216 case LogicalType::Type::DECIMAL:
217 RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
218 break;
219 case LogicalType::Type::NONE:
220 case LogicalType::Type::INTERVAL:
221 case LogicalType::Type::UUID:
222 *out = ::arrow::fixed_size_binary(physical_length);
223 break;
224 default:
225 return Status::NotImplemented("Unhandled logical logical_type ",
226 logical_type.ToString(),
227 " for fixed-length binary array");
228 }
229
230 return Status::OK();
231 }
232
FromInt32(const LogicalType & logical_type,std::shared_ptr<DataType> * out)233 static Status FromInt32(const LogicalType& logical_type, std::shared_ptr<DataType>* out) {
234 switch (logical_type.type()) {
235 case LogicalType::Type::INT:
236 RETURN_NOT_OK(MakeArrowInt(logical_type, out));
237 break;
238 case LogicalType::Type::DATE:
239 *out = ::arrow::date32();
240 break;
241 case LogicalType::Type::TIME:
242 RETURN_NOT_OK(MakeArrowTime32(logical_type, out));
243 break;
244 case LogicalType::Type::DECIMAL:
245 RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
246 break;
247 case LogicalType::Type::NONE:
248 *out = ::arrow::int32();
249 break;
250 default:
251 return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(),
252 " for INT32");
253 }
254 return Status::OK();
255 }
256
FromInt64(const LogicalType & logical_type,std::shared_ptr<DataType> * out)257 static Status FromInt64(const LogicalType& logical_type, std::shared_ptr<DataType>* out) {
258 switch (logical_type.type()) {
259 case LogicalType::Type::INT:
260 RETURN_NOT_OK(MakeArrowInt64(logical_type, out));
261 break;
262 case LogicalType::Type::DECIMAL:
263 RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
264 break;
265 case LogicalType::Type::TIMESTAMP:
266 RETURN_NOT_OK(MakeArrowTimestamp(logical_type, out));
267 break;
268 case LogicalType::Type::TIME:
269 RETURN_NOT_OK(MakeArrowTime64(logical_type, out));
270 break;
271 case LogicalType::Type::NONE:
272 *out = ::arrow::int64();
273 break;
274 default:
275 return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(),
276 " for INT64");
277 }
278 return Status::OK();
279 }
280
// Map a Parquet primitive node (physical type plus logical annotation) to
// the corresponding Arrow DataType. Returns IOError for an unknown physical
// type (possible with corrupt files, see PARQUET-1565) and forwards
// NotImplemented/TypeError from the per-physical-type helpers.
Status GetPrimitiveType(const schema::PrimitiveNode& primitive,
                        std::shared_ptr<DataType>* out) {
  const std::shared_ptr<const LogicalType>& logical_type = primitive.logical_type();
  // Invalid or explicitly NULL-annotated columns surface as Arrow null
  if (logical_type->is_invalid() || logical_type->is_null()) {
    *out = ::arrow::null();
    return Status::OK();
  }

  switch (primitive.physical_type()) {
    case ParquetType::BOOLEAN:
      *out = ::arrow::boolean();
      break;
    case ParquetType::INT32:
      RETURN_NOT_OK(FromInt32(*logical_type, out));
      break;
    case ParquetType::INT64:
      RETURN_NOT_OK(FromInt64(*logical_type, out));
      break;
    case ParquetType::INT96:
      // Legacy 96-bit timestamps are read as nanosecond-resolution timestamps
      *out = ::arrow::timestamp(::arrow::TimeUnit::NANO);
      break;
    case ParquetType::FLOAT:
      *out = ::arrow::float32();
      break;
    case ParquetType::DOUBLE:
      *out = ::arrow::float64();
      break;
    case ParquetType::BYTE_ARRAY:
      RETURN_NOT_OK(FromByteArray(*logical_type, out));
      break;
    case ParquetType::FIXED_LEN_BYTE_ARRAY:
      RETURN_NOT_OK(FromFLBA(*logical_type, primitive.type_length(), out));
      break;
    default: {
      // PARQUET-1565: This can occur if the file is corrupt
      return Status::IOError("Invalid physical column type: ",
                             TypeToString(primitive.physical_type()));
    }
  }
  return Status::OK();
}
322
// Mutable state threaded through the schema-tree walk. Accumulates the
// child->parent links and leaf-column index into the output SchemaManifest.
struct SchemaTreeContext {
  SchemaManifest* manifest;
  ArrowReaderProperties properties;
  const SchemaDescriptor* schema;

  // Record "parent" as the parent of "child" in the manifest's link map.
  void LinkParent(const SchemaField* child, const SchemaField* parent) {
    manifest->child_to_parent[child] = parent;
  }

  // Index a leaf field by its Parquet column index for O(1) lookup.
  void RecordLeaf(const SchemaField* leaf) {
    manifest->column_index_to_field[leaf->column_index] = leaf;
  }
};
336
IsDictionaryReadSupported(const DataType & type)337 bool IsDictionaryReadSupported(const DataType& type) {
338 // Only supported currently for BYTE_ARRAY types
339 return type.id() == ::arrow::Type::BINARY || type.id() == ::arrow::Type::STRING;
340 }
341
GetTypeForNode(int column_index,const schema::PrimitiveNode & primitive_node,SchemaTreeContext * ctx,std::shared_ptr<DataType> * out)342 Status GetTypeForNode(int column_index, const schema::PrimitiveNode& primitive_node,
343 SchemaTreeContext* ctx, std::shared_ptr<DataType>* out) {
344 std::shared_ptr<DataType> storage_type;
345 RETURN_NOT_OK(GetPrimitiveType(primitive_node, &storage_type));
346 if (ctx->properties.read_dictionary(column_index) &&
347 IsDictionaryReadSupported(*storage_type)) {
348 *out = ::arrow::dictionary(::arrow::int32(), storage_type);
349 } else {
350 *out = storage_type;
351 }
352 return Status::OK();
353 }
354
355 Status NodeToSchemaField(const Node& node, int16_t max_def_level, int16_t max_rep_level,
356 SchemaTreeContext* ctx, const SchemaField* parent,
357 SchemaField* out);
358
359 Status GroupToSchemaField(const GroupNode& node, int16_t max_def_level,
360 int16_t max_rep_level, SchemaTreeContext* ctx,
361 const SchemaField* parent, SchemaField* out);
362
// Fill in a SchemaField for a leaf (primitive) column: attach the Arrow
// field, record the Parquet column index and the definition/repetition
// levels accumulated on the path from the root, then register the leaf and
// its parent link in the manifest via the tree context.
Status PopulateLeaf(int column_index, const std::shared_ptr<Field>& field,
                    int16_t max_def_level, int16_t max_rep_level, SchemaTreeContext* ctx,
                    const SchemaField* parent, SchemaField* out) {
  out->field = field;
  out->column_index = column_index;
  out->max_definition_level = max_def_level;
  out->max_repetition_level = max_rep_level;
  ctx->RecordLeaf(out);
  ctx->LinkParent(out, parent);
  return Status::OK();
}
374
375 // Special case mentioned in the format spec:
376 // If the name is array or ends in _tuple, this should be a list of struct
377 // even for single child elements.
HasStructListName(const GroupNode & node)378 bool HasStructListName(const GroupNode& node) {
379 ::arrow::util::string_view name{node.name()};
380 return name == "array" || name.ends_with("_tuple");
381 }
382
FieldIdMetadata(int field_id)383 std::shared_ptr<::arrow::KeyValueMetadata> FieldIdMetadata(int field_id) {
384 return ::arrow::key_value_metadata({"PARQUET:field_id"}, {std::to_string(field_id)});
385 }
386
// Convert a (non-LIST) Parquet group node into an Arrow struct field,
// recursively converting each child node. The accumulated def/rep levels
// are recorded on the output SchemaField for record assembly.
Status GroupToStruct(const GroupNode& node, int16_t max_def_level, int16_t max_rep_level,
                     SchemaTreeContext* ctx, const SchemaField* parent,
                     SchemaField* out) {
  std::vector<std::shared_ptr<Field>> arrow_fields;
  out->children.resize(node.field_count());
  for (int i = 0; i < node.field_count(); i++) {
    // Each child is parented to this struct, not to our own parent
    RETURN_NOT_OK(NodeToSchemaField(*node.field(i), max_def_level, max_rep_level, ctx,
                                    out, &out->children[i]));
    arrow_fields.push_back(out->children[i].field);
  }
  auto struct_type = ::arrow::struct_(arrow_fields);
  out->field = ::arrow::field(node.name(), struct_type, node.is_optional(),
                              FieldIdMetadata(node.field_id()));
  out->max_definition_level = max_def_level;
  out->max_repetition_level = max_rep_level;
  return Status::OK();
}
404
// Convert a LIST-annotated group into an Arrow list field, handling both
// the standard 3-level encoding and the legacy 2-level encoding. The
// repeated middle node adds one definition and one repetition level to the
// path before the item is converted.
Status ListToSchemaField(const GroupNode& group, int16_t max_def_level,
                         int16_t max_rep_level, SchemaTreeContext* ctx,
                         const SchemaField* parent, SchemaField* out) {
  if (group.field_count() != 1) {
    return Status::NotImplemented(
        "Only LIST-annotated groups with a single child can be handled.");
  }

  out->children.resize(1);
  SchemaField* child_field = &out->children[0];

  ctx->LinkParent(out, parent);
  ctx->LinkParent(child_field, out);

  // The single child of a LIST group must be the repeated "list"/"array" node
  const Node& list_node = *group.field(0);

  if (!list_node.is_repeated()) {
    return Status::NotImplemented(
        "Non-repeated nodes in a LIST-annotated group are not supported.");
  }

  // The repeated node contributes one rep level and one def level
  ++max_def_level;
  ++max_rep_level;
  if (list_node.is_group()) {
    // Resolve 3-level encoding
    //
    // required/optional group name=whatever {
    //   repeated group name=list {
    //     required/optional TYPE item;
    //   }
    // }
    //
    // yields list<item: TYPE ?nullable> ?nullable
    //
    // We distinguish the special base that we have
    //
    // required/optional group name=whatever {
    //   repeated group name=array or $SOMETHING_tuple {
    //     required/optional TYPE item;
    //   }
    // }
    //
    // In this latter case, the inner type of the list should be a struct
    // rather than a primitive value
    //
    // yields list<item: struct<item: TYPE ?nullable> not null> ?nullable
    const auto& list_group = static_cast<const GroupNode&>(list_node);
    // Special case mentioned in the format spec:
    //   If the name is array or ends in _tuple, this should be a list of struct
    //   even for single child elements.
    if (list_group.field_count() == 1 && !HasStructListName(list_group)) {
      // List of primitive type
      RETURN_NOT_OK(NodeToSchemaField(*list_group.field(0), max_def_level, max_rep_level,
                                      ctx, out, child_field));
    } else {
      RETURN_NOT_OK(
          GroupToStruct(list_group, max_def_level, max_rep_level, ctx, out, child_field));
    }
  } else {
    // Two-level list encoding
    //
    // required/optional group LIST {
    //   repeated TYPE;
    // }
    const auto& primitive_node = static_cast<const PrimitiveNode&>(list_node);
    int column_index = ctx->schema->GetColumnIndex(primitive_node);
    std::shared_ptr<DataType> type;
    RETURN_NOT_OK(GetTypeForNode(column_index, primitive_node, ctx, &type));
    // The repeated item itself cannot be null in 2-level encoding
    auto item_field = ::arrow::field(list_node.name(), type, /*nullable=*/false,
                                     FieldIdMetadata(list_node.field_id()));
    RETURN_NOT_OK(PopulateLeaf(column_index, item_field, max_def_level, max_rep_level,
                               ctx, out, child_field));
  }
  out->field = ::arrow::field(group.name(), ::arrow::list(child_field->field),
                              group.is_optional(), FieldIdMetadata(group.field_id()));
  out->max_definition_level = max_def_level;
  out->max_repetition_level = max_rep_level;
  return Status::OK();
}
484
// Convert a Parquet group node: LIST-annotated groups become Arrow lists,
// repeated groups become list-of-struct (1-level repetition), and plain
// groups become structs.
Status GroupToSchemaField(const GroupNode& node, int16_t max_def_level,
                          int16_t max_rep_level, SchemaTreeContext* ctx,
                          const SchemaField* parent, SchemaField* out) {
  if (node.logical_type()->is_list()) {
    return ListToSchemaField(node, max_def_level, max_rep_level, ctx, parent, out);
  }
  std::shared_ptr<DataType> type;
  if (node.is_repeated()) {
    // Simple repeated struct
    //
    // repeated group $NAME {
    //   r/o TYPE[0] f0
    //   r/o TYPE[1] f1
    // }
    out->children.resize(1);
    // The struct becomes the single child; this node becomes a list of it
    RETURN_NOT_OK(
        GroupToStruct(node, max_def_level, max_rep_level, ctx, out, &out->children[0]));
    out->field = ::arrow::field(node.name(), ::arrow::list(out->children[0].field),
                                node.is_optional(), FieldIdMetadata(node.field_id()));
    out->max_definition_level = max_def_level;
    out->max_repetition_level = max_rep_level;
    return Status::OK();
  } else {
    return GroupToStruct(node, max_def_level, max_rep_level, ctx, parent, out);
  }
}
511
// Workhorse function for converting a Parquet schema node to an Arrow
// type. Handles the different conventions for nested data: groups are
// dispatched to GroupToSchemaField, primitives become leaves (with the
// 1-level repeated-primitive list encoding handled inline).
Status NodeToSchemaField(const Node& node, int16_t max_def_level, int16_t max_rep_level,
                         SchemaTreeContext* ctx, const SchemaField* parent,
                         SchemaField* out) {
  // Accumulate this node's contribution to the def/rep level path
  if (node.is_optional()) {
    ++max_def_level;
  } else if (node.is_repeated()) {
    // Repeated fields add both a repetition and definition level. This is used
    // to distinguish between an empty list and a list with an item in it.
    ++max_rep_level;
    ++max_def_level;
  }

  ctx->LinkParent(out, parent);

  // Now, walk the schema and create a ColumnDescriptor for each leaf node
  if (node.is_group()) {
    // A nested field, but we don't know what kind yet
    return GroupToSchemaField(static_cast<const GroupNode&>(node), max_def_level,
                              max_rep_level, ctx, parent, out);
  } else {
    // Either a normal flat primitive type, or a list type encoded with 1-level
    // list encoding. Note that the 3-level encoding is the form recommended by
    // the parquet specification, but technically we can have either
    //
    // required/optional $TYPE $FIELD_NAME
    //
    // or
    //
    // repeated $TYPE $FIELD_NAME
    const auto& primitive_node = static_cast<const PrimitiveNode&>(node);
    int column_index = ctx->schema->GetColumnIndex(primitive_node);
    std::shared_ptr<DataType> type;
    RETURN_NOT_OK(GetTypeForNode(column_index, primitive_node, ctx, &type));
    if (node.is_repeated()) {
      // One-level list encoding, e.g.
      // a: repeated int32;
      out->children.resize(1);
      auto child_field = ::arrow::field(node.name(), type, /*nullable=*/false);
      RETURN_NOT_OK(PopulateLeaf(column_index, child_field, max_def_level, max_rep_level,
                                 ctx, out, &out->children[0]));

      out->field = ::arrow::field(node.name(), ::arrow::list(child_field),
                                  /*nullable=*/false, FieldIdMetadata(node.field_id()));
      // Is this right?
      out->max_definition_level = max_def_level;
      out->max_repetition_level = max_rep_level;
      return Status::OK();
    } else {
      // A normal (required/optional) primitive node
      return PopulateLeaf(column_index,
                          ::arrow::field(node.name(), type, node.is_optional(),
                                         FieldIdMetadata(node.field_id())),
                          max_def_level, max_rep_level, ctx, parent, out);
    }
  }
}
570
571 // Get the original Arrow schema, as serialized in the Parquet metadata
// Get the original Arrow schema, as serialized in the Parquet metadata under
// the "ARROW:schema" key (base64-encoded IPC payload). On success *out holds
// the deserialized schema (or nullptr when absent) and *clean_metadata holds
// the remaining metadata with the schema key removed (nullptr when empty).
Status GetOriginSchema(const std::shared_ptr<const KeyValueMetadata>& metadata,
                       std::shared_ptr<const KeyValueMetadata>* clean_metadata,
                       std::shared_ptr<::arrow::Schema>* out) {
  if (metadata == nullptr) {
    *out = nullptr;
    *clean_metadata = nullptr;
    return Status::OK();
  }

  static const std::string kArrowSchemaKey = "ARROW:schema";
  int schema_index = metadata->FindKey(kArrowSchemaKey);
  if (schema_index == -1) {
    // No serialized schema stored; pass the metadata through untouched
    *out = nullptr;
    *clean_metadata = metadata;
    return Status::OK();
  }

  // The original Arrow schema was serialized using the store_schema option.
  // We deserialize it here and use it to inform read options such as
  // dictionary-encoded fields.
  auto decoded = ::arrow::util::base64_decode(metadata->value(schema_index));
  auto schema_buf = std::make_shared<Buffer>(decoded);

  ::arrow::ipc::DictionaryMemo dict_memo;
  ::arrow::io::BufferReader input(schema_buf);

  ARROW_ASSIGN_OR_RAISE(*out, ::arrow::ipc::ReadSchema(&input, &dict_memo));

  if (metadata->size() > 1) {
    // Copy the metadata without the schema key
    auto new_metadata = ::arrow::key_value_metadata({}, {});
    new_metadata->reserve(metadata->size() - 1);
    for (int64_t i = 0; i < metadata->size(); ++i) {
      if (i == schema_index) continue;
      new_metadata->Append(metadata->key(i), metadata->value(i));
    }
    *clean_metadata = new_metadata;
  } else {
    // No other keys, let metadata be null
    *clean_metadata = nullptr;
  }
  return Status::OK();
}
615
616 // Restore original Arrow field information that was serialized as Parquet metadata
617 // but that is not necessarily present in the field reconstitued from Parquet data
618 // (for example, Parquet timestamp types doesn't carry timezone information).
// Restore original Arrow field information that was serialized as Parquet
// metadata but that is not necessarily present in the field reconstituted
// from Parquet data (for example, Parquet timestamp types don't carry
// timezone information). Restores: timestamp timezones, dictionary
// encoding, extension types, and field-level key/value metadata.
Status ApplyOriginalMetadata(std::shared_ptr<Field> field, const Field& origin_field,
                             std::shared_ptr<Field>* out) {
  auto origin_type = origin_field.type();
  if (field->type()->id() == ::arrow::Type::TIMESTAMP) {
    // Restore time zone, if any
    const auto& ts_type = static_cast<const ::arrow::TimestampType&>(*field->type());
    const auto& ts_origin_type = static_cast<const ::arrow::TimestampType&>(*origin_type);

    // If the unit is the same and the data is tz-aware, then set the original
    // time zone, since Parquet has no native storage for timezones
    if (ts_type.unit() == ts_origin_type.unit() && ts_type.timezone() == "UTC" &&
        ts_origin_type.timezone() != "") {
      field = field->WithType(origin_type);
    }
  }
  // Re-wrap in a dictionary type when the original field was dictionary
  // encoded but the Parquet read produced the plain storage type
  if (origin_type->id() == ::arrow::Type::DICTIONARY &&
      field->type()->id() != ::arrow::Type::DICTIONARY &&
      IsDictionaryReadSupported(*field->type())) {
    const auto& dict_origin_type =
        static_cast<const ::arrow::DictionaryType&>(*origin_type);
    field = field->WithType(
        ::arrow::dictionary(::arrow::int32(), field->type(), dict_origin_type.ordered()));
  }

  if (origin_type->id() == ::arrow::Type::EXTENSION) {
    // Restore extension type, if the storage type is as read from Parquet
    const auto& ex_type = checked_cast<const ::arrow::ExtensionType&>(*origin_type);
    if (ex_type.storage_type()->Equals(*field->type())) {
      field = field->WithType(origin_type);
    }
  }

  // Restore field metadata
  std::shared_ptr<const KeyValueMetadata> field_metadata = origin_field.metadata();
  if (field_metadata != nullptr) {
    if (field->metadata()) {
      // Prefer the metadata keys (like field_id) from the current metadata
      field_metadata = field_metadata->Merge(*field->metadata());
    }
    field = field->WithMetadata(field_metadata);
  }
  *out = field;
  return Status::OK();
}
663
Make(const SchemaDescriptor * schema,const std::shared_ptr<const KeyValueMetadata> & metadata,const ArrowReaderProperties & properties,SchemaManifest * manifest)664 Status SchemaManifest::Make(const SchemaDescriptor* schema,
665 const std::shared_ptr<const KeyValueMetadata>& metadata,
666 const ArrowReaderProperties& properties,
667 SchemaManifest* manifest) {
668 std::shared_ptr<::arrow::Schema> origin_schema;
669 RETURN_NOT_OK(
670 GetOriginSchema(metadata, &manifest->schema_metadata, &manifest->origin_schema));
671
672 SchemaTreeContext ctx;
673 ctx.manifest = manifest;
674 ctx.properties = properties;
675 ctx.schema = schema;
676 const GroupNode& schema_node = *schema->group_node();
677 manifest->descr = schema;
678 manifest->schema_fields.resize(schema_node.field_count());
679 for (int i = 0; i < static_cast<int>(schema_node.field_count()); ++i) {
680 SchemaField* out_field = &manifest->schema_fields[i];
681 RETURN_NOT_OK(NodeToSchemaField(*schema_node.field(i), 0, 0, &ctx,
682 /*parent=*/nullptr, out_field));
683
684 // TODO(wesm): as follow up to ARROW-3246, we should really pass the origin
685 // schema (if any) through all functions in the schema reconstruction, but
686 // I'm being lazy and just setting dictionary fields at the top level for
687 // now
688 if (manifest->origin_schema == nullptr) {
689 continue;
690 }
691 auto origin_field = manifest->origin_schema->field(i);
692 RETURN_NOT_OK(
693 ApplyOriginalMetadata(out_field->field, *origin_field, &out_field->field));
694 }
695 return Status::OK();
696 }
697
698 template <typename CType, typename StatisticsType>
MakeMinMaxScalar(const Statistics & statistics,std::shared_ptr<::arrow::Scalar> * min,std::shared_ptr<::arrow::Scalar> * max)699 Status MakeMinMaxScalar(const Statistics& statistics,
700 std::shared_ptr<::arrow::Scalar>* min,
701 std::shared_ptr<::arrow::Scalar>* max) {
702 const auto& typed_statistics = checked_cast<const StatisticsType&>(statistics);
703 *min = ::arrow::MakeScalar(static_cast<CType>(typed_statistics.min()));
704 *max = ::arrow::MakeScalar(static_cast<CType>(typed_statistics.max()));
705 return Status::OK();
706 }
707
708 template <typename StatisticsType>
MakeMinMaxIntegralScalar(const Statistics & statistics,std::shared_ptr<::arrow::Scalar> * min,std::shared_ptr<::arrow::Scalar> * max)709 Status MakeMinMaxIntegralScalar(const Statistics& statistics,
710 std::shared_ptr<::arrow::Scalar>* min,
711 std::shared_ptr<::arrow::Scalar>* max) {
712 const auto column_desc = statistics.descr();
713 const auto& logical_type = column_desc->logical_type();
714 const auto& integer = checked_pointer_cast<const IntLogicalType>(logical_type);
715 const bool is_signed = integer->is_signed();
716
717 switch (integer->bit_width()) {
718 case 8:
719 return is_signed ? MakeMinMaxScalar<int8_t, StatisticsType>(statistics, min, max)
720 : MakeMinMaxScalar<uint8_t, StatisticsType>(statistics, min, max);
721 case 16:
722 return is_signed ? MakeMinMaxScalar<int16_t, StatisticsType>(statistics, min, max)
723 : MakeMinMaxScalar<uint16_t, StatisticsType>(statistics, min, max);
724 case 32:
725 return is_signed ? MakeMinMaxScalar<int32_t, StatisticsType>(statistics, min, max)
726 : MakeMinMaxScalar<uint32_t, StatisticsType>(statistics, min, max);
727 case 64:
728 return is_signed ? MakeMinMaxScalar<int64_t, StatisticsType>(statistics, min, max)
729 : MakeMinMaxScalar<uint64_t, StatisticsType>(statistics, min, max);
730 }
731
732 return Status::OK();
733 }
734
735 template <typename StatisticsType>
TypedIntegralStatisticsAsScalars(const Statistics & statistics,std::shared_ptr<::arrow::Scalar> * min,std::shared_ptr<::arrow::Scalar> * max)736 Status TypedIntegralStatisticsAsScalars(const Statistics& statistics,
737 std::shared_ptr<::arrow::Scalar>* min,
738 std::shared_ptr<::arrow::Scalar>* max) {
739 auto column_desc = statistics.descr();
740 auto logical_type = column_desc->logical_type();
741
742 switch (logical_type->type()) {
743 case LogicalType::Type::INT:
744 return MakeMinMaxIntegralScalar<StatisticsType>(statistics, min, max);
745 case LogicalType::Type::NONE:
746 // Fallback to the physical type
747 using CType = typename StatisticsType::T;
748 return MakeMinMaxScalar<CType, StatisticsType>(statistics, min, max);
749 default:
750 return Status::NotImplemented("Cannot extract statistics for type ");
751 }
752
753 return Status::OK();
754 }
755
StatisticsAsScalars(const Statistics & statistics,std::shared_ptr<::arrow::Scalar> * min,std::shared_ptr<::arrow::Scalar> * max)756 Status StatisticsAsScalars(const Statistics& statistics,
757 std::shared_ptr<::arrow::Scalar>* min,
758 std::shared_ptr<::arrow::Scalar>* max) {
759 if (!statistics.HasMinMax()) {
760 return Status::Invalid("Statistics has no min max.");
761 }
762
763 auto column_desc = statistics.descr();
764 if (column_desc == nullptr) {
765 return Status::Invalid("Statistics carries no descriptor, can't infer arrow type.");
766 }
767
768 auto physical_type = column_desc->physical_type();
769
770 switch (physical_type) {
771 case Type::BOOLEAN:
772 return MakeMinMaxScalar<bool, BoolStatistics>(statistics, min, max);
773 case Type::FLOAT:
774 return MakeMinMaxScalar<float, FloatStatistics>(statistics, min, max);
775 case Type::DOUBLE:
776 return MakeMinMaxScalar<double, DoubleStatistics>(statistics, min, max);
777 case Type::INT32:
778 return TypedIntegralStatisticsAsScalars<Int32Statistics>(statistics, min, max);
779 case Type::INT64:
780 return TypedIntegralStatisticsAsScalars<Int64Statistics>(statistics, min, max);
781 default:
782 return Status::NotImplemented("Extract statistics unsupported for physical_type ",
783 physical_type, " unsupported.");
784 }
785
786 return Status::OK();
787 }
788
789 // ----------------------------------------------------------------------
790 // Primitive types
791
792 template <typename ArrowType, typename ParquetType>
TransferInt(RecordReader * reader,MemoryPool * pool,const std::shared_ptr<DataType> & type,Datum * out)793 Status TransferInt(RecordReader* reader, MemoryPool* pool,
794 const std::shared_ptr<DataType>& type, Datum* out) {
795 using ArrowCType = typename ArrowType::c_type;
796 using ParquetCType = typename ParquetType::c_type;
797 int64_t length = reader->values_written();
798 ARROW_ASSIGN_OR_RAISE(auto data,
799 ::arrow::AllocateBuffer(length * sizeof(ArrowCType), pool));
800
801 auto values = reinterpret_cast<const ParquetCType*>(reader->values());
802 auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
803 std::copy(values, values + length, out_ptr);
804 *out = std::make_shared<ArrayType<ArrowType>>(
805 type, length, std::move(data), reader->ReleaseIsValid(), reader->null_count());
806 return Status::OK();
807 }
808
TransferZeroCopy(RecordReader * reader,const std::shared_ptr<DataType> & type)809 std::shared_ptr<Array> TransferZeroCopy(RecordReader* reader,
810 const std::shared_ptr<DataType>& type) {
811 std::vector<std::shared_ptr<Buffer>> buffers = {reader->ReleaseIsValid(),
812 reader->ReleaseValues()};
813 auto data = std::make_shared<::arrow::ArrayData>(type, reader->values_written(),
814 buffers, reader->null_count());
815 return ::arrow::MakeArray(data);
816 }
817
TransferBool(RecordReader * reader,MemoryPool * pool,Datum * out)818 Status TransferBool(RecordReader* reader, MemoryPool* pool, Datum* out) {
819 int64_t length = reader->values_written();
820
821 const int64_t buffer_size = BitUtil::BytesForBits(length);
822 ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(buffer_size, pool));
823
824 // Transfer boolean values to packed bitmap
825 auto values = reinterpret_cast<const bool*>(reader->values());
826 uint8_t* data_ptr = data->mutable_data();
827 memset(data_ptr, 0, buffer_size);
828
829 for (int64_t i = 0; i < length; i++) {
830 if (values[i]) {
831 ::arrow::BitUtil::SetBit(data_ptr, i);
832 }
833 }
834
835 *out = std::make_shared<BooleanArray>(length, std::move(data), reader->ReleaseIsValid(),
836 reader->null_count());
837 return Status::OK();
838 }
839
TransferInt96(RecordReader * reader,MemoryPool * pool,const std::shared_ptr<DataType> & type,Datum * out)840 Status TransferInt96(RecordReader* reader, MemoryPool* pool,
841 const std::shared_ptr<DataType>& type, Datum* out) {
842 int64_t length = reader->values_written();
843 auto values = reinterpret_cast<const Int96*>(reader->values());
844 ARROW_ASSIGN_OR_RAISE(auto data,
845 ::arrow::AllocateBuffer(length * sizeof(int64_t), pool));
846 auto data_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
847 for (int64_t i = 0; i < length; i++) {
848 if (values[i].value[2] == 0) {
849 // Happens for null entries: avoid triggering UBSAN as that Int96 timestamp
850 // isn't representable as a 64-bit Unix timestamp.
851 *data_ptr++ = 0;
852 } else {
853 *data_ptr++ = Int96GetNanoSeconds(values[i]);
854 }
855 }
856 *out = std::make_shared<TimestampArray>(type, length, std::move(data),
857 reader->ReleaseIsValid(), reader->null_count());
858 return Status::OK();
859 }
860
TransferDate64(RecordReader * reader,MemoryPool * pool,const std::shared_ptr<DataType> & type,Datum * out)861 Status TransferDate64(RecordReader* reader, MemoryPool* pool,
862 const std::shared_ptr<DataType>& type, Datum* out) {
863 int64_t length = reader->values_written();
864 auto values = reinterpret_cast<const int32_t*>(reader->values());
865
866 ARROW_ASSIGN_OR_RAISE(auto data,
867 ::arrow::AllocateBuffer(length * sizeof(int64_t), pool));
868 auto out_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
869
870 for (int64_t i = 0; i < length; i++) {
871 *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsPerDay;
872 }
873
874 *out = std::make_shared<::arrow::Date64Array>(
875 type, length, std::move(data), reader->ReleaseIsValid(), reader->null_count());
876 return Status::OK();
877 }
878
879 // ----------------------------------------------------------------------
880 // Binary, direct to dictionary-encoded
881
TransferDictionary(RecordReader * reader,const std::shared_ptr<DataType> & logical_value_type,std::shared_ptr<ChunkedArray> * out)882 Status TransferDictionary(RecordReader* reader,
883 const std::shared_ptr<DataType>& logical_value_type,
884 std::shared_ptr<ChunkedArray>* out) {
885 auto dict_reader = dynamic_cast<DictionaryRecordReader*>(reader);
886 DCHECK(dict_reader);
887 *out = dict_reader->GetResult();
888 if (!logical_value_type->Equals(*(*out)->type())) {
889 ARROW_ASSIGN_OR_RAISE(*out, (*out)->View(logical_value_type));
890 }
891 return Status::OK();
892 }
893
TransferBinary(RecordReader * reader,const std::shared_ptr<DataType> & logical_value_type,std::shared_ptr<ChunkedArray> * out)894 Status TransferBinary(RecordReader* reader,
895 const std::shared_ptr<DataType>& logical_value_type,
896 std::shared_ptr<ChunkedArray>* out) {
897 if (reader->read_dictionary()) {
898 return TransferDictionary(
899 reader, ::arrow::dictionary(::arrow::int32(), logical_value_type), out);
900 }
901 auto binary_reader = dynamic_cast<BinaryRecordReader*>(reader);
902 DCHECK(binary_reader);
903 auto chunks = binary_reader->GetBuilderChunks();
904 for (const auto& chunk : chunks) {
905 if (!chunk->type()->Equals(*logical_value_type)) {
906 ARROW_ASSIGN_OR_RAISE(*out, ChunkedArray(chunks).View(logical_value_type));
907 return Status::OK();
908 }
909 }
910 *out = std::make_shared<ChunkedArray>(chunks, logical_value_type);
911 return Status::OK();
912 }
913
914 // ----------------------------------------------------------------------
915 // INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128
916
BytesToInteger(const uint8_t * bytes,int32_t start,int32_t stop)917 static uint64_t BytesToInteger(const uint8_t* bytes, int32_t start, int32_t stop) {
918 const int32_t length = stop - start;
919
920 DCHECK_GE(length, 0);
921 DCHECK_LE(length, 8);
922
923 switch (length) {
924 case 0:
925 return 0;
926 case 1:
927 return bytes[start];
928 case 2:
929 return FromBigEndian(SafeLoadAs<uint16_t>(bytes + start));
930 case 3: {
931 const uint64_t first_two_bytes = FromBigEndian(SafeLoadAs<uint16_t>(bytes + start));
932 const uint64_t last_byte = bytes[stop - 1];
933 return first_two_bytes << 8 | last_byte;
934 }
935 case 4:
936 return FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
937 case 5: {
938 const uint64_t first_four_bytes =
939 FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
940 const uint64_t last_byte = bytes[stop - 1];
941 return first_four_bytes << 8 | last_byte;
942 }
943 case 6: {
944 const uint64_t first_four_bytes =
945 FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
946 const uint64_t last_two_bytes =
947 FromBigEndian(SafeLoadAs<uint16_t>(bytes + start + 4));
948 return first_four_bytes << 16 | last_two_bytes;
949 }
950 case 7: {
951 const uint64_t first_four_bytes =
952 FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
953 const uint64_t second_two_bytes =
954 FromBigEndian(SafeLoadAs<uint16_t>(bytes + start + 4));
955 const uint64_t last_byte = bytes[stop - 1];
956 return first_four_bytes << 24 | second_two_bytes << 8 | last_byte;
957 }
958 case 8:
959 return FromBigEndian(SafeLoadAs<uint64_t>(bytes + start));
960 default: {
961 DCHECK(false);
962 return UINT64_MAX;
963 }
964 }
965 }
966
// Valid byte widths for a big-endian-encoded Decimal128 value
// (1..16 bytes; writers emit the minimum width for the precision).
static constexpr int32_t kMinDecimalBytes = 1;
static constexpr int32_t kMaxDecimalBytes = 16;
969
/// \brief Convert a sequence of big-endian bytes to one int64_t (high bits) and one
/// uint64_t (low bits), sign-extending to 128 bits when the input is shorter
/// than 16 bytes.
static void BytesToIntegerPair(const uint8_t* bytes, const int32_t length,
                               int64_t* out_high, uint64_t* out_low) {
  DCHECK_GE(length, kMinDecimalBytes);
  DCHECK_LE(length, kMaxDecimalBytes);

  // XXX This code is copied from Decimal::FromBigEndian

  int64_t high, low;

  // Bytes are coming in big-endian, so the first byte is the MSB and therefore holds the
  // sign bit.
  const bool is_negative = static_cast<int8_t>(bytes[0]) < 0;

  // 1. Extract the high bytes
  // Stop byte of the high bytes (0 when the value fits entirely in the low word)
  const int32_t high_bits_offset = std::max(0, length - 8);
  const auto high_bits = BytesToInteger(bytes, 0, high_bits_offset);

  if (high_bits_offset == 8) {
    // Avoid undefined shift by 64 below
    high = high_bits;
  } else {
    // Sign-extend: all-ones when negative and shorter than 16 bytes, else zero
    high = -1 * (is_negative && length < kMaxDecimalBytes);
    // Shift left enough bits to make room for the incoming int64_t
    high = SafeLeftShift(high, high_bits_offset * CHAR_BIT);
    // Preserve the upper bits by inplace OR-ing the int64_t
    high |= high_bits;
  }

  // 2. Extract the low bytes
  // Stop byte of the low bytes
  const int32_t low_bits_offset = std::min(length, 8);
  const auto low_bits = BytesToInteger(bytes, high_bits_offset, length);

  if (low_bits_offset == 8) {
    // Avoid undefined shift by 64 below
    low = low_bits;
  } else {
    // Sign extend the low bits if necessary
    low = -1 * (is_negative && length < 8);
    // Shift left enough bits to make room for the incoming int64_t
    low = SafeLeftShift(low, low_bits_offset * CHAR_BIT);
    // Preserve the upper bits by inplace OR-ing the int64_t
    low |= low_bits;
  }

  *out_high = high;
  // The low word of a Decimal128 is unsigned; reinterpret the bit pattern.
  *out_low = static_cast<uint64_t>(low);
}
1021
RawBytesToDecimalBytes(const uint8_t * value,int32_t byte_width,uint8_t * out_buf)1022 static inline void RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width,
1023 uint8_t* out_buf) {
1024 // view the first 8 bytes as an unsigned 64-bit integer
1025 auto low = reinterpret_cast<uint64_t*>(out_buf);
1026
1027 // view the second 8 bytes as a signed 64-bit integer
1028 auto high = reinterpret_cast<int64_t*>(out_buf + sizeof(uint64_t));
1029
1030 // Convert the fixed size binary array bytes into a Decimal128 compatible layout
1031 BytesToIntegerPair(value, byte_width, high, low);
1032 }
1033
// Primary template: decimal conversion is only defined for the FLBAType and
// ByteArrayType specializations below; any other Parquet type fails.
template <typename T>
Status ConvertToDecimal128(const Array& array, const std::shared_ptr<DataType>&,
                           MemoryPool* pool, std::shared_ptr<Array>*) {
  return Status::NotImplemented("not implemented");
}
1039
1040 template <>
ConvertToDecimal128(const Array & array,const std::shared_ptr<DataType> & type,MemoryPool * pool,std::shared_ptr<Array> * out)1041 Status ConvertToDecimal128<FLBAType>(const Array& array,
1042 const std::shared_ptr<DataType>& type,
1043 MemoryPool* pool, std::shared_ptr<Array>* out) {
1044 const auto& fixed_size_binary_array =
1045 static_cast<const ::arrow::FixedSizeBinaryArray&>(array);
1046
1047 // The byte width of each decimal value
1048 const int32_t type_length =
1049 static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();
1050
1051 // number of elements in the entire array
1052 const int64_t length = fixed_size_binary_array.length();
1053
1054 // Get the byte width of the values in the FixedSizeBinaryArray. Most of the time
1055 // this will be different from the decimal array width because we write the minimum
1056 // number of bytes necessary to represent a given precision
1057 const int32_t byte_width =
1058 static_cast<const ::arrow::FixedSizeBinaryType&>(*fixed_size_binary_array.type())
1059 .byte_width();
1060 if (byte_width < kMinDecimalBytes || byte_width > kMaxDecimalBytes) {
1061 return Status::Invalid("Invalid FIXED_LEN_BYTE_ARRAY length for Decimal128");
1062 }
1063
1064 // allocate memory for the decimal array
1065 ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool));
1066
1067 // raw bytes that we can write to
1068 uint8_t* out_ptr = data->mutable_data();
1069
1070 // convert each FixedSizeBinary value to valid decimal bytes
1071 const int64_t null_count = fixed_size_binary_array.null_count();
1072 if (null_count > 0) {
1073 for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
1074 if (!fixed_size_binary_array.IsNull(i)) {
1075 RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr);
1076 }
1077 }
1078 } else {
1079 for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
1080 RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr);
1081 }
1082 }
1083
1084 *out = std::make_shared<::arrow::Decimal128Array>(
1085 type, length, std::move(data), fixed_size_binary_array.null_bitmap(), null_count);
1086
1087 return Status::OK();
1088 }
1089
1090 template <>
ConvertToDecimal128(const Array & array,const std::shared_ptr<DataType> & type,MemoryPool * pool,std::shared_ptr<Array> * out)1091 Status ConvertToDecimal128<ByteArrayType>(const Array& array,
1092 const std::shared_ptr<DataType>& type,
1093 MemoryPool* pool, std::shared_ptr<Array>* out) {
1094 const auto& binary_array = static_cast<const ::arrow::BinaryArray&>(array);
1095 const int64_t length = binary_array.length();
1096
1097 const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type);
1098 const int64_t type_length = decimal_type.byte_width();
1099
1100 ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool));
1101
1102 // raw bytes that we can write to
1103 uint8_t* out_ptr = data->mutable_data();
1104
1105 const int64_t null_count = binary_array.null_count();
1106
1107 // convert each BinaryArray value to valid decimal bytes
1108 for (int64_t i = 0; i < length; i++, out_ptr += type_length) {
1109 int32_t record_len = 0;
1110 const uint8_t* record_loc = binary_array.GetValue(i, &record_len);
1111
1112 if (record_len < 0 || record_len > type_length) {
1113 return Status::Invalid("Invalid BYTE_ARRAY length for Decimal128");
1114 }
1115
1116 auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);
1117 out_ptr_view[0] = 0;
1118 out_ptr_view[1] = 0;
1119
1120 // only convert rows that are not null if there are nulls, or
1121 // all rows, if there are not
1122 if ((null_count > 0 && !binary_array.IsNull(i)) || null_count <= 0) {
1123 if (record_len <= 0) {
1124 return Status::Invalid("Invalid BYTE_ARRAY length for Decimal128");
1125 }
1126 RawBytesToDecimalBytes(record_loc, record_len, out_ptr);
1127 }
1128 }
1129
1130 *out = std::make_shared<::arrow::Decimal128Array>(
1131 type, length, std::move(data), binary_array.null_bitmap(), null_count);
1132 return Status::OK();
1133 }
1134
1135 /// \brief Convert an Int32 or Int64 array into a Decimal128Array
1136 /// The parquet spec allows systems to write decimals in int32, int64 if the values are
1137 /// small enough to fit in less 4 bytes or less than 8 bytes, respectively.
1138 /// This function implements the conversion from int32 and int64 arrays to decimal arrays.
template <
    typename ParquetIntegerType,
    typename = ::arrow::enable_if_t<std::is_same<ParquetIntegerType, Int32Type>::value ||
                                    std::is_same<ParquetIntegerType, Int64Type>::value>>
static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
                                     const std::shared_ptr<DataType>& type, Datum* out) {
  DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);

  const int64_t length = reader->values_written();

  using ElementType = typename ParquetIntegerType::c_type;
  static_assert(std::is_same<ElementType, int32_t>::value ||
                    std::is_same<ElementType, int64_t>::value,
                "ElementType must be int32_t or int64_t");

  const auto values = reinterpret_cast<const ElementType*>(reader->values());

  const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type);
  // Output slot width; for Decimal128 this is 16 bytes (two 64-bit words)
  const int64_t type_length = decimal_type.byte_width();

  ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool));
  uint8_t* out_ptr = data->mutable_data();

  using ::arrow::BitUtil::FromLittleEndian;

  for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
    // sign/zero extend int32_t values, otherwise a no-op
    const auto value = static_cast<int64_t>(values[i]);

    auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);

    // No-op on little endian machines, byteswap on big endian
    out_ptr_view[0] = FromLittleEndian(static_cast<uint64_t>(value));

    // no need to byteswap here because we're sign/zero extending exactly 8 bytes
    out_ptr_view[1] = static_cast<uint64_t>(value < 0 ? -1 : 0);
  }

  if (reader->nullable_values()) {
    std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
    *out = std::make_shared<::arrow::Decimal128Array>(type, length, std::move(data),
                                                      is_valid, reader->null_count());
  } else {
    // No validity bitmap needed when the column has no nulls
    *out = std::make_shared<::arrow::Decimal128Array>(type, length, std::move(data));
  }
  return Status::OK();
}
1186
1187 /// \brief Convert an arrow::BinaryArray to an arrow::Decimal128Array
1188 /// We do this by:
1189 /// 1. Creating an arrow::BinaryArray from the RecordReader's builder
1190 /// 2. Allocating a buffer for the arrow::Decimal128Array
1191 /// 3. Converting the big-endian bytes in each BinaryArray entry to two integers
1192 /// representing the high and low bits of each decimal value.
1193 template <typename ParquetType>
TransferDecimal(RecordReader * reader,MemoryPool * pool,const std::shared_ptr<DataType> & type,Datum * out)1194 Status TransferDecimal(RecordReader* reader, MemoryPool* pool,
1195 const std::shared_ptr<DataType>& type, Datum* out) {
1196 DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
1197
1198 auto binary_reader = dynamic_cast<BinaryRecordReader*>(reader);
1199 DCHECK(binary_reader);
1200 ::arrow::ArrayVector chunks = binary_reader->GetBuilderChunks();
1201 for (size_t i = 0; i < chunks.size(); ++i) {
1202 std::shared_ptr<Array> chunk_as_decimal;
1203 RETURN_NOT_OK(
1204 ConvertToDecimal128<ParquetType>(*chunks[i], type, pool, &chunk_as_decimal));
1205 // Replace the chunk, which will hopefully also free memory as we go
1206 chunks[i] = chunk_as_decimal;
1207 }
1208 *out = std::make_shared<ChunkedArray>(chunks, type);
1209 return Status::OK();
1210 }
1211
TransferExtension(RecordReader * reader,std::shared_ptr<DataType> value_type,const ColumnDescriptor * descr,MemoryPool * pool,Datum * out)1212 Status TransferExtension(RecordReader* reader, std::shared_ptr<DataType> value_type,
1213 const ColumnDescriptor* descr, MemoryPool* pool, Datum* out) {
1214 std::shared_ptr<ChunkedArray> result;
1215 auto ext_type = std::static_pointer_cast<::arrow::ExtensionType>(value_type);
1216 auto storage_type = ext_type->storage_type();
1217 RETURN_NOT_OK(TransferColumnData(reader, storage_type, descr, pool, &result));
1218
1219 ::arrow::ArrayVector out_chunks(result->num_chunks());
1220 for (int i = 0; i < result->num_chunks(); i++) {
1221 auto chunk = result->chunk(i);
1222 auto ext_data = chunk->data()->Copy();
1223 ext_data->type = ext_type;
1224 auto ext_result = ext_type->MakeArray(ext_data);
1225 out_chunks[i] = ext_result;
1226 }
1227 *out = std::make_shared<ChunkedArray>(out_chunks);
1228 return Status::OK();
1229 }
1230
// Case shorthand for TransferColumnData: dispatch an Arrow integer-like type
// stored as Parquet INT32 through the templated TransferInt copy path.
#define TRANSFER_INT32(ENUM, ArrowType)                                              \
  case ::arrow::Type::ENUM: {                                                        \
    Status s = TransferInt<ArrowType, Int32Type>(reader, pool, value_type, &result); \
    RETURN_NOT_OK(s);                                                                \
  } break;

// Same as TRANSFER_INT32 but for types stored as Parquet INT64.
#define TRANSFER_INT64(ENUM, ArrowType)                                              \
  case ::arrow::Type::ENUM: {                                                        \
    Status s = TransferInt<ArrowType, Int64Type>(reader, pool, value_type, &result); \
    RETURN_NOT_OK(s);                                                                \
  } break;
1242
// Transfer the values accumulated in `reader` into a ChunkedArray of the
// requested Arrow type, dispatching on the Arrow type id (and, for decimals
// and nanosecond timestamps, on the Parquet physical type in `descr`).
//
// \param reader the record reader holding the decoded column data
// \param value_type the target Arrow type
// \param descr the Parquet column descriptor (used for physical-type dispatch)
// \param pool memory pool for any buffers that must be allocated
// \param[out] out the resulting chunked array (always non-null on OK status)
Status TransferColumnData(RecordReader* reader, std::shared_ptr<DataType> value_type,
                          const ColumnDescriptor* descr, MemoryPool* pool,
                          std::shared_ptr<ChunkedArray>* out) {
  Datum result;
  std::shared_ptr<ChunkedArray> chunked_result;
  switch (value_type->id()) {
    case ::arrow::Type::DICTIONARY: {
      RETURN_NOT_OK(TransferDictionary(reader, value_type, &chunked_result));
      result = chunked_result;
    } break;
    case ::arrow::Type::NA: {
      // Null columns carry no data; only the length matters
      result = std::make_shared<::arrow::NullArray>(reader->values_written());
      break;
    }
    // These types match the Parquet physical layout exactly, so the reader's
    // buffers can be wrapped without copying
    case ::arrow::Type::INT32:
    case ::arrow::Type::INT64:
    case ::arrow::Type::FLOAT:
    case ::arrow::Type::DOUBLE:
      result = TransferZeroCopy(reader, value_type);
      break;
    case ::arrow::Type::BOOL:
      RETURN_NOT_OK(TransferBool(reader, pool, &result));
      break;
      // Narrower integer types are stored widened in Parquet and must be
      // copy-converted (see the TRANSFER_INT32/TRANSFER_INT64 macros above)
      TRANSFER_INT32(UINT8, ::arrow::UInt8Type);
      TRANSFER_INT32(INT8, ::arrow::Int8Type);
      TRANSFER_INT32(UINT16, ::arrow::UInt16Type);
      TRANSFER_INT32(INT16, ::arrow::Int16Type);
      TRANSFER_INT32(UINT32, ::arrow::UInt32Type);
      TRANSFER_INT64(UINT64, ::arrow::UInt64Type);
      TRANSFER_INT32(DATE32, ::arrow::Date32Type);
      TRANSFER_INT32(TIME32, ::arrow::Time32Type);
      TRANSFER_INT64(TIME64, ::arrow::Time64Type);
    case ::arrow::Type::DATE64:
      RETURN_NOT_OK(TransferDate64(reader, pool, value_type, &result));
      break;
    case ::arrow::Type::FIXED_SIZE_BINARY:
    case ::arrow::Type::BINARY:
    case ::arrow::Type::STRING: {
      RETURN_NOT_OK(TransferBinary(reader, value_type, &chunked_result));
      result = chunked_result;
    } break;
    case ::arrow::Type::DECIMAL: {
      // Decimals can be physically stored four different ways; dispatch on
      // the Parquet physical type
      switch (descr->physical_type()) {
        case ::parquet::Type::INT32: {
          RETURN_NOT_OK(
              DecimalIntegerTransfer<Int32Type>(reader, pool, value_type, &result));
        } break;
        case ::parquet::Type::INT64: {
          RETURN_NOT_OK(
              DecimalIntegerTransfer<Int64Type>(reader, pool, value_type, &result));
        } break;
        case ::parquet::Type::BYTE_ARRAY: {
          RETURN_NOT_OK(
              TransferDecimal<ByteArrayType>(reader, pool, value_type, &result));
        } break;
        case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
          RETURN_NOT_OK(TransferDecimal<FLBAType>(reader, pool, value_type, &result));
        } break;
        default:
          return Status::Invalid(
              "Physical type for decimal must be int32, int64, byte array, or fixed "
              "length binary");
      }
    } break;
    case ::arrow::Type::TIMESTAMP: {
      const ::arrow::TimestampType& timestamp_type =
          static_cast<::arrow::TimestampType&>(*value_type);
      switch (timestamp_type.unit()) {
        case ::arrow::TimeUnit::MILLI:
        case ::arrow::TimeUnit::MICRO: {
          result = TransferZeroCopy(reader, value_type);
        } break;
        case ::arrow::TimeUnit::NANO: {
          // INT96 timestamps need nanosecond conversion; INT64 nanoseconds
          // can be wrapped directly
          if (descr->physical_type() == ::parquet::Type::INT96) {
            RETURN_NOT_OK(TransferInt96(reader, pool, value_type, &result));
          } else {
            result = TransferZeroCopy(reader, value_type);
          }
        } break;
        default:
          return Status::NotImplemented("TimeUnit not supported");
      }
    } break;
    case ::arrow::Type::EXTENSION: {
      RETURN_NOT_OK(TransferExtension(reader, value_type, descr, pool, &result));
    } break;
    default:
      return Status::NotImplemented("No support for reading columns of type ",
                                    value_type->ToString());
  }

  DCHECK_NE(result.kind(), Datum::NONE);

  // Normalize the result into a ChunkedArray
  if (result.kind() == Datum::ARRAY) {
    *out = std::make_shared<ChunkedArray>(result.make_array());
  } else if (result.kind() == Datum::CHUNKED_ARRAY) {
    *out = result.chunked_array();
  } else {
    DCHECK(false) << "Should be impossible";
  }

  return Status::OK();
}
1346
// Rebuild a (possibly multiply-nested) ListArray around the flat values array
// `arr` using the Parquet definition and repetition levels.
//
// \param arr the flat array of leaf values
// \param field the Arrow field describing the (nested) column
// \param max_def_level maximum definition level of the leaf
// \param max_rep_level maximum repetition level of the leaf
// \param def_levels definition levels, one per level entry
// \param rep_levels repetition levels, one per level entry
// \param total_levels number of entries in def_levels/rep_levels
// \param pool memory pool for offset/validity builders
// \param[out] out the reconstructed nested ListArray
// \return NotImplemented for multi-child or non-List nesting
Status ReconstructNestedList(const std::shared_ptr<Array>& arr,
                             std::shared_ptr<Field> field, int16_t max_def_level,
                             int16_t max_rep_level, const int16_t* def_levels,
                             const int16_t* rep_levels, int64_t total_levels,
                             ::arrow::MemoryPool* pool, std::shared_ptr<Array>* out) {
  // Walk downwards to extract nullability
  std::vector<std::string> item_names;
  std::vector<bool> nullable;
  std::vector<std::shared_ptr<const ::arrow::KeyValueMetadata>> field_metadata;
  std::vector<std::shared_ptr<::arrow::Int32Builder>> offset_builders;
  std::vector<std::shared_ptr<::arrow::BooleanBuilder>> valid_bits_builders;
  nullable.push_back(field->nullable());
  // One offset builder and one validity builder per list nesting level
  while (field->type()->num_fields() > 0) {
    if (field->type()->num_fields() > 1) {
      return Status::NotImplemented("Fields with more than one child are not supported.");
    } else {
      if (field->type()->id() != ::arrow::Type::LIST) {
        return Status::NotImplemented("Currently only nesting with Lists is supported.");
      }
      field = field->type()->field(0);
    }
    item_names.push_back(field->name());
    offset_builders.emplace_back(
        std::make_shared<::arrow::Int32Builder>(::arrow::int32(), pool));
    valid_bits_builders.emplace_back(
        std::make_shared<::arrow::BooleanBuilder>(::arrow::boolean(), pool));
    nullable.push_back(field->nullable());
    field_metadata.push_back(field->metadata());
  }

  int64_t list_depth = offset_builders.size();
  // This describes the minimal definition that describes a level that
  // reflects a value in the primitive values array.
  int16_t values_def_level = max_def_level;
  if (nullable[nullable.size() - 1]) {
    values_def_level--;
  }

  // The definition levels that are needed so that a list is declared
  // as empty and not null.
  std::vector<int16_t> empty_def_level(list_depth);
  int def_level = 0;
  for (int i = 0; i < list_depth; i++) {
    // Each nullable level consumes one extra definition level for "null"
    if (nullable[i]) {
      def_level++;
    }
    empty_def_level[i] = static_cast<int16_t>(def_level);
    def_level++;
  }

  // Scan the levels once, appending offsets and validity bits at every list
  // level where a new list starts (rep_level < max_rep_level)
  int32_t values_offset = 0;
  std::vector<int64_t> null_counts(list_depth, 0);
  for (int64_t i = 0; i < total_levels; i++) {
    int16_t rep_level = rep_levels[i];
    if (rep_level < max_rep_level) {
      for (int64_t j = rep_level; j < list_depth; j++) {
        if (j == (list_depth - 1)) {
          // Innermost list indexes directly into the values array
          RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
        } else {
          // Outer lists index into the next level's offsets
          RETURN_NOT_OK(offset_builders[j]->Append(
              static_cast<int32_t>(offset_builders[j + 1]->length())));
        }

        if (((empty_def_level[j] - 1) == def_levels[i]) && (nullable[j])) {
          // def_level one below "empty" marks a null list at this depth
          RETURN_NOT_OK(valid_bits_builders[j]->Append(false));
          null_counts[j]++;
          break;
        } else {
          RETURN_NOT_OK(valid_bits_builders[j]->Append(true));
          if (empty_def_level[j] == def_levels[i]) {
            // Empty (but non-null) list: deeper levels get no entry
            break;
          }
        }
      }
    }
    if (def_levels[i] >= values_def_level) {
      values_offset++;
    }
  }
  // Add the final offset to all lists
  for (int64_t j = 0; j < list_depth; j++) {
    if (j == (list_depth - 1)) {
      RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
    } else {
      RETURN_NOT_OK(offset_builders[j]->Append(
          static_cast<int32_t>(offset_builders[j + 1]->length())));
    }
  }

  // Finish the builders, keeping only their backing buffers
  std::vector<std::shared_ptr<Buffer>> offsets;
  std::vector<std::shared_ptr<Buffer>> valid_bits;
  std::vector<int64_t> list_lengths;
  for (int64_t j = 0; j < list_depth; j++) {
    // length - 1 because the final "closing" offset is not a list entry
    list_lengths.push_back(offset_builders[j]->length() - 1);
    std::shared_ptr<Array> array;
    RETURN_NOT_OK(offset_builders[j]->Finish(&array));
    offsets.emplace_back(std::static_pointer_cast<Int32Array>(array)->values());
    RETURN_NOT_OK(valid_bits_builders[j]->Finish(&array));
    valid_bits.emplace_back(std::static_pointer_cast<BooleanArray>(array)->values());
  }

  *out = arr;

  // TODO(wesm): Use passed-in field
  // Wrap from the innermost list outwards
  for (int64_t j = list_depth - 1; j >= 0; j--) {
    auto list_type = ::arrow::list(::arrow::field(item_names[j], (*out)->type(),
                                                  nullable[j + 1], field_metadata[j]));
    *out = std::make_shared<::arrow::ListArray>(list_type, list_lengths[j], offsets[j],
                                                *out, valid_bits[j], null_counts[j]);
  }
  return Status::OK();
}
1459
1460 } // namespace arrow
1461 } // namespace parquet
1462