// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// Overview.
//
// The strategy used by this code for generating repetition/definition
// levels is to dissect the top-level array into a list of paths
// from the top-level array to the final primitive (possibly
// dictionary-encoded) array. It then evaluates each one of
// those paths iteratively to produce results for the callback.
//
// This approach was taken to reduce the aggregate memory required if we were
// to build all def/rep levels in parallel as a part of a tree traversal. It
// also allows for straightforward parallelization at the path level if that is
// desired in the future.
//
// The main downside to this approach is that it duplicates effort for nodes
// that share common ancestors. This can be mitigated to some degree
// by adding optimizations that detect leaf arrays that share
// the same common list ancestor and reuse the repetition levels
// from the first leaf encountered (only definition levels greater than
// the list ancestor's need to be re-evaluated). This is left for future
// work.
//
// Algorithm.
//
// As mentioned above, this code dissects arrays into their constituent parts:
// nullability data and list offset data. It tries to optimize for
// some special cases where it is known ahead of time that a step
// can be skipped (e.g. a nullable array happens to have all of its
// values present) or batch filled (a nullable array has all null values).
// One further optimization that is not implemented but could be done
// in the future is special handling for nested list arrays that
// have some intermediate data which indicates the final array contains only
// nulls.
//
// In general, the algorithm attempts to batch work at each node as much
// as possible. For nullability nodes this means finding runs of null
// values and batch filling those, interspersed with finding runs of non-null
// values to process in batch at the next node.
//
// Similarly, runs of empty lists are all processed in one batch
// followed by either:
// - A single list entry for non-terminal lists (i.e. the upper part of a nested list)
// - Runs of non-empty lists for the terminal list (i.e. the lowest part of a nested
//   list).
//
// This makes use of the following observations.
// 1. Null values at any node on the path are terminal (repetition and definition
//    levels can be set directly when a null value is encountered).
// 2. Empty lists share this eager-termination property with null values.
// 3. To keep repetition/definition levels consistent, the algorithm is lazy
//    in assigning repetition levels. The algorithm tracks whether it is currently
//    in the middle of a list by comparing the lengths of repetition/definition levels.
//    If it is currently in the middle of a list, the number of repetition levels
//    populated will be greater than the number of definition levels (the start of a
//    list requires adding its first element). If there are equal numbers of definition
//    and repetition levels populated, a list is waiting to be started and the next
//    list encountered will have its repetition level signify the beginning of the list.
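//
// As a concrete illustration of observations 1 and 2 (a sketch, not output
// produced by this file directly): for a nullable list<int32> column with
// nullable elements (max_def_level = 3, max_rep_level = 1) holding
// [[1, null], [], null, [2]], the generated levels would be:
//   1    -> rep 0, def 3  (new record, value present)
//   null -> rep 1, def 2  (same list, null element)
//   []   -> rep 0, def 1  (empty list; terminates eagerly)
//   null -> rep 0, def 0  (null list; terminates eagerly)
//   2    -> rep 0, def 3  (new record, value present)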
//
// Other implementation notes.
//
// This code hasn't been benchmarked (or analyzed at the assembly level) but
// the following optimizations were made (yes, premature optimization is the
// root of all evil).
// - This code does not use recursion; instead it constructs its own stack and
//   manages updating elements accordingly.
// - It tries to avoid using Status for common return states.
// - It avoids virtual dispatch in favor of if/else statements on a set of
//   well-known classes.

#include "parquet/arrow/path_internal.h"

#include <atomic>
#include <cstddef>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>

#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/buffer_builder.h"
#include "arrow/extension_type.h"
#include "arrow/memory_pool.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_visit.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/variant.h"
#include "arrow/visitor_inline.h"
#include "parquet/properties.h"

namespace parquet {
namespace arrow {

namespace {

using ::arrow::Array;
using ::arrow::Status;
using ::arrow::TypedBufferBuilder;

constexpr static int16_t kLevelNotSet = -1;

/// \brief Simple result of iterating over a column to determine values.
enum IterationResult {
  /// Processing is done at this node. Move back up the path
  /// to continue processing.
  kDone = -1,
  /// Move down towards the leaf for processing.
  kNext = 1,
  /// An error occurred while processing.
  kError = 2
};
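
// Note that the numeric values above are load-bearing: the main loop in
// WritePath advances its stack position by the returned value, so kDone (-1)
// pops back up to the parent node and kNext (+1) pushes down to the child.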

#define RETURN_IF_ERROR(iteration_result)                  \
  do {                                                     \
    if (ARROW_PREDICT_FALSE(iteration_result == kError)) { \
      return iteration_result;                             \
    }                                                      \
  } while (false)
int64_t LazyNullCount(const Array& array) { return array.data()->null_count.load(); }

bool LazyNoNulls(const Array& array) {
  int64_t null_count = LazyNullCount(array);
  return null_count == 0 ||
         // kUnknownNullCount comparison is needed to account
         // for null arrays.
         (null_count == ::arrow::kUnknownNullCount &&
          array.null_bitmap_data() == nullptr);
}

struct PathWriteContext {
  PathWriteContext(::arrow::MemoryPool* pool,
                   std::shared_ptr<::arrow::ResizableBuffer> def_levels_buffer)
      : rep_levels(pool), def_levels(std::move(def_levels_buffer), pool) {}

  IterationResult ReserveDefLevels(int64_t elements) {
    last_status = def_levels.Reserve(elements);
    if (ARROW_PREDICT_TRUE(last_status.ok())) {
      return kDone;
    }
    return kError;
  }

  IterationResult AppendDefLevel(int16_t def_level) {
    last_status = def_levels.Append(def_level);
    if (ARROW_PREDICT_TRUE(last_status.ok())) {
      return kDone;
    }
    return kError;
  }

  IterationResult AppendDefLevels(int64_t count, int16_t def_level) {
    last_status = def_levels.Append(count, def_level);
    if (ARROW_PREDICT_TRUE(last_status.ok())) {
      return kDone;
    }
    return kError;
  }

  void UnsafeAppendDefLevel(int16_t def_level) { def_levels.UnsafeAppend(def_level); }

  IterationResult AppendRepLevel(int16_t rep_level) {
    last_status = rep_levels.Append(rep_level);
    if (ARROW_PREDICT_TRUE(last_status.ok())) {
      return kDone;
    }
    return kError;
  }

  IterationResult AppendRepLevels(int64_t count, int16_t rep_level) {
    last_status = rep_levels.Append(count, rep_level);
    if (ARROW_PREDICT_TRUE(last_status.ok())) {
      return kDone;
    }
    return kError;
  }

  bool EqualRepDefLevelsLengths() const {
    return rep_levels.length() == def_levels.length();
  }

  // Incorporates |range| into visited elements. If |range| is contiguous
  // with the last range, extend the last range; otherwise add |range|
  // separately to the list.
  void RecordPostListVisit(const ElementRange& range) {
    if (!visited_elements.empty() && range.start == visited_elements.back().end) {
      visited_elements.back().end = range.end;
      return;
    }
    visited_elements.push_back(range);
  }
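  // For example, {0, 3} followed by {3, 5} collapses to {0, 5}, while
  // {0, 3} followed by {4, 5} is kept as two separate ranges (the gap
  // must not be written out).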

  Status last_status;
  TypedBufferBuilder<int16_t> rep_levels;
  TypedBufferBuilder<int16_t> def_levels;
  std::vector<ElementRange> visited_elements;
};

IterationResult FillRepLevels(int64_t count, int16_t rep_level,
                              PathWriteContext* context) {
  if (rep_level == kLevelNotSet) {
    return kDone;
  }
  int64_t fill_count = count;
  // This condition (rep and def level lengths being equal) occurs
  // in a few cases:
  // 1. Before any list is encountered.
  // 2. After rep-level has been filled in due to null/empty
  //    values above it.
  // 3. After finishing a list.
  if (!context->EqualRepDefLevelsLengths()) {
    fill_count--;
  }
  return context->AppendRepLevels(fill_count, rep_level);
}

// A node for handling an array that is discovered to have all
// null elements. It is referred to as a TerminalNode because
// traversal of nodes will not continue past it when generating
// rep/def levels. However, there could be many nested child
// elements beyond it in the Array that is being processed.
class AllNullsTerminalNode {
 public:
  explicit AllNullsTerminalNode(int16_t def_level, int16_t rep_level = kLevelNotSet)
      : def_level_(def_level), rep_level_(rep_level) {}
  void SetRepLevelIfNull(int16_t rep_level) { rep_level_ = rep_level; }
  IterationResult Run(const ElementRange& range, PathWriteContext* context) {
    int64_t size = range.Size();
    RETURN_IF_ERROR(FillRepLevels(size, rep_level_, context));
    return context->AppendDefLevels(size, def_level_);
  }

 private:
  int16_t def_level_;
  int16_t rep_level_;
};

// Handles the case where all remaining arrays until the leaf have no nulls
// (and are not interrupted by lists). Unlike AllNullsTerminalNode this is
// always the last node in a path. We don't need an analogue to the
// AllNullsTerminalNode because if all values are present at an intermediate
// array no node is added for it (the def-level for the next nullable node is
// incremented instead).
struct AllPresentTerminalNode {
  IterationResult Run(const ElementRange& range, PathWriteContext* context) {
    return context->AppendDefLevels(range.end - range.start, def_level);
    // No need to worry about rep levels, because this state should
    // only be applicable after all list/repeated values
    // have been evaluated in the path.
  }
  int16_t def_level;
};

/// Node for handling the case when the leaf-array is nullable
/// and contains null elements.
struct NullableTerminalNode {
  NullableTerminalNode() = default;

  NullableTerminalNode(const uint8_t* bitmap, int64_t element_offset,
                       int16_t def_level_if_present)
      : bitmap_(bitmap),
        element_offset_(element_offset),
        def_level_if_present_(def_level_if_present),
        def_level_if_null_(def_level_if_present - 1) {}

  IterationResult Run(const ElementRange& range, PathWriteContext* context) {
    int64_t elements = range.Size();
    RETURN_IF_ERROR(context->ReserveDefLevels(elements));

    DCHECK_GT(elements, 0);

    auto bit_visitor = [&](bool is_set) {
      context->UnsafeAppendDefLevel(is_set ? def_level_if_present_ : def_level_if_null_);
    };

    if (elements > 16) {  // 16 guarantees at least one unrolled loop.
      ::arrow::internal::VisitBitsUnrolled(bitmap_, range.start + element_offset_,
                                           elements, bit_visitor);
    } else {
      ::arrow::internal::VisitBits(bitmap_, range.start + element_offset_, elements,
                                   bit_visitor);
    }
    return kDone;
  }
  const uint8_t* bitmap_;
  int64_t element_offset_;
  int16_t def_level_if_present_;
  int16_t def_level_if_null_;
};
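
// For instance (a hypothetical walk-through, not code in this file): with
// def_level_if_present_ == 2, a validity run of 1,0,1,0 makes the visitor
// above append def levels 2,1,2,1 for those four leaf elements.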

// List nodes handle populating rep_level for Arrow Lists and def-level for
// empty lists. Nullability (of both the list and its children) is handled by
// other nodes. By construction all list nodes will be intermediate nodes
// (they will always be followed by at least one other node).
//
// Type parameters:
// |RangeSelector| - A strategy for determining the range of the child node to
//   process. This varies depending on the type of list (int32_t* offsets,
//   int64_t* offsets, or fixed-size).
template <typename RangeSelector>
class ListPathNode {
 public:
  ListPathNode(RangeSelector selector, int16_t rep_lev, int16_t def_level_if_empty)
      : selector_(std::move(selector)),
        prev_rep_level_(rep_lev - 1),
        rep_level_(rep_lev),
        def_level_if_empty_(def_level_if_empty) {}

  int16_t rep_level() const { return rep_level_; }

  IterationResult Run(ElementRange* range, ElementRange* child_range,
                      PathWriteContext* context) {
    if (range->Empty()) {
      return kDone;
    }
    // Find the first non-empty list (skipping a run of empties).
    int64_t empty_elements = 0;
    do {
      // Retrieve the range of elements that this list contains.
      *child_range = selector_.GetRange(range->start);
      if (!child_range->Empty()) {
        break;
      }
      ++empty_elements;
      ++range->start;
    } while (!range->Empty());

    // Post condition:
    //   * range is either empty (we are done processing at this node)
    //     or its start corresponds to a non-empty list.
    //   * If range is non-empty, child_range contains
    //     the bounds of the non-empty list.

    // Handle any skipped-over empty lists.
    if (empty_elements > 0) {
      RETURN_IF_ERROR(FillRepLevels(empty_elements, prev_rep_level_, context));
      RETURN_IF_ERROR(context->AppendDefLevels(empty_elements, def_level_if_empty_));
    }
    // Start of a new list. Note that for nested lists adding the element
    // here effectively suppresses this code until we either encounter null
    // elements or empty lists between here and the innermost list (since
    // we make the repetition and definition levels unequal).
    // Similarly, when we are backtracking up the stack the repetition and
    // definition levels are again equal, so if we encounter an intermediate list
    // with more elements this will detect it as a new list.
    if (context->EqualRepDefLevelsLengths() && !range->Empty()) {
      RETURN_IF_ERROR(context->AppendRepLevel(prev_rep_level_));
    }

    if (range->Empty()) {
      return kDone;
    }

    ++range->start;
    if (is_last_) {
      // If this is the last repeated node, we can try
      // to extend the child range as wide as possible before
      // continuing to the next node.
      return FillForLast(range, child_range, context);
    }
    return kNext;
  }
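
  // As an illustration (a sketch, not exercised here): for a ListArray with
  // offsets [0, 0, 2] and range {0, 2}, Run() batch-fills the empty first
  // list (def_level_if_empty_), then starts a new record for the non-empty
  // second list and hands its children {0, 2} down to the next node.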

  void SetLast() { is_last_ = true; }

 private:
  IterationResult FillForLast(ElementRange* range, ElementRange* child_range,
                              PathWriteContext* context) {
    // First fill in the remainder of the list.
    RETURN_IF_ERROR(FillRepLevels(child_range->Size(), rep_level_, context));
    // Once we've reached this point the following preconditions should hold:
    // 1. There are no more repeated path nodes to deal with.
    // 2. All elements in |range| represent contiguous elements in the
    //    child array (null values would have shortened the range to ensure
    //    all remaining list elements are present (though they may be empty lists)).
    // 3. No element of range spans a parent list (intermediate
    //    list nodes only handle one list entry at a time).
    //
    // Given these preconditions it should be safe to fill runs of non-empty
    // lists here and expand the range in the child node accordingly.

    while (!range->Empty()) {
      ElementRange size_check = selector_.GetRange(range->start);
      if (size_check.Empty()) {
        // The empty range will need to be handled after we pass down the accumulated
        // range because it affects def_level placement and we need to get the children
        // def_levels entered first.
        break;
      }
      // This is the start of a new list. We can be sure it only applies
      // to the previous list (and doesn't jump to the start of any list
      // further up in nesting due to the constraints mentioned at the start
      // of the function).
      RETURN_IF_ERROR(context->AppendRepLevel(prev_rep_level_));
      RETURN_IF_ERROR(context->AppendRepLevels(size_check.Size() - 1, rep_level_));
      DCHECK_EQ(size_check.start, child_range->end)
          << size_check.start << " != " << child_range->end;
      child_range->end = size_check.end;
      ++range->start;
    }

    // Do book-keeping to track the elements of the arrays that are actually visited
    // beyond this point. This is necessary to identify "gaps" in values that should
    // not be processed (written out to parquet).
    context->RecordPostListVisit(*child_range);
    return kNext;
  }

  RangeSelector selector_;
  int16_t prev_rep_level_;
  int16_t rep_level_;
  int16_t def_level_if_empty_;
  bool is_last_ = false;
};

template <typename OffsetType>
struct VarRangeSelector {
  ElementRange GetRange(int64_t index) const {
    return ElementRange{offsets[index], offsets[index + 1]};
  }

  // Either int32_t* or int64_t*.
  const OffsetType* offsets;
};
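
// For example, with offsets [0, 2, 2, 5]: GetRange(0) is {0, 2},
// GetRange(1) is {2, 2} (an empty list), and GetRange(2) is {2, 5}.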

struct FixedSizedRangeSelector {
  ElementRange GetRange(int64_t index) const {
    int64_t start = index * list_size;
    return ElementRange{start, start + list_size};
  }
  int list_size;
};
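
// For example, with list_size == 3, GetRange(2) is {6, 9}.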

// An intermediate node that handles null values.
class NullableNode {
 public:
  NullableNode(const uint8_t* null_bitmap, int64_t entry_offset,
               int16_t def_level_if_null, int16_t rep_level_if_null = kLevelNotSet)
      : null_bitmap_(null_bitmap),
        entry_offset_(entry_offset),
        valid_bits_reader_(MakeReader(ElementRange{0, 0})),
        def_level_if_null_(def_level_if_null),
        rep_level_if_null_(rep_level_if_null),
        new_range_(true) {}

  void SetRepLevelIfNull(int16_t rep_level) { rep_level_if_null_ = rep_level; }

  ::arrow::internal::BitRunReader MakeReader(const ElementRange& range) {
    return ::arrow::internal::BitRunReader(null_bitmap_, entry_offset_ + range.start,
                                           range.Size());
  }

  IterationResult Run(ElementRange* range, ElementRange* child_range,
                      PathWriteContext* context) {
    if (new_range_) {
      // Reset the reader each time we are starting fresh on a range.
      // We can't rely on continuity because nulls above can
      // cause discontinuities.
      valid_bits_reader_ = MakeReader(*range);
    }
    child_range->start = range->start;
    ::arrow::internal::BitRun run = valid_bits_reader_.NextRun();
    if (!run.set) {
      range->start += run.length;
      RETURN_IF_ERROR(FillRepLevels(run.length, rep_level_if_null_, context));
      RETURN_IF_ERROR(context->AppendDefLevels(run.length, def_level_if_null_));
      run = valid_bits_reader_.NextRun();
    }
    if (range->Empty()) {
      new_range_ = true;
      return kDone;
    }
    child_range->end = child_range->start = range->start;
    child_range->end += run.length;

    DCHECK(!child_range->Empty());
    range->start += child_range->Size();
    new_range_ = false;
    return kNext;
  }

  const uint8_t* null_bitmap_;
  int64_t entry_offset_;
  ::arrow::internal::BitRunReader valid_bits_reader_;
  int16_t def_level_if_null_;
  int16_t rep_level_if_null_;

  // Whether the next invocation will be a new range.
  bool new_range_ = true;
};

using ListNode = ListPathNode<VarRangeSelector<int32_t>>;
using LargeListNode = ListPathNode<VarRangeSelector<int64_t>>;
using FixedSizeListNode = ListPathNode<FixedSizedRangeSelector>;

// Contains static information derived from traversing the schema.
struct PathInfo {
  // The vectors below are expected to be the same length.

  // Note that index order matters here.
  using Node = ::arrow::util::Variant<NullableTerminalNode, ListNode, LargeListNode,
                                      FixedSizeListNode, NullableNode,
                                      AllPresentTerminalNode, AllNullsTerminalNode>;

  std::vector<Node> path;
  std::shared_ptr<Array> primitive_array;
  int16_t max_def_level = 0;
  int16_t max_rep_level = 0;
  bool has_dictionary = false;
  bool leaf_is_nullable = false;
};
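
// As an example of the nodes a path can contain (a sketch based on
// PathBuilder below): a nullable list<int32> column with nullable values
// typically yields the path [NullableNode (list validity), ListNode,
// NullableTerminalNode (value validity)].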

/// Contains logic for writing a single leaf node to parquet.
/// This tracks the path from root to leaf.
///
/// |writer| will be called after all of the definition/repetition
/// values have been calculated for root_range with the calculated
/// values. It is intended to abstract the complexity of writing
/// the levels and values to parquet.
Status WritePath(ElementRange root_range, PathInfo* path_info,
                 ArrowWriteContext* arrow_context,
                 MultipathLevelBuilder::CallbackFunction writer) {
  std::vector<ElementRange> stack(path_info->path.size());
  MultipathLevelBuilderResult builder_result;
  builder_result.leaf_array = path_info->primitive_array;
  builder_result.leaf_is_nullable = path_info->leaf_is_nullable;

  if (path_info->max_def_level == 0) {
    // This case only occurs when there are no nullable or repeated
    // columns in the path from the root to leaf.
    int64_t leaf_length = builder_result.leaf_array->length();
    builder_result.def_rep_level_count = leaf_length;
    builder_result.post_list_visited_elements.push_back({0, leaf_length});
    return writer(builder_result);
  }
  stack[0] = root_range;
  RETURN_NOT_OK(
      arrow_context->def_levels_buffer->Resize(/*new_size=*/0, /*shrink_to_fit=*/false));
  PathWriteContext context(arrow_context->memory_pool, arrow_context->def_levels_buffer);
  // We will need at least this many entries, so reserve the space ahead of time.
  RETURN_NOT_OK(context.def_levels.Reserve(root_range.Size()));
  if (path_info->max_rep_level > 0) {
    RETURN_NOT_OK(context.rep_levels.Reserve(root_range.Size()));
  }

  auto stack_base = &stack[0];
  auto stack_position = stack_base;
  // This is the main loop for calculating rep/def levels. The nodes
  // in the path implement a chain-of-responsibility-like pattern
  // where each node can add some number of repetition/definition
  // levels to PathWriteContext and also delegate to the next node
  // in the path to add values. The values are added through each Run(...)
  // call and the choice to delegate to the next node (or return to the
  // previous node) is communicated by the return value of Run(...).
  // The loop terminates after the first node indicates all values in
  // |root_range| are processed.
  while (stack_position >= stack_base) {
    PathInfo::Node& node = path_info->path[stack_position - stack_base];
    struct {
      IterationResult operator()(NullableNode* node) {
        return node->Run(stack_position, stack_position + 1, context);
      }
      IterationResult operator()(ListNode* node) {
        return node->Run(stack_position, stack_position + 1, context);
      }
      IterationResult operator()(NullableTerminalNode* node) {
        return node->Run(*stack_position, context);
      }
      IterationResult operator()(FixedSizeListNode* node) {
        return node->Run(stack_position, stack_position + 1, context);
      }
      IterationResult operator()(AllPresentTerminalNode* node) {
        return node->Run(*stack_position, context);
      }
      IterationResult operator()(AllNullsTerminalNode* node) {
        return node->Run(*stack_position, context);
      }
      IterationResult operator()(LargeListNode* node) {
        return node->Run(stack_position, stack_position + 1, context);
      }
      ElementRange* stack_position;
      PathWriteContext* context;
    } visitor = {stack_position, &context};

    IterationResult result = ::arrow::util::visit(visitor, &node);

    if (ARROW_PREDICT_FALSE(result == kError)) {
      DCHECK(!context.last_status.ok());
      return context.last_status;
    }
    stack_position += static_cast<int>(result);
  }
  RETURN_NOT_OK(context.last_status);
  builder_result.def_rep_level_count = context.def_levels.length();

  if (context.rep_levels.length() > 0) {
    // This case only occurs when there was a repeated element that needed to be
    // processed.
    builder_result.rep_levels = context.rep_levels.data();
    std::swap(builder_result.post_list_visited_elements, context.visited_elements);
    // It is possible when processing lists that all lists were empty. In this
    // case no elements would have been added to post_list_visited_elements. By
    // adding an empty element we avoid special-casing in downstream consumers.
    if (builder_result.post_list_visited_elements.empty()) {
      builder_result.post_list_visited_elements.push_back({0, 0});
    }
  } else {
    builder_result.post_list_visited_elements.push_back(
        {0, builder_result.leaf_array->length()});
    builder_result.rep_levels = nullptr;
  }

  builder_result.def_levels = context.def_levels.data();
  return writer(builder_result);
}

struct FixupVisitor {
  int max_rep_level = -1;
  int16_t rep_level_if_null = kLevelNotSet;

  template <typename T>
  void HandleListNode(T* arg) {
    if (arg->rep_level() == max_rep_level) {
      arg->SetLast();
      // After the last list node we don't need to fill
      // rep levels on null.
      rep_level_if_null = kLevelNotSet;
    } else {
      rep_level_if_null = arg->rep_level();
    }
  }
  void operator()(ListNode* node) { HandleListNode(node); }
  void operator()(LargeListNode* node) { HandleListNode(node); }
  void operator()(FixedSizeListNode* node) { HandleListNode(node); }

  // For non-list intermediate nodes.
  template <typename T>
  void HandleIntermediateNode(T* arg) {
    if (rep_level_if_null != kLevelNotSet) {
      arg->SetRepLevelIfNull(rep_level_if_null);
    }
  }

  void operator()(NullableNode* arg) { HandleIntermediateNode(arg); }

  void operator()(AllNullsTerminalNode* arg) {
    // Even though no processing happens past this point we
    // still need to adjust it if a list occurred after an
    // all-null array.
    HandleIntermediateNode(arg);
  }

  void operator()(NullableTerminalNode*) {}
  void operator()(AllPresentTerminalNode*) {}
};

PathInfo Fixup(PathInfo info) {
  // We only need to fix up the path if there were repeated
  // elements on it.
  if (info.max_rep_level == 0) {
    return info;
  }
  FixupVisitor visitor;
  visitor.max_rep_level = info.max_rep_level;
  if (visitor.max_rep_level > 0) {
    visitor.rep_level_if_null = 0;
  }
  for (size_t x = 0; x < info.path.size(); x++) {
    ::arrow::util::visit(visitor, &info.path[x]);
  }
  return info;
}
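
// To illustrate Fixup (a sketch): for a nullable list<list<int32>> path of
// [NullableNode, ListNode(rep=1), NullableNode, ListNode(rep=2), terminal],
// the first NullableNode is assigned rep_level_if_null = 0, the second one
// rep_level_if_null = 1, and the innermost ListNode is marked as last.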

class PathBuilder {
 public:
  explicit PathBuilder(bool start_nullable) : nullable_in_parent_(start_nullable) {}

  template <typename T>
  void AddTerminalInfo(const T& array) {
    info_.leaf_is_nullable = nullable_in_parent_;
    if (nullable_in_parent_) {
      info_.max_def_level++;
    }
    // We don't use null_count() because if the null_count isn't known
    // and the array does in fact contain nulls, we will end up
    // traversing the null bitmap twice (once here and once when calculating
    // rep/def levels).
    if (LazyNoNulls(array)) {
      info_.path.emplace_back(AllPresentTerminalNode{info_.max_def_level});
    } else if (LazyNullCount(array) == array.length()) {
      info_.path.emplace_back(AllNullsTerminalNode(info_.max_def_level - 1));
    } else {
      info_.path.emplace_back(NullableTerminalNode(array.null_bitmap_data(),
                                                   array.offset(), info_.max_def_level));
    }
    info_.primitive_array = std::make_shared<T>(array.data());
    paths_.push_back(Fixup(info_));
  }

  template <typename T>
  ::arrow::enable_if_t<std::is_base_of<::arrow::FlatArray, T>::value, Status> Visit(
      const T& array) {
    AddTerminalInfo(array);
    return Status::OK();
  }

  template <typename T>
  ::arrow::enable_if_t<std::is_same<::arrow::ListArray, T>::value ||
                           std::is_same<::arrow::LargeListArray, T>::value,
                       Status>
  Visit(const T& array) {
    MaybeAddNullable(array);
    // Increment necessary due to empty lists.
    info_.max_def_level++;
    info_.max_rep_level++;
    // raw_value_offsets() accounts for any slice offset.
    ListPathNode<VarRangeSelector<typename T::offset_type>> node(
        VarRangeSelector<typename T::offset_type>{array.raw_value_offsets()},
        info_.max_rep_level, info_.max_def_level - 1);
    info_.path.emplace_back(std::move(node));
    nullable_in_parent_ = array.list_type()->value_field()->nullable();
    return VisitInline(*array.values());
  }

  Status Visit(const ::arrow::DictionaryArray& array) {
    // We only currently handle DictionaryArray where the dictionary is a
    // primitive type.
    if (array.dict_type()->value_type()->num_fields() > 0) {
      return Status::NotImplemented(
          "Writing DictionaryArray with nested dictionary "
          "type not yet supported");
    }
    if (array.dictionary()->null_count() > 0) {
      return Status::NotImplemented(
          "Writing DictionaryArray with null encoded in dictionary "
          "type not yet supported");
    }
    AddTerminalInfo(array);
    return Status::OK();
  }

  void MaybeAddNullable(const Array& array) {
    if (!nullable_in_parent_) {
      return;
    }
    info_.max_def_level++;
    // We don't use null_count() because if the null_count isn't known
    // and the array does in fact contain nulls, we will end up
    // traversing the null bitmap twice (once here and once when calculating
    // rep/def levels). Because this isn't terminal this might not be
    // the right decision for structs that share the same nullable
    // parents.
    if (LazyNoNulls(array)) {
      // Don't add anything because there won't be any point checking
      // null values for the array. There will always be at least
      // one more array to handle nullability.
      return;
    }
    if (LazyNullCount(array) == array.length()) {
      info_.path.emplace_back(AllNullsTerminalNode(info_.max_def_level - 1));
      return;
    }
    info_.path.emplace_back(
        NullableNode(array.null_bitmap_data(), array.offset(),
                     /*def_level_if_null=*/info_.max_def_level - 1));
  }

  Status VisitInline(const Array& array);

  Status Visit(const ::arrow::MapArray& array) {
    return Visit(static_cast<const ::arrow::ListArray&>(array));
  }

  Status Visit(const ::arrow::StructArray& array) {
    MaybeAddNullable(array);
    PathInfo info_backup = info_;
    for (int x = 0; x < array.num_fields(); x++) {
      nullable_in_parent_ = array.type()->field(x)->nullable();
      RETURN_NOT_OK(VisitInline(*array.field(x)));
      info_ = info_backup;
    }
    return Status::OK();
  }

  Status Visit(const ::arrow::FixedSizeListArray& array) {
    MaybeAddNullable(array);
    int32_t list_size = array.list_type()->list_size();
    // Technically we could encode fixed-size lists with two-level encodings,
    // but since we always use three-level encoding we increment def levels as
    // well.
    info_.max_def_level++;
    info_.max_rep_level++;
    info_.path.emplace_back(FixedSizeListNode(FixedSizedRangeSelector{list_size},
                                              info_.max_rep_level, info_.max_def_level));
    nullable_in_parent_ = array.list_type()->value_field()->nullable();
    if (array.offset() > 0) {
      return VisitInline(*array.values()->Slice(array.value_offset(0)));
    }
    return VisitInline(*array.values());
  }

  Status Visit(const ::arrow::ExtensionArray& array) {
    return VisitInline(*array.storage());
  }

#define NOT_IMPLEMENTED_VISIT(ArrowTypePrefix)                             \
  Status Visit(const ::arrow::ArrowTypePrefix##Array& array) {             \
    return Status::NotImplemented("Level generation for " #ArrowTypePrefix \
                                  " not supported yet");                   \
  }

  // Union types aren't supported in Parquet.
  NOT_IMPLEMENTED_VISIT(Union)

#undef NOT_IMPLEMENTED_VISIT

  std::vector<PathInfo>& paths() { return paths_; }

 private:
  PathInfo info_;
  std::vector<PathInfo> paths_;
  bool nullable_in_parent_;
};

Status PathBuilder::VisitInline(const Array& array) {
  return ::arrow::VisitArrayInline(array, this);
}

#undef RETURN_IF_ERROR
}  // namespace

class MultipathLevelBuilderImpl : public MultipathLevelBuilder {
 public:
  MultipathLevelBuilderImpl(std::shared_ptr<::arrow::ArrayData> data,
                            std::unique_ptr<PathBuilder> path_builder)
      : root_range_{0, data->length},
        data_(std::move(data)),
        path_builder_(std::move(path_builder)) {}

  int GetLeafCount() const override {
    return static_cast<int>(path_builder_->paths().size());
  }

  ::arrow::Status Write(int leaf_index, ArrowWriteContext* context,
                        CallbackFunction write_leaf_callback) override {
    DCHECK_GE(leaf_index, 0);
    DCHECK_LT(leaf_index, GetLeafCount());
    return WritePath(root_range_, &path_builder_->paths()[leaf_index], context,
                     std::move(write_leaf_callback));
  }

 private:
  ElementRange root_range_;
  // Reference holder to ensure the data stays valid.
  std::shared_ptr<::arrow::ArrayData> data_;
  std::unique_ptr<PathBuilder> path_builder_;
};

// static
::arrow::Result<std::unique_ptr<MultipathLevelBuilder>> MultipathLevelBuilder::Make(
    const ::arrow::Array& array, bool array_field_nullable) {
  auto constructor = ::arrow::internal::make_unique<PathBuilder>(array_field_nullable);
  RETURN_NOT_OK(VisitArrayInline(array, constructor.get()));
  return ::arrow::internal::make_unique<MultipathLevelBuilderImpl>(
      array.data(), std::move(constructor));
}

// static
Status MultipathLevelBuilder::Write(const Array& array, bool array_field_nullable,
                                    ArrowWriteContext* context,
                                    MultipathLevelBuilder::CallbackFunction callback) {
  ARROW_ASSIGN_OR_RAISE(std::unique_ptr<MultipathLevelBuilder> builder,
                        MultipathLevelBuilder::Make(array, array_field_nullable));
  for (int leaf_idx = 0; leaf_idx < builder->GetLeafCount(); leaf_idx++) {
    RETURN_NOT_OK(builder->Write(leaf_idx, context, callback));
  }
  return Status::OK();
}
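
// Example usage (a sketch; the lambda callback and `ctx` are hypothetical):
//
//   ArrowWriteContext* ctx = ...;
//   RETURN_NOT_OK(MultipathLevelBuilder::Write(
//       *array, /*array_field_nullable=*/true, ctx,
//       [](const MultipathLevelBuilderResult& result) {
//         // result.def_levels / result.rep_levels hold the computed levels
//         // for one leaf; result.post_list_visited_elements identifies the
//         // leaf value ranges to write.
//         return Status::OK();
//       }));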

}  // namespace arrow
}  // namespace parquet