1 // Licensed to the Apache Software Foundation (ASF) under one 2 // or more contributor license agreements. See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership. The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 #pragma once 19 20 #include <cstdint> 21 #include <limits> 22 #include <memory> 23 #include <string> 24 #include <vector> 25 26 #include "arrow/array.h" 27 #include "arrow/buffer.h" 28 #include "arrow/compute/exec.h" 29 #include "arrow/compute/kernel.h" 30 #include "arrow/status.h" 31 #include "arrow/util/visibility.h" 32 33 namespace arrow { 34 namespace compute { 35 36 class Function; 37 38 static constexpr int64_t kDefaultMaxChunksize = std::numeric_limits<int64_t>::max(); 39 40 namespace detail { 41 42 /// \brief Break std::vector<Datum> into a sequence of ExecBatch for kernel 43 /// execution 44 class ARROW_EXPORT ExecBatchIterator { 45 public: 46 /// \brief Construct iterator and do basic argument validation 47 /// 48 /// \param[in] args the Datum argument, must be all array-like or scalar 49 /// \param[in] max_chunksize the maximum length of each ExecBatch. Depending 50 /// on the chunk layout of ChunkedArray. 51 static Result<std::unique_ptr<ExecBatchIterator>> Make( 52 std::vector<Datum> args, int64_t max_chunksize = kDefaultMaxChunksize); 53 54 /// \brief Compute the next batch. Always returns at least one batch. Return 55 /// false if the iterator is exhausted 56 bool Next(ExecBatch* batch); 57 length()58 int64_t length() const { return length_; } 59 position()60 int64_t position() const { return position_; } 61 max_chunksize()62 int64_t max_chunksize() const { return max_chunksize_; } 63 64 private: 65 ExecBatchIterator(std::vector<Datum> args, int64_t length, int64_t max_chunksize); 66 67 std::vector<Datum> args_; 68 std::vector<int> chunk_indexes_; 69 std::vector<int64_t> chunk_positions_; 70 int64_t position_; 71 int64_t length_; 72 int64_t max_chunksize_; 73 }; 74 75 // "Push" / listener API like IPC reader so that consumers can receive 76 // processed chunks as soon as they're available. 77 78 class ARROW_EXPORT ExecListener { 79 public: 80 virtual ~ExecListener() = default; 81 OnResult(Datum)82 virtual Status OnResult(Datum) { return Status::NotImplemented("OnResult"); } 83 }; 84 85 class DatumAccumulator : public ExecListener { 86 public: DatumAccumulator()87 DatumAccumulator() {} 88 OnResult(Datum value)89 Status OnResult(Datum value) override { 90 values_.emplace_back(value); 91 return Status::OK(); 92 } 93 values()94 std::vector<Datum> values() const { return values_; } 95 96 private: 97 std::vector<Datum> values_; 98 }; 99 100 /// \brief Check that each Datum is of a "value" type, which means either 101 /// SCALAR, ARRAY, or CHUNKED_ARRAY. If there are chunked inputs, then these 102 /// inputs will be split into non-chunked ExecBatch values for execution 103 Status CheckAllValues(const std::vector<Datum>& values); 104 105 class ARROW_EXPORT FunctionExecutor { 106 public: 107 virtual ~FunctionExecutor() = default; 108 109 /// XXX: Better configurability for listener 110 /// Not thread-safe 111 virtual Status Execute(const std::vector<Datum>& args, ExecListener* listener) = 0; 112 113 virtual ValueDescr output_descr() const = 0; 114 115 virtual Datum WrapResults(const std::vector<Datum>& args, 116 const std::vector<Datum>& outputs) = 0; 117 118 static Result<std::unique_ptr<FunctionExecutor>> Make(ExecContext* ctx, 119 const Function* func, 120 const FunctionOptions* options); 121 }; 122 123 /// \brief Populate validity bitmap with the intersection of the nullity of the 124 /// arguments. If a preallocated bitmap is not provided, then one will be 125 /// allocated if needed (in some cases a bitmap can be zero-copied from the 126 /// arguments). If any Scalar value is null, then the entire validity bitmap 127 /// will be set to null. 128 /// 129 /// \param[in] ctx kernel execution context, for memory allocation etc. 130 /// \param[in] batch the data batch 131 /// \param[in] out the output ArrayData, must not be null 132 ARROW_EXPORT 133 Status PropagateNulls(KernelContext* ctx, const ExecBatch& batch, ArrayData* out); 134 135 } // namespace detail 136 } // namespace compute 137 } // namespace arrow 138