1 /*
2 Copyright (c) 2005-2021 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #ifndef __TBB_flow_graph_H
18 #define __TBB_flow_graph_H
19
20 #include <atomic>
21 #include <memory>
22 #include <type_traits>
23
24 #include "detail/_config.h"
25 #include "detail/_namespace_injection.h"
26 #include "spin_mutex.h"
27 #include "null_mutex.h"
28 #include "spin_rw_mutex.h"
29 #include "null_rw_mutex.h"
30 #include "detail/_pipeline_filters.h"
31 #include "detail/_task.h"
32 #include "detail/_small_object_pool.h"
33 #include "cache_aligned_allocator.h"
34 #include "detail/_exception.h"
35 #include "detail/_template_helpers.h"
36 #include "detail/_aggregator.h"
37 #include "detail/_allocator_traits.h"
38 #include "detail/_utils.h"
39 #include "profiling.h"
40 #include "task_arena.h"
41
42 #if TBB_USE_PROFILING_TOOLS && ( __unix__ || __APPLE__ )
43 #if __INTEL_COMPILER
44 // Disabled warning "routine is both inline and noinline"
45 #pragma warning (push)
46 #pragma warning( disable: 2196 )
47 #endif
48 #define __TBB_NOINLINE_SYM __attribute__((noinline))
49 #else
50 #define __TBB_NOINLINE_SYM
51 #endif
52
53 #include <tuple>
54 #include <list>
55 #include <queue>
56 #if __TBB_CPP20_CONCEPTS_PRESENT
57 #include <concepts>
58 #endif
59
60 /** @file
61 \brief The graph related classes and functions
62
63 There are some applications that best express dependencies as messages
64 passed between nodes in a graph. These messages may contain data or
65    passed between nodes in a graph. These messages may contain data or
       simply act as signals that a predecessor has completed. The graph
66 class and its associated node classes can be used to express such
67 applications.
68 */
69
70 namespace tbb {
71 namespace detail {
72
73 namespace d1 {
74
//! An enumeration that provides the two most common concurrency levels: unlimited and serial
enum concurrency { unlimited = 0, serial = 1 };

//! A generic null type
// Used as a placeholder where a node has no meaningful input or output type.
struct null_type {};

//! An empty class used for messages that mean "I'm done"
class continue_msg {};
83
84 } // namespace d1
85
86 #if __TBB_CPP20_CONCEPTS_PRESENT
namespace d0 {

// Satisfied when a body's deduced return type matches the node's declared Output,
// or when Output is continue_msg (the "signal only" message type).
template <typename ReturnType, typename OutputType>
concept node_body_return_type = std::same_as<OutputType, tbb::detail::d1::continue_msg> ||
                                std::same_as<OutputType, ReturnType>;

// Body of a continue_node: copy-constructible, callable with a const continue_msg&,
// producing the node's Output (or continue_msg).
template <typename Body, typename Output>
concept continue_node_body = std::copy_constructible<Body> &&
    requires( Body& body, const tbb::detail::d1::continue_msg& v ) {
        { body(v) } -> node_body_return_type<Output>;
    };

// Body of a function_node: copy-constructible, callable with a const Input&,
// producing the node's Output (or continue_msg).
template <typename Body, typename Input, typename Output>
concept function_node_body = std::copy_constructible<Body> &&
    requires( Body& body, const Input& v ) {
        { body(v) } -> node_body_return_type<Output>;
    };

// Key extractor used by key-matching join_node: maps a const Input& to a Key.
template <typename FunctionObject, typename Input, typename Key>
concept join_node_function_object = std::copy_constructible<FunctionObject> &&
    requires( FunctionObject& func, const Input& v ) {
        { func(v) } -> adaptive_same_as<Key>;
    };

// Body of an input_node: called with a flow_control& (used to signal end of input)
// and produces the node's Output.
template <typename Body, typename Output>
concept input_node_body = std::copy_constructible<Body> &&
    requires( Body& body, tbb::detail::d1::flow_control& fc ) {
        { body(fc) } -> adaptive_same_as<Output>;
    };

// Body of a multifunction_node: consumes an Input and emits through the output ports;
// no return value is required.
template <typename Body, typename Input, typename OutputPortsType>
concept multifunction_node_body = std::copy_constructible<Body> &&
    requires( Body& body, const Input& v, OutputPortsType& p ) {
        body(v, p);
    };

// Sequencer for sequencer_node: maps a value to its ordinal position.
template <typename Sequencer, typename Value>
concept sequencer = std::copy_constructible<Sequencer> &&
    requires( Sequencer& seq, const Value& value ) {
        { seq(value) } -> adaptive_same_as<std::size_t>;
    };

// Body of an async_node: consumes an Input and may submit results later via the gateway.
template <typename Body, typename Input, typename GatewayType>
concept async_node_body = std::copy_constructible<Body> &&
    requires( Body& body, const Input& v, GatewayType& gateway ) {
        body(v, gateway);
    };

} // namespace d0
136 #endif // __TBB_CPP20_CONCEPTS_PRESENT
137
138 namespace d1 {
139
140 //! Forward declaration section
141 template< typename T > class sender;
142 template< typename T > class receiver;
143 class continue_receiver;
144
145 template< typename T, typename U > class limiter_node; // needed for resetting decrementer
146
147 template<typename T, typename M> class successor_cache;
148 template<typename T, typename M> class broadcast_cache;
149 template<typename T, typename M> class round_robin_cache;
150 template<typename T, typename M> class predecessor_cache;
151 template<typename T, typename M> class reservable_predecessor_cache;
152
153 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
154 namespace order {
155 struct following;
156 struct preceding;
157 }
158 template<typename Order, typename... Args> struct node_set;
159 #endif
160
161
162 } // namespace d1
163 } // namespace detail
164 } // namespace tbb
165
166 //! The graph class
167 #include "detail/_flow_graph_impl.h"
168
169 namespace tbb {
170 namespace detail {
171 namespace d1 {
172
order_tasks(graph_task * first,graph_task * second)173 static inline std::pair<graph_task*, graph_task*> order_tasks(graph_task* first, graph_task* second) {
174 if (second->priority > first->priority)
175 return std::make_pair(second, first);
176 return std::make_pair(first, second);
177 }
178
179 // submit task if necessary. Returns the non-enqueued task if there is one.
combine_tasks(graph & g,graph_task * left,graph_task * right)180 static inline graph_task* combine_tasks(graph& g, graph_task* left, graph_task* right) {
181 // if no RHS task, don't change left.
182 if (right == NULL) return left;
183 // right != NULL
184 if (left == NULL) return right;
185 if (left == SUCCESSFULLY_ENQUEUED) return right;
186 // left contains a task
187 if (right != SUCCESSFULLY_ENQUEUED) {
188 // both are valid tasks
189 auto tasks_pair = order_tasks(left, right);
190 spawn_in_graph_arena(g, *tasks_pair.first);
191 return tasks_pair.second;
192 }
193 return left;
194 }
195
//! Pure virtual template class that defines a sender of messages of type T
/** The default try_* implementations all return false, i.e. "nothing available /
    not supported"; concrete nodes override the operations they actually provide. */
template< typename T >
class sender {
public:
    virtual ~sender() {}

    //! Request an item from the sender
    virtual bool try_get( T & ) { return false; }

    //! Reserves an item in the sender
    virtual bool try_reserve( T & ) { return false; }

    //! Releases the reserved item
    virtual bool try_release( ) { return false; }

    //! Consumes the reserved item
    virtual bool try_consume( ) { return false; }

protected:
    //! The output type of this sender
    typedef T output_type;

    //! The successor type for this node
    typedef receiver<T> successor_type;

    //! Add a new successor to this node
    virtual bool register_successor( successor_type &r ) = 0;

    //! Removes a successor from this node
    virtual bool remove_successor( successor_type &r ) = 0;

    // Edge management is protected; the free functions below are the public entry
    // points and are befriended so they can reach the virtual implementations.
    template<typename C>
    friend bool register_successor(sender<C>& s, receiver<C>& r);

    template<typename C>
    friend bool remove_successor (sender<C>& s, receiver<C>& r);
}; // class sender<T>
233
234 template<typename C>
register_successor(sender<C> & s,receiver<C> & r)235 bool register_successor(sender<C>& s, receiver<C>& r) {
236 return s.register_successor(r);
237 }
238
239 template<typename C>
remove_successor(sender<C> & s,receiver<C> & r)240 bool remove_successor(sender<C>& s, receiver<C>& r) {
241 return s.remove_successor(r);
242 }
243
244 //! Pure virtual template class that defines a receiver of messages of type T
245 template< typename T >
246 class receiver {
247 public:
248 //! Destructor
~receiver()249 virtual ~receiver() {}
250
251 //! Put an item to the receiver
try_put(const T & t)252 bool try_put( const T& t ) {
253 graph_task *res = try_put_task(t);
254 if (!res) return false;
255 if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res);
256 return true;
257 }
258
259 //! put item to successor; return task to run the successor if possible.
260 protected:
261 //! The input type of this receiver
262 typedef T input_type;
263
264 //! The predecessor type for this node
265 typedef sender<T> predecessor_type;
266
267 template< typename R, typename B > friend class run_and_put_task;
268 template< typename X, typename Y > friend class broadcast_cache;
269 template< typename X, typename Y > friend class round_robin_cache;
270 virtual graph_task *try_put_task(const T& t) = 0;
271 virtual graph& graph_reference() const = 0;
272
273 template<typename TT, typename M> friend class successor_cache;
is_continue_receiver()274 virtual bool is_continue_receiver() { return false; }
275
276 // TODO revamp: reconsider the inheritance and move node priority out of receiver
priority()277 virtual node_priority_t priority() const { return no_priority; }
278
279 //! Add a predecessor to the node
register_predecessor(predecessor_type &)280 virtual bool register_predecessor( predecessor_type & ) { return false; }
281
282 //! Remove a predecessor from the node
remove_predecessor(predecessor_type &)283 virtual bool remove_predecessor( predecessor_type & ) { return false; }
284
285 template <typename C>
286 friend bool register_predecessor(receiver<C>& r, sender<C>& s);
287 template <typename C>
288 friend bool remove_predecessor (receiver<C>& r, sender<C>& s);
289 }; // class receiver<T>
290
291 template <typename C>
register_predecessor(receiver<C> & r,sender<C> & s)292 bool register_predecessor(receiver<C>& r, sender<C>& s) {
293 return r.register_predecessor(s);
294 }
295
296 template <typename C>
remove_predecessor(receiver<C> & r,sender<C> & s)297 bool remove_predecessor(receiver<C>& r, sender<C>& s) {
298 return r.remove_predecessor(s);
299 }
300
301 //! Base class for receivers of completion messages
302 /** These receivers automatically reset, but cannot be explicitly waited on */
303 class continue_receiver : public receiver< continue_msg > {
304 protected:
305
306 //! Constructor
continue_receiver(int number_of_predecessors,node_priority_t a_priority)307 explicit continue_receiver( int number_of_predecessors, node_priority_t a_priority ) {
308 my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
309 my_current_count = 0;
310 my_priority = a_priority;
311 }
312
313 //! Copy constructor
continue_receiver(const continue_receiver & src)314 continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
315 my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
316 my_current_count = 0;
317 my_priority = src.my_priority;
318 }
319
320 //! Increments the trigger threshold
register_predecessor(predecessor_type &)321 bool register_predecessor( predecessor_type & ) override {
322 spin_mutex::scoped_lock l(my_mutex);
323 ++my_predecessor_count;
324 return true;
325 }
326
327 //! Decrements the trigger threshold
328 /** Does not check to see if the removal of the predecessor now makes the current count
329 exceed the new threshold. So removing a predecessor while the graph is active can cause
330 unexpected results. */
remove_predecessor(predecessor_type &)331 bool remove_predecessor( predecessor_type & ) override {
332 spin_mutex::scoped_lock l(my_mutex);
333 --my_predecessor_count;
334 return true;
335 }
336
337 //! The input type
338 typedef continue_msg input_type;
339
340 //! The predecessor type for this node
341 typedef receiver<input_type>::predecessor_type predecessor_type;
342
343 template< typename R, typename B > friend class run_and_put_task;
344 template<typename X, typename Y> friend class broadcast_cache;
345 template<typename X, typename Y> friend class round_robin_cache;
346 // execute body is supposed to be too small to create a task for.
try_put_task(const input_type &)347 graph_task* try_put_task( const input_type & ) override {
348 {
349 spin_mutex::scoped_lock l(my_mutex);
350 if ( ++my_current_count < my_predecessor_count )
351 return SUCCESSFULLY_ENQUEUED;
352 else
353 my_current_count = 0;
354 }
355 graph_task* res = execute();
356 return res? res : SUCCESSFULLY_ENQUEUED;
357 }
358
359 spin_mutex my_mutex;
360 int my_predecessor_count;
361 int my_current_count;
362 int my_initial_predecessor_count;
363 node_priority_t my_priority;
364 // the friend declaration in the base class did not eliminate the "protected class"
365 // error in gcc 4.1.2
366 template<typename U, typename V> friend class limiter_node;
367
reset_receiver(reset_flags f)368 virtual void reset_receiver( reset_flags f ) {
369 my_current_count = 0;
370 if (f & rf_clear_edges) {
371 my_predecessor_count = my_initial_predecessor_count;
372 }
373 }
374
375 //! Does whatever should happen when the threshold is reached
376 /** This should be very fast or else spawn a task. This is
377 called while the sender is blocked in the try_put(). */
378 virtual graph_task* execute() = 0;
379 template<typename TT, typename M> friend class successor_cache;
is_continue_receiver()380 bool is_continue_receiver() override { return true; }
381
priority()382 node_priority_t priority() const override { return my_priority; }
383 }; // class continue_receiver
384
385 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
//! Default key extractor: asks the message itself for its key via t.key().
template <typename K, typename T>
K key_from_message( const T& msg ) {
    return msg.key();
}
390 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
391
392 } // d1
393 } // detail
394 } // tbb
395
396 #include "detail/_flow_graph_trace_impl.h"
397 #include "detail/_hash_compare.h"
398
399 namespace tbb {
400 namespace detail {
401 namespace d1 {
402
403 #include "detail/_flow_graph_body_impl.h"
404 #include "detail/_flow_graph_cache_impl.h"
405 #include "detail/_flow_graph_types_impl.h"
406
407 using namespace graph_policy_namespace;
408
409 template <typename C, typename N>
graph_iterator(C * g,bool begin)410 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
411 {
412 if (begin) current_node = my_graph->my_nodes;
413 //else it is an end iterator by default
414 }
415
//! Dereferences the iterator; asserts (debug builds) if called on an end iterator.
template <typename C, typename N>
typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
    __TBB_ASSERT(current_node, "graph_iterator at end");
    return *operator->();
}
421
//! Returns a pointer to the current node (null for an end iterator).
template <typename C, typename N>
typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
    return current_node;
}
426
//! Advances to the next node in the graph's list; a no-op on an end iterator.
template <typename C, typename N>
void graph_iterator<C,N>::internal_forward() {
    if (current_node) current_node = current_node->next;
}
431
432 //! Constructs a graph with isolated task_group_context
graph()433 inline graph::graph() : my_wait_context(0), my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
434 prepare_task_arena();
435 own_context = true;
436 cancelled = false;
437 caught_exception = false;
438 my_context = new (r1::cache_aligned_allocate(sizeof(task_group_context))) task_group_context(FLOW_TASKS);
439 fgt_graph(this);
440 my_is_active = true;
441 }
442
graph(task_group_context & use_this_context)443 inline graph::graph(task_group_context& use_this_context) :
444 my_wait_context(0), my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
445 prepare_task_arena();
446 own_context = false;
447 cancelled = false;
448 caught_exception = false;
449 fgt_graph(this);
450 my_is_active = true;
451 }
452
//! Destroys the graph: drains outstanding work first, then releases owned resources.
inline graph::~graph() {
    // Wait for in-flight tasks before tearing anything down.
    wait_for_all();
    // Only destroy the context if this graph allocated it (default constructor path).
    if (own_context) {
        my_context->~task_group_context();
        r1::cache_aligned_deallocate(my_context);
    }
    delete my_task_arena;
}
461
//! Registers one more unit of outstanding work that wait_for_all() must wait for.
// Note the ordering mirrors release_wait(): reserve before tracing here,
// trace before releasing there.
inline void graph::reserve_wait() {
    my_wait_context.reserve();
    fgt_reserve_wait(this);
}
466
//! Signals completion of one unit of work previously announced via reserve_wait().
inline void graph::release_wait() {
    fgt_release_wait(this);
    my_wait_context.release();
}
471
register_node(graph_node * n)472 inline void graph::register_node(graph_node *n) {
473 n->next = NULL;
474 {
475 spin_mutex::scoped_lock lock(nodelist_mutex);
476 n->prev = my_nodes_last;
477 if (my_nodes_last) my_nodes_last->next = n;
478 my_nodes_last = n;
479 if (!my_nodes) my_nodes = n;
480 }
481 }
482
remove_node(graph_node * n)483 inline void graph::remove_node(graph_node *n) {
484 {
485 spin_mutex::scoped_lock lock(nodelist_mutex);
486 __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
487 if (n->prev) n->prev->next = n->next;
488 if (n->next) n->next->prev = n->prev;
489 if (my_nodes_last == n) my_nodes_last = n->prev;
490 if (my_nodes == n) my_nodes = n->next;
491 }
492 n->prev = n->next = NULL;
493 }
494
//! Resets the graph and every registered node according to the given flags.
// NOTE(review): the deactivate/reset/activate ordering appears deliberate —
// presumably no tasks may be in flight while nodes are reset; confirm against callers.
inline void graph::reset( reset_flags f ) {
    // reset context
    deactivate_graph(*this);

    my_context->reset();
    cancelled = false;
    caught_exception = false;
    // reset all the nodes comprising the graph
    for(iterator ii = begin(); ii != end(); ++ii) {
        graph_node *my_p = &(*ii);
        my_p->reset_node(f);
    }
    // Reattach the arena. Might be useful to run the graph in a particular task_arena
    // while not limiting graph lifetime to a single task_arena::execute() call.
    prepare_task_arena( /*reinit=*/true );
    activate_graph(*this);
}
512
//! Requests cancellation of all tasks executing in the graph's task_group_context.
inline void graph::cancel() {
    my_context->cancel_group_execution();
}
516
//! Iterator accessors over the graph's registered nodes.
// begin/cbegin position at the head of the node list; end/cend are null-positioned.
inline graph::iterator graph::begin() { return iterator(this, true); }

inline graph::iterator graph::end() { return iterator(this, false); }

inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }

inline graph::const_iterator graph::end() const { return const_iterator(this, false); }

inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }

inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
528
//! Every graph_node adds itself to its graph's node list on construction...
inline graph_node::graph_node(graph& g) : my_graph(g) {
    my_graph.register_node(this);
}

//! ...and removes itself on destruction, so graph::reset() can visit all live nodes.
inline graph_node::~graph_node() {
    my_graph.remove_node(this);
}
536
537 #include "detail/_flow_graph_node_impl.h"
538
539
540 //! An executable node that acts as a source, i.e. it has no predecessors
541
542 template < typename Output >
__TBB_requires(std::copyable<Output>)543 __TBB_requires(std::copyable<Output>)
544 class input_node : public graph_node, public sender< Output > {
545 public:
546 //! The type of the output message, which is complete
547 typedef Output output_type;
548
549 //! The type of successors of this node
550 typedef typename sender<output_type>::successor_type successor_type;
551
552 // Input node has no input type
553 typedef null_type input_type;
554
555 //! Constructor for a node with a successor
556 template< typename Body >
557 __TBB_requires(input_node_body<Body, Output>)
558 __TBB_NOINLINE_SYM input_node( graph &g, Body body )
559 : graph_node(g), my_active(false)
560 , my_body( new input_body_leaf< output_type, Body>(body) )
561 , my_init_body( new input_body_leaf< output_type, Body>(body) )
562 , my_successors(this), my_reserved(false), my_has_cached_item(false)
563 {
564 fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
565 static_cast<sender<output_type> *>(this), this->my_body);
566 }
567
568 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
569 template <typename Body, typename... Successors>
570 __TBB_requires(input_node_body<Body, Output>)
571 input_node( const node_set<order::preceding, Successors...>& successors, Body body )
572 : input_node(successors.graph_reference(), body)
573 {
574 make_edges(*this, successors);
575 }
576 #endif
577
578 //! Copy constructor
579 __TBB_NOINLINE_SYM input_node( const input_node& src )
580 : graph_node(src.my_graph), sender<Output>()
581 , my_active(false)
582 , my_body(src.my_init_body->clone()), my_init_body(src.my_init_body->clone())
583 , my_successors(this), my_reserved(false), my_has_cached_item(false)
584 {
585 fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
586 static_cast<sender<output_type> *>(this), this->my_body);
587 }
588
589 //! The destructor
590 ~input_node() { delete my_body; delete my_init_body; }
591
592 //! Add a new successor to this node
593 bool register_successor( successor_type &r ) override {
594 spin_mutex::scoped_lock lock(my_mutex);
595 my_successors.register_successor(r);
596 if ( my_active )
597 spawn_put();
598 return true;
599 }
600
601 //! Removes a successor from this node
602 bool remove_successor( successor_type &r ) override {
603 spin_mutex::scoped_lock lock(my_mutex);
604 my_successors.remove_successor(r);
605 return true;
606 }
607
608 //! Request an item from the node
609 bool try_get( output_type &v ) override {
610 spin_mutex::scoped_lock lock(my_mutex);
611 if ( my_reserved )
612 return false;
613
614 if ( my_has_cached_item ) {
615 v = my_cached_item;
616 my_has_cached_item = false;
617 return true;
618 }
619 // we've been asked to provide an item, but we have none. enqueue a task to
620 // provide one.
621 if ( my_active )
622 spawn_put();
623 return false;
624 }
625
626 //! Reserves an item.
627 bool try_reserve( output_type &v ) override {
628 spin_mutex::scoped_lock lock(my_mutex);
629 if ( my_reserved ) {
630 return false;
631 }
632
633 if ( my_has_cached_item ) {
634 v = my_cached_item;
635 my_reserved = true;
636 return true;
637 } else {
638 return false;
639 }
640 }
641
642 //! Release a reserved item.
643 /** true = item has been released and so remains in sender, dest must request or reserve future items */
644 bool try_release( ) override {
645 spin_mutex::scoped_lock lock(my_mutex);
646 __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
647 my_reserved = false;
648 if(!my_successors.empty())
649 spawn_put();
650 return true;
651 }
652
653 //! Consumes a reserved item
654 bool try_consume( ) override {
655 spin_mutex::scoped_lock lock(my_mutex);
656 __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
657 my_reserved = false;
658 my_has_cached_item = false;
659 if ( !my_successors.empty() ) {
660 spawn_put();
661 }
662 return true;
663 }
664
665 //! Activates a node that was created in the inactive state
666 void activate() {
667 spin_mutex::scoped_lock lock(my_mutex);
668 my_active = true;
669 if (!my_successors.empty())
670 spawn_put();
671 }
672
673 template<typename Body>
674 Body copy_function_object() {
675 input_body<output_type> &body_ref = *this->my_body;
676 return dynamic_cast< input_body_leaf<output_type, Body> & >(body_ref).get_body();
677 }
678
679 protected:
680
681 //! resets the input_node to its initial state
682 void reset_node( reset_flags f) override {
683 my_active = false;
684 my_reserved = false;
685 my_has_cached_item = false;
686
687 if(f & rf_clear_edges) my_successors.clear();
688 if(f & rf_reset_bodies) {
689 input_body<output_type> *tmp = my_init_body->clone();
690 delete my_body;
691 my_body = tmp;
692 }
693 }
694
695 private:
696 spin_mutex my_mutex;
697 bool my_active;
698 input_body<output_type> *my_body;
699 input_body<output_type> *my_init_body;
700 broadcast_cache< output_type > my_successors;
701 bool my_reserved;
702 bool my_has_cached_item;
703 output_type my_cached_item;
704
705 // used by apply_body_bypass, can invoke body of node.
706 bool try_reserve_apply_body(output_type &v) {
707 spin_mutex::scoped_lock lock(my_mutex);
708 if ( my_reserved ) {
709 return false;
710 }
711 if ( !my_has_cached_item ) {
712 flow_control control;
713
714 fgt_begin_body( my_body );
715
716 my_cached_item = (*my_body)(control);
717 my_has_cached_item = !control.is_pipeline_stopped;
718
719 fgt_end_body( my_body );
720 }
721 if ( my_has_cached_item ) {
722 v = my_cached_item;
723 my_reserved = true;
724 return true;
725 } else {
726 return false;
727 }
728 }
729
730 graph_task* create_put_task() {
731 small_object_allocator allocator{};
732 typedef input_node_task_bypass< input_node<output_type> > task_type;
733 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
734 my_graph.reserve_wait();
735 return t;
736 }
737
738 //! Spawns a task that applies the body
739 void spawn_put( ) {
740 if(is_graph_active(this->my_graph)) {
741 spawn_in_graph_arena(this->my_graph, *create_put_task());
742 }
743 }
744
745 friend class input_node_task_bypass< input_node<output_type> >;
746 //! Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
747 graph_task* apply_body_bypass( ) {
748 output_type v;
749 if ( !try_reserve_apply_body(v) )
750 return NULL;
751
752 graph_task *last_task = my_successors.try_put_task(v);
753 if ( last_task )
754 try_consume();
755 else
756 try_release();
757 return last_task;
758 }
759 }; // class input_node
760
//! Implements a function node that supports Input -> Output
/** Combines graph_node with the function_input (predecessor/queue/concurrency
    machinery) and function_output (successor broadcast) mix-ins; this class
    mostly wires them together and adds tracing. */
template<typename Input, typename Output = continue_msg, typename Policy = queueing>
    __TBB_requires(std::default_initializable<Input> &&
                   std::copy_constructible<Input> &&
                   std::copy_constructible<Output>)
class function_node
    : public graph_node
    , public function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
    , public function_output<Output>
{
    typedef cache_aligned_allocator<Input> internals_allocator;

public:
    typedef Input input_type;
    typedef Output output_type;
    typedef function_input<input_type,output_type,Policy,internals_allocator> input_impl_type;
    typedef function_input_queue<input_type, internals_allocator> input_queue_type;
    typedef function_output<output_type> fOutput_type;
    typedef typename input_impl_type::predecessor_type predecessor_type;
    typedef typename fOutput_type::successor_type successor_type;

    using input_impl_type::my_predecessors;

    //! Constructor
    // input_queue_type is allocated here, but destroyed in the function_input_base.
    // TODO: pass the graph_buffer_policy to the function_input_base so it can all
    // be done in one place. This would be an interface-breaking change.
    template< typename Body >
        __TBB_requires(function_node_body<Body, Input, Output>)
    __TBB_NOINLINE_SYM function_node( graph &g, size_t concurrency,
                   Body body, Policy = Policy(), node_priority_t a_priority = no_priority )
        : graph_node(g), input_impl_type(g, concurrency, body, a_priority),
          fOutput_type(g) {
        fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
                static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Convenience overload: priority without spelling out the (defaulted) policy.
    template <typename Body>
        __TBB_requires(function_node_body<Body, Input, Output>)
    function_node( graph& g, size_t concurrency, Body body, node_priority_t a_priority )
        : function_node(g, concurrency, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node and connects it to the given node set in order.
    template <typename Body, typename... Args>
        __TBB_requires(function_node_body<Body, Input, Output>)
    function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
                   Policy p = Policy(), node_priority_t a_priority = no_priority )
        : function_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
        __TBB_requires(function_node_body<Body, Input, Output>)
    function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority )
        : function_node(nodes, concurrency, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Copy constructor
    // Copies the body/concurrency configuration (via input_impl_type(src));
    // edges are not copied.
    __TBB_NOINLINE_SYM function_node( const function_node& src ) :
        graph_node(src.my_graph),
        input_impl_type(src),
        fOutput_type(src.my_graph) {
        fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
                static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
    }

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    using input_impl_type::try_put_task;

    //! Exposes the successor cache of the output mix-in to the input machinery.
    broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }

    //! Resets input state and, when requested, clears both edge directions.
    void reset_node(reset_flags f) override {
        input_impl_type::reset_function_input(f);
        // TODO: use clear() instead.
        if(f & rf_clear_edges) {
            successors().clear();
            my_predecessors.clear();
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
        __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
    }

}; // class function_node
847
//! implements a function node that supports Input -> (set of outputs)
// Output is a tuple of output types; each tuple element gets its own output port
// (multifunction_output) through which the body may emit zero or more messages.
template<typename Input, typename Output, typename Policy = queueing>
    __TBB_requires(std::default_initializable<Input> &&
                   std::copy_constructible<Input>)
class multifunction_node :
    public graph_node,
    public multifunction_input
    <
        Input,
        typename wrap_tuple_elements<
            std::tuple_size<Output>::value,  // #elements in tuple
            multifunction_output,            // wrap this around each element
            Output                           // the tuple providing the types
        >::type,
        Policy,
        cache_aligned_allocator<Input>
    >
{
    typedef cache_aligned_allocator<Input> internals_allocator;

protected:
    static const int N = std::tuple_size<Output>::value;
public:
    typedef Input input_type;
    typedef null_type output_type;
    typedef typename wrap_tuple_elements<N,multifunction_output, Output>::type output_ports_type;
    typedef multifunction_input<
        input_type, output_ports_type, Policy, internals_allocator> input_impl_type;
    typedef function_input_queue<input_type, internals_allocator> input_queue_type;
private:
    using input_impl_type::my_predecessors;
public:
    //! Constructor: body receives (const Input&, output_ports_type&).
    template<typename Body>
        __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
    __TBB_NOINLINE_SYM multifunction_node(
        graph &g, size_t concurrency,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : graph_node(g), input_impl_type(g, concurrency, body, a_priority) {
        fgt_multioutput_node_with_body<N>(
            CODEPTR(), FLOW_MULTIFUNCTION_NODE,
            &this->my_graph, static_cast<receiver<input_type> *>(this),
            this->output_ports(), this->my_body
        );
    }

    //! Convenience overload: priority without spelling out the (defaulted) policy.
    template <typename Body>
        __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
    __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
        : multifunction_node(g, concurrency, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node and connects it to the given node set in order.
    template <typename Body, typename... Args>
        __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
    __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
                       Policy p = Policy(), node_priority_t a_priority = no_priority)
        : multifunction_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
        __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
    __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
        : multifunction_node(nodes, concurrency, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Copy constructor: copies body/configuration via input_impl_type(other); edges are not copied.
    __TBB_NOINLINE_SYM multifunction_node( const multifunction_node &other) :
        graph_node(other.my_graph), input_impl_type(other) {
        fgt_multioutput_node_with_body<N>( CODEPTR(), FLOW_MULTIFUNCTION_NODE,
                &this->my_graph, static_cast<receiver<input_type> *>(this),
                this->output_ports(), this->my_body );
    }

    // all the guts are in multifunction_input...
protected:
    void reset_node(reset_flags f) override { input_impl_type::reset(f); }
}; // multifunction_node
925
//! split_node: accepts a tuple as input, forwards each element of the tuple to its
//  successors. The node has unlimited concurrency, so it does not reject inputs.
template<typename TupleType>
class split_node : public graph_node, public receiver<TupleType> {
    // One output port per tuple element.
    static const int N = std::tuple_size<TupleType>::value;
    typedef receiver<TupleType> base_type;
public:
    typedef TupleType input_type;
    typedef typename wrap_tuple_elements<
        N, // #elements in tuple
        multifunction_output, // wrap this around each element
        TupleType // the tuple providing the types
    >::type output_ports_type;

    //! Constructor; initializes each output port bound to graph g.
    __TBB_NOINLINE_SYM explicit split_node(graph &g)
        : graph_node(g),
          my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports))
    {
        // Tracing/profiling instrumentation only.
        fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
                                static_cast<receiver<input_type> *>(this), this->output_ports());
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Preview feature: construct from a node_set and wire edges to/from it.
    template <typename... Args>
    __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: fresh output ports in the same graph; edges are not copied.
    __TBB_NOINLINE_SYM split_node(const split_node& other)
        : graph_node(other.my_graph), base_type(other),
          my_output_ports(init_output_ports<output_ports_type>::call(other.my_graph, my_output_ports))
    {
        fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
                                static_cast<receiver<input_type> *>(this), this->output_ports());
    }

    //! Access the tuple of output ports (use output_port<i>() helpers to address one).
    output_ports_type &output_ports() { return my_output_ports; }

protected:
    //! Forward each element of t to the matching output port.
    graph_task *try_put_task(const TupleType& t) override {
        // Sending split messages in parallel is not justified, as overheads would prevail.
        // Also, we do not have successors here. So we just tell the task returned here is successful.
        return emit_element<N>::emit_this(this->my_graph, t, output_ports());
    }
    //! Clear port edges when requested; verify they are empty afterwards.
    void reset_node(reset_flags f) override {
        if (f & rf_clear_edges)
            clear_element<N>::clear_this(my_output_ports);

        __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "split_node reset failed");
    }
    graph& graph_reference() const override {
        return my_graph;
    }

private:
    output_ports_type my_output_ports;
};
984
//! Implements an executable node that supports continue_msg -> Output
/** Fires its body once it has received a continue_msg from each registered
    predecessor (see continue_input), then broadcasts the result to successors. */
template <typename Output, typename Policy = Policy<void> >
__TBB_requires(std::copy_constructible<Output>)
class continue_node : public graph_node, public continue_input<Output, Policy>,
                      public function_output<Output> {
public:
    typedef continue_msg input_type;
    typedef Output output_type;
    typedef continue_input<Output, Policy> input_impl_type;
    typedef function_output<output_type> fOutput_type;
    typedef typename input_impl_type::predecessor_type predecessor_type;
    typedef typename fOutput_type::successor_type successor_type;

    //! Constructor for executable node with continue_msg -> Output
    /** @param g graph the node belongs to
        @param body callable producing the Output when the node fires
        @param a_priority scheduling priority for the node's tasks */
    template <typename Body >
    __TBB_requires(continue_node_body<Body, Output>)
    __TBB_NOINLINE_SYM continue_node(
        graph &g,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : graph_node(g), input_impl_type( g, body, a_priority ),
        fOutput_type(g) {
        // Tracing/profiling instrumentation only.
        fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
                            static_cast<receiver<input_type> *>(this),
                            static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Convenience overload: priority without an explicit Policy argument.
    template <typename Body>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( graph& g, Body body, node_priority_t a_priority )
        : continue_node(g, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Preview feature: construct from a node_set and wire edges to/from it.
    template <typename Body, typename... Args>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, Body body,
                   Policy p = Policy(), node_priority_t a_priority = no_priority )
        : continue_node(nodes.graph_reference(), body, p, a_priority ) {
        make_edges_in_order(nodes, *this);
    }
    template <typename Body, typename... Args>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, Body body, node_priority_t a_priority)
        : continue_node(nodes, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Constructor for executable node with continue_msg -> Output
    /** number_of_predecessors preloads the trigger threshold so the node can
        fire before/without explicit edge registration. */
    template <typename Body >
    __TBB_requires(continue_node_body<Body, Output>)
    __TBB_NOINLINE_SYM continue_node(
        graph &g, int number_of_predecessors,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : graph_node(g)
      , input_impl_type(g, number_of_predecessors, body, a_priority),
        fOutput_type(g) {
        fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
                            static_cast<receiver<input_type> *>(this),
                            static_cast<sender<output_type> *>(this), this->my_body );
    }

    template <typename Body>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t a_priority)
        : continue_node(g, number_of_predecessors, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename Body, typename... Args>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
                   Body body, Policy p = Policy(), node_priority_t a_priority = no_priority )
        : continue_node(nodes.graph_reference(), number_of_predecessors, body, p, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
                   Body body, node_priority_t a_priority )
        : continue_node(nodes, number_of_predecessors, body, Policy(), a_priority) {}
#endif

    //! Copy constructor
    // NOTE(review): copies input-side configuration via input_impl_type(src);
    // successor edges are not transferred here — confirm against continue_input's copy ctor.
    __TBB_NOINLINE_SYM continue_node( const continue_node& src ) :
        graph_node(src.my_graph), input_impl_type(src),
        function_output<Output>(src.my_graph) {
        fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
                            static_cast<receiver<input_type> *>(this),
                            static_cast<sender<output_type> *>(this), this->my_body );
    }

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    using input_impl_type::try_put_task;
    //! Expose the successor cache of the output side to friend caches.
    broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }

    //! Reset trigger state; optionally drop successor edges.
    void reset_node(reset_flags f) override {
        input_impl_type::reset_receiver(f);
        if(f & rf_clear_edges)successors().clear();
        __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
    }
}; // continue_node
1088
1089 //! Forwards messages of type T to all successors
1090 template <typename T>
1091 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1092 public:
1093 typedef T input_type;
1094 typedef T output_type;
1095 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1096 typedef typename sender<output_type>::successor_type successor_type;
1097 private:
1098 broadcast_cache<input_type> my_successors;
1099 public:
1100
broadcast_node(graph & g)1101 __TBB_NOINLINE_SYM explicit broadcast_node(graph& g) : graph_node(g), my_successors(this) {
1102 fgt_node( CODEPTR(), FLOW_BROADCAST_NODE, &this->my_graph,
1103 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1104 }
1105
1106 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1107 template <typename... Args>
broadcast_node(const node_set<Args...> & nodes)1108 broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
1109 make_edges_in_order(nodes, *this);
1110 }
1111 #endif
1112
1113 // Copy constructor
broadcast_node(const broadcast_node & src)1114 __TBB_NOINLINE_SYM broadcast_node( const broadcast_node& src ) : broadcast_node(src.my_graph) {}
1115
1116 //! Adds a successor
register_successor(successor_type & r)1117 bool register_successor( successor_type &r ) override {
1118 my_successors.register_successor( r );
1119 return true;
1120 }
1121
1122 //! Removes s as a successor
remove_successor(successor_type & r)1123 bool remove_successor( successor_type &r ) override {
1124 my_successors.remove_successor( r );
1125 return true;
1126 }
1127
1128 protected:
1129 template< typename R, typename B > friend class run_and_put_task;
1130 template<typename X, typename Y> friend class broadcast_cache;
1131 template<typename X, typename Y> friend class round_robin_cache;
1132 //! build a task to run the successor if possible. Default is old behavior.
try_put_task(const T & t)1133 graph_task *try_put_task(const T& t) override {
1134 graph_task *new_task = my_successors.try_put_task(t);
1135 if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
1136 return new_task;
1137 }
1138
graph_reference()1139 graph& graph_reference() const override {
1140 return my_graph;
1141 }
1142
reset_node(reset_flags f)1143 void reset_node(reset_flags f) override {
1144 if (f&rf_clear_edges) {
1145 my_successors.clear();
1146 }
1147 __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
1148 }
1149 }; // broadcast_node
1150
1151 //! Forwards messages in arbitrary order
1152 template <typename T>
1153 class buffer_node
1154 : public graph_node
1155 , public reservable_item_buffer< T, cache_aligned_allocator<T> >
1156 , public receiver<T>, public sender<T>
1157 {
1158 typedef cache_aligned_allocator<T> internals_allocator;
1159
1160 public:
1161 typedef T input_type;
1162 typedef T output_type;
1163 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1164 typedef typename sender<output_type>::successor_type successor_type;
1165 typedef buffer_node<T> class_type;
1166
1167 protected:
1168 typedef size_t size_type;
1169 round_robin_cache< T, null_rw_mutex > my_successors;
1170
1171 friend class forward_task_bypass< class_type >;
1172
1173 enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
1174 };
1175
1176 // implements the aggregator_operation concept
1177 class buffer_operation : public aggregated_operation< buffer_operation > {
1178 public:
1179 char type;
1180 T* elem;
1181 graph_task* ltask;
1182 successor_type *r;
1183
buffer_operation(const T & e,op_type t)1184 buffer_operation(const T& e, op_type t) : type(char(t))
1185 , elem(const_cast<T*>(&e)) , ltask(NULL)
1186 {}
buffer_operation(op_type t)1187 buffer_operation(op_type t) : type(char(t)), ltask(NULL) {}
1188 };
1189
1190 bool forwarder_busy;
1191 typedef aggregating_functor<class_type, buffer_operation> handler_type;
1192 friend class aggregating_functor<class_type, buffer_operation>;
1193 aggregator< handler_type, buffer_operation> my_aggregator;
1194
handle_operations(buffer_operation * op_list)1195 virtual void handle_operations(buffer_operation *op_list) {
1196 handle_operations_impl(op_list, this);
1197 }
1198
1199 template<typename derived_type>
handle_operations_impl(buffer_operation * op_list,derived_type * derived)1200 void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
1201 __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1202
1203 buffer_operation *tmp = NULL;
1204 bool try_forwarding = false;
1205 while (op_list) {
1206 tmp = op_list;
1207 op_list = op_list->next;
1208 switch (tmp->type) {
1209 case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
1210 case rem_succ: internal_rem_succ(tmp); break;
1211 case req_item: internal_pop(tmp); break;
1212 case res_item: internal_reserve(tmp); break;
1213 case rel_res: internal_release(tmp); try_forwarding = true; break;
1214 case con_res: internal_consume(tmp); try_forwarding = true; break;
1215 case put_item: try_forwarding = internal_push(tmp); break;
1216 case try_fwd_task: internal_forward_task(tmp); break;
1217 }
1218 }
1219
1220 derived->order();
1221
1222 if (try_forwarding && !forwarder_busy) {
1223 if(is_graph_active(this->my_graph)) {
1224 forwarder_busy = true;
1225 typedef forward_task_bypass<class_type> task_type;
1226 small_object_allocator allocator{};
1227 graph_task* new_task = allocator.new_object<task_type>(graph_reference(), allocator, *this);
1228 my_graph.reserve_wait();
1229 // tmp should point to the last item handled by the aggregator. This is the operation
1230 // the handling thread enqueued. So modifying that record will be okay.
1231 // TODO revamp: check that the issue is still present
1232 // workaround for icc bug (at least 12.0 and 13.0)
1233 // error: function "tbb::flow::interfaceX::combine_tasks" cannot be called with the given argument list
1234 // argument types are: (graph, graph_task *, graph_task *)
1235 graph_task *z = tmp->ltask;
1236 graph &g = this->my_graph;
1237 tmp->ltask = combine_tasks(g, z, new_task); // in case the op generated a task
1238 }
1239 }
1240 } // handle_operations
1241
grab_forwarding_task(buffer_operation & op_data)1242 inline graph_task *grab_forwarding_task( buffer_operation &op_data) {
1243 return op_data.ltask;
1244 }
1245
enqueue_forwarding_task(buffer_operation & op_data)1246 inline bool enqueue_forwarding_task(buffer_operation &op_data) {
1247 graph_task *ft = grab_forwarding_task(op_data);
1248 if(ft) {
1249 spawn_in_graph_arena(graph_reference(), *ft);
1250 return true;
1251 }
1252 return false;
1253 }
1254
1255 //! This is executed by an enqueued task, the "forwarder"
forward_task()1256 virtual graph_task *forward_task() {
1257 buffer_operation op_data(try_fwd_task);
1258 graph_task *last_task = NULL;
1259 do {
1260 op_data.status = WAIT;
1261 op_data.ltask = NULL;
1262 my_aggregator.execute(&op_data);
1263
1264 // workaround for icc bug
1265 graph_task *xtask = op_data.ltask;
1266 graph& g = this->my_graph;
1267 last_task = combine_tasks(g, last_task, xtask);
1268 } while (op_data.status ==SUCCEEDED);
1269 return last_task;
1270 }
1271
1272 //! Register successor
internal_reg_succ(buffer_operation * op)1273 virtual void internal_reg_succ(buffer_operation *op) {
1274 my_successors.register_successor(*(op->r));
1275 op->status.store(SUCCEEDED, std::memory_order_release);
1276 }
1277
1278 //! Remove successor
internal_rem_succ(buffer_operation * op)1279 virtual void internal_rem_succ(buffer_operation *op) {
1280 my_successors.remove_successor(*(op->r));
1281 op->status.store(SUCCEEDED, std::memory_order_release);
1282 }
1283
1284 private:
order()1285 void order() {}
1286
is_item_valid()1287 bool is_item_valid() {
1288 return this->my_item_valid(this->my_tail - 1);
1289 }
1290
try_put_and_add_task(graph_task * & last_task)1291 void try_put_and_add_task(graph_task*& last_task) {
1292 graph_task *new_task = my_successors.try_put_task(this->back());
1293 if (new_task) {
1294 // workaround for icc bug
1295 graph& g = this->my_graph;
1296 last_task = combine_tasks(g, last_task, new_task);
1297 this->destroy_back();
1298 }
1299 }
1300
1301 protected:
1302 //! Tries to forward valid items to successors
internal_forward_task(buffer_operation * op)1303 virtual void internal_forward_task(buffer_operation *op) {
1304 internal_forward_task_impl(op, this);
1305 }
1306
1307 template<typename derived_type>
internal_forward_task_impl(buffer_operation * op,derived_type * derived)1308 void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
1309 __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1310
1311 if (this->my_reserved || !derived->is_item_valid()) {
1312 op->status.store(FAILED, std::memory_order_release);
1313 this->forwarder_busy = false;
1314 return;
1315 }
1316 // Try forwarding, giving each successor a chance
1317 graph_task* last_task = NULL;
1318 size_type counter = my_successors.size();
1319 for (; counter > 0 && derived->is_item_valid(); --counter)
1320 derived->try_put_and_add_task(last_task);
1321
1322 op->ltask = last_task; // return task
1323 if (last_task && !counter) {
1324 op->status.store(SUCCEEDED, std::memory_order_release);
1325 }
1326 else {
1327 op->status.store(FAILED, std::memory_order_release);
1328 forwarder_busy = false;
1329 }
1330 }
1331
internal_push(buffer_operation * op)1332 virtual bool internal_push(buffer_operation *op) {
1333 this->push_back(*(op->elem));
1334 op->status.store(SUCCEEDED, std::memory_order_release);
1335 return true;
1336 }
1337
internal_pop(buffer_operation * op)1338 virtual void internal_pop(buffer_operation *op) {
1339 if(this->pop_back(*(op->elem))) {
1340 op->status.store(SUCCEEDED, std::memory_order_release);
1341 }
1342 else {
1343 op->status.store(FAILED, std::memory_order_release);
1344 }
1345 }
1346
internal_reserve(buffer_operation * op)1347 virtual void internal_reserve(buffer_operation *op) {
1348 if(this->reserve_front(*(op->elem))) {
1349 op->status.store(SUCCEEDED, std::memory_order_release);
1350 }
1351 else {
1352 op->status.store(FAILED, std::memory_order_release);
1353 }
1354 }
1355
internal_consume(buffer_operation * op)1356 virtual void internal_consume(buffer_operation *op) {
1357 this->consume_front();
1358 op->status.store(SUCCEEDED, std::memory_order_release);
1359 }
1360
internal_release(buffer_operation * op)1361 virtual void internal_release(buffer_operation *op) {
1362 this->release_front();
1363 op->status.store(SUCCEEDED, std::memory_order_release);
1364 }
1365
1366 public:
1367 //! Constructor
buffer_node(graph & g)1368 __TBB_NOINLINE_SYM explicit buffer_node( graph &g )
1369 : graph_node(g), reservable_item_buffer<T, internals_allocator>(), receiver<T>(),
1370 sender<T>(), my_successors(this), forwarder_busy(false)
1371 {
1372 my_aggregator.initialize_handler(handler_type(this));
1373 fgt_node( CODEPTR(), FLOW_BUFFER_NODE, &this->my_graph,
1374 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1375 }
1376
1377 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1378 template <typename... Args>
buffer_node(const node_set<Args...> & nodes)1379 buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
1380 make_edges_in_order(nodes, *this);
1381 }
1382 #endif
1383
1384 //! Copy constructor
buffer_node(const buffer_node & src)1385 __TBB_NOINLINE_SYM buffer_node( const buffer_node& src ) : buffer_node(src.my_graph) {}
1386
1387 //
1388 // message sender implementation
1389 //
1390
1391 //! Adds a new successor.
1392 /** Adds successor r to the list of successors; may forward tasks. */
register_successor(successor_type & r)1393 bool register_successor( successor_type &r ) override {
1394 buffer_operation op_data(reg_succ);
1395 op_data.r = &r;
1396 my_aggregator.execute(&op_data);
1397 (void)enqueue_forwarding_task(op_data);
1398 return true;
1399 }
1400
1401 //! Removes a successor.
1402 /** Removes successor r from the list of successors.
1403 It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
remove_successor(successor_type & r)1404 bool remove_successor( successor_type &r ) override {
1405 // TODO revamp: investigate why full qualification is necessary here
1406 tbb::detail::d1::remove_predecessor(r, *this);
1407 buffer_operation op_data(rem_succ);
1408 op_data.r = &r;
1409 my_aggregator.execute(&op_data);
1410 // even though this operation does not cause a forward, if we are the handler, and
1411 // a forward is scheduled, we may be the first to reach this point after the aggregator,
1412 // and so should check for the task.
1413 (void)enqueue_forwarding_task(op_data);
1414 return true;
1415 }
1416
1417 //! Request an item from the buffer_node
1418 /** true = v contains the returned item<BR>
1419 false = no item has been returned */
try_get(T & v)1420 bool try_get( T &v ) override {
1421 buffer_operation op_data(req_item);
1422 op_data.elem = &v;
1423 my_aggregator.execute(&op_data);
1424 (void)enqueue_forwarding_task(op_data);
1425 return (op_data.status==SUCCEEDED);
1426 }
1427
1428 //! Reserves an item.
1429 /** false = no item can be reserved<BR>
1430 true = an item is reserved */
try_reserve(T & v)1431 bool try_reserve( T &v ) override {
1432 buffer_operation op_data(res_item);
1433 op_data.elem = &v;
1434 my_aggregator.execute(&op_data);
1435 (void)enqueue_forwarding_task(op_data);
1436 return (op_data.status==SUCCEEDED);
1437 }
1438
1439 //! Release a reserved item.
1440 /** true = item has been released and so remains in sender */
try_release()1441 bool try_release() override {
1442 buffer_operation op_data(rel_res);
1443 my_aggregator.execute(&op_data);
1444 (void)enqueue_forwarding_task(op_data);
1445 return true;
1446 }
1447
1448 //! Consumes a reserved item.
1449 /** true = item is removed from sender and reservation removed */
try_consume()1450 bool try_consume() override {
1451 buffer_operation op_data(con_res);
1452 my_aggregator.execute(&op_data);
1453 (void)enqueue_forwarding_task(op_data);
1454 return true;
1455 }
1456
1457 protected:
1458
1459 template< typename R, typename B > friend class run_and_put_task;
1460 template<typename X, typename Y> friend class broadcast_cache;
1461 template<typename X, typename Y> friend class round_robin_cache;
1462 //! receive an item, return a task *if possible
try_put_task(const T & t)1463 graph_task *try_put_task(const T &t) override {
1464 buffer_operation op_data(t, put_item);
1465 my_aggregator.execute(&op_data);
1466 graph_task *ft = grab_forwarding_task(op_data);
1467 // sequencer_nodes can return failure (if an item has been previously inserted)
1468 // We have to spawn the returned task if our own operation fails.
1469
1470 if(ft && op_data.status ==FAILED) {
1471 // we haven't succeeded queueing the item, but for some reason the
1472 // call returned a task (if another request resulted in a successful
1473 // forward this could happen.) Queue the task and reset the pointer.
1474 spawn_in_graph_arena(graph_reference(), *ft); ft = NULL;
1475 }
1476 else if(!ft && op_data.status ==SUCCEEDED) {
1477 ft = SUCCESSFULLY_ENQUEUED;
1478 }
1479 return ft;
1480 }
1481
graph_reference()1482 graph& graph_reference() const override {
1483 return my_graph;
1484 }
1485
1486 protected:
reset_node(reset_flags f)1487 void reset_node( reset_flags f) override {
1488 reservable_item_buffer<T, internals_allocator>::reset();
1489 // TODO: just clear structures
1490 if (f&rf_clear_edges) {
1491 my_successors.clear();
1492 }
1493 forwarder_busy = false;
1494 }
1495 }; // buffer_node
1496
1497 //! Forwards messages in FIFO order
1498 template <typename T>
1499 class queue_node : public buffer_node<T> {
1500 protected:
1501 typedef buffer_node<T> base_type;
1502 typedef typename base_type::size_type size_type;
1503 typedef typename base_type::buffer_operation queue_operation;
1504 typedef queue_node class_type;
1505
1506 private:
1507 template<typename> friend class buffer_node;
1508
is_item_valid()1509 bool is_item_valid() {
1510 return this->my_item_valid(this->my_head);
1511 }
1512
try_put_and_add_task(graph_task * & last_task)1513 void try_put_and_add_task(graph_task*& last_task) {
1514 graph_task *new_task = this->my_successors.try_put_task(this->front());
1515 if (new_task) {
1516 // workaround for icc bug
1517 graph& graph_ref = this->graph_reference();
1518 last_task = combine_tasks(graph_ref, last_task, new_task);
1519 this->destroy_front();
1520 }
1521 }
1522
1523 protected:
internal_forward_task(queue_operation * op)1524 void internal_forward_task(queue_operation *op) override {
1525 this->internal_forward_task_impl(op, this);
1526 }
1527
internal_pop(queue_operation * op)1528 void internal_pop(queue_operation *op) override {
1529 if ( this->my_reserved || !this->my_item_valid(this->my_head)){
1530 op->status.store(FAILED, std::memory_order_release);
1531 }
1532 else {
1533 this->pop_front(*(op->elem));
1534 op->status.store(SUCCEEDED, std::memory_order_release);
1535 }
1536 }
internal_reserve(queue_operation * op)1537 void internal_reserve(queue_operation *op) override {
1538 if (this->my_reserved || !this->my_item_valid(this->my_head)) {
1539 op->status.store(FAILED, std::memory_order_release);
1540 }
1541 else {
1542 this->reserve_front(*(op->elem));
1543 op->status.store(SUCCEEDED, std::memory_order_release);
1544 }
1545 }
internal_consume(queue_operation * op)1546 void internal_consume(queue_operation *op) override {
1547 this->consume_front();
1548 op->status.store(SUCCEEDED, std::memory_order_release);
1549 }
1550
1551 public:
1552 typedef T input_type;
1553 typedef T output_type;
1554 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1555 typedef typename sender<output_type>::successor_type successor_type;
1556
1557 //! Constructor
queue_node(graph & g)1558 __TBB_NOINLINE_SYM explicit queue_node( graph &g ) : base_type(g) {
1559 fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
1560 static_cast<receiver<input_type> *>(this),
1561 static_cast<sender<output_type> *>(this) );
1562 }
1563
1564 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1565 template <typename... Args>
queue_node(const node_set<Args...> & nodes)1566 queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
1567 make_edges_in_order(nodes, *this);
1568 }
1569 #endif
1570
1571 //! Copy constructor
queue_node(const queue_node & src)1572 __TBB_NOINLINE_SYM queue_node( const queue_node& src) : base_type(src) {
1573 fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
1574 static_cast<receiver<input_type> *>(this),
1575 static_cast<sender<output_type> *>(this) );
1576 }
1577
1578
1579 protected:
reset_node(reset_flags f)1580 void reset_node( reset_flags f) override {
1581 base_type::reset_node(f);
1582 }
1583 }; // queue_node
1584
//! Forwards messages in sequence order
/** The user-supplied Sequencer maps each item to a sequence number (size_t);
    items are buffered at the slot matching their number and forwarded in
    increasing sequence order. */
template <typename T>
__TBB_requires(std::copyable<T>)
class sequencer_node : public queue_node<T> {
    // Owned raw pointer; cloned in the copy ctor, deleted in the destructor.
    function_body< T, size_t > *my_sequencer;
    // my_sequencer should be a benign function and must be callable
    // from a parallel context. Does this mean it needn't be reset?
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    //! Constructor
    /** @param g graph the node belongs to
        @param s callable returning the sequence number for an item */
    template< typename Sequencer >
    __TBB_requires(sequencer<Sequencer, T>)
    __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T>(g),
        my_sequencer(new function_body_leaf< T, size_t, Sequencer>(s) ) {
        fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
                  static_cast<receiver<input_type> *>(this),
                  static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Preview feature: construct from a node_set and wire edges to/from it.
    template <typename Sequencer, typename... Args>
    __TBB_requires(sequencer<Sequencer, T>)
    sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
        : sequencer_node(nodes.graph_reference(), s) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: clones the sequencer body; buffer contents are not copied.
    __TBB_NOINLINE_SYM sequencer_node( const sequencer_node& src ) : queue_node<T>(src),
        my_sequencer( src.my_sequencer->clone() ) {
        fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
                  static_cast<receiver<input_type> *>(this),
                  static_cast<sender<output_type> *>(this) );
    }

    //! Destructor
    ~sequencer_node() { delete my_sequencer; }

protected:
    typedef typename buffer_node<T>::size_type size_type;
    typedef typename buffer_node<T>::buffer_operation sequencer_operation;

private:
    //! Place the item at the buffer slot given by its sequence number.
    bool internal_push(sequencer_operation *op) override {
        size_type tag = (*my_sequencer)(*(op->elem));
#if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
        if (tag < this->my_head) {
            // have already emitted a message with this tag
            op->status.store(FAILED, std::memory_order_release);
            return false;
        }
#endif
        // cannot modify this->my_tail now; the buffer would be inconsistent.
        size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;

        if (this->size(new_tail) > this->capacity()) {
            this->grow_my_array(this->size(new_tail));
        }
        this->my_tail = new_tail;

        // place_item fails if the slot is already occupied (duplicate tag).
        const op_stat res = this->place_item(tag, *(op->elem)) ? SUCCEEDED : FAILED;
        op->status.store(res, std::memory_order_release);
        return res == SUCCEEDED;
    }
}; // sequencer_node
1655
1656 //! Forwards messages in priority order
1657 template<typename T, typename Compare = std::less<T>>
1658 class priority_queue_node : public buffer_node<T> {
1659 public:
1660 typedef T input_type;
1661 typedef T output_type;
1662 typedef buffer_node<T> base_type;
1663 typedef priority_queue_node class_type;
1664 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1665 typedef typename sender<output_type>::successor_type successor_type;
1666
    //! Constructor
    /** @param g graph the node belongs to
        @param comp comparator establishing the priority order of buffered items */
    __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
        : buffer_node<T>(g), compare(comp), mark(0) {
        fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
                  static_cast<receiver<input_type> *>(this),
                  static_cast<sender<output_type> *>(this) );
    }
1674
1675 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Preview feature: construct from a node_set and wire edges to/from it.
    template <typename... Args>
    priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
        : priority_queue_node(nodes.graph_reference(), comp) {
        make_edges_in_order(nodes, *this);
    }
1681 #endif
1682
    //! Copy constructor
    // NOTE(review): 'compare' is default-constructed here rather than copied from
    // src, so a stateful comparator is not preserved by copy — confirm intended.
    __TBB_NOINLINE_SYM priority_queue_node( const priority_queue_node &src )
        : buffer_node<T>(src), mark(0)
    {
        fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
                  static_cast<receiver<input_type> *>(this),
                  static_cast<sender<output_type> *>(this) );
    }
1691
1692 protected:
1693
reset_node(reset_flags f)1694 void reset_node( reset_flags f) override {
1695 mark = 0;
1696 base_type::reset_node(f);
1697 }
1698
1699 typedef typename buffer_node<T>::size_type size_type;
1700 typedef typename buffer_node<T>::item_type item_type;
1701 typedef typename buffer_node<T>::buffer_operation prio_operation;
1702
1703 //! Tries to forward valid items to successors
internal_forward_task(prio_operation * op)1704 void internal_forward_task(prio_operation *op) override {
1705 this->internal_forward_task_impl(op, this);
1706 }
1707
handle_operations(prio_operation * op_list)1708 void handle_operations(prio_operation *op_list) override {
1709 this->handle_operations_impl(op_list, this);
1710 }
1711
internal_push(prio_operation * op)1712 bool internal_push(prio_operation *op) override {
1713 prio_push(*(op->elem));
1714 op->status.store(SUCCEEDED, std::memory_order_release);
1715 return true;
1716 }
1717
internal_pop(prio_operation * op)1718 void internal_pop(prio_operation *op) override {
1719 // if empty or already reserved, don't pop
1720 if ( this->my_reserved == true || this->my_tail == 0 ) {
1721 op->status.store(FAILED, std::memory_order_release);
1722 return;
1723 }
1724
1725 *(op->elem) = prio();
1726 op->status.store(SUCCEEDED, std::memory_order_release);
1727 prio_pop();
1728
1729 }
1730
1731 // pops the highest-priority item, saves copy
internal_reserve(prio_operation * op)1732 void internal_reserve(prio_operation *op) override {
1733 if (this->my_reserved == true || this->my_tail == 0) {
1734 op->status.store(FAILED, std::memory_order_release);
1735 return;
1736 }
1737 this->my_reserved = true;
1738 *(op->elem) = prio();
1739 reserved_item = *(op->elem);
1740 op->status.store(SUCCEEDED, std::memory_order_release);
1741 prio_pop();
1742 }
1743
internal_consume(prio_operation * op)1744 void internal_consume(prio_operation *op) override {
1745 op->status.store(SUCCEEDED, std::memory_order_release);
1746 this->my_reserved = false;
1747 reserved_item = input_type();
1748 }
1749
internal_release(prio_operation * op)1750 void internal_release(prio_operation *op) override {
1751 op->status.store(SUCCEEDED, std::memory_order_release);
1752 prio_push(reserved_item);
1753 this->my_reserved = false;
1754 reserved_item = input_type();
1755 }
1756
1757 private:
1758 template<typename> friend class buffer_node;
1759
order()1760 void order() {
1761 if (mark < this->my_tail) heapify();
1762 __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
1763 }
1764
is_item_valid()1765 bool is_item_valid() {
1766 return this->my_tail > 0;
1767 }
1768
try_put_and_add_task(graph_task * & last_task)1769 void try_put_and_add_task(graph_task*& last_task) {
1770 graph_task * new_task = this->my_successors.try_put_task(this->prio());
1771 if (new_task) {
1772 // workaround for icc bug
1773 graph& graph_ref = this->graph_reference();
1774 last_task = combine_tasks(graph_ref, last_task, new_task);
1775 prio_pop();
1776 }
1777 }
1778
1779 private:
1780 Compare compare;
1781 size_type mark;
1782
1783 input_type reserved_item;
1784
1785 // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
prio_use_tail()1786 bool prio_use_tail() {
1787 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
1788 return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
1789 }
1790
1791 // prio_push: checks that the item will fit, expand array if necessary, put at end
prio_push(const T & src)1792 void prio_push(const T &src) {
1793 if ( this->my_tail >= this->my_array_size )
1794 this->grow_my_array( this->my_tail + 1 );
1795 (void) this->place_item(this->my_tail, src);
1796 ++(this->my_tail);
1797 __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
1798 }
1799
1800 // prio_pop: deletes highest priority item from the array, and if it is item
1801 // 0, move last item to 0 and reheap. If end of array, just destroy and decrement tail
1802 // and mark. Assumes the array has already been tested for emptiness; no failure.
prio_pop()1803 void prio_pop() {
1804 if (prio_use_tail()) {
1805 // there are newly pushed elements; last one higher than top
1806 // copy the data
1807 this->destroy_item(this->my_tail-1);
1808 --(this->my_tail);
1809 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
1810 return;
1811 }
1812 this->destroy_item(0);
1813 if(this->my_tail > 1) {
1814 // push the last element down heap
1815 __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
1816 this->move_item(0,this->my_tail - 1);
1817 }
1818 --(this->my_tail);
1819 if(mark > this->my_tail) --mark;
1820 if (this->my_tail > 1) // don't reheap for heap of size 1
1821 reheap();
1822 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
1823 }
1824
prio()1825 const T& prio() {
1826 return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
1827 }
1828
1829 // turn array into heap
heapify()1830 void heapify() {
1831 if(this->my_tail == 0) {
1832 mark = 0;
1833 return;
1834 }
1835 if (!mark) mark = 1;
1836 for (; mark<this->my_tail; ++mark) { // for each unheaped element
1837 size_type cur_pos = mark;
1838 input_type to_place;
1839 this->fetch_item(mark,to_place);
1840 do { // push to_place up the heap
1841 size_type parent = (cur_pos-1)>>1;
1842 if (!compare(this->get_my_item(parent), to_place))
1843 break;
1844 this->move_item(cur_pos, parent);
1845 cur_pos = parent;
1846 } while( cur_pos );
1847 (void) this->place_item(cur_pos, to_place);
1848 }
1849 }
1850
1851 // otherwise heapified array with new root element; rearrange to heap
reheap()1852 void reheap() {
1853 size_type cur_pos=0, child=1;
1854 while (child < mark) {
1855 size_type target = child;
1856 if (child+1<mark &&
1857 compare(this->get_my_item(child),
1858 this->get_my_item(child+1)))
1859 ++target;
1860 // target now has the higher priority child
1861 if (compare(this->get_my_item(target),
1862 this->get_my_item(cur_pos)))
1863 break;
1864 // swap
1865 this->swap_items(cur_pos, target);
1866 cur_pos = target;
1867 child = (cur_pos<<1)+1;
1868 }
1869 }
1870 }; // priority_queue_node
1871
//! Forwards messages only if the threshold has not been reached
/** This node forwards items until its threshold is reached.
    It contains no buffering. If the downstream node rejects, the
    message is dropped.

    Bookkeeping (guarded by my_mutex):
      my_count - items successfully forwarded and not yet decremented back;
      my_tries - put attempts currently in flight.
    An item may be forwarded only while my_count + my_tries < my_threshold. */
template< typename T, typename DecrementType=continue_msg >
class limiter_node : public graph_node, public receiver< T >, public sender< T > {
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;
    //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.

private:
    size_t my_threshold;
    size_t my_count; // number of successful puts
    size_t my_tries; // number of active put attempts
    reservable_predecessor_cache< T, spin_mutex > my_predecessors;
    spin_mutex my_mutex;  // protects my_count and my_tries
    broadcast_cache< T > my_successors;

    //! The internal receiver< DecrementType > that adjusts the count
    threshold_regulator< limiter_node<T, DecrementType>, DecrementType > decrement;

    //! Lowers my_count by delta, clamped to [0, my_threshold], then tries to forward.
    /** Called by the threshold_regulator when a message arrives on the decrementer
        port.  A negative delta raises the count (toward the threshold). */
    graph_task* decrement_counter( long long delta ) {
        {
            spin_mutex::scoped_lock lock(my_mutex);
            if( delta > 0 && size_t(delta) > my_count ) {
                // decrement larger than the current count: clamp at zero
                my_count = 0;
            }
            else if( delta < 0 && size_t(-delta) > my_threshold - my_count ) {
                // increment would exceed the threshold: clamp at the threshold
                my_count = my_threshold;
            }
            else {
                my_count -= size_t(delta); // absolute value of delta is sufficiently small
            }
            __TBB_ASSERT(my_count <= my_threshold, "counter values are truncated to be inside the [0, threshold] interval");
        }
        // capacity may have opened up; attempt to pull an item from a predecessor
        return forward_task();
    }

    // Let threshold_regulator call decrement_counter()
    friend class threshold_regulator< limiter_node<T, DecrementType>, DecrementType >;

    friend class forward_task_bypass< limiter_node<T,DecrementType> >;

    //! True when another item may be forwarded and both caches are non-empty.
    bool check_conditions() {  // always called under lock
        return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
    }

    // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
    /** Reserves an item from a predecessor and offers it to successors.
        On success, consumes the reservation and (if more work is possible)
        spawns a follow-up forwarding task; on failure, releases the
        reservation and may return a follow-up task instead. */
    graph_task* forward_task() {
        input_type v;
        graph_task* rval = NULL;
        bool reserved = false;
        {
            spin_mutex::scoped_lock lock(my_mutex);
            if ( check_conditions() )
                ++my_tries;   // claim a slot before releasing the lock
            else
                return NULL;
        }

        //SUCCESS
        // if we can reserve and can put, we consume the reservation
        // we increment the count and decrement the tries
        if ( (my_predecessors.try_reserve(v)) == true ){
            reserved=true;
            if ( (rval = my_successors.try_put_task(v)) != NULL ){
                {
                    spin_mutex::scoped_lock lock(my_mutex);
                    ++my_count;
                    --my_tries;
                    my_predecessors.try_consume();
                    if ( check_conditions() ) {
                        if ( is_graph_active(this->my_graph) ) {
                            // more items can flow: spawn another forwarding task
                            typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
                            small_object_allocator allocator{};
                            graph_task* rtask = allocator.new_object<task_type>( my_graph, allocator, *this );
                            my_graph.reserve_wait();
                            spawn_in_graph_arena(graph_reference(), *rtask);
                        }
                    }
                }
                return rval;
            }
        }
        //FAILURE
        //if we can't reserve, we decrement the tries
        //if we can reserve but can't put, we decrement the tries and release the reservation
        {
            spin_mutex::scoped_lock lock(my_mutex);
            --my_tries;
            if (reserved) my_predecessors.try_release();
            if ( check_conditions() ) {
                if ( is_graph_active(this->my_graph) ) {
                    // return (rather than spawn) a retry task for the caller to bypass
                    small_object_allocator allocator{};
                    typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
                    graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
                    my_graph.reserve_wait();
                    __TBB_ASSERT(!rval, "Have two tasks to handle");
                    return t;
                }
            }
            return rval;
        }
    }

    //! Registers this node (and its decrementer port) with the profiling tools.
    void initialize() {
        fgt_node(
            CODEPTR(), FLOW_LIMITER_NODE, &this->my_graph,
            static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
            static_cast<sender<output_type> *>(this)
        );
    }

public:
    //! Constructor
    limiter_node(graph &g, size_t threshold)
        : graph_node(g), my_threshold(threshold), my_count(0), my_tries(0), my_predecessors(this)
        , my_successors(this), decrement(this)
    {
        initialize();
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Construct from a node_set and wire edges to/from its members.
    template <typename... Args>
    limiter_node(const node_set<Args...>& nodes, size_t threshold)
        : limiter_node(nodes.graph_reference(), threshold) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor
    /** Copies only the graph and threshold; counters restart at zero and edges
        are not copied. */
    limiter_node( const limiter_node& src ) : limiter_node(src.my_graph, src.my_threshold) {}

    //! The interface for accessing internal receiver< DecrementType > that adjusts the count
    receiver<DecrementType>& decrementer() { return decrement; }

    //! Replace the current successor with this new successor
    bool register_successor( successor_type &r ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        bool was_empty = my_successors.empty();
        my_successors.register_successor(r);
        //spawn a forward task if this is the only successor
        if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
            if ( is_graph_active(this->my_graph) ) {
                small_object_allocator allocator{};
                typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
                graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
                my_graph.reserve_wait();
                spawn_in_graph_arena(graph_reference(), *t);
            }
        }
        return true;
    }

    //! Removes a successor from this node
    /** r.remove_predecessor(*this) is also called. */
    bool remove_successor( successor_type &r ) override {
        // TODO revamp: investigate why qualification is needed for remove_predecessor() call
        tbb::detail::d1::remove_predecessor(r, *this);
        my_successors.remove_successor(r);
        return true;
    }

    //! Adds src to the list of cached predecessors.
    /** If capacity remains and a successor exists, spawns a forwarding task
        immediately to pull from the new predecessor. */
    bool register_predecessor( predecessor_type &src ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_predecessors.add( src );
        if ( my_count + my_tries < my_threshold && !my_successors.empty() && is_graph_active(this->my_graph) ) {
            small_object_allocator allocator{};
            typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
            graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
            my_graph.reserve_wait();
            spawn_in_graph_arena(graph_reference(), *t);
        }
        return true;
    }

    //! Removes src from the list of cached predecessors.
    bool remove_predecessor( predecessor_type &src ) override {
        my_predecessors.remove( src );
        return true;
    }

protected:

    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    //! Puts an item to this receiver
    /** Rejects (returns NULL) when the threshold is reached; otherwise offers
        the item downstream, updating my_count/my_tries according to the
        outcome.  On rejection downstream, may return a retry task. */
    graph_task* try_put_task( const T &t ) override {
        {
            spin_mutex::scoped_lock lock(my_mutex);
            if ( my_count + my_tries >= my_threshold )
                return NULL;
            else
                ++my_tries;
        }

        graph_task* rtask = my_successors.try_put_task(t);

        if ( !rtask ) { // try_put_task failed.
            spin_mutex::scoped_lock lock(my_mutex);
            --my_tries;
            if (check_conditions() && is_graph_active(this->my_graph)) {
                small_object_allocator allocator{};
                typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
                rtask = allocator.new_object<task_type>(my_graph, allocator, *this);
                my_graph.reserve_wait();
            }
        }
        else {
            spin_mutex::scoped_lock lock(my_mutex);
            ++my_count;
            --my_tries;
        }
        return rtask;
    }

    graph& graph_reference() const override { return my_graph; }

    //! Reset the forwarded-item count and, per flags, clear or reset the edge caches.
    void reset_node( reset_flags f) override {
        my_count = 0;
        if(f & rf_clear_edges) {
            my_predecessors.clear();
            my_successors.clear();
        }
        else
        {
            my_predecessors.reset( );
        }
        decrement.reset_receiver(f);
    }
}; // limiter_node
2108
2109 #include "detail/_flow_graph_join_impl.h"
2110
//! Joins messages from its input ports into an OutputTuple; the JP policy
//  (queueing, reserving, or key_matching) is implemented by the specializations below.
template<typename OutputTuple, typename JP=queueing> class join_node;
2112
2113 template<typename OutputTuple>
2114 class join_node<OutputTuple,reserving>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
2115 private:
2116 static const int N = std::tuple_size<OutputTuple>::value;
2117 typedef unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
2118 public:
2119 typedef OutputTuple output_type;
2120 typedef typename unfolded_type::input_ports_type input_ports_type;
join_node(graph & g)2121 __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
2122 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2123 this->input_ports(), static_cast< sender< output_type > *>(this) );
2124 }
2125
2126 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2127 template <typename... Args>
2128 __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
2129 make_edges_in_order(nodes, *this);
2130 }
2131 #endif
2132
join_node(const join_node & other)2133 __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2134 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2135 this->input_ports(), static_cast< sender< output_type > *>(this) );
2136 }
2137
2138 };
2139
2140 template<typename OutputTuple>
2141 class join_node<OutputTuple,queueing>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
2142 private:
2143 static const int N = std::tuple_size<OutputTuple>::value;
2144 typedef unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
2145 public:
2146 typedef OutputTuple output_type;
2147 typedef typename unfolded_type::input_ports_type input_ports_type;
join_node(graph & g)2148 __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
2149 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2150 this->input_ports(), static_cast< sender< output_type > *>(this) );
2151 }
2152
2153 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2154 template <typename... Args>
2155 __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
2156 make_edges_in_order(nodes, *this);
2157 }
2158 #endif
2159
join_node(const join_node & other)2160 __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2161 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2162 this->input_ports(), static_cast< sender< output_type > *>(this) );
2163 }
2164
2165 };
2166
#if __TBB_CPP20_CONCEPTS_PRESENT
// Helper function which is well-formed only if all of the elements in OutputTuple
// satisfy join_node_function_object<body[i], tuple[i], K>.
// Declared but never defined: it is only named inside the requires-expression below,
// so a definition is never needed.
template <typename OutputTuple, typename K,
          typename... Functions, std::size_t... Idx>
void join_node_function_objects_helper( std::index_sequence<Idx...> )
    requires (std::tuple_size_v<OutputTuple> == sizeof...(Functions)) &&
             (... && join_node_function_object<Functions, std::tuple_element_t<Idx, OutputTuple>, K>);

// Satisfied when the number of Functions matches the tuple size and each function
// is a valid key-extraction body for the corresponding tuple element.
template <typename OutputTuple, typename K, typename... Functions>
concept join_node_functions = requires {
    join_node_function_objects_helper<OutputTuple, K, Functions...>(std::make_index_sequence<sizeof...(Functions)>{});
};

#endif
2182
// template for key_matching join_node
// tag_matching join_node is a specialization of key_matching, and is source-compatible.
/** Each constructor takes one body per input port; body i extracts the key (type K)
    from messages arriving at port i.  The number of bodies selects the number of
    ports; overloads above 5 are gated by __TBB_VARIADIC_MAX. */
template<typename OutputTuple, typename K, typename KHash>
class join_node<OutputTuple, key_matching<K, KHash> > : public unfolded_join_node<std::tuple_size<OutputTuple>::value,
      key_matching_port, OutputTuple, key_matching<K,KHash> > {
private:
    static const int N = std::tuple_size<OutputTuple>::value;
    typedef unfolded_join_node<N, key_matching_port, OutputTuple, key_matching<K,KHash> > unfolded_type;
public:
    typedef OutputTuple output_type;
    typedef typename unfolded_type::input_ports_type input_ports_type;

#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
    // Body-less form: keys are carried by the messages themselves (preview feature).
    join_node(graph &g) : unfolded_type(g) {}
#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */

    // Two-port constructor; fgt_multiinput_node registers the node with the profiler.
    template<typename __TBB_B0, typename __TBB_B1>
    __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
    __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
    __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
    __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
            unfolded_type(g, b0, b1, b2, b3, b4) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#if __TBB_VARIADIC_MAX >= 6
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5>
    __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
            unfolded_type(g, b0, b1, b2, b3, b4, b5) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 7
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5, typename __TBB_B6>
    __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
            unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 8
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
    __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
            __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 9
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
    __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
            __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 10
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
    __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8, __TBB_B9>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
            __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Construct from a node_set (plus optional bodies) and wire the edges.
    template <
#if (__clang_major__ == 3 && __clang_minor__ == 4)
        // clang 3.4 misdeduces 'Args...' for 'node_set' while it can cope with template template parameter.
        template<typename...> class node_set,
#endif
        typename... Args, typename... Bodies
    >
    __TBB_requires((sizeof...(Bodies) == 0) || join_node_functions<OutputTuple, K, Bodies...>)
    __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
        : join_node(nodes.graph_reference(), bodies...) {
        make_edges_in_order(nodes, *this);
    }
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Copy constructor: copies the unfolded base and re-registers with the profiler.
    __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
2296
2297 // indexer node
2298 #include "detail/_flow_graph_indexer_impl.h"
2299
2300 // TODO: Implement interface with variadic template or tuple
// Primary template; only the specializations below (one per supported arity,
// up to __TBB_VARIADIC_MAX inputs) are defined.
template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
         typename T4=null_type, typename T5=null_type, typename T6=null_type,
         typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
2304
2305 //indexer node specializations
2306 template<typename T0>
2307 class indexer_node<T0> : public unfolded_indexer_node<std::tuple<T0> > {
2308 private:
2309 static const int N = 1;
2310 public:
2311 typedef std::tuple<T0> InputTuple;
2312 typedef tagged_msg<size_t, T0> output_type;
2313 typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)2314 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2315 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2316 this->input_ports(), static_cast< sender< output_type > *>(this) );
2317 }
2318
2319 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2320 template <typename... Args>
indexer_node(const node_set<Args...> & nodes)2321 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2322 make_edges_in_order(nodes, *this);
2323 }
2324 #endif
2325
2326 // Copy constructor
indexer_node(const indexer_node & other)2327 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2328 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2329 this->input_ports(), static_cast< sender< output_type > *>(this) );
2330 }
2331 };
2332
2333 template<typename T0, typename T1>
2334 class indexer_node<T0, T1> : public unfolded_indexer_node<std::tuple<T0, T1> > {
2335 private:
2336 static const int N = 2;
2337 public:
2338 typedef std::tuple<T0, T1> InputTuple;
2339 typedef tagged_msg<size_t, T0, T1> output_type;
2340 typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)2341 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2342 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2343 this->input_ports(), static_cast< sender< output_type > *>(this) );
2344 }
2345
2346 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2347 template <typename... Args>
indexer_node(const node_set<Args...> & nodes)2348 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2349 make_edges_in_order(nodes, *this);
2350 }
2351 #endif
2352
2353 // Copy constructor
indexer_node(const indexer_node & other)2354 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2355 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2356 this->input_ports(), static_cast< sender< output_type > *>(this) );
2357 }
2358
2359 };
2360
2361 template<typename T0, typename T1, typename T2>
2362 class indexer_node<T0, T1, T2> : public unfolded_indexer_node<std::tuple<T0, T1, T2> > {
2363 private:
2364 static const int N = 3;
2365 public:
2366 typedef std::tuple<T0, T1, T2> InputTuple;
2367 typedef tagged_msg<size_t, T0, T1, T2> output_type;
2368 typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)2369 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2370 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2371 this->input_ports(), static_cast< sender< output_type > *>(this) );
2372 }
2373
2374 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2375 template <typename... Args>
indexer_node(const node_set<Args...> & nodes)2376 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2377 make_edges_in_order(nodes, *this);
2378 }
2379 #endif
2380
2381 // Copy constructor
indexer_node(const indexer_node & other)2382 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2383 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2384 this->input_ports(), static_cast< sender< output_type > *>(this) );
2385 }
2386
2387 };
2388
2389 template<typename T0, typename T1, typename T2, typename T3>
2390 class indexer_node<T0, T1, T2, T3> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3> > {
2391 private:
2392 static const int N = 4;
2393 public:
2394 typedef std::tuple<T0, T1, T2, T3> InputTuple;
2395 typedef tagged_msg<size_t, T0, T1, T2, T3> output_type;
2396 typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)2397 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2398 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2399 this->input_ports(), static_cast< sender< output_type > *>(this) );
2400 }
2401
2402 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2403 template <typename... Args>
indexer_node(const node_set<Args...> & nodes)2404 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2405 make_edges_in_order(nodes, *this);
2406 }
2407 #endif
2408
2409 // Copy constructor
indexer_node(const indexer_node & other)2410 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2411 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2412 this->input_ports(), static_cast< sender< output_type > *>(this) );
2413 }
2414
2415 };
2416
2417 template<typename T0, typename T1, typename T2, typename T3, typename T4>
2418 class indexer_node<T0, T1, T2, T3, T4> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4> > {
2419 private:
2420 static const int N = 5;
2421 public:
2422 typedef std::tuple<T0, T1, T2, T3, T4> InputTuple;
2423 typedef tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
2424 typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)2425 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2426 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2427 this->input_ports(), static_cast< sender< output_type > *>(this) );
2428 }
2429
2430 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2431 template <typename... Args>
indexer_node(const node_set<Args...> & nodes)2432 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2433 make_edges_in_order(nodes, *this);
2434 }
2435 #endif
2436
2437 // Copy constructor
indexer_node(const indexer_node & other)2438 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2439 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2440 this->input_ports(), static_cast< sender< output_type > *>(this) );
2441 }
2442
2443 };
2444
#if __TBB_VARIADIC_MAX >= 6
//! indexer_node specialization for 6 input ports.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
class indexer_node<T0, T1, T2, T3, T4, T5> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5> > {
private:
    static const int N = 6;
public:
    typedef std::tuple<T0, T1, T2, T3, T4, T5> InputTuple;
    typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;

    //! Constructs the node in graph g and registers it with the profiling tools.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node and wires it to the given predecessor/successor set.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: creates a fresh node in the same graph (edges are not copied).
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
#endif //variadic max 6
2474
#if __TBB_VARIADIC_MAX >= 7
//! indexer_node specialization for 7 input ports.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6>
class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6> > {
private:
    static const int N = 7;
public:
    typedef std::tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
    typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;

    //! Constructs the node in graph g and registers it with the profiling tools.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node and wires it to the given predecessor/successor set.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: creates a fresh node in the same graph (edges are not copied).
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
#endif //variadic max 7
2505
#if __TBB_VARIADIC_MAX >= 8
//! indexer_node specialization for 8 input ports.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7>
class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
private:
    static const int N = 8;
public:
    typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
    typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;

    //! Constructs the node in graph g and registers it with the profiling tools.
    // __TBB_NOINLINE_SYM added for consistency with every other indexer_node
    // specialization (keeps the profiling symbol out-of-line on Linux/macOS).
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node and wires it to the given predecessor/successor set.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: creates a fresh node in the same graph (edges are not copied).
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
#endif //variadic max 8
2536
#if __TBB_VARIADIC_MAX >= 9
//! indexer_node specialization for 9 input ports.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8>
class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
private:
    static const int N = 9;
public:
    typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
    typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;

    //! Constructs the node in graph g and registers it with the profiling tools.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node and wires it to the given predecessor/successor set.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: creates a fresh node in the same graph (edges are not copied).
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
#endif //variadic max 9
2567
#if __TBB_VARIADIC_MAX >= 10
//! Primary (10-input) indexer_node template; selected when all ten type slots are used.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8, typename T9>
class indexer_node/*default*/ : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
private:
    static const int N = 10;
public:
    typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
    typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;

    //! Constructs the node in graph g and registers it with the profiling tools.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node and wires it to the given predecessor/successor set.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: creates a fresh node in the same graph (edges are not copied).
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
#endif //variadic max 10
2598
2599 template< typename T >
internal_make_edge(sender<T> & p,receiver<T> & s)2600 inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
2601 register_successor(p, s);
2602 fgt_make_edge( &p, &s );
2603 }
2604
2605 //! Makes an edge between a single predecessor and a single successor
2606 template< typename T >
make_edge(sender<T> & p,receiver<T> & s)2607 inline void make_edge( sender<T> &p, receiver<T> &s ) {
2608 internal_make_edge( p, s );
2609 }
2610
//! Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void make_edge( T& output, V& input) {
    make_edge(std::get<0>(output.output_ports()), std::get<0>(input.input_ports()));
}
2617
2618 //Makes an edge from port 0 of a multi-output predecessor to a receiver.
2619 template< typename T, typename R,
2620 typename = typename T::output_ports_type >
make_edge(T & output,receiver<R> & input)2621 inline void make_edge( T& output, receiver<R>& input) {
2622 make_edge(std::get<0>(output.output_ports()), input);
2623 }
2624
2625 //Makes an edge from a sender to port 0 of a multi-input successor.
2626 template< typename S, typename V,
2627 typename = typename V::input_ports_type >
make_edge(sender<S> & output,V & input)2628 inline void make_edge( sender<S>& output, V& input) {
2629 make_edge(output, std::get<0>(input.input_ports()));
2630 }
2631
2632 template< typename T >
internal_remove_edge(sender<T> & p,receiver<T> & s)2633 inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
2634 remove_successor( p, s );
2635 fgt_remove_edge( &p, &s );
2636 }
2637
2638 //! Removes an edge between a single predecessor and a single successor
2639 template< typename T >
remove_edge(sender<T> & p,receiver<T> & s)2640 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
2641 internal_remove_edge( p, s );
2642 }
2643
//! Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void remove_edge( T& output, V& input) {
    remove_edge(std::get<0>(output.output_ports()), std::get<0>(input.input_ports()));
}
2650
2651 //Removes an edge between port 0 of a multi-output predecessor and a receiver.
2652 template< typename T, typename R,
2653 typename = typename T::output_ports_type >
remove_edge(T & output,receiver<R> & input)2654 inline void remove_edge( T& output, receiver<R>& input) {
2655 remove_edge(std::get<0>(output.output_ports()), input);
2656 }
2657 //Removes an edge between a sender and port 0 of a multi-input successor.
2658 template< typename S, typename V,
2659 typename = typename V::input_ports_type >
remove_edge(sender<S> & output,V & input)2660 inline void remove_edge( sender<S>& output, V& input) {
2661 remove_edge(output, std::get<0>(input.input_ports()));
2662 }
2663
//! Returns a copy of the body from a function or continue node
//! Body must match the callable type the node was constructed with.
template< typename Body, typename Node >
Body copy_body( Node &n ) {
    return n.template copy_function_object<Body>();
}
2669
2670 //composite_node
2671 template< typename InputTuple, typename OutputTuple > class composite_node;
2672
2673 template< typename... InputTypes, typename... OutputTypes>
2674 class composite_node <std::tuple<InputTypes...>, std::tuple<OutputTypes...> > : public graph_node {
2675
2676 public:
2677 typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
2678 typedef std::tuple< sender<OutputTypes>&... > output_ports_type;
2679
2680 private:
2681 std::unique_ptr<input_ports_type> my_input_ports;
2682 std::unique_ptr<output_ports_type> my_output_ports;
2683
2684 static const size_t NUM_INPUTS = sizeof...(InputTypes);
2685 static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
2686
2687 protected:
reset_node(reset_flags)2688 void reset_node(reset_flags) override {}
2689
2690 public:
composite_node(graph & g)2691 composite_node( graph &g ) : graph_node(g) {
2692 fgt_multiinput_multioutput_node( CODEPTR(), FLOW_COMPOSITE_NODE, this, &this->my_graph );
2693 }
2694
2695 template<typename T1, typename T2>
set_external_ports(T1 && input_ports_tuple,T2 && output_ports_tuple)2696 void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
2697 static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
2698 static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
2699
2700 fgt_internal_input_alias_helper<T1, NUM_INPUTS>::alias_port( this, input_ports_tuple);
2701 fgt_internal_output_alias_helper<T2, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
2702
2703 my_input_ports.reset( new input_ports_type(std::forward<T1>(input_ports_tuple)) );
2704 my_output_ports.reset( new output_ports_type(std::forward<T2>(output_ports_tuple)) );
2705 }
2706
2707 template< typename... NodeTypes >
add_visible_nodes(const NodeTypes &...n)2708 void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
2709
2710 template< typename... NodeTypes >
add_nodes(const NodeTypes &...n)2711 void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
2712
2713
input_ports()2714 input_ports_type& input_ports() {
2715 __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
2716 return *my_input_ports;
2717 }
2718
output_ports()2719 output_ports_type& output_ports() {
2720 __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
2721 return *my_output_ports;
2722 }
2723 }; // class composite_node
2724
2725 //composite_node with only input ports
2726 template< typename... InputTypes>
2727 class composite_node <std::tuple<InputTypes...>, std::tuple<> > : public graph_node {
2728 public:
2729 typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
2730
2731 private:
2732 std::unique_ptr<input_ports_type> my_input_ports;
2733 static const size_t NUM_INPUTS = sizeof...(InputTypes);
2734
2735 protected:
reset_node(reset_flags)2736 void reset_node(reset_flags) override {}
2737
2738 public:
composite_node(graph & g)2739 composite_node( graph &g ) : graph_node(g) {
2740 fgt_composite( CODEPTR(), this, &g );
2741 }
2742
2743 template<typename T>
set_external_ports(T && input_ports_tuple)2744 void set_external_ports(T&& input_ports_tuple) {
2745 static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
2746
2747 fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, input_ports_tuple);
2748
2749 my_input_ports.reset( new input_ports_type(std::forward<T>(input_ports_tuple)) );
2750 }
2751
2752 template< typename... NodeTypes >
add_visible_nodes(const NodeTypes &...n)2753 void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
2754
2755 template< typename... NodeTypes >
add_nodes(const NodeTypes &...n)2756 void add_nodes( const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
2757
2758
input_ports()2759 input_ports_type& input_ports() {
2760 __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
2761 return *my_input_ports;
2762 }
2763
2764 }; // class composite_node
2765
2766 //composite_nodes with only output_ports
2767 template<typename... OutputTypes>
2768 class composite_node <std::tuple<>, std::tuple<OutputTypes...> > : public graph_node {
2769 public:
2770 typedef std::tuple< sender<OutputTypes>&... > output_ports_type;
2771
2772 private:
2773 std::unique_ptr<output_ports_type> my_output_ports;
2774 static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
2775
2776 protected:
reset_node(reset_flags)2777 void reset_node(reset_flags) override {}
2778
2779 public:
composite_node(graph & g)2780 __TBB_NOINLINE_SYM composite_node( graph &g ) : graph_node(g) {
2781 fgt_composite( CODEPTR(), this, &g );
2782 }
2783
2784 template<typename T>
set_external_ports(T && output_ports_tuple)2785 void set_external_ports(T&& output_ports_tuple) {
2786 static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
2787
2788 fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
2789
2790 my_output_ports.reset( new output_ports_type(std::forward<T>(output_ports_tuple)) );
2791 }
2792
2793 template<typename... NodeTypes >
add_visible_nodes(const NodeTypes &...n)2794 void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
2795
2796 template<typename... NodeTypes >
add_nodes(const NodeTypes &...n)2797 void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
2798
2799
output_ports()2800 output_ports_type& output_ports() {
2801 __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
2802 return *my_output_ports;
2803 }
2804
2805 }; // class composite_node
2806
2807 template<typename Gateway>
2808 class async_body_base: no_assign {
2809 public:
2810 typedef Gateway gateway_type;
2811
async_body_base(gateway_type * gateway)2812 async_body_base(gateway_type *gateway): my_gateway(gateway) { }
set_gateway(gateway_type * gateway)2813 void set_gateway(gateway_type *gateway) {
2814 my_gateway = gateway;
2815 }
2816
2817 protected:
2818 gateway_type *my_gateway;
2819 };
2820
2821 template<typename Input, typename Ports, typename Gateway, typename Body>
2822 class async_body: public async_body_base<Gateway> {
2823 public:
2824 typedef async_body_base<Gateway> base_type;
2825 typedef Gateway gateway_type;
2826
async_body(const Body & body,gateway_type * gateway)2827 async_body(const Body &body, gateway_type *gateway)
2828 : base_type(gateway), my_body(body) { }
2829
operator()2830 void operator()( const Input &v, Ports & ) {
2831 my_body(v, *this->my_gateway);
2832 }
2833
get_body()2834 Body get_body() { return my_body; }
2835
2836 private:
2837 Body my_body;
2838 };
2839
2840 //! Implements async node
2841 template < typename Input, typename Output,
2842 typename Policy = queueing_lightweight >
__TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input>)2843 __TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input>)
2844 class async_node
2845 : public multifunction_node< Input, std::tuple< Output >, Policy >, public sender< Output >
2846 {
2847 typedef multifunction_node< Input, std::tuple< Output >, Policy > base_type;
2848 typedef multifunction_input<
2849 Input, typename base_type::output_ports_type, Policy, cache_aligned_allocator<Input>> mfn_input_type;
2850
2851 public:
2852 typedef Input input_type;
2853 typedef Output output_type;
2854 typedef receiver<input_type> receiver_type;
2855 typedef receiver<output_type> successor_type;
2856 typedef sender<input_type> predecessor_type;
2857 typedef receiver_gateway<output_type> gateway_type;
2858 typedef async_body_base<gateway_type> async_body_base_type;
2859 typedef typename base_type::output_ports_type output_ports_type;
2860
2861 private:
2862 class receiver_gateway_impl: public receiver_gateway<Output> {
2863 public:
2864 receiver_gateway_impl(async_node* node): my_node(node) {}
2865 void reserve_wait() override {
2866 fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
2867 my_node->my_graph.reserve_wait();
2868 }
2869
2870 void release_wait() override {
2871 async_node* n = my_node;
2872 graph* g = &n->my_graph;
2873 g->release_wait();
2874 fgt_async_commit(static_cast<typename async_node::receiver_type *>(n), g);
2875 }
2876
2877 //! Implements gateway_type::try_put for an external activity to submit a message to FG
2878 bool try_put(const Output &i) override {
2879 return my_node->try_put_impl(i);
2880 }
2881
2882 private:
2883 async_node* my_node;
2884 } my_gateway;
2885
2886 //The substitute of 'this' for member construction, to prevent compiler warnings
2887 async_node* self() { return this; }
2888
2889 //! Implements gateway_type::try_put for an external activity to submit a message to FG
2890 bool try_put_impl(const Output &i) {
2891 multifunction_output<Output> &port_0 = output_port<0>(*this);
2892 broadcast_cache<output_type>& port_successors = port_0.successors();
2893 fgt_async_try_put_begin(this, &port_0);
2894 // TODO revamp: change to std::list<graph_task*>
2895 graph_task_list tasks;
2896 bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
2897 __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
2898 "Return status is inconsistent with the method operation." );
2899
2900 while( !tasks.empty() ) {
2901 enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
2902 }
2903 fgt_async_try_put_end(this, &port_0);
2904 return is_at_least_one_put_successful;
2905 }
2906
2907 public:
2908 template<typename Body>
2909 __TBB_requires(async_node_body<Body, input_type, gateway_type>)
2910 __TBB_NOINLINE_SYM async_node(
2911 graph &g, size_t concurrency,
2912 Body body, Policy = Policy(), node_priority_t a_priority = no_priority
2913 ) : base_type(
2914 g, concurrency,
2915 async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
2916 (body, &my_gateway), a_priority ), my_gateway(self()) {
2917 fgt_multioutput_node_with_body<1>(
2918 CODEPTR(), FLOW_ASYNC_NODE,
2919 &this->my_graph, static_cast<receiver<input_type> *>(this),
2920 this->output_ports(), this->my_body
2921 );
2922 }
2923
2924 template <typename Body>
2925 __TBB_requires(async_node_body<Body, input_type, gateway_type>)
2926 __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
2927 : async_node(g, concurrency, body, Policy(), a_priority) {}
2928
2929 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2930 template <typename Body, typename... Args>
2931 __TBB_requires(async_node_body<Body, input_type, gateway_type>)
2932 __TBB_NOINLINE_SYM async_node(
2933 const node_set<Args...>& nodes, size_t concurrency, Body body,
2934 Policy = Policy(), node_priority_t a_priority = no_priority )
2935 : async_node(nodes.graph_reference(), concurrency, body, a_priority) {
2936 make_edges_in_order(nodes, *this);
2937 }
2938
2939 template <typename Body, typename... Args>
2940 __TBB_requires(async_node_body<Body, input_type, gateway_type>)
2941 __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
2942 : async_node(nodes, concurrency, body, Policy(), a_priority) {}
2943 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2944
2945 __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
2946 static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
2947 static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
2948
2949 fgt_multioutput_node_with_body<1>( CODEPTR(), FLOW_ASYNC_NODE,
2950 &this->my_graph, static_cast<receiver<input_type> *>(this),
2951 this->output_ports(), this->my_body );
2952 }
2953
2954 gateway_type& gateway() {
2955 return my_gateway;
2956 }
2957
2958 // Define sender< Output >
2959
2960 //! Add a new successor to this node
2961 bool register_successor(successor_type&) override {
2962 __TBB_ASSERT(false, "Successors must be registered only via ports");
2963 return false;
2964 }
2965
2966 //! Removes a successor from this node
2967 bool remove_successor(successor_type&) override {
2968 __TBB_ASSERT(false, "Successors must be removed only via ports");
2969 return false;
2970 }
2971
2972 template<typename Body>
2973 Body copy_function_object() {
2974 typedef multifunction_body<input_type, typename base_type::output_ports_type> mfn_body_type;
2975 typedef async_body<Input, typename base_type::output_ports_type, gateway_type, Body> async_body_type;
2976 mfn_body_type &body_ref = *this->my_body;
2977 async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
2978 return ab.get_body();
2979 }
2980
2981 protected:
2982
2983 void reset_node( reset_flags f) override {
2984 base_type::reset_node(f);
2985 }
2986 };
2987
2988 #include "detail/_flow_graph_node_set_impl.h"
2989
2990 template< typename T >
2991 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
2992 public:
2993 typedef T input_type;
2994 typedef T output_type;
2995 typedef typename receiver<input_type>::predecessor_type predecessor_type;
2996 typedef typename sender<output_type>::successor_type successor_type;
2997
overwrite_node(graph & g)2998 __TBB_NOINLINE_SYM explicit overwrite_node(graph &g)
2999 : graph_node(g), my_successors(this), my_buffer_is_valid(false)
3000 {
3001 fgt_node( CODEPTR(), FLOW_OVERWRITE_NODE, &this->my_graph,
3002 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
3003 }
3004
3005 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3006 template <typename... Args>
overwrite_node(const node_set<Args...> & nodes)3007 overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
3008 make_edges_in_order(nodes, *this);
3009 }
3010 #endif
3011
3012 //! Copy constructor; doesn't take anything from src; default won't work
overwrite_node(const overwrite_node & src)3013 __TBB_NOINLINE_SYM overwrite_node( const overwrite_node& src ) : overwrite_node(src.my_graph) {}
3014
~overwrite_node()3015 ~overwrite_node() {}
3016
register_successor(successor_type & s)3017 bool register_successor( successor_type &s ) override {
3018 spin_mutex::scoped_lock l( my_mutex );
3019 if (my_buffer_is_valid && is_graph_active( my_graph )) {
3020 // We have a valid value that must be forwarded immediately.
3021 bool ret = s.try_put( my_buffer );
3022 if ( ret ) {
3023 // We add the successor that accepted our put
3024 my_successors.register_successor( s );
3025 } else {
3026 // In case of reservation a race between the moment of reservation and register_successor can appear,
3027 // because failed reserve does not mean that register_successor is not ready to put a message immediately.
3028 // We have some sort of infinite loop: reserving node tries to set pull state for the edge,
3029 // but overwrite_node tries to return push state back. That is why we have to break this loop with task creation.
3030 small_object_allocator allocator{};
3031 typedef register_predecessor_task task_type;
3032 graph_task* t = allocator.new_object<task_type>(graph_reference(), allocator, *this, s);
3033 graph_reference().reserve_wait();
3034 spawn_in_graph_arena( my_graph, *t );
3035 }
3036 } else {
3037 // No valid value yet, just add as successor
3038 my_successors.register_successor( s );
3039 }
3040 return true;
3041 }
3042
remove_successor(successor_type & s)3043 bool remove_successor( successor_type &s ) override {
3044 spin_mutex::scoped_lock l( my_mutex );
3045 my_successors.remove_successor(s);
3046 return true;
3047 }
3048
try_get(input_type & v)3049 bool try_get( input_type &v ) override {
3050 spin_mutex::scoped_lock l( my_mutex );
3051 if ( my_buffer_is_valid ) {
3052 v = my_buffer;
3053 return true;
3054 }
3055 return false;
3056 }
3057
3058 //! Reserves an item
try_reserve(T & v)3059 bool try_reserve( T &v ) override {
3060 return try_get(v);
3061 }
3062
3063 //! Releases the reserved item
try_release()3064 bool try_release() override { return true; }
3065
3066 //! Consumes the reserved item
try_consume()3067 bool try_consume() override { return true; }
3068
is_valid()3069 bool is_valid() {
3070 spin_mutex::scoped_lock l( my_mutex );
3071 return my_buffer_is_valid;
3072 }
3073
clear()3074 void clear() {
3075 spin_mutex::scoped_lock l( my_mutex );
3076 my_buffer_is_valid = false;
3077 }
3078
3079 protected:
3080
3081 template< typename R, typename B > friend class run_and_put_task;
3082 template<typename X, typename Y> friend class broadcast_cache;
3083 template<typename X, typename Y> friend class round_robin_cache;
try_put_task(const input_type & v)3084 graph_task* try_put_task( const input_type &v ) override {
3085 spin_mutex::scoped_lock l( my_mutex );
3086 return try_put_task_impl(v);
3087 }
3088
try_put_task_impl(const input_type & v)3089 graph_task * try_put_task_impl(const input_type &v) {
3090 my_buffer = v;
3091 my_buffer_is_valid = true;
3092 graph_task* rtask = my_successors.try_put_task(v);
3093 if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
3094 return rtask;
3095 }
3096
graph_reference()3097 graph& graph_reference() const override {
3098 return my_graph;
3099 }
3100
3101 //! Breaks an infinite loop between the node reservation and register_successor call
3102 struct register_predecessor_task : public graph_task {
register_predecessor_taskregister_predecessor_task3103 register_predecessor_task(
3104 graph& g, small_object_allocator& allocator, predecessor_type& owner, successor_type& succ)
3105 : graph_task(g, allocator), o(owner), s(succ) {};
3106
executeregister_predecessor_task3107 task* execute(execution_data& ed) override {
3108 // TODO revamp: investigate why qualification is needed for register_successor() call
3109 using tbb::detail::d1::register_predecessor;
3110 using tbb::detail::d1::register_successor;
3111 if ( !register_predecessor(s, o) ) {
3112 register_successor(o, s);
3113 }
3114 finalize(ed);
3115 return nullptr;
3116 }
3117
3118 predecessor_type& o;
3119 successor_type& s;
3120 };
3121
3122 spin_mutex my_mutex;
3123 broadcast_cache< input_type, null_rw_mutex > my_successors;
3124 input_type my_buffer;
3125 bool my_buffer_is_valid;
3126
reset_node(reset_flags f)3127 void reset_node( reset_flags f) override {
3128 my_buffer_is_valid = false;
3129 if (f&rf_clear_edges) {
3130 my_successors.clear();
3131 }
3132 }
3133 }; // overwrite_node
3134
3135 template< typename T >
3136 class write_once_node : public overwrite_node<T> {
3137 public:
3138 typedef T input_type;
3139 typedef T output_type;
3140 typedef overwrite_node<T> base_type;
3141 typedef typename receiver<input_type>::predecessor_type predecessor_type;
3142 typedef typename sender<output_type>::successor_type successor_type;
3143
3144 //! Constructor
write_once_node(graph & g)3145 __TBB_NOINLINE_SYM explicit write_once_node(graph& g) : base_type(g) {
3146 fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3147 static_cast<receiver<input_type> *>(this),
3148 static_cast<sender<output_type> *>(this) );
3149 }
3150
3151 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3152 template <typename... Args>
write_once_node(const node_set<Args...> & nodes)3153 write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
3154 make_edges_in_order(nodes, *this);
3155 }
3156 #endif
3157
3158 //! Copy constructor: call base class copy constructor
write_once_node(const write_once_node & src)3159 __TBB_NOINLINE_SYM write_once_node( const write_once_node& src ) : base_type(src) {
3160 fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3161 static_cast<receiver<input_type> *>(this),
3162 static_cast<sender<output_type> *>(this) );
3163 }
3164
3165 protected:
3166 template< typename R, typename B > friend class run_and_put_task;
3167 template<typename X, typename Y> friend class broadcast_cache;
3168 template<typename X, typename Y> friend class round_robin_cache;
try_put_task(const T & v)3169 graph_task *try_put_task( const T &v ) override {
3170 spin_mutex::scoped_lock l( this->my_mutex );
3171 return this->my_buffer_is_valid ? NULL : this->try_put_task_impl(v);
3172 }
3173 }; // write_once_node
3174
//! Associates a user-visible name with a graph for profiling/tracing tools.
inline void set_name(const graph& g, const char *name) {
    fgt_graph_desc(&g, name);
}

//! set_name overloads for single-input/single-output node types: each one
//! forwards the name to the flow-graph-trace (fgt) node descriptor hook.
template <typename Output>
inline void set_name(const input_node<Output>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename Input, typename Output, typename Policy>
inline void set_name(const function_node<Input, Output, Policy>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename Output, typename Policy>
inline void set_name(const continue_node<Output,Policy>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const broadcast_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const buffer_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const queue_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const sequencer_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T, typename Compare>
inline void set_name(const priority_queue_node<T, Compare>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T, typename DecrementType>
inline void set_name(const limiter_node<T, DecrementType>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename OutputTuple, typename JP>
inline void set_name(const join_node<OutputTuple, JP>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename... Types>
inline void set_name(const indexer_node<Types...>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const overwrite_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const write_once_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}
3243
//! set_name overloads for multi-output node types: these forward the name to
//! the multi-output variant of the flow-graph-trace descriptor hook.
template<typename Input, typename Output, typename Policy>
inline void set_name(const multifunction_node<Input, Output, Policy>& node, const char *name) {
    fgt_multioutput_node_desc(&node, name);
}

template<typename TupleType>
inline void set_name(const split_node<TupleType>& node, const char *name) {
    fgt_multioutput_node_desc(&node, name);
}
3253
//! set_name overload for composite_node, which can have several inputs and
//! several outputs, hence the multiinput/multioutput descriptor hook.
template< typename InputTuple, typename OutputTuple >
inline void set_name(const composite_node<InputTuple, OutputTuple>& node, const char *name) {
    fgt_multiinput_multioutput_node_desc(&node, name);
}

//! set_name overload for async_node; traced via the multi-output hook.
template<typename Input, typename Output, typename Policy>
inline void set_name(const async_node<Input, Output, Policy>& node, const char *name)
{
    fgt_multioutput_node_desc(&node, name);
}
3264 } // d1
3265 } // detail
3266 } // tbb
3267
3268
3269 // Include deduction guides for node classes
3270 #include "detail/_flow_graph_nodes_deduction.h"
3271
// Public API surface: re-export the detail::d1 implementation names into the
// stable tbb::flow::v1 namespace. The order below mirrors the order of the
// definitions in this header; do not remove entries -- they are the public API.
namespace tbb {
namespace flow {
inline namespace v1 {
// Message-passing interfaces implemented by node endpoints.
using detail::d1::receiver;
using detail::d1::sender;

// Concurrency limits for function-style nodes.
using detail::d1::serial;
using detail::d1::unlimited;

// Flags controlling graph/node reset behavior.
using detail::d1::reset_flags;
using detail::d1::rf_reset_protocol;
using detail::d1::rf_reset_bodies;
using detail::d1::rf_clear_edges;

// Core graph types.
using detail::d1::graph;
using detail::d1::graph_node;
using detail::d1::continue_msg;

// Node types and their helpers.
using detail::d1::input_node;
using detail::d1::function_node;
using detail::d1::multifunction_node;
using detail::d1::split_node;
using detail::d1::output_port;
using detail::d1::indexer_node;
using detail::d1::tagged_msg;
using detail::d1::cast_to;
using detail::d1::is_a;
using detail::d1::continue_node;
using detail::d1::overwrite_node;
using detail::d1::write_once_node;
using detail::d1::broadcast_node;
using detail::d1::buffer_node;
using detail::d1::queue_node;
using detail::d1::sequencer_node;
using detail::d1::priority_queue_node;
using detail::d1::limiter_node;
// Buffering/queueing/rejecting policy tags (e.g. for function_node, join_node).
using namespace detail::d1::graph_policy_namespace;
using detail::d1::join_node;
using detail::d1::input_port;
using detail::d1::copy_body;
using detail::d1::make_edge;
using detail::d1::remove_edge;
using detail::d1::tag_value;
using detail::d1::composite_node;
using detail::d1::async_node;
using detail::d1::node_priority_t;
using detail::d1::no_priority;

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Preview feature: declarative edge construction via node sets.
using detail::d1::follows;
using detail::d1::precedes;
using detail::d1::make_node_set;
using detail::d1::make_edges;
#endif

} // v1
} // flow

using detail::d1::flow_control;

// Profiling hooks (set_name overloads) live under tbb::profiling.
namespace profiling {
using detail::d1::set_name;
} // profiling

} // tbb
3337
3338
3339 #if TBB_USE_PROFILING_TOOLS && ( __unix__ || __APPLE__ )
3340 // We don't do pragma pop here, since it still gives warning on the USER side
3341 #undef __TBB_NOINLINE_SYM
3342 #endif
3343
3344 #endif // __TBB_flow_graph_H
3345