1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #pragma once
19 
#include <cstdint>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>
25 
26 #include "arrow/array.h"
27 #include "arrow/buffer.h"
28 #include "arrow/compute/exec.h"
29 #include "arrow/compute/kernel.h"
30 #include "arrow/status.h"
31 #include "arrow/util/visibility.h"
32 
33 namespace arrow {
34 namespace compute {
35 
// Forward declaration only: this header refers to Function solely through a
// pointer, so the full definition is not required here.
class Function;

/// \brief Default upper bound on the length of each ExecBatch: the largest
/// representable int64_t, i.e. effectively no limit on chunk size.
constexpr int64_t kDefaultMaxChunksize = std::numeric_limits<int64_t>::max();
39 
40 namespace detail {
41 
42 /// \brief Break std::vector<Datum> into a sequence of ExecBatch for kernel
43 /// execution
44 class ARROW_EXPORT ExecBatchIterator {
45  public:
46   /// \brief Construct iterator and do basic argument validation
47   ///
48   /// \param[in] args the Datum argument, must be all array-like or scalar
49   /// \param[in] max_chunksize the maximum length of each ExecBatch. Depending
50   /// on the chunk layout of ChunkedArray.
51   static Result<std::unique_ptr<ExecBatchIterator>> Make(
52       std::vector<Datum> args, int64_t max_chunksize = kDefaultMaxChunksize);
53 
54   /// \brief Compute the next batch. Always returns at least one batch. Return
55   /// false if the iterator is exhausted
56   bool Next(ExecBatch* batch);
57 
length()58   int64_t length() const { return length_; }
59 
position()60   int64_t position() const { return position_; }
61 
max_chunksize()62   int64_t max_chunksize() const { return max_chunksize_; }
63 
64  private:
65   ExecBatchIterator(std::vector<Datum> args, int64_t length, int64_t max_chunksize);
66 
67   std::vector<Datum> args_;
68   std::vector<int> chunk_indexes_;
69   std::vector<int64_t> chunk_positions_;
70   int64_t position_;
71   int64_t length_;
72   int64_t max_chunksize_;
73 };
74 
75 // "Push" / listener API like IPC reader so that consumers can receive
76 // processed chunks as soon as they're available.
77 
78 class ARROW_EXPORT ExecListener {
79  public:
80   virtual ~ExecListener() = default;
81 
OnResult(Datum)82   virtual Status OnResult(Datum) { return Status::NotImplemented("OnResult"); }
83 };
84 
85 class DatumAccumulator : public ExecListener {
86  public:
DatumAccumulator()87   DatumAccumulator() {}
88 
OnResult(Datum value)89   Status OnResult(Datum value) override {
90     values_.emplace_back(value);
91     return Status::OK();
92   }
93 
values()94   std::vector<Datum> values() const { return values_; }
95 
96  private:
97   std::vector<Datum> values_;
98 };
99 
100 /// \brief Check that each Datum is of a "value" type, which means either
101 /// SCALAR, ARRAY, or CHUNKED_ARRAY. If there are chunked inputs, then these
102 /// inputs will be split into non-chunked ExecBatch values for execution
103 Status CheckAllValues(const std::vector<Datum>& values);
104 
/// \brief Abstract executor that runs a Function over a set of argument
/// Datums and pushes results to an ExecListener.
///
/// Instances are obtained through Make(). Code is left as-is because the order
/// of the virtual declarations determines the vtable layout of this exported
/// class.
class ARROW_EXPORT FunctionExecutor {
 public:
  virtual ~FunctionExecutor() = default;

  /// \brief Execute over `args`, delivering results to `listener` via
  /// ExecListener::OnResult.
  ///
  /// XXX: Better configurability for listener
  /// Not thread-safe
  virtual Status Execute(const std::vector<Datum>& args, ExecListener* listener) = 0;

  /// \brief Descriptor of the output value.
  /// NOTE(review): presumably only meaningful once Execute has run — confirm.
  virtual ValueDescr output_descr() const = 0;

  /// \brief Combine the result Datums collected during Execute (e.g. by a
  /// DatumAccumulator) into the final result Datum.
  /// NOTE(review): `args` is presumably used to choose the output shape
  /// (e.g. scalar vs. array vs. chunked) — confirm in the implementations.
  virtual Datum WrapResults(const std::vector<Datum>& args,
                            const std::vector<Datum>& outputs) = 0;

  /// \brief Create an executor for `func` configured with `options`, using
  /// `ctx` for execution resources.
  /// NOTE(review): the pointers are non-owning; presumably they must outlive
  /// the returned executor — confirm.
  static Result<std::unique_ptr<FunctionExecutor>> Make(ExecContext* ctx,
                                                        const Function* func,
                                                        const FunctionOptions* options);
};
122 
123 /// \brief Populate validity bitmap with the intersection of the nullity of the
124 /// arguments. If a preallocated bitmap is not provided, then one will be
125 /// allocated if needed (in some cases a bitmap can be zero-copied from the
126 /// arguments). If any Scalar value is null, then the entire validity bitmap
127 /// will be set to null.
128 ///
129 /// \param[in] ctx kernel execution context, for memory allocation etc.
130 /// \param[in] batch the data batch
131 /// \param[in] out the output ArrayData, must not be null
132 ARROW_EXPORT
133 Status PropagateNulls(KernelContext* ctx, const ExecBatch& batch, ArrayData* out);
134 
135 }  // namespace detail
136 }  // namespace compute
137 }  // namespace arrow
138