1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #pragma once
19 
20 #include <functional>
21 #include <memory>
22 #include <string>
23 #include <vector>
24 
25 #include "arrow/compute/api_aggregate.h"
26 #include "arrow/compute/api_vector.h"
27 #include "arrow/compute/exec.h"
28 #include "arrow/compute/exec/expression.h"
29 #include "arrow/util/async_util.h"
30 #include "arrow/util/optional.h"
31 #include "arrow/util/visibility.h"
32 
33 namespace arrow {
34 namespace compute {
35 
36 class ARROW_EXPORT ExecNodeOptions {
37  public:
38   virtual ~ExecNodeOptions() = default;
39 };
40 
41 /// \brief Adapt an AsyncGenerator<ExecBatch> as a source node
42 ///
43 /// plan->exec_context()->executor() will be used to parallelize pushing to
44 /// outputs, if provided.
45 class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions {
46  public:
SourceNodeOptions(std::shared_ptr<Schema> output_schema,std::function<Future<util::optional<ExecBatch>> ()> generator)47   SourceNodeOptions(std::shared_ptr<Schema> output_schema,
48                     std::function<Future<util::optional<ExecBatch>>()> generator)
49       : output_schema(std::move(output_schema)), generator(std::move(generator)) {}
50 
51   std::shared_ptr<Schema> output_schema;
52   std::function<Future<util::optional<ExecBatch>>()> generator;
53 };
54 
55 /// \brief Make a node which excludes some rows from batches passed through it
56 ///
57 /// filter_expression will be evaluated against each batch which is pushed to
58 /// this node. Any rows for which filter_expression does not evaluate to `true` will be
59 /// excluded in the batch emitted by this node.
60 class ARROW_EXPORT FilterNodeOptions : public ExecNodeOptions {
61  public:
62   explicit FilterNodeOptions(Expression filter_expression, bool async_mode = true)
filter_expression(std::move (filter_expression))63       : filter_expression(std::move(filter_expression)), async_mode(async_mode) {}
64 
65   Expression filter_expression;
66   bool async_mode;
67 };
68 
69 /// \brief Make a node which executes expressions on input batches, producing new batches.
70 ///
71 /// Each expression will be evaluated against each batch which is pushed to
72 /// this node to produce a corresponding output column.
73 ///
74 /// If names are not provided, the string representations of exprs will be used.
75 class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions {
76  public:
77   explicit ProjectNodeOptions(std::vector<Expression> expressions,
78                               std::vector<std::string> names = {}, bool async_mode = true)
expressions(std::move (expressions))79       : expressions(std::move(expressions)),
80         names(std::move(names)),
81         async_mode(async_mode) {}
82 
83   std::vector<Expression> expressions;
84   std::vector<std::string> names;
85   bool async_mode;
86 };
87 
88 /// \brief Make a node which aggregates input batches, optionally grouped by keys.
89 class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions {
90  public:
91   AggregateNodeOptions(std::vector<internal::Aggregate> aggregates,
92                        std::vector<FieldRef> targets, std::vector<std::string> names,
93                        std::vector<FieldRef> keys = {})
aggregates(std::move (aggregates))94       : aggregates(std::move(aggregates)),
95         targets(std::move(targets)),
96         names(std::move(names)),
97         keys(std::move(keys)) {}
98 
99   // aggregations which will be applied to the targetted fields
100   std::vector<internal::Aggregate> aggregates;
101   // fields to which aggregations will be applied
102   std::vector<FieldRef> targets;
103   // output field names for aggregations
104   std::vector<std::string> names;
105   // keys by which aggregations will be grouped
106   std::vector<FieldRef> keys;
107 };
108 
109 /// \brief Add a sink node which forwards to an AsyncGenerator<ExecBatch>
110 ///
111 /// Emitted batches will not be ordered.
112 class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions {
113  public:
114   explicit SinkNodeOptions(std::function<Future<util::optional<ExecBatch>>()>* generator,
115                            util::BackpressureOptions backpressure = {})
generator(generator)116       : generator(generator), backpressure(std::move(backpressure)) {}
117 
118   std::function<Future<util::optional<ExecBatch>>()>* generator;
119   util::BackpressureOptions backpressure;
120 };
121 
122 class ARROW_EXPORT SinkNodeConsumer {
123  public:
124   virtual ~SinkNodeConsumer() = default;
125   /// \brief Consume a batch of data
126   virtual Status Consume(ExecBatch batch) = 0;
127   /// \brief Signal to the consumer that the last batch has been delivered
128   ///
129   /// The returned future should only finish when all outstanding tasks have completed
130   virtual Future<> Finish() = 0;
131 };
132 
133 /// \brief Add a sink node which consumes data within the exec plan run
134 class ARROW_EXPORT ConsumingSinkNodeOptions : public ExecNodeOptions {
135  public:
ConsumingSinkNodeOptions(std::shared_ptr<SinkNodeConsumer> consumer)136   explicit ConsumingSinkNodeOptions(std::shared_ptr<SinkNodeConsumer> consumer)
137       : consumer(std::move(consumer)) {}
138 
139   std::shared_ptr<SinkNodeConsumer> consumer;
140 };
141 
142 /// \brief Make a node which sorts rows passed through it
143 ///
144 /// All batches pushed to this node will be accumulated, then sorted, by the given
145 /// fields. Then sorted batches will be forwarded to the generator in sorted order.
146 class ARROW_EXPORT OrderBySinkNodeOptions : public SinkNodeOptions {
147  public:
OrderBySinkNodeOptions(SortOptions sort_options,std::function<Future<util::optional<ExecBatch>> ()> * generator)148   explicit OrderBySinkNodeOptions(
149       SortOptions sort_options,
150       std::function<Future<util::optional<ExecBatch>>()>* generator)
151       : SinkNodeOptions(generator), sort_options(std::move(sort_options)) {}
152 
153   SortOptions sort_options;
154 };
155 
/// \brief Kind of join performed by a hash join node (see HashJoinNodeOptions).
///
/// Values are spelled out so the 0..7 numbering is visible at a glance.
enum class JoinType {
  // semi / anti joins
  LEFT_SEMI = 0,
  RIGHT_SEMI = 1,
  LEFT_ANTI = 2,
  RIGHT_ANTI = 3,
  // inner / outer joins
  INNER = 4,
  LEFT_OUTER = 5,
  RIGHT_OUTER = 6,
  FULL_OUTER = 7
};
166 
/// \brief How a pair of join keys is compared; decides whether a null key is treated
/// as equal to another null key.
///
/// NOTE(review): presumably EQ behaves like SQL `=` (null never matches) and IS like
/// SQL `IS NOT DISTINCT FROM` (null matches null) — confirm against the hash join.
enum class JoinKeyCmp {
  EQ = 0,
  IS = 1
};
168 
169 /// \brief Make a node which implements join operation using hash join strategy.
170 class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
171  public:
172   static constexpr const char* default_output_prefix_for_left = "";
173   static constexpr const char* default_output_prefix_for_right = "";
174   HashJoinNodeOptions(
175       JoinType in_join_type, std::vector<FieldRef> in_left_keys,
176       std::vector<FieldRef> in_right_keys,
177       std::string output_prefix_for_left = default_output_prefix_for_left,
178       std::string output_prefix_for_right = default_output_prefix_for_right)
join_type(in_join_type)179       : join_type(in_join_type),
180         left_keys(std::move(in_left_keys)),
181         right_keys(std::move(in_right_keys)),
182         output_all(true),
183         output_prefix_for_left(std::move(output_prefix_for_left)),
184         output_prefix_for_right(std::move(output_prefix_for_right)) {
185     this->key_cmp.resize(this->left_keys.size());
186     for (size_t i = 0; i < this->left_keys.size(); ++i) {
187       this->key_cmp[i] = JoinKeyCmp::EQ;
188     }
189   }
190   HashJoinNodeOptions(
191       JoinType join_type, std::vector<FieldRef> left_keys,
192       std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output,
193       std::vector<FieldRef> right_output,
194       std::string output_prefix_for_left = default_output_prefix_for_left,
195       std::string output_prefix_for_right = default_output_prefix_for_right)
join_type(join_type)196       : join_type(join_type),
197         left_keys(std::move(left_keys)),
198         right_keys(std::move(right_keys)),
199         output_all(false),
200         left_output(std::move(left_output)),
201         right_output(std::move(right_output)),
202         output_prefix_for_left(std::move(output_prefix_for_left)),
203         output_prefix_for_right(std::move(output_prefix_for_right)) {
204     this->key_cmp.resize(this->left_keys.size());
205     for (size_t i = 0; i < this->left_keys.size(); ++i) {
206       this->key_cmp[i] = JoinKeyCmp::EQ;
207     }
208   }
209   HashJoinNodeOptions(
210       JoinType join_type, std::vector<FieldRef> left_keys,
211       std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output,
212       std::vector<FieldRef> right_output, std::vector<JoinKeyCmp> key_cmp,
213       std::string output_prefix_for_left = default_output_prefix_for_left,
214       std::string output_prefix_for_right = default_output_prefix_for_right)
join_type(join_type)215       : join_type(join_type),
216         left_keys(std::move(left_keys)),
217         right_keys(std::move(right_keys)),
218         output_all(false),
219         left_output(std::move(left_output)),
220         right_output(std::move(right_output)),
221         key_cmp(std::move(key_cmp)),
222         output_prefix_for_left(std::move(output_prefix_for_left)),
223         output_prefix_for_right(std::move(output_prefix_for_right)) {}
224 
225   // type of join (inner, left, semi...)
226   JoinType join_type;
227   // key fields from left input
228   std::vector<FieldRef> left_keys;
229   // key fields from right input
230   std::vector<FieldRef> right_keys;
231   // if set all valid fields from both left and right input will be output
232   // (and field ref vectors for output fields will be ignored)
233   bool output_all;
234   // output fields passed from left input
235   std::vector<FieldRef> left_output;
236   // output fields passed from right input
237   std::vector<FieldRef> right_output;
238   // key comparison function (determines whether a null key is equal another null key or
239   // not)
240   std::vector<JoinKeyCmp> key_cmp;
241   // prefix added to names of output fields coming from left input (used to distinguish,
242   // if necessary, between fields of the same name in left and right input and can be left
243   // empty if there are no name collisions)
244   std::string output_prefix_for_left;
245   // prefix added to names of output fields coming from right input
246   std::string output_prefix_for_right;
247 };
248 
249 /// \brief Make a node which select top_k/bottom_k rows passed through it
250 ///
251 /// All batches pushed to this node will be accumulated, then selected, by the given
252 /// fields. Then sorted batches will be forwarded to the generator in sorted order.
253 class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions {
254  public:
SelectKSinkNodeOptions(SelectKOptions select_k_options,std::function<Future<util::optional<ExecBatch>> ()> * generator)255   explicit SelectKSinkNodeOptions(
256       SelectKOptions select_k_options,
257       std::function<Future<util::optional<ExecBatch>>()>* generator)
258       : SinkNodeOptions(generator), select_k_options(std::move(select_k_options)) {}
259 
260   /// SelectK options
261   SelectKOptions select_k_options;
262 };
263 
264 }  // namespace compute
265 }  // namespace arrow
266