// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/compute/exec.h"

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <sstream>
#include <utility>
#include <vector>

#include "arrow/array/array_base.h"
#include "arrow/array/array_primitive.h"
#include "arrow/array/data.h"
#include "arrow/array/util.h"
#include "arrow/buffer.h"
#include "arrow/chunked_array.h"
#include "arrow/compute/exec_internal.h"
#include "arrow/compute/function.h"
#include "arrow/compute/kernel.h"
#include "arrow/compute/registry.h"
#include "arrow/datum.h"
#include "arrow/pretty_print.h"
#include "arrow/record_batch.h"
#include "arrow/scalar.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/cpu_info.h"
#include "arrow/util/logging.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/vector.h"

namespace arrow {

using internal::BitmapAnd;
using internal::checked_cast;
using internal::CopyBitmap;
using internal::CpuInfo;

namespace compute {

ExecContext* default_exec_context() {
  static ExecContext default_ctx;
  return &default_ctx;
}

ExecBatch::ExecBatch(const RecordBatch& batch)
    : values(batch.num_columns()), length(batch.num_rows()) {
  auto columns = batch.column_data();
  std::move(columns.begin(), columns.end(), values.begin());
}

bool ExecBatch::Equals(const ExecBatch& other) const {
  return guarantee == other.guarantee && values == other.values;
}

void PrintTo(const ExecBatch& batch, std::ostream* os) {
  *os << "ExecBatch\n";

  static const std::string indent = " ";

  *os << indent << "# Rows: " << batch.length << "\n";
  if (batch.guarantee != literal(true)) {
    *os << indent << "Guarantee: " << batch.guarantee.ToString() << "\n";
  }

  int i = 0;
  for (const Datum& value : batch.values) {
    *os << indent << i++ << ": ";

    if (value.is_scalar()) {
      *os << "Scalar[" << value.scalar()->ToString() << "]\n";
      continue;
    }

    auto array = value.make_array();
    PrettyPrintOptions options;
    options.skip_new_lines = true;
    *os << "Array";
    ARROW_CHECK_OK(PrettyPrint(*array, options, os));
    *os << "\n";
  }
}

std::string ExecBatch::ToString() const {
  std::stringstream ss;
  PrintTo(*this, &ss);
  return ss.str();
}

ExecBatch ExecBatch::Slice(int64_t offset, int64_t length) const {
  ExecBatch out = *this;
  for (auto& value : out.values) {
    if (value.is_scalar()) continue;
    value = value.array()->Slice(offset, length);
  }
  out.length = std::min(length, this->length - offset);
  return out;
}
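
// Illustrative semantics of Slice (the values are hypothetical): given a
// batch of length 10 holding one array and one scalar, batch.Slice(2, 4)
// yields a batch of length 4 whose array values cover rows [2, 6) and whose
// scalar values are carried through unchanged.
//
//   ExecBatch batch({Datum(some_array), Datum(some_scalar)}, /*length=*/10);
//   ExecBatch sliced = batch.Slice(/*offset=*/2, /*length=*/4);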

Result<ExecBatch> ExecBatch::Make(std::vector<Datum> values) {
  if (values.empty()) {
    return Status::Invalid("Cannot infer ExecBatch length without at least one value");
  }

  int64_t length = -1;
  for (const auto& value : values) {
    if (value.is_scalar()) {
      continue;
    }

    if (length == -1) {
      length = value.length();
      continue;
    }

    if (length != value.length()) {
      return Status::Invalid(
          "Arrays used to construct an ExecBatch must have equal length");
    }
  }

  if (length == -1) {
    length = 1;
  }

  return ExecBatch(std::move(values), length);
}
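
// Illustrative usage of ExecBatch::Make (the arguments are hypothetical): the
// batch length is inferred from the array-like values, which must all agree,
// and defaults to 1 when only scalars are supplied.
//
//   ARROW_ASSIGN_OR_RAISE(ExecBatch batch,
//                         ExecBatch::Make({Datum(an_array), Datum(a_scalar)}));
//   // batch.length == an_array->length()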

Result<std::shared_ptr<RecordBatch>> ExecBatch::ToRecordBatch(
    std::shared_ptr<Schema> schema, MemoryPool* pool) const {
  ArrayVector columns(schema->num_fields());

  for (size_t i = 0; i < columns.size(); ++i) {
    const Datum& value = values[i];
    if (value.is_array()) {
      columns[i] = value.make_array();
      continue;
    }
    ARROW_ASSIGN_OR_RAISE(columns[i], MakeArrayFromScalar(*value.scalar(), length, pool));
  }

  return RecordBatch::Make(std::move(schema), length, std::move(columns));
}
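
// Illustrative only: ToRecordBatch materializes scalar values by repeating
// them `length` times with MakeArrayFromScalar, so a batch holding one array
// and one scalar becomes a RecordBatch with two equal-length columns. The
// supplied schema is assumed to match the batch's values positionally.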

namespace {

Result<std::shared_ptr<Buffer>> AllocateDataBuffer(KernelContext* ctx, int64_t length,
                                                   int bit_width) {
  if (bit_width == 1) {
    return ctx->AllocateBitmap(length);
  } else {
    int64_t buffer_size = BitUtil::BytesForBits(length * bit_width);
    return ctx->Allocate(buffer_size);
  }
}

struct BufferPreallocation {
  explicit BufferPreallocation(int bit_width = -1, int added_length = 0)
      : bit_width(bit_width), added_length(added_length) {}

  int bit_width;
  int added_length;
};

void ComputeDataPreallocate(const DataType& type,
                            std::vector<BufferPreallocation>* widths) {
  if (is_fixed_width(type.id()) && type.id() != Type::NA) {
    widths->emplace_back(checked_cast<const FixedWidthType&>(type).bit_width());
    return;
  }
  // Preallocate binary and list offsets
  switch (type.id()) {
    case Type::BINARY:
    case Type::STRING:
    case Type::LIST:
    case Type::MAP:
      widths->emplace_back(32, /*added_length=*/1);
      return;
    case Type::LARGE_BINARY:
    case Type::LARGE_STRING:
    case Type::LARGE_LIST:
      widths->emplace_back(64, /*added_length=*/1);
      return;
    default:
      break;
  }
}
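
// Illustrative only: for a fixed-width type such as int32 this registers a
// single 32-bit-wide data preallocation, while for STRING it registers the
// 32-bit offsets buffer with one extra slot (length + 1 offsets), leaving the
// character data buffer for the kernel to allocate.
//
//   std::vector<BufferPreallocation> widths;
//   ComputeDataPreallocate(*int32(), &widths);  // widths: {bit_width=32}
//   widths.clear();
//   ComputeDataPreallocate(*utf8(), &widths);   // widths: {bit_width=32, added_length=1}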

}  // namespace

namespace detail {

Status CheckAllValues(const std::vector<Datum>& values) {
  for (const auto& value : values) {
    if (!value.is_value()) {
      return Status::Invalid("Tried executing function with non-value type: ",
                             value.ToString());
    }
  }
  return Status::OK();
}

ExecBatchIterator::ExecBatchIterator(std::vector<Datum> args, int64_t length,
                                     int64_t max_chunksize)
    : args_(std::move(args)),
      position_(0),
      length_(length),
      max_chunksize_(max_chunksize) {
  chunk_indexes_.resize(args_.size(), 0);
  chunk_positions_.resize(args_.size(), 0);
}

Result<std::unique_ptr<ExecBatchIterator>> ExecBatchIterator::Make(
    std::vector<Datum> args, int64_t max_chunksize) {
  for (const auto& arg : args) {
    if (!(arg.is_arraylike() || arg.is_scalar())) {
      return Status::Invalid(
          "ExecBatchIterator only works with Scalar, Array, and "
          "ChunkedArray arguments");
    }
  }

  // If the arguments are all scalars, then the length is 1
  int64_t length = 1;

  bool length_set = false;
  for (auto& arg : args) {
    if (arg.is_scalar()) {
      continue;
    }
    if (!length_set) {
      length = arg.length();
      length_set = true;
    } else {
      if (arg.length() != length) {
        return Status::Invalid("Array arguments must all be the same length");
      }
    }
  }

  max_chunksize = std::min(length, max_chunksize);

  return std::unique_ptr<ExecBatchIterator>(
      new ExecBatchIterator(std::move(args), length, max_chunksize));
}
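
// Illustrative usage (the arguments are hypothetical): chunked arguments are
// resolved to contiguous ArrayData slices and scalars are passed through
// unchanged, so each emitted batch has length <= max_chunksize.
//
//   ARROW_ASSIGN_OR_RAISE(auto it, ExecBatchIterator::Make({Datum(a), Datum(b)},
//                                                          /*max_chunksize=*/1024));
//   ExecBatch batch;
//   while (it->Next(&batch)) {
//     // process up to 1024 rows at a time
//   }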

bool ExecBatchIterator::Next(ExecBatch* batch) {
  if (position_ == length_) {
    return false;
  }

  // Determine how large the common contiguous "slice" of all the arguments is
  int64_t iteration_size = std::min(length_ - position_, max_chunksize_);

  // If length_ is 0, then this loop will never execute
  for (size_t i = 0; i < args_.size() && iteration_size > 0; ++i) {
    // If the argument is not a chunked array, it's either a Scalar or Array,
    // in which case it doesn't influence the size of this batch. Note that if
    // the args are all scalars the batch length is 1
    if (args_[i].kind() != Datum::CHUNKED_ARRAY) {
      continue;
    }
    const ChunkedArray& arg = *args_[i].chunked_array();
    std::shared_ptr<Array> current_chunk;
    while (true) {
      current_chunk = arg.chunk(chunk_indexes_[i]);
      if (chunk_positions_[i] == current_chunk->length()) {
        // Chunk is zero-length, or was exhausted in the previous iteration
        chunk_positions_[i] = 0;
        ++chunk_indexes_[i];
        continue;
      }
      break;
    }
    iteration_size =
        std::min(current_chunk->length() - chunk_positions_[i], iteration_size);
  }

  // Now, fill the batch
  batch->values.resize(args_.size());
  batch->length = iteration_size;
  for (size_t i = 0; i < args_.size(); ++i) {
    if (args_[i].is_scalar()) {
      batch->values[i] = args_[i].scalar();
    } else if (args_[i].is_array()) {
      batch->values[i] = args_[i].array()->Slice(position_, iteration_size);
    } else {
      const ChunkedArray& carr = *args_[i].chunked_array();
      const auto& chunk = carr.chunk(chunk_indexes_[i]);
      batch->values[i] = chunk->data()->Slice(chunk_positions_[i], iteration_size);
      chunk_positions_[i] += iteration_size;
    }
  }
  position_ += iteration_size;
  DCHECK_LE(position_, length_);
  return true;
}

namespace {

struct NullGeneralization {
  enum type { PERHAPS_NULL, ALL_VALID, ALL_NULL };

  static type Get(const Datum& datum) {
    if (datum.type()->id() == Type::NA) {
      return ALL_NULL;
    }

    if (datum.is_scalar()) {
      return datum.scalar()->is_valid ? ALL_VALID : ALL_NULL;
    }

    const auto& arr = *datum.array();

    // Do not count the bits if they haven't been counted already
    const int64_t known_null_count = arr.null_count.load();
    if ((known_null_count == 0) || (arr.buffers[0] == NULLPTR)) {
      return ALL_VALID;
    }

    if (known_null_count == arr.length) {
      return ALL_NULL;
    }

    return PERHAPS_NULL;
  }
};
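
// Illustrative classifications (the array is hypothetical):
//
//   NullGeneralization::Get(Datum(MakeNullScalar(int32())));  // ALL_NULL
//   NullGeneralization::Get(Datum(array_without_validity_bitmap));
//   // ALL_VALID: a missing validity bitmap implies there are no nulls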

// Null propagation implementation that deals both with preallocated bitmaps
// and maybe-to-be-allocated bitmaps
//
// If the bitmap is preallocated, it MUST be populated (since it might be a
// view of a much larger bitmap). If it isn't preallocated, then we have
// more flexibility.
//
// * If the batch has no nulls, then we do nothing
// * If only a single array has nulls, and its offset is a multiple of 8,
//   then we can zero-copy the bitmap into the output
// * Otherwise, we allocate the bitmap and populate it
class NullPropagator {
 public:
  NullPropagator(KernelContext* ctx, const ExecBatch& batch, ArrayData* output)
      : ctx_(ctx), batch_(batch), output_(output) {
    for (const Datum& datum : batch_.values) {
      auto null_generalization = NullGeneralization::Get(datum);

      if (null_generalization == NullGeneralization::ALL_NULL) {
        is_all_null_ = true;
      }

      if (null_generalization != NullGeneralization::ALL_VALID &&
          datum.kind() == Datum::ARRAY) {
        arrays_with_nulls_.push_back(datum.array().get());
      }
    }

    if (output->buffers[0] != nullptr) {
      bitmap_preallocated_ = true;
      SetBitmap(output_->buffers[0].get());
    }
  }

  void SetBitmap(Buffer* bitmap) { bitmap_ = bitmap->mutable_data(); }

  Status EnsureAllocated() {
    if (bitmap_preallocated_) {
      return Status::OK();
    }
    ARROW_ASSIGN_OR_RAISE(output_->buffers[0], ctx_->AllocateBitmap(output_->length));
    SetBitmap(output_->buffers[0].get());
    return Status::OK();
  }

  Status AllNullShortCircuit() {
    // OK, the output should be all null
    output_->null_count = output_->length;

    if (bitmap_preallocated_) {
      BitUtil::SetBitsTo(bitmap_, output_->offset, output_->length, false);
      return Status::OK();
    }

    // Walk all the values with nulls instead of breaking on the first in case
    // we find a bitmap that can be reused in the non-preallocated case
    for (const ArrayData* arr : arrays_with_nulls_) {
      if (arr->null_count.load() == arr->length && arr->buffers[0] != nullptr) {
        // Reuse this all-null bitmap
        output_->buffers[0] = arr->buffers[0];
        return Status::OK();
      }
    }

    RETURN_NOT_OK(EnsureAllocated());
    BitUtil::SetBitsTo(bitmap_, output_->offset, output_->length, false);
    return Status::OK();
  }

  Status PropagateSingle() {
    // One array
    const ArrayData& arr = *arrays_with_nulls_[0];
    const std::shared_ptr<Buffer>& arr_bitmap = arr.buffers[0];

    // Reuse the null count if it's known
    output_->null_count = arr.null_count.load();

    if (bitmap_preallocated_) {
      CopyBitmap(arr_bitmap->data(), arr.offset, arr.length, bitmap_, output_->offset);
      return Status::OK();
    }

    // Three cases when memory was not preallocated:
    //
    // * Offset is zero: we reuse the bitmap as is
    // * Offset is nonzero but a multiple of 8: we can slice the bitmap
    // * Offset is not a multiple of 8: we must allocate and use CopyBitmap
    //
    // Keep in mind that output_->offset is not permitted to be nonzero when
    // the bitmap is not preallocated; that precondition is checked higher in
    // the call stack (see PropagateNulls below).
    if (arr.offset == 0) {
      output_->buffers[0] = arr_bitmap;
    } else if (arr.offset % 8 == 0) {
      output_->buffers[0] =
          SliceBuffer(arr_bitmap, arr.offset / 8, BitUtil::BytesForBits(arr.length));
    } else {
      RETURN_NOT_OK(EnsureAllocated());
      CopyBitmap(arr_bitmap->data(), arr.offset, arr.length, bitmap_,
                 /*dst_offset=*/0);
    }
    return Status::OK();
  }

  Status PropagateMultiple() {
    // More than one array. We use BitmapAnd to intersect their bitmaps

    // Do not compute the intersection null count until it's needed
    RETURN_NOT_OK(EnsureAllocated());

    auto Accumulate = [&](const ArrayData& left, const ArrayData& right) {
      DCHECK(left.buffers[0]);
      DCHECK(right.buffers[0]);
      BitmapAnd(left.buffers[0]->data(), left.offset, right.buffers[0]->data(),
                right.offset, output_->length, output_->offset,
                output_->buffers[0]->mutable_data());
    };

    DCHECK_GT(arrays_with_nulls_.size(), 1);

    // Seed the output bitmap with the & of the first two bitmaps
    Accumulate(*arrays_with_nulls_[0], *arrays_with_nulls_[1]);

    // Accumulate the rest
    for (size_t i = 2; i < arrays_with_nulls_.size(); ++i) {
      Accumulate(*output_, *arrays_with_nulls_[i]);
    }
    return Status::OK();
  }

  Status Execute() {
    if (is_all_null_) {
      // An all-null value (scalar null or all-null array) gives us a short
      // circuit opportunity
      return AllNullShortCircuit();
    }

    // At this point, by construction we know that all of the values in
    // arrays_with_nulls_ are arrays that are not all null. So there are a
    // few cases:
    //
    // * No arrays. This is a no-op w/o preallocation but when the bitmap is
    //   preallocated we have to fill it with 1's
    // * One array, whose bitmap can be zero-copied (w/o preallocation, and
    //   when no byte is split) or copied (split byte or w/ preallocation)
    // * More than one array, we must compute the intersection of all the
    //   bitmaps
    //
    // BUT, if the output offset is nonzero for some reason, we copy into the
    // output unconditionally

    output_->null_count = kUnknownNullCount;

    if (arrays_with_nulls_.empty()) {
      // No arrays with nulls case
      output_->null_count = 0;
      if (bitmap_preallocated_) {
        BitUtil::SetBitsTo(bitmap_, output_->offset, output_->length, true);
      }
      return Status::OK();
    }

    if (arrays_with_nulls_.size() == 1) {
      return PropagateSingle();
    }

    return PropagateMultiple();
  }

 private:
  KernelContext* ctx_;
  const ExecBatch& batch_;
  std::vector<const ArrayData*> arrays_with_nulls_;
  bool is_all_null_ = false;
  ArrayData* output_;
  uint8_t* bitmap_;
  bool bitmap_preallocated_ = false;
};

std::shared_ptr<ChunkedArray> ToChunkedArray(const std::vector<Datum>& values,
                                             const std::shared_ptr<DataType>& type) {
  std::vector<std::shared_ptr<Array>> arrays;
  arrays.reserve(values.size());
  for (const Datum& val : values) {
    if (val.length() == 0) {
      // Skip empty chunks
      continue;
    }
    arrays.emplace_back(val.make_array());
  }
  return std::make_shared<ChunkedArray>(std::move(arrays), type);
}

bool HaveChunkedArray(const std::vector<Datum>& values) {
  for (const auto& value : values) {
    if (value.kind() == Datum::CHUNKED_ARRAY) {
      return true;
    }
  }
  return false;
}

template <typename KernelType>
class KernelExecutorImpl : public KernelExecutor {
 public:
  Status Init(KernelContext* kernel_ctx, KernelInitArgs args) override {
    kernel_ctx_ = kernel_ctx;
    kernel_ = static_cast<const KernelType*>(args.kernel);

    // Resolve the output descriptor for this kernel
    ARROW_ASSIGN_OR_RAISE(
        output_descr_, kernel_->signature->out_type().Resolve(kernel_ctx_, args.inputs));

    return Status::OK();
  }

 protected:
  // This is overridden by the VectorExecutor
  virtual Status SetupArgIteration(const std::vector<Datum>& args) {
    ARROW_ASSIGN_OR_RAISE(
        batch_iterator_, ExecBatchIterator::Make(args, exec_context()->exec_chunksize()));
    return Status::OK();
  }

  Result<std::shared_ptr<ArrayData>> PrepareOutput(int64_t length) {
    auto out = std::make_shared<ArrayData>(output_descr_.type, length);
    out->buffers.resize(output_num_buffers_);

    if (validity_preallocated_) {
      ARROW_ASSIGN_OR_RAISE(out->buffers[0], kernel_ctx_->AllocateBitmap(length));
    }
    if (kernel_->null_handling == NullHandling::OUTPUT_NOT_NULL) {
      out->null_count = 0;
    }
    for (size_t i = 0; i < data_preallocated_.size(); ++i) {
      const auto& prealloc = data_preallocated_[i];
      if (prealloc.bit_width >= 0) {
        ARROW_ASSIGN_OR_RAISE(
            out->buffers[i + 1],
            AllocateDataBuffer(kernel_ctx_, length + prealloc.added_length,
                               prealloc.bit_width));
      }
    }
    return out;
  }
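
  // Illustrative only: for an int32 output of length N, PrepareOutput returns
  // an ArrayData whose buffers[0] is a preallocated validity bitmap (when
  // validity_preallocated_ is true) and whose buffers[1] is a data buffer of
  // BytesForBits(32 * N) bytes. Buffers without a registered preallocation
  // are left null for the kernel to allocate.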

  Status CheckResultType(const Datum& out, const char* function_name) override {
    const auto& type = out.type();
    if (type != nullptr && !type->Equals(output_descr_.type)) {
      return Status::TypeError(
          "kernel type result mismatch for function '", function_name, "': declared as ",
          output_descr_.type->ToString(), ", actual is ", type->ToString());
    }
    return Status::OK();
  }

  ExecContext* exec_context() { return kernel_ctx_->exec_context(); }
  KernelState* state() { return kernel_ctx_->state(); }

  // Not all of these members are used for every executor type

  KernelContext* kernel_ctx_;
  const KernelType* kernel_;
  std::unique_ptr<ExecBatchIterator> batch_iterator_;
  ValueDescr output_descr_;

  int output_num_buffers_;

  // If true, then memory is preallocated for the validity bitmap with the same
  // strategy as the data buffer(s).
  bool validity_preallocated_ = false;

  // The kernel writes into data buffers preallocated for these bit widths
  // (a negative bit width indicates no preallocation).
  std::vector<BufferPreallocation> data_preallocated_;
};

class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
 public:
  Status Execute(const std::vector<Datum>& args, ExecListener* listener) override {
    RETURN_NOT_OK(PrepareExecute(args));
    ExecBatch batch;
    while (batch_iterator_->Next(&batch)) {
      RETURN_NOT_OK(ExecuteBatch(batch, listener));
    }
    if (preallocate_contiguous_) {
      // If we preallocated one big chunk, since the kernel execution is
      // completed, we can now emit it
      RETURN_NOT_OK(listener->OnResult(std::move(preallocated_)));
    }
    return Status::OK();
  }

  Datum WrapResults(const std::vector<Datum>& inputs,
                    const std::vector<Datum>& outputs) override {
    if (output_descr_.shape == ValueDescr::SCALAR) {
      DCHECK_GT(outputs.size(), 0);
      if (outputs.size() == 1) {
        // Return as SCALAR
        return outputs[0];
      } else {
        // Return as COLLECTION
        return outputs;
      }
    } else {
      // If execution yielded multiple chunks (because large arrays were split
      // based on the ExecContext parameters), then the result is a ChunkedArray
      if (HaveChunkedArray(inputs) || outputs.size() > 1) {
        return ToChunkedArray(outputs, output_descr_.type);
      } else if (outputs.size() == 1) {
        // Outputs have just one element
        return outputs[0];
      } else {
        // XXX: In the case where no outputs are emitted, is returning a
        // 0-length array always the correct move?
        return MakeArrayOfNull(output_descr_.type, /*length=*/0,
                               exec_context()->memory_pool())
            .ValueOrDie();
      }
    }
  }

 protected:
  Status ExecuteBatch(const ExecBatch& batch, ExecListener* listener) {
    Datum out;
    RETURN_NOT_OK(PrepareNextOutput(batch, &out));

    if (output_descr_.shape == ValueDescr::ARRAY) {
      ArrayData* out_arr = out.mutable_array();
      if (kernel_->null_handling == NullHandling::INTERSECTION) {
        RETURN_NOT_OK(PropagateNulls(kernel_ctx_, batch, out_arr));
      } else if (kernel_->null_handling == NullHandling::OUTPUT_NOT_NULL) {
        out_arr->null_count = 0;
      }
    } else {
      if (kernel_->null_handling == NullHandling::INTERSECTION) {
        // set scalar validity
        out.scalar()->is_valid =
            std::all_of(batch.values.begin(), batch.values.end(),
                        [](const Datum& input) { return input.scalar()->is_valid; });
      } else if (kernel_->null_handling == NullHandling::OUTPUT_NOT_NULL) {
        out.scalar()->is_valid = true;
      }
    }

    RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out));
    if (!preallocate_contiguous_) {
      // If we are producing chunked output rather than one big array, then
      // emit each chunk as soon as it's available
      RETURN_NOT_OK(listener->OnResult(std::move(out)));
    }
    return Status::OK();
  }

  Status PrepareExecute(const std::vector<Datum>& args) {
    RETURN_NOT_OK(this->SetupArgIteration(args));

    if (output_descr_.shape == ValueDescr::ARRAY) {
      // If the executor is configured to produce a single large Array output for
      // kernels supporting preallocation, then we do so up front and then
      // iterate over slices of that large array. Otherwise, we preallocate prior
      // to processing each batch emitted from the ExecBatchIterator
      RETURN_NOT_OK(SetupPreallocation(batch_iterator_->length()));
    }
    return Status::OK();
  }

  // We must accommodate two different modes of execution for preallocated
  // execution
  //
  // * A single large ("contiguous") allocation that we populate with results
  //   on a chunkwise basis according to the ExecBatchIterator. This permits
  //   parallelization even if the objective is to obtain a single Array or
  //   ChunkedArray at the end
  // * A standalone buffer preallocation for each chunk emitted from the
  //   ExecBatchIterator
  //
  // When data buffer preallocation is not possible (e.g. with BINARY / STRING
  // outputs), then contiguous results are only possible if the input is
  // contiguous.

  Status PrepareNextOutput(const ExecBatch& batch, Datum* out) {
    if (output_descr_.shape == ValueDescr::ARRAY) {
      if (preallocate_contiguous_) {
        // The output is already fully preallocated
        const int64_t batch_start_position = batch_iterator_->position() - batch.length;

        if (batch.length < batch_iterator_->length()) {
          // If this is a partial execution, then we write into a slice of
          // preallocated_
          out->value = preallocated_->Slice(batch_start_position, batch.length);
        } else {
          // Otherwise write directly into preallocated_. The main difference
          // computationally (versus the Slice approach) is that the null_count
          // may not need to be recomputed in the result
          out->value = preallocated_;
        }
      } else {
        // We preallocate (maybe) only for the output of processing the current
        // batch
        ARROW_ASSIGN_OR_RAISE(out->value, PrepareOutput(batch.length));
      }
    } else {
      // For scalar outputs, we set a null scalar of the correct type to
      // communicate the output type to the kernel if needed
      //
      // XXX: Is there some way to avoid this step?
      out->value = MakeNullScalar(output_descr_.type);
    }
    return Status::OK();
  }

  Status SetupPreallocation(int64_t total_length) {
    output_num_buffers_ = static_cast<int>(output_descr_.type->layout().buffers.size());

    // Decide if we need to preallocate memory for this kernel
    validity_preallocated_ =
        (kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE &&
         kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL &&
         output_descr_.type->id() != Type::NA);
    if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
      ComputeDataPreallocate(*output_descr_.type, &data_preallocated_);
    }

    // Contiguous preallocation only possible on non-nested types if all
    // buffers are preallocated. Otherwise, we must go chunk-by-chunk.
    //
    // Some kernels are also unable to write into sliced outputs, so we respect the
    // kernel's attributes.
    preallocate_contiguous_ =
        (exec_context()->preallocate_contiguous() && kernel_->can_write_into_slices &&
         validity_preallocated_ && !is_nested(output_descr_.type->id()) &&
         !is_dictionary(output_descr_.type->id()) &&
         data_preallocated_.size() == static_cast<size_t>(output_num_buffers_ - 1) &&
         std::all_of(data_preallocated_.begin(), data_preallocated_.end(),
                     [](const BufferPreallocation& prealloc) {
                       return prealloc.bit_width >= 0;
                     }));
    if (preallocate_contiguous_) {
      ARROW_ASSIGN_OR_RAISE(preallocated_, PrepareOutput(total_length));
    }
    return Status::OK();
  }
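
  // Illustrative only: with default ExecContext settings, a kernel that
  // produces int32 output and has can_write_into_slices=true gets one
  // contiguous preallocation covering all batches, whereas a STRING-producing
  // kernel does not qualify (its character data buffer has no fixed-width
  // preallocation), so its results are emitted chunk by chunk instead.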

  // If true, and the kernel and output type supports preallocation (for both
  // the validity and data buffers), then we allocate one big array and then
  // iterate through it while executing the kernel in chunks
  bool preallocate_contiguous_ = false;

  // For storing a contiguous preallocation per above. Unused otherwise
  std::shared_ptr<ArrayData> preallocated_;
};

Status PackBatchNoChunks(const std::vector<Datum>& args, ExecBatch* out) {
  int64_t length = 0;
  for (const auto& arg : args) {
    switch (arg.kind()) {
      case Datum::SCALAR:
      case Datum::ARRAY:
      case Datum::CHUNKED_ARRAY:
        length = std::max(arg.length(), length);
        break;
      default:
        DCHECK(false);
        break;
    }
  }
  out->length = length;
  out->values = args;
  return Status::OK();
}

class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
 public:
  Status Execute(const std::vector<Datum>& args, ExecListener* listener) override {
    RETURN_NOT_OK(PrepareExecute(args));
    ExecBatch batch;
    if (kernel_->can_execute_chunkwise) {
      while (batch_iterator_->Next(&batch)) {
        RETURN_NOT_OK(ExecuteBatch(batch, listener));
      }
    } else {
      RETURN_NOT_OK(PackBatchNoChunks(args, &batch));
      RETURN_NOT_OK(ExecuteBatch(batch, listener));
    }
    return Finalize(listener);
  }

  Datum WrapResults(const std::vector<Datum>& inputs,
                    const std::vector<Datum>& outputs) override {
    // If execution yielded multiple chunks (because large arrays were split
    // based on the ExecContext parameters), then the result is a ChunkedArray
    if (kernel_->output_chunked && (HaveChunkedArray(inputs) || outputs.size() > 1)) {
      return ToChunkedArray(outputs, output_descr_.type);
    } else if (outputs.size() == 1) {
      // Outputs have just one element
      return outputs[0];
    } else {
      // XXX: In the case where no outputs are emitted, is returning a
      // 0-length array always the correct move?
      return MakeArrayOfNull(output_descr_.type, /*length=*/0).ValueOrDie();
    }
  }

 protected:
  Status ExecuteBatch(const ExecBatch& batch, ExecListener* listener) {
    Datum out;
    if (output_descr_.shape == ValueDescr::ARRAY) {
      // We preallocate (maybe) only for the output of processing the current
      // batch
      ARROW_ASSIGN_OR_RAISE(out.value, PrepareOutput(batch.length));
    }

    if (kernel_->null_handling == NullHandling::INTERSECTION &&
        output_descr_.shape == ValueDescr::ARRAY) {
      RETURN_NOT_OK(PropagateNulls(kernel_ctx_, batch, out.mutable_array()));
    }
    RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out));
    if (!kernel_->finalize) {
      // If there is no result finalizer (e.g. for hash-based functions), we
      // can emit the processed batch right away rather than waiting
      RETURN_NOT_OK(listener->OnResult(std::move(out)));
    } else {
      results_.emplace_back(std::move(out));
    }
    return Status::OK();
  }

  Status Finalize(ExecListener* listener) {
    if (kernel_->finalize) {
      // Intermediate results require post-processing after the execution is
      // completed (possibly involving some accumulated state)
      RETURN_NOT_OK(kernel_->finalize(kernel_ctx_, &results_));
      for (const auto& result : results_) {
        RETURN_NOT_OK(listener->OnResult(result));
      }
    }
    return Status::OK();
  }

  Status SetupArgIteration(const std::vector<Datum>& args) override {
    if (kernel_->can_execute_chunkwise) {
      ARROW_ASSIGN_OR_RAISE(batch_iterator_, ExecBatchIterator::Make(
                                                 args, exec_context()->exec_chunksize()));
    }
    return Status::OK();
  }

  Status PrepareExecute(const std::vector<Datum>& args) {
    RETURN_NOT_OK(this->SetupArgIteration(args));
    output_num_buffers_ = static_cast<int>(output_descr_.type->layout().buffers.size());

    // Decide if we need to preallocate memory for this kernel
    validity_preallocated_ =
        (kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE &&
         kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL);
    if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
      ComputeDataPreallocate(*output_descr_.type, &data_preallocated_);
    }
    return Status::OK();
  }

  std::vector<Datum> results_;
};

class ScalarAggExecutor : public KernelExecutorImpl<ScalarAggregateKernel> {
 public:
  Status Init(KernelContext* ctx, KernelInitArgs args) override {
    input_descrs_ = &args.inputs;
    options_ = args.options;
    return KernelExecutorImpl<ScalarAggregateKernel>::Init(ctx, args);
  }

  Status Execute(const std::vector<Datum>& args, ExecListener* listener) override {
    RETURN_NOT_OK(this->SetupArgIteration(args));

    ExecBatch batch;
    while (batch_iterator_->Next(&batch)) {
      // TODO: implement parallelism
      if (batch.length > 0) {
        RETURN_NOT_OK(Consume(batch));
      }
    }

    Datum out;
    RETURN_NOT_OK(kernel_->finalize(kernel_ctx_, &out));
    RETURN_NOT_OK(listener->OnResult(std::move(out)));
    return Status::OK();
  }

  Datum WrapResults(const std::vector<Datum>&,
                    const std::vector<Datum>& outputs) override {
    DCHECK_EQ(1, outputs.size());
    return outputs[0];
  }

 private:
  Status Consume(const ExecBatch& batch) {
    // FIXME(ARROW-11840) don't merge *any* aggregates for every batch
    ARROW_ASSIGN_OR_RAISE(
        auto batch_state,
        kernel_->init(kernel_ctx_, {kernel_, *input_descrs_, options_}));

    if (batch_state == nullptr) {
      return Status::Invalid("ScalarAggregation requires non-null kernel state");
    }

    KernelContext batch_ctx(exec_context());
    batch_ctx.SetState(batch_state.get());

    RETURN_NOT_OK(kernel_->consume(&batch_ctx, batch));
    RETURN_NOT_OK(kernel_->merge(kernel_ctx_, std::move(*batch_state), state()));
    return Status::OK();
  }

  const std::vector<ValueDescr>* input_descrs_;
  const FunctionOptions* options_;
};

template <typename ExecutorType,
          typename FunctionType = typename ExecutorType::FunctionType>
Result<std::unique_ptr<KernelExecutor>> MakeExecutor(ExecContext* ctx,
                                                     const Function* func,
                                                     const FunctionOptions* options) {
  DCHECK_EQ(ExecutorType::function_kind, func->kind());
  auto typed_func = checked_cast<const FunctionType*>(func);
  return std::unique_ptr<KernelExecutor>(new ExecutorType(ctx, typed_func, options));
}

}  // namespace

Status PropagateNulls(KernelContext* ctx, const ExecBatch& batch, ArrayData* output) {
  DCHECK_NE(nullptr, output);
  DCHECK_GT(output->buffers.size(), 0);

  if (output->type->id() == Type::NA) {
    // A null output type is a no-op (it is rare that this would happen, but
    // we at least test for it)
    return Status::OK();
  }

  // This function is ONLY able to write into output with non-zero offset
  // when the bitmap is preallocated. This could be a DCHECK but returning
  // error Status for now for emphasis
  if (output->offset != 0 && output->buffers[0] == nullptr) {
    return Status::Invalid(
        "Can only propagate nulls into pre-allocated memory "
        "when the output offset is non-zero");
  }
  NullPropagator propagator(ctx, batch, output);
  return propagator.Execute();
}
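
// Illustrative contract (the inputs are hypothetical): for a batch holding a
// nullable int32 array and a valid scalar, with a preallocated output bitmap
// the array's validity bitmap is copied into place; without preallocation and
// with the array at offset 0, the bitmap is shared zero-copy instead.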

std::unique_ptr<KernelExecutor> KernelExecutor::MakeScalar() {
  return ::arrow::internal::make_unique<detail::ScalarExecutor>();
}

std::unique_ptr<KernelExecutor> KernelExecutor::MakeVector() {
  return ::arrow::internal::make_unique<detail::VectorExecutor>();
}

std::unique_ptr<KernelExecutor> KernelExecutor::MakeScalarAggregate() {
  return ::arrow::internal::make_unique<detail::ScalarAggExecutor>();
}

}  // namespace detail

ExecContext::ExecContext(MemoryPool* pool, ::arrow::internal::Executor* executor,
                         FunctionRegistry* func_registry)
    : pool_(pool), executor_(executor) {
  this->func_registry_ = func_registry == nullptr ? GetFunctionRegistry() : func_registry;
}

CpuInfo* ExecContext::cpu_info() const { return CpuInfo::GetInstance(); }

// ----------------------------------------------------------------------
// SelectionVector

SelectionVector::SelectionVector(std::shared_ptr<ArrayData> data)
    : data_(std::move(data)) {
  DCHECK_EQ(Type::INT32, data_->type->id());
  DCHECK_EQ(0, data_->GetNullCount());
  indices_ = data_->GetValues<int32_t>(1);
}

SelectionVector::SelectionVector(const Array& arr) : SelectionVector(arr.data()) {}

int32_t SelectionVector::length() const { return static_cast<int32_t>(data_->length); }

Result<std::shared_ptr<SelectionVector>> SelectionVector::FromMask(
    const BooleanArray& arr) {
  return Status::NotImplemented("FromMask");
}

Result<Datum> CallFunction(const std::string& func_name, const std::vector<Datum>& args,
                           const FunctionOptions* options, ExecContext* ctx) {
  if (ctx == nullptr) {
    ExecContext default_ctx;
    return CallFunction(func_name, args, options, &default_ctx);
  }
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<const Function> func,
                        ctx->func_registry()->GetFunction(func_name));
  return func->Execute(args, options, ctx);
}
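
// Illustrative usage ("add" is a function registered in the default registry;
// the arguments are hypothetical):
//
//   ARROW_ASSIGN_OR_RAISE(Datum sum,
//                         CallFunction("add", {Datum(a), Datum(b)}));
//   // Passing ctx == nullptr constructs a default ExecContext internally.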

Result<Datum> CallFunction(const std::string& func_name, const std::vector<Datum>& args,
                           ExecContext* ctx) {
  return CallFunction(func_name, args, /*options=*/nullptr, ctx);
}

}  // namespace compute
}  // namespace arrow