// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// Overview.
//
// The strategy used by this code for repetition/definition levels
// is to dissect the top level array into a list of paths
// from the top level array to the final primitive (possibly
// dictionary encoded) array. It then evaluates each one of
// those paths to produce results for the callback iteratively.
//
// This approach was taken to reduce the aggregate memory required if we were
// to build all def/rep levels in parallel as a part of a tree traversal.  It
// also allows for straightforward parallelization at the path level if that is
// desired in the future.
//
// The main downside to this approach is that it duplicates effort for nodes
// that share common ancestors. This can be mitigated to some degree
// by adding optimizations that detect leaf arrays that share
// the same common list ancestor and reuse the repetition levels
// from the first leaf encountered (only definition levels greater
// than the list ancestor need to be re-evaluated). This is left for
// future work.
//
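// For example (an illustrative sketch, not tied to a specific schema in
// this file), a column of type struct<a: int32, b: list<int32>> is
// decomposed into two paths:
//   root -> a            (nullability only)
//   root -> b -> b.item  (nullability plus one repeated level)
// and each path is evaluated independently against the same root range.
//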
// Algorithm.
//
// As mentioned above this code dissects arrays into constituent parts:
// nullability data, and list offset data. It tries to optimize for
// some special cases, where it is known ahead of time that a step
// can be skipped (e.g. a nullable array happens to have all of its
// values present) or batch filled (a nullable array has all null values).
// One further optimization that is not implemented but could be done
// in the future is special handling for nested list arrays that
// have some intermediate data which indicates the final array contains only
// nulls.
//
// In general, the algorithm attempts to batch work at each node as much
// as possible.  For nullability nodes this means finding runs of null
// values and batch filling those, interspersed with finding runs of non-null
// values to process in batch at the next node.
//
// Similarly, runs of empty lists are all processed in one batch
// followed by either:
//    - A single list entry for non-terminal lists (i.e. the upper part of a nested list)
//    - Runs of non-empty lists for the terminal list (i.e. the lowest part of a nested
//      list).
//
// This makes use of the following observations.
// 1.  Null values at any node on the path are terminal (repetition and definition
//     levels can be set directly when a null value is encountered).
// 2.  Empty lists share this eager termination property with null values.
// 3.  To keep repetition/definition levels in sync, the algorithm is lazy
//     in assigning repetition levels. It tracks whether it is currently
//     in the middle of a list by comparing the lengths of the repetition and
//     definition levels. If it is currently in the middle of a list, the number
//     of repetition levels populated will be greater than the number of
//     definition levels (the start of a list requires adding the first element).
//     If there are equal numbers of definition and repetition levels populated,
//     a list is waiting to be started and the next list encountered will have
//     its repetition level signify the beginning of the list (see the worked
//     example below).
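//
// As a worked example (values follow the standard Dremel encoding; the
// column shown is hypothetical): for a nullable list<int32 (nullable)>
// column with max_def_level = 3 and max_rep_level = 1, the array
// [[1, null], [], null] produces:
//
//   element       def_level  rep_level
//   1             3          0   (new list, value present)
//   null          2          1   (same list, null element)
//   []            1          0   (new, empty list)
//   null (list)   0          0   (new, null list)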
//
// Other implementation notes.
//
// This code hasn't been benchmarked (or assembly analyzed) but applies the
// following optimizations (yes, premature optimization is the root of all evil).
// - This code does not use recursion; instead it constructs its own stack and
//   manages updating elements accordingly.
// - It tries to avoid using Status for common return states.
// - It avoids virtual dispatch in favor of if/else statements on a set of
//   well-known classes.

#include "parquet/arrow/path_internal.h"

#include <atomic>
#include <cstddef>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>

#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/buffer_builder.h"
#include "arrow/extension_type.h"
#include "arrow/memory_pool.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_visit.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/variant.h"
#include "arrow/visitor_inline.h"
#include "parquet/properties.h"

namespace parquet {
namespace arrow {

namespace {

using ::arrow::Array;
using ::arrow::Status;
using ::arrow::TypedBufferBuilder;

constexpr static int16_t kLevelNotSet = -1;

/// \brief Simple result of iterating over a column to determine values.
enum IterationResult {
  /// Processing is done at this node. Move back up the path
  /// to continue processing.
  kDone = -1,
  /// Move down towards the leaf for processing.
  kNext = 1,
  /// An error occurred while processing.
  kError = 2
};

#define RETURN_IF_ERROR(iteration_result)                  \
  do {                                                     \
    if (ARROW_PREDICT_FALSE(iteration_result == kError)) { \
      return iteration_result;                             \
    }                                                      \
  } while (false)

int64_t LazyNullCount(const Array& array) { return array.data()->null_count.load(); }

bool LazyNoNulls(const Array& array) {
  int64_t null_count = LazyNullCount(array);
  return null_count == 0 ||
         // The kUnknownNullCount comparison is needed to account
         // for null arrays.
         (null_count == ::arrow::kUnknownNullCount &&
          array.null_bitmap_data() == nullptr);
}

struct PathWriteContext {
  PathWriteContext(::arrow::MemoryPool* pool,
                   std::shared_ptr<::arrow::ResizableBuffer> def_levels_buffer)
      : rep_levels(pool), def_levels(std::move(def_levels_buffer), pool) {}
  IterationResult ReserveDefLevels(int64_t elements) {
    last_status = def_levels.Reserve(elements);
    if (ARROW_PREDICT_TRUE(last_status.ok())) {
      return kDone;
    }
    return kError;
  }

  IterationResult AppendDefLevel(int16_t def_level) {
    last_status = def_levels.Append(def_level);
    if (ARROW_PREDICT_TRUE(last_status.ok())) {
      return kDone;
    }
    return kError;
  }

  IterationResult AppendDefLevels(int64_t count, int16_t def_level) {
    last_status = def_levels.Append(count, def_level);
    if (ARROW_PREDICT_TRUE(last_status.ok())) {
      return kDone;
    }
    return kError;
  }

  void UnsafeAppendDefLevel(int16_t def_level) { def_levels.UnsafeAppend(def_level); }

  IterationResult AppendRepLevel(int16_t rep_level) {
    last_status = rep_levels.Append(rep_level);

    if (ARROW_PREDICT_TRUE(last_status.ok())) {
      return kDone;
    }
    return kError;
  }

  IterationResult AppendRepLevels(int64_t count, int16_t rep_level) {
    last_status = rep_levels.Append(count, rep_level);
    if (ARROW_PREDICT_TRUE(last_status.ok())) {
      return kDone;
    }
    return kError;
  }

  bool EqualRepDefLevelsLengths() const {
    return rep_levels.length() == def_levels.length();
  }

  // Incorporates |range| into the visited elements. If |range| is contiguous
  // with the last range, extend the last range; otherwise add |range|
  // separately to the list.
  void RecordPostListVisit(const ElementRange& range) {
    if (!visited_elements.empty() && range.start == visited_elements.back().end) {
      visited_elements.back().end = range.end;
      return;
    }
    visited_elements.push_back(range);
  }

  Status last_status;
  TypedBufferBuilder<int16_t> rep_levels;
  TypedBufferBuilder<int16_t> def_levels;
  std::vector<ElementRange> visited_elements;
};

IterationResult FillRepLevels(int64_t count, int16_t rep_level,
                              PathWriteContext* context) {
  if (rep_level == kLevelNotSet) {
    return kDone;
  }
  int64_t fill_count = count;
  // Equal rep and def level lengths occur in one of a few cases:
  // 1.  Before any list is encountered.
  // 2.  After rep-level has been filled in due to null/empty
  //     values above it.
  // 3.  After finishing a list.
  if (!context->EqualRepDefLevelsLengths()) {
    fill_count--;
  }
  return context->AppendRepLevels(fill_count, rep_level);
}
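
// For example (illustrative): filling rep levels for three null lists while
// already in the middle of an outer list appends only two additional rep
// levels, because the rep level for the first entry was already written when
// the outer list started (rep level length exceeded def level length).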

// A node for handling an array that is discovered to have all
// null elements. It is referred to as a TerminalNode because
// traversal of nodes will not continue past it when generating
// rep/def levels. However, there could be many nested child
// elements beyond it in the Array that is being processed.
class AllNullsTerminalNode {
 public:
  explicit AllNullsTerminalNode(int16_t def_level, int16_t rep_level = kLevelNotSet)
      : def_level_(def_level), rep_level_(rep_level) {}
  void SetRepLevelIfNull(int16_t rep_level) { rep_level_ = rep_level; }
  IterationResult Run(const ElementRange& range, PathWriteContext* context) {
    int64_t size = range.Size();
    RETURN_IF_ERROR(FillRepLevels(size, rep_level_, context));
    return context->AppendDefLevels(size, def_level_);
  }

 private:
  int16_t def_level_;
  int16_t rep_level_;
};

// Handles the case where all remaining arrays until the leaf have no nulls
// (and are not interrupted by lists). Unlike AllNullsTerminalNode this is
// always the last node in a path. We don't need an analogue to
// AllNullsTerminalNode because if all values are present at an intermediate
// array no node is added for it (the def-level for the next nullable node is
// incremented instead).
struct AllPresentTerminalNode {
  IterationResult Run(const ElementRange& range, PathWriteContext* context) {
    return context->AppendDefLevels(range.end - range.start, def_level);
    // No need to worry about rep levels, because this state is
    // only applicable after all list/repeated values
    // have been evaluated in the path.
  }
  int16_t def_level;
};

/// Node for handling the case when the leaf-array is nullable
/// and contains null elements.
struct NullableTerminalNode {
  NullableTerminalNode() = default;

  NullableTerminalNode(const uint8_t* bitmap, int64_t element_offset,
                       int16_t def_level_if_present)
      : bitmap_(bitmap),
        element_offset_(element_offset),
        def_level_if_present_(def_level_if_present),
        def_level_if_null_(def_level_if_present - 1) {}

  IterationResult Run(const ElementRange& range, PathWriteContext* context) {
    int64_t elements = range.Size();
    RETURN_IF_ERROR(context->ReserveDefLevels(elements));

    DCHECK_GT(elements, 0);

    auto bit_visitor = [&](bool is_set) {
      context->UnsafeAppendDefLevel(is_set ? def_level_if_present_ : def_level_if_null_);
    };

    if (elements > 16) {  // 16 guarantees at least one unrolled loop.
      ::arrow::internal::VisitBitsUnrolled(bitmap_, range.start + element_offset_,
                                           elements, bit_visitor);
    } else {
      ::arrow::internal::VisitBits(bitmap_, range.start + element_offset_, elements,
                                   bit_visitor);
    }
    return kDone;
  }
  const uint8_t* bitmap_;
  int64_t element_offset_;
  int16_t def_level_if_present_;
  int16_t def_level_if_null_;
};

// List nodes handle populating rep_level for Arrow Lists and def-level for empty
// lists. Nullability (of both the list and its children) is handled by other
// nodes. By construction all list nodes will be intermediate nodes (they will
// always be followed by at least one other node).
//
// Type parameters:
//    |RangeSelector| - A strategy for determining the range of the child node to
//    process. This varies depending on the type of list (int32_t* offsets,
//    int64_t* offsets, or fixed-size).
template <typename RangeSelector>
class ListPathNode {
 public:
  ListPathNode(RangeSelector selector, int16_t rep_lev, int16_t def_level_if_empty)
      : selector_(std::move(selector)),
        prev_rep_level_(rep_lev - 1),
        rep_level_(rep_lev),
        def_level_if_empty_(def_level_if_empty) {}

  int16_t rep_level() const { return rep_level_; }

  IterationResult Run(ElementRange* range, ElementRange* child_range,
                      PathWriteContext* context) {
    if (range->Empty()) {
      return kDone;
    }
    // Find the first non-empty list (skipping a run of empties).
    int64_t empty_elements = 0;
    do {
      // Retrieve the range of elements that this list contains.
      *child_range = selector_.GetRange(range->start);
      if (!child_range->Empty()) {
        break;
      }
      ++empty_elements;
      ++range->start;
    } while (!range->Empty());

    // Post condition:
    //   * |range| is either empty (we are done processing at this node)
    //     or its start corresponds to a non-empty list.
    //   * If |range| is non-empty, |child_range| contains
    //     the bounds of the non-empty list.

    // Handle any skipped-over empty lists.
    if (empty_elements > 0) {
      RETURN_IF_ERROR(FillRepLevels(empty_elements, prev_rep_level_, context));
      RETURN_IF_ERROR(context->AppendDefLevels(empty_elements, def_level_if_empty_));
    }
    // Start of a new list. Note that for nested lists adding the element
    // here effectively suppresses this code until we either encounter null
    // elements or empty lists between here and the innermost list (since
    // we make the repetition and definition levels unequal).
    // Similarly when we are backtracking up the stack the repetition and
    // definition levels are again equal so if we encounter an intermediate list
    // with more elements this will detect it as a new list.
    if (context->EqualRepDefLevelsLengths() && !range->Empty()) {
      RETURN_IF_ERROR(context->AppendRepLevel(prev_rep_level_));
    }

    if (range->Empty()) {
      return kDone;
    }

    ++range->start;
    if (is_last_) {
      // If this is the last repeated node, we can try
      // to extend the child range as wide as possible before
      // continuing to the next node.
      return FillForLast(range, child_range, context);
    }
    return kNext;
  }

  void SetLast() { is_last_ = true; }

 private:
  IterationResult FillForLast(ElementRange* range, ElementRange* child_range,
                              PathWriteContext* context) {
    // First fill in the remainder of the list.
    RETURN_IF_ERROR(FillRepLevels(child_range->Size(), rep_level_, context));
    // Once we've reached this point the following preconditions should hold:
    // 1.  There are no more repeated path nodes to deal with.
    // 2.  All elements in |range| represent contiguous elements in the
    //     child array (null values would have shortened the range to ensure
    //     all remaining list elements are present (though they may be empty lists)).
    // 3.  No element of |range| spans a parent list (intermediate
    //     list nodes only handle one list entry at a time).
    //
    // Given these preconditions it should be safe to fill runs of non-empty
    // lists here and expand the range in the child node accordingly.

    while (!range->Empty()) {
      ElementRange size_check = selector_.GetRange(range->start);
      if (size_check.Empty()) {
        // The empty range will need to be handled after we pass down the accumulated
        // range because it affects def_level placement and we need to get the children's
        // def_levels entered first.
        break;
      }
      // This is the start of a new list. We can be sure it only applies
      // to the previous list (and doesn't jump to the start of any list
      // further up in nesting due to the constraints mentioned at the start
      // of the function).
      RETURN_IF_ERROR(context->AppendRepLevel(prev_rep_level_));
      RETURN_IF_ERROR(context->AppendRepLevels(size_check.Size() - 1, rep_level_));
      DCHECK_EQ(size_check.start, child_range->end)
          << size_check.start << " != " << child_range->end;
      child_range->end = size_check.end;
      ++range->start;
    }

    // Do book-keeping to track the elements of the arrays that are actually visited
    // beyond this point.  This is necessary to identify "gaps" in values that should
    // not be processed (written out to parquet).
    context->RecordPostListVisit(*child_range);
    return kNext;
  }

  RangeSelector selector_;
  int16_t prev_rep_level_;
  int16_t rep_level_;
  int16_t def_level_if_empty_;
  bool is_last_ = false;
};

template <typename OffsetType>
struct VarRangeSelector {
  ElementRange GetRange(int64_t index) const {
    return ElementRange{offsets[index], offsets[index + 1]};
  }

  // Either int32_t* or int64_t*.
  const OffsetType* offsets;
};
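
// For example (illustrative): with offsets = {0, 2, 2, 5}, GetRange(0)
// returns [0, 2), GetRange(1) returns the empty range [2, 2) (an empty
// list), and GetRange(2) returns [2, 5).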

struct FixedSizedRangeSelector {
  ElementRange GetRange(int64_t index) const {
    int64_t start = index * list_size;
    return ElementRange{start, start + list_size};
  }
  int list_size;
};
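
// For example (illustrative): with list_size = 3, GetRange(2) returns [6, 9).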

// An intermediate node that handles null values.
class NullableNode {
 public:
  NullableNode(const uint8_t* null_bitmap, int64_t entry_offset,
               int16_t def_level_if_null, int16_t rep_level_if_null = kLevelNotSet)
      : null_bitmap_(null_bitmap),
        entry_offset_(entry_offset),
        valid_bits_reader_(MakeReader(ElementRange{0, 0})),
        def_level_if_null_(def_level_if_null),
        rep_level_if_null_(rep_level_if_null),
        new_range_(true) {}

  void SetRepLevelIfNull(int16_t rep_level) { rep_level_if_null_ = rep_level; }

  ::arrow::internal::BitRunReader MakeReader(const ElementRange& range) {
    return ::arrow::internal::BitRunReader(null_bitmap_, entry_offset_ + range.start,
                                           range.Size());
  }

  IterationResult Run(ElementRange* range, ElementRange* child_range,
                      PathWriteContext* context) {
    if (new_range_) {
      // Reset the reader each time we are starting fresh on a range.
      // We can't rely on continuity because nulls above can
      // cause discontinuities.
      valid_bits_reader_ = MakeReader(*range);
    }
    child_range->start = range->start;
    ::arrow::internal::BitRun run = valid_bits_reader_.NextRun();
    if (!run.set) {
      range->start += run.length;
      RETURN_IF_ERROR(FillRepLevels(run.length, rep_level_if_null_, context));
      RETURN_IF_ERROR(context->AppendDefLevels(run.length, def_level_if_null_));
      run = valid_bits_reader_.NextRun();
    }
    if (range->Empty()) {
      new_range_ = true;
      return kDone;
    }
    child_range->end = child_range->start = range->start;
    child_range->end += run.length;

    DCHECK(!child_range->Empty());
    range->start += child_range->Size();
    new_range_ = false;
    return kNext;
  }

  const uint8_t* null_bitmap_;
  int64_t entry_offset_;
  ::arrow::internal::BitRunReader valid_bits_reader_;
  int16_t def_level_if_null_;
  int16_t rep_level_if_null_;

  // Whether the next invocation will be a new range.
  bool new_range_ = true;
};

using ListNode = ListPathNode<VarRangeSelector<int32_t>>;
using LargeListNode = ListPathNode<VarRangeSelector<int64_t>>;
using FixedSizeListNode = ListPathNode<FixedSizedRangeSelector>;

// Contains static information derived from traversing the schema.
struct PathInfo {
  // Note: the variant index order matters here.
  using Node = ::arrow::util::Variant<NullableTerminalNode, ListNode, LargeListNode,
                                      FixedSizeListNode, NullableNode,
                                      AllPresentTerminalNode, AllNullsTerminalNode>;

  std::vector<Node> path;
  std::shared_ptr<Array> primitive_array;
  int16_t max_def_level = 0;
  int16_t max_rep_level = 0;
  bool has_dictionary = false;
  bool leaf_is_nullable = false;
};
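
// For example (illustrative): a nullable list<int32 (nullable)> column whose
// arrays contain a mix of nulls and values would typically yield
// path = {NullableNode, ListNode, NullableTerminalNode} with
// max_def_level = 3 and max_rep_level = 1 (matching the worked example in
// the header comment above).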

/// Contains the logic for writing a single leaf node to parquet.
/// This tracks the path from root to leaf.
///
/// |writer| will be called after all of the definition/repetition
/// values have been calculated for root_range with the calculated
/// values. It is intended to abstract the complexity of writing
/// the levels and values to parquet.
Status WritePath(ElementRange root_range, PathInfo* path_info,
                 ArrowWriteContext* arrow_context,
                 MultipathLevelBuilder::CallbackFunction writer) {
  std::vector<ElementRange> stack(path_info->path.size());
  MultipathLevelBuilderResult builder_result;
  builder_result.leaf_array = path_info->primitive_array;
  builder_result.leaf_is_nullable = path_info->leaf_is_nullable;

  if (path_info->max_def_level == 0) {
    // This case only occurs when there are no nullable or repeated
    // columns in the path from the root to leaf.
    int64_t leaf_length = builder_result.leaf_array->length();
    builder_result.def_rep_level_count = leaf_length;
    builder_result.post_list_visited_elements.push_back({0, leaf_length});
    return writer(builder_result);
  }
  stack[0] = root_range;
  RETURN_NOT_OK(
      arrow_context->def_levels_buffer->Resize(/*new_size=*/0, /*shrink_to_fit=*/false));
  PathWriteContext context(arrow_context->memory_pool, arrow_context->def_levels_buffer);
  // We will need at least this many entries, so reserve the space ahead of time.
  RETURN_NOT_OK(context.def_levels.Reserve(root_range.Size()));
  if (path_info->max_rep_level > 0) {
    RETURN_NOT_OK(context.rep_levels.Reserve(root_range.Size()));
  }

  auto stack_base = &stack[0];
  auto stack_position = stack_base;
  // This is the main loop for calculating rep/def levels. The nodes
  // in the path implement a chain-of-responsibility-like pattern
  // where each node can add some number of repetition/definition
  // levels to PathWriteContext and also delegate to the next node
  // in the path to add values. The values are added through each Run(...)
  // call and the choice to delegate to the next node (or return to the
  // previous node) is communicated by the return value of Run(...).
  // The loop terminates after the first node indicates all values in
  // |root_range| are processed.
  while (stack_position >= stack_base) {
    PathInfo::Node& node = path_info->path[stack_position - stack_base];
    struct {
      IterationResult operator()(NullableNode* node) {
        return node->Run(stack_position, stack_position + 1, context);
      }
      IterationResult operator()(ListNode* node) {
        return node->Run(stack_position, stack_position + 1, context);
      }
      IterationResult operator()(NullableTerminalNode* node) {
        return node->Run(*stack_position, context);
      }
      IterationResult operator()(FixedSizeListNode* node) {
        return node->Run(stack_position, stack_position + 1, context);
      }
      IterationResult operator()(AllPresentTerminalNode* node) {
        return node->Run(*stack_position, context);
      }
      IterationResult operator()(AllNullsTerminalNode* node) {
        return node->Run(*stack_position, context);
      }
      IterationResult operator()(LargeListNode* node) {
        return node->Run(stack_position, stack_position + 1, context);
      }
      ElementRange* stack_position;
      PathWriteContext* context;
    } visitor = {stack_position, &context};

    IterationResult result = ::arrow::util::visit(visitor, &node);

    if (ARROW_PREDICT_FALSE(result == kError)) {
      DCHECK(!context.last_status.ok());
      return context.last_status;
    }
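    // kDone (-1) moves back up the stack to the parent node; kNext (+1)
    // advances to the child node (see the IterationResult enum values).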
    stack_position += static_cast<int>(result);
  }
  RETURN_NOT_OK(context.last_status);
  builder_result.def_rep_level_count = context.def_levels.length();

  if (context.rep_levels.length() > 0) {
    // This case only occurs when there was a repeated element that needed to be
    // processed.
    builder_result.rep_levels = context.rep_levels.data();
    std::swap(builder_result.post_list_visited_elements, context.visited_elements);
    // It is possible when processing lists that all lists were empty. In that
    // case no elements would have been added to post_list_visited_elements. By
    // adding an empty element we avoid special casing in downstream consumers.
    if (builder_result.post_list_visited_elements.empty()) {
      builder_result.post_list_visited_elements.push_back({0, 0});
    }
  } else {
    builder_result.post_list_visited_elements.push_back(
        {0, builder_result.leaf_array->length()});
    builder_result.rep_levels = nullptr;
  }

  builder_result.def_levels = context.def_levels.data();
  return writer(builder_result);
}

struct FixupVisitor {
  int max_rep_level = -1;
  int16_t rep_level_if_null = kLevelNotSet;

  template <typename T>
  void HandleListNode(T* arg) {
    if (arg->rep_level() == max_rep_level) {
      arg->SetLast();
      // After the last list node we don't need to fill
      // rep levels on null.
      rep_level_if_null = kLevelNotSet;
    } else {
      rep_level_if_null = arg->rep_level();
    }
  }
  void operator()(ListNode* node) { HandleListNode(node); }
  void operator()(LargeListNode* node) { HandleListNode(node); }
  void operator()(FixedSizeListNode* node) { HandleListNode(node); }

  // For non-list intermediate nodes.
  template <typename T>
  void HandleIntermediateNode(T* arg) {
    if (rep_level_if_null != kLevelNotSet) {
      arg->SetRepLevelIfNull(rep_level_if_null);
    }
  }

  void operator()(NullableNode* arg) { HandleIntermediateNode(arg); }

  void operator()(AllNullsTerminalNode* arg) {
    // Even though no processing happens past this point we
    // still need to adjust it if a list occurred after an
    // all-null array.
    HandleIntermediateNode(arg);
  }

  void operator()(NullableTerminalNode*) {}
  void operator()(AllPresentTerminalNode*) {}
};

PathInfo Fixup(PathInfo info) {
  // We only need to fix up the path if there were repeated
  // elements on it.
  if (info.max_rep_level == 0) {
    return info;
  }
  FixupVisitor visitor;
  visitor.max_rep_level = info.max_rep_level;
  if (visitor.max_rep_level > 0) {
    visitor.rep_level_if_null = 0;
  }
  for (size_t x = 0; x < info.path.size(); x++) {
    ::arrow::util::visit(visitor, &info.path[x]);
  }
  return info;
}

class PathBuilder {
 public:
  explicit PathBuilder(bool start_nullable) : nullable_in_parent_(start_nullable) {}
  template <typename T>
  void AddTerminalInfo(const T& array) {
    info_.leaf_is_nullable = nullable_in_parent_;
    if (nullable_in_parent_) {
      info_.max_def_level++;
    }
    // We don't use null_count() because if the null_count isn't known
    // and the array does in fact contain nulls, we will end up
    // traversing the null bitmap twice (once here and once when calculating
    // rep/def levels).
    if (LazyNoNulls(array)) {
      info_.path.emplace_back(AllPresentTerminalNode{info_.max_def_level});
    } else if (LazyNullCount(array) == array.length()) {
      info_.path.emplace_back(AllNullsTerminalNode(info_.max_def_level - 1));
    } else {
      info_.path.emplace_back(NullableTerminalNode(array.null_bitmap_data(),
                                                   array.offset(), info_.max_def_level));
    }
    info_.primitive_array = std::make_shared<T>(array.data());
    paths_.push_back(Fixup(info_));
  }

  template <typename T>
  ::arrow::enable_if_t<std::is_base_of<::arrow::FlatArray, T>::value, Status> Visit(
      const T& array) {
    AddTerminalInfo(array);
    return Status::OK();
  }

  template <typename T>
  ::arrow::enable_if_t<std::is_same<::arrow::ListArray, T>::value ||
                           std::is_same<::arrow::LargeListArray, T>::value,
                       Status>
  Visit(const T& array) {
    MaybeAddNullable(array);
    // Increment necessary due to empty lists.
    info_.max_def_level++;
    info_.max_rep_level++;
    // raw_value_offsets() accounts for any slice offset.
    ListPathNode<VarRangeSelector<typename T::offset_type>> node(
        VarRangeSelector<typename T::offset_type>{array.raw_value_offsets()},
        info_.max_rep_level, info_.max_def_level - 1);
    info_.path.emplace_back(std::move(node));
    nullable_in_parent_ = array.list_type()->value_field()->nullable();
    return VisitInline(*array.values());
  }

  Status Visit(const ::arrow::DictionaryArray& array) {
    // We currently only handle DictionaryArray where the dictionary is a
    // primitive type.
    if (array.dict_type()->value_type()->num_fields() > 0) {
      return Status::NotImplemented(
          "Writing DictionaryArray with nested dictionary "
          "type not yet supported");
    }
    if (array.dictionary()->null_count() > 0) {
      return Status::NotImplemented(
          "Writing DictionaryArray with null encoded in dictionary "
          "type not yet supported");
    }
    AddTerminalInfo(array);
    return Status::OK();
  }

  void MaybeAddNullable(const Array& array) {
    if (!nullable_in_parent_) {
      return;
    }
    info_.max_def_level++;
    // We don't use null_count() because if the null_count isn't known
    // and the array does in fact contain nulls, we will end up
    // traversing the null bitmap twice (once here and once when calculating
    // rep/def levels). Because this isn't terminal this might not be
    // the right decision for structs that share the same nullable
    // parents.
    if (LazyNoNulls(array)) {
      // Don't add anything because there won't be any point checking
      // null values for the array.  There will always be at least
      // one more array to handle nullability.
      return;
    }
    if (LazyNullCount(array) == array.length()) {
      info_.path.emplace_back(AllNullsTerminalNode(info_.max_def_level - 1));
      return;
    }
    info_.path.emplace_back(
        NullableNode(array.null_bitmap_data(), array.offset(),
                     /*def_level_if_null=*/info_.max_def_level - 1));
  }

  Status VisitInline(const Array& array);

  Status Visit(const ::arrow::MapArray& array) {
    return Visit(static_cast<const ::arrow::ListArray&>(array));
  }

  Status Visit(const ::arrow::StructArray& array) {
    MaybeAddNullable(array);
    PathInfo info_backup = info_;
    for (int x = 0; x < array.num_fields(); x++) {
      nullable_in_parent_ = array.type()->field(x)->nullable();
      RETURN_NOT_OK(VisitInline(*array.field(x)));
      info_ = info_backup;
    }
    return Status::OK();
  }

  Status Visit(const ::arrow::FixedSizeListArray& array) {
    MaybeAddNullable(array);
    int32_t list_size = array.list_type()->list_size();
    // Technically we could encode fixed-size lists with a 2-level encoding,
    // but since we always use a 3-level encoding we increment the def level
    // as well.
    info_.max_def_level++;
    info_.max_rep_level++;
    info_.path.emplace_back(FixedSizeListNode(FixedSizedRangeSelector{list_size},
                                              info_.max_rep_level, info_.max_def_level));
    nullable_in_parent_ = array.list_type()->value_field()->nullable();
    if (array.offset() > 0) {
      return VisitInline(*array.values()->Slice(array.value_offset(0)));
    }
    return VisitInline(*array.values());
  }

  Status Visit(const ::arrow::ExtensionArray& array) {
    return VisitInline(*array.storage());
  }

#define NOT_IMPLEMENTED_VISIT(ArrowTypePrefix)                             \
  Status Visit(const ::arrow::ArrowTypePrefix##Array& array) {             \
    return Status::NotImplemented("Level generation for " #ArrowTypePrefix \
                                  " not supported yet");                   \
  }

  // Union types aren't supported in Parquet.
  NOT_IMPLEMENTED_VISIT(Union)

#undef NOT_IMPLEMENTED_VISIT
  std::vector<PathInfo>& paths() { return paths_; }

 private:
  PathInfo info_;
  std::vector<PathInfo> paths_;
  bool nullable_in_parent_;
};

Status PathBuilder::VisitInline(const Array& array) {
  return ::arrow::VisitArrayInline(array, this);
}

#undef RETURN_IF_ERROR
}  // namespace

class MultipathLevelBuilderImpl : public MultipathLevelBuilder {
 public:
  MultipathLevelBuilderImpl(std::shared_ptr<::arrow::ArrayData> data,
                            std::unique_ptr<PathBuilder> path_builder)
      : root_range_{0, data->length},
        data_(std::move(data)),
        path_builder_(std::move(path_builder)) {}

  int GetLeafCount() const override {
    return static_cast<int>(path_builder_->paths().size());
  }

  ::arrow::Status Write(int leaf_index, ArrowWriteContext* context,
                        CallbackFunction write_leaf_callback) override {
    DCHECK_GE(leaf_index, 0);
    DCHECK_LT(leaf_index, GetLeafCount());
    return WritePath(root_range_, &path_builder_->paths()[leaf_index], context,
                     std::move(write_leaf_callback));
  }

 private:
  ElementRange root_range_;
  // Reference holder to ensure the data stays valid.
  std::shared_ptr<::arrow::ArrayData> data_;
  std::unique_ptr<PathBuilder> path_builder_;
};

// static
::arrow::Result<std::unique_ptr<MultipathLevelBuilder>> MultipathLevelBuilder::Make(
    const ::arrow::Array& array, bool array_field_nullable) {
  auto constructor = ::arrow::internal::make_unique<PathBuilder>(array_field_nullable);
  RETURN_NOT_OK(VisitArrayInline(array, constructor.get()));
  return ::arrow::internal::make_unique<MultipathLevelBuilderImpl>(
      array.data(), std::move(constructor));
}

// static
Status MultipathLevelBuilder::Write(const Array& array, bool array_field_nullable,
                                    ArrowWriteContext* context,
                                    MultipathLevelBuilder::CallbackFunction callback) {
  ARROW_ASSIGN_OR_RAISE(std::unique_ptr<MultipathLevelBuilder> builder,
                        MultipathLevelBuilder::Make(array, array_field_nullable));
  // Make() already visited the array to build the paths, so no separate
  // PathBuilder pass is needed here.
  for (int leaf_idx = 0; leaf_idx < builder->GetLeafCount(); leaf_idx++) {
    RETURN_NOT_OK(builder->Write(leaf_idx, context, callback));
  }
  return Status::OK();
}
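
// Usage sketch (illustrative; `array`, `write_context`, and the callback body
// are hypothetical, and the callback signature follows
// MultipathLevelBuilder::CallbackFunction declared in path_internal.h):
//
//   ARROW_ASSIGN_OR_RAISE(
//       auto builder, MultipathLevelBuilder::Make(array,
//                                                 /*array_field_nullable=*/true));
//   for (int leaf = 0; leaf < builder->GetLeafCount(); ++leaf) {
//     RETURN_NOT_OK(builder->Write(
//         leaf, &write_context, [&](const MultipathLevelBuilderResult& result) {
//           // Consume result.def_levels / result.rep_levels and the leaf
//           // values here (e.g. hand them to a parquet column writer).
//           return ::arrow::Status::OK();
//         }));
//   }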

}  // namespace arrow
}  // namespace parquet