1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "arrow/array/util.h"
19 
20 #include <algorithm>
21 #include <array>
22 #include <cstdint>
23 #include <cstring>
24 #include <limits>
25 #include <memory>
26 #include <type_traits>
27 #include <utility>
28 #include <vector>
29 
30 #include "arrow/array/array_base.h"
31 #include "arrow/array/array_dict.h"
32 #include "arrow/array/array_primitive.h"
33 #include "arrow/array/concatenate.h"
34 #include "arrow/buffer.h"
35 #include "arrow/buffer_builder.h"
36 #include "arrow/extension_type.h"
37 #include "arrow/result.h"
38 #include "arrow/scalar.h"
39 #include "arrow/status.h"
40 #include "arrow/type.h"
41 #include "arrow/type_traits.h"
42 #include "arrow/util/bit_util.h"
43 #include "arrow/util/checked_cast.h"
44 #include "arrow/util/decimal.h"
45 #include "arrow/util/endian.h"
46 #include "arrow/util/logging.h"
47 #include "arrow/visitor_inline.h"
48 
49 namespace arrow {
50 
51 using internal::checked_cast;
52 
53 // ----------------------------------------------------------------------
54 // Loading from ArrayData
55 
56 namespace {
57 
58 class ArrayDataWrapper {
59  public:
ArrayDataWrapper(const std::shared_ptr<ArrayData> & data,std::shared_ptr<Array> * out)60   ArrayDataWrapper(const std::shared_ptr<ArrayData>& data, std::shared_ptr<Array>* out)
61       : data_(data), out_(out) {}
62 
63   template <typename T>
Visit(const T &)64   Status Visit(const T&) {
65     using ArrayType = typename TypeTraits<T>::ArrayType;
66     *out_ = std::make_shared<ArrayType>(data_);
67     return Status::OK();
68   }
69 
Visit(const ExtensionType & type)70   Status Visit(const ExtensionType& type) {
71     *out_ = type.MakeArray(data_);
72     return Status::OK();
73   }
74 
75   const std::shared_ptr<ArrayData>& data_;
76   std::shared_ptr<Array>* out_;
77 };
78 
79 class ArrayDataEndianSwapper {
80  public:
ArrayDataEndianSwapper(const std::shared_ptr<ArrayData> & data)81   explicit ArrayDataEndianSwapper(const std::shared_ptr<ArrayData>& data) : data_(data) {
82     out_ = data->Copy();
83   }
84 
85   // WARNING: this facility can be called on invalid Array data by the IPC reader.
86   // Do not rely on the advertised ArrayData length, instead use the physical
87   // buffer sizes to avoid accessing memory out of bounds.
88   //
89   // (If this guarantee turns out to be difficult to maintain, we should call
90   //  Validate() instead)
SwapType(const DataType & type)91   Status SwapType(const DataType& type) {
92     RETURN_NOT_OK(VisitTypeInline(type, this));
93     RETURN_NOT_OK(SwapChildren(type.fields()));
94     if (internal::HasValidityBitmap(type.id())) {
95       // Copy null bitmap
96       out_->buffers[0] = data_->buffers[0];
97     }
98     return Status::OK();
99   }
100 
SwapChildren(const FieldVector & child_fields)101   Status SwapChildren(const FieldVector& child_fields) {
102     for (size_t i = 0; i < child_fields.size(); i++) {
103       ARROW_ASSIGN_OR_RAISE(out_->child_data[i],
104                             internal::SwapEndianArrayData(data_->child_data[i]));
105     }
106     return Status::OK();
107   }
108 
109   template <typename T>
ByteSwapBuffer(const std::shared_ptr<Buffer> & in_buffer)110   Result<std::shared_ptr<Buffer>> ByteSwapBuffer(
111       const std::shared_ptr<Buffer>& in_buffer) {
112     if (sizeof(T) == 1) {
113       // if data size is 1, element is not swapped. We can use the original buffer
114       return in_buffer;
115     }
116     auto in_data = reinterpret_cast<const T*>(in_buffer->data());
117     ARROW_ASSIGN_OR_RAISE(auto out_buffer, AllocateBuffer(in_buffer->size()));
118     auto out_data = reinterpret_cast<T*>(out_buffer->mutable_data());
119     // NOTE: data_->length not trusted (see warning above)
120     int64_t length = in_buffer->size() / sizeof(T);
121     for (int64_t i = 0; i < length; i++) {
122       out_data[i] = BitUtil::ByteSwap(in_data[i]);
123     }
124     return std::move(out_buffer);
125   }
126 
127   template <typename VALUE_TYPE>
SwapOffsets(int index)128   Status SwapOffsets(int index) {
129     if (data_->buffers[index] == nullptr || data_->buffers[index]->size() == 0) {
130       out_->buffers[index] = data_->buffers[index];
131       return Status::OK();
132     }
133     // Except union, offset has one more element rather than data->length
134     ARROW_ASSIGN_OR_RAISE(out_->buffers[index],
135                           ByteSwapBuffer<VALUE_TYPE>(data_->buffers[index]));
136     return Status::OK();
137   }
138 
139   template <typename T>
140   enable_if_t<std::is_base_of<FixedWidthType, T>::value &&
141                   !std::is_base_of<FixedSizeBinaryType, T>::value &&
142                   !std::is_base_of<DictionaryType, T>::value,
143               Status>
Visit(const T & type)144   Visit(const T& type) {
145     using value_type = typename T::c_type;
146     ARROW_ASSIGN_OR_RAISE(out_->buffers[1],
147                           ByteSwapBuffer<value_type>(data_->buffers[1]));
148     return Status::OK();
149   }
150 
Visit(const Decimal128Type & type)151   Status Visit(const Decimal128Type& type) {
152     auto data = reinterpret_cast<const uint64_t*>(data_->buffers[1]->data());
153     ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
154     auto new_data = reinterpret_cast<uint64_t*>(new_buffer->mutable_data());
155     // NOTE: data_->length not trusted (see warning above)
156     const int64_t length = data_->buffers[1]->size() / Decimal128Type::kByteWidth;
157     for (int64_t i = 0; i < length; i++) {
158       uint64_t tmp;
159       auto idx = i * 2;
160 #if ARROW_LITTLE_ENDIAN
161       tmp = BitUtil::FromBigEndian(data[idx]);
162       new_data[idx] = BitUtil::FromBigEndian(data[idx + 1]);
163       new_data[idx + 1] = tmp;
164 #else
165       tmp = BitUtil::FromLittleEndian(data[idx]);
166       new_data[idx] = BitUtil::FromLittleEndian(data[idx + 1]);
167       new_data[idx + 1] = tmp;
168 #endif
169     }
170     out_->buffers[1] = std::move(new_buffer);
171     return Status::OK();
172   }
173 
Visit(const Decimal256Type & type)174   Status Visit(const Decimal256Type& type) {
175     auto data = reinterpret_cast<const uint64_t*>(data_->buffers[1]->data());
176     ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
177     auto new_data = reinterpret_cast<uint64_t*>(new_buffer->mutable_data());
178     // NOTE: data_->length not trusted (see warning above)
179     const int64_t length = data_->buffers[1]->size() / Decimal256Type::kByteWidth;
180     for (int64_t i = 0; i < length; i++) {
181       uint64_t tmp0, tmp1, tmp2;
182       auto idx = i * 4;
183 #if ARROW_LITTLE_ENDIAN
184       tmp0 = BitUtil::FromBigEndian(data[idx]);
185       tmp1 = BitUtil::FromBigEndian(data[idx + 1]);
186       tmp2 = BitUtil::FromBigEndian(data[idx + 2]);
187       new_data[idx] = BitUtil::FromBigEndian(data[idx + 3]);
188       new_data[idx + 1] = tmp2;
189       new_data[idx + 2] = tmp1;
190       new_data[idx + 3] = tmp0;
191 #else
192       tmp0 = BitUtil::FromLittleEndian(data[idx]);
193       tmp1 = BitUtil::FromLittleEndian(data[idx + 1]);
194       tmp2 = BitUtil::FromLittleEndian(data[idx + 2]);
195       new_data[idx] = BitUtil::FromLittleEndian(data[idx + 3]);
196       new_data[idx + 1] = tmp2;
197       new_data[idx + 2] = tmp1;
198       new_data[idx + 3] = tmp0;
199 #endif
200     }
201     out_->buffers[1] = std::move(new_buffer);
202     return Status::OK();
203   }
204 
Visit(const DayTimeIntervalType & type)205   Status Visit(const DayTimeIntervalType& type) {
206     ARROW_ASSIGN_OR_RAISE(out_->buffers[1], ByteSwapBuffer<uint32_t>(data_->buffers[1]));
207     return Status::OK();
208   }
209 
Visit(const MonthDayNanoIntervalType & type)210   Status Visit(const MonthDayNanoIntervalType& type) {
211     using MonthDayNanos = MonthDayNanoIntervalType::MonthDayNanos;
212     auto data = reinterpret_cast<const MonthDayNanos*>(data_->buffers[1]->data());
213     ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
214     auto new_data = reinterpret_cast<MonthDayNanos*>(new_buffer->mutable_data());
215     // NOTE: data_->length not trusted (see warning above)
216     const int64_t length = data_->buffers[1]->size() / sizeof(MonthDayNanos);
217     for (int64_t i = 0; i < length; i++) {
218       MonthDayNanos tmp = data[i];
219 #if ARROW_LITTLE_ENDIAN
220       tmp.months = BitUtil::FromBigEndian(tmp.months);
221       tmp.days = BitUtil::FromBigEndian(tmp.days);
222       tmp.nanoseconds = BitUtil::FromBigEndian(tmp.nanoseconds);
223 #else
224       tmp.months = BitUtil::FromLittleEndian(tmp.months);
225       tmp.days = BitUtil::FromLittleEndian(tmp.days);
226       tmp.nanoseconds = BitUtil::FromLittleEndian(tmp.nanoseconds);
227 #endif
228       new_data[i] = tmp;
229     }
230     out_->buffers[1] = std::move(new_buffer);
231     return Status::OK();
232   }
233 
Visit(const NullType & type)234   Status Visit(const NullType& type) { return Status::OK(); }
Visit(const BooleanType & type)235   Status Visit(const BooleanType& type) { return Status::OK(); }
Visit(const Int8Type & type)236   Status Visit(const Int8Type& type) { return Status::OK(); }
Visit(const UInt8Type & type)237   Status Visit(const UInt8Type& type) { return Status::OK(); }
Visit(const FixedSizeBinaryType & type)238   Status Visit(const FixedSizeBinaryType& type) { return Status::OK(); }
Visit(const FixedSizeListType & type)239   Status Visit(const FixedSizeListType& type) { return Status::OK(); }
Visit(const StructType & type)240   Status Visit(const StructType& type) { return Status::OK(); }
Visit(const UnionType & type)241   Status Visit(const UnionType& type) {
242     out_->buffers[1] = data_->buffers[1];
243     if (type.mode() == UnionMode::DENSE) {
244       RETURN_NOT_OK(SwapOffsets<int32_t>(2));
245     }
246     return Status::OK();
247   }
248 
249   template <typename T>
250   enable_if_t<std::is_same<BinaryType, T>::value || std::is_same<StringType, T>::value,
251               Status>
Visit(const T & type)252   Visit(const T& type) {
253     RETURN_NOT_OK(SwapOffsets<int32_t>(1));
254     out_->buffers[2] = data_->buffers[2];
255     return Status::OK();
256   }
257 
258   template <typename T>
259   enable_if_t<std::is_same<LargeBinaryType, T>::value ||
260                   std::is_same<LargeStringType, T>::value,
261               Status>
Visit(const T & type)262   Visit(const T& type) {
263     RETURN_NOT_OK(SwapOffsets<int64_t>(1));
264     out_->buffers[2] = data_->buffers[2];
265     return Status::OK();
266   }
267 
Visit(const ListType & type)268   Status Visit(const ListType& type) {
269     RETURN_NOT_OK(SwapOffsets<int32_t>(1));
270     return Status::OK();
271   }
Visit(const LargeListType & type)272   Status Visit(const LargeListType& type) {
273     RETURN_NOT_OK(SwapOffsets<int64_t>(1));
274     return Status::OK();
275   }
276 
Visit(const DictionaryType & type)277   Status Visit(const DictionaryType& type) {
278     // dictionary was already swapped in ReadDictionary() in ipc/reader.cc
279     RETURN_NOT_OK(SwapType(*type.index_type()));
280     return Status::OK();
281   }
282 
Visit(const ExtensionType & type)283   Status Visit(const ExtensionType& type) {
284     RETURN_NOT_OK(SwapType(*type.storage_type()));
285     return Status::OK();
286   }
287 
288   const std::shared_ptr<ArrayData>& data_;
289   std::shared_ptr<ArrayData> out_;
290 };
291 
292 }  // namespace
293 
294 namespace internal {
295 
SwapEndianArrayData(const std::shared_ptr<ArrayData> & data)296 Result<std::shared_ptr<ArrayData>> SwapEndianArrayData(
297     const std::shared_ptr<ArrayData>& data) {
298   if (data->offset != 0) {
299     return Status::Invalid("Unsupported data format: data.offset != 0");
300   }
301   ArrayDataEndianSwapper swapper(data);
302   RETURN_NOT_OK(swapper.SwapType(*data->type));
303   return std::move(swapper.out_);
304 }
305 
306 }  // namespace internal
307 
MakeArray(const std::shared_ptr<ArrayData> & data)308 std::shared_ptr<Array> MakeArray(const std::shared_ptr<ArrayData>& data) {
309   std::shared_ptr<Array> out;
310   ArrayDataWrapper wrapper_visitor(data, &out);
311   DCHECK_OK(VisitTypeInline(*data->type, &wrapper_visitor));
312   DCHECK(out);
313   return out;
314 }
315 
316 // ----------------------------------------------------------------------
317 // Misc APIs
318 
319 namespace {
320 
321 // get the maximum buffer length required, then allocate a single zeroed buffer
322 // to use anywhere a buffer is required
323 class NullArrayFactory {
324  public:
325   struct GetBufferLength {
GetBufferLengtharrow::__anoncda392270211::NullArrayFactory::GetBufferLength326     GetBufferLength(const std::shared_ptr<DataType>& type, int64_t length)
327         : type_(*type), length_(length), buffer_length_(BitUtil::BytesForBits(length)) {}
328 
Finisharrow::__anoncda392270211::NullArrayFactory::GetBufferLength329     Result<int64_t> Finish() && {
330       RETURN_NOT_OK(VisitTypeInline(type_, this));
331       return buffer_length_;
332     }
333 
334     template <typename T, typename = decltype(TypeTraits<T>::bytes_required(0))>
Visitarrow::__anoncda392270211::NullArrayFactory::GetBufferLength335     Status Visit(const T&) {
336       return MaxOf(TypeTraits<T>::bytes_required(length_));
337     }
338 
339     template <typename T>
Visitarrow::__anoncda392270211::NullArrayFactory::GetBufferLength340     enable_if_var_size_list<T, Status> Visit(const T&) {
341       // values array may be empty, but there must be at least one offset of 0
342       return MaxOf(sizeof(typename T::offset_type) * (length_ + 1));
343     }
344 
345     template <typename T>
Visitarrow::__anoncda392270211::NullArrayFactory::GetBufferLength346     enable_if_base_binary<T, Status> Visit(const T&) {
347       // values buffer may be empty, but there must be at least one offset of 0
348       return MaxOf(sizeof(typename T::offset_type) * (length_ + 1));
349     }
350 
Visitarrow::__anoncda392270211::NullArrayFactory::GetBufferLength351     Status Visit(const FixedSizeListType& type) {
352       return MaxOf(GetBufferLength(type.value_type(), type.list_size() * length_));
353     }
354 
Visitarrow::__anoncda392270211::NullArrayFactory::GetBufferLength355     Status Visit(const FixedSizeBinaryType& type) {
356       return MaxOf(type.byte_width() * length_);
357     }
358 
Visitarrow::__anoncda392270211::NullArrayFactory::GetBufferLength359     Status Visit(const StructType& type) {
360       for (const auto& child : type.fields()) {
361         RETURN_NOT_OK(MaxOf(GetBufferLength(child->type(), length_)));
362       }
363       return Status::OK();
364     }
365 
Visitarrow::__anoncda392270211::NullArrayFactory::GetBufferLength366     Status Visit(const UnionType& type) {
367       // type codes
368       RETURN_NOT_OK(MaxOf(length_));
369       if (type.mode() == UnionMode::DENSE) {
370         // offsets
371         RETURN_NOT_OK(MaxOf(sizeof(int32_t) * length_));
372       }
373       for (const auto& child : type.fields()) {
374         RETURN_NOT_OK(MaxOf(GetBufferLength(child->type(), length_)));
375       }
376       return Status::OK();
377     }
378 
Visitarrow::__anoncda392270211::NullArrayFactory::GetBufferLength379     Status Visit(const DictionaryType& type) {
380       RETURN_NOT_OK(MaxOf(GetBufferLength(type.value_type(), length_)));
381       return MaxOf(GetBufferLength(type.index_type(), length_));
382     }
383 
Visitarrow::__anoncda392270211::NullArrayFactory::GetBufferLength384     Status Visit(const ExtensionType& type) {
385       // XXX is an extension array's length always == storage length
386       return MaxOf(GetBufferLength(type.storage_type(), length_));
387     }
388 
Visitarrow::__anoncda392270211::NullArrayFactory::GetBufferLength389     Status Visit(const DataType& type) {
390       return Status::NotImplemented("construction of all-null ", type);
391     }
392 
393    private:
MaxOfarrow::__anoncda392270211::NullArrayFactory::GetBufferLength394     Status MaxOf(GetBufferLength&& other) {
395       ARROW_ASSIGN_OR_RAISE(int64_t buffer_length, std::move(other).Finish());
396       return MaxOf(buffer_length);
397     }
398 
MaxOfarrow::__anoncda392270211::NullArrayFactory::GetBufferLength399     Status MaxOf(int64_t buffer_length) {
400       if (buffer_length > buffer_length_) {
401         buffer_length_ = buffer_length;
402       }
403       return Status::OK();
404     }
405 
406     const DataType& type_;
407     int64_t length_, buffer_length_;
408   };
409 
NullArrayFactory(MemoryPool * pool,const std::shared_ptr<DataType> & type,int64_t length)410   NullArrayFactory(MemoryPool* pool, const std::shared_ptr<DataType>& type,
411                    int64_t length)
412       : pool_(pool), type_(type), length_(length) {}
413 
CreateBuffer()414   Status CreateBuffer() {
415     ARROW_ASSIGN_OR_RAISE(int64_t buffer_length,
416                           GetBufferLength(type_, length_).Finish());
417     ARROW_ASSIGN_OR_RAISE(buffer_, AllocateBuffer(buffer_length, pool_));
418     std::memset(buffer_->mutable_data(), 0, buffer_->size());
419     return Status::OK();
420   }
421 
Create()422   Result<std::shared_ptr<ArrayData>> Create() {
423     if (buffer_ == nullptr) {
424       RETURN_NOT_OK(CreateBuffer());
425     }
426     std::vector<std::shared_ptr<ArrayData>> child_data(type_->num_fields());
427     out_ = ArrayData::Make(type_, length_, {buffer_}, child_data, length_, 0);
428     RETURN_NOT_OK(VisitTypeInline(*type_, this));
429     return out_;
430   }
431 
Visit(const NullType &)432   Status Visit(const NullType&) {
433     out_->buffers.resize(1, nullptr);
434     return Status::OK();
435   }
436 
Visit(const FixedWidthType &)437   Status Visit(const FixedWidthType&) {
438     out_->buffers.resize(2, buffer_);
439     return Status::OK();
440   }
441 
442   template <typename T>
Visit(const T &)443   enable_if_base_binary<T, Status> Visit(const T&) {
444     out_->buffers.resize(3, buffer_);
445     return Status::OK();
446   }
447 
448   template <typename T>
Visit(const T & type)449   enable_if_var_size_list<T, Status> Visit(const T& type) {
450     out_->buffers.resize(2, buffer_);
451     ARROW_ASSIGN_OR_RAISE(out_->child_data[0], CreateChild(0, /*length=*/0));
452     return Status::OK();
453   }
454 
Visit(const FixedSizeListType & type)455   Status Visit(const FixedSizeListType& type) {
456     ARROW_ASSIGN_OR_RAISE(out_->child_data[0],
457                           CreateChild(0, length_ * type.list_size()));
458     return Status::OK();
459   }
460 
Visit(const StructType & type)461   Status Visit(const StructType& type) {
462     for (int i = 0; i < type_->num_fields(); ++i) {
463       ARROW_ASSIGN_OR_RAISE(out_->child_data[i], CreateChild(i, length_));
464     }
465     return Status::OK();
466   }
467 
Visit(const UnionType & type)468   Status Visit(const UnionType& type) {
469     out_->buffers.resize(2);
470 
471     // First buffer is always null
472     out_->buffers[0] = nullptr;
473 
474     out_->buffers[1] = buffer_;
475     // buffer_ is zeroed, but 0 may not be a valid type code
476     if (type.type_codes()[0] != 0) {
477       ARROW_ASSIGN_OR_RAISE(out_->buffers[1], AllocateBuffer(length_, pool_));
478       std::memset(out_->buffers[1]->mutable_data(), type.type_codes()[0], length_);
479     }
480 
481     // For sparse unions, we now create children with the same length as the
482     // parent
483     int64_t child_length = length_;
484     if (type.mode() == UnionMode::DENSE) {
485       // For dense unions, we set the offsets to all zero and create children
486       // with length 1
487       out_->buffers.resize(3);
488       out_->buffers[2] = buffer_;
489 
490       child_length = 1;
491     }
492     for (int i = 0; i < type_->num_fields(); ++i) {
493       ARROW_ASSIGN_OR_RAISE(out_->child_data[i], CreateChild(i, child_length));
494     }
495     return Status::OK();
496   }
497 
Visit(const DictionaryType & type)498   Status Visit(const DictionaryType& type) {
499     out_->buffers.resize(2, buffer_);
500     ARROW_ASSIGN_OR_RAISE(auto typed_null_dict, MakeArrayOfNull(type.value_type(), 0));
501     out_->dictionary = typed_null_dict->data();
502     return Status::OK();
503   }
504 
Visit(const ExtensionType & type)505   Status Visit(const ExtensionType& type) {
506     RETURN_NOT_OK(VisitTypeInline(*type.storage_type(), this));
507     return Status::OK();
508   }
509 
Visit(const DataType & type)510   Status Visit(const DataType& type) {
511     return Status::NotImplemented("construction of all-null ", type);
512   }
513 
CreateChild(int i,int64_t length)514   Result<std::shared_ptr<ArrayData>> CreateChild(int i, int64_t length) {
515     NullArrayFactory child_factory(pool_, type_->field(i)->type(), length);
516     child_factory.buffer_ = buffer_;
517     return child_factory.Create();
518   }
519 
520   MemoryPool* pool_;
521   std::shared_ptr<DataType> type_;
522   int64_t length_;
523   std::shared_ptr<ArrayData> out_;
524   std::shared_ptr<Buffer> buffer_;
525 };
526 
527 class RepeatedArrayFactory {
528  public:
RepeatedArrayFactory(MemoryPool * pool,const Scalar & scalar,int64_t length)529   RepeatedArrayFactory(MemoryPool* pool, const Scalar& scalar, int64_t length)
530       : pool_(pool), scalar_(scalar), length_(length) {}
531 
Create()532   Result<std::shared_ptr<Array>> Create() {
533     RETURN_NOT_OK(VisitTypeInline(*scalar_.type, this));
534     return out_;
535   }
536 
Visit(const NullType & type)537   Status Visit(const NullType& type) {
538     DCHECK(false);  // already forwarded to MakeArrayOfNull
539     return Status::OK();
540   }
541 
Visit(const BooleanType &)542   Status Visit(const BooleanType&) {
543     ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateBitmap(length_, pool_));
544     BitUtil::SetBitsTo(buffer->mutable_data(), 0, length_,
545                        checked_cast<const BooleanScalar&>(scalar_).value);
546     out_ = std::make_shared<BooleanArray>(length_, buffer);
547     return Status::OK();
548   }
549 
550   template <typename T>
Visit(const T &)551   enable_if_t<is_number_type<T>::value || is_temporal_type<T>::value, Status> Visit(
552       const T&) {
553     auto value = checked_cast<const typename TypeTraits<T>::ScalarType&>(scalar_).value;
554     return FinishFixedWidth(&value, sizeof(value));
555   }
556 
Visit(const FixedSizeBinaryType & type)557   Status Visit(const FixedSizeBinaryType& type) {
558     auto value = checked_cast<const FixedSizeBinaryScalar&>(scalar_).value;
559     return FinishFixedWidth(value->data(), type.byte_width());
560   }
561 
562   template <typename T>
Visit(const T &)563   enable_if_decimal<T, Status> Visit(const T&) {
564     using ScalarType = typename TypeTraits<T>::ScalarType;
565     auto value = checked_cast<const ScalarType&>(scalar_).value.ToBytes();
566     return FinishFixedWidth(value.data(), value.size());
567   }
568 
Visit(const Decimal256Type &)569   Status Visit(const Decimal256Type&) {
570     auto value = checked_cast<const Decimal256Scalar&>(scalar_).value.ToBytes();
571     return FinishFixedWidth(value.data(), value.size());
572   }
573 
574   template <typename T>
Visit(const T &)575   enable_if_base_binary<T, Status> Visit(const T&) {
576     std::shared_ptr<Buffer> value =
577         checked_cast<const typename TypeTraits<T>::ScalarType&>(scalar_).value;
578     std::shared_ptr<Buffer> values_buffer, offsets_buffer;
579     RETURN_NOT_OK(CreateBufferOf(value->data(), value->size(), &values_buffer));
580     auto size = static_cast<typename T::offset_type>(value->size());
581     RETURN_NOT_OK(CreateOffsetsBuffer(size, &offsets_buffer));
582     out_ = std::make_shared<typename TypeTraits<T>::ArrayType>(length_, offsets_buffer,
583                                                                values_buffer);
584     return Status::OK();
585   }
586 
587   template <typename T>
Visit(const T & type)588   enable_if_var_size_list<T, Status> Visit(const T& type) {
589     using ScalarType = typename TypeTraits<T>::ScalarType;
590     using ArrayType = typename TypeTraits<T>::ArrayType;
591 
592     auto value = checked_cast<const ScalarType&>(scalar_).value;
593 
594     ArrayVector values(length_, value);
595     ARROW_ASSIGN_OR_RAISE(auto value_array, Concatenate(values, pool_));
596 
597     std::shared_ptr<Buffer> offsets_buffer;
598     auto size = static_cast<typename T::offset_type>(value->length());
599     RETURN_NOT_OK(CreateOffsetsBuffer(size, &offsets_buffer));
600 
601     out_ =
602         std::make_shared<ArrayType>(scalar_.type, length_, offsets_buffer, value_array);
603     return Status::OK();
604   }
605 
Visit(const FixedSizeListType & type)606   Status Visit(const FixedSizeListType& type) {
607     auto value = checked_cast<const FixedSizeListScalar&>(scalar_).value;
608 
609     ArrayVector values(length_, value);
610     ARROW_ASSIGN_OR_RAISE(auto value_array, Concatenate(values, pool_));
611 
612     out_ = std::make_shared<FixedSizeListArray>(scalar_.type, length_, value_array);
613     return Status::OK();
614   }
615 
Visit(const MapType & type)616   Status Visit(const MapType& type) {
617     auto map_scalar = checked_cast<const MapScalar&>(scalar_);
618     auto struct_array = checked_cast<const StructArray*>(map_scalar.value.get());
619 
620     ArrayVector keys(length_, struct_array->field(0));
621     ArrayVector values(length_, struct_array->field(1));
622 
623     ARROW_ASSIGN_OR_RAISE(auto key_array, Concatenate(keys, pool_));
624     ARROW_ASSIGN_OR_RAISE(auto value_array, Concatenate(values, pool_));
625 
626     std::shared_ptr<Buffer> offsets_buffer;
627     auto size = static_cast<typename MapType::offset_type>(struct_array->length());
628     RETURN_NOT_OK(CreateOffsetsBuffer(size, &offsets_buffer));
629 
630     out_ = std::make_shared<MapArray>(scalar_.type, length_, std::move(offsets_buffer),
631                                       std::move(key_array), std::move(value_array));
632     return Status::OK();
633   }
634 
Visit(const DictionaryType & type)635   Status Visit(const DictionaryType& type) {
636     const auto& value = checked_cast<const DictionaryScalar&>(scalar_).value;
637     ARROW_ASSIGN_OR_RAISE(auto indices,
638                           MakeArrayFromScalar(*value.index, length_, pool_));
639     out_ = std::make_shared<DictionaryArray>(scalar_.type, std::move(indices),
640                                              value.dictionary);
641     return Status::OK();
642   }
643 
Visit(const StructType & type)644   Status Visit(const StructType& type) {
645     ArrayVector fields;
646     for (const auto& value : checked_cast<const StructScalar&>(scalar_).value) {
647       fields.emplace_back();
648       ARROW_ASSIGN_OR_RAISE(fields.back(), MakeArrayFromScalar(*value, length_, pool_));
649     }
650     out_ = std::make_shared<StructArray>(scalar_.type, length_, std::move(fields));
651     return Status::OK();
652   }
653 
Visit(const SparseUnionType & type)654   Status Visit(const SparseUnionType& type) {
655     const auto& union_scalar = checked_cast<const UnionScalar&>(scalar_);
656     const auto& union_type = checked_cast<const UnionType&>(*scalar_.type);
657     const auto scalar_type_code = union_scalar.type_code;
658     const auto scalar_child_id = union_type.child_ids()[scalar_type_code];
659 
660     // Create child arrays: most of them are all-null, except for the child array
661     // for the given type code (if the scalar is valid).
662     ArrayVector fields;
663     for (int i = 0; i < type.num_fields(); ++i) {
664       fields.emplace_back();
665       if (i == scalar_child_id && scalar_.is_valid) {
666         ARROW_ASSIGN_OR_RAISE(fields.back(),
667                               MakeArrayFromScalar(*union_scalar.value, length_, pool_));
668       } else {
669         ARROW_ASSIGN_OR_RAISE(
670             fields.back(), MakeArrayOfNull(union_type.field(i)->type(), length_, pool_));
671       }
672     }
673 
674     ARROW_ASSIGN_OR_RAISE(auto type_codes_buffer, CreateUnionTypeCodes(scalar_type_code));
675 
676     out_ = std::make_shared<SparseUnionArray>(scalar_.type, length_, std::move(fields),
677                                               std::move(type_codes_buffer));
678     return Status::OK();
679   }
680 
Visit(const DenseUnionType & type)681   Status Visit(const DenseUnionType& type) {
682     const auto& union_scalar = checked_cast<const UnionScalar&>(scalar_);
683     const auto& union_type = checked_cast<const UnionType&>(*scalar_.type);
684     const auto scalar_type_code = union_scalar.type_code;
685     const auto scalar_child_id = union_type.child_ids()[scalar_type_code];
686 
687     // Create child arrays: all of them are empty, except for the child array
688     // for the given type code (if length > 0).
689     ArrayVector fields;
690     for (int i = 0; i < type.num_fields(); ++i) {
691       fields.emplace_back();
692       if (i == scalar_child_id && length_ > 0) {
693         if (scalar_.is_valid) {
694           // One valid element (will be referenced by multiple offsets)
695           ARROW_ASSIGN_OR_RAISE(fields.back(),
696                                 MakeArrayFromScalar(*union_scalar.value, 1, pool_));
697         } else {
698           // One null element (will be referenced by multiple offsets)
699           ARROW_ASSIGN_OR_RAISE(fields.back(),
700                                 MakeArrayOfNull(union_type.field(i)->type(), 1, pool_));
701         }
702       } else {
703         // Zero element (will not be referenced by any offset)
704         ARROW_ASSIGN_OR_RAISE(fields.back(),
705                               MakeArrayOfNull(union_type.field(i)->type(), 0, pool_));
706       }
707     }
708 
709     // Create an offsets buffer with all offsets equal to 0
710     ARROW_ASSIGN_OR_RAISE(auto offsets_buffer,
711                           AllocateBuffer(length_ * sizeof(int32_t), pool_));
712     memset(offsets_buffer->mutable_data(), 0, offsets_buffer->size());
713 
714     ARROW_ASSIGN_OR_RAISE(auto type_codes_buffer, CreateUnionTypeCodes(scalar_type_code));
715 
716     out_ = std::make_shared<DenseUnionArray>(scalar_.type, length_, std::move(fields),
717                                              std::move(type_codes_buffer),
718                                              std::move(offsets_buffer));
719     return Status::OK();
720   }
721 
Visit(const ExtensionType & type)722   Status Visit(const ExtensionType& type) {
723     return Status::NotImplemented("construction from scalar of type ", *scalar_.type);
724   }
725 
CreateUnionTypeCodes(int8_t type_code)726   Result<std::shared_ptr<Buffer>> CreateUnionTypeCodes(int8_t type_code) {
727     TypedBufferBuilder<int8_t> builder(pool_);
728     RETURN_NOT_OK(builder.Resize(length_));
729     builder.UnsafeAppend(length_, type_code);
730     return builder.Finish();
731   }
732 
733   template <typename OffsetType>
CreateOffsetsBuffer(OffsetType value_length,std::shared_ptr<Buffer> * out)734   Status CreateOffsetsBuffer(OffsetType value_length, std::shared_ptr<Buffer>* out) {
735     TypedBufferBuilder<OffsetType> builder(pool_);
736     RETURN_NOT_OK(builder.Resize(length_ + 1));
737     OffsetType offset = 0;
738     for (int64_t i = 0; i < length_ + 1; ++i, offset += value_length) {
739       builder.UnsafeAppend(offset);
740     }
741     return builder.Finish(out);
742   }
743 
CreateBufferOf(const void * data,size_t data_length,std::shared_ptr<Buffer> * out)744   Status CreateBufferOf(const void* data, size_t data_length,
745                         std::shared_ptr<Buffer>* out) {
746     BufferBuilder builder(pool_);
747     RETURN_NOT_OK(builder.Resize(length_ * data_length));
748     for (int64_t i = 0; i < length_; ++i) {
749       builder.UnsafeAppend(data, data_length);
750     }
751     return builder.Finish(out);
752   }
753 
FinishFixedWidth(const void * data,size_t data_length)754   Status FinishFixedWidth(const void* data, size_t data_length) {
755     std::shared_ptr<Buffer> buffer;
756     RETURN_NOT_OK(CreateBufferOf(data, data_length, &buffer));
757     out_ = MakeArray(
758         ArrayData::Make(scalar_.type, length_, {nullptr, std::move(buffer)}, 0));
759     return Status::OK();
760   }
761 
762   MemoryPool* pool_;
763   const Scalar& scalar_;
764   int64_t length_;
765   std::shared_ptr<Array> out_;
766 };
767 
768 }  // namespace
769 
MakeArrayOfNull(const std::shared_ptr<DataType> & type,int64_t length,MemoryPool * pool)770 Result<std::shared_ptr<Array>> MakeArrayOfNull(const std::shared_ptr<DataType>& type,
771                                                int64_t length, MemoryPool* pool) {
772   ARROW_ASSIGN_OR_RAISE(auto data, NullArrayFactory(pool, type, length).Create());
773   return MakeArray(data);
774 }
775 
MakeArrayFromScalar(const Scalar & scalar,int64_t length,MemoryPool * pool)776 Result<std::shared_ptr<Array>> MakeArrayFromScalar(const Scalar& scalar, int64_t length,
777                                                    MemoryPool* pool) {
778   // Null union scalars still have a type code associated
779   if (!scalar.is_valid && !is_union(scalar.type->id())) {
780     return MakeArrayOfNull(scalar.type, length, pool);
781   }
782   return RepeatedArrayFactory(pool, scalar, length).Create();
783 }
784 
785 namespace internal {
786 
RechunkArraysConsistently(const std::vector<ArrayVector> & groups)787 std::vector<ArrayVector> RechunkArraysConsistently(
788     const std::vector<ArrayVector>& groups) {
789   if (groups.size() <= 1) {
790     return groups;
791   }
792   int64_t total_length = 0;
793   for (const auto& array : groups.front()) {
794     total_length += array->length();
795   }
796 #ifndef NDEBUG
797   for (const auto& group : groups) {
798     int64_t group_length = 0;
799     for (const auto& array : group) {
800       group_length += array->length();
801     }
802     DCHECK_EQ(group_length, total_length)
803         << "Array groups should have the same total number of elements";
804   }
805 #endif
806   if (total_length == 0) {
807     return groups;
808   }
809 
810   // Set up result vectors
811   std::vector<ArrayVector> rechunked_groups(groups.size());
812 
813   // Set up progress counters
814   std::vector<ArrayVector::const_iterator> current_arrays;
815   std::vector<int64_t> array_offsets;
816   for (const auto& group : groups) {
817     current_arrays.emplace_back(group.cbegin());
818     array_offsets.emplace_back(0);
819   }
820 
821   // Scan all array vectors at once, rechunking along the way
822   int64_t start = 0;
823   while (start < total_length) {
824     // First compute max possible length for next chunk
825     int64_t chunk_length = std::numeric_limits<int64_t>::max();
826     for (size_t i = 0; i < groups.size(); i++) {
827       auto& arr_it = current_arrays[i];
828       auto& offset = array_offsets[i];
829       // Skip any done arrays (including 0-length arrays)
830       while (offset == (*arr_it)->length()) {
831         ++arr_it;
832         offset = 0;
833       }
834       const auto& array = *arr_it;
835       DCHECK_GT(array->length(), offset);
836       chunk_length = std::min(chunk_length, array->length() - offset);
837     }
838     DCHECK_GT(chunk_length, 0);
839 
840     // Then slice all arrays along this chunk size
841     for (size_t i = 0; i < groups.size(); i++) {
842       const auto& array = *current_arrays[i];
843       auto& offset = array_offsets[i];
844       if (offset == 0 && array->length() == chunk_length) {
845         // Slice spans entire array
846         rechunked_groups[i].emplace_back(array);
847       } else {
848         DCHECK_LT(chunk_length - offset, array->length());
849         rechunked_groups[i].emplace_back(array->Slice(offset, chunk_length));
850       }
851       offset += chunk_length;
852     }
853     start += chunk_length;
854   }
855 
856   return rechunked_groups;
857 }
858 
859 }  // namespace internal
860 }  // namespace arrow
861