1 #pragma once
2 
3 #include "task.hpp"
4 
5 /**
6 @file flow_builder.hpp
7 @brief flow builder include file
8 */
9 
10 namespace tf {
11 
12 /**
13 @class FlowBuilder
14 
15 @brief building methods of a task dependency graph
16 
17 */
18 class FlowBuilder {
19 
20   friend class Executor;
21 
22   public:
23 
24     /**
25     @brief creates a static task
26 
27     @tparam C callable type constructible from std::function<void()>
28 
29     @param callable callable to construct a static task
30 
31     @return a tf::Task handle
32 
33     The following example creates a static task.
34 
35     @code{.cpp}
36     tf::Task static_task = taskflow.emplace([](){});
37     @endcode
38 
39     Please refer to @ref StaticTasking for details.
40     */
41     template <typename C,
42       std::enable_if_t<is_static_task_v<C>, void>* = nullptr
43     >
44     Task emplace(C&& callable);
45 
46     /**
47     @brief creates a dynamic task
48 
49     @tparam C callable type constructible from std::function<void(tf::Subflow&)>
50 
51     @param callable callable to construct a dynamic task
52 
53     @return a tf::Task handle
54 
55     The following example creates a dynamic task (tf::Subflow)
56     that spawns two static tasks.
57 
58     @code{.cpp}
59     tf::Task dynamic_task = taskflow.emplace([](tf::Subflow& sf){
60       tf::Task static_task1 = sf.emplace([](){});
61       tf::Task static_task2 = sf.emplace([](){});
62     });
63     @endcode
64 
65     Please refer to @ref DynamicTasking for details.
66     */
67     template <typename C,
68       std::enable_if_t<is_dynamic_task_v<C>, void>* = nullptr
69     >
70     Task emplace(C&& callable);
71 
72     /**
73     @brief creates a condition task
74 
75     @tparam C callable type constructible from std::function<int()>
76 
77     @param callable callable to construct a condition task
78 
79     @return a tf::Task handle
80 
81     The following example creates an if-else block using one condition task
82     and three static tasks.
83 
84     @code{.cpp}
85     tf::Taskflow taskflow;
86 
87     auto [init, cond, yes, no] = taskflow.emplace(
88      [] () { },
89      [] () { return 0; },
90      [] () { std::cout << "yes\n"; },
91      [] () { std::cout << "no\n"; }
92     );
93 
94     // executes yes if cond returns 0, or no if cond returns 1
95     cond.precede(yes, no);
96     cond.succeed(init);
97     @endcode
98 
99     Please refer to @ref ConditionalTasking for details.
100     */
101     template <typename C,
102       std::enable_if_t<is_condition_task_v<C>, void>* = nullptr
103     >
104     Task emplace(C&& callable);
105 
106     /**
107     @brief creates multiple tasks from a list of callable objects
108 
109     @tparam C callable types
110 
111     @param callables one or multiple callable objects constructible from each task category
112 
113     @return a tf::Task handle
114 
115     The method returns a tuple of tasks each corresponding to the given
116     callable target. You can use structured binding to get the return tasks
117     one by one.
118     The following example creates four static tasks and assign them to
119     @c A, @c B, @c C, and @c D using structured binding.
120 
121     @code{.cpp}
122     auto [A, B, C, D] = taskflow.emplace(
123       [] () { std::cout << "A"; },
124       [] () { std::cout << "B"; },
125       [] () { std::cout << "C"; },
126       [] () { std::cout << "D"; }
127     );
128     @endcode
129     */
130     template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>* = nullptr>
131     auto emplace(C&&... callables);
132 
133     /**
134     @brief creates a module task from a taskflow
135 
136     @param taskflow a taskflow object for the module
137 
138     @return a tf::Task handle
139 
140     Please refer to @ref ComposableTasking for details.
141     */
142     Task composed_of(Taskflow& taskflow);
143 
144     /**
145     @brief creates a placeholder task
146 
147     @return a tf::Task handle
148 
149     A placeholder task maps to a node in the taskflow graph, but
150     it does not have any callable work assigned yet.
151     A placeholder task is different from an empty task handle that
152     does not point to any node in a graph.
153 
154     @code{.cpp}
155     // create a placeholder task with no callable target assigned
156     tf::Task placeholder = taskflow.placeholder();
157     assert(placeholder.empty() == false && placeholder.has_work() == false);
158 
159     // create an empty task handle
160     tf::Task task;
161     assert(task.empty() == true);
162 
163     // assign the task handle to the placeholder task
164     task = placeholder;
165     assert(task.empty() == false && task.has_work() == false);
166     @endcode
167     */
168     Task placeholder();
169 
170     /**
171     @brief creates a %cudaFlow task on the caller's GPU device context
172 
173     @tparam C callable type constructible from @c std::function<void(tf::cudaFlow&)>
174 
175     @return a tf::Task handle
176 
177     This method is equivalent to calling tf::FlowBuilder::emplace_on(callable, d)
178     where @c d is the caller's device context.
179     The following example creates a %cudaFlow of two kernel tasks, @c task1 and
180     @c task2, where @c task1 runs before @c task2.
181 
182     @code{.cpp}
183     taskflow.emplace([&](tf::cudaFlow& cf){
184       // create two kernel tasks
185       tf::cudaTask task1 = cf.kernel(grid1, block1, shm1, kernel1, args1);
186       tf::cudaTask task2 = cf.kernel(grid2, block2, shm2, kernel2, args2);
187 
188       // kernel1 runs before kernel2
189       task1.precede(task2);
190     });
191     @endcode
192 
193     Please refer to @ref GPUTaskingcudaFlow and @ref GPUTaskingcudaFlowCapturer
194     for details.
195     */
196     template <typename C,
197       std::enable_if_t<is_cudaflow_task_v<C>, void>* = nullptr
198     >
199     Task emplace(C&& callable);
200 
201     /**
202     @brief creates a %cudaFlow task on the given device
203 
204     @tparam C callable type constructible from std::function<void(tf::cudaFlow&)>
205     @tparam D device type, either @c int or @c std::ref<int> (stateful)
206 
207     @return a tf::Task handle
208 
209     The following example creates a %cudaFlow of two kernel tasks, @c task1 and
210     @c task2 on GPU @c 2, where @c task1 runs before @c task2
211 
212     @code{.cpp}
213     taskflow.emplace_on([&](tf::cudaFlow& cf){
214       // create two kernel tasks
215       tf::cudaTask task1 = cf.kernel(grid1, block1, shm1, kernel1, args1);
216       tf::cudaTask task2 = cf.kernel(grid2, block2, shm2, kernel2, args2);
217 
218       // kernel1 runs before kernel2
219       task1.precede(task2);
220     }, 2);
221     @endcode
222     */
223     template <typename C, typename D,
224       std::enable_if_t<is_cudaflow_task_v<C>, void>* = nullptr
225     >
226     Task emplace_on(C&& callable, D&& device);
227 
228     /**
229     @brief creates a %syclFlow task on the default queue
230 
231     @tparam C callable type constructible from std::function<void(tf::syclFlow&)>
232 
233     @param callable a callable that takes a referenced tf::syclFlow object
234 
235     @return a tf::Task handle
236 
237     The following example creates a %syclFlow on the default queue to submit
238     two kernel tasks, @c task1 and @c task2, where @c task1 runs before @c task2.
239 
240     @code{.cpp}
241     taskflow.emplace([&](tf::syclFlow& cf){
242       // create two single-thread kernel tasks
243       tf::syclTask task1 = cf.single_task([](){});
244       tf::syclTask task2 = cf.single_task([](){});
245 
246       // kernel1 runs before kernel2
247       task1.precede(task2);
248     });
249     @endcode
250     */
251     template <typename C, std::enable_if_t<is_syclflow_task_v<C>, void>* = nullptr>
252     Task emplace(C&& callable);
253 
254     /**
255     @brief creates a %syclFlow task on the given queue
256 
257     @tparam C callable type constructible from std::function<void(tf::syclFlow&)>
258     @tparam Q queue type
259 
260     @param callable a callable that takes a referenced tf::syclFlow object
261     @param queue a queue of type sycl::queue
262 
263     @return a tf::Task handle
264 
265     The following example creates a %syclFlow on the given queue to submit
266     two kernel tasks, @c task1 and @c task2, where @c task1 runs before @c task2.
267 
268     @code{.cpp}
269     taskflow.emplace_on([&](tf::syclFlow& cf){
270       // create two single-thread kernel tasks
271       tf::syclTask task1 = cf.single_task([](){});
272       tf::syclTask task2 = cf.single_task([](){});
273 
274       // kernel1 runs before kernel2
275       task1.precede(task2);
276     }, queue);
277     @endcode
278     */
279     template <typename C, typename Q,
280       std::enable_if_t<is_syclflow_task_v<C>, void>* = nullptr
281     >
282     Task emplace_on(C&& callable, Q&& queue);
283 
284     /**
285     @brief adds adjacent dependency links to a linear list of tasks
286 
287     @param tasks a vector of tasks
288     */
289     void linearize(std::vector<Task>& tasks);
290 
291     /**
292     @brief adds adjacent dependency links to a linear list of tasks
293 
294     @param tasks an initializer list of tasks
295     */
296     void linearize(std::initializer_list<Task> tasks);
297 
298     // ------------------------------------------------------------------------
299     // parallel iterations
300     // ------------------------------------------------------------------------
301 
302     /**
303     @brief constructs a STL-styled parallel-for task
304 
305     @tparam B beginning iterator type
306     @tparam E ending iterator type
307     @tparam C callable type
308 
309     @param first iterator to the beginning (inclusive)
310     @param last iterator to the end (exclusive)
311     @param callable a callable object to apply to the dereferenced iterator
312 
313     @return a tf::Task handle
314 
315     The task spawns a subflow that applies the callable object to each object obtained by dereferencing every iterator in the range <tt>[first, last)</tt>.
316     This method is equivalent to the parallel execution of the following loop:
317 
318     @code{.cpp}
319     for(auto itr=first; itr!=last; itr++) {
320       callable(*itr);
321     }
322     @endcode
323 
324     Arguments templated to enable stateful passing using std::reference_wrapper.
325     The callable needs to take a single argument of
326     the dereferenced iterator type.
327 
328     Please refer to @ref ParallelIterations for details.
329     */
330     template <typename B, typename E, typename C>
331     Task for_each(B&& first, E&& last, C&& callable);
332 
333     /**
334     @brief constructs an index-based parallel-for task
335 
336     @tparam B beginning index type (must be integral)
337     @tparam E ending index type (must be integral)
338     @tparam S step type (must be integral)
339     @tparam C callable type
340 
341     @param first index of the beginning (inclusive)
342     @param last index of the end (exclusive)
343     @param step step size
344     @param callable a callable object to apply to each valid index
345 
346     @return a tf::Task handle
347 
348     The task spawns a subflow that applies the callable object to each index in the range <tt>[first, last)</tt> with the step size.
349 
350     This method is equivalent to the parallel execution of the following loop:
351 
352     @code{.cpp}
353     // case 1: step size is positive
354     for(auto i=first; i<last; i+=step) {
355       callable(i);
356     }
357 
358     // case 2: step size is negative
359     for(auto i=first, i>last; i+=step) {
360       callable(i);
361     }
362     @endcode
363 
364     Arguments are templated to enable stateful passing using std::reference_wrapper.
365     The callable needs to take a single argument of the integral index type.
366 
367     Please refer to @ref ParallelIterations for details.
368     */
369     template <typename B, typename E, typename S, typename C>
370     Task for_each_index(B&& first, E&& last, S&& step, C&& callable);
371 
372     // ------------------------------------------------------------------------
373     // reduction
374     // ------------------------------------------------------------------------
375 
376     /**
377     @brief constructs a STL-styled parallel-reduce task
378 
379     @tparam B beginning iterator type
380     @tparam E ending iterator type
381     @tparam T result type
382     @tparam O binary reducer type
383 
384     @param first iterator to the beginning (inclusive)
385     @param last iterator to the end (exclusive)
386     @param init initial value of the reduction and the storage for the reduced result
387     @param bop binary operator that will be applied
388 
389     @return a tf::Task handle
390 
391     The task spawns a subflow to perform parallel reduction over @c init and the elements in the range <tt>[first, last)</tt>. The reduced result is store in @c init.
392 
393     This method is equivalent to the parallel execution of the following loop:
394 
395     @code{.cpp}
396     for(auto itr=first; itr!=last; itr++) {
397       init = bop(init, *itr);
398     }
399     @endcode
400 
401     Arguments are templated to enable stateful passing using std::reference_wrapper.
402 
403     Please refer to @ref ParallelReduction for details.
404     */
405     template <typename B, typename E, typename T, typename O>
406     Task reduce(B&& first, E&& last, T& init, O&& bop);
407 
408     // ------------------------------------------------------------------------
409     // transfrom and reduction
410     // ------------------------------------------------------------------------
411 
412     /**
413     @brief constructs a STL-styled parallel transform-reduce task
414 
415     @tparam B beginning iterator type
416     @tparam E ending iterator type
417     @tparam T result type
418     @tparam BOP binary reducer type
419     @tparam UOP unary transformion type
420 
421     @param first iterator to the beginning (inclusive)
422     @param last iterator to the end (exclusive)
423     @param init initial value of the reduction and the storage for the reduced result
424     @param bop binary operator that will be applied in unspecified order to the results of @c uop
425     @param uop unary operator that will be applied to transform each element in the range to the result type
426 
427     @return a tf::Task handle
428 
429     The task spawns a subflow to perform parallel reduction over @c init and the transformed elements in the range <tt>[first, last)</tt>.
430     The reduced result is store in @c init.
431 
432     This method is equivalent to the parallel execution of the following loop:
433 
434     @code{.cpp}
435     for(auto itr=first; itr!=last; itr++) {
436       init = bop(init, uop(*itr));
437     }
438     @endcode
439 
440     Arguments are templated to enable stateful passing using std::reference_wrapper.
441 
442     Please refer to @ref ParallelReduction for details.
443     */
444     template <typename B, typename E, typename T, typename BOP, typename UOP>
445     Task transform_reduce(B&& first, E&& last, T& init, BOP&& bop, UOP&& uop);
446 
447     // ------------------------------------------------------------------------
448     // sort
449     // ------------------------------------------------------------------------
450 
451     /**
452     @brief constructs a dynamic task to perform STL-styled parallel sort
453 
454     @tparam B beginning iterator type (random-accessible)
455     @tparam E ending iterator type (random-accessible)
456     @tparam C comparator type
457 
458     @param first iterator to the beginning (inclusive)
459     @param last iterator to the end (exclusive)
460     @param cmp comparison function object
461 
462     The task spawns a subflow to parallelly sort elements in the range
463     <tt>[first, last)</tt>.
464 
465     Arguments are templated to enable stateful passing using std::reference_wrapper.
466 
467     Please refer to @ref ParallelSort for details.
468     */
469     template <typename B, typename E, typename C>
470     Task sort(B&& first, E&& last, C&& cmp);
471 
472     /**
473     @brief constructs a dynamic task to perform STL-styled parallel sort using
474            the @c std::less<T> comparator, where @c T is the element type
475 
476     @tparam B beginning iterator type (random-accessible)
477     @tparam E ending iterator type (random-accessible)
478 
479     @param first iterator to the beginning (inclusive)
480     @param last iterator to the end (exclusive)
481 
482     The task spawns a subflow to parallelly sort elements in the range
483     <tt>[first, last)</tt> using the @c std::less<T> comparator,
484     where @c T is the dereferenced iterator type.
485 
486     Arguments are templated to enable stateful passing using std::reference_wrapper.
487 
488     Please refer to @ref ParallelSort for details.
489      */
490     template <typename B, typename E>
491     Task sort(B&& first, E&& last);
492 
493   protected:
494 
495     /**
496     @brief constructs a flow builder with a graph
497     */
498     FlowBuilder(Graph& graph);
499 
500     /**
501     @brief associated graph object
502     */
503     Graph& _graph;
504 
505   private:
506 
507     template <typename L>
508     void _linearize(L&);
509 };
510 
511 // Constructor
FlowBuilder(Graph & graph)512 inline FlowBuilder::FlowBuilder(Graph& graph) :
513   _graph {graph} {
514 }
515 
516 // Function: emplace
517 template <typename C, std::enable_if_t<is_static_task_v<C>, void>*>
emplace(C && c)518 Task FlowBuilder::emplace(C&& c) {
519   return Task(_graph.emplace_back(
520     std::in_place_type_t<Node::Static>{}, std::forward<C>(c)
521   ));
522 }
523 
524 // Function: emplace
525 template <typename C, std::enable_if_t<is_dynamic_task_v<C>, void>*>
emplace(C && c)526 Task FlowBuilder::emplace(C&& c) {
527   return Task(_graph.emplace_back(
528     std::in_place_type_t<Node::Dynamic>{}, std::forward<C>(c)
529   ));
530 }
531 
532 // Function: emplace
533 template <typename C, std::enable_if_t<is_condition_task_v<C>, void>*>
emplace(C && c)534 Task FlowBuilder::emplace(C&& c) {
535   return Task(_graph.emplace_back(
536     std::in_place_type_t<Node::Condition>{}, std::forward<C>(c)
537   ));
538 }
539 
540 // Function: emplace
541 template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>*>
emplace(C &&...cs)542 auto FlowBuilder::emplace(C&&... cs) {
543   return std::make_tuple(emplace(std::forward<C>(cs))...);
544 }
545 
546 // Function: composed_of
composed_of(Taskflow & taskflow)547 inline Task FlowBuilder::composed_of(Taskflow& taskflow) {
548   auto node = _graph.emplace_back(
549     std::in_place_type_t<Node::Module>{}, &taskflow
550   );
551   return Task(node);
552 }
553 
554 // Function: placeholder
placeholder()555 inline Task FlowBuilder::placeholder() {
556   auto node = _graph.emplace_back();
557   return Task(node);
558 }
559 
560 // Procedure: _linearize
561 template <typename L>
_linearize(L & keys)562 void FlowBuilder::_linearize(L& keys) {
563 
564   auto itr = keys.begin();
565   auto end = keys.end();
566 
567   if(itr == end) {
568     return;
569   }
570 
571   auto nxt = itr;
572 
573   for(++nxt; nxt != end; ++nxt, ++itr) {
574     itr->_node->_precede(nxt->_node);
575   }
576 }
577 
578 // Procedure: linearize
linearize(std::vector<Task> & keys)579 inline void FlowBuilder::linearize(std::vector<Task>& keys) {
580   _linearize(keys);
581 }
582 
583 // Procedure: linearize
linearize(std::initializer_list<Task> keys)584 inline void FlowBuilder::linearize(std::initializer_list<Task> keys) {
585   _linearize(keys);
586 }
587 
588 // ----------------------------------------------------------------------------
589 
590 /**
591 @class Subflow
592 
593 @brief class to construct a subflow graph from the execution of a dynamic task
594 
595 By default, a subflow automatically @em joins its parent node.
596 You may explicitly join or detach a subflow by calling tf::Subflow::join
597 or tf::Subflow::detach, respectively.
598 The following example creates a taskflow graph that spawns a subflow from
599 the execution of task @c B, and the subflow contains three tasks, @c B1,
600 @c B2, and @c B3, where @c B3 runs after @c B1 and @c B2.
601 
602 @code{.cpp}
603 // create three regular tasks
604 tf::Task A = taskflow.emplace([](){}).name("A");
605 tf::Task C = taskflow.emplace([](){}).name("C");
606 tf::Task D = taskflow.emplace([](){}).name("D");
607 
608 // create a subflow graph (dynamic tasking)
609 tf::Task B = taskflow.emplace([] (tf::Subflow& subflow) {
610   tf::Task B1 = subflow.emplace([](){}).name("B1");
611   tf::Task B2 = subflow.emplace([](){}).name("B2");
612   tf::Task B3 = subflow.emplace([](){}).name("B3");
613   B1.precede(B3);
614   B2.precede(B3);
615 }).name("B");
616 
617 A.precede(B);  // B runs after A
618 A.precede(C);  // C runs after A
619 B.precede(D);  // D runs after B
620 C.precede(D);  // D runs after C
621 @endcode
622 
623 */
624 class Subflow : public FlowBuilder {
625 
626   friend class Executor;
627   friend class FlowBuilder;
628 
629   public:
630 
631     /**
632     @brief enables the subflow to join its parent task
633 
634     Performs an immediate action to join the subflow. Once the subflow is joined,
635     it is considered finished and you may not modify the subflow anymore.
636     */
637     void join();
638 
639     /**
640     @brief enables the subflow to detach from its parent task
641 
642     Performs an immediate action to detach the subflow. Once the subflow is detached,
643     it is considered finished and you may not modify the subflow anymore.
644     */
645     void detach();
646 
647     /**
648     @brief queries if the subflow is joinable
649 
650     When a subflow is joined or detached, it becomes not joinable.
651     */
652     bool joinable() const;
653 
654     /**
655     @brief runs a given function asynchronously
656 
657     @tparam F callable type
658     @tparam ArgsT parameter types
659 
660     @param f callable object to call
661     @param args parameters to pass to the callable
662 
663     @return a tf::Future that will holds the result of the execution
664 
665     This method is thread-safe and can be called by multiple tasks in the
666     subflow at the same time.
667     The difference to tf::Executor::async is that the created asynchronous task
668     pertains to the subflow.
669     When the subflow joins, all asynchronous tasks created from the subflow
670     are guaranteed to finish before the join.
671     For example:
672 
673     @code{.cpp}
674     std::atomic<int> counter(0);
675     taskflow.empalce([&](tf::Subflow& sf){
676       for(int i=0; i<100; i++) {
677         sf.async([&](){ counter++; });
678       }
679       sf.join();
680       assert(counter == 100);
681     });
682     @endcode
683 
684     You cannot create asynchronous tasks from a detached subflow.
685     Doing this results in undefined behavior.
686     */
687     template <typename F, typename... ArgsT>
688     auto async(F&& f, ArgsT&&... args);
689 
690     /**
691     @brief similar to tf::Subflow::async but did not return a future object
692      */
693     template <typename F, typename... ArgsT>
694     void silent_async(F&& f, ArgsT&&... args);
695 
696   private:
697 
698     Subflow(Executor&, Node*, Graph&);
699 
700     Executor& _executor;
701     Node* _parent;
702 
703     bool _joinable {true};
704 };
705 
706 // Constructor
Subflow(Executor & executor,Node * parent,Graph & graph)707 inline Subflow::Subflow(Executor& executor, Node* parent, Graph& graph) :
708   FlowBuilder {graph},
709   _executor   {executor},
710   _parent     {parent} {
711 }
712 
713 // Function: joined
joinable() const714 inline bool Subflow::joinable() const {
715   return _joinable;
716 }
717 
718 }  // end of namespace tf. ---------------------------------------------------
719 
720 
721