// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// Overview.
//
// The strategy this code uses for repetition/definition level
// generation is to dissect the top level array into a list of paths
// from the top level array to the final primitive (possibly
// dictionary encoded array). It then evaluates each one of
// those paths to produce results for the callback iteratively.
//
// This approach was taken to reduce the aggregate memory required if we were
// to build all def/rep levels in parallel as part of a tree traversal.  It
// also allows for straightforward parallelization at the path level if that is
// desired in the future.
//
// The main downside to this approach is that it duplicates effort for nodes
// that share common ancestors. This can be mitigated to some degree
// by adding optimizations that detect leaf arrays that share
// the same common list ancestor and reuse the repetition levels
// from the first leaf encountered (only definition levels greater
// than the list ancestor need to be re-evaluated). This is left for future
// work.
//
// Algorithm.
//
// As mentioned above this code dissects arrays into constituent parts:
// nullability data and list offset data. It tries to optimize for
// some special cases where it is known ahead of time that a step
// can be skipped (e.g. a nullable array happens to have all of its
// values present) or batch filled (a nullable array has all null values).
// One further optimization that is not implemented but could be done
// in the future is special handling for nested list arrays that
// have some intermediate data which indicates the final array contains only
// nulls.
//
// In general, the algorithm attempts to batch work at each node as much
// as possible.  For nullability nodes this means finding runs of null
// values and batch filling those, interspersed with finding runs of non-null
// values to process in batch at the next node in the path.
//
// Similarly, runs of empty lists are all processed in one batch
// followed by either:
//    - A single list entry for non-terminal lists (i.e. the upper part of a nested list)
//    - Runs of non-empty lists for the terminal list (i.e. the lowest part of a nested
//      list).
//
// This makes use of the following observations.
// 1.  Null values at any node on the path are terminal (repetition and definition
//     levels can be set directly when a Null value is encountered).
// 2.  Empty lists share this eager termination property with Null values.
// 3.  To keep repetition/definition levels consistent the algorithm is lazy
//     in assigning repetition levels. The algorithm tracks whether it is currently
//     in the middle of a list by comparing the lengths of repetition/definition levels.
//     If it is currently in the middle of a list the number of repetition levels
//     populated will be greater than the number of definition levels (the start of a
//     List requires adding the first element). If there are equal numbers of definition
//     and repetition levels populated this indicates a list is waiting to be started
//     and the next list encountered will have its repetition level signify the
//     beginning of the list.
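//
// For intuition only (not relied on by the code below): consider an
// `optional list<optional int32>` column (max_def_level = 3, max_rep_level = 1)
// holding the three rows [[1, null], null, []].  The Dremel-style levels
// emitted for it would be:
//
//    value:  1   null   (null row)   (empty list)
//    def:    3   2      0            1
//    rep:    0   1      0            0
//
// Runs like the trailing null/empty rows are what the batching described
// above emits in bulk.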

// Other implementation notes.
//
// This code hasn't been benchmarked (or assembly analyzed) but does the following
// as optimizations (yes, premature optimization is the root of all evil).
// - This code does not use recursion, instead it constructs its own stack and manages
//   updating elements accordingly.
// - It tries to avoid using Status for common return states.
// - It avoids virtual dispatch in favor of if/else statements on a set of well known
//   classes.

#include "parquet/arrow/path_internal.h"

#include <atomic>
#include <cstddef>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>

#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/buffer_builder.h"
#include "arrow/extension_type.h"
#include "arrow/memory_pool.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/variant.h"
#include "arrow/visitor_inline.h"

#include "parquet/properties.h"

namespace parquet {
namespace arrow {

namespace {

using ::arrow::Array;
using ::arrow::Status;
using ::arrow::TypedBufferBuilder;
using ::arrow::util::holds_alternative;

constexpr static int16_t kLevelNotSet = -1;

/// \brief Simple result of iterating over a column to determine values.
enum IterationResult {
  /// Processing is done at this node. Move back up the path
  /// to continue processing.
  kDone = -1,
  /// Move down towards the leaf for processing.
  kNext = 1,
  /// An error occurred while processing.
  kError = 2
};

#define RETURN_IF_ERROR(iteration_result)                  \
  do {                                                     \
    if (ARROW_PREDICT_FALSE(iteration_result == kError)) { \
      return iteration_result;                             \
    }                                                      \
  } while (false)

int64_t LazyNullCount(const Array& array) { return array.data()->null_count.load(); }

bool LazyNoNulls(const Array& array) {
  int64_t null_count = LazyNullCount(array);
  return null_count == 0 ||
         // kUnknownNullCount comparison is needed to account
         // for null arrays.
         (null_count == ::arrow::kUnknownNullCount &&
          array.null_bitmap_data() == nullptr);
}

struct PathWriteContext {
  PathWriteContext(::arrow::MemoryPool* pool,
                   std::shared_ptr<::arrow::ResizableBuffer> def_levels_buffer)
      : rep_levels(pool), def_levels(std::move(def_levels_buffer), pool) {}
  IterationResult ReserveDefLevels(int64_t elements) {
    last_status = def_levels.Reserve(elements);
    if (ARROW_PREDICT_TRUE(last_status.ok())) {
      return kDone;
    }
    return kError;
  }

  IterationResult AppendDefLevel(int16_t def_level) {
    last_status = def_levels.Append(def_level);
    if (ARROW_PREDICT_TRUE(last_status.ok())) {
      return kDone;
    }
    return kError;
  }

  IterationResult AppendDefLevels(int64_t count, int16_t def_level) {
    last_status = def_levels.Append(count, def_level);
    if (ARROW_PREDICT_TRUE(last_status.ok())) {
      return kDone;
    }
    return kError;
  }

  void UnsafeAppendDefLevel(int16_t def_level) { def_levels.UnsafeAppend(def_level); }

  IterationResult AppendRepLevel(int16_t rep_level) {
    last_status = rep_levels.Append(rep_level);

    if (ARROW_PREDICT_TRUE(last_status.ok())) {
      return kDone;
    }
    return kError;
  }

  IterationResult AppendRepLevels(int64_t count, int16_t rep_level) {
    last_status = rep_levels.Append(count, rep_level);
    if (ARROW_PREDICT_TRUE(last_status.ok())) {
      return kDone;
    }
    return kError;
  }

  bool EqualRepDefLevelsLengths() const {
    return rep_levels.length() == def_levels.length();
  }

  // Incorporates |range| into visited elements. If the |range| is contiguous
  // with the last range, extend the last range, otherwise add |range| separately
  // to the list.
  void RecordPostListVisit(const ElementRange& range) {
    if (!visited_elements.empty() && range.start == visited_elements.back().end) {
      visited_elements.back().end = range.end;
      return;
    }
    visited_elements.push_back(range);
  }

  Status last_status;
  TypedBufferBuilder<int16_t> rep_levels;
  TypedBufferBuilder<int16_t> def_levels;
  std::vector<ElementRange> visited_elements;
};

IterationResult FillRepLevels(int64_t count, int16_t rep_level,
                              PathWriteContext* context) {
  if (rep_level == kLevelNotSet) {
    return kDone;
  }
  int64_t fill_count = count;
  // This condition (rep and def level lengths are equal) occurs in one of
  // a few cases:
  // 1.  Before any list is encountered.
  // 2.  After rep-level has been filled in due to null/empty
  //     values above it.
  // 3.  After finishing a list.
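  //
  // Conversely, when the lengths are unequal we are in the middle of a list:
  // the repetition level for the current position was already appended when
  // the list was started, so one fewer entry is filled below.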
  if (!context->EqualRepDefLevelsLengths()) {
    fill_count--;
  }
  return context->AppendRepLevels(fill_count, rep_level);
}

// A node for handling an array that is discovered to have all
// null elements. It is referred to as a TerminalNode because
// traversal of nodes will not continue below it when generating
// rep/def levels. However, there could be many nested children
// elements beyond it in the Array that is being processed.
class AllNullsTerminalNode {
 public:
  explicit AllNullsTerminalNode(int16_t def_level, int16_t rep_level = kLevelNotSet)
      : def_level_(def_level), rep_level_(rep_level) {}
  void SetRepLevelIfNull(int16_t rep_level) { rep_level_ = rep_level; }
  IterationResult Run(const ElementRange& range, PathWriteContext* context) {
    int64_t size = range.Size();
    RETURN_IF_ERROR(FillRepLevels(size, rep_level_, context));
    return context->AppendDefLevels(size, def_level_);
  }

 private:
  int16_t def_level_;
  int16_t rep_level_;
};

// Handles the case where all remaining arrays until the leaf have no nulls
// (and are not interrupted by lists). Unlike AllNullsTerminalNode this is
// always the last node in a path. We don't need an analogue to the AllNullsTerminalNode
// because if all values are present at an intermediate array no node is added for it
// (the def-level for the next nullable node is incremented).
struct AllPresentTerminalNode {
  IterationResult Run(const ElementRange& range, PathWriteContext* context) {
    return context->AppendDefLevels(range.end - range.start, def_level);
    // No need to worry about rep levels, because this state should
    // only be applicable after all list/repeated values
    // have been evaluated in the path.
  }
  int16_t def_level;
};

/// Node for handling the case when the leaf-array is nullable
/// and contains null elements.
struct NullableTerminalNode {
  NullableTerminalNode();
  NullableTerminalNode(const uint8_t* bitmap, int64_t element_offset,
                       int16_t def_level_if_present)
      : bitmap_(bitmap),
        element_offset_(element_offset),
        def_level_if_present_(def_level_if_present),
        def_level_if_null_(def_level_if_present - 1) {}

  IterationResult Run(const ElementRange& range, PathWriteContext* context) {
    int64_t elements = range.Size();
    RETURN_IF_ERROR(context->ReserveDefLevels(elements));

    DCHECK_GT(elements, 0);

    auto bit_visitor = [&](bool is_set) {
      context->UnsafeAppendDefLevel(is_set ? def_level_if_present_ : def_level_if_null_);
    };

    if (elements > 16) {  // 16 guarantees at least one unrolled loop.
      ::arrow::internal::VisitBitsUnrolled(bitmap_, range.start + element_offset_,
                                           elements, bit_visitor);
    } else {
      ::arrow::internal::VisitBits(bitmap_, range.start + element_offset_, elements,
                                   bit_visitor);
    }
    return kDone;
  }
  const uint8_t* bitmap_;
  int64_t element_offset_;
  int16_t def_level_if_present_;
  int16_t def_level_if_null_;
};

// List nodes handle populating rep_level for Arrow Lists and def-level for empty lists.
// Nullability (both list and children) is handled by other Nodes. By
// construction all list nodes will be intermediate nodes (they will always be followed by
// at least one other node).
//
// Type parameters:
//    |RangeSelector| - A strategy for determining the range of the child node to
//    process. This varies depending on the type of list (int32_t* offsets,
//    int64_t* offsets, or fixed size).
template <typename RangeSelector>
class ListPathNode {
 public:
  ListPathNode(RangeSelector selector, int16_t rep_lev, int16_t def_level_if_empty)
      : selector_(std::move(selector)),
        prev_rep_level_(rep_lev - 1),
        rep_level_(rep_lev),
        def_level_if_empty_(def_level_if_empty) {}

  int16_t rep_level() const { return rep_level_; }

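  // Illustration only (not exercised in this file): with offsets {0, 0, 2} and
  // range [0, 2), a Run() call first records the leading empty list, then starts
  // the non-empty list at index 1 and hands the element range [0, 2) to the
  // child node.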
  IterationResult Run(ElementRange* range, ElementRange* child_range,
                      PathWriteContext* context) {
    if (range->Empty()) {
      return kDone;
    }

    // Find the first non-empty list (skipping a run of empties).
    int64_t start = range->start;
    // Retrieves the range of elements that this list contains.
    // Uses the strategy pattern to distinguish between the different
    // lists that are supported in Arrow (fixed size, normal and "large").
    *child_range = selector_.GetRange(range->start);
    while (child_range->Empty() && !range->Empty()) {
      ++range->start;
      *child_range = selector_.GetRange(range->start);
    }
    // Loop post-conditions:
    //   * range is either empty (we are done processing at this node)
    //     or start corresponds to a non-empty list.
    //   * If range is non-empty child_range contains
    //     the bounds of the non-empty list.

    // Handle any skipped over empty lists.
    int64_t empty_elements = range->start - start;
    if (empty_elements > 0) {
      RETURN_IF_ERROR(FillRepLevels(empty_elements, prev_rep_level_, context));
      RETURN_IF_ERROR(context->AppendDefLevels(empty_elements, def_level_if_empty_));
    }
    // Start of a new list. Note that for nested lists adding the element
    // here effectively suppresses this code until we either encounter null
    // elements or empty lists between here and the innermost list (since
    // we make the repetition and definition levels unequal).
    // Similarly when we are backtracking up the stack the repetition and
    // definition levels are again equal so if we encounter an intermediate list
    // with more elements this will detect it as a new list.
    if (context->EqualRepDefLevelsLengths() && !range->Empty()) {
      RETURN_IF_ERROR(context->AppendRepLevel(prev_rep_level_));
    }

    if (range->Empty()) {
      return kDone;
    }

    ++range->start;
    if (is_last_) {
      // If this is the last repeated node, we can try to extend
      // the child range as wide as possible before
      // continuing to the next node.
      return FillForLast(range, child_range, context);
    }
    return kNext;
  }

  void SetLast() { is_last_ = true; }

 private:
  IterationResult FillForLast(ElementRange* range, ElementRange* child_range,
                              PathWriteContext* context) {
    // First fill in the remainder of the list.
    RETURN_IF_ERROR(FillRepLevels(child_range->Size(), rep_level_, context));
    // Once we've reached this point the following preconditions should hold:
    // 1.  There are no more repeated path nodes to deal with.
    // 2.  All elements in |range| represent contiguous elements in the
    //     child array (Null values would have shortened the range to ensure
    //     all remaining list elements are present (though they may be empty lists)).
    // 3.  No element of range spans a parent list (intermediate
    //     list nodes only handle one list entry at a time).
    //
    // Given these preconditions it should be safe to fill runs of non-empty
    // lists here and expand the range in the child node accordingly.

    while (!range->Empty()) {
      ElementRange size_check = selector_.GetRange(range->start);
      if (size_check.Empty()) {
        // The empty range will need to be handled after we pass down the accumulated
        // range because it affects def_level placement and we need to get the children
        // def_levels entered first.
        break;
      }
      // This is the start of a new list. We can be sure it only applies
      // to the previous list (and doesn't jump to the start of any list
      // further up in nesting due to the constraints mentioned at the start
      // of the function).
      RETURN_IF_ERROR(context->AppendRepLevel(prev_rep_level_));
      RETURN_IF_ERROR(context->AppendRepLevels(size_check.Size() - 1, rep_level_));
      DCHECK_EQ(size_check.start, child_range->end);
      child_range->end = size_check.end;
      ++range->start;
    }

    // Do book-keeping to track the elements of the arrays that are actually visited
    // beyond this point.  This is necessary to identify "gaps" in values that should
    // not be processed (written out to parquet).
    context->RecordPostListVisit(*child_range);
    return kNext;
  }

  RangeSelector selector_;
  int16_t prev_rep_level_;
  int16_t rep_level_;
  int16_t def_level_if_empty_;
  bool is_last_ = false;
};

template <typename OffsetType>
struct VarRangeSelector {
  ElementRange GetRange(int64_t index) const {
    return ElementRange{offsets[index], offsets[index + 1]};
  }

  // Either int32_t* or int64_t*.
  const OffsetType* offsets;
};
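
// Illustration only: with offsets = {0, 3, 3, 7}, GetRange(0) yields [0, 3),
// GetRange(1) yields the empty range [3, 3) and GetRange(2) yields [3, 7).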

struct FixedSizedRangeSelector {
  ElementRange GetRange(int64_t index) const {
    int64_t start = index * list_size;
    return ElementRange{start, start + list_size};
  }
  int list_size;
};
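
// Illustration only: with list_size = 2, GetRange(3) yields [6, 8).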

// An intermediate node that handles null values.
class NullableNode {
 public:
  NullableNode(const uint8_t* null_bitmap, int64_t entry_offset,
               int16_t def_level_if_null, int16_t rep_level_if_null = kLevelNotSet)
      : null_bitmap_(null_bitmap),
        entry_offset_(entry_offset),
        valid_bits_reader_(MakeReader(ElementRange{0, 0})),
        def_level_if_null_(def_level_if_null),
        rep_level_if_null_(rep_level_if_null),
        new_range_(true) {}

  void SetRepLevelIfNull(int16_t rep_level) { rep_level_if_null_ = rep_level; }

  ::arrow::internal::BitmapReader MakeReader(const ElementRange& range) {
    return ::arrow::internal::BitmapReader(null_bitmap_, entry_offset_ + range.start,
                                           range.Size());
  }

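  // Illustration only (not exercised in this file): with validity bits
  // {0, 0, 1, 1, 0} and range [0, 5), the first Run() fills two null slots,
  // hands [2, 4) to the child and returns kNext; the following Run() fills
  // the remaining null slot and returns kDone.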
  IterationResult Run(ElementRange* range, ElementRange* child_range,
                      PathWriteContext* context) {
    if (new_range_) {
      // Reset the reader each time we are starting fresh on a range.
      // We can't rely on continuity because nulls above can
      // cause discontinuities.
      valid_bits_reader_ = MakeReader(*range);
    }
    child_range->start = range->start;
    while (!range->Empty() && !valid_bits_reader_.IsSet()) {
      ++range->start;
      valid_bits_reader_.Next();
    }
    int64_t null_count = range->start - child_range->start;
    if (null_count > 0) {
      RETURN_IF_ERROR(FillRepLevels(null_count, rep_level_if_null_, context));
      RETURN_IF_ERROR(context->AppendDefLevels(null_count, def_level_if_null_));
    }
    if (range->Empty()) {
      new_range_ = true;
      return kDone;
    }
    child_range->end = child_range->start = range->start;

    while (child_range->end != range->end && valid_bits_reader_.IsSet()) {
      ++child_range->end;
      valid_bits_reader_.Next();
    }
    DCHECK(!child_range->Empty());
    range->start += child_range->Size();
    new_range_ = false;
    return kNext;
  }

  const uint8_t* null_bitmap_;
  int64_t entry_offset_;
  ::arrow::internal::BitmapReader valid_bits_reader_;
  int16_t def_level_if_null_;
  int16_t rep_level_if_null_;

  // Whether the next invocation will be a new range.
  bool new_range_ = true;
};

using ListNode = ListPathNode<VarRangeSelector<int32_t>>;
using LargeListNode = ListPathNode<VarRangeSelector<int64_t>>;
using FixedSizeListNode = ListPathNode<FixedSizedRangeSelector>;

// Contains static information derived from traversing the schema.
struct PathInfo {
  // The vectors are expected to be the same length.

  // Note index order matters here.
  using Node = ::arrow::util::variant<NullableTerminalNode, ListNode, LargeListNode,
                                      FixedSizeListNode, NullableNode,
                                      AllPresentTerminalNode, AllNullsTerminalNode>;
  std::vector<Node> path;
  std::shared_ptr<Array> primitive_array;
  int16_t max_def_level = 0;
  int16_t max_rep_level = 0;
  bool has_dictionary;
};
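
// Illustration only: for a nullable `list<item: int32 not null>` column whose
// top-level list array contains some nulls and whose values contain none,
// PathBuilder (below) produces a path of roughly
// [NullableNode, ListNode, AllPresentTerminalNode] with max_def_level = 2 and
// max_rep_level = 1.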

/// Contains logic for writing a single leaf node to parquet.
/// This tracks the path from root to leaf.
///
/// |writer| will be called after all of the definition/repetition
/// values have been calculated for root_range with the calculated
/// values. It is intended to abstract the complexity of writing
/// the levels and values to parquet.
Status WritePath(ElementRange root_range, PathInfo* path_info,
                 ArrowWriteContext* arrow_context,
                 MultipathLevelBuilder::CallbackFunction writer) {
  std::vector<ElementRange> stack(path_info->path.size());
  MultipathLevelBuilderResult builder_result;
  builder_result.leaf_array = path_info->primitive_array;

  if (path_info->max_def_level == 0) {
    // This case only occurs when there are no nullable or repeated
    // columns in the path from the root to leaf.
    int64_t leaf_length = builder_result.leaf_array->length();
    builder_result.def_rep_level_count = leaf_length;
    builder_result.post_list_visited_elements.push_back({0, leaf_length});
    return writer(builder_result);
  }
  stack[0] = root_range;
  RETURN_NOT_OK(
      arrow_context->def_levels_buffer->Resize(/*new_size=*/0, /*shrink_to_fit*/ false));
  PathWriteContext context(arrow_context->memory_pool, arrow_context->def_levels_buffer);
  // We will need at least this many entries, so reserve the space ahead of time.
  RETURN_NOT_OK(context.def_levels.Reserve(root_range.Size()));
  if (path_info->max_rep_level > 0) {
    RETURN_NOT_OK(context.rep_levels.Reserve(root_range.Size()));
  }

  auto stack_base = &stack[0];
  auto stack_position = stack_base;
  // This is the main loop for calculating rep/def levels. The nodes
  // in the path implement a chain-of-responsibility like pattern
  // where each node can add some number of repetition/definition
  // levels to PathWriteContext and also delegate to the next node
  // in the path to add values. The values are added through each Run(...)
  // call and the choice to delegate to the next node (or return to the
  // previous node) is communicated by the return value of Run(...).
  // The loop terminates after the first node indicates all values in
  // |root_range| are processed.
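  //
  // Note that kNext (+1) moves |stack_position| one node closer to the leaf and
  // kDone (-1) moves it back toward the root, which is why the result can be
  // added to the stack pointer directly at the bottom of the loop.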
  while (stack_position >= stack_base) {
    PathInfo::Node& node = path_info->path[stack_position - stack_base];
    struct {
      IterationResult operator()(
          NullableNode& node) {  // NOLINT google-runtime-references
        return node.Run(stack_position, stack_position + 1, context);
      }
      IterationResult operator()(ListNode& node) {  // NOLINT google-runtime-references
        return node.Run(stack_position, stack_position + 1, context);
      }
      IterationResult operator()(
          NullableTerminalNode& node) {  // NOLINT google-runtime-references
        return node.Run(*stack_position, context);
      }
      IterationResult operator()(
          FixedSizeListNode& node) {  // NOLINT google-runtime-references
        return node.Run(stack_position, stack_position + 1, context);
      }
      IterationResult operator()(
          AllPresentTerminalNode& node) {  // NOLINT google-runtime-references
        return node.Run(*stack_position, context);
      }
      IterationResult operator()(
          AllNullsTerminalNode& node) {  // NOLINT google-runtime-references
        return node.Run(*stack_position, context);
      }
      IterationResult operator()(
          LargeListNode& node) {  // NOLINT google-runtime-references
        return node.Run(stack_position, stack_position + 1, context);
      }
      ElementRange* stack_position;
      PathWriteContext* context;
    } visitor = {stack_position, &context};

    IterationResult result = ::arrow::util::visit(visitor, node);

    if (ARROW_PREDICT_FALSE(result == kError)) {
      DCHECK(!context.last_status.ok());
      return context.last_status;
    }
    stack_position += static_cast<int>(result);
  }
  RETURN_NOT_OK(context.last_status);
  builder_result.def_rep_level_count = context.def_levels.length();

  if (context.rep_levels.length() > 0) {
    // This case only occurs when there was a repeated element that needs to be
    // processed.
    builder_result.rep_levels = context.rep_levels.data();
    std::swap(builder_result.post_list_visited_elements, context.visited_elements);
    // It is possible when processing lists that all lists were empty. In that
    // case no elements would have been added to post_list_visited_elements. By
    // adding an empty element we avoid special casing in downstream consumers.
    if (builder_result.post_list_visited_elements.empty()) {
      builder_result.post_list_visited_elements.push_back({0, 0});
    }
  } else {
    builder_result.post_list_visited_elements.push_back(
        {0, builder_result.leaf_array->length()});
    builder_result.rep_levels = nullptr;
  }

  builder_result.def_levels = context.def_levels.data();
  return writer(builder_result);
}

struct FixupVisitor {
  int max_rep_level = -1;
  int16_t rep_level_if_null = kLevelNotSet;

  template <typename T>
  void HandleListNode(T* arg) {
    if (arg->rep_level() == max_rep_level) {
      arg->SetLast();
      // after the last list node we don't need to fill
      // rep levels on null.
      rep_level_if_null = kLevelNotSet;
    } else {
      rep_level_if_null = arg->rep_level();
    }
  }
  void operator()(ListNode& node) {  // NOLINT google-runtime-references
    HandleListNode(&node);
  }
  void operator()(LargeListNode& node) {  // NOLINT google-runtime-references
    HandleListNode(&node);
  }
  void operator()(FixedSizeListNode& node) {  // NOLINT google-runtime-references
    HandleListNode(&node);
  }

  // For non-list intermediate nodes.
  template <typename T>
  void HandleIntermediateNode(T* arg) {
    if (rep_level_if_null != kLevelNotSet) {
      arg->SetRepLevelIfNull(rep_level_if_null);
    }
  }

  void operator()(NullableNode& arg) {  // NOLINT google-runtime-references
    HandleIntermediateNode(&arg);
  }

  void operator()(AllNullsTerminalNode& arg) {  // NOLINT google-runtime-references
    // Even though no processing happens past this point we
    // still need to adjust it if a list occurred after an
    // all null array.
    HandleIntermediateNode(&arg);
  }

  void operator()(NullableTerminalNode& arg) {}    // NOLINT google-runtime-references
  void operator()(AllPresentTerminalNode& arg) {}  // NOLINT google-runtime-references
};

PathInfo Fixup(PathInfo info) {
  // We only need to fixup the path if there were repeated
  // elements on it.
  if (info.max_rep_level == 0) {
    return info;
  }
  FixupVisitor visitor;
  visitor.max_rep_level = info.max_rep_level;
  if (visitor.max_rep_level > 0) {
    visitor.rep_level_if_null = 0;
  }
  for (size_t x = 0; x < info.path.size(); x++) {
    ::arrow::util::visit(visitor, info.path[x]);
  }
  return info;
}

class PathBuilder {
 public:
  explicit PathBuilder(bool start_nullable) : nullable_in_parent_(start_nullable) {}
  template <typename T>
  void AddTerminalInfo(const T& array) {
    if (nullable_in_parent_) {
      info_.max_def_level++;
    }
    // We don't use null_count() because if the null_count isn't known
    // and the array does in fact contain nulls, we will end up
    // traversing the null bitmap twice (once here and once when calculating
    // rep/def levels).
    if (LazyNoNulls(array)) {
      info_.path.push_back(AllPresentTerminalNode{info_.max_def_level});
    } else if (LazyNullCount(array) == array.length()) {
      info_.path.push_back(AllNullsTerminalNode(info_.max_def_level - 1));
    } else {
      info_.path.push_back(NullableTerminalNode(array.null_bitmap_data(), array.offset(),
                                                info_.max_def_level));
    }
    info_.primitive_array = std::make_shared<T>(array.data());
    paths_.push_back(Fixup(info_));
  }

  template <typename T>
  ::arrow::enable_if_t<std::is_base_of<::arrow::FlatArray, T>::value, Status> Visit(
      const T& array) {
    AddTerminalInfo(array);
    return Status::OK();
  }

  template <typename T>
  ::arrow::enable_if_t<std::is_same<::arrow::ListArray, T>::value ||
                           std::is_same<::arrow::LargeListArray, T>::value,
                       Status>
  Visit(const T& array) {
    MaybeAddNullable(array);
    // Increment necessary due to empty lists.
    info_.max_def_level++;
    info_.max_rep_level++;
    // raw_value_offsets() accounts for any slice offset.
    ListPathNode<VarRangeSelector<typename T::offset_type>> node(
        VarRangeSelector<typename T::offset_type>{array.raw_value_offsets()},
        info_.max_rep_level, info_.max_def_level - 1);
    info_.path.push_back(node);
    nullable_in_parent_ = array.list_type()->value_field()->nullable();
    return VisitInline(*array.values());
  }

  Status Visit(const ::arrow::DictionaryArray& array) {
    // We currently only handle DictionaryArray where the dictionary is a
    // primitive type.
    if (array.dict_type()->value_type()->num_fields() > 0) {
      return Status::NotImplemented(
          "Writing DictionaryArray with nested dictionary "
          "type not yet supported");
    }
    if (array.dictionary()->null_count() > 0) {
      return Status::NotImplemented(
          "Writing DictionaryArray with null encoded in dictionary "
          "type not yet supported");
    }
    AddTerminalInfo(array);
    return Status::OK();
  }

  void MaybeAddNullable(const Array& array) {
    if (!nullable_in_parent_) {
      return;
    }
    info_.max_def_level++;
    // We don't use null_count() because if the null_count isn't known
    // and the array does in fact contain nulls, we will end up
    // traversing the null bitmap twice (once here and once when calculating
    // rep/def levels). Because this isn't terminal this might not be
    // the right decision for structs that share the same nullable
    // parents.
    if (LazyNoNulls(array)) {
      // Don't add anything because there won't be any point checking
      // null values for the array.  There will always be at least
      // one more array to handle nullability.
      return;
    }
    if (LazyNullCount(array) == array.length()) {
      info_.path.push_back(AllNullsTerminalNode(info_.max_def_level - 1));
      return;
    }
    info_.path.push_back(NullableNode(array.null_bitmap_data(), array.offset(),
                                      /* def_level_if_null = */ info_.max_def_level - 1));
  }

  Status VisitInline(const Array& array);

  Status Visit(const ::arrow::MapArray& array) {
    return Visit(static_cast<const ::arrow::ListArray&>(array));
  }

  Status Visit(const ::arrow::StructArray& array) {
    MaybeAddNullable(array);
    PathInfo info_backup = info_;
    for (int x = 0; x < array.num_fields(); x++) {
      nullable_in_parent_ = array.type()->field(x)->nullable();
      RETURN_NOT_OK(VisitInline(*array.field(x)));
      info_ = info_backup;
    }
    return Status::OK();
  }

  Status Visit(const ::arrow::FixedSizeListArray& array) {
    MaybeAddNullable(array);
    int32_t list_size = array.list_type()->list_size();
    if (list_size == 0) {
      info_.max_def_level++;
    }
    info_.max_rep_level++;
    info_.path.push_back(FixedSizeListNode(FixedSizedRangeSelector{list_size},
                                           info_.max_rep_level, info_.max_def_level));
    nullable_in_parent_ = array.list_type()->value_field()->nullable();
    return VisitInline(*array.values());
  }

  Status Visit(const ::arrow::ExtensionArray& array) {
    return VisitInline(*array.storage());
  }

#define NOT_IMPLEMENTED_VISIT(ArrowTypePrefix)                             \
  Status Visit(const ::arrow::ArrowTypePrefix##Array& array) {             \
    return Status::NotImplemented("Level generation for " #ArrowTypePrefix \
                                  " not supported yet");                   \
  }

  // Union types aren't supported in Parquet.
  NOT_IMPLEMENTED_VISIT(Union)

#undef NOT_IMPLEMENTED_VISIT
  std::vector<PathInfo>& paths() { return paths_; }

 private:
  PathInfo info_;
  std::vector<PathInfo> paths_;
  bool nullable_in_parent_;
};

Status PathBuilder::VisitInline(const Array& array) {
  return ::arrow::VisitArrayInline(array, this);
}

#undef RETURN_IF_ERROR
}  // namespace

class MultipathLevelBuilderImpl : public MultipathLevelBuilder {
 public:
  MultipathLevelBuilderImpl(std::shared_ptr<::arrow::ArrayData> data,
                            std::unique_ptr<PathBuilder> path_builder)
      : root_range_{0, data->length},
        data_(std::move(data)),
        path_builder_(std::move(path_builder)) {}

  int GetLeafCount() const override {
    return static_cast<int>(path_builder_->paths().size());
  }

  ::arrow::Status Write(int leaf_index, ArrowWriteContext* context,
                        CallbackFunction write_leaf_callback) override {
    DCHECK_GE(leaf_index, 0);
    DCHECK_LT(leaf_index, GetLeafCount());
    return WritePath(root_range_, &path_builder_->paths()[leaf_index], context,
                     std::move(write_leaf_callback));
  }

 private:
  ElementRange root_range_;
  // Reference holder to ensure the data stays valid.
  std::shared_ptr<::arrow::ArrayData> data_;
  std::unique_ptr<PathBuilder> path_builder_;
};

// static
::arrow::Result<std::unique_ptr<MultipathLevelBuilder>> MultipathLevelBuilder::Make(
    const ::arrow::Array& array, bool array_field_nullable) {
  auto constructor = ::arrow::internal::make_unique<PathBuilder>(array_field_nullable);
  RETURN_NOT_OK(VisitArrayInline(array, constructor.get()));
  return ::arrow::internal::make_unique<MultipathLevelBuilderImpl>(
      array.data(), std::move(constructor));
}

// static
Status MultipathLevelBuilder::Write(const Array& array, bool array_field_nullable,
                                    ArrowWriteContext* context,
                                    MultipathLevelBuilder::CallbackFunction callback) {
  ARROW_ASSIGN_OR_RAISE(std::unique_ptr<MultipathLevelBuilder> builder,
                        MultipathLevelBuilder::Make(array, array_field_nullable));
  PathBuilder constructor(array_field_nullable);
  RETURN_NOT_OK(VisitArrayInline(array, &constructor));
  for (int leaf_idx = 0; leaf_idx < builder->GetLeafCount(); leaf_idx++) {
    RETURN_NOT_OK(builder->Write(leaf_idx, context, callback));
  }
  return Status::OK();
}

}  // namespace arrow
}  // namespace parquet