// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/compute/exec.h"

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <sstream>
#include <utility>
#include <vector>

#include "arrow/array/array_base.h"
#include "arrow/array/array_primitive.h"
#include "arrow/array/data.h"
#include "arrow/array/util.h"
#include "arrow/buffer.h"
#include "arrow/chunked_array.h"
#include "arrow/compute/exec_internal.h"
#include "arrow/compute/function.h"
#include "arrow/compute/kernel.h"
#include "arrow/compute/registry.h"
#include "arrow/datum.h"
#include "arrow/pretty_print.h"
#include "arrow/record_batch.h"
#include "arrow/scalar.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/cpu_info.h"
#include "arrow/util/logging.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/vector.h"

namespace arrow {

using internal::BitmapAnd;
using internal::checked_cast;
using internal::CopyBitmap;
using internal::CpuInfo;

namespace compute {
ExecContext* default_exec_context() {
  static ExecContext default_ctx;
  return &default_ctx;
}

ExecBatch::ExecBatch(const RecordBatch& batch)
    : values(batch.num_columns()), length(batch.num_rows()) {
  auto columns = batch.column_data();
  std::move(columns.begin(), columns.end(), values.begin());
}

bool ExecBatch::Equals(const ExecBatch& other) const {
  return guarantee == other.guarantee && values == other.values;
}

void PrintTo(const ExecBatch& batch, std::ostream* os) {
  *os << "ExecBatch\n";

  static const std::string indent = "    ";

  *os << indent << "# Rows: " << batch.length << "\n";
  if (batch.guarantee != literal(true)) {
    *os << indent << "Guarantee: " << batch.guarantee.ToString() << "\n";
  }

  int i = 0;
  for (const Datum& value : batch.values) {
    *os << indent << i++ << ": ";

    if (value.is_scalar()) {
      *os << "Scalar[" << value.scalar()->ToString() << "]\n";
      continue;
    }

    auto array = value.make_array();
    PrettyPrintOptions options;
    options.skip_new_lines = true;
    *os << "Array";
    ARROW_CHECK_OK(PrettyPrint(*array, options, os));
    *os << "\n";
  }
}

std::string ExecBatch::ToString() const {
  std::stringstream ss;
  PrintTo(*this, &ss);
  return ss.str();
}

ExecBatch ExecBatch::Slice(int64_t offset, int64_t length) const {
  ExecBatch out = *this;
  for (auto& value : out.values) {
    if (value.is_scalar()) continue;
    value = value.array()->Slice(offset, length);
  }
  out.length = std::min(length, this->length - offset);
  return out;
}

Result<ExecBatch> ExecBatch::Make(std::vector<Datum> values) {
  if (values.empty()) {
    return Status::Invalid("Cannot infer ExecBatch length without at least one value");
  }

  int64_t length = -1;
  for (const auto& value : values) {
    if (value.is_scalar()) {
      continue;
    }

    if (length == -1) {
      length = value.length();
      continue;
    }

    if (length != value.length()) {
      return Status::Invalid(
          "Arrays used to construct an ExecBatch must have equal length");
    }
  }

  if (length == -1) {
    length = 1;
  }

  return ExecBatch(std::move(values), length);
}

Result<std::shared_ptr<RecordBatch>> ExecBatch::ToRecordBatch(
    std::shared_ptr<Schema> schema, MemoryPool* pool) const {
  ArrayVector columns(schema->num_fields());

  for (size_t i = 0; i < columns.size(); ++i) {
    const Datum& value = values[i];
    if (value.is_array()) {
      columns[i] = value.make_array();
      continue;
    }
    ARROW_ASSIGN_OR_RAISE(columns[i], MakeArrayFromScalar(*value.scalar(), length, pool));
  }

  return RecordBatch::Make(std::move(schema), length, std::move(columns));
}
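
// Illustrative usage sketch (kept in a comment; the array and schema names
// below are assumptions for the example, not part of this file): an ExecBatch
// can be assembled from a mix of Array and Scalar Datums via ExecBatch::Make
// and later materialized as a RecordBatch, with scalars broadcast to the batch
// length by ToRecordBatch.
//
//   ARROW_ASSIGN_OR_RAISE(ExecBatch batch,
//                         ExecBatch::Make({Datum(int32_array), Datum(MakeScalar(5))}));
//   ARROW_ASSIGN_OR_RAISE(auto rb,
//                         batch.ToRecordBatch(schema({field("a", int32()),
//                                                     field("b", int32())})));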

namespace {

Result<std::shared_ptr<Buffer>> AllocateDataBuffer(KernelContext* ctx, int64_t length,
                                                   int bit_width) {
  if (bit_width == 1) {
    return ctx->AllocateBitmap(length);
  } else {
    int64_t buffer_size = BitUtil::BytesForBits(length * bit_width);
    return ctx->Allocate(buffer_size);
  }
}

struct BufferPreallocation {
  explicit BufferPreallocation(int bit_width = -1, int added_length = 0)
      : bit_width(bit_width), added_length(added_length) {}

  int bit_width;
  int added_length;
};

void ComputeDataPreallocate(const DataType& type,
                            std::vector<BufferPreallocation>* widths) {
  if (is_fixed_width(type.id()) && type.id() != Type::NA) {
    widths->emplace_back(checked_cast<const FixedWidthType&>(type).bit_width());
    return;
  }
  // Preallocate binary and list offsets
  switch (type.id()) {
    case Type::BINARY:
    case Type::STRING:
    case Type::LIST:
    case Type::MAP:
      widths->emplace_back(32, /*added_length=*/1);
      return;
    case Type::LARGE_BINARY:
    case Type::LARGE_STRING:
    case Type::LARGE_LIST:
      widths->emplace_back(64, /*added_length=*/1);
      return;
    default:
      break;
  }
}
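
// For illustration (a restatement of the switch above, not additional logic):
// for a STRING or LIST output, ComputeDataPreallocate records a single
// BufferPreallocation{32, /*added_length=*/1}, i.e. a 32-bit offsets buffer
// with one extra slot, while the variable-length data buffer is left for the
// kernel to allocate itself; for a fixed-width type such as INT64 it records
// just that type's bit width.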

}  // namespace

namespace detail {

Status CheckAllValues(const std::vector<Datum>& values) {
  for (const auto& value : values) {
    if (!value.is_value()) {
      return Status::Invalid("Tried executing function with non-value type: ",
                             value.ToString());
    }
  }
  return Status::OK();
}

ExecBatchIterator::ExecBatchIterator(std::vector<Datum> args, int64_t length,
                                     int64_t max_chunksize)
    : args_(std::move(args)),
      position_(0),
      length_(length),
      max_chunksize_(max_chunksize) {
  chunk_indexes_.resize(args_.size(), 0);
  chunk_positions_.resize(args_.size(), 0);
}

Result<std::unique_ptr<ExecBatchIterator>> ExecBatchIterator::Make(
    std::vector<Datum> args, int64_t max_chunksize) {
  for (const auto& arg : args) {
    if (!(arg.is_arraylike() || arg.is_scalar())) {
      return Status::Invalid(
          "ExecBatchIterator only works with Scalar, Array, and "
          "ChunkedArray arguments");
    }
  }

  // If the arguments are all scalars, then the length is 1
  int64_t length = 1;

  bool length_set = false;
  for (auto& arg : args) {
    if (arg.is_scalar()) {
      continue;
    }
    if (!length_set) {
      length = arg.length();
      length_set = true;
    } else {
      if (arg.length() != length) {
        return Status::Invalid("Array arguments must all be the same length");
      }
    }
  }

  max_chunksize = std::min(length, max_chunksize);

  return std::unique_ptr<ExecBatchIterator>(
      new ExecBatchIterator(std::move(args), length, max_chunksize));
}

bool ExecBatchIterator::Next(ExecBatch* batch) {
  if (position_ == length_) {
    return false;
  }

  // Determine how large the common contiguous "slice" of all the arguments is
  int64_t iteration_size = std::min(length_ - position_, max_chunksize_);

  // If length_ is 0, then this loop will never execute
  for (size_t i = 0; i < args_.size() && iteration_size > 0; ++i) {
    // If the argument is not a chunked array, it's either a Scalar or Array,
    // in which case it doesn't influence the size of this batch. Note that if
    // the args are all scalars the batch length is 1
    if (args_[i].kind() != Datum::CHUNKED_ARRAY) {
      continue;
    }
    const ChunkedArray& arg = *args_[i].chunked_array();
    std::shared_ptr<Array> current_chunk;
    while (true) {
      current_chunk = arg.chunk(chunk_indexes_[i]);
      if (chunk_positions_[i] == current_chunk->length()) {
        // Chunk is zero-length, or was exhausted in the previous iteration
        chunk_positions_[i] = 0;
        ++chunk_indexes_[i];
        continue;
      }
      break;
    }
    iteration_size =
        std::min(current_chunk->length() - chunk_positions_[i], iteration_size);
  }

  // Now, fill the batch
  batch->values.resize(args_.size());
  batch->length = iteration_size;
  for (size_t i = 0; i < args_.size(); ++i) {
    if (args_[i].is_scalar()) {
      batch->values[i] = args_[i].scalar();
    } else if (args_[i].is_array()) {
      batch->values[i] = args_[i].array()->Slice(position_, iteration_size);
    } else {
      const ChunkedArray& carr = *args_[i].chunked_array();
      const auto& chunk = carr.chunk(chunk_indexes_[i]);
      batch->values[i] = chunk->data()->Slice(chunk_positions_[i], iteration_size);
      chunk_positions_[i] += iteration_size;
    }
  }
  position_ += iteration_size;
  DCHECK_LE(position_, length_);
  return true;
}
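
// Illustrative driver sketch (comment only; `args` stands for an assumed
// std::vector<Datum> of equal-length Array/ChunkedArray/Scalar values): the
// iterator yields ExecBatches of at most `max_chunksize` rows, advancing
// through all chunked arguments in lockstep.
//
//   ARROW_ASSIGN_OR_RAISE(auto it,
//                         ExecBatchIterator::Make(args, /*max_chunksize=*/1024));
//   ExecBatch batch;
//   while (it->Next(&batch)) {
//     // batch.values holds Array slices and Scalars covering the same rows
//   }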

namespace {

struct NullGeneralization {
  enum type { PERHAPS_NULL, ALL_VALID, ALL_NULL };

  static type Get(const Datum& datum) {
    if (datum.type()->id() == Type::NA) {
      return ALL_NULL;
    }

    if (datum.is_scalar()) {
      return datum.scalar()->is_valid ? ALL_VALID : ALL_NULL;
    }

    const auto& arr = *datum.array();

    // Use the cached null count; do not force the validity bits to be counted here
    const int64_t known_null_count = arr.null_count.load();
    if ((known_null_count == 0) || (arr.buffers[0] == NULLPTR)) {
      return ALL_VALID;
    }

    if (known_null_count == arr.length) {
      return ALL_NULL;
    }

    return PERHAPS_NULL;
  }
};

// Null propagation implementation that deals both with preallocated bitmaps
// and maybe-to-be allocated bitmaps
//
// If the bitmap is preallocated, it MUST be populated (since it might be a
// view of a much larger bitmap). If it isn't preallocated, then we have
// more flexibility.
//
// * If the batch has no nulls, then we do nothing
// * If only a single array has nulls, and its offset is a multiple of 8,
//   then we can zero-copy the bitmap into the output
// * Otherwise, we allocate the bitmap and populate it
class NullPropagator {
 public:
  NullPropagator(KernelContext* ctx, const ExecBatch& batch, ArrayData* output)
      : ctx_(ctx), batch_(batch), output_(output) {
    for (const Datum& datum : batch_.values) {
      auto null_generalization = NullGeneralization::Get(datum);

      if (null_generalization == NullGeneralization::ALL_NULL) {
        is_all_null_ = true;
      }

      if (null_generalization != NullGeneralization::ALL_VALID &&
          datum.kind() == Datum::ARRAY) {
        arrays_with_nulls_.push_back(datum.array().get());
      }
    }

    if (output->buffers[0] != nullptr) {
      bitmap_preallocated_ = true;
      SetBitmap(output_->buffers[0].get());
    }
  }

  void SetBitmap(Buffer* bitmap) { bitmap_ = bitmap->mutable_data(); }

  Status EnsureAllocated() {
    if (bitmap_preallocated_) {
      return Status::OK();
    }
    ARROW_ASSIGN_OR_RAISE(output_->buffers[0], ctx_->AllocateBitmap(output_->length));
    SetBitmap(output_->buffers[0].get());
    return Status::OK();
  }

  Status AllNullShortCircuit() {
    // OK, the output should be all null
    output_->null_count = output_->length;

    if (bitmap_preallocated_) {
      BitUtil::SetBitsTo(bitmap_, output_->offset, output_->length, false);
      return Status::OK();
    }

    // Walk all the values with nulls instead of breaking on the first in case
    // we find a bitmap that can be reused in the non-preallocated case
    for (const ArrayData* arr : arrays_with_nulls_) {
      if (arr->null_count.load() == arr->length && arr->buffers[0] != nullptr) {
        // Reuse this all null bitmap
        output_->buffers[0] = arr->buffers[0];
        return Status::OK();
      }
    }

    RETURN_NOT_OK(EnsureAllocated());
    BitUtil::SetBitsTo(bitmap_, output_->offset, output_->length, false);
    return Status::OK();
  }

  Status PropagateSingle() {
    // One array
    const ArrayData& arr = *arrays_with_nulls_[0];
    const std::shared_ptr<Buffer>& arr_bitmap = arr.buffers[0];

    // Reuse the null count if it's known
    output_->null_count = arr.null_count.load();

    if (bitmap_preallocated_) {
      CopyBitmap(arr_bitmap->data(), arr.offset, arr.length, bitmap_, output_->offset);
      return Status::OK();
    }

    // Three cases when memory was not pre-allocated:
    //
    // * Offset is zero: we reuse the bitmap as is
    // * Offset is nonzero but a multiple of 8: we can slice the bitmap
    // * Offset is not a multiple of 8: we must allocate and use CopyBitmap
    //
    // Keep in mind that output_->offset is not permitted to be nonzero when
    // the bitmap is not preallocated, and that precondition is asserted
    // higher in the call stack.
    if (arr.offset == 0) {
      output_->buffers[0] = arr_bitmap;
    } else if (arr.offset % 8 == 0) {
      output_->buffers[0] =
          SliceBuffer(arr_bitmap, arr.offset / 8, BitUtil::BytesForBits(arr.length));
    } else {
      RETURN_NOT_OK(EnsureAllocated());
      CopyBitmap(arr_bitmap->data(), arr.offset, arr.length, bitmap_,
                 /*dst_offset=*/0);
    }
    return Status::OK();
  }

  Status PropagateMultiple() {
    // More than one array. We use BitmapAnd to intersect their bitmaps

    // Do not compute the intersection null count until it's needed
    RETURN_NOT_OK(EnsureAllocated());

    auto Accumulate = [&](const ArrayData& left, const ArrayData& right) {
      DCHECK(left.buffers[0]);
      DCHECK(right.buffers[0]);
      BitmapAnd(left.buffers[0]->data(), left.offset, right.buffers[0]->data(),
                right.offset, output_->length, output_->offset,
                output_->buffers[0]->mutable_data());
    };

    DCHECK_GT(arrays_with_nulls_.size(), 1);

    // Seed the output bitmap with the & of the first two bitmaps
    Accumulate(*arrays_with_nulls_[0], *arrays_with_nulls_[1]);

    // Accumulate the rest
    for (size_t i = 2; i < arrays_with_nulls_.size(); ++i) {
      Accumulate(*output_, *arrays_with_nulls_[i]);
    }
    return Status::OK();
  }

  Status Execute() {
    if (is_all_null_) {
      // An all-null value (scalar null or all-null array) gives us a short
      // circuit opportunity
      return AllNullShortCircuit();
    }

    // At this point, by construction we know that all of the values in
    // arrays_with_nulls_ are arrays that are not all null. So there are a
    // few cases:
    //
    // * No arrays. This is a no-op w/o preallocation but when the bitmap is
    //   pre-allocated we have to fill it with 1's
    // * One array, whose bitmap can be zero-copied (w/o preallocation, and
    //   when no byte is split) or copied (split byte or w/ preallocation)
    // * More than one array, we must compute the intersection of all the
    //   bitmaps
    //
    // BUT, if the output offset is nonzero for some reason, we copy into the
    // output unconditionally

    output_->null_count = kUnknownNullCount;

    if (arrays_with_nulls_.empty()) {
      // No arrays with nulls case
      output_->null_count = 0;
      if (bitmap_preallocated_) {
        BitUtil::SetBitsTo(bitmap_, output_->offset, output_->length, true);
      }
      return Status::OK();
    }

    if (arrays_with_nulls_.size() == 1) {
      return PropagateSingle();
    }

    return PropagateMultiple();
  }

 private:
  KernelContext* ctx_;
  const ExecBatch& batch_;
  std::vector<const ArrayData*> arrays_with_nulls_;
  bool is_all_null_ = false;
  ArrayData* output_;
  uint8_t* bitmap_;
  bool bitmap_preallocated_ = false;
};

std::shared_ptr<ChunkedArray> ToChunkedArray(const std::vector<Datum>& values,
                                             const std::shared_ptr<DataType>& type) {
  std::vector<std::shared_ptr<Array>> arrays;
  arrays.reserve(values.size());
  for (const Datum& val : values) {
    if (val.length() == 0) {
      // Skip empty chunks
      continue;
    }
    arrays.emplace_back(val.make_array());
  }
  return std::make_shared<ChunkedArray>(std::move(arrays), type);
}

bool HaveChunkedArray(const std::vector<Datum>& values) {
  for (const auto& value : values) {
    if (value.kind() == Datum::CHUNKED_ARRAY) {
      return true;
    }
  }
  return false;
}

template <typename KernelType>
class KernelExecutorImpl : public KernelExecutor {
 public:
  Status Init(KernelContext* kernel_ctx, KernelInitArgs args) override {
    kernel_ctx_ = kernel_ctx;
    kernel_ = static_cast<const KernelType*>(args.kernel);

    // Resolve the output descriptor for this kernel
    ARROW_ASSIGN_OR_RAISE(
        output_descr_, kernel_->signature->out_type().Resolve(kernel_ctx_, args.inputs));

    return Status::OK();
  }

 protected:
  // This is overridden by the VectorExecutor
  virtual Status SetupArgIteration(const std::vector<Datum>& args) {
    ARROW_ASSIGN_OR_RAISE(
        batch_iterator_, ExecBatchIterator::Make(args, exec_context()->exec_chunksize()));
    return Status::OK();
  }

  Result<std::shared_ptr<ArrayData>> PrepareOutput(int64_t length) {
    auto out = std::make_shared<ArrayData>(output_descr_.type, length);
    out->buffers.resize(output_num_buffers_);

    if (validity_preallocated_) {
      ARROW_ASSIGN_OR_RAISE(out->buffers[0], kernel_ctx_->AllocateBitmap(length));
    }
    if (kernel_->null_handling == NullHandling::OUTPUT_NOT_NULL) {
      out->null_count = 0;
    }
    for (size_t i = 0; i < data_preallocated_.size(); ++i) {
      const auto& prealloc = data_preallocated_[i];
      if (prealloc.bit_width >= 0) {
        ARROW_ASSIGN_OR_RAISE(
            out->buffers[i + 1],
            AllocateDataBuffer(kernel_ctx_, length + prealloc.added_length,
                               prealloc.bit_width));
      }
    }
    return out;
  }

  Status CheckResultType(const Datum& out, const char* function_name) override {
    const auto& type = out.type();
    if (type != nullptr && !type->Equals(output_descr_.type)) {
      return Status::TypeError(
          "kernel type result mismatch for function '", function_name, "': declared as ",
          output_descr_.type->ToString(), ", actual is ", type->ToString());
    }
    return Status::OK();
  }

  ExecContext* exec_context() { return kernel_ctx_->exec_context(); }
  KernelState* state() { return kernel_ctx_->state(); }

  // Not all of these members are used for every executor type

  KernelContext* kernel_ctx_;
  const KernelType* kernel_;
  std::unique_ptr<ExecBatchIterator> batch_iterator_;
  ValueDescr output_descr_;

  int output_num_buffers_;

  // If true, then memory is preallocated for the validity bitmap with the same
  // strategy as the data buffer(s).
  bool validity_preallocated_ = false;

  // The kernel writes into data buffers preallocated for these bit widths
  // (a negative bit width indicates no preallocation).
  std::vector<BufferPreallocation> data_preallocated_;
};

class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
 public:
  Status Execute(const std::vector<Datum>& args, ExecListener* listener) override {
    RETURN_NOT_OK(PrepareExecute(args));
    ExecBatch batch;
    while (batch_iterator_->Next(&batch)) {
      RETURN_NOT_OK(ExecuteBatch(batch, listener));
    }
    if (preallocate_contiguous_) {
      // If we preallocated one big chunk, since the kernel execution is
      // completed, we can now emit it
      RETURN_NOT_OK(listener->OnResult(std::move(preallocated_)));
    }
    return Status::OK();
  }

  Datum WrapResults(const std::vector<Datum>& inputs,
                    const std::vector<Datum>& outputs) override {
    if (output_descr_.shape == ValueDescr::SCALAR) {
      DCHECK_GT(outputs.size(), 0);
      if (outputs.size() == 1) {
        // Return as SCALAR
        return outputs[0];
      } else {
        // Return as COLLECTION
        return outputs;
      }
    } else {
      // If execution yielded multiple chunks (because large arrays were split
      // based on the ExecContext parameters), then the result is a ChunkedArray
      if (HaveChunkedArray(inputs) || outputs.size() > 1) {
        return ToChunkedArray(outputs, output_descr_.type);
      } else if (outputs.size() == 1) {
        // Outputs have just one element
        return outputs[0];
      } else {
        // XXX: In the case where no outputs were produced, is returning a
        // 0-length array always the correct move?
        return MakeArrayOfNull(output_descr_.type, /*length=*/0,
                               exec_context()->memory_pool())
            .ValueOrDie();
      }
    }
  }

 protected:
  Status ExecuteBatch(const ExecBatch& batch, ExecListener* listener) {
    Datum out;
    RETURN_NOT_OK(PrepareNextOutput(batch, &out));

    if (output_descr_.shape == ValueDescr::ARRAY) {
      ArrayData* out_arr = out.mutable_array();
      if (kernel_->null_handling == NullHandling::INTERSECTION) {
        RETURN_NOT_OK(PropagateNulls(kernel_ctx_, batch, out_arr));
      } else if (kernel_->null_handling == NullHandling::OUTPUT_NOT_NULL) {
        out_arr->null_count = 0;
      }
    } else {
      if (kernel_->null_handling == NullHandling::INTERSECTION) {
        // set scalar validity
        out.scalar()->is_valid =
            std::all_of(batch.values.begin(), batch.values.end(),
                        [](const Datum& input) { return input.scalar()->is_valid; });
      } else if (kernel_->null_handling == NullHandling::OUTPUT_NOT_NULL) {
        out.scalar()->is_valid = true;
      }
    }

    RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out));
    if (!preallocate_contiguous_) {
      // If we are producing chunked output rather than one big array, then
      // emit each chunk as soon as it's available
      RETURN_NOT_OK(listener->OnResult(std::move(out)));
    }
    return Status::OK();
  }

  Status PrepareExecute(const std::vector<Datum>& args) {
    RETURN_NOT_OK(this->SetupArgIteration(args));

    if (output_descr_.shape == ValueDescr::ARRAY) {
      // If the executor is configured to produce a single large Array output for
      // kernels supporting preallocation, then we do so up front and then
      // iterate over slices of that large array. Otherwise, we preallocate prior
      // to processing each batch emitted from the ExecBatchIterator
      RETURN_NOT_OK(SetupPreallocation(batch_iterator_->length()));
    }
    return Status::OK();
  }

  // We must accommodate two different modes of preallocated execution
  //
  // * A single large ("contiguous") allocation that we populate with results
  //   on a chunkwise basis according to the ExecBatchIterator. This permits
  //   parallelization even if the objective is to obtain a single Array or
  //   ChunkedArray at the end
  // * A standalone buffer preallocation for each chunk emitted from the
  //   ExecBatchIterator
  //
  // When data buffer preallocation is not possible (e.g. with BINARY / STRING
  // outputs), then contiguous results are only possible if the input is
  // contiguous.

  Status PrepareNextOutput(const ExecBatch& batch, Datum* out) {
    if (output_descr_.shape == ValueDescr::ARRAY) {
      if (preallocate_contiguous_) {
        // The output is already fully preallocated
        const int64_t batch_start_position = batch_iterator_->position() - batch.length;

        if (batch.length < batch_iterator_->length()) {
          // If this is a partial execution, then we write into a slice of
          // preallocated_
          out->value = preallocated_->Slice(batch_start_position, batch.length);
        } else {
          // Otherwise write directly into preallocated_. The main difference
          // computationally (versus the Slice approach) is that the null_count
          // may not need to be recomputed in the result
          out->value = preallocated_;
        }
      } else {
        // We preallocate (maybe) only for the output of processing the current
        // batch
        ARROW_ASSIGN_OR_RAISE(out->value, PrepareOutput(batch.length));
      }
    } else {
      // For scalar outputs, we set a null scalar of the correct type to
      // communicate the output type to the kernel if needed
      //
      // XXX: Is there some way to avoid this step?
      out->value = MakeNullScalar(output_descr_.type);
    }
    return Status::OK();
  }

  Status SetupPreallocation(int64_t total_length) {
    output_num_buffers_ = static_cast<int>(output_descr_.type->layout().buffers.size());

    // Decide if we need to preallocate memory for this kernel
    validity_preallocated_ =
        (kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE &&
         kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL &&
         output_descr_.type->id() != Type::NA);
    if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
      ComputeDataPreallocate(*output_descr_.type, &data_preallocated_);
    }

    // Contiguous preallocation only possible on non-nested types if all
    // buffers are preallocated.  Otherwise, we must go chunk-by-chunk.
    //
    // Some kernels are also unable to write into sliced outputs, so we respect the
    // kernel's attributes.
    preallocate_contiguous_ =
        (exec_context()->preallocate_contiguous() && kernel_->can_write_into_slices &&
         validity_preallocated_ && !is_nested(output_descr_.type->id()) &&
         !is_dictionary(output_descr_.type->id()) &&
         data_preallocated_.size() == static_cast<size_t>(output_num_buffers_ - 1) &&
         std::all_of(data_preallocated_.begin(), data_preallocated_.end(),
                     [](const BufferPreallocation& prealloc) {
                       return prealloc.bit_width >= 0;
                     }));
    if (preallocate_contiguous_) {
      ARROW_ASSIGN_OR_RAISE(preallocated_, PrepareOutput(total_length));
    }
    return Status::OK();
  }

  // If true, and the kernel and output type support preallocation (for both
  // the validity and data buffers), then we allocate one big array and then
  // iterate through it while executing the kernel in chunks
  bool preallocate_contiguous_ = false;

  // For storing a contiguous preallocation per above. Unused otherwise
  std::shared_ptr<ArrayData> preallocated_;
};

Status PackBatchNoChunks(const std::vector<Datum>& args, ExecBatch* out) {
  int64_t length = 0;
  for (const auto& arg : args) {
    switch (arg.kind()) {
      case Datum::SCALAR:
      case Datum::ARRAY:
      case Datum::CHUNKED_ARRAY:
        length = std::max(arg.length(), length);
        break;
      default:
        DCHECK(false);
        break;
    }
  }
  out->length = length;
  out->values = args;
  return Status::OK();
}

class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
 public:
  Status Execute(const std::vector<Datum>& args, ExecListener* listener) override {
    RETURN_NOT_OK(PrepareExecute(args));
    ExecBatch batch;
    if (kernel_->can_execute_chunkwise) {
      while (batch_iterator_->Next(&batch)) {
        RETURN_NOT_OK(ExecuteBatch(batch, listener));
      }
    } else {
      RETURN_NOT_OK(PackBatchNoChunks(args, &batch));
      RETURN_NOT_OK(ExecuteBatch(batch, listener));
    }
    return Finalize(listener);
  }

  Datum WrapResults(const std::vector<Datum>& inputs,
                    const std::vector<Datum>& outputs) override {
    // If execution yielded multiple chunks (because large arrays were split
    // based on the ExecContext parameters), then the result is a ChunkedArray
    if (kernel_->output_chunked && (HaveChunkedArray(inputs) || outputs.size() > 1)) {
      return ToChunkedArray(outputs, output_descr_.type);
    } else if (outputs.size() == 1) {
      // Outputs have just one element
      return outputs[0];
    } else {
      // XXX: In the case where no outputs were produced, is returning a
      // 0-length array always the correct move?
      return MakeArrayOfNull(output_descr_.type, /*length=*/0).ValueOrDie();
    }
  }

 protected:
  Status ExecuteBatch(const ExecBatch& batch, ExecListener* listener) {
    Datum out;
    if (output_descr_.shape == ValueDescr::ARRAY) {
      // We preallocate (maybe) only for the output of processing the current
      // batch
      ARROW_ASSIGN_OR_RAISE(out.value, PrepareOutput(batch.length));
    }

    if (kernel_->null_handling == NullHandling::INTERSECTION &&
        output_descr_.shape == ValueDescr::ARRAY) {
      RETURN_NOT_OK(PropagateNulls(kernel_ctx_, batch, out.mutable_array()));
    }
    RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out));
    if (!kernel_->finalize) {
      // If there is no result finalizer (e.g. for hash-based functions), we can
      // emit the processed batch right away rather than waiting
      RETURN_NOT_OK(listener->OnResult(std::move(out)));
    } else {
      results_.emplace_back(std::move(out));
    }
    return Status::OK();
  }

  Status Finalize(ExecListener* listener) {
    if (kernel_->finalize) {
      // Intermediate results require post-processing after the execution is
      // completed (possibly involving some accumulated state)
      RETURN_NOT_OK(kernel_->finalize(kernel_ctx_, &results_));
      for (const auto& result : results_) {
        RETURN_NOT_OK(listener->OnResult(result));
      }
    }
    return Status::OK();
  }

  Status SetupArgIteration(const std::vector<Datum>& args) override {
    if (kernel_->can_execute_chunkwise) {
      ARROW_ASSIGN_OR_RAISE(batch_iterator_, ExecBatchIterator::Make(
                                                 args, exec_context()->exec_chunksize()));
    }
    return Status::OK();
  }

  Status PrepareExecute(const std::vector<Datum>& args) {
    RETURN_NOT_OK(this->SetupArgIteration(args));
    output_num_buffers_ = static_cast<int>(output_descr_.type->layout().buffers.size());

    // Decide if we need to preallocate memory for this kernel
    validity_preallocated_ =
        (kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE &&
         kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL);
    if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
      ComputeDataPreallocate(*output_descr_.type, &data_preallocated_);
    }
    return Status::OK();
  }

  std::vector<Datum> results_;
};

class ScalarAggExecutor : public KernelExecutorImpl<ScalarAggregateKernel> {
 public:
  Status Init(KernelContext* ctx, KernelInitArgs args) override {
    input_descrs_ = &args.inputs;
    options_ = args.options;
    return KernelExecutorImpl<ScalarAggregateKernel>::Init(ctx, args);
  }

  Status Execute(const std::vector<Datum>& args, ExecListener* listener) override {
    RETURN_NOT_OK(this->SetupArgIteration(args));

    ExecBatch batch;
    while (batch_iterator_->Next(&batch)) {
      // TODO: implement parallelism
      if (batch.length > 0) {
        RETURN_NOT_OK(Consume(batch));
      }
    }

    Datum out;
    RETURN_NOT_OK(kernel_->finalize(kernel_ctx_, &out));
    RETURN_NOT_OK(listener->OnResult(std::move(out)));
    return Status::OK();
  }

  Datum WrapResults(const std::vector<Datum>&,
                    const std::vector<Datum>& outputs) override {
    DCHECK_EQ(1, outputs.size());
    return outputs[0];
  }

 private:
  Status Consume(const ExecBatch& batch) {
    // FIXME(ARROW-11840) don't merge *any* aggregates for every batch
    ARROW_ASSIGN_OR_RAISE(
        auto batch_state,
        kernel_->init(kernel_ctx_, {kernel_, *input_descrs_, options_}));

    if (batch_state == nullptr) {
      return Status::Invalid("ScalarAggregation requires non-null kernel state");
    }

    KernelContext batch_ctx(exec_context());
    batch_ctx.SetState(batch_state.get());

    RETURN_NOT_OK(kernel_->consume(&batch_ctx, batch));
    RETURN_NOT_OK(kernel_->merge(kernel_ctx_, std::move(*batch_state), state()));
    return Status::OK();
  }

  const std::vector<ValueDescr>* input_descrs_;
  const FunctionOptions* options_;
};

template <typename ExecutorType,
          typename FunctionType = typename ExecutorType::FunctionType>
Result<std::unique_ptr<KernelExecutor>> MakeExecutor(ExecContext* ctx,
                                                     const Function* func,
                                                     const FunctionOptions* options) {
  DCHECK_EQ(ExecutorType::function_kind, func->kind());
  auto typed_func = checked_cast<const FunctionType*>(func);
  return std::unique_ptr<KernelExecutor>(new ExecutorType(ctx, typed_func, options));
}

}  // namespace

Status PropagateNulls(KernelContext* ctx, const ExecBatch& batch, ArrayData* output) {
  DCHECK_NE(nullptr, output);
  DCHECK_GT(output->buffers.size(), 0);

  if (output->type->id() == Type::NA) {
    // A null output type is a no-op (it is rare for this to happen, but we at
    // least test for it)
    return Status::OK();
  }

  // This function is ONLY able to write into output with non-zero offset
  // when the bitmap is preallocated. This could be a DCHECK but returning
  // error Status for now for emphasis
  if (output->offset != 0 && output->buffers[0] == nullptr) {
    return Status::Invalid(
        "Can only propagate nulls into pre-allocated memory "
        "when the output offset is non-zero");
  }
  NullPropagator propagator(ctx, batch, output);
  return propagator.Execute();
}
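
// Minimal usage sketch (comment only; assumes `out` was produced by a kernel
// executor's preallocation path, so either out->buffers[0] is preallocated or
// out->offset is zero, per the precondition checked above):
//
//   ArrayData* out = datum.mutable_array();
//   RETURN_NOT_OK(PropagateNulls(ctx, batch, out));
//   // out->buffers[0] and out->null_count now reflect the intersection of the
//   // inputs' validity bitmaps (or a short-circuited all-null/all-valid result)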

std::unique_ptr<KernelExecutor> KernelExecutor::MakeScalar() {
  return ::arrow::internal::make_unique<detail::ScalarExecutor>();
}

std::unique_ptr<KernelExecutor> KernelExecutor::MakeVector() {
  return ::arrow::internal::make_unique<detail::VectorExecutor>();
}

std::unique_ptr<KernelExecutor> KernelExecutor::MakeScalarAggregate() {
  return ::arrow::internal::make_unique<detail::ScalarAggExecutor>();
}

}  // namespace detail

ExecContext::ExecContext(MemoryPool* pool, ::arrow::internal::Executor* executor,
                         FunctionRegistry* func_registry)
    : pool_(pool), executor_(executor) {
  this->func_registry_ = func_registry == nullptr ? GetFunctionRegistry() : func_registry;
}

CpuInfo* ExecContext::cpu_info() const { return CpuInfo::GetInstance(); }

// ----------------------------------------------------------------------
// SelectionVector

SelectionVector::SelectionVector(std::shared_ptr<ArrayData> data)
    : data_(std::move(data)) {
  DCHECK_EQ(Type::INT32, data_->type->id());
  DCHECK_EQ(0, data_->GetNullCount());
  indices_ = data_->GetValues<int32_t>(1);
}

SelectionVector::SelectionVector(const Array& arr) : SelectionVector(arr.data()) {}

int32_t SelectionVector::length() const { return static_cast<int32_t>(data_->length); }

Result<std::shared_ptr<SelectionVector>> SelectionVector::FromMask(
    const BooleanArray& arr) {
  return Status::NotImplemented("FromMask");
}

Result<Datum> CallFunction(const std::string& func_name, const std::vector<Datum>& args,
                           const FunctionOptions* options, ExecContext* ctx) {
  if (ctx == nullptr) {
    ExecContext default_ctx;
    return CallFunction(func_name, args, options, &default_ctx);
  }
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<const Function> func,
                        ctx->func_registry()->GetFunction(func_name));
  return func->Execute(args, options, ctx);
}
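
// Illustrative call sketch (comment only; assumes a function named "add" is
// registered in the default registry and that `a` and `b` are Array Datums of
// compatible numeric types):
//
//   ARROW_ASSIGN_OR_RAISE(Datum sum, CallFunction("add", {a, b}));
//
// Passing a null ExecContext falls through to a default-constructed context,
// which is backed by the global function registry.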

Result<Datum> CallFunction(const std::string& func_name, const std::vector<Datum>& args,
                           ExecContext* ctx) {
  return CallFunction(func_name, args, /*options=*/nullptr, ctx);
}

}  // namespace compute
}  // namespace arrow