// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "arrow/compute/exec.h"
#include "arrow/compute/exec/util.h"
#include "arrow/compute/type_fwd.h"
#include "arrow/type_fwd.h"
#include "arrow/util/async_util.h"
#include "arrow/util/cancel.h"
#include "arrow/util/macros.h"
#include "arrow/util/optional.h"
#include "arrow/util/visibility.h"

namespace arrow {

namespace compute {

class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
 public:
  using NodeVector = std::vector<ExecNode*>;

  virtual ~ExecPlan() = default;

  ExecContext* exec_context() const { return exec_context_; }

  /// Make an empty exec plan
  static Result<std::shared_ptr<ExecPlan>> Make(ExecContext* = default_exec_context());

  ExecNode* AddNode(std::unique_ptr<ExecNode> node);

  template <typename Node, typename... Args>
  Node* EmplaceNode(Args&&... args) {
    std::unique_ptr<Node> node{new Node{std::forward<Args>(args)...}};
    auto out = node.get();
    AddNode(std::move(node));
    return out;
  }

  /// The initial inputs
  const NodeVector& sources() const;

  /// The final outputs
  const NodeVector& sinks() const;

  Status Validate();

  /// \brief Start producing on all nodes
  ///
  /// Nodes are started in reverse topological order, such that any node
  /// is started before all of its inputs.
  Status StartProducing();

  /// \brief Stop producing on all nodes
  ///
  /// Nodes are stopped in topological order, such that any node
  /// is stopped before all of its outputs.
  void StopProducing();

  /// \brief A future which will be marked finished when all nodes have stopped
  /// producing.
  Future<> finished();

  std::string ToString() const;

 protected:
  ExecContext* exec_context_;
  explicit ExecPlan(ExecContext* exec_context) : exec_context_(exec_context) {}
};
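
// Example (illustrative sketch only, not part of the API above): a typical plan
// lifecycle. The "source" and "sink" factory names and the *_options variables
// are assumptions about what is registered/constructed by the caller; see the
// factory registry and MakeExecNode helper declared further below.
//
//   ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make());
//   ARROW_ASSIGN_OR_RAISE(ExecNode* source,
//                         MakeExecNode("source", plan.get(), {}, source_options));
//   ARROW_ASSIGN_OR_RAISE(ExecNode* sink,
//                         MakeExecNode("sink", plan.get(), {source}, sink_options));
//   ARROW_RETURN_NOT_OK(plan->Validate());
//   ARROW_RETURN_NOT_OK(plan->StartProducing());
//   // ... consume batches through the sink; call plan->StopProducing() to
//   // cancel early if needed ...
//   plan->finished().Wait();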

class ARROW_EXPORT ExecNode {
 public:
  using NodeVector = std::vector<ExecNode*>;

  virtual ~ExecNode() = default;

  virtual const char* kind_name() const = 0;

  // The number of inputs/outputs expected by this node
  int num_inputs() const { return static_cast<int>(inputs_.size()); }
  int num_outputs() const { return num_outputs_; }

  /// This node's predecessors in the exec plan
  const NodeVector& inputs() const { return inputs_; }

  /// \brief Labels identifying the function of each input.
  const std::vector<std::string>& input_labels() const { return input_labels_; }

  /// This node's successors in the exec plan
  const NodeVector& outputs() const { return outputs_; }

  /// The datatypes for batches produced by this node
  const std::shared_ptr<Schema>& output_schema() const { return output_schema_; }

  /// This node's exec plan
  ExecPlan* plan() { return plan_; }

  /// \brief An optional label, for display and debugging
  ///
  /// There is no guarantee that this value is non-empty or unique.
  const std::string& label() const { return label_; }
  void SetLabel(std::string label) { label_ = std::move(label); }

  Status Validate() const;

  /// Upstream API:
  /// These functions are called by input nodes that want to inform this node
  /// about an updated condition (a new input batch, an error, an impending
  /// end of stream).
  ///
  /// Implementation rules:
  /// - these may be called anytime after StartProducing() has succeeded
  ///   (and even during or after StopProducing())
  /// - these may be called concurrently
  /// - these are allowed to call back into PauseProducing(), ResumeProducing()
  ///   and StopProducing()

  /// Transfer input batch to ExecNode
  virtual void InputReceived(ExecNode* input, ExecBatch batch) = 0;

  /// Signal error to ExecNode
  virtual void ErrorReceived(ExecNode* input, Status error) = 0;

  /// Mark the inputs finished after the given number of batches.
  ///
  /// This may be called before all inputs are received.  This simply fixes
  /// the total number of incoming batches for an input, so that the ExecNode
  /// knows when it has received all input, regardless of order.
  virtual void InputFinished(ExecNode* input, int total_batches) = 0;

  /// Lifecycle API:
  /// - start / stop to initiate and terminate production
  /// - pause / resume to apply backpressure
  ///
  /// Implementation rules:
  /// - StartProducing() should not recurse into the inputs, as it is
  ///   handled by ExecPlan::StartProducing()
  /// - PauseProducing(), ResumeProducing(), StopProducing() may be called
  ///   concurrently (but only after StartProducing() has returned successfully)
  /// - PauseProducing(), ResumeProducing(), StopProducing() may be called
  ///   by the downstream nodes' InputReceived(), ErrorReceived(), InputFinished()
  ///   methods
  /// - StopProducing() should recurse into the inputs
  /// - StopProducing() must be idempotent

  // XXX What happens if StartProducing() calls an output's InputReceived()
  // synchronously, and InputReceived() decides to call back into StopProducing()
  // (or PauseProducing()) because it received enough data?
  //
  // Right now, since synchronous calls happen in both directions (input to
  // output and then output to input), a node must be careful to be reentrant
  // against synchronous calls from its output, *and* also concurrent calls from
  // other threads.  The most reliable solution is to update the internal state
  // first, and notify outputs only at the end.
  //
  // Alternate rules:
  // - StartProducing(), ResumeProducing() can call synchronously into
  //   its outputs' consuming methods (InputReceived() etc.)
  // - InputReceived(), ErrorReceived(), InputFinished() can call asynchronously
  //   into its inputs' PauseProducing(), StopProducing()
  //
  // Alternate API:
  // - InputReceived(), ErrorReceived(), InputFinished() return a ProductionHint
  //   enum: either None (default), PauseProducing, ResumeProducing, StopProducing
  // - A method allows passing a ProductionHint asynchronously from an output node
  //   (replacing PauseProducing(), ResumeProducing(), StopProducing())

  /// \brief Start producing
  ///
  /// This must only be called once.  If this fails, then other lifecycle
  /// methods must not be called.
  ///
  /// This is typically called automatically by ExecPlan::StartProducing().
  virtual Status StartProducing() = 0;

  /// \brief Pause producing temporarily
  ///
  /// This call is a hint that an output node is currently not willing
  /// to receive data.
  ///
  /// This may be called any number of times after StartProducing() succeeds.
  /// However, the node is still free to produce data (which may be difficult
  /// to prevent anyway if data is produced using multiple threads).
  virtual void PauseProducing(ExecNode* output) = 0;

  /// \brief Resume producing after a temporary pause
  ///
  /// This call is a hint that an output node is willing to receive data again.
  ///
  /// This may be called any number of times after StartProducing() succeeds.
  /// This may also be called concurrently with PauseProducing(), which suggests
  /// the implementation may use an atomic counter.
  virtual void ResumeProducing(ExecNode* output) = 0;

  /// \brief Stop producing definitively to a single output
  ///
  /// This call is a hint that an output node has completed and is not willing
  /// to receive any further data.
  virtual void StopProducing(ExecNode* output) = 0;

  /// \brief Stop producing definitively to all outputs
  virtual void StopProducing() = 0;

  /// \brief A future which will be marked finished when this node has stopped
  /// producing.
  virtual Future<> finished() = 0;

  std::string ToString() const;

 protected:
  ExecNode(ExecPlan* plan, NodeVector inputs, std::vector<std::string> input_labels,
           std::shared_ptr<Schema> output_schema, int num_outputs);

  // A helper method to send an error status to all outputs.
  // Returns true if the status was an error.
  bool ErrorIfNotOk(Status status);

  /// Provide extra info to include in the string representation.
  virtual std::string ToStringExtra() const;

  ExecPlan* plan_;
  std::string label_;

  NodeVector inputs_;
  std::vector<std::string> input_labels_;

  std::shared_ptr<Schema> output_schema_;
  int num_outputs_;
  NodeVector outputs_;
};
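
// Example (illustrative sketch only): the shape of a minimal single-input,
// single-output node. "PassthroughNode" is hypothetical and not provided by
// this header; a real implementation must also follow the lifecycle and
// reentrancy rules documented above.
//
//   class PassthroughNode : public ExecNode {
//    public:
//     const char* kind_name() const override { return "PassthroughNode"; }
//
//     void InputReceived(ExecNode* input, ExecBatch batch) override {
//       // Per the guidance above: update internal state first, notify the
//       // output last.
//       outputs_[0]->InputReceived(this, std::move(batch));
//     }
//     void ErrorReceived(ExecNode* input, Status error) override {
//       outputs_[0]->ErrorReceived(this, std::move(error));
//     }
//     void InputFinished(ExecNode* input, int total_batches) override {
//       outputs_[0]->InputFinished(this, total_batches);
//     }
//     // Constructor plus StartProducing(), PauseProducing(), ResumeProducing(),
//     // StopProducing() and finished() omitted for brevity.
//   };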

/// \brief MapNode is a base class for ExecNodes that apply a function (see the
/// SubmitTask method), such as a filter or projection, to each incoming
/// ExecBatch.  Such nodes have one input, one output, and are pure functions
/// of their input.
///
/// A simple parallel runner is created with a "map_fn", which is simply a
/// function that takes a batch in and returns a batch.  This parallel runner
/// also needs an executor; if there is no executor, a simple synchronous
/// runner is used instead.

class MapNode : public ExecNode {
 public:
  MapNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
          std::shared_ptr<Schema> output_schema, bool async_mode);

  void ErrorReceived(ExecNode* input, Status error) override;

  void InputFinished(ExecNode* input, int total_batches) override;

  Status StartProducing() override;

  void PauseProducing(ExecNode* output) override;

  void ResumeProducing(ExecNode* output) override;

  void StopProducing(ExecNode* output) override;

  void StopProducing() override;

  Future<> finished() override;

 protected:
  void SubmitTask(std::function<Result<ExecBatch>(ExecBatch)> map_fn, ExecBatch batch);

  void Finish(Status finish_st = Status::OK());

 protected:
  // Counter for the number of batches received
  AtomicCounter input_counter_;

  // Future that is marked finished when this node has finished producing
  Future<> finished_ = Future<>::Make();

  // The task group for the corresponding batches
  util::AsyncTaskGroup task_group_;

  ::arrow::internal::Executor* executor_;

  // Variable used to cancel remaining tasks in the executor
  StopSource stop_source_;
};
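
// Example (illustrative sketch only): how a MapNode subclass typically routes
// batches through SubmitTask. "MyMapNode" and its DoMap() transformation are
// hypothetical; the filter and project nodes are real instances of this pattern.
//
//   class MyMapNode : public MapNode {
//    public:
//     using MapNode::MapNode;
//     const char* kind_name() const override { return "MyMapNode"; }
//
//     void InputReceived(ExecNode* input, ExecBatch batch) override {
//       // SubmitTask schedules the per-batch function (on the executor when
//       // async_mode is enabled, synchronously otherwise).
//       SubmitTask([](ExecBatch b) { return DoMap(std::move(b)); }, std::move(batch));
//     }
//   };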

/// \brief An extensible registry for factories of ExecNodes
class ARROW_EXPORT ExecFactoryRegistry {
 public:
  using Factory = std::function<Result<ExecNode*>(ExecPlan*, std::vector<ExecNode*>,
                                                  const ExecNodeOptions&)>;

  virtual ~ExecFactoryRegistry() = default;

  /// \brief Get the named factory from this registry
  ///
  /// will raise if factory_name is not found
  virtual Result<Factory> GetFactory(const std::string& factory_name) = 0;

  /// \brief Add a factory to this registry with the provided name
  ///
  /// will raise if factory_name is already in the registry
  virtual Status AddFactory(std::string factory_name, Factory factory) = 0;
};
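
// Example (illustrative sketch only): registering a custom factory so that a
// node can later be constructed by name. "my_node", MyNode and MyNodeOptions
// are hypothetical; a real factory should also validate its inputs and options.
//
//   Status RegisterMyNode(ExecFactoryRegistry* registry) {
//     return registry->AddFactory(
//         "my_node",
//         [](ExecPlan* plan, std::vector<ExecNode*> inputs,
//            const ExecNodeOptions& options) -> Result<ExecNode*> {
//           const auto& my_options =
//               arrow::internal::checked_cast<const MyNodeOptions&>(options);
//           return plan->EmplaceNode<MyNode>(plan, std::move(inputs), my_options);
//         });
//   }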

/// The default registry, which includes built-in factories.
ARROW_EXPORT
ExecFactoryRegistry* default_exec_factory_registry();

/// \brief Construct an ExecNode using the named factory
inline Result<ExecNode*> MakeExecNode(
    const std::string& factory_name, ExecPlan* plan, std::vector<ExecNode*> inputs,
    const ExecNodeOptions& options,
    ExecFactoryRegistry* registry = default_exec_factory_registry()) {
  ARROW_ASSIGN_OR_RAISE(auto factory, registry->GetFactory(factory_name));
  return factory(plan, std::move(inputs), options);
}

/// \brief Helper class for declaring sets of ExecNodes efficiently
///
/// A Declaration represents an unconstructed ExecNode (and potentially more since its
/// inputs may also be Declarations). The node can be constructed and added to a plan
/// with Declaration::AddToPlan, which will recursively construct any inputs as
/// necessary.
struct ARROW_EXPORT Declaration {
  using Input = util::Variant<ExecNode*, Declaration>;

  Declaration(std::string factory_name, std::vector<Input> inputs,
              std::shared_ptr<ExecNodeOptions> options, std::string label)
      : factory_name{std::move(factory_name)},
        inputs{std::move(inputs)},
        options{std::move(options)},
        label{std::move(label)} {}

  template <typename Options>
  Declaration(std::string factory_name, std::vector<Input> inputs, Options options)
      : factory_name{std::move(factory_name)},
        inputs{std::move(inputs)},
        options{std::make_shared<Options>(std::move(options))},
        label{this->factory_name} {}

  template <typename Options>
  Declaration(std::string factory_name, Options options)
      : factory_name{std::move(factory_name)},
        inputs{},
        options{std::make_shared<Options>(std::move(options))},
        label{this->factory_name} {}

  /// \brief Convenience factory for the common case of a simple sequence of nodes.
  ///
  /// Each of decls will be appended to the inputs of the subsequent declaration,
  /// and the final modified declaration will be returned.
  ///
  /// Without this convenience factory, constructing a sequence would require explicit,
  /// difficult-to-read nesting:
  ///
  ///     Declaration{"n3",
  ///                   {
  ///                       Declaration{"n2",
  ///                                   {
  ///                                       Declaration{"n1",
  ///                                                   {
  ///                                                       Declaration{"n0", N0Opts{}},
  ///                                                   },
  ///                                                   N1Opts{}},
  ///                                   },
  ///                                   N2Opts{}},
  ///                   },
  ///                   N3Opts{}};
  ///
  /// An equivalent Declaration can be constructed more tersely using Sequence:
  ///
  ///     Declaration::Sequence({
  ///         {"n0", N0Opts{}},
  ///         {"n1", N1Opts{}},
  ///         {"n2", N2Opts{}},
  ///         {"n3", N3Opts{}},
  ///     });
  static Declaration Sequence(std::vector<Declaration> decls);

  Result<ExecNode*> AddToPlan(ExecPlan* plan, ExecFactoryRegistry* registry =
                                                  default_exec_factory_registry()) const;

  std::string factory_name;
  std::vector<Input> inputs;
  std::shared_ptr<ExecNodeOptions> options;
  std::string label;
};
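
// Example (illustrative sketch only): declaring a linear plan and adding it in
// one step. The "source", "filter" and "sink" factory names and their *_options
// values are assumptions about what is registered in the default registry.
//
//   ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make());
//   auto decl = Declaration::Sequence({
//       {"source", source_options},
//       {"filter", filter_options},
//       {"sink", sink_options},
//   });
//   // Recursively constructs all three nodes and returns the last one (the sink).
//   ARROW_ASSIGN_OR_RAISE(ExecNode* sink, decl.AddToPlan(plan.get()));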

/// \brief Wrap an ExecBatch generator in a RecordBatchReader.
///
/// The RecordBatchReader does not impose any ordering on emitted batches.
ARROW_EXPORT
std::shared_ptr<RecordBatchReader> MakeGeneratorReader(
    std::shared_ptr<Schema>, std::function<Future<util::optional<ExecBatch>>()>,
    MemoryPool*);

constexpr int kDefaultBackgroundMaxQ = 32;
constexpr int kDefaultBackgroundQRestart = 16;

/// \brief Make an ExecBatch generator from a RecordBatchReader
///
/// Useful as the input for a source node in an ExecPlan
ARROW_EXPORT
Result<std::function<Future<util::optional<ExecBatch>>()>> MakeReaderGenerator(
    std::shared_ptr<RecordBatchReader> reader, arrow::internal::Executor* io_executor,
    int max_q = kDefaultBackgroundMaxQ, int q_restart = kDefaultBackgroundQRestart);
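
// Example (illustrative sketch only): bridging RecordBatchReader-based I/O into
// and out of an exec plan. The reader, io_executor, output_schema, sink_gen and
// memory_pool variables are assumptions supplied by the caller.
//
//   // Turn a RecordBatchReader into an ExecBatch generator, read on io_executor:
//   ARROW_ASSIGN_OR_RAISE(auto batch_gen,
//                         MakeReaderGenerator(std::move(reader), io_executor));
//
//   // Conversely, expose a sink's ExecBatch generator as a RecordBatchReader:
//   std::shared_ptr<RecordBatchReader> out_reader =
//       MakeGeneratorReader(output_schema, std::move(sink_gen), memory_pool);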

}  // namespace compute
}  // namespace arrow