1 // Licensed to the Apache Software Foundation (ASF) under one 2 // or more contributor license agreements. See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership. The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 #pragma once 19 20 #include <functional> 21 #include <memory> 22 #include <string> 23 #include <vector> 24 25 #include "arrow/compute/api_aggregate.h" 26 #include "arrow/compute/api_vector.h" 27 #include "arrow/compute/exec.h" 28 #include "arrow/compute/exec/expression.h" 29 #include "arrow/util/async_util.h" 30 #include "arrow/util/optional.h" 31 #include "arrow/util/visibility.h" 32 33 namespace arrow { 34 namespace compute { 35 36 class ARROW_EXPORT ExecNodeOptions { 37 public: 38 virtual ~ExecNodeOptions() = default; 39 }; 40 41 /// \brief Adapt an AsyncGenerator<ExecBatch> as a source node 42 /// 43 /// plan->exec_context()->executor() will be used to parallelize pushing to 44 /// outputs, if provided. 45 class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions { 46 public: SourceNodeOptions(std::shared_ptr<Schema> output_schema,std::function<Future<util::optional<ExecBatch>> ()> generator)47 SourceNodeOptions(std::shared_ptr<Schema> output_schema, 48 std::function<Future<util::optional<ExecBatch>>()> generator) 49 : output_schema(std::move(output_schema)), generator(std::move(generator)) {} 50 51 std::shared_ptr<Schema> output_schema; 52 std::function<Future<util::optional<ExecBatch>>()> generator; 53 }; 54 55 /// \brief Make a node which excludes some rows from batches passed through it 56 /// 57 /// filter_expression will be evaluated against each batch which is pushed to 58 /// this node. Any rows for which filter_expression does not evaluate to `true` will be 59 /// excluded in the batch emitted by this node. 60 class ARROW_EXPORT FilterNodeOptions : public ExecNodeOptions { 61 public: 62 explicit FilterNodeOptions(Expression filter_expression, bool async_mode = true) filter_expression(std::move (filter_expression))63 : filter_expression(std::move(filter_expression)), async_mode(async_mode) {} 64 65 Expression filter_expression; 66 bool async_mode; 67 }; 68 69 /// \brief Make a node which executes expressions on input batches, producing new batches. 70 /// 71 /// Each expression will be evaluated against each batch which is pushed to 72 /// this node to produce a corresponding output column. 73 /// 74 /// If names are not provided, the string representations of exprs will be used. 75 class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions { 76 public: 77 explicit ProjectNodeOptions(std::vector<Expression> expressions, 78 std::vector<std::string> names = {}, bool async_mode = true) expressions(std::move (expressions))79 : expressions(std::move(expressions)), 80 names(std::move(names)), 81 async_mode(async_mode) {} 82 83 std::vector<Expression> expressions; 84 std::vector<std::string> names; 85 bool async_mode; 86 }; 87 88 /// \brief Make a node which aggregates input batches, optionally grouped by keys. 89 class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions { 90 public: 91 AggregateNodeOptions(std::vector<internal::Aggregate> aggregates, 92 std::vector<FieldRef> targets, std::vector<std::string> names, 93 std::vector<FieldRef> keys = {}) aggregates(std::move (aggregates))94 : aggregates(std::move(aggregates)), 95 targets(std::move(targets)), 96 names(std::move(names)), 97 keys(std::move(keys)) {} 98 99 // aggregations which will be applied to the targetted fields 100 std::vector<internal::Aggregate> aggregates; 101 // fields to which aggregations will be applied 102 std::vector<FieldRef> targets; 103 // output field names for aggregations 104 std::vector<std::string> names; 105 // keys by which aggregations will be grouped 106 std::vector<FieldRef> keys; 107 }; 108 109 /// \brief Add a sink node which forwards to an AsyncGenerator<ExecBatch> 110 /// 111 /// Emitted batches will not be ordered. 112 class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions { 113 public: 114 explicit SinkNodeOptions(std::function<Future<util::optional<ExecBatch>>()>* generator, 115 util::BackpressureOptions backpressure = {}) generator(generator)116 : generator(generator), backpressure(std::move(backpressure)) {} 117 118 std::function<Future<util::optional<ExecBatch>>()>* generator; 119 util::BackpressureOptions backpressure; 120 }; 121 122 class ARROW_EXPORT SinkNodeConsumer { 123 public: 124 virtual ~SinkNodeConsumer() = default; 125 /// \brief Consume a batch of data 126 virtual Status Consume(ExecBatch batch) = 0; 127 /// \brief Signal to the consumer that the last batch has been delivered 128 /// 129 /// The returned future should only finish when all outstanding tasks have completed 130 virtual Future<> Finish() = 0; 131 }; 132 133 /// \brief Add a sink node which consumes data within the exec plan run 134 class ARROW_EXPORT ConsumingSinkNodeOptions : public ExecNodeOptions { 135 public: ConsumingSinkNodeOptions(std::shared_ptr<SinkNodeConsumer> consumer)136 explicit ConsumingSinkNodeOptions(std::shared_ptr<SinkNodeConsumer> consumer) 137 : consumer(std::move(consumer)) {} 138 139 std::shared_ptr<SinkNodeConsumer> consumer; 140 }; 141 142 /// \brief Make a node which sorts rows passed through it 143 /// 144 /// All batches pushed to this node will be accumulated, then sorted, by the given 145 /// fields. Then sorted batches will be forwarded to the generator in sorted order. 146 class ARROW_EXPORT OrderBySinkNodeOptions : public SinkNodeOptions { 147 public: OrderBySinkNodeOptions(SortOptions sort_options,std::function<Future<util::optional<ExecBatch>> ()> * generator)148 explicit OrderBySinkNodeOptions( 149 SortOptions sort_options, 150 std::function<Future<util::optional<ExecBatch>>()>* generator) 151 : SinkNodeOptions(generator), sort_options(std::move(sort_options)) {} 152 153 SortOptions sort_options; 154 }; 155 156 enum class JoinType { 157 LEFT_SEMI, 158 RIGHT_SEMI, 159 LEFT_ANTI, 160 RIGHT_ANTI, 161 INNER, 162 LEFT_OUTER, 163 RIGHT_OUTER, 164 FULL_OUTER 165 }; 166 167 enum class JoinKeyCmp { EQ, IS }; 168 169 /// \brief Make a node which implements join operation using hash join strategy. 170 class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions { 171 public: 172 static constexpr const char* default_output_prefix_for_left = ""; 173 static constexpr const char* default_output_prefix_for_right = ""; 174 HashJoinNodeOptions( 175 JoinType in_join_type, std::vector<FieldRef> in_left_keys, 176 std::vector<FieldRef> in_right_keys, 177 std::string output_prefix_for_left = default_output_prefix_for_left, 178 std::string output_prefix_for_right = default_output_prefix_for_right) join_type(in_join_type)179 : join_type(in_join_type), 180 left_keys(std::move(in_left_keys)), 181 right_keys(std::move(in_right_keys)), 182 output_all(true), 183 output_prefix_for_left(std::move(output_prefix_for_left)), 184 output_prefix_for_right(std::move(output_prefix_for_right)) { 185 this->key_cmp.resize(this->left_keys.size()); 186 for (size_t i = 0; i < this->left_keys.size(); ++i) { 187 this->key_cmp[i] = JoinKeyCmp::EQ; 188 } 189 } 190 HashJoinNodeOptions( 191 JoinType join_type, std::vector<FieldRef> left_keys, 192 std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output, 193 std::vector<FieldRef> right_output, 194 std::string output_prefix_for_left = default_output_prefix_for_left, 195 std::string output_prefix_for_right = default_output_prefix_for_right) join_type(join_type)196 : join_type(join_type), 197 left_keys(std::move(left_keys)), 198 right_keys(std::move(right_keys)), 199 output_all(false), 200 left_output(std::move(left_output)), 201 right_output(std::move(right_output)), 202 output_prefix_for_left(std::move(output_prefix_for_left)), 203 output_prefix_for_right(std::move(output_prefix_for_right)) { 204 this->key_cmp.resize(this->left_keys.size()); 205 for (size_t i = 0; i < this->left_keys.size(); ++i) { 206 this->key_cmp[i] = JoinKeyCmp::EQ; 207 } 208 } 209 HashJoinNodeOptions( 210 JoinType join_type, std::vector<FieldRef> left_keys, 211 std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output, 212 std::vector<FieldRef> right_output, std::vector<JoinKeyCmp> key_cmp, 213 std::string output_prefix_for_left = default_output_prefix_for_left, 214 std::string output_prefix_for_right = default_output_prefix_for_right) join_type(join_type)215 : join_type(join_type), 216 left_keys(std::move(left_keys)), 217 right_keys(std::move(right_keys)), 218 output_all(false), 219 left_output(std::move(left_output)), 220 right_output(std::move(right_output)), 221 key_cmp(std::move(key_cmp)), 222 output_prefix_for_left(std::move(output_prefix_for_left)), 223 output_prefix_for_right(std::move(output_prefix_for_right)) {} 224 225 // type of join (inner, left, semi...) 226 JoinType join_type; 227 // key fields from left input 228 std::vector<FieldRef> left_keys; 229 // key fields from right input 230 std::vector<FieldRef> right_keys; 231 // if set all valid fields from both left and right input will be output 232 // (and field ref vectors for output fields will be ignored) 233 bool output_all; 234 // output fields passed from left input 235 std::vector<FieldRef> left_output; 236 // output fields passed from right input 237 std::vector<FieldRef> right_output; 238 // key comparison function (determines whether a null key is equal another null key or 239 // not) 240 std::vector<JoinKeyCmp> key_cmp; 241 // prefix added to names of output fields coming from left input (used to distinguish, 242 // if necessary, between fields of the same name in left and right input and can be left 243 // empty if there are no name collisions) 244 std::string output_prefix_for_left; 245 // prefix added to names of output fields coming from right input 246 std::string output_prefix_for_right; 247 }; 248 249 /// \brief Make a node which select top_k/bottom_k rows passed through it 250 /// 251 /// All batches pushed to this node will be accumulated, then selected, by the given 252 /// fields. Then sorted batches will be forwarded to the generator in sorted order. 253 class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions { 254 public: SelectKSinkNodeOptions(SelectKOptions select_k_options,std::function<Future<util::optional<ExecBatch>> ()> * generator)255 explicit SelectKSinkNodeOptions( 256 SelectKOptions select_k_options, 257 std::function<Future<util::optional<ExecBatch>>()>* generator) 258 : SinkNodeOptions(generator), select_k_options(std::move(select_k_options)) {} 259 260 /// SelectK options 261 SelectKOptions select_k_options; 262 }; 263 264 } // namespace compute 265 } // namespace arrow 266