1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 // Overview.
19 //
20 // The strategy used for this code for repetition/definition
21 // is to dissect the top level array into a list of paths
22 // from the top level array to the final primitive (possibly
23 // dictionary encoded array). It then evaluates each one of
24 // those paths to produce results for the callback iteratively.
25 //
// This approach was taken to reduce the aggregate memory required if we were
// to build all def/rep levels in parallel as part of a tree traversal. It
// also allows for straightforward parallelization at the path level if that is
// desired in the future.
30 //
// The main downside to this approach is that it duplicates effort for nodes
// that share common ancestors. This can be mitigated to some degree
// by adding in optimizations that detect leaf arrays that share
// the same common list ancestor and reuse the repetition levels
// from the first leaf encountered (only definition levels greater than
// the list ancestor need to be re-evaluated). This is left for future
// work.
38 //
39 // Algorithm.
40 //
41 // As mentioned above this code dissects arrays into constituent parts:
42 // nullability data, and list offset data. It tries to optimize for
43 // some special cases, where it is known ahead of time that a step
44 // can be skipped (e.g. a nullable array happens to have all of its
45 // values) or batch filled (a nullable array has all null values).
46 // One further optimization that is not implemented but could be done
47 // in the future is special handling for nested list arrays that
48 // have some intermediate data which indicates the final array contains only
49 // nulls.
50 //
51 // In general, the algorithm attempts to batch work at each node as much
52 // as possible. For nullability nodes this means finding runs of null
53 // values and batch filling those interspersed with finding runs of non-null values
54 // to process in batch at the next column.
55 //
56 // Similarly, list runs of empty lists are all processed in one batch
57 // followed by either:
58 // - A single list entry for non-terminal lists (i.e. the upper part of a nested list)
59 // - Runs of non-empty lists for the terminal list (i.e. the lowest part of a nested
60 // list).
61 //
// This makes use of the following observations.
// 1. Null values at any node on the path are terminal (repetition and definition
//    level can be set directly when a Null value is encountered).
// 2. Empty lists share this eager termination property with Null values.
// 3. In order to keep repetition/definition levels populated the algorithm is lazy
//    in assigning repetition levels. The algorithm tracks whether it is currently
//    in the middle of a list by comparing the lengths of repetition/definition levels.
//    If it is currently in the middle of a list the number of repetition levels
//    populated will be greater than definition levels (the start of a List requires
//    adding the first element). If there are equal numbers of definition and repetition
//    levels populated this indicates a list is waiting to be started and the next list
//    encountered will have its repetition level signify the beginning of the list.
74 //
75 // Other implementation notes.
76 //
77 // This code hasn't been benchmarked (or assembly analyzed) but did the following
78 // as optimizations (yes premature optimization is the root of all evil).
79 // - This code does not use recursion, instead it constructs its own stack and manages
80 // updating elements accordingly.
81 // - It tries to avoid using Status for common return states.
82 // - Avoids virtual dispatch in favor of if/else statements on a set of well known
83 // classes.
84
85 #include "parquet/arrow/path_internal.h"
86
87 #include <atomic>
88 #include <cstddef>
89 #include <memory>
90 #include <type_traits>
91 #include <utility>
92 #include <vector>
93
94 #include "arrow/array.h"
95 #include "arrow/buffer.h"
96 #include "arrow/buffer_builder.h"
97 #include "arrow/extension_type.h"
98 #include "arrow/memory_pool.h"
99 #include "arrow/type.h"
100 #include "arrow/type_traits.h"
101 #include "arrow/util/bit_util.h"
102 #include "arrow/util/logging.h"
103 #include "arrow/util/macros.h"
104 #include "arrow/util/make_unique.h"
105 #include "arrow/util/variant.h"
106 #include "arrow/visitor_inline.h"
107
108 #include "parquet/properties.h"
109
110 namespace parquet {
111 namespace arrow {
112
113 namespace {
114
115 using ::arrow::Array;
116 using ::arrow::Status;
117 using ::arrow::TypedBufferBuilder;
118 using ::arrow::util::holds_alternative;
119
120 constexpr static int16_t kLevelNotSet = -1;
121
/// \brief Simple result of iterating over a column to determine values.
///
/// The numeric values of kDone/kNext are chosen so they can be added
/// directly to the stack position in WritePath's main loop (kDone pops
/// back to the parent node, kNext descends to the child node). kError is
/// handled explicitly before that addition.
enum IterationResult {
  /// Processing is done at this node. Move back up the path
  /// to continue processing.
  kDone = -1,
  /// Move down towards the leaf for processing.
  kNext = 1,
  /// An error occurred while processing.
  kError = 2
};
132
// Propagates a kError result to the caller immediately; all other results
// fall through. Used instead of Status on the hot path (the failing Status
// is stashed in PathWriteContext::last_status).
#define RETURN_IF_ERROR(iteration_result)                  \
  do {                                                     \
    if (ARROW_PREDICT_FALSE(iteration_result == kError)) { \
      return iteration_result;                             \
    }                                                      \
  } while (false)
139
LazyNullCount(const Array & array)140 int64_t LazyNullCount(const Array& array) { return array.data()->null_count.load(); }
141
LazyNoNulls(const Array & array)142 bool LazyNoNulls(const Array& array) {
143 int64_t null_count = LazyNullCount(array);
144 return null_count == 0 ||
145 // kUnkownNullCount comparison is needed to account
146 // for null arrays.
147 (null_count == ::arrow::kUnknownNullCount &&
148 array.null_bitmap_data() == nullptr);
149 }
150
151 struct PathWriteContext {
PathWriteContextparquet::arrow::__anonaab4c0630111::PathWriteContext152 PathWriteContext(::arrow::MemoryPool* pool,
153 std::shared_ptr<::arrow::ResizableBuffer> def_levels_buffer)
154 : rep_levels(pool), def_levels(std::move(def_levels_buffer), pool) {}
ReserveDefLevelsparquet::arrow::__anonaab4c0630111::PathWriteContext155 IterationResult ReserveDefLevels(int64_t elements) {
156 last_status = def_levels.Reserve(elements);
157 if (ARROW_PREDICT_TRUE(last_status.ok())) {
158 return kDone;
159 }
160 return kError;
161 }
162
AppendDefLevelparquet::arrow::__anonaab4c0630111::PathWriteContext163 IterationResult AppendDefLevel(int16_t def_level) {
164 last_status = def_levels.Append(def_level);
165 if (ARROW_PREDICT_TRUE(last_status.ok())) {
166 return kDone;
167 }
168 return kError;
169 }
170
AppendDefLevelsparquet::arrow::__anonaab4c0630111::PathWriteContext171 IterationResult AppendDefLevels(int64_t count, int16_t def_level) {
172 last_status = def_levels.Append(count, def_level);
173 if (ARROW_PREDICT_TRUE(last_status.ok())) {
174 return kDone;
175 }
176 return kError;
177 }
178
UnsafeAppendDefLevelparquet::arrow::__anonaab4c0630111::PathWriteContext179 void UnsafeAppendDefLevel(int16_t def_level) { def_levels.UnsafeAppend(def_level); }
180
AppendRepLevelparquet::arrow::__anonaab4c0630111::PathWriteContext181 IterationResult AppendRepLevel(int16_t rep_level) {
182 last_status = rep_levels.Append(rep_level);
183
184 if (ARROW_PREDICT_TRUE(last_status.ok())) {
185 return kDone;
186 }
187 return kError;
188 }
189
AppendRepLevelsparquet::arrow::__anonaab4c0630111::PathWriteContext190 IterationResult AppendRepLevels(int64_t count, int16_t rep_level) {
191 last_status = rep_levels.Append(count, rep_level);
192 if (ARROW_PREDICT_TRUE(last_status.ok())) {
193 return kDone;
194 }
195 return kError;
196 }
197
EqualRepDefLevelsLengthsparquet::arrow::__anonaab4c0630111::PathWriteContext198 bool EqualRepDefLevelsLengths() const {
199 return rep_levels.length() == def_levels.length();
200 }
201
202 // Incorporates |range| into visited elements. If the |range| is contiguous
203 // with the last range, extend the last range, otherwise add |range| separately
204 // tot he list.
RecordPostListVisitparquet::arrow::__anonaab4c0630111::PathWriteContext205 void RecordPostListVisit(const ElementRange& range) {
206 if (!visited_elements.empty() && range.start == visited_elements.back().end) {
207 visited_elements.back().end = range.end;
208 return;
209 }
210 visited_elements.push_back(range);
211 }
212
213 Status last_status;
214 TypedBufferBuilder<int16_t> rep_levels;
215 TypedBufferBuilder<int16_t> def_levels;
216 std::vector<ElementRange> visited_elements;
217 };
218
// Appends |count| repetition levels of value |rep_level|, or is a no-op when
// |rep_level| is kLevelNotSet (no repeated ancestor on this path).
IterationResult FillRepLevels(int64_t count, int16_t rep_level,
                              PathWriteContext* context) {
  if (rep_level == kLevelNotSet) {
    return kDone;
  }
  int64_t fill_count = count;
  // If the rep/def level lengths are unequal we are in the middle of a list:
  // the repetition level for the current element was already populated when
  // the list was started, so one fewer level is filled here. Equal lengths
  // occur in a few cases:
  // 1. Before any list is encountered.
  // 2. After rep-level has been filled in due to null/empty
  //    values above it.
  // 3. After finishing a list.
  if (!context->EqualRepDefLevelsLengths()) {
    fill_count--;
  }
  return context->AppendRepLevels(fill_count, rep_level);
}
236
237 // A node for handling an array that is discovered to have all
238 // null elements. It is referred to as a TerminalNode because
239 // traversal of nodes will not continue it when generating
240 // rep/def levels. However, there could be many nested children
241 // elements beyond it in the Array that is being processed.
242 class AllNullsTerminalNode {
243 public:
AllNullsTerminalNode(int16_t def_level,int16_t rep_level=kLevelNotSet)244 explicit AllNullsTerminalNode(int16_t def_level, int16_t rep_level = kLevelNotSet)
245 : def_level_(def_level), rep_level_(rep_level) {}
SetRepLevelIfNull(int16_t rep_level)246 void SetRepLevelIfNull(int16_t rep_level) { rep_level_ = rep_level; }
Run(const ElementRange & range,PathWriteContext * context)247 IterationResult Run(const ElementRange& range, PathWriteContext* context) {
248 int64_t size = range.Size();
249 RETURN_IF_ERROR(FillRepLevels(size, rep_level_, context));
250 return context->AppendDefLevels(size, def_level_);
251 }
252
253 private:
254 int16_t def_level_;
255 int16_t rep_level_;
256 };
257
258 // Handles the case where all remaining arrays until the leaf have no nulls
259 // (and are not interrupted by lists). Unlike AllNullsTerminalNode this is
260 // always the last node in a path. We don't need an analogue to the AllNullsTerminalNode
261 // because if all values are present at an intermediate array no node is added for it
262 // (the def-level for the next nullable node is incremented).
263 struct AllPresentTerminalNode {
Runparquet::arrow::__anonaab4c0630111::AllPresentTerminalNode264 IterationResult Run(const ElementRange& range, PathWriteContext* context) {
265 return context->AppendDefLevels(range.end - range.start, def_level);
266 // No need to worry about rep levels, because this state should
267 // only be applicable for after all list/repeated values
268 // have been evaluated in the path.
269 }
270 int16_t def_level;
271 };
272
273 /// Node for handling the case when the leaf-array is nullable
274 /// and contains null elements.
struct NullableTerminalNode {
  // NOTE(review): declared but not defined here; presumably required so the
  // type satisfies a default-constructibility requirement of the Node
  // variant -- confirm it is never actually invoked (that would be a link
  // error).
  NullableTerminalNode();
  NullableTerminalNode(const uint8_t* bitmap, int64_t element_offset,
                       int16_t def_level_if_present)
      : bitmap_(bitmap),
        element_offset_(element_offset),
        def_level_if_present_(def_level_if_present),
        def_level_if_null_(def_level_if_present - 1) {}

  // Walks the validity bitmap for |range|, appending def_level_if_present_
  // for set bits and def_level_if_null_ for cleared bits.
  IterationResult Run(const ElementRange& range, PathWriteContext* context) {
    int64_t elements = range.Size();
    RETURN_IF_ERROR(context->ReserveDefLevels(elements));

    DCHECK_GT(elements, 0);

    auto bit_visitor = [&](bool is_set) {
      context->UnsafeAppendDefLevel(is_set ? def_level_if_present_ : def_level_if_null_);
    };

    if (elements > 16) {  // 16 guarantees at least one unrolled loop.
      ::arrow::internal::VisitBitsUnrolled(bitmap_, range.start + element_offset_,
                                           elements, bit_visitor);
    } else {
      ::arrow::internal::VisitBits(bitmap_, range.start + element_offset_, elements,
                                   bit_visitor);
    }
    return kDone;
  }
  const uint8_t* bitmap_;
  // Offset of the array's first element within |bitmap_| (accounts for
  // slicing).
  int64_t element_offset_;
  int16_t def_level_if_present_;
  int16_t def_level_if_null_;
};
308
309 // List nodes handle populating rep_level for Arrow Lists and def-level for empty lists.
310 // Nullability (both list and children) is handled by other Nodes. By
311 // construction all list nodes will be intermediate nodes (they will always be followed by
312 // at least one other node).
313 //
// Type parameters:
// |RangeSelector| - A strategy for determining the range of the child node to
// process. This varies depending on the type of list (int32_t* offsets,
// int64_t* offsets, or fixed size).
template <typename RangeSelector>
class ListPathNode {
 public:
  ListPathNode(RangeSelector selector, int16_t rep_lev, int16_t def_level_if_empty)
      : selector_(std::move(selector)),
        prev_rep_level_(rep_lev - 1),
        rep_level_(rep_lev),
        def_level_if_empty_(def_level_if_empty) {}

  int16_t rep_level() const { return rep_level_; }

  IterationResult Run(ElementRange* range, ElementRange* child_range,
                      PathWriteContext* context) {
    if (range->Empty()) {
      return kDone;
    }

    // Find the first non-empty list (skipping a run of empties).
    int64_t start = range->start;
    // Retrieves the range of elements that this list contains.
    // Uses the strategy pattern to distinguish between the different
    // lists that are supported in Arrow (fixed size, normal and "large").
    *child_range = selector_.GetRange(range->start);
    // NOTE(review): when this loop exhausts |range|, GetRange is still
    // evaluated at range->start == range->end; for offset-based selectors
    // this reads offsets[end + 1] -- confirm the offsets buffer is always
    // large enough for that access.
    while (child_range->Empty() && !range->Empty()) {
      ++range->start;
      *child_range = selector_.GetRange(range->start);
    }
    // Loop post-conditions:
    // * range is either empty (we are done processing at this node)
    //   or start corresponds to a non-empty list.
    // * If range is non-empty child_range contains
    //   the bounds of a non-empty list.

    // Handle any skipped over empty lists.
    int64_t empty_elements = range->start - start;
    if (empty_elements > 0) {
      RETURN_IF_ERROR(FillRepLevels(empty_elements, prev_rep_level_, context));
      RETURN_IF_ERROR(context->AppendDefLevels(empty_elements, def_level_if_empty_));
    }
    // Start of a new list. Note that for nested lists adding the element
    // here effectively suppresses this code until we either encounter null
    // elements or empty lists between here and the innermost list (since
    // we make the repetition and definition levels unequal).
    // Similarly when we are backtracking up the stack the repetition and
    // definition levels are again equal so if we encounter an intermediate list
    // with more elements this will detect it as a new list.
    if (context->EqualRepDefLevelsLengths() && !range->Empty()) {
      RETURN_IF_ERROR(context->AppendRepLevel(prev_rep_level_));
    }

    if (range->Empty()) {
      return kDone;
    }

    ++range->start;
    if (is_last_) {
      // If this is the last repeated node, we can try
      // to extend the child range as wide as possible before
      // continuing to the next node.
      return FillForLast(range, child_range, context);
    }
    return kNext;
  }

  void SetLast() { is_last_ = true; }

 private:
  IterationResult FillForLast(ElementRange* range, ElementRange* child_range,
                              PathWriteContext* context) {
    // First fill in the remainder of the list.
    RETURN_IF_ERROR(FillRepLevels(child_range->Size(), rep_level_, context));
    // Once we've reached this point the following preconditions should hold:
    // 1. There are no more repeated path nodes to deal with.
    // 2. All elements in |range| represent contiguous elements in the
    //    child array (Null values would have shortened the range to ensure
    //    all remaining list elements are present (though they may be empty lists)).
    // 3. No element of range spans a parent list (intermediate
    //    list nodes only handle one list entry at a time).
    //
    // Given these preconditions it should be safe to fill runs of non-empty
    // lists here and expand the range in the child node accordingly.

    while (!range->Empty()) {
      ElementRange size_check = selector_.GetRange(range->start);
      if (size_check.Empty()) {
        // The empty range will need to be handled after we pass down the accumulated
        // range because it affects def_level placement and we need to get the children
        // def_levels entered first.
        break;
      }
      // This is the start of a new list. We can be sure it only applies
      // to the previous list (and doesn't jump to the start of any list
      // further up in nesting due to the constraints mentioned at the start
      // of the function).
      RETURN_IF_ERROR(context->AppendRepLevel(prev_rep_level_));
      RETURN_IF_ERROR(context->AppendRepLevels(size_check.Size() - 1, rep_level_));
      DCHECK_EQ(size_check.start, child_range->end);
      child_range->end = size_check.end;
      ++range->start;
    }

    // Do book-keeping to track the elements of the arrays that are actually visited
    // beyond this point. This is necessary to identify "gaps" in values that should
    // not be processed (written out to parquet).
    context->RecordPostListVisit(*child_range);
    return kNext;
  }

  RangeSelector selector_;
  // Repetition level of the parent (rep_level_ - 1); emitted when a new
  // entry of the enclosing list starts and for empty lists.
  int16_t prev_rep_level_;
  int16_t rep_level_;
  // Definition level assigned to empty lists encountered at this node.
  int16_t def_level_if_empty_;
  bool is_last_ = false;
};
433
434 template <typename OffsetType>
435 struct VarRangeSelector {
GetRangeparquet::arrow::__anonaab4c0630111::VarRangeSelector436 ElementRange GetRange(int64_t index) const {
437 return ElementRange{offsets[index], offsets[index + 1]};
438 }
439
440 // Either int32_t* or int64_t*.
441 const OffsetType* offsets;
442 };
443
444 struct FixedSizedRangeSelector {
GetRangeparquet::arrow::__anonaab4c0630111::FixedSizedRangeSelector445 ElementRange GetRange(int64_t index) const {
446 int64_t start = index * list_size;
447 return ElementRange{start, start + list_size};
448 }
449 int list_size;
450 };
451
452 // An intermediate node that handles null values.
class NullableNode {
 public:
  NullableNode(const uint8_t* null_bitmap, int64_t entry_offset,
               int16_t def_level_if_null, int16_t rep_level_if_null = kLevelNotSet)
      : null_bitmap_(null_bitmap),
        entry_offset_(entry_offset),
        valid_bits_reader_(MakeReader(ElementRange{0, 0})),
        def_level_if_null_(def_level_if_null),
        rep_level_if_null_(rep_level_if_null),
        new_range_(true) {}

  void SetRepLevelIfNull(int16_t rep_level) { rep_level_if_null_ = rep_level; }

  // Builds a bitmap reader positioned at the start of |range|
  // (entry_offset_ accounts for any slice offset of the array).
  ::arrow::internal::BitmapReader MakeReader(const ElementRange& range) {
    return ::arrow::internal::BitmapReader(null_bitmap_, entry_offset_ + range.start,
                                           range.Size());
  }

  // Batch-fills levels for a leading run of nulls, then hands the following
  // run of contiguous non-null elements (via |child_range|) to the next
  // node. Returns kDone once |range| is exhausted.
  IterationResult Run(ElementRange* range, ElementRange* child_range,
                      PathWriteContext* context) {
    if (new_range_) {
      // Reset the reader each time we are starting fresh on a range.
      // We can't rely on continuity because nulls above can
      // cause discontinuities.
      valid_bits_reader_ = MakeReader(*range);
    }
    child_range->start = range->start;
    // Skip over a run of leading nulls.
    while (!range->Empty() && !valid_bits_reader_.IsSet()) {
      ++range->start;
      valid_bits_reader_.Next();
    }
    // Batch-fill levels for the nulls that were skipped.
    int64_t null_count = range->start - child_range->start;
    if (null_count > 0) {
      RETURN_IF_ERROR(FillRepLevels(null_count, rep_level_if_null_, context));
      RETURN_IF_ERROR(context->AppendDefLevels(null_count, def_level_if_null_));
    }
    if (range->Empty()) {
      new_range_ = true;
      return kDone;
    }
    // Extend |child_range| over the contiguous run of non-null elements,
    // which will be processed in batch by the next node in the path.
    child_range->end = child_range->start = range->start;

    while (child_range->end != range->end && valid_bits_reader_.IsSet()) {
      ++child_range->end;
      valid_bits_reader_.Next();
    }
    DCHECK(!child_range->Empty());
    range->start += child_range->Size();
    new_range_ = false;
    return kNext;
  }

  const uint8_t* null_bitmap_;
  int64_t entry_offset_;
  ::arrow::internal::BitmapReader valid_bits_reader_;
  int16_t def_level_if_null_;
  int16_t rep_level_if_null_;

  // Whether the next invocation will be a new range.
  bool new_range_ = true;
};
514
515 using ListNode = ListPathNode<VarRangeSelector<int32_t>>;
516 using LargeListNode = ListPathNode<VarRangeSelector<int64_t>>;
517 using FixedSizeListNode = ListPathNode<FixedSizedRangeSelector>;
518
519 // Contains static information derived from traversing the schema.
520 struct PathInfo {
521 // The vectors are expected to the same length info.
522
523 // Note index order matters here.
524 using Node = ::arrow::util::variant<NullableTerminalNode, ListNode, LargeListNode,
525 FixedSizeListNode, NullableNode,
526 AllPresentTerminalNode, AllNullsTerminalNode>;
527 std::vector<Node> path;
528 std::shared_ptr<Array> primitive_array;
529 int16_t max_def_level = 0;
530 int16_t max_rep_level = 0;
531 bool has_dictionary;
532 };
533
534 /// Contains logic for writing a single leaf node to parquet.
535 /// This tracks the path from root to leaf.
536 ///
537 /// |writer| will be called after all of the definition/repetition
538 /// values have been calculated for root_range with the calculated
539 /// values. It is intended to abstract the complexity of writing
540 /// the levels and values to parquet.
Status WritePath(ElementRange root_range, PathInfo* path_info,
                 ArrowWriteContext* arrow_context,
                 MultipathLevelBuilder::CallbackFunction writer) {
  std::vector<ElementRange> stack(path_info->path.size());
  MultipathLevelBuilderResult builder_result;
  builder_result.leaf_array = path_info->primitive_array;

  if (path_info->max_def_level == 0) {
    // This case only occurs when there are no nullable or repeated
    // columns in the path from the root to leaf.
    int64_t leaf_length = builder_result.leaf_array->length();
    builder_result.def_rep_level_count = leaf_length;
    builder_result.post_list_visited_elements.push_back({0, leaf_length});
    return writer(builder_result);
  }
  stack[0] = root_range;
  RETURN_NOT_OK(
      arrow_context->def_levels_buffer->Resize(/*new_size=*/0, /*shrink_to_fit*/ false));
  PathWriteContext context(arrow_context->memory_pool, arrow_context->def_levels_buffer);
  // We will need at least this many entries, so reserve the space ahead of time.
  RETURN_NOT_OK(context.def_levels.Reserve(root_range.Size()));
  if (path_info->max_rep_level > 0) {
    RETURN_NOT_OK(context.rep_levels.Reserve(root_range.Size()));
  }

  auto stack_base = &stack[0];
  auto stack_position = stack_base;
  // This is the main loop for calculating rep/def levels. The nodes
  // in the path implement a chain-of-responsibility like pattern
  // where each node can add some number of repetition/definition
  // levels to PathWriteContext and also delegate to the next node
  // in the path to add values. The values are added through each Run(...)
  // call and the choice to delegate to the next node (or return to the
  // previous node) is communicated by the return value of Run(...).
  // The loop terminates after the first node indicates all values in
  // |root_range| are processed.
  while (stack_position >= stack_base) {
    PathInfo::Node& node = path_info->path[stack_position - stack_base];
    struct {
      IterationResult operator()(
          NullableNode& node) {  // NOLINT google-runtime-references
        return node.Run(stack_position, stack_position + 1, context);
      }
      IterationResult operator()(ListNode& node) {  // NOLINT google-runtime-references
        return node.Run(stack_position, stack_position + 1, context);
      }
      IterationResult operator()(
          NullableTerminalNode& node) {  // NOLINT google-runtime-references
        return node.Run(*stack_position, context);
      }
      IterationResult operator()(
          FixedSizeListNode& node) {  // NOLINT google-runtime-references
        return node.Run(stack_position, stack_position + 1, context);
      }
      IterationResult operator()(
          AllPresentTerminalNode& node) {  // NOLINT google-runtime-references
        return node.Run(*stack_position, context);
      }
      IterationResult operator()(
          AllNullsTerminalNode& node) {  // NOLINT google-runtime-references
        return node.Run(*stack_position, context);
      }
      IterationResult operator()(
          LargeListNode& node) {  // NOLINT google-runtime-references
        return node.Run(stack_position, stack_position + 1, context);
      }
      ElementRange* stack_position;
      PathWriteContext* context;
    } visitor = {stack_position, &context};

    IterationResult result = ::arrow::util::visit(visitor, node);

    if (ARROW_PREDICT_FALSE(result == kError)) {
      DCHECK(!context.last_status.ok());
      return context.last_status;
    }
    // kDone == -1 pops back to the parent node; kNext == 1 descends to the
    // child node.
    stack_position += static_cast<int>(result);
  }
  RETURN_NOT_OK(context.last_status);
  builder_result.def_rep_level_count = context.def_levels.length();

  if (context.rep_levels.length() > 0) {
    // This case only occurs when there was a repeated element that needs to be
    // processed.
    builder_result.rep_levels = context.rep_levels.data();
    std::swap(builder_result.post_list_visited_elements, context.visited_elements);
    // It is possible when processing lists that all lists were empty. In this
    // case no elements would have been added to post_list_visited_elements. By
    // adding an empty element we avoid special casing in downstream consumers.
    if (builder_result.post_list_visited_elements.empty()) {
      builder_result.post_list_visited_elements.push_back({0, 0});
    }
  } else {
    builder_result.post_list_visited_elements.push_back(
        {0, builder_result.leaf_array->length()});
    builder_result.rep_levels = nullptr;
  }

  builder_result.def_levels = context.def_levels.data();
  return writer(builder_result);
}
642
// Post-processing pass over a completed path:
// - Marks the list node with the highest rep-level as "last" so it can
//   batch-extend child ranges (see ListPathNode::FillForLast).
// - Propagates to nullable/all-null nodes the repetition level they must
//   emit when filling levels for nulls beneath a list.
struct FixupVisitor {
  int max_rep_level = -1;
  int16_t rep_level_if_null = kLevelNotSet;

  template <typename T>
  void HandleListNode(T* arg) {
    if (arg->rep_level() == max_rep_level) {
      arg->SetLast();
      // after the last list node we don't need to fill
      // rep levels on null.
      rep_level_if_null = kLevelNotSet;
    } else {
      rep_level_if_null = arg->rep_level();
    }
  }
  void operator()(ListNode& node) {  // NOLINT google-runtime-references
    HandleListNode(&node);
  }
  void operator()(LargeListNode& node) {  // NOLINT google-runtime-references
    HandleListNode(&node);
  }
  void operator()(FixedSizeListNode& node) {  // NOLINT google-runtime-references
    HandleListNode(&node);
  }

  // For non-list intermediate nodes.
  template <typename T>
  void HandleIntermediateNode(T* arg) {
    if (rep_level_if_null != kLevelNotSet) {
      arg->SetRepLevelIfNull(rep_level_if_null);
    }
  }

  void operator()(NullableNode& arg) {  // NOLINT google-runtime-references
    HandleIntermediateNode(&arg);
  }

  void operator()(AllNullsTerminalNode& arg) {  // NOLINT google-runtime-references
    // Even though no processing happens past this point we
    // still need to adjust it if a list occurred after an
    // all null array.
    HandleIntermediateNode(&arg);
  }

  // Terminal nodes below require no fixup.
  void operator()(NullableTerminalNode& arg) {}   // NOLINT google-runtime-references
  void operator()(AllPresentTerminalNode& arg) {}  // NOLINT google-runtime-references
};
690
Fixup(PathInfo info)691 PathInfo Fixup(PathInfo info) {
692 // We only need to fixup the path if there were repeated
693 // elements on it.
694 if (info.max_rep_level == 0) {
695 return info;
696 }
697 FixupVisitor visitor;
698 visitor.max_rep_level = info.max_rep_level;
699 if (visitor.max_rep_level > 0) {
700 visitor.rep_level_if_null = 0;
701 }
702 for (size_t x = 0; x < info.path.size(); x++) {
703 ::arrow::util::visit(visitor, info.path[x]);
704 }
705 return info;
706 }
707
708 class PathBuilder {
709 public:
PathBuilder(bool start_nullable)710 explicit PathBuilder(bool start_nullable) : nullable_in_parent_(start_nullable) {}
  // Appends the terminal (leaf) node for |array| to the current path,
  // choosing among three strategies based on the array's null content,
  // and records the completed (fixed-up) path.
  template <typename T>
  void AddTerminalInfo(const T& array) {
    if (nullable_in_parent_) {
      info_.max_def_level++;
    }
    // We don't use null_count() because if the null_count isn't known
    // and the array does in fact contain nulls, we will end up
    // traversing the null bitmap twice (once here and once when calculating
    // rep/def levels).
    if (LazyNoNulls(array)) {
      info_.path.push_back(AllPresentTerminalNode{info_.max_def_level});
    } else if (LazyNullCount(array) == array.length()) {
      info_.path.push_back(AllNullsTerminalNode(info_.max_def_level - 1));
    } else {
      info_.path.push_back(NullableTerminalNode(array.null_bitmap_data(), array.offset(),
                                                info_.max_def_level));
    }
    info_.primitive_array = std::make_shared<T>(array.data());
    paths_.push_back(Fixup(info_));
  }
731
  // Terminal case: any flat (non-nested) array ends the path here.
  template <typename T>
  ::arrow::enable_if_t<std::is_base_of<::arrow::FlatArray, T>::value, Status> Visit(
      const T& array) {
    AddTerminalInfo(array);
    return Status::OK();
  }
738
  // Intermediate case for List/LargeList arrays: pushes an optional
  // nullability node plus a list node, then recurses into the child values.
  template <typename T>
  ::arrow::enable_if_t<std::is_same<::arrow::ListArray, T>::value ||
                           std::is_same<::arrow::LargeListArray, T>::value,
                       Status>
  Visit(const T& array) {
    MaybeAddNullable(array);
    // Increment necessary due to empty lists.
    info_.max_def_level++;
    info_.max_rep_level++;
    // raw_value_offsets() accounts for any slice offset.
    ListPathNode<VarRangeSelector<typename T::offset_type>> node(
        VarRangeSelector<typename T::offset_type>{array.raw_value_offsets()},
        info_.max_rep_level, info_.max_def_level - 1);
    info_.path.push_back(node);
    nullable_in_parent_ = array.list_type()->value_field()->nullable();
    return VisitInline(*array.values());
  }
756
Visit(const::arrow::DictionaryArray & array)757 Status Visit(const ::arrow::DictionaryArray& array) {
758 // Only currently handle DictionaryArray where the dictionary is a
759 // primitive type
760 if (array.dict_type()->value_type()->num_fields() > 0) {
761 return Status::NotImplemented(
762 "Writing DictionaryArray with nested dictionary "
763 "type not yet supported");
764 }
765 if (array.dictionary()->null_count() > 0) {
766 return Status::NotImplemented(
767 "Writing DictionaryArray with null encoded in dictionary "
768 "type not yet supported");
769 }
770 AddTerminalInfo(array);
771 return Status::OK();
772 }
773
MaybeAddNullable(const Array & array)774 void MaybeAddNullable(const Array& array) {
775 if (!nullable_in_parent_) {
776 return;
777 }
778 info_.max_def_level++;
779 // We don't use null_count() because if the null_count isn't known
780 // and the array does in fact contain nulls, we will end up
781 // traversing the null bitmap twice (once here and once when calculating
782 // rep/def levels). Because this isn't terminal this might not be
783 // the right decision for structs that share the same nullable
784 // parents.
785 if (LazyNoNulls(array)) {
786 // Don't add anything because there won't be any point checking
787 // null values for the array. There will always be at least
788 // one more array to handle nullability.
789 return;
790 }
791 if (LazyNullCount(array) == array.length()) {
792 info_.path.push_back(AllNullsTerminalNode(info_.max_def_level - 1));
793 return;
794 }
795 info_.path.push_back(NullableNode(array.null_bitmap_data(), array.offset(),
796 /* def_level_if_null = */ info_.max_def_level - 1));
797 }
798
799 Status VisitInline(const Array& array);
800
Visit(const::arrow::MapArray & array)801 Status Visit(const ::arrow::MapArray& array) {
802 return Visit(static_cast<const ::arrow::ListArray&>(array));
803 }
804
Visit(const::arrow::StructArray & array)805 Status Visit(const ::arrow::StructArray& array) {
806 MaybeAddNullable(array);
807 PathInfo info_backup = info_;
808 for (int x = 0; x < array.num_fields(); x++) {
809 nullable_in_parent_ = array.type()->field(x)->nullable();
810 RETURN_NOT_OK(VisitInline(*array.field(x)));
811 info_ = info_backup;
812 }
813 return Status::OK();
814 }
815
Visit(const::arrow::FixedSizeListArray & array)816 Status Visit(const ::arrow::FixedSizeListArray& array) {
817 MaybeAddNullable(array);
818 int32_t list_size = array.list_type()->list_size();
819 if (list_size == 0) {
820 info_.max_def_level++;
821 }
822 info_.max_rep_level++;
823 info_.path.push_back(FixedSizeListNode(FixedSizedRangeSelector{list_size},
824 info_.max_rep_level, info_.max_def_level));
825 nullable_in_parent_ = array.list_type()->value_field()->nullable();
826 return VisitInline(*array.values());
827 }
828
Visit(const::arrow::ExtensionArray & array)829 Status Visit(const ::arrow::ExtensionArray& array) {
830 return VisitInline(*array.storage());
831 }
832
833 #define NOT_IMPLEMENTED_VISIT(ArrowTypePrefix) \
834 Status Visit(const ::arrow::ArrowTypePrefix##Array& array) { \
835 return Status::NotImplemented("Level generation for " #ArrowTypePrefix \
836 " not supported yet"); \
837 }
838
839 // Union types aren't supported in Parquet.
NOT_IMPLEMENTED_VISIT(Union)840 NOT_IMPLEMENTED_VISIT(Union)
841
842 #undef NOT_IMPLEMENTED_VISIT
843 std::vector<PathInfo>& paths() { return paths_; }
844
845 private:
846 PathInfo info_;
847 std::vector<PathInfo> paths_;
848 bool nullable_in_parent_;
849 };
850
// Out-of-line so the typed Visit() overloads above are complete before
// Arrow's inline visitor instantiates dispatch against them.
Status PathBuilder::VisitInline(const Array& array) {
  return ::arrow::VisitArrayInline(array, this);
}
854
855 #undef RETURN_IF_ERROR
856 } // namespace
857
858 class MultipathLevelBuilderImpl : public MultipathLevelBuilder {
859 public:
MultipathLevelBuilderImpl(std::shared_ptr<::arrow::ArrayData> data,std::unique_ptr<PathBuilder> path_builder)860 MultipathLevelBuilderImpl(std::shared_ptr<::arrow::ArrayData> data,
861 std::unique_ptr<PathBuilder> path_builder)
862 : root_range_{0, data->length},
863 data_(std::move(data)),
864 path_builder_(std::move(path_builder)) {}
865
GetLeafCount() const866 int GetLeafCount() const override {
867 return static_cast<int>(path_builder_->paths().size());
868 }
869
Write(int leaf_index,ArrowWriteContext * context,CallbackFunction write_leaf_callback)870 ::arrow::Status Write(int leaf_index, ArrowWriteContext* context,
871 CallbackFunction write_leaf_callback) override {
872 DCHECK_GE(leaf_index, 0);
873 DCHECK_LT(leaf_index, GetLeafCount());
874 return WritePath(root_range_, &path_builder_->paths()[leaf_index], context,
875 std::move(write_leaf_callback));
876 }
877
878 private:
879 ElementRange root_range_;
880 // Reference holder to ensure the data stays valid.
881 std::shared_ptr<::arrow::ArrayData> data_;
882 std::unique_ptr<PathBuilder> path_builder_;
883 };
884
885 // static
::arrow::Result<std::unique_ptr<MultipathLevelBuilder>> MultipathLevelBuilder::Make(
    const ::arrow::Array& array, bool array_field_nullable) {
  // Dissect the array into per-leaf paths, then hand ownership of the
  // resulting PathBuilder (and the array's data) to the builder impl.
  auto path_builder =
      ::arrow::internal::make_unique<PathBuilder>(array_field_nullable);
  RETURN_NOT_OK(VisitArrayInline(array, path_builder.get()));
  return ::arrow::internal::make_unique<MultipathLevelBuilderImpl>(
      array.data(), std::move(path_builder));
}
893
894 // static
Write(const Array & array,bool array_field_nullable,ArrowWriteContext * context,MultipathLevelBuilder::CallbackFunction callback)895 Status MultipathLevelBuilder::Write(const Array& array, bool array_field_nullable,
896 ArrowWriteContext* context,
897 MultipathLevelBuilder::CallbackFunction callback) {
898 ARROW_ASSIGN_OR_RAISE(std::unique_ptr<MultipathLevelBuilder> builder,
899 MultipathLevelBuilder::Make(array, array_field_nullable));
900 PathBuilder constructor(array_field_nullable);
901 RETURN_NOT_OK(VisitArrayInline(array, &constructor));
902 for (int leaf_idx = 0; leaf_idx < builder->GetLeafCount(); leaf_idx++) {
903 RETURN_NOT_OK(builder->Write(leaf_idx, context, callback));
904 }
905 return Status::OK();
906 }
907
908 } // namespace arrow
909 } // namespace parquet
910