1 #pragma once
2
3 #include "task.hpp"
4
5 /**
6 @file flow_builder.hpp
7 @brief flow builder include file
8 */
9
10 namespace tf {
11
/**
@class FlowBuilder

@brief building methods of a task dependency graph

A flow builder holds a reference to a graph and exposes the methods
that create tasks in that graph.

*/
class FlowBuilder {

  friend class Executor;

  public:

    /**
    @brief creates a static task

    @tparam C callable type constructible from std::function<void()>

    @param callable callable to construct a static task

    @return a tf::Task handle

    The following example creates a static task.

    @code{.cpp}
    tf::Task static_task = taskflow.emplace([](){});
    @endcode

    Please refer to @ref StaticTasking for details.
    */
    template <typename C,
      std::enable_if_t<is_static_task_v<C>, void>* = nullptr
    >
    Task emplace(C&& callable);

    /**
    @brief creates a dynamic task

    @tparam C callable type constructible from std::function<void(tf::Subflow&)>

    @param callable callable to construct a dynamic task

    @return a tf::Task handle

    The following example creates a dynamic task (tf::Subflow)
    that spawns two static tasks.

    @code{.cpp}
    tf::Task dynamic_task = taskflow.emplace([](tf::Subflow& sf){
      tf::Task static_task1 = sf.emplace([](){});
      tf::Task static_task2 = sf.emplace([](){});
    });
    @endcode

    Please refer to @ref DynamicTasking for details.
    */
    template <typename C,
      std::enable_if_t<is_dynamic_task_v<C>, void>* = nullptr
    >
    Task emplace(C&& callable);

    /**
    @brief creates a condition task

    @tparam C callable type constructible from std::function<int()>

    @param callable callable to construct a condition task

    @return a tf::Task handle

    The following example creates an if-else block using one condition task
    and three static tasks.

    @code{.cpp}
    tf::Taskflow taskflow;

    auto [init, cond, yes, no] = taskflow.emplace(
      [] () { },
      [] () { return 0; },
      [] () { std::cout << "yes\n"; },
      [] () { std::cout << "no\n"; }
    );

    // executes yes if cond returns 0, or no if cond returns 1
    cond.precede(yes, no);
    cond.succeed(init);
    @endcode

    Please refer to @ref ConditionalTasking for details.
    */
    template <typename C,
      std::enable_if_t<is_condition_task_v<C>, void>* = nullptr
    >
    Task emplace(C&& callable);

    /**
    @brief creates multiple tasks from a list of callable objects

    @tparam C callable types

    @param callables two or more callable objects, each constructible from
                     one of the task categories

    @return a std::tuple of tf::Task handles, one per callable, in the
            order the callables are given

    The method returns a tuple of tasks each corresponding to the given
    callable target. You can use structured binding to get the return tasks
    one by one.
    The following example creates four static tasks and assigns them to
    @c A, @c B, @c C, and @c D using structured binding.

    @code{.cpp}
    auto [A, B, C, D] = taskflow.emplace(
      [] () { std::cout << "A"; },
      [] () { std::cout << "B"; },
      [] () { std::cout << "C"; },
      [] () { std::cout << "D"; }
    );
    @endcode
    */
    template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>* = nullptr>
    auto emplace(C&&... callables);

    /**
    @brief creates a module task from a taskflow

    @param taskflow a taskflow object for the module

    @return a tf::Task handle

    Please refer to @ref ComposableTasking for details.
    */
    Task composed_of(Taskflow& taskflow);

    /**
    @brief creates a placeholder task

    @return a tf::Task handle

    A placeholder task maps to a node in the taskflow graph, but
    it does not have any callable work assigned yet.
    A placeholder task is different from an empty task handle that
    does not point to any node in a graph.

    @code{.cpp}
    // create a placeholder task with no callable target assigned
    tf::Task placeholder = taskflow.placeholder();
    assert(placeholder.empty() == false && placeholder.has_work() == false);

    // create an empty task handle
    tf::Task task;
    assert(task.empty() == true);

    // assign the task handle to the placeholder task
    task = placeholder;
    assert(task.empty() == false && task.has_work() == false);
    @endcode
    */
    Task placeholder();

    /**
    @brief creates a %cudaFlow task on the caller's GPU device context

    @tparam C callable type constructible from @c std::function<void(tf::cudaFlow&)>

    @param callable a callable that takes a referenced tf::cudaFlow object

    @return a tf::Task handle

    This method is equivalent to calling tf::FlowBuilder::emplace_on(callable, d)
    where @c d is the caller's device context.
    The following example creates a %cudaFlow of two kernel tasks, @c task1 and
    @c task2, where @c task1 runs before @c task2.

    @code{.cpp}
    taskflow.emplace([&](tf::cudaFlow& cf){
      // create two kernel tasks
      tf::cudaTask task1 = cf.kernel(grid1, block1, shm1, kernel1, args1);
      tf::cudaTask task2 = cf.kernel(grid2, block2, shm2, kernel2, args2);

      // kernel1 runs before kernel2
      task1.precede(task2);
    });
    @endcode

    Please refer to @ref GPUTaskingcudaFlow and @ref GPUTaskingcudaFlowCapturer
    for details.
    */
    template <typename C,
      std::enable_if_t<is_cudaflow_task_v<C>, void>* = nullptr
    >
    Task emplace(C&& callable);

    /**
    @brief creates a %cudaFlow task on the given device

    @tparam C callable type constructible from std::function<void(tf::cudaFlow&)>
    @tparam D device type, either @c int or @c std::ref<int> (stateful)

    @param callable a callable that takes a referenced tf::cudaFlow object
    @param device the device on which the %cudaFlow is created

    @return a tf::Task handle

    The following example creates a %cudaFlow of two kernel tasks, @c task1 and
    @c task2 on GPU @c 2, where @c task1 runs before @c task2

    @code{.cpp}
    taskflow.emplace_on([&](tf::cudaFlow& cf){
      // create two kernel tasks
      tf::cudaTask task1 = cf.kernel(grid1, block1, shm1, kernel1, args1);
      tf::cudaTask task2 = cf.kernel(grid2, block2, shm2, kernel2, args2);

      // kernel1 runs before kernel2
      task1.precede(task2);
    }, 2);
    @endcode
    */
    template <typename C, typename D,
      std::enable_if_t<is_cudaflow_task_v<C>, void>* = nullptr
    >
    Task emplace_on(C&& callable, D&& device);

    /**
    @brief creates a %syclFlow task on the default queue

    @tparam C callable type constructible from std::function<void(tf::syclFlow&)>

    @param callable a callable that takes a referenced tf::syclFlow object

    @return a tf::Task handle

    The following example creates a %syclFlow on the default queue to submit
    two kernel tasks, @c task1 and @c task2, where @c task1 runs before @c task2.

    @code{.cpp}
    taskflow.emplace([&](tf::syclFlow& cf){
      // create two single-thread kernel tasks
      tf::syclTask task1 = cf.single_task([](){});
      tf::syclTask task2 = cf.single_task([](){});

      // kernel1 runs before kernel2
      task1.precede(task2);
    });
    @endcode
    */
    template <typename C, std::enable_if_t<is_syclflow_task_v<C>, void>* = nullptr>
    Task emplace(C&& callable);

    /**
    @brief creates a %syclFlow task on the given queue

    @tparam C callable type constructible from std::function<void(tf::syclFlow&)>
    @tparam Q queue type

    @param callable a callable that takes a referenced tf::syclFlow object
    @param queue a queue of type sycl::queue

    @return a tf::Task handle

    The following example creates a %syclFlow on the given queue to submit
    two kernel tasks, @c task1 and @c task2, where @c task1 runs before @c task2.

    @code{.cpp}
    taskflow.emplace_on([&](tf::syclFlow& cf){
      // create two single-thread kernel tasks
      tf::syclTask task1 = cf.single_task([](){});
      tf::syclTask task2 = cf.single_task([](){});

      // kernel1 runs before kernel2
      task1.precede(task2);
    }, queue);
    @endcode
    */
    template <typename C, typename Q,
      std::enable_if_t<is_syclflow_task_v<C>, void>* = nullptr
    >
    Task emplace_on(C&& callable, Q&& queue);

    /**
    @brief adds adjacent dependency links to a linear list of tasks

    @param tasks a vector of tasks

    Each task in the vector is made to precede the task that follows it.
    A vector with fewer than two tasks adds no dependency.
    */
    void linearize(std::vector<Task>& tasks);

    /**
    @brief adds adjacent dependency links to a linear list of tasks

    @param tasks an initializer list of tasks

    Each task in the list is made to precede the task that follows it.
    A list with fewer than two tasks adds no dependency.
    */
    void linearize(std::initializer_list<Task> tasks);

    // ------------------------------------------------------------------------
    // parallel iterations
    // ------------------------------------------------------------------------

    /**
    @brief constructs a STL-styled parallel-for task

    @tparam B beginning iterator type
    @tparam E ending iterator type
    @tparam C callable type

    @param first iterator to the beginning (inclusive)
    @param last iterator to the end (exclusive)
    @param callable a callable object to apply to the dereferenced iterator

    @return a tf::Task handle

    The task spawns a subflow that applies the callable object to each object obtained by dereferencing every iterator in the range <tt>[first, last)</tt>.
    This method is equivalent to the parallel execution of the following loop:

    @code{.cpp}
    for(auto itr=first; itr!=last; itr++) {
      callable(*itr);
    }
    @endcode

    Arguments are templated to enable stateful passing using std::reference_wrapper.
    The callable needs to take a single argument of
    the dereferenced iterator type.

    Please refer to @ref ParallelIterations for details.
    */
    template <typename B, typename E, typename C>
    Task for_each(B&& first, E&& last, C&& callable);

    /**
    @brief constructs an index-based parallel-for task

    @tparam B beginning index type (must be integral)
    @tparam E ending index type (must be integral)
    @tparam S step type (must be integral)
    @tparam C callable type

    @param first index of the beginning (inclusive)
    @param last index of the end (exclusive)
    @param step step size
    @param callable a callable object to apply to each valid index

    @return a tf::Task handle

    The task spawns a subflow that applies the callable object to each index in the range <tt>[first, last)</tt> with the step size.

    This method is equivalent to the parallel execution of the following loop:

    @code{.cpp}
    // case 1: step size is positive
    for(auto i=first; i<last; i+=step) {
      callable(i);
    }

    // case 2: step size is negative
    for(auto i=first; i>last; i+=step) {
      callable(i);
    }
    @endcode

    Arguments are templated to enable stateful passing using std::reference_wrapper.
    The callable needs to take a single argument of the integral index type.

    Please refer to @ref ParallelIterations for details.
    */
    template <typename B, typename E, typename S, typename C>
    Task for_each_index(B&& first, E&& last, S&& step, C&& callable);

    // ------------------------------------------------------------------------
    // reduction
    // ------------------------------------------------------------------------

    /**
    @brief constructs a STL-styled parallel-reduce task

    @tparam B beginning iterator type
    @tparam E ending iterator type
    @tparam T result type
    @tparam O binary reducer type

    @param first iterator to the beginning (inclusive)
    @param last iterator to the end (exclusive)
    @param init initial value of the reduction and the storage for the reduced result
    @param bop binary operator that will be applied

    @return a tf::Task handle

    The task spawns a subflow to perform parallel reduction over @c init and the elements in the range <tt>[first, last)</tt>. The reduced result is stored in @c init.

    This method is equivalent to the parallel execution of the following loop:

    @code{.cpp}
    for(auto itr=first; itr!=last; itr++) {
      init = bop(init, *itr);
    }
    @endcode

    Arguments are templated to enable stateful passing using std::reference_wrapper.

    Please refer to @ref ParallelReduction for details.
    */
    template <typename B, typename E, typename T, typename O>
    Task reduce(B&& first, E&& last, T& init, O&& bop);

    // ------------------------------------------------------------------------
    // transform and reduction
    // ------------------------------------------------------------------------

    /**
    @brief constructs a STL-styled parallel transform-reduce task

    @tparam B beginning iterator type
    @tparam E ending iterator type
    @tparam T result type
    @tparam BOP binary reducer type
    @tparam UOP unary transformation type

    @param first iterator to the beginning (inclusive)
    @param last iterator to the end (exclusive)
    @param init initial value of the reduction and the storage for the reduced result
    @param bop binary operator that will be applied in unspecified order to the results of @c uop
    @param uop unary operator that will be applied to transform each element in the range to the result type

    @return a tf::Task handle

    The task spawns a subflow to perform parallel reduction over @c init and the transformed elements in the range <tt>[first, last)</tt>.
    The reduced result is stored in @c init.

    This method is equivalent to the parallel execution of the following loop:

    @code{.cpp}
    for(auto itr=first; itr!=last; itr++) {
      init = bop(init, uop(*itr));
    }
    @endcode

    Arguments are templated to enable stateful passing using std::reference_wrapper.

    Please refer to @ref ParallelReduction for details.
    */
    template <typename B, typename E, typename T, typename BOP, typename UOP>
    Task transform_reduce(B&& first, E&& last, T& init, BOP&& bop, UOP&& uop);

    // ------------------------------------------------------------------------
    // sort
    // ------------------------------------------------------------------------

    /**
    @brief constructs a dynamic task to perform STL-styled parallel sort

    @tparam B beginning iterator type (random-accessible)
    @tparam E ending iterator type (random-accessible)
    @tparam C comparator type

    @param first iterator to the beginning (inclusive)
    @param last iterator to the end (exclusive)
    @param cmp comparison function object

    @return a tf::Task handle

    The task spawns a subflow to parallelly sort elements in the range
    <tt>[first, last)</tt>.

    Arguments are templated to enable stateful passing using std::reference_wrapper.

    Please refer to @ref ParallelSort for details.
    */
    template <typename B, typename E, typename C>
    Task sort(B&& first, E&& last, C&& cmp);

    /**
    @brief constructs a dynamic task to perform STL-styled parallel sort using
           the @c std::less<T> comparator, where @c T is the element type

    @tparam B beginning iterator type (random-accessible)
    @tparam E ending iterator type (random-accessible)

    @param first iterator to the beginning (inclusive)
    @param last iterator to the end (exclusive)

    @return a tf::Task handle

    The task spawns a subflow to parallelly sort elements in the range
    <tt>[first, last)</tt> using the @c std::less<T> comparator,
    where @c T is the dereferenced iterator type.

    Arguments are templated to enable stateful passing using std::reference_wrapper.

    Please refer to @ref ParallelSort for details.
    */
    template <typename B, typename E>
    Task sort(B&& first, E&& last);

  protected:

    /**
    @brief constructs a flow builder with a graph

    @param graph the graph the builder refers to; it is held by reference
    */
    FlowBuilder(Graph& graph);

    /**
    @brief associated graph object
    */
    Graph& _graph;

  private:

    /**
    @brief shared implementation of the two linearize overloads

    @tparam L container type providing begin() and end() over tf::Task

    Walks the container once and makes every task precede the task that
    immediately follows it.
    */
    template <typename L>
    void _linearize(L&);
};
510
511 // Constructor
FlowBuilder(Graph & graph)512 inline FlowBuilder::FlowBuilder(Graph& graph) :
513 _graph {graph} {
514 }
515
516 // Function: emplace
517 template <typename C, std::enable_if_t<is_static_task_v<C>, void>*>
emplace(C && c)518 Task FlowBuilder::emplace(C&& c) {
519 return Task(_graph.emplace_back(
520 std::in_place_type_t<Node::Static>{}, std::forward<C>(c)
521 ));
522 }
523
524 // Function: emplace
525 template <typename C, std::enable_if_t<is_dynamic_task_v<C>, void>*>
emplace(C && c)526 Task FlowBuilder::emplace(C&& c) {
527 return Task(_graph.emplace_back(
528 std::in_place_type_t<Node::Dynamic>{}, std::forward<C>(c)
529 ));
530 }
531
532 // Function: emplace
533 template <typename C, std::enable_if_t<is_condition_task_v<C>, void>*>
emplace(C && c)534 Task FlowBuilder::emplace(C&& c) {
535 return Task(_graph.emplace_back(
536 std::in_place_type_t<Node::Condition>{}, std::forward<C>(c)
537 ));
538 }
539
540 // Function: emplace
541 template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>*>
emplace(C &&...cs)542 auto FlowBuilder::emplace(C&&... cs) {
543 return std::make_tuple(emplace(std::forward<C>(cs))...);
544 }
545
546 // Function: composed_of
composed_of(Taskflow & taskflow)547 inline Task FlowBuilder::composed_of(Taskflow& taskflow) {
548 auto node = _graph.emplace_back(
549 std::in_place_type_t<Node::Module>{}, &taskflow
550 );
551 return Task(node);
552 }
553
554 // Function: placeholder
placeholder()555 inline Task FlowBuilder::placeholder() {
556 auto node = _graph.emplace_back();
557 return Task(node);
558 }
559
560 // Procedure: _linearize
561 template <typename L>
_linearize(L & keys)562 void FlowBuilder::_linearize(L& keys) {
563
564 auto itr = keys.begin();
565 auto end = keys.end();
566
567 if(itr == end) {
568 return;
569 }
570
571 auto nxt = itr;
572
573 for(++nxt; nxt != end; ++nxt, ++itr) {
574 itr->_node->_precede(nxt->_node);
575 }
576 }
577
578 // Procedure: linearize
linearize(std::vector<Task> & keys)579 inline void FlowBuilder::linearize(std::vector<Task>& keys) {
580 _linearize(keys);
581 }
582
583 // Procedure: linearize
linearize(std::initializer_list<Task> keys)584 inline void FlowBuilder::linearize(std::initializer_list<Task> keys) {
585 _linearize(keys);
586 }
587
588 // ----------------------------------------------------------------------------
589
/**
@class Subflow

@brief class to construct a subflow graph from the execution of a dynamic task

By default, a subflow automatically @em joins its parent node.
You may explicitly join or detach a subflow by calling tf::Subflow::join
or tf::Subflow::detach, respectively.
The following example creates a taskflow graph that spawns a subflow from
the execution of task @c B, and the subflow contains three tasks, @c B1,
@c B2, and @c B3, where @c B3 runs after @c B1 and @c B2.

@code{.cpp}
// create three regular tasks
tf::Task A = taskflow.emplace([](){}).name("A");
tf::Task C = taskflow.emplace([](){}).name("C");
tf::Task D = taskflow.emplace([](){}).name("D");

// create a subflow graph (dynamic tasking)
tf::Task B = taskflow.emplace([] (tf::Subflow& subflow) {
  tf::Task B1 = subflow.emplace([](){}).name("B1");
  tf::Task B2 = subflow.emplace([](){}).name("B2");
  tf::Task B3 = subflow.emplace([](){}).name("B3");
  B1.precede(B3);
  B2.precede(B3);
}).name("B");

A.precede(B);  // B runs after A
A.precede(C);  // C runs after A
B.precede(D);  // D runs after B
C.precede(D);  // D runs after C
@endcode

*/
class Subflow : public FlowBuilder {

  friend class Executor;
  friend class FlowBuilder;

  public:

    /**
    @brief enables the subflow to join its parent task

    Performs an immediate action to join the subflow. Once the subflow is joined,
    it is considered finished and you may not modify the subflow anymore.
    */
    void join();

    /**
    @brief enables the subflow to detach from its parent task

    Performs an immediate action to detach the subflow. Once the subflow is detached,
    it is considered finished and you may not modify the subflow anymore.
    */
    void detach();

    /**
    @brief queries if the subflow is joinable

    @return @c true if the subflow is still joinable, @c false otherwise

    When a subflow is joined or detached, it becomes not joinable.
    */
    bool joinable() const;

    /**
    @brief runs a given function asynchronously

    @tparam F callable type
    @tparam ArgsT parameter types

    @param f callable object to call
    @param args parameters to pass to the callable

    @return a tf::Future that will hold the result of the execution

    This method is thread-safe and can be called by multiple tasks in the
    subflow at the same time.
    The difference to tf::Executor::async is that the created asynchronous task
    pertains to the subflow.
    When the subflow joins, all asynchronous tasks created from the subflow
    are guaranteed to finish before the join.
    For example:

    @code{.cpp}
    std::atomic<int> counter(0);
    taskflow.emplace([&](tf::Subflow& sf){
      for(int i=0; i<100; i++) {
        sf.async([&](){ counter++; });
      }
      sf.join();
      assert(counter == 100);
    });
    @endcode

    You cannot create asynchronous tasks from a detached subflow.
    Doing this results in undefined behavior.
    */
    template <typename F, typename... ArgsT>
    auto async(F&& f, ArgsT&&... args);

    /**
    @brief similar to tf::Subflow::async but does not return a future object

    @tparam F callable type
    @tparam ArgsT parameter types

    @param f callable object to call
    @param args parameters to pass to the callable
    */
    template <typename F, typename... ArgsT>
    void silent_async(F&& f, ArgsT&&... args);

  private:

    // Constructs a subflow from an executor, a parent node, and the graph
    // handed to the FlowBuilder base. Private: only the friend classes
    // declared above can create a subflow.
    Subflow(Executor&, Node*, Graph&);

    // executor passed at construction (held by reference)
    Executor& _executor;

    // parent node passed at construction
    Node* _parent;

    // value reported by joinable(); a new subflow starts out joinable
    bool _joinable {true};
};
705
706 // Constructor
Subflow(Executor & executor,Node * parent,Graph & graph)707 inline Subflow::Subflow(Executor& executor, Node* parent, Graph& graph) :
708 FlowBuilder {graph},
709 _executor {executor},
710 _parent {parent} {
711 }
712
713 // Function: joined
joinable() const714 inline bool Subflow::joinable() const {
715 return _joinable;
716 }
717
718 } // end of namespace tf. ---------------------------------------------------
719
720
721