1 // Licensed to the Apache Software Foundation (ASF) under one 2 // or more contributor license agreements. See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership. The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 #pragma once 19 20 #include <functional> 21 #include <memory> 22 #include <string> 23 #include <vector> 24 25 #include "arrow/compute/exec.h" 26 #include "arrow/compute/exec/util.h" 27 #include "arrow/compute/type_fwd.h" 28 #include "arrow/type_fwd.h" 29 #include "arrow/util/async_util.h" 30 #include "arrow/util/cancel.h" 31 #include "arrow/util/macros.h" 32 #include "arrow/util/optional.h" 33 #include "arrow/util/visibility.h" 34 35 namespace arrow { 36 37 namespace compute { 38 39 class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> { 40 public: 41 using NodeVector = std::vector<ExecNode*>; 42 43 virtual ~ExecPlan() = default; 44 exec_context()45 ExecContext* exec_context() const { return exec_context_; } 46 47 /// Make an empty exec plan 48 static Result<std::shared_ptr<ExecPlan>> Make(ExecContext* = default_exec_context()); 49 50 ExecNode* AddNode(std::unique_ptr<ExecNode> node); 51 52 template <typename Node, typename... Args> EmplaceNode(Args &&...args)53 Node* EmplaceNode(Args&&... args) { 54 std::unique_ptr<Node> node{new Node{std::forward<Args>(args)...}}; 55 auto out = node.get(); 56 AddNode(std::move(node)); 57 return out; 58 } 59 60 /// The initial inputs 61 const NodeVector& sources() const; 62 63 /// The final outputs 64 const NodeVector& sinks() const; 65 66 Status Validate(); 67 68 /// \brief Start producing on all nodes 69 /// 70 /// Nodes are started in reverse topological order, such that any node 71 /// is started before all of its inputs. 72 Status StartProducing(); 73 74 /// \brief Stop producing on all nodes 75 /// 76 /// Nodes are stopped in topological order, such that any node 77 /// is stopped before all of its outputs. 78 void StopProducing(); 79 80 /// \brief A future which will be marked finished when all nodes have stopped producing. 81 Future<> finished(); 82 83 std::string ToString() const; 84 85 protected: 86 ExecContext* exec_context_; ExecPlan(ExecContext * exec_context)87 explicit ExecPlan(ExecContext* exec_context) : exec_context_(exec_context) {} 88 }; 89 90 class ARROW_EXPORT ExecNode { 91 public: 92 using NodeVector = std::vector<ExecNode*>; 93 94 virtual ~ExecNode() = default; 95 96 virtual const char* kind_name() const = 0; 97 98 // The number of inputs/outputs expected by this node num_inputs()99 int num_inputs() const { return static_cast<int>(inputs_.size()); } num_outputs()100 int num_outputs() const { return num_outputs_; } 101 102 /// This node's predecessors in the exec plan inputs()103 const NodeVector& inputs() const { return inputs_; } 104 105 /// \brief Labels identifying the function of each input. input_labels()106 const std::vector<std::string>& input_labels() const { return input_labels_; } 107 108 /// This node's successors in the exec plan outputs()109 const NodeVector& outputs() const { return outputs_; } 110 111 /// The datatypes for batches produced by this node output_schema()112 const std::shared_ptr<Schema>& output_schema() const { return output_schema_; } 113 114 /// This node's exec plan plan()115 ExecPlan* plan() { return plan_; } 116 117 /// \brief An optional label, for display and debugging 118 /// 119 /// There is no guarantee that this value is non-empty or unique. label()120 const std::string& label() const { return label_; } SetLabel(std::string label)121 void SetLabel(std::string label) { label_ = std::move(label); } 122 123 Status Validate() const; 124 125 /// Upstream API: 126 /// These functions are called by input nodes that want to inform this node 127 /// about an updated condition (a new input batch, an error, an impeding 128 /// end of stream). 129 /// 130 /// Implementation rules: 131 /// - these may be called anytime after StartProducing() has succeeded 132 /// (and even during or after StopProducing()) 133 /// - these may be called concurrently 134 /// - these are allowed to call back into PauseProducing(), ResumeProducing() 135 /// and StopProducing() 136 137 /// Transfer input batch to ExecNode 138 virtual void InputReceived(ExecNode* input, ExecBatch batch) = 0; 139 140 /// Signal error to ExecNode 141 virtual void ErrorReceived(ExecNode* input, Status error) = 0; 142 143 /// Mark the inputs finished after the given number of batches. 144 /// 145 /// This may be called before all inputs are received. This simply fixes 146 /// the total number of incoming batches for an input, so that the ExecNode 147 /// knows when it has received all input, regardless of order. 148 virtual void InputFinished(ExecNode* input, int total_batches) = 0; 149 150 /// Lifecycle API: 151 /// - start / stop to initiate and terminate production 152 /// - pause / resume to apply backpressure 153 /// 154 /// Implementation rules: 155 /// - StartProducing() should not recurse into the inputs, as it is 156 /// handled by ExecPlan::StartProducing() 157 /// - PauseProducing(), ResumeProducing(), StopProducing() may be called 158 /// concurrently (but only after StartProducing() has returned successfully) 159 /// - PauseProducing(), ResumeProducing(), StopProducing() may be called 160 /// by the downstream nodes' InputReceived(), ErrorReceived(), InputFinished() 161 /// methods 162 /// - StopProducing() should recurse into the inputs 163 /// - StopProducing() must be idempotent 164 165 // XXX What happens if StartProducing() calls an output's InputReceived() 166 // synchronously, and InputReceived() decides to call back into StopProducing() 167 // (or PauseProducing()) because it received enough data? 168 // 169 // Right now, since synchronous calls happen in both directions (input to 170 // output and then output to input), a node must be careful to be reentrant 171 // against synchronous calls from its output, *and* also concurrent calls from 172 // other threads. The most reliable solution is to update the internal state 173 // first, and notify outputs only at the end. 174 // 175 // Alternate rules: 176 // - StartProducing(), ResumeProducing() can call synchronously into 177 // its ouputs' consuming methods (InputReceived() etc.) 178 // - InputReceived(), ErrorReceived(), InputFinished() can call asynchronously 179 // into its inputs' PauseProducing(), StopProducing() 180 // 181 // Alternate API: 182 // - InputReceived(), ErrorReceived(), InputFinished() return a ProductionHint 183 // enum: either None (default), PauseProducing, ResumeProducing, StopProducing 184 // - A method allows passing a ProductionHint asynchronously from an output node 185 // (replacing PauseProducing(), ResumeProducing(), StopProducing()) 186 187 /// \brief Start producing 188 /// 189 /// This must only be called once. If this fails, then other lifecycle 190 /// methods must not be called. 191 /// 192 /// This is typically called automatically by ExecPlan::StartProducing(). 193 virtual Status StartProducing() = 0; 194 195 /// \brief Pause producing temporarily 196 /// 197 /// This call is a hint that an output node is currently not willing 198 /// to receive data. 199 /// 200 /// This may be called any number of times after StartProducing() succeeds. 201 /// However, the node is still free to produce data (which may be difficult 202 /// to prevent anyway if data is produced using multiple threads). 203 virtual void PauseProducing(ExecNode* output) = 0; 204 205 /// \brief Resume producing after a temporary pause 206 /// 207 /// This call is a hint that an output node is willing to receive data again. 208 /// 209 /// This may be called any number of times after StartProducing() succeeds. 210 /// This may also be called concurrently with PauseProducing(), which suggests 211 /// the implementation may use an atomic counter. 212 virtual void ResumeProducing(ExecNode* output) = 0; 213 214 /// \brief Stop producing definitively to a single output 215 /// 216 /// This call is a hint that an output node has completed and is not willing 217 /// to receive any further data. 218 virtual void StopProducing(ExecNode* output) = 0; 219 220 /// \brief Stop producing definitively to all outputs 221 virtual void StopProducing() = 0; 222 223 /// \brief A future which will be marked finished when this node has stopped producing. 224 virtual Future<> finished() = 0; 225 226 std::string ToString() const; 227 228 protected: 229 ExecNode(ExecPlan* plan, NodeVector inputs, std::vector<std::string> input_labels, 230 std::shared_ptr<Schema> output_schema, int num_outputs); 231 232 // A helper method to send an error status to all outputs. 233 // Returns true if the status was an error. 234 bool ErrorIfNotOk(Status status); 235 236 /// Provide extra info to include in the string representation. 237 virtual std::string ToStringExtra() const; 238 239 ExecPlan* plan_; 240 std::string label_; 241 242 NodeVector inputs_; 243 std::vector<std::string> input_labels_; 244 245 std::shared_ptr<Schema> output_schema_; 246 int num_outputs_; 247 NodeVector outputs_; 248 }; 249 250 /// \brief MapNode is an ExecNode type class which process a task like filter/project 251 /// (See SubmitTask method) to each given ExecBatch object, which have one input, one 252 /// output, and are pure functions on the input 253 /// 254 /// A simple parallel runner is created with a "map_fn" which is just a function that 255 /// takes a batch in and returns a batch. This simple parallel runner also needs an 256 /// executor (use simple synchronous runner if there is no executor) 257 258 class MapNode : public ExecNode { 259 public: 260 MapNode(ExecPlan* plan, std::vector<ExecNode*> inputs, 261 std::shared_ptr<Schema> output_schema, bool async_mode); 262 263 void ErrorReceived(ExecNode* input, Status error) override; 264 265 void InputFinished(ExecNode* input, int total_batches) override; 266 267 Status StartProducing() override; 268 269 void PauseProducing(ExecNode* output) override; 270 271 void ResumeProducing(ExecNode* output) override; 272 273 void StopProducing(ExecNode* output) override; 274 275 void StopProducing() override; 276 277 Future<> finished() override; 278 279 protected: 280 void SubmitTask(std::function<Result<ExecBatch>(ExecBatch)> map_fn, ExecBatch batch); 281 282 void Finish(Status finish_st = Status::OK()); 283 284 protected: 285 // Counter for the number of batches received 286 AtomicCounter input_counter_; 287 288 // Future to sync finished 289 Future<> finished_ = Future<>::Make(); 290 291 // The task group for the corresponding batches 292 util::AsyncTaskGroup task_group_; 293 294 ::arrow::internal::Executor* executor_; 295 296 // Variable used to cancel remaining tasks in the executor 297 StopSource stop_source_; 298 }; 299 300 /// \brief An extensible registry for factories of ExecNodes 301 class ARROW_EXPORT ExecFactoryRegistry { 302 public: 303 using Factory = std::function<Result<ExecNode*>(ExecPlan*, std::vector<ExecNode*>, 304 const ExecNodeOptions&)>; 305 306 virtual ~ExecFactoryRegistry() = default; 307 308 /// \brief Get the named factory from this registry 309 /// 310 /// will raise if factory_name is not found 311 virtual Result<Factory> GetFactory(const std::string& factory_name) = 0; 312 313 /// \brief Add a factory to this registry with the provided name 314 /// 315 /// will raise if factory_name is already in the registry 316 virtual Status AddFactory(std::string factory_name, Factory factory) = 0; 317 }; 318 319 /// The default registry, which includes built-in factories. 320 ARROW_EXPORT 321 ExecFactoryRegistry* default_exec_factory_registry(); 322 323 /// \brief Construct an ExecNode using the named factory 324 inline Result<ExecNode*> MakeExecNode( 325 const std::string& factory_name, ExecPlan* plan, std::vector<ExecNode*> inputs, 326 const ExecNodeOptions& options, 327 ExecFactoryRegistry* registry = default_exec_factory_registry()) { 328 ARROW_ASSIGN_OR_RAISE(auto factory, registry->GetFactory(factory_name)); 329 return factory(plan, std::move(inputs), options); 330 } 331 332 /// \brief Helper class for declaring sets of ExecNodes efficiently 333 /// 334 /// A Declaration represents an unconstructed ExecNode (and potentially more since its 335 /// inputs may also be Declarations). The node can be constructed and added to a plan 336 /// with Declaration::AddToPlan, which will recursively construct any inputs as necessary. 337 struct ARROW_EXPORT Declaration { 338 using Input = util::Variant<ExecNode*, Declaration>; 339 DeclarationDeclaration340 Declaration(std::string factory_name, std::vector<Input> inputs, 341 std::shared_ptr<ExecNodeOptions> options, std::string label) 342 : factory_name{std::move(factory_name)}, 343 inputs{std::move(inputs)}, 344 options{std::move(options)}, 345 label{std::move(label)} {} 346 347 template <typename Options> DeclarationDeclaration348 Declaration(std::string factory_name, std::vector<Input> inputs, Options options) 349 : factory_name{std::move(factory_name)}, 350 inputs{std::move(inputs)}, 351 options{std::make_shared<Options>(std::move(options))}, 352 label{this->factory_name} {} 353 354 template <typename Options> DeclarationDeclaration355 Declaration(std::string factory_name, Options options) 356 : factory_name{std::move(factory_name)}, 357 inputs{}, 358 options{std::make_shared<Options>(std::move(options))}, 359 label{this->factory_name} {} 360 361 /// \brief Convenience factory for the common case of a simple sequence of nodes. 362 /// 363 /// Each of decls will be appended to the inputs of the subsequent declaration, 364 /// and the final modified declaration will be returned. 365 /// 366 /// Without this convenience factory, constructing a sequence would require explicit, 367 /// difficult-to-read nesting: 368 /// 369 /// Declaration{"n3", 370 /// { 371 /// Declaration{"n2", 372 /// { 373 /// Declaration{"n1", 374 /// { 375 /// Declaration{"n0", N0Opts{}}, 376 /// }, 377 /// N1Opts{}}, 378 /// }, 379 /// N2Opts{}}, 380 /// }, 381 /// N3Opts{}}; 382 /// 383 /// An equivalent Declaration can be constructed more tersely using Sequence: 384 /// 385 /// Declaration::Sequence({ 386 /// {"n0", N0Opts{}}, 387 /// {"n1", N1Opts{}}, 388 /// {"n2", N2Opts{}}, 389 /// {"n3", N3Opts{}}, 390 /// }); 391 static Declaration Sequence(std::vector<Declaration> decls); 392 393 Result<ExecNode*> AddToPlan(ExecPlan* plan, ExecFactoryRegistry* registry = 394 default_exec_factory_registry()) const; 395 396 std::string factory_name; 397 std::vector<Input> inputs; 398 std::shared_ptr<ExecNodeOptions> options; 399 std::string label; 400 }; 401 402 /// \brief Wrap an ExecBatch generator in a RecordBatchReader. 403 /// 404 /// The RecordBatchReader does not impose any ordering on emitted batches. 405 ARROW_EXPORT 406 std::shared_ptr<RecordBatchReader> MakeGeneratorReader( 407 std::shared_ptr<Schema>, std::function<Future<util::optional<ExecBatch>>()>, 408 MemoryPool*); 409 410 constexpr int kDefaultBackgroundMaxQ = 32; 411 constexpr int kDefaultBackgroundQRestart = 16; 412 413 /// \brief Make a generator of RecordBatchReaders 414 /// 415 /// Useful as a source node for an Exec plan 416 ARROW_EXPORT 417 Result<std::function<Future<util::optional<ExecBatch>>()>> MakeReaderGenerator( 418 std::shared_ptr<RecordBatchReader> reader, arrow::internal::Executor* io_executor, 419 int max_q = kDefaultBackgroundMaxQ, int q_restart = kDefaultBackgroundQRestart); 420 421 } // namespace compute 422 } // namespace arrow 423