1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_H
18 #define __TBB_flow_graph_H
19 
20 #include <atomic>
21 #include <memory>
22 #include <type_traits>
23 
24 #include "detail/_config.h"
25 #include "detail/_namespace_injection.h"
26 #include "spin_mutex.h"
27 #include "null_mutex.h"
28 #include "spin_rw_mutex.h"
29 #include "null_rw_mutex.h"
30 #include "detail/_pipeline_filters.h"
31 #include "detail/_task.h"
32 #include "detail/_small_object_pool.h"
33 #include "cache_aligned_allocator.h"
34 #include "detail/_exception.h"
35 #include "detail/_template_helpers.h"
36 #include "detail/_aggregator.h"
37 #include "detail/_allocator_traits.h"
38 #include "detail/_utils.h"
39 #include "profiling.h"
40 #include "task_arena.h"
41 
42 #if TBB_USE_PROFILING_TOOLS && ( __unix__ || __APPLE__ )
43    #if __INTEL_COMPILER
44        // Disabled warning "routine is both inline and noinline"
45        #pragma warning (push)
46        #pragma warning( disable: 2196 )
47    #endif
48    #define __TBB_NOINLINE_SYM __attribute__((noinline))
49 #else
50    #define __TBB_NOINLINE_SYM
51 #endif
52 
53 #include <tuple>
54 #include <list>
55 #include <queue>
56 #if __TBB_CPP20_CONCEPTS_PRESENT
57 #include <concepts>
58 #endif
59 
60 /** @file
61   \brief The graph related classes and functions
62 
63   There are some applications that best express dependencies as messages
64   passed between nodes in a graph.  These messages may contain data or
65   simply act as signals that a predecessors has completed. The graph
66   class and its associated node classes can be used to express such
67   applications.
68 */
69 
70 namespace tbb {
71 namespace detail {
72 
73 namespace d1 {
74 
75 //! An enumeration the provides the two most common concurrency levels: unlimited and serial
76 enum concurrency { unlimited = 0, serial = 1 };
77 
78 //! A generic null type
79 struct null_type {};
80 
81 //! An empty class used for messages that mean "I'm done"
82 class continue_msg {};
83 
84 } // namespace d1
85 
86 #if __TBB_CPP20_CONCEPTS_PRESENT
87 namespace d0 {
88 
89 template <typename ReturnType, typename OutputType>
90 concept node_body_return_type = std::same_as<OutputType, tbb::detail::d1::continue_msg> ||
91                                 std::same_as<OutputType, ReturnType>;
92 
93 template <typename Body, typename Output>
94 concept continue_node_body = std::copy_constructible<Body> &&
requires(Body & body,const tbb::detail::d1::continue_msg & v)95                              requires( Body& body, const tbb::detail::d1::continue_msg& v ) {
96                                  { body(v) } -> node_body_return_type<Output>;
97                              };
98 
99 template <typename Body, typename Input, typename Output>
100 concept function_node_body = std::copy_constructible<Body> &&
requires(Body & body,const Input & v)101                              requires( Body& body, const Input& v ) {
102                                  { body(v) } -> node_body_return_type<Output>;
103                              };
104 
105 template <typename FunctionObject, typename Input, typename Key>
106 concept join_node_function_object = std::copy_constructible<FunctionObject> &&
requires(FunctionObject & func,const Input & v)107                                     requires( FunctionObject& func, const Input& v ) {
108                                         { func(v) } -> adaptive_same_as<Key>;
109                                     };
110 
111 template <typename Body, typename Output>
112 concept input_node_body = std::copy_constructible<Body> &&
requires(Body & body,tbb::detail::d1::flow_control & fc)113                           requires( Body& body, tbb::detail::d1::flow_control& fc ) {
114                               { body(fc) } -> adaptive_same_as<Output>;
115                           };
116 
117 template <typename Body, typename Input, typename OutputPortsType>
118 concept multifunction_node_body = std::copy_constructible<Body> &&
requires(Body & body,const Input & v,OutputPortsType & p)119                                   requires( Body& body, const Input& v, OutputPortsType& p ) {
120                                       body(v, p);
121                                   };
122 
123 template <typename Sequencer, typename Value>
124 concept sequencer = std::copy_constructible<Sequencer> &&
requires(Sequencer & seq,const Value & value)125                     requires( Sequencer& seq, const Value& value ) {
126                         { seq(value) } -> adaptive_same_as<std::size_t>;
127                     };
128 
129 template <typename Body, typename Input, typename GatewayType>
130 concept async_node_body = std::copy_constructible<Body> &&
requires(Body & body,const Input & v,GatewayType & gateway)131                           requires( Body& body, const Input& v, GatewayType& gateway ) {
132                               body(v, gateway);
133                           };
134 
135 } // namespace d0
136 #endif // __TBB_CPP20_CONCEPTS_PRESENT
137 
138 namespace d1 {
139 
140 //! Forward declaration section
141 template< typename T > class sender;
142 template< typename T > class receiver;
143 class continue_receiver;
144 
145 template< typename T, typename U > class limiter_node;  // needed for resetting decrementer
146 
147 template<typename T, typename M> class successor_cache;
148 template<typename T, typename M> class broadcast_cache;
149 template<typename T, typename M> class round_robin_cache;
150 template<typename T, typename M> class predecessor_cache;
151 template<typename T, typename M> class reservable_predecessor_cache;
152 
153 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
154 namespace order {
155 struct following;
156 struct preceding;
157 }
158 template<typename Order, typename... Args> struct node_set;
159 #endif
160 
161 
162 } // namespace d1
163 } // namespace detail
164 } // namespace tbb
165 
166 //! The graph class
167 #include "detail/_flow_graph_impl.h"
168 
169 namespace tbb {
170 namespace detail {
171 namespace d1 {
172 
order_tasks(graph_task * first,graph_task * second)173 static inline std::pair<graph_task*, graph_task*> order_tasks(graph_task* first, graph_task* second) {
174     if (second->priority > first->priority)
175         return std::make_pair(second, first);
176     return std::make_pair(first, second);
177 }
178 
179 // submit task if necessary. Returns the non-enqueued task if there is one.
combine_tasks(graph & g,graph_task * left,graph_task * right)180 static inline graph_task* combine_tasks(graph& g, graph_task* left, graph_task* right) {
181     // if no RHS task, don't change left.
182     if (right == NULL) return left;
183     // right != NULL
184     if (left == NULL) return right;
185     if (left == SUCCESSFULLY_ENQUEUED) return right;
186     // left contains a task
187     if (right != SUCCESSFULLY_ENQUEUED) {
188         // both are valid tasks
189         auto tasks_pair = order_tasks(left, right);
190         spawn_in_graph_arena(g, *tasks_pair.first);
191         return tasks_pair.second;
192     }
193     return left;
194 }
195 
196 //! Pure virtual template class that defines a sender of messages of type T
197 template< typename T >
198 class sender {
199 public:
~sender()200     virtual ~sender() {}
201 
202     //! Request an item from the sender
try_get(T &)203     virtual bool try_get( T & ) { return false; }
204 
205     //! Reserves an item in the sender
try_reserve(T &)206     virtual bool try_reserve( T & ) { return false; }
207 
208     //! Releases the reserved item
try_release()209     virtual bool try_release( ) { return false; }
210 
211     //! Consumes the reserved item
try_consume()212     virtual bool try_consume( ) { return false; }
213 
214 protected:
215     //! The output type of this sender
216     typedef T output_type;
217 
218     //! The successor type for this node
219     typedef receiver<T> successor_type;
220 
221     //! Add a new successor to this node
222     virtual bool register_successor( successor_type &r ) = 0;
223 
224     //! Removes a successor from this node
225     virtual bool remove_successor( successor_type &r ) = 0;
226 
227     template<typename C>
228     friend bool register_successor(sender<C>& s, receiver<C>& r);
229 
230     template<typename C>
231     friend bool remove_successor  (sender<C>& s, receiver<C>& r);
232 };  // class sender<T>
233 
234 template<typename C>
register_successor(sender<C> & s,receiver<C> & r)235 bool register_successor(sender<C>& s, receiver<C>& r) {
236     return s.register_successor(r);
237 }
238 
239 template<typename C>
remove_successor(sender<C> & s,receiver<C> & r)240 bool remove_successor(sender<C>& s, receiver<C>& r) {
241     return s.remove_successor(r);
242 }
243 
244 //! Pure virtual template class that defines a receiver of messages of type T
245 template< typename T >
246 class receiver {
247 public:
248     //! Destructor
~receiver()249     virtual ~receiver() {}
250 
251     //! Put an item to the receiver
try_put(const T & t)252     bool try_put( const T& t ) {
253         graph_task *res = try_put_task(t);
254         if (!res) return false;
255         if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res);
256         return true;
257     }
258 
259     //! put item to successor; return task to run the successor if possible.
260 protected:
261     //! The input type of this receiver
262     typedef T input_type;
263 
264     //! The predecessor type for this node
265     typedef sender<T> predecessor_type;
266 
267     template< typename R, typename B > friend class run_and_put_task;
268     template< typename X, typename Y > friend class broadcast_cache;
269     template< typename X, typename Y > friend class round_robin_cache;
270     virtual graph_task *try_put_task(const T& t) = 0;
271     virtual graph& graph_reference() const = 0;
272 
273     template<typename TT, typename M> friend class successor_cache;
is_continue_receiver()274     virtual bool is_continue_receiver() { return false; }
275 
276     // TODO revamp: reconsider the inheritance and move node priority out of receiver
priority()277     virtual node_priority_t priority() const { return no_priority; }
278 
279     //! Add a predecessor to the node
register_predecessor(predecessor_type &)280     virtual bool register_predecessor( predecessor_type & ) { return false; }
281 
282     //! Remove a predecessor from the node
remove_predecessor(predecessor_type &)283     virtual bool remove_predecessor( predecessor_type & ) { return false; }
284 
285     template <typename C>
286     friend bool register_predecessor(receiver<C>& r, sender<C>& s);
287     template <typename C>
288     friend bool remove_predecessor  (receiver<C>& r, sender<C>& s);
289 }; // class receiver<T>
290 
291 template <typename C>
register_predecessor(receiver<C> & r,sender<C> & s)292 bool register_predecessor(receiver<C>& r, sender<C>& s) {
293     return r.register_predecessor(s);
294 }
295 
296 template <typename C>
remove_predecessor(receiver<C> & r,sender<C> & s)297 bool remove_predecessor(receiver<C>& r, sender<C>& s) {
298     return r.remove_predecessor(s);
299 }
300 
301 //! Base class for receivers of completion messages
302 /** These receivers automatically reset, but cannot be explicitly waited on */
303 class continue_receiver : public receiver< continue_msg > {
304 protected:
305 
306     //! Constructor
continue_receiver(int number_of_predecessors,node_priority_t a_priority)307     explicit continue_receiver( int number_of_predecessors, node_priority_t a_priority ) {
308         my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
309         my_current_count = 0;
310         my_priority = a_priority;
311     }
312 
313     //! Copy constructor
continue_receiver(const continue_receiver & src)314     continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
315         my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
316         my_current_count = 0;
317         my_priority = src.my_priority;
318     }
319 
320     //! Increments the trigger threshold
register_predecessor(predecessor_type &)321     bool register_predecessor( predecessor_type & ) override {
322         spin_mutex::scoped_lock l(my_mutex);
323         ++my_predecessor_count;
324         return true;
325     }
326 
327     //! Decrements the trigger threshold
328     /** Does not check to see if the removal of the predecessor now makes the current count
329         exceed the new threshold.  So removing a predecessor while the graph is active can cause
330         unexpected results. */
remove_predecessor(predecessor_type &)331     bool remove_predecessor( predecessor_type & ) override {
332         spin_mutex::scoped_lock l(my_mutex);
333         --my_predecessor_count;
334         return true;
335     }
336 
337     //! The input type
338     typedef continue_msg input_type;
339 
340     //! The predecessor type for this node
341     typedef receiver<input_type>::predecessor_type predecessor_type;
342 
343     template< typename R, typename B > friend class run_and_put_task;
344     template<typename X, typename Y> friend class broadcast_cache;
345     template<typename X, typename Y> friend class round_robin_cache;
346     // execute body is supposed to be too small to create a task for.
try_put_task(const input_type &)347     graph_task* try_put_task( const input_type & ) override {
348         {
349             spin_mutex::scoped_lock l(my_mutex);
350             if ( ++my_current_count < my_predecessor_count )
351                 return SUCCESSFULLY_ENQUEUED;
352             else
353                 my_current_count = 0;
354         }
355         graph_task* res = execute();
356         return res? res : SUCCESSFULLY_ENQUEUED;
357     }
358 
359     spin_mutex my_mutex;
360     int my_predecessor_count;
361     int my_current_count;
362     int my_initial_predecessor_count;
363     node_priority_t my_priority;
364     // the friend declaration in the base class did not eliminate the "protected class"
365     // error in gcc 4.1.2
366     template<typename U, typename V> friend class limiter_node;
367 
reset_receiver(reset_flags f)368     virtual void reset_receiver( reset_flags f ) {
369         my_current_count = 0;
370         if (f & rf_clear_edges) {
371             my_predecessor_count = my_initial_predecessor_count;
372         }
373     }
374 
375     //! Does whatever should happen when the threshold is reached
376     /** This should be very fast or else spawn a task.  This is
377         called while the sender is blocked in the try_put(). */
378     virtual graph_task* execute() = 0;
379     template<typename TT, typename M> friend class successor_cache;
is_continue_receiver()380     bool is_continue_receiver() override { return true; }
381 
priority()382     node_priority_t priority() const override { return my_priority; }
383 }; // class continue_receiver
384 
385 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
386     template <typename K, typename T>
key_from_message(const T & t)387     K key_from_message( const T &t ) {
388         return t.key();
389     }
390 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
391 
392 } // d1
393 } // detail
394 } // tbb
395 
396 #include "detail/_flow_graph_trace_impl.h"
397 #include "detail/_hash_compare.h"
398 
399 namespace tbb {
400 namespace detail {
401 namespace d1 {
402 
403 #include "detail/_flow_graph_body_impl.h"
404 #include "detail/_flow_graph_cache_impl.h"
405 #include "detail/_flow_graph_types_impl.h"
406 
407 using namespace graph_policy_namespace;
408 
409 template <typename C, typename N>
graph_iterator(C * g,bool begin)410 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
411 {
412     if (begin) current_node = my_graph->my_nodes;
413     //else it is an end iterator by default
414 }
415 
416 template <typename C, typename N>
417 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
418     __TBB_ASSERT(current_node, "graph_iterator at end");
419     return *operator->();
420 }
421 
422 template <typename C, typename N>
423 typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
424     return current_node;
425 }
426 
427 template <typename C, typename N>
internal_forward()428 void graph_iterator<C,N>::internal_forward() {
429     if (current_node) current_node = current_node->next;
430 }
431 
432 //! Constructs a graph with isolated task_group_context
graph()433 inline graph::graph() : my_wait_context(0), my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
434     prepare_task_arena();
435     own_context = true;
436     cancelled = false;
437     caught_exception = false;
438     my_context = new (r1::cache_aligned_allocate(sizeof(task_group_context))) task_group_context(FLOW_TASKS);
439     fgt_graph(this);
440     my_is_active = true;
441 }
442 
graph(task_group_context & use_this_context)443 inline graph::graph(task_group_context& use_this_context) :
444     my_wait_context(0), my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
445     prepare_task_arena();
446     own_context = false;
447     cancelled = false;
448     caught_exception = false;
449     fgt_graph(this);
450     my_is_active = true;
451 }
452 
~graph()453 inline graph::~graph() {
454     wait_for_all();
455     if (own_context) {
456         my_context->~task_group_context();
457         r1::cache_aligned_deallocate(my_context);
458     }
459     delete my_task_arena;
460 }
461 
reserve_wait()462 inline void graph::reserve_wait() {
463     my_wait_context.reserve();
464     fgt_reserve_wait(this);
465 }
466 
release_wait()467 inline void graph::release_wait() {
468     fgt_release_wait(this);
469     my_wait_context.release();
470 }
471 
register_node(graph_node * n)472 inline void graph::register_node(graph_node *n) {
473     n->next = NULL;
474     {
475         spin_mutex::scoped_lock lock(nodelist_mutex);
476         n->prev = my_nodes_last;
477         if (my_nodes_last) my_nodes_last->next = n;
478         my_nodes_last = n;
479         if (!my_nodes) my_nodes = n;
480     }
481 }
482 
remove_node(graph_node * n)483 inline void graph::remove_node(graph_node *n) {
484     {
485         spin_mutex::scoped_lock lock(nodelist_mutex);
486         __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
487         if (n->prev) n->prev->next = n->next;
488         if (n->next) n->next->prev = n->prev;
489         if (my_nodes_last == n) my_nodes_last = n->prev;
490         if (my_nodes == n) my_nodes = n->next;
491     }
492     n->prev = n->next = NULL;
493 }
494 
reset(reset_flags f)495 inline void graph::reset( reset_flags f ) {
496     // reset context
497     deactivate_graph(*this);
498 
499     my_context->reset();
500     cancelled = false;
501     caught_exception = false;
502     // reset all the nodes comprising the graph
503     for(iterator ii = begin(); ii != end(); ++ii) {
504         graph_node *my_p = &(*ii);
505         my_p->reset_node(f);
506     }
507     // Reattach the arena. Might be useful to run the graph in a particular task_arena
508     // while not limiting graph lifetime to a single task_arena::execute() call.
509     prepare_task_arena( /*reinit=*/true );
510     activate_graph(*this);
511 }
512 
cancel()513 inline void graph::cancel() {
514     my_context->cancel_group_execution();
515 }
516 
begin()517 inline graph::iterator graph::begin() { return iterator(this, true); }
518 
end()519 inline graph::iterator graph::end() { return iterator(this, false); }
520 
begin()521 inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }
522 
end()523 inline graph::const_iterator graph::end() const { return const_iterator(this, false); }
524 
cbegin()525 inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }
526 
cend()527 inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
528 
graph_node(graph & g)529 inline graph_node::graph_node(graph& g) : my_graph(g) {
530     my_graph.register_node(this);
531 }
532 
~graph_node()533 inline graph_node::~graph_node() {
534     my_graph.remove_node(this);
535 }
536 
537 #include "detail/_flow_graph_node_impl.h"
538 
539 
540 //! An executable node that acts as a source, i.e. it has no predecessors
541 
542 template < typename Output >
__TBB_requires(std::copyable<Output>)543     __TBB_requires(std::copyable<Output>)
544 class input_node : public graph_node, public sender< Output > {
545 public:
546     //! The type of the output message, which is complete
547     typedef Output output_type;
548 
549     //! The type of successors of this node
550     typedef typename sender<output_type>::successor_type successor_type;
551 
552     // Input node has no input type
553     typedef null_type input_type;
554 
555     //! Constructor for a node with a successor
556     template< typename Body >
557         __TBB_requires(input_node_body<Body, Output>)
558      __TBB_NOINLINE_SYM input_node( graph &g, Body body )
559          : graph_node(g), my_active(false)
560          , my_body( new input_body_leaf< output_type, Body>(body) )
561          , my_init_body( new input_body_leaf< output_type, Body>(body) )
562          , my_successors(this), my_reserved(false), my_has_cached_item(false)
563     {
564         fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
565                            static_cast<sender<output_type> *>(this), this->my_body);
566     }
567 
568 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
569     template <typename Body, typename... Successors>
570         __TBB_requires(input_node_body<Body, Output>)
571     input_node( const node_set<order::preceding, Successors...>& successors, Body body )
572         : input_node(successors.graph_reference(), body)
573     {
574         make_edges(*this, successors);
575     }
576 #endif
577 
578     //! Copy constructor
579     __TBB_NOINLINE_SYM input_node( const input_node& src )
580         : graph_node(src.my_graph), sender<Output>()
581         , my_active(false)
582         , my_body(src.my_init_body->clone()), my_init_body(src.my_init_body->clone())
583         , my_successors(this), my_reserved(false), my_has_cached_item(false)
584     {
585         fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
586                            static_cast<sender<output_type> *>(this), this->my_body);
587     }
588 
589     //! The destructor
590     ~input_node() { delete my_body; delete my_init_body; }
591 
592     //! Add a new successor to this node
593     bool register_successor( successor_type &r ) override {
594         spin_mutex::scoped_lock lock(my_mutex);
595         my_successors.register_successor(r);
596         if ( my_active )
597             spawn_put();
598         return true;
599     }
600 
601     //! Removes a successor from this node
602     bool remove_successor( successor_type &r ) override {
603         spin_mutex::scoped_lock lock(my_mutex);
604         my_successors.remove_successor(r);
605         return true;
606     }
607 
608     //! Request an item from the node
609     bool try_get( output_type &v ) override {
610         spin_mutex::scoped_lock lock(my_mutex);
611         if ( my_reserved )
612             return false;
613 
614         if ( my_has_cached_item ) {
615             v = my_cached_item;
616             my_has_cached_item = false;
617             return true;
618         }
619         // we've been asked to provide an item, but we have none.  enqueue a task to
620         // provide one.
621         if ( my_active )
622             spawn_put();
623         return false;
624     }
625 
626     //! Reserves an item.
627     bool try_reserve( output_type &v ) override {
628         spin_mutex::scoped_lock lock(my_mutex);
629         if ( my_reserved ) {
630             return false;
631         }
632 
633         if ( my_has_cached_item ) {
634             v = my_cached_item;
635             my_reserved = true;
636             return true;
637         } else {
638             return false;
639         }
640     }
641 
642     //! Release a reserved item.
643     /** true = item has been released and so remains in sender, dest must request or reserve future items */
644     bool try_release( ) override {
645         spin_mutex::scoped_lock lock(my_mutex);
646         __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
647         my_reserved = false;
648         if(!my_successors.empty())
649             spawn_put();
650         return true;
651     }
652 
653     //! Consumes a reserved item
654     bool try_consume( ) override {
655         spin_mutex::scoped_lock lock(my_mutex);
656         __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
657         my_reserved = false;
658         my_has_cached_item = false;
659         if ( !my_successors.empty() ) {
660             spawn_put();
661         }
662         return true;
663     }
664 
665     //! Activates a node that was created in the inactive state
666     void activate() {
667         spin_mutex::scoped_lock lock(my_mutex);
668         my_active = true;
669         if (!my_successors.empty())
670             spawn_put();
671     }
672 
673     template<typename Body>
674     Body copy_function_object() {
675         input_body<output_type> &body_ref = *this->my_body;
676         return dynamic_cast< input_body_leaf<output_type, Body> & >(body_ref).get_body();
677     }
678 
679 protected:
680 
681     //! resets the input_node to its initial state
682     void reset_node( reset_flags f) override {
683         my_active = false;
684         my_reserved = false;
685         my_has_cached_item = false;
686 
687         if(f & rf_clear_edges) my_successors.clear();
688         if(f & rf_reset_bodies) {
689             input_body<output_type> *tmp = my_init_body->clone();
690             delete my_body;
691             my_body = tmp;
692         }
693     }
694 
695 private:
696     spin_mutex my_mutex;
697     bool my_active;
698     input_body<output_type> *my_body;
699     input_body<output_type> *my_init_body;
700     broadcast_cache< output_type > my_successors;
701     bool my_reserved;
702     bool my_has_cached_item;
703     output_type my_cached_item;
704 
705     // used by apply_body_bypass, can invoke body of node.
706     bool try_reserve_apply_body(output_type &v) {
707         spin_mutex::scoped_lock lock(my_mutex);
708         if ( my_reserved ) {
709             return false;
710         }
711         if ( !my_has_cached_item ) {
712             flow_control control;
713 
714             fgt_begin_body( my_body );
715 
716             my_cached_item = (*my_body)(control);
717             my_has_cached_item = !control.is_pipeline_stopped;
718 
719             fgt_end_body( my_body );
720         }
721         if ( my_has_cached_item ) {
722             v = my_cached_item;
723             my_reserved = true;
724             return true;
725         } else {
726             return false;
727         }
728     }
729 
730     graph_task* create_put_task() {
731         small_object_allocator allocator{};
732         typedef input_node_task_bypass< input_node<output_type> > task_type;
733         graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
734         my_graph.reserve_wait();
735         return t;
736     }
737 
738     //! Spawns a task that applies the body
739     void spawn_put( ) {
740         if(is_graph_active(this->my_graph)) {
741             spawn_in_graph_arena(this->my_graph, *create_put_task());
742         }
743     }
744 
745     friend class input_node_task_bypass< input_node<output_type> >;
746     //! Applies the body.  Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
747     graph_task* apply_body_bypass( ) {
748         output_type v;
749         if ( !try_reserve_apply_body(v) )
750             return NULL;
751 
752         graph_task *last_task = my_successors.try_put_task(v);
753         if ( last_task )
754             try_consume();
755         else
756             try_release();
757         return last_task;
758     }
759 };  // class input_node
760 
761 //! Implements a function node that supports Input -> Output
762 template<typename Input, typename Output = continue_msg, typename Policy = queueing>
__TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input> && std::copy_constructible<Output>)763     __TBB_requires(std::default_initializable<Input> &&
764                    std::copy_constructible<Input> &&
765                    std::copy_constructible<Output>)
766 class function_node
767     : public graph_node
768     , public function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
769     , public function_output<Output>
770 {
771     typedef cache_aligned_allocator<Input> internals_allocator;
772 
773 public:
774     typedef Input input_type;
775     typedef Output output_type;
776     typedef function_input<input_type,output_type,Policy,internals_allocator> input_impl_type;
777     typedef function_input_queue<input_type, internals_allocator> input_queue_type;
778     typedef function_output<output_type> fOutput_type;
779     typedef typename input_impl_type::predecessor_type predecessor_type;
780     typedef typename fOutput_type::successor_type successor_type;
781 
782     using input_impl_type::my_predecessors;
783 
784     //! Constructor
785     // input_queue_type is allocated here, but destroyed in the function_input_base.
786     // TODO: pass the graph_buffer_policy to the function_input_base so it can all
787     // be done in one place.  This would be an interface-breaking change.
788     template< typename Body >
789         __TBB_requires(function_node_body<Body, Input, Output>)
790      __TBB_NOINLINE_SYM function_node( graph &g, size_t concurrency,
791                    Body body, Policy = Policy(), node_priority_t a_priority = no_priority )
792         : graph_node(g), input_impl_type(g, concurrency, body, a_priority),
793           fOutput_type(g) {
794         fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
795                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
796     }
797 
798     template <typename Body>
799         __TBB_requires(function_node_body<Body, Input, Output>)
800     function_node( graph& g, size_t concurrency, Body body, node_priority_t a_priority )
801         : function_node(g, concurrency, body, Policy(), a_priority) {}
802 
803 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
804     template <typename Body, typename... Args>
805         __TBB_requires(function_node_body<Body, Input, Output>)
806     function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
807                    Policy p = Policy(), node_priority_t a_priority = no_priority )
808         : function_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
809         make_edges_in_order(nodes, *this);
810     }
811 
812     template <typename Body, typename... Args>
813         __TBB_requires(function_node_body<Body, Input, Output>)
814     function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority )
815         : function_node(nodes, concurrency, body, Policy(), a_priority) {}
816 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
817 
818     //! Copy constructor
819     __TBB_NOINLINE_SYM function_node( const function_node& src ) :
820         graph_node(src.my_graph),
821         input_impl_type(src),
822         fOutput_type(src.my_graph) {
823         fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
824                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
825     }
826 
827 protected:
828     template< typename R, typename B > friend class run_and_put_task;
829     template<typename X, typename Y> friend class broadcast_cache;
830     template<typename X, typename Y> friend class round_robin_cache;
831     using input_impl_type::try_put_task;
832 
833     broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }
834 
835     void reset_node(reset_flags f) override {
836         input_impl_type::reset_function_input(f);
837         // TODO: use clear() instead.
838         if(f & rf_clear_edges) {
839             successors().clear();
840             my_predecessors.clear();
841         }
842         __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
843         __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
844     }
845 
846 };  // class function_node
847 
848 //! implements a function node that supports Input -> (set of outputs)
849 // Output is a tuple of output types.
850 template<typename Input, typename Output, typename Policy = queueing>
__TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input>)851     __TBB_requires(std::default_initializable<Input> &&
852                    std::copy_constructible<Input>)
853 class multifunction_node :
854     public graph_node,
855     public multifunction_input
856     <
857         Input,
858         typename wrap_tuple_elements<
859             std::tuple_size<Output>::value,  // #elements in tuple
860             multifunction_output,  // wrap this around each element
861             Output // the tuple providing the types
862         >::type,
863         Policy,
864         cache_aligned_allocator<Input>
865     >
866 {
867     typedef cache_aligned_allocator<Input> internals_allocator;
868 
869 protected:
870     static const int N = std::tuple_size<Output>::value;
871 public:
872     typedef Input input_type;
873     typedef null_type output_type;
874     typedef typename wrap_tuple_elements<N,multifunction_output, Output>::type output_ports_type;
875     typedef multifunction_input<
876         input_type, output_ports_type, Policy, internals_allocator> input_impl_type;
877     typedef function_input_queue<input_type, internals_allocator> input_queue_type;
878 private:
879     using input_impl_type::my_predecessors;
880 public:
881     template<typename Body>
882         __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
883     __TBB_NOINLINE_SYM multifunction_node(
884         graph &g, size_t concurrency,
885         Body body, Policy = Policy(), node_priority_t a_priority = no_priority
886     ) : graph_node(g), input_impl_type(g, concurrency, body, a_priority) {
887         fgt_multioutput_node_with_body<N>(
888             CODEPTR(), FLOW_MULTIFUNCTION_NODE,
889             &this->my_graph, static_cast<receiver<input_type> *>(this),
890             this->output_ports(), this->my_body
891         );
892     }
893 
894     template <typename Body>
895         __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
896     __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
897         : multifunction_node(g, concurrency, body, Policy(), a_priority) {}
898 
899 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
900     template <typename Body, typename... Args>
901         __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
902     __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
903                        Policy p = Policy(), node_priority_t a_priority = no_priority)
904         : multifunction_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
905         make_edges_in_order(nodes, *this);
906     }
907 
908     template <typename Body, typename... Args>
909         __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
910     __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
911         : multifunction_node(nodes, concurrency, body, Policy(), a_priority) {}
912 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
913 
914     __TBB_NOINLINE_SYM multifunction_node( const multifunction_node &other) :
915         graph_node(other.my_graph), input_impl_type(other) {
916         fgt_multioutput_node_with_body<N>( CODEPTR(), FLOW_MULTIFUNCTION_NODE,
917                 &this->my_graph, static_cast<receiver<input_type> *>(this),
918                 this->output_ports(), this->my_body );
919     }
920 
921     // all the guts are in multifunction_input...
922 protected:
923     void reset_node(reset_flags f) override { input_impl_type::reset(f); }
924 };  // multifunction_node
925 
926 //! split_node: accepts a tuple as input, forwards each element of the tuple to its
927 //  successors.  The node has unlimited concurrency, so it does not reject inputs.
928 template<typename TupleType>
929 class split_node : public graph_node, public receiver<TupleType> {
930     static const int N = std::tuple_size<TupleType>::value;
931     typedef receiver<TupleType> base_type;
932 public:
933     typedef TupleType input_type;
934     typedef typename wrap_tuple_elements<
935             N,  // #elements in tuple
936             multifunction_output,  // wrap this around each element
937             TupleType // the tuple providing the types
938         >::type  output_ports_type;
939 
split_node(graph & g)940     __TBB_NOINLINE_SYM explicit split_node(graph &g)
941         : graph_node(g),
942           my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports))
943     {
944         fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
945             static_cast<receiver<input_type> *>(this), this->output_ports());
946     }
947 
948 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
949     template <typename... Args>
split_node(const node_set<Args...> & nodes)950     __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
951         make_edges_in_order(nodes, *this);
952     }
953 #endif
954 
split_node(const split_node & other)955     __TBB_NOINLINE_SYM split_node(const split_node& other)
956         : graph_node(other.my_graph), base_type(other),
957           my_output_ports(init_output_ports<output_ports_type>::call(other.my_graph, my_output_ports))
958     {
959         fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
960             static_cast<receiver<input_type> *>(this), this->output_ports());
961     }
962 
output_ports()963     output_ports_type &output_ports() { return my_output_ports; }
964 
965 protected:
try_put_task(const TupleType & t)966     graph_task *try_put_task(const TupleType& t) override {
967         // Sending split messages in parallel is not justified, as overheads would prevail.
968         // Also, we do not have successors here. So we just tell the task returned here is successful.
969         return emit_element<N>::emit_this(this->my_graph, t, output_ports());
970     }
reset_node(reset_flags f)971     void reset_node(reset_flags f) override {
972         if (f & rf_clear_edges)
973             clear_element<N>::clear_this(my_output_ports);
974 
975         __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "split_node reset failed");
976     }
graph_reference()977     graph& graph_reference() const override {
978         return my_graph;
979     }
980 
981 private:
982     output_ports_type my_output_ports;
983 };
984 
985 //! Implements an executable node that supports continue_msg -> Output
986 template <typename Output, typename Policy = Policy<void> >
__TBB_requires(std::copy_constructible<Output>)987     __TBB_requires(std::copy_constructible<Output>)
988 class continue_node : public graph_node, public continue_input<Output, Policy>,
989                       public function_output<Output> {
990 public:
991     typedef continue_msg input_type;
992     typedef Output output_type;
993     typedef continue_input<Output, Policy> input_impl_type;
994     typedef function_output<output_type> fOutput_type;
995     typedef typename input_impl_type::predecessor_type predecessor_type;
996     typedef typename fOutput_type::successor_type successor_type;
997 
998     //! Constructor for executable node with continue_msg -> Output
999     template <typename Body >
1000         __TBB_requires(continue_node_body<Body, Output>)
1001     __TBB_NOINLINE_SYM continue_node(
1002         graph &g,
1003         Body body, Policy = Policy(), node_priority_t a_priority = no_priority
1004     ) : graph_node(g), input_impl_type( g, body, a_priority ),
1005         fOutput_type(g) {
1006         fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
1007 
1008                                            static_cast<receiver<input_type> *>(this),
1009                                            static_cast<sender<output_type> *>(this), this->my_body );
1010     }
1011 
1012     template <typename Body>
1013         __TBB_requires(continue_node_body<Body, Output>)
1014     continue_node( graph& g, Body body, node_priority_t a_priority )
1015         : continue_node(g, body, Policy(), a_priority) {}
1016 
1017 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1018     template <typename Body, typename... Args>
1019         __TBB_requires(continue_node_body<Body, Output>)
1020     continue_node( const node_set<Args...>& nodes, Body body,
1021                    Policy p = Policy(), node_priority_t a_priority = no_priority )
1022         : continue_node(nodes.graph_reference(), body, p, a_priority ) {
1023         make_edges_in_order(nodes, *this);
1024     }
1025     template <typename Body, typename... Args>
1026         __TBB_requires(continue_node_body<Body, Output>)
1027     continue_node( const node_set<Args...>& nodes, Body body, node_priority_t a_priority)
1028         : continue_node(nodes, body, Policy(), a_priority) {}
1029 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1030 
1031     //! Constructor for executable node with continue_msg -> Output
1032     template <typename Body >
1033         __TBB_requires(continue_node_body<Body, Output>)
1034     __TBB_NOINLINE_SYM continue_node(
1035         graph &g, int number_of_predecessors,
1036         Body body, Policy = Policy(), node_priority_t a_priority = no_priority
1037     ) : graph_node(g)
1038       , input_impl_type(g, number_of_predecessors, body, a_priority),
1039         fOutput_type(g) {
1040         fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
1041                                            static_cast<receiver<input_type> *>(this),
1042                                            static_cast<sender<output_type> *>(this), this->my_body );
1043     }
1044 
1045     template <typename Body>
1046         __TBB_requires(continue_node_body<Body, Output>)
1047     continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t a_priority)
1048         : continue_node(g, number_of_predecessors, body, Policy(), a_priority) {}
1049 
1050 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1051     template <typename Body, typename... Args>
1052         __TBB_requires(continue_node_body<Body, Output>)
1053     continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1054                    Body body, Policy p = Policy(), node_priority_t a_priority = no_priority )
1055         : continue_node(nodes.graph_reference(), number_of_predecessors, body, p, a_priority) {
1056         make_edges_in_order(nodes, *this);
1057     }
1058 
1059     template <typename Body, typename... Args>
1060         __TBB_requires(continue_node_body<Body, Output>)
1061     continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1062                    Body body, node_priority_t a_priority )
1063         : continue_node(nodes, number_of_predecessors, body, Policy(), a_priority) {}
1064 #endif
1065 
1066     //! Copy constructor
1067     __TBB_NOINLINE_SYM continue_node( const continue_node& src ) :
1068         graph_node(src.my_graph), input_impl_type(src),
1069         function_output<Output>(src.my_graph) {
1070         fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
1071                                            static_cast<receiver<input_type> *>(this),
1072                                            static_cast<sender<output_type> *>(this), this->my_body );
1073     }
1074 
1075 protected:
1076     template< typename R, typename B > friend class run_and_put_task;
1077     template<typename X, typename Y> friend class broadcast_cache;
1078     template<typename X, typename Y> friend class round_robin_cache;
1079     using input_impl_type::try_put_task;
1080     broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }
1081 
1082     void reset_node(reset_flags f) override {
1083         input_impl_type::reset_receiver(f);
1084         if(f & rf_clear_edges)successors().clear();
1085         __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
1086     }
1087 };  // continue_node
1088 
1089 //! Forwards messages of type T to all successors
1090 template <typename T>
1091 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1092 public:
1093     typedef T input_type;
1094     typedef T output_type;
1095     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1096     typedef typename sender<output_type>::successor_type successor_type;
1097 private:
1098     broadcast_cache<input_type> my_successors;
1099 public:
1100 
broadcast_node(graph & g)1101     __TBB_NOINLINE_SYM explicit broadcast_node(graph& g) : graph_node(g), my_successors(this) {
1102         fgt_node( CODEPTR(), FLOW_BROADCAST_NODE, &this->my_graph,
1103                   static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1104     }
1105 
1106 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1107     template <typename... Args>
broadcast_node(const node_set<Args...> & nodes)1108     broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
1109         make_edges_in_order(nodes, *this);
1110     }
1111 #endif
1112 
1113     // Copy constructor
broadcast_node(const broadcast_node & src)1114     __TBB_NOINLINE_SYM broadcast_node( const broadcast_node& src ) : broadcast_node(src.my_graph) {}
1115 
1116     //! Adds a successor
register_successor(successor_type & r)1117     bool register_successor( successor_type &r ) override {
1118         my_successors.register_successor( r );
1119         return true;
1120     }
1121 
1122     //! Removes s as a successor
remove_successor(successor_type & r)1123     bool remove_successor( successor_type &r ) override {
1124         my_successors.remove_successor( r );
1125         return true;
1126     }
1127 
1128 protected:
1129     template< typename R, typename B > friend class run_and_put_task;
1130     template<typename X, typename Y> friend class broadcast_cache;
1131     template<typename X, typename Y> friend class round_robin_cache;
1132     //! build a task to run the successor if possible.  Default is old behavior.
try_put_task(const T & t)1133     graph_task *try_put_task(const T& t) override {
1134         graph_task *new_task = my_successors.try_put_task(t);
1135         if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
1136         return new_task;
1137     }
1138 
graph_reference()1139     graph& graph_reference() const override {
1140         return my_graph;
1141     }
1142 
reset_node(reset_flags f)1143     void reset_node(reset_flags f) override {
1144         if (f&rf_clear_edges) {
1145            my_successors.clear();
1146         }
1147         __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
1148     }
1149 };  // broadcast_node
1150 
1151 //! Forwards messages in arbitrary order
1152 template <typename T>
1153 class buffer_node
1154     : public graph_node
1155     , public reservable_item_buffer< T, cache_aligned_allocator<T> >
1156     , public receiver<T>, public sender<T>
1157 {
1158     typedef cache_aligned_allocator<T> internals_allocator;
1159 
1160 public:
1161     typedef T input_type;
1162     typedef T output_type;
1163     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1164     typedef typename sender<output_type>::successor_type successor_type;
1165     typedef buffer_node<T> class_type;
1166 
1167 protected:
1168     typedef size_t size_type;
1169     round_robin_cache< T, null_rw_mutex > my_successors;
1170 
1171     friend class forward_task_bypass< class_type >;
1172 
1173     enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
1174     };
1175 
1176     // implements the aggregator_operation concept
1177     class buffer_operation : public aggregated_operation< buffer_operation > {
1178     public:
1179         char type;
1180         T* elem;
1181         graph_task* ltask;
1182         successor_type *r;
1183 
buffer_operation(const T & e,op_type t)1184         buffer_operation(const T& e, op_type t) : type(char(t))
1185                                                   , elem(const_cast<T*>(&e)) , ltask(NULL)
1186         {}
buffer_operation(op_type t)1187         buffer_operation(op_type t) : type(char(t)),  ltask(NULL) {}
1188     };
1189 
1190     bool forwarder_busy;
1191     typedef aggregating_functor<class_type, buffer_operation> handler_type;
1192     friend class aggregating_functor<class_type, buffer_operation>;
1193     aggregator< handler_type, buffer_operation> my_aggregator;
1194 
handle_operations(buffer_operation * op_list)1195     virtual void handle_operations(buffer_operation *op_list) {
1196         handle_operations_impl(op_list, this);
1197     }
1198 
1199     template<typename derived_type>
handle_operations_impl(buffer_operation * op_list,derived_type * derived)1200     void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
1201         __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1202 
1203         buffer_operation *tmp = NULL;
1204         bool try_forwarding = false;
1205         while (op_list) {
1206             tmp = op_list;
1207             op_list = op_list->next;
1208             switch (tmp->type) {
1209             case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
1210             case rem_succ: internal_rem_succ(tmp); break;
1211             case req_item: internal_pop(tmp); break;
1212             case res_item: internal_reserve(tmp); break;
1213             case rel_res:  internal_release(tmp); try_forwarding = true; break;
1214             case con_res:  internal_consume(tmp); try_forwarding = true; break;
1215             case put_item: try_forwarding = internal_push(tmp); break;
1216             case try_fwd_task: internal_forward_task(tmp); break;
1217             }
1218         }
1219 
1220         derived->order();
1221 
1222         if (try_forwarding && !forwarder_busy) {
1223             if(is_graph_active(this->my_graph)) {
1224                 forwarder_busy = true;
1225                 typedef forward_task_bypass<class_type> task_type;
1226                 small_object_allocator allocator{};
1227                 graph_task* new_task = allocator.new_object<task_type>(graph_reference(), allocator, *this);
1228                 my_graph.reserve_wait();
1229                 // tmp should point to the last item handled by the aggregator.  This is the operation
1230                 // the handling thread enqueued.  So modifying that record will be okay.
1231                 // TODO revamp: check that the issue is still present
1232                 // workaround for icc bug  (at least 12.0 and 13.0)
1233                 // error: function "tbb::flow::interfaceX::combine_tasks" cannot be called with the given argument list
1234                 //        argument types are: (graph, graph_task *, graph_task *)
1235                 graph_task *z = tmp->ltask;
1236                 graph &g = this->my_graph;
1237                 tmp->ltask = combine_tasks(g, z, new_task);  // in case the op generated a task
1238             }
1239         }
1240     }  // handle_operations
1241 
grab_forwarding_task(buffer_operation & op_data)1242     inline graph_task *grab_forwarding_task( buffer_operation &op_data) {
1243         return op_data.ltask;
1244     }
1245 
enqueue_forwarding_task(buffer_operation & op_data)1246     inline bool enqueue_forwarding_task(buffer_operation &op_data) {
1247         graph_task *ft = grab_forwarding_task(op_data);
1248         if(ft) {
1249             spawn_in_graph_arena(graph_reference(), *ft);
1250             return true;
1251         }
1252         return false;
1253     }
1254 
1255     //! This is executed by an enqueued task, the "forwarder"
forward_task()1256     virtual graph_task *forward_task() {
1257         buffer_operation op_data(try_fwd_task);
1258         graph_task *last_task = NULL;
1259         do {
1260             op_data.status = WAIT;
1261             op_data.ltask = NULL;
1262             my_aggregator.execute(&op_data);
1263 
1264             // workaround for icc bug
1265             graph_task *xtask = op_data.ltask;
1266             graph& g = this->my_graph;
1267             last_task = combine_tasks(g, last_task, xtask);
1268         } while (op_data.status ==SUCCEEDED);
1269         return last_task;
1270     }
1271 
1272     //! Register successor
internal_reg_succ(buffer_operation * op)1273     virtual void internal_reg_succ(buffer_operation *op) {
1274         my_successors.register_successor(*(op->r));
1275         op->status.store(SUCCEEDED, std::memory_order_release);
1276     }
1277 
1278     //! Remove successor
internal_rem_succ(buffer_operation * op)1279     virtual void internal_rem_succ(buffer_operation *op) {
1280         my_successors.remove_successor(*(op->r));
1281         op->status.store(SUCCEEDED, std::memory_order_release);
1282     }
1283 
1284 private:
order()1285     void order() {}
1286 
is_item_valid()1287     bool is_item_valid() {
1288         return this->my_item_valid(this->my_tail - 1);
1289     }
1290 
try_put_and_add_task(graph_task * & last_task)1291     void try_put_and_add_task(graph_task*& last_task) {
1292         graph_task *new_task = my_successors.try_put_task(this->back());
1293         if (new_task) {
1294             // workaround for icc bug
1295             graph& g = this->my_graph;
1296             last_task = combine_tasks(g, last_task, new_task);
1297             this->destroy_back();
1298         }
1299     }
1300 
1301 protected:
1302     //! Tries to forward valid items to successors
internal_forward_task(buffer_operation * op)1303     virtual void internal_forward_task(buffer_operation *op) {
1304         internal_forward_task_impl(op, this);
1305     }
1306 
1307     template<typename derived_type>
internal_forward_task_impl(buffer_operation * op,derived_type * derived)1308     void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
1309         __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1310 
1311         if (this->my_reserved || !derived->is_item_valid()) {
1312             op->status.store(FAILED, std::memory_order_release);
1313             this->forwarder_busy = false;
1314             return;
1315         }
1316         // Try forwarding, giving each successor a chance
1317         graph_task* last_task = NULL;
1318         size_type counter = my_successors.size();
1319         for (; counter > 0 && derived->is_item_valid(); --counter)
1320             derived->try_put_and_add_task(last_task);
1321 
1322         op->ltask = last_task;  // return task
1323         if (last_task && !counter) {
1324             op->status.store(SUCCEEDED, std::memory_order_release);
1325         }
1326         else {
1327             op->status.store(FAILED, std::memory_order_release);
1328             forwarder_busy = false;
1329         }
1330     }
1331 
1332     virtual bool internal_push(buffer_operation *op) {
1333         this->push_back(*(op->elem));
1334         op->status.store(SUCCEEDED, std::memory_order_release);
1335         return true;
1336     }
1337 
1338     virtual void internal_pop(buffer_operation *op) {
1339         if(this->pop_back(*(op->elem))) {
1340             op->status.store(SUCCEEDED, std::memory_order_release);
1341         }
1342         else {
1343             op->status.store(FAILED, std::memory_order_release);
1344         }
1345     }
1346 
1347     virtual void internal_reserve(buffer_operation *op) {
1348         if(this->reserve_front(*(op->elem))) {
1349             op->status.store(SUCCEEDED, std::memory_order_release);
1350         }
1351         else {
1352             op->status.store(FAILED, std::memory_order_release);
1353         }
1354     }
1355 
1356     virtual void internal_consume(buffer_operation *op) {
1357         this->consume_front();
1358         op->status.store(SUCCEEDED, std::memory_order_release);
1359     }
1360 
1361     virtual void internal_release(buffer_operation *op) {
1362         this->release_front();
1363         op->status.store(SUCCEEDED, std::memory_order_release);
1364     }
1365 
1366 public:
1367     //! Constructor
1368     __TBB_NOINLINE_SYM explicit buffer_node( graph &g )
1369         : graph_node(g), reservable_item_buffer<T, internals_allocator>(), receiver<T>(),
1370           sender<T>(), my_successors(this), forwarder_busy(false)
1371     {
1372         my_aggregator.initialize_handler(handler_type(this));
1373         fgt_node( CODEPTR(), FLOW_BUFFER_NODE, &this->my_graph,
1374                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1375     }
1376 
1377 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1378     template <typename... Args>
1379     buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
1380         make_edges_in_order(nodes, *this);
1381     }
1382 #endif
1383 
1384     //! Copy constructor
1385     __TBB_NOINLINE_SYM buffer_node( const buffer_node& src ) : buffer_node(src.my_graph) {}
1386 
1387     //
1388     // message sender implementation
1389     //
1390 
1391     //! Adds a new successor.
1392     /** Adds successor r to the list of successors; may forward tasks.  */
1393     bool register_successor( successor_type &r ) override {
1394         buffer_operation op_data(reg_succ);
1395         op_data.r = &r;
1396         my_aggregator.execute(&op_data);
1397         (void)enqueue_forwarding_task(op_data);
1398         return true;
1399     }
1400 
1401     //! Removes a successor.
1402     /** Removes successor r from the list of successors.
1403         It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
1404     bool remove_successor( successor_type &r ) override {
1405         // TODO revamp: investigate why full qualification is necessary here
1406         tbb::detail::d1::remove_predecessor(r, *this);
1407         buffer_operation op_data(rem_succ);
1408         op_data.r = &r;
1409         my_aggregator.execute(&op_data);
1410         // even though this operation does not cause a forward, if we are the handler, and
1411         // a forward is scheduled, we may be the first to reach this point after the aggregator,
1412         // and so should check for the task.
1413         (void)enqueue_forwarding_task(op_data);
1414         return true;
1415     }
1416 
1417     //! Request an item from the buffer_node
1418     /**  true = v contains the returned item<BR>
1419          false = no item has been returned */
1420     bool try_get( T &v ) override {
1421         buffer_operation op_data(req_item);
1422         op_data.elem = &v;
1423         my_aggregator.execute(&op_data);
1424         (void)enqueue_forwarding_task(op_data);
1425         return (op_data.status==SUCCEEDED);
1426     }
1427 
1428     //! Reserves an item.
1429     /**  false = no item can be reserved<BR>
1430          true = an item is reserved */
1431     bool try_reserve( T &v ) override {
1432         buffer_operation op_data(res_item);
1433         op_data.elem = &v;
1434         my_aggregator.execute(&op_data);
1435         (void)enqueue_forwarding_task(op_data);
1436         return (op_data.status==SUCCEEDED);
1437     }
1438 
1439     //! Release a reserved item.
1440     /**  true = item has been released and so remains in sender */
1441     bool try_release() override {
1442         buffer_operation op_data(rel_res);
1443         my_aggregator.execute(&op_data);
1444         (void)enqueue_forwarding_task(op_data);
1445         return true;
1446     }
1447 
1448     //! Consumes a reserved item.
1449     /** true = item is removed from sender and reservation removed */
1450     bool try_consume() override {
1451         buffer_operation op_data(con_res);
1452         my_aggregator.execute(&op_data);
1453         (void)enqueue_forwarding_task(op_data);
1454         return true;
1455     }
1456 
1457 protected:
1458 
1459     template< typename R, typename B > friend class run_and_put_task;
1460     template<typename X, typename Y> friend class broadcast_cache;
1461     template<typename X, typename Y> friend class round_robin_cache;
1462     //! receive an item, return a task *if possible
1463     graph_task *try_put_task(const T &t) override {
1464         buffer_operation op_data(t, put_item);
1465         my_aggregator.execute(&op_data);
1466         graph_task *ft = grab_forwarding_task(op_data);
1467         // sequencer_nodes can return failure (if an item with the same sequence tag has already been inserted)
1468         // We have to spawn the returned task if our own operation fails.
1469 
1470         if(ft && op_data.status ==FAILED) {
1471             // we haven't succeeded queueing the item, but for some reason the
1472             // call returned a task (if another request resulted in a successful
1473             // forward this could happen.)  Queue the task and reset the pointer.
1474             spawn_in_graph_arena(graph_reference(), *ft); ft = NULL;
1475         }
1476         else if(!ft && op_data.status ==SUCCEEDED) {
1477             ft = SUCCESSFULLY_ENQUEUED;
1478         }
1479         return ft;
1480     }
1481 
1482     graph& graph_reference() const override {
1483         return my_graph;
1484     }
1485 
1486 protected:
1487     void reset_node( reset_flags f) override {
1488         reservable_item_buffer<T, internals_allocator>::reset();
1489         // TODO: just clear structures
1490         if (f&rf_clear_edges) {
1491             my_successors.clear();
1492         }
1493         forwarder_busy = false;
1494     }
1495 };  // buffer_node
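
// Usage sketch for the node above (illustrative only, not part of this header; it assumes
// the public tbb::flow aliases for these classes and the oneapi/tbb/flow_graph.h umbrella header):
//
//   #include "oneapi/tbb/flow_graph.h"
//
//   int main() {
//       tbb::flow::graph g;
//       tbb::flow::buffer_node<int> buf(g);                 // buffers items in no particular order
//       tbb::flow::function_node<int, int> worker(
//           g, tbb::flow::unlimited, [](int v) { return 2 * v; });
//       tbb::flow::make_edge(buf, worker);                  // buffered items are forwarded to the worker
//       for (int i = 0; i < 4; ++i) buf.try_put(i);
//       g.wait_for_all();
//   }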
1496 
1497 //! Forwards messages in FIFO order
1498 template <typename T>
1499 class queue_node : public buffer_node<T> {
1500 protected:
1501     typedef buffer_node<T> base_type;
1502     typedef typename base_type::size_type size_type;
1503     typedef typename base_type::buffer_operation queue_operation;
1504     typedef queue_node class_type;
1505 
1506 private:
1507     template<typename> friend class buffer_node;
1508 
1509     bool is_item_valid() {
1510         return this->my_item_valid(this->my_head);
1511     }
1512 
1513     void try_put_and_add_task(graph_task*& last_task) {
1514         graph_task *new_task = this->my_successors.try_put_task(this->front());
1515         if (new_task) {
1516             // workaround for icc bug
1517             graph& graph_ref = this->graph_reference();
1518             last_task = combine_tasks(graph_ref, last_task, new_task);
1519             this->destroy_front();
1520         }
1521     }
1522 
1523 protected:
1524     void internal_forward_task(queue_operation *op) override {
1525         this->internal_forward_task_impl(op, this);
1526     }
1527 
1528     void internal_pop(queue_operation *op) override {
1529         if ( this->my_reserved || !this->my_item_valid(this->my_head)){
1530             op->status.store(FAILED, std::memory_order_release);
1531         }
1532         else {
1533             this->pop_front(*(op->elem));
1534             op->status.store(SUCCEEDED, std::memory_order_release);
1535         }
1536     }
1537     void internal_reserve(queue_operation *op) override {
1538         if (this->my_reserved || !this->my_item_valid(this->my_head)) {
1539             op->status.store(FAILED, std::memory_order_release);
1540         }
1541         else {
1542             this->reserve_front(*(op->elem));
1543             op->status.store(SUCCEEDED, std::memory_order_release);
1544         }
1545     }
1546     void internal_consume(queue_operation *op) override {
1547         this->consume_front();
1548         op->status.store(SUCCEEDED, std::memory_order_release);
1549     }
1550 
1551 public:
1552     typedef T input_type;
1553     typedef T output_type;
1554     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1555     typedef typename sender<output_type>::successor_type successor_type;
1556 
1557     //! Constructor
1558     __TBB_NOINLINE_SYM explicit queue_node( graph &g ) : base_type(g) {
1559         fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
1560                                  static_cast<receiver<input_type> *>(this),
1561                                  static_cast<sender<output_type> *>(this) );
1562     }
1563 
1564 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1565     template <typename... Args>
1566     queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
1567         make_edges_in_order(nodes, *this);
1568     }
1569 #endif
1570 
1571     //! Copy constructor
1572     __TBB_NOINLINE_SYM queue_node( const queue_node& src) : base_type(src) {
1573         fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
1574                                  static_cast<receiver<input_type> *>(this),
1575                                  static_cast<sender<output_type> *>(this) );
1576     }
1577 
1578 
1579 protected:
1580     void reset_node( reset_flags f) override {
1581         base_type::reset_node(f);
1582     }
1583 };  // queue_node
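
// Usage sketch (illustrative only, not part of this header): in contrast to buffer_node,
// a queue_node forwards items in FIFO order, so a serial consumer observes them in the
// order they were put.
//
//   tbb::flow::graph g;
//   tbb::flow::queue_node<int> q(g);
//   tbb::flow::function_node<int> sink(g, tbb::flow::serial,
//       [](int v) { /* sees 0, 1, 2, 3 in that order */ });
//   tbb::flow::make_edge(q, sink);
//   for (int i = 0; i < 4; ++i) q.try_put(i);
//   g.wait_for_all();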
1584 
1585 //! Forwards messages in sequence order
1586 template <typename T>
1587     __TBB_requires(std::copyable<T>)
1588 class sequencer_node : public queue_node<T> {
1589     function_body< T, size_t > *my_sequencer;
1590     // my_sequencer should be a benign function and must be callable
1591     // from a parallel context.  Does this mean it needn't be reset?
1592 public:
1593     typedef T input_type;
1594     typedef T output_type;
1595     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1596     typedef typename sender<output_type>::successor_type successor_type;
1597 
1598     //! Constructor
1599     template< typename Sequencer >
1600         __TBB_requires(sequencer<Sequencer, T>)
1601     __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T>(g),
1602         my_sequencer(new function_body_leaf< T, size_t, Sequencer>(s) ) {
1603         fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
1604                                  static_cast<receiver<input_type> *>(this),
1605                                  static_cast<sender<output_type> *>(this) );
1606     }
1607 
1608 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1609     template <typename Sequencer, typename... Args>
1610         __TBB_requires(sequencer<Sequencer, T>)
1611     sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
1612         : sequencer_node(nodes.graph_reference(), s) {
1613         make_edges_in_order(nodes, *this);
1614     }
1615 #endif
1616 
1617     //! Copy constructor
1618     __TBB_NOINLINE_SYM sequencer_node( const sequencer_node& src ) : queue_node<T>(src),
1619         my_sequencer( src.my_sequencer->clone() ) {
1620         fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
1621                                  static_cast<receiver<input_type> *>(this),
1622                                  static_cast<sender<output_type> *>(this) );
1623     }
1624 
1625     //! Destructor
1626     ~sequencer_node() { delete my_sequencer; }
1627 
1628 protected:
1629     typedef typename buffer_node<T>::size_type size_type;
1630     typedef typename buffer_node<T>::buffer_operation sequencer_operation;
1631 
1632 private:
1633     bool internal_push(sequencer_operation *op) override {
1634         size_type tag = (*my_sequencer)(*(op->elem));
1635 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
1636         if (tag < this->my_head) {
1637             // have already emitted a message with this tag
1638             op->status.store(FAILED, std::memory_order_release);
1639             return false;
1640         }
1641 #endif
1642         // cannot modify this->my_tail now; the buffer would be inconsistent.
1643         size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
1644 
1645         if (this->size(new_tail) > this->capacity()) {
1646             this->grow_my_array(this->size(new_tail));
1647         }
1648         this->my_tail = new_tail;
1649 
1650         const op_stat res = this->place_item(tag, *(op->elem)) ? SUCCEEDED : FAILED;
1651         op->status.store(res, std::memory_order_release);
1652         return res ==SUCCEEDED;
1653     }
1654 };  // sequencer_node
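
// Usage sketch (illustrative only, not part of this header): a sequencer_node releases
// items in the order given by the user-supplied sequence number, even if they arrive out
// of order. 'Msg' is a hypothetical message type introduced just for this example.
//
//   struct Msg { std::size_t seq; int payload; };
//
//   tbb::flow::graph g;
//   tbb::flow::sequencer_node<Msg> ordered(g, [](const Msg& m) { return m.seq; });
//   tbb::flow::function_node<Msg> sink(g, tbb::flow::serial,
//       [](const Msg& m) { /* m.seq arrives as 0, 1, 2 */ });
//   tbb::flow::make_edge(ordered, sink);
//   ordered.try_put(Msg{2, 30});   // held back until 0 and 1 have been forwarded
//   ordered.try_put(Msg{0, 10});
//   ordered.try_put(Msg{1, 20});
//   g.wait_for_all();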
1655 
1656 //! Forwards messages in priority order
1657 template<typename T, typename Compare = std::less<T>>
1658 class priority_queue_node : public buffer_node<T> {
1659 public:
1660     typedef T input_type;
1661     typedef T output_type;
1662     typedef buffer_node<T> base_type;
1663     typedef priority_queue_node class_type;
1664     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1665     typedef typename sender<output_type>::successor_type successor_type;
1666 
1667     //! Constructor
1668     __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
1669         : buffer_node<T>(g), compare(comp), mark(0) {
1670         fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
1671                                  static_cast<receiver<input_type> *>(this),
1672                                  static_cast<sender<output_type> *>(this) );
1673     }
1674 
1675 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1676     template <typename... Args>
1677     priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
1678         : priority_queue_node(nodes.graph_reference(), comp) {
1679         make_edges_in_order(nodes, *this);
1680     }
1681 #endif
1682 
1683     //! Copy constructor
1684     __TBB_NOINLINE_SYM priority_queue_node( const priority_queue_node &src )
1685         : buffer_node<T>(src), mark(0)
1686     {
1687         fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
1688                                  static_cast<receiver<input_type> *>(this),
1689                                  static_cast<sender<output_type> *>(this) );
1690     }
1691 
1692 protected:
1693 
1694     void reset_node( reset_flags f) override {
1695         mark = 0;
1696         base_type::reset_node(f);
1697     }
1698 
1699     typedef typename buffer_node<T>::size_type size_type;
1700     typedef typename buffer_node<T>::item_type item_type;
1701     typedef typename buffer_node<T>::buffer_operation prio_operation;
1702 
1703     //! Tries to forward valid items to successors
1704     void internal_forward_task(prio_operation *op) override {
1705         this->internal_forward_task_impl(op, this);
1706     }
1707 
1708     void handle_operations(prio_operation *op_list) override {
1709         this->handle_operations_impl(op_list, this);
1710     }
1711 
1712     bool internal_push(prio_operation *op) override {
1713         prio_push(*(op->elem));
1714         op->status.store(SUCCEEDED, std::memory_order_release);
1715         return true;
1716     }
1717 
1718     void internal_pop(prio_operation *op) override {
1719         // if empty or already reserved, don't pop
1720         if ( this->my_reserved == true || this->my_tail == 0 ) {
1721             op->status.store(FAILED, std::memory_order_release);
1722             return;
1723         }
1724 
1725         *(op->elem) = prio();
1726         op->status.store(SUCCEEDED, std::memory_order_release);
1727         prio_pop();
1728 
1729     }
1730 
1731     // pops the highest-priority item, saves copy
1732     void internal_reserve(prio_operation *op) override {
1733         if (this->my_reserved == true || this->my_tail == 0) {
1734             op->status.store(FAILED, std::memory_order_release);
1735             return;
1736         }
1737         this->my_reserved = true;
1738         *(op->elem) = prio();
1739         reserved_item = *(op->elem);
1740         op->status.store(SUCCEEDED, std::memory_order_release);
1741         prio_pop();
1742     }
1743 
1744     void internal_consume(prio_operation *op) override {
1745         op->status.store(SUCCEEDED, std::memory_order_release);
1746         this->my_reserved = false;
1747         reserved_item = input_type();
1748     }
1749 
1750     void internal_release(prio_operation *op) override {
1751         op->status.store(SUCCEEDED, std::memory_order_release);
1752         prio_push(reserved_item);
1753         this->my_reserved = false;
1754         reserved_item = input_type();
1755     }
1756 
1757 private:
1758     template<typename> friend class buffer_node;
1759 
1760     void order() {
1761         if (mark < this->my_tail) heapify();
1762         __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
1763     }
1764 
1765     bool is_item_valid() {
1766         return this->my_tail > 0;
1767     }
1768 
1769     void try_put_and_add_task(graph_task*& last_task) {
1770         graph_task * new_task = this->my_successors.try_put_task(this->prio());
1771         if (new_task) {
1772             // workaround for icc bug
1773             graph& graph_ref = this->graph_reference();
1774             last_task = combine_tasks(graph_ref, last_task, new_task);
1775             prio_pop();
1776         }
1777     }
1778 
1779 private:
1780     Compare compare;
1781     size_type mark;
1782 
1783     input_type reserved_item;
1784 
1785     // in case the heap has not been fixed up after a push, check whether the most recently pushed item (at my_tail-1) has higher priority than the 0'th item
1786     bool prio_use_tail() {
1787         __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
1788         return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
1789     }
1790 
1791     // prio_push: checks that the item will fit, expand array if necessary, put at end
1792     void prio_push(const T &src) {
1793         if ( this->my_tail >= this->my_array_size )
1794             this->grow_my_array( this->my_tail + 1 );
1795         (void) this->place_item(this->my_tail, src);
1796         ++(this->my_tail);
1797         __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
1798     }
1799 
1800     // prio_pop: deletes highest priority item from the array, and if it is item
1801     // 0, move last item to 0 and reheap.  If end of array, just destroy and decrement tail
1802     // and mark.  Assumes the array has already been tested for emptiness; no failure.
1803     void prio_pop()  {
1804         if (prio_use_tail()) {
1805             // there are newly pushed elements; last one higher than top
1806             // copy the data
1807             this->destroy_item(this->my_tail-1);
1808             --(this->my_tail);
1809             __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
1810             return;
1811         }
1812         this->destroy_item(0);
1813         if(this->my_tail > 1) {
1814             // push the last element down heap
1815             __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
1816             this->move_item(0,this->my_tail - 1);
1817         }
1818         --(this->my_tail);
1819         if(mark > this->my_tail) --mark;
1820         if (this->my_tail > 1) // don't reheap for heap of size 1
1821             reheap();
1822         __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
1823     }
1824 
1825     const T& prio() {
1826         return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
1827     }
1828 
1829     // turn array into heap
1830     void heapify() {
1831         if(this->my_tail == 0) {
1832             mark = 0;
1833             return;
1834         }
1835         if (!mark) mark = 1;
1836         for (; mark<this->my_tail; ++mark) { // for each unheaped element
1837             size_type cur_pos = mark;
1838             input_type to_place;
1839             this->fetch_item(mark,to_place);
1840             do { // push to_place up the heap
1841                 size_type parent = (cur_pos-1)>>1;
1842                 if (!compare(this->get_my_item(parent), to_place))
1843                     break;
1844                 this->move_item(cur_pos, parent);
1845                 cur_pos = parent;
1846             } while( cur_pos );
1847             (void) this->place_item(cur_pos, to_place);
1848         }
1849     }
1850 
1851     // the array is a heap except for the new root element; sift it down to restore the heap property
1852     void reheap() {
1853         size_type cur_pos=0, child=1;
1854         while (child < mark) {
1855             size_type target = child;
1856             if (child+1<mark &&
1857                 compare(this->get_my_item(child),
1858                         this->get_my_item(child+1)))
1859                 ++target;
1860             // target now has the higher priority child
1861             if (compare(this->get_my_item(target),
1862                         this->get_my_item(cur_pos)))
1863                 break;
1864             // swap
1865             this->swap_items(cur_pos, target);
1866             cur_pos = target;
1867             child = (cur_pos<<1)+1;
1868         }
1869     }
1870 };  // priority_queue_node
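
// Usage sketch (illustrative only, not part of this header): with the default std::less<T>
// comparator the node forwards the largest buffered item first; ordering is only guaranteed
// among items that are buffered at the moment a forward happens.
//
//   tbb::flow::graph g;
//   tbb::flow::priority_queue_node<int> pq(g);
//   tbb::flow::function_node<int> sink(g, tbb::flow::serial,
//       [](int v) { /* larger values tend to arrive before smaller ones */ });
//   tbb::flow::make_edge(pq, sink);
//   for (int v : {3, 1, 4, 1, 5}) pq.try_put(v);
//   g.wait_for_all();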
1871 
1872 //! Forwards messages only if the threshold has not been reached
1873 /** This node forwards items until its threshold is reached.
1874     It contains no buffering.  If the downstream node rejects, the
1875     message is dropped. */
1876 template< typename T, typename DecrementType=continue_msg >
1877 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
1878 public:
1879     typedef T input_type;
1880     typedef T output_type;
1881     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1882     typedef typename sender<output_type>::successor_type successor_type;
1883     //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.
1884 
1885 private:
1886     size_t my_threshold;
1887     size_t my_count; // number of successful puts
1888     size_t my_tries; // number of active put attempts
1889     reservable_predecessor_cache< T, spin_mutex > my_predecessors;
1890     spin_mutex my_mutex;
1891     broadcast_cache< T > my_successors;
1892 
1893     //! The internal receiver< DecrementType > that adjusts the count
1894     threshold_regulator< limiter_node<T, DecrementType>, DecrementType > decrement;
1895 
1896     graph_task* decrement_counter( long long delta ) {
1897         {
1898             spin_mutex::scoped_lock lock(my_mutex);
1899             if( delta > 0 && size_t(delta) > my_count ) {
1900                 my_count = 0;
1901             }
1902             else if( delta < 0 && size_t(-delta) > my_threshold - my_count ) {
1903                 my_count = my_threshold;
1904             }
1905             else {
1906                 my_count -= size_t(delta); // absolute value of delta is sufficiently small
1907             }
1908             __TBB_ASSERT(my_count <= my_threshold, "counter values are truncated to be inside the [0, threshold] interval");
1909         }
1910         return forward_task();
1911     }
1912 
1913     // Let threshold_regulator call decrement_counter()
1914     friend class threshold_regulator< limiter_node<T, DecrementType>, DecrementType >;
1915 
1916     friend class forward_task_bypass< limiter_node<T,DecrementType> >;
1917 
1918     bool check_conditions() {  // always called under lock
1919         return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
1920     }
1921 
1922     // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
1923     graph_task* forward_task() {
1924         input_type v;
1925         graph_task* rval = NULL;
1926         bool reserved = false;
1927             {
1928                 spin_mutex::scoped_lock lock(my_mutex);
1929                 if ( check_conditions() )
1930                     ++my_tries;
1931                 else
1932                     return NULL;
1933             }
1934 
1935         //SUCCESS
1936         // if we can reserve and can put, we consume the reservation
1937         // we increment the count and decrement the tries
1938         if ( (my_predecessors.try_reserve(v)) == true ){
1939             reserved=true;
1940             if ( (rval = my_successors.try_put_task(v)) != NULL ){
1941                 {
1942                     spin_mutex::scoped_lock lock(my_mutex);
1943                     ++my_count;
1944                     --my_tries;
1945                     my_predecessors.try_consume();
1946                     if ( check_conditions() ) {
1947                         if ( is_graph_active(this->my_graph) ) {
1948                             typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
1949                             small_object_allocator allocator{};
1950                             graph_task* rtask = allocator.new_object<task_type>( my_graph, allocator, *this );
1951                             my_graph.reserve_wait();
1952                             spawn_in_graph_arena(graph_reference(), *rtask);
1953                         }
1954                     }
1955                 }
1956                 return rval;
1957             }
1958         }
1959         //FAILURE
1960         //if we can't reserve, we decrement the tries
1961         //if we can reserve but can't put, we decrement the tries and release the reservation
1962         {
1963             spin_mutex::scoped_lock lock(my_mutex);
1964             --my_tries;
1965             if (reserved) my_predecessors.try_release();
1966             if ( check_conditions() ) {
1967                 if ( is_graph_active(this->my_graph) ) {
1968                     small_object_allocator allocator{};
1969                     typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
1970                     graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
1971                     my_graph.reserve_wait();
1972                     __TBB_ASSERT(!rval, "Have two tasks to handle");
1973                     return t;
1974                 }
1975             }
1976             return rval;
1977         }
1978     }
1979 
1980     void initialize() {
1981         fgt_node(
1982             CODEPTR(), FLOW_LIMITER_NODE, &this->my_graph,
1983             static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
1984             static_cast<sender<output_type> *>(this)
1985         );
1986     }
1987 
1988 public:
1989     //! Constructor
1990     limiter_node(graph &g, size_t threshold)
1991         : graph_node(g), my_threshold(threshold), my_count(0), my_tries(0), my_predecessors(this)
1992         , my_successors(this), decrement(this)
1993     {
1994         initialize();
1995     }
1996 
1997 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1998     template <typename... Args>
1999     limiter_node(const node_set<Args...>& nodes, size_t threshold)
2000         : limiter_node(nodes.graph_reference(), threshold) {
2001         make_edges_in_order(nodes, *this);
2002     }
2003 #endif
2004 
2005     //! Copy constructor
2006     limiter_node( const limiter_node& src ) : limiter_node(src.my_graph, src.my_threshold) {}
2007 
2008     //! The interface for accessing internal receiver< DecrementType > that adjusts the count
2009     receiver<DecrementType>& decrementer() { return decrement; }
2010 
2011     //! Replace the current successor with this new successor
2012     bool register_successor( successor_type &r ) override {
2013         spin_mutex::scoped_lock lock(my_mutex);
2014         bool was_empty = my_successors.empty();
2015         my_successors.register_successor(r);
2016         //spawn a forward task if this is the only successor
2017         if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
2018             if ( is_graph_active(this->my_graph) ) {
2019                 small_object_allocator allocator{};
2020                 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2021                 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
2022                 my_graph.reserve_wait();
2023                 spawn_in_graph_arena(graph_reference(), *t);
2024             }
2025         }
2026         return true;
2027     }
2028 
2029     //! Removes a successor from this node
2030     /** r.remove_predecessor(*this) is also called. */
2031     bool remove_successor( successor_type &r ) override {
2032         // TODO revamp: investigate why qualification is needed for remove_predecessor() call
2033         tbb::detail::d1::remove_predecessor(r, *this);
2034         my_successors.remove_successor(r);
2035         return true;
2036     }
2037 
2038     //! Adds src to the list of cached predecessors.
2039     bool register_predecessor( predecessor_type &src ) override {
2040         spin_mutex::scoped_lock lock(my_mutex);
2041         my_predecessors.add( src );
2042         if ( my_count + my_tries < my_threshold && !my_successors.empty() && is_graph_active(this->my_graph) ) {
2043             small_object_allocator allocator{};
2044             typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2045             graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
2046             my_graph.reserve_wait();
2047             spawn_in_graph_arena(graph_reference(), *t);
2048         }
2049         return true;
2050     }
2051 
2052     //! Removes src from the list of cached predecessors.
2053     bool remove_predecessor( predecessor_type &src ) override {
2054         my_predecessors.remove( src );
2055         return true;
2056     }
2057 
2058 protected:
2059 
2060     template< typename R, typename B > friend class run_and_put_task;
2061     template<typename X, typename Y> friend class broadcast_cache;
2062     template<typename X, typename Y> friend class round_robin_cache;
2063     //! Puts an item to this receiver
2064     graph_task* try_put_task( const T &t ) override {
2065         {
2066             spin_mutex::scoped_lock lock(my_mutex);
2067             if ( my_count + my_tries >= my_threshold )
2068                 return NULL;
2069             else
2070                 ++my_tries;
2071         }
2072 
2073         graph_task* rtask = my_successors.try_put_task(t);
2074 
2075         if ( !rtask ) {  // try_put_task failed.
2076             spin_mutex::scoped_lock lock(my_mutex);
2077             --my_tries;
2078             if (check_conditions() && is_graph_active(this->my_graph)) {
2079                 small_object_allocator allocator{};
2080                 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2081                 rtask = allocator.new_object<task_type>(my_graph, allocator, *this);
2082                 my_graph.reserve_wait();
2083             }
2084         }
2085         else {
2086             spin_mutex::scoped_lock lock(my_mutex);
2087             ++my_count;
2088             --my_tries;
2089              }
2090         return rtask;
2091     }
2092 
2093     graph& graph_reference() const override { return my_graph; }
2094 
2095     void reset_node( reset_flags f) override {
2096         my_count = 0;
2097         if(f & rf_clear_edges) {
2098             my_predecessors.clear();
2099             my_successors.clear();
2100         }
2101         else
2102         {
2103             my_predecessors.reset( );
2104         }
2105         decrement.reset_receiver(f);
2106     }
2107 };  // limiter_node
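
// Usage sketch (illustrative only, not part of this header): a limiter_node caps the number
// of in-flight items. Because it does not buffer, it is typically fed from a reservable
// predecessor such as queue_node, and a continue_msg edge into decrementer() releases
// capacity when downstream work completes.
//
//   tbb::flow::graph g;
//   tbb::flow::queue_node<int> backlog(g);                     // holds pending items
//   tbb::flow::limiter_node<int> limiter(g, 2);                // at most 2 items in flight
//   tbb::flow::function_node<int, tbb::flow::continue_msg> work(
//       g, tbb::flow::unlimited, [](int) { return tbb::flow::continue_msg(); });
//   tbb::flow::make_edge(backlog, limiter);
//   tbb::flow::make_edge(limiter, work);
//   tbb::flow::make_edge(work, limiter.decrementer());         // each completion frees one slot
//   for (int i = 0; i < 100; ++i) backlog.try_put(i);
//   g.wait_for_all();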
2108 
2109 #include "detail/_flow_graph_join_impl.h"
2110 
2111 template<typename OutputTuple, typename JP=queueing> class join_node;
2112 
2113 template<typename OutputTuple>
2114 class join_node<OutputTuple,reserving>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
2115 private:
2116     static const int N = std::tuple_size<OutputTuple>::value;
2117     typedef unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
2118 public:
2119     typedef OutputTuple output_type;
2120     typedef typename unfolded_type::input_ports_type input_ports_type;
2121      __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
2122         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2123                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2124     }
2125 
2126 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2127     template <typename... Args>
2128     __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
2129         make_edges_in_order(nodes, *this);
2130     }
2131 #endif
2132 
2133     __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2134         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2135                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2136     }
2137 
2138 };
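
// Usage sketch (illustrative only, not part of this header): a reserving join_node keeps
// no input buffers of its own; it reserves one item from each reservable predecessor
// (for example buffer_node or queue_node) only when every port can supply one, then
// forwards the combined std::tuple.
//
//   tbb::flow::graph g;
//   tbb::flow::buffer_node<int> ints(g);
//   tbb::flow::buffer_node<float> floats(g);
//   tbb::flow::join_node<std::tuple<int, float>, tbb::flow::reserving> join(g);
//   tbb::flow::make_edge(ints, tbb::flow::input_port<0>(join));
//   tbb::flow::make_edge(floats, tbb::flow::input_port<1>(join));
//   ints.try_put(1);
//   floats.try_put(2.0f);   // both ports can now reserve, so a tuple is produced
//   g.wait_for_all();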
2139 
2140 template<typename OutputTuple>
2141 class join_node<OutputTuple,queueing>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
2142 private:
2143     static const int N = std::tuple_size<OutputTuple>::value;
2144     typedef unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
2145 public:
2146     typedef OutputTuple output_type;
2147     typedef typename unfolded_type::input_ports_type input_ports_type;
2148      __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
2149         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2150                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2151     }
2152 
2153 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2154     template <typename... Args>
2155     __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
2156         make_edges_in_order(nodes, *this);
2157     }
2158 #endif
2159 
2160     __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2161         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2162                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2163     }
2164 
2165 };
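
// Usage sketch (illustrative only, not part of this header): a queueing join_node buffers
// arrivals on each input port and emits a std::tuple as soon as every port has at least
// one item.
//
//   tbb::flow::graph g;
//   tbb::flow::join_node<std::tuple<int, float>, tbb::flow::queueing> join(g);
//   tbb::flow::function_node<std::tuple<int, float>> sink(g, tbb::flow::unlimited,
//       [](const std::tuple<int, float>& t) { /* use std::get<0>(t), std::get<1>(t) */ });
//   tbb::flow::make_edge(join, sink);
//   tbb::flow::input_port<0>(join).try_put(1);
//   tbb::flow::input_port<1>(join).try_put(2.0f);
//   g.wait_for_all();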
2166 
2167 #if __TBB_CPP20_CONCEPTS_PRESENT
2168 // Helper function that is well-formed only if every element of OutputTuple
2169 // satisfies join_node_function_object<body[i], tuple[i], K>
2170 template <typename OutputTuple, typename K,
2171           typename... Functions, std::size_t... Idx>
2172 void join_node_function_objects_helper( std::index_sequence<Idx...> )
2173     requires (std::tuple_size_v<OutputTuple> == sizeof...(Functions)) &&
2174              (... && join_node_function_object<Functions, std::tuple_element_t<Idx, OutputTuple>, K>);
2175 
2176 template <typename OutputTuple, typename K, typename... Functions>
2177 concept join_node_functions = requires {
2178     join_node_function_objects_helper<OutputTuple, K, Functions...>(std::make_index_sequence<sizeof...(Functions)>{});
2179 };
2180 
2181 #endif
2182 
2183 // template for key_matching join_node
2184 // tag_matching join_node is a specialization of key_matching, and is source-compatible.
2185 template<typename OutputTuple, typename K, typename KHash>
2186 class join_node<OutputTuple, key_matching<K, KHash> > : public unfolded_join_node<std::tuple_size<OutputTuple>::value,
2187       key_matching_port, OutputTuple, key_matching<K,KHash> > {
2188 private:
2189     static const int N = std::tuple_size<OutputTuple>::value;
2190     typedef unfolded_join_node<N, key_matching_port, OutputTuple, key_matching<K,KHash> > unfolded_type;
2191 public:
2192     typedef OutputTuple output_type;
2193     typedef typename unfolded_type::input_ports_type input_ports_type;
2194 
2195 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
2196     join_node(graph &g) : unfolded_type(g) {}
2197 #endif  /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
2198 
2199     template<typename __TBB_B0, typename __TBB_B1>
2200         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1>)
2201      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
2202         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2203                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2204     }
2205     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
2206         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2>)
2207      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
2208         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2209                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2210     }
2211     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
2212         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3>)
2213      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
2214         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2215                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2216     }
2217     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
2218         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4>)
2219      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
2220             unfolded_type(g, b0, b1, b2, b3, b4) {
2221         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2222                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2223     }
2224 #if __TBB_VARIADIC_MAX >= 6
2225     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2226         typename __TBB_B5>
2227         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5>)
2228      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
2229             unfolded_type(g, b0, b1, b2, b3, b4, b5) {
2230         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2231                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2232     }
2233 #endif
2234 #if __TBB_VARIADIC_MAX >= 7
2235     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2236         typename __TBB_B5, typename __TBB_B6>
2237         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6>)
2238      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
2239             unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
2240         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2241                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2242     }
2243 #endif
2244 #if __TBB_VARIADIC_MAX >= 8
2245     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2246         typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
2247         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7>)
2248      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2249             __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
2250         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2251                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2252     }
2253 #endif
2254 #if __TBB_VARIADIC_MAX >= 9
2255     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2256         typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
2257         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8>)
2258      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2259             __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
2260         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2261                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2262     }
2263 #endif
2264 #if __TBB_VARIADIC_MAX >= 10
2265     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2266         typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
2267         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8, __TBB_B9>)
2268      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2269             __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
2270         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2271                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2272     }
2273 #endif
2274 
2275 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2276     template <
2277 #if (__clang_major__ == 3 && __clang_minor__ == 4)
2278         // clang 3.4 misdeduces 'Args...' for 'node_set' while it can cope with template template parameter.
2279         template<typename...> class node_set,
2280 #endif
2281         typename... Args, typename... Bodies
2282     >
2283     __TBB_requires((sizeof...(Bodies) == 0) || join_node_functions<OutputTuple, K, Bodies...>)
2284     __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
2285         : join_node(nodes.graph_reference(), bodies...) {
2286         make_edges_in_order(nodes, *this);
2287     }
2288 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2289 
2290     __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2291         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2292                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2293     }
2294 
2295 };
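
// Usage sketch (illustrative only, not part of this header): a key_matching join_node
// pairs messages whose per-port key functions return equal keys, regardless of arrival
// order. 'Record' is a hypothetical type introduced just for this example.
//
//   struct Record { int id; double value; };
//
//   tbb::flow::graph g;
//   tbb::flow::join_node<std::tuple<Record, Record>, tbb::flow::key_matching<int>> join(
//       g,
//       [](const Record& r) { return r.id; },     // key for port 0
//       [](const Record& r) { return r.id; });    // key for port 1
//   tbb::flow::input_port<0>(join).try_put(Record{7, 1.0});
//   tbb::flow::input_port<1>(join).try_put(Record{7, 2.0});   // completes the tuple for key 7
//   g.wait_for_all();
//   std::tuple<Record, Record> out;
//   bool matched = join.try_get(out);   // true: both halves carried id 7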
2296 
2297 // indexer node
2298 #include "detail/_flow_graph_indexer_impl.h"
2299 
2300 // TODO: Implement interface with variadic template or tuple
2301 template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
2302                       typename T4=null_type, typename T5=null_type, typename T6=null_type,
2303                       typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
2304 
2305 //indexer node specializations
2306 template<typename T0>
2307 class indexer_node<T0> : public unfolded_indexer_node<std::tuple<T0> > {
2308 private:
2309     static const int N = 1;
2310 public:
2311     typedef std::tuple<T0> InputTuple;
2312     typedef tagged_msg<size_t, T0> output_type;
2313     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2314     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2315         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2316                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2317     }
2318 
2319 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2320     template <typename... Args>
2321     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2322         make_edges_in_order(nodes, *this);
2323     }
2324 #endif
2325 
2326     // Copy constructor
2327     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2328         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2329                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2330     }
2331 };
2332 
2333 template<typename T0, typename T1>
2334 class indexer_node<T0, T1> : public unfolded_indexer_node<std::tuple<T0, T1> > {
2335 private:
2336     static const int N = 2;
2337 public:
2338     typedef std::tuple<T0, T1> InputTuple;
2339     typedef tagged_msg<size_t, T0, T1> output_type;
2340     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2341     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2342         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2343                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2344     }
2345 
2346 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2347     template <typename... Args>
2348     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2349         make_edges_in_order(nodes, *this);
2350     }
2351 #endif
2352 
2353     // Copy constructor
2354     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2355         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2356                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2357     }
2358 
2359 };
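
// Usage sketch (illustrative only, not part of this header): an indexer_node forwards a
// message from any input port as a tagged_msg that records which port it came from; the
// receiver can test the tag and extract the value with tbb::flow::cast_to.
//
//   tbb::flow::graph g;
//   tbb::flow::indexer_node<int, float> index(g);
//   using tagged = tbb::flow::indexer_node<int, float>::output_type;
//   tbb::flow::function_node<tagged> sink(g, tbb::flow::unlimited, [](const tagged& msg) {
//       if (msg.tag() == 0) { int i = tbb::flow::cast_to<int>(msg);     /* from port 0 */ }
//       else                { float f = tbb::flow::cast_to<float>(msg); /* from port 1 */ }
//   });
//   tbb::flow::make_edge(index, sink);
//   tbb::flow::input_port<0>(index).try_put(3);
//   tbb::flow::input_port<1>(index).try_put(2.5f);
//   g.wait_for_all();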
2360 
2361 template<typename T0, typename T1, typename T2>
2362 class indexer_node<T0, T1, T2> : public unfolded_indexer_node<std::tuple<T0, T1, T2> > {
2363 private:
2364     static const int N = 3;
2365 public:
2366     typedef std::tuple<T0, T1, T2> InputTuple;
2367     typedef tagged_msg<size_t, T0, T1, T2> output_type;
2368     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2369     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2370         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2371                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2372     }
2373 
2374 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2375     template <typename... Args>
2376     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2377         make_edges_in_order(nodes, *this);
2378     }
2379 #endif
2380 
2381     // Copy constructor
2382     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2383         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2384                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2385     }
2386 
2387 };
2388 
2389 template<typename T0, typename T1, typename T2, typename T3>
2390 class indexer_node<T0, T1, T2, T3> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3> > {
2391 private:
2392     static const int N = 4;
2393 public:
2394     typedef std::tuple<T0, T1, T2, T3> InputTuple;
2395     typedef tagged_msg<size_t, T0, T1, T2, T3> output_type;
2396     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2397     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2398         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2399                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2400     }
2401 
2402 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2403     template <typename... Args>
2404     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2405         make_edges_in_order(nodes, *this);
2406     }
2407 #endif
2408 
2409     // Copy constructor
2410     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2411         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2412                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2413     }
2414 
2415 };
2416 
2417 template<typename T0, typename T1, typename T2, typename T3, typename T4>
2418 class indexer_node<T0, T1, T2, T3, T4> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4> > {
2419 private:
2420     static const int N = 5;
2421 public:
2422     typedef std::tuple<T0, T1, T2, T3, T4> InputTuple;
2423     typedef tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
2424     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2425     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2426         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2427                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2428     }
2429 
2430 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2431     template <typename... Args>
2432     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2433         make_edges_in_order(nodes, *this);
2434     }
2435 #endif
2436 
2437     // Copy constructor
2438     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2439         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2440                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2441     }
2442 
2443 };
2444 
2445 #if __TBB_VARIADIC_MAX >= 6
2446 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
2447 class indexer_node<T0, T1, T2, T3, T4, T5> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5> > {
2448 private:
2449     static const int N = 6;
2450 public:
2451     typedef std::tuple<T0, T1, T2, T3, T4, T5> InputTuple;
2452     typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5> output_type;
2453     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2454     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2455         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2456                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2457     }
2458 
2459 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2460     template <typename... Args>
2461     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2462         make_edges_in_order(nodes, *this);
2463     }
2464 #endif
2465 
2466     // Copy constructor
2467     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2468         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2469                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2470     }
2471 
2472 };
2473 #endif //variadic max 6
2474 
2475 #if __TBB_VARIADIC_MAX >= 7
2476 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
2477          typename T6>
2478 class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6> > {
2479 private:
2480     static const int N = 7;
2481 public:
2482     typedef std::tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
2483     typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6> output_type;
2484     typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)2485     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2486         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2487                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2488     }
2489 
2490 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2491     template <typename... Args>
indexer_node(const node_set<Args...> & nodes)2492     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2493         make_edges_in_order(nodes, *this);
2494     }
2495 #endif
2496 
2497     // Copy constructor
indexer_node(const indexer_node & other)2498     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2499         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2500                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2501     }
2502 
2503 };
2504 #endif //variadic max 7
2505 
2506 #if __TBB_VARIADIC_MAX >= 8
2507 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
2508          typename T6, typename T7>
2509 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
2510 private:
2511     static const int N = 8;
2512 public:
2513     typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
2514     typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type;
2515     typedef unfolded_indexer_node<InputTuple> unfolded_type;
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
#endif //variadic max 8

#if __TBB_VARIADIC_MAX >= 9
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8>
class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
private:
    static const int N = 9;
public:
    typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
    typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
#endif //variadic max 9

#if __TBB_VARIADIC_MAX >= 10
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8, typename T9>
class indexer_node/*default*/ : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
private:
    static const int N = 10;
public:
    typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
    typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
#endif //variadic max 10
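
// Usage sketch (illustrative only, not part of this header; the graph and node names
// below are hypothetical): an indexer_node tags messages arriving on differently typed
// input ports and forwards them as a single tagged_msg stream, which can be inspected
// with is_a<T>() and cast_to<T>(). Assumes <cstdio> is included in the user code.
//
//     tbb::flow::graph g;
//     tbb::flow::indexer_node<int, float> merger(g);
//     using tag_t = tbb::flow::indexer_node<int, float>::output_type;
//     tbb::flow::function_node<tag_t> printer(g, tbb::flow::serial,
//         [](const tag_t& msg) {
//             if (tbb::flow::is_a<int>(msg))
//                 std::printf("int: %d\n", tbb::flow::cast_to<int>(msg));
//             else
//                 std::printf("float: %g\n", tbb::flow::cast_to<float>(msg));
//         });
//     tbb::flow::make_edge(merger, printer);
//     tbb::flow::input_port<0>(merger).try_put(42);
//     tbb::flow::input_port<1>(merger).try_put(3.5f);
//     g.wait_for_all();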

template< typename T >
inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
    register_successor(p, s);
    fgt_make_edge( &p, &s );
}

//! Makes an edge between a single predecessor and a single successor
template< typename T >
inline void make_edge( sender<T> &p, receiver<T> &s ) {
    internal_make_edge( p, s );
}

//Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void make_edge( T& output, V& input) {
    make_edge(std::get<0>(output.output_ports()), std::get<0>(input.input_ports()));
}

//Makes an edge from port 0 of a multi-output predecessor to a receiver.
template< typename T, typename R,
          typename = typename T::output_ports_type >
inline void make_edge( T& output, receiver<R>& input) {
    make_edge(std::get<0>(output.output_ports()), input);
}

//Makes an edge from a sender to port 0 of a multi-input successor.
template< typename S, typename V,
          typename = typename V::input_ports_type >
inline void make_edge( sender<S>& output, V& input) {
    make_edge(output, std::get<0>(input.input_ports()));
}
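
// Usage sketch (illustrative only; node names are hypothetical): make_edge connects a
// sender to a matching receiver, and the overloads above default to port 0 of
// multi-port nodes. remove_edge (defined below) disconnects the nodes again.
//
//     tbb::flow::graph g;
//     tbb::flow::function_node<int, int> doubler(g, tbb::flow::unlimited,
//         [](int v) { return 2 * v; });
//     tbb::flow::function_node<int, int> squarer(g, tbb::flow::unlimited,
//         [](int v) { return v * v; });
//     tbb::flow::make_edge(doubler, squarer);    // sender<int> -> receiver<int>
//     doubler.try_put(10);                       // flows through both nodes
//     g.wait_for_all();
//     tbb::flow::remove_edge(doubler, squarer);  // later messages are no longer forwarded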

template< typename T >
inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
    remove_successor( p, s );
    fgt_remove_edge( &p, &s );
}

//! Removes an edge between a single predecessor and a single successor
template< typename T >
inline void remove_edge( sender<T> &p, receiver<T> &s ) {
    internal_remove_edge( p, s );
}

//Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void remove_edge( T& output, V& input) {
    remove_edge(std::get<0>(output.output_ports()), std::get<0>(input.input_ports()));
}

//Removes an edge between port 0 of a multi-output predecessor and a receiver.
template< typename T, typename R,
          typename = typename T::output_ports_type >
inline void remove_edge( T& output, receiver<R>& input) {
    remove_edge(std::get<0>(output.output_ports()), input);
}

//Removes an edge between a sender and port 0 of a multi-input successor.
template< typename S, typename V,
          typename = typename V::input_ports_type >
inline void remove_edge( sender<S>& output, V& input) {
    remove_edge(output, std::get<0>(input.input_ports()));
}

//! Returns a copy of the body from a function or continue node
template< typename Body, typename Node >
Body copy_body( Node &n ) {
    return n.template copy_function_object<Body>();
}
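
// Usage sketch (illustrative only; 'counting_body' is a hypothetical functor): copy_body
// returns a copy of the body object stored inside a function_node or continue_node, which
// is useful for inspecting state accumulated by a stateful body.
//
//     struct counting_body {
//         int count = 0;
//         int operator()(int v) { ++count; return v; }
//     };
//
//     tbb::flow::graph g;
//     tbb::flow::function_node<int, int> n(g, tbb::flow::serial, counting_body{});
//     n.try_put(1);
//     n.try_put(2);
//     g.wait_for_all();
//     counting_body snapshot = tbb::flow::copy_body<counting_body>(n);
//     // snapshot.count now reflects how many messages the node has processed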

//composite_node
template< typename InputTuple, typename OutputTuple > class composite_node;

template< typename... InputTypes, typename... OutputTypes>
class composite_node <std::tuple<InputTypes...>, std::tuple<OutputTypes...> > : public graph_node {

public:
    typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
    typedef std::tuple< sender<OutputTypes>&... > output_ports_type;

private:
    std::unique_ptr<input_ports_type> my_input_ports;
    std::unique_ptr<output_ports_type> my_output_ports;

    static const size_t NUM_INPUTS = sizeof...(InputTypes);
    static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);

protected:
    void reset_node(reset_flags) override {}

public:
    composite_node( graph &g ) : graph_node(g) {
        fgt_multiinput_multioutput_node( CODEPTR(), FLOW_COMPOSITE_NODE, this, &this->my_graph );
    }

    template<typename T1, typename T2>
    void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
        static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
        static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");

        fgt_internal_input_alias_helper<T1, NUM_INPUTS>::alias_port( this, input_ports_tuple);
        fgt_internal_output_alias_helper<T2, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);

        my_input_ports.reset( new input_ports_type(std::forward<T1>(input_ports_tuple)) );
        my_output_ports.reset( new output_ports_type(std::forward<T2>(output_ports_tuple)) );
    }

    template< typename... NodeTypes >
    void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }

    template< typename... NodeTypes >
    void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }

    input_ports_type& input_ports() {
         __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
         return *my_input_ports;
    }

    output_ports_type& output_ports() {
         __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
         return *my_output_ports;
    }
};  // class composite_node
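
// Usage sketch (illustrative only; the 'adder_pair' class and its members are
// hypothetical): a composite_node packages internal nodes behind externally visible
// ports, so the whole subgraph can be connected through a single make_edge call.
//
//     class adder_pair : public tbb::flow::composite_node<std::tuple<int>, std::tuple<int>> {
//         using base_t = tbb::flow::composite_node<std::tuple<int>, std::tuple<int>>;
//         tbb::flow::function_node<int, int> add_one;
//         tbb::flow::function_node<int, int> add_two;
//     public:
//         adder_pair(tbb::flow::graph& g)
//             : base_t(g),
//               add_one(g, tbb::flow::unlimited, [](int v) { return v + 1; }),
//               add_two(g, tbb::flow::unlimited, [](int v) { return v + 2; }) {
//             tbb::flow::make_edge(add_one, add_two);
//             base_t::set_external_ports(base_t::input_ports_type(add_one),
//                                        base_t::output_ports_type(add_two));
//         }
//     };
//
//     // adder_pair can now be wired like any other single-input, single-output node:
//     //     tbb::flow::make_edge(some_producer, my_adder_pair);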

//composite_node with only input ports
template< typename... InputTypes>
class composite_node <std::tuple<InputTypes...>, std::tuple<> > : public graph_node {
public:
    typedef std::tuple< receiver<InputTypes>&... > input_ports_type;

private:
    std::unique_ptr<input_ports_type> my_input_ports;
    static const size_t NUM_INPUTS = sizeof...(InputTypes);

protected:
    void reset_node(reset_flags) override {}

public:
    composite_node( graph &g ) : graph_node(g) {
        fgt_composite( CODEPTR(), this, &g );
    }

    template<typename T>
    void set_external_ports(T&& input_ports_tuple) {
        static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");

        fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, input_ports_tuple);

        my_input_ports.reset( new input_ports_type(std::forward<T>(input_ports_tuple)) );
    }

    template< typename... NodeTypes >
    void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }

    template< typename... NodeTypes >
    void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }

    input_ports_type& input_ports() {
         __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
         return *my_input_ports;
    }

};  // class composite_node
//composite_node with only output ports
template<typename... OutputTypes>
class composite_node <std::tuple<>, std::tuple<OutputTypes...> > : public graph_node {
public:
    typedef std::tuple< sender<OutputTypes>&... > output_ports_type;

private:
    std::unique_ptr<output_ports_type> my_output_ports;
    static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);

protected:
    void reset_node(reset_flags) override {}

public:
    __TBB_NOINLINE_SYM composite_node( graph &g ) : graph_node(g) {
        fgt_composite( CODEPTR(), this, &g );
    }

    template<typename T>
    void set_external_ports(T&& output_ports_tuple) {
        static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");

        fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);

        my_output_ports.reset( new output_ports_type(std::forward<T>(output_ports_tuple)) );
    }

    template<typename... NodeTypes >
    void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }

    template<typename... NodeTypes >
    void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }

    output_ports_type& output_ports() {
         __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
         return *my_output_ports;
    }

};  // class composite_node

template<typename Gateway>
class async_body_base: no_assign {
public:
    typedef Gateway gateway_type;

    async_body_base(gateway_type *gateway): my_gateway(gateway) { }
    void set_gateway(gateway_type *gateway) {
        my_gateway = gateway;
    }

protected:
    gateway_type *my_gateway;
};

template<typename Input, typename Ports, typename Gateway, typename Body>
class async_body: public async_body_base<Gateway> {
public:
    typedef async_body_base<Gateway> base_type;
    typedef Gateway gateway_type;

    async_body(const Body &body, gateway_type *gateway)
        : base_type(gateway), my_body(body) { }

    void operator()( const Input &v, Ports & ) {
        my_body(v, *this->my_gateway);
    }

    Body get_body() { return my_body; }

private:
    Body my_body;
};

//! Implements async node
template < typename Input, typename Output,
           typename Policy = queueing_lightweight >
    __TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input>)
class async_node
    : public multifunction_node< Input, std::tuple< Output >, Policy >, public sender< Output >
{
    typedef multifunction_node< Input, std::tuple< Output >, Policy > base_type;
    typedef multifunction_input<
        Input, typename base_type::output_ports_type, Policy, cache_aligned_allocator<Input>> mfn_input_type;

public:
    typedef Input input_type;
    typedef Output output_type;
    typedef receiver<input_type> receiver_type;
    typedef receiver<output_type> successor_type;
    typedef sender<input_type> predecessor_type;
    typedef receiver_gateway<output_type> gateway_type;
    typedef async_body_base<gateway_type> async_body_base_type;
    typedef typename base_type::output_ports_type output_ports_type;

private:
    class receiver_gateway_impl: public receiver_gateway<Output> {
    public:
        receiver_gateway_impl(async_node* node): my_node(node) {}
        void reserve_wait() override {
            fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
            my_node->my_graph.reserve_wait();
        }

        void release_wait() override {
            async_node* n = my_node;
            graph* g = &n->my_graph;
            g->release_wait();
            fgt_async_commit(static_cast<typename async_node::receiver_type *>(n), g);
        }

        //! Implements gateway_type::try_put for an external activity to submit a message to FG
        bool try_put(const Output &i) override {
            return my_node->try_put_impl(i);
        }

    private:
        async_node* my_node;
    } my_gateway;

    // A substitute for 'this' used during member construction, to avoid compiler warnings
    async_node* self() { return this; }

    //! Implements gateway_type::try_put for an external activity to submit a message to FG
    bool try_put_impl(const Output &i) {
        multifunction_output<Output> &port_0 = output_port<0>(*this);
        broadcast_cache<output_type>& port_successors = port_0.successors();
        fgt_async_try_put_begin(this, &port_0);
        // TODO revamp: change to std::list<graph_task*>
        graph_task_list tasks;
        bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
        __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
                      "Return status is inconsistent with the method operation." );

        while( !tasks.empty() ) {
            enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
        }
        fgt_async_try_put_end(this, &port_0);
        return is_at_least_one_put_successful;
    }

public:
    template<typename Body>
        __TBB_requires(async_node_body<Body, input_type, gateway_type>)
    __TBB_NOINLINE_SYM async_node(
        graph &g, size_t concurrency,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : base_type(
        g, concurrency,
        async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
        (body, &my_gateway), a_priority ), my_gateway(self()) {
        fgt_multioutput_node_with_body<1>(
            CODEPTR(), FLOW_ASYNC_NODE,
            &this->my_graph, static_cast<receiver<input_type> *>(this),
            this->output_ports(), this->my_body
        );
    }

    template <typename Body>
        __TBB_requires(async_node_body<Body, input_type, gateway_type>)
    __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
        : async_node(g, concurrency, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename Body, typename... Args>
        __TBB_requires(async_node_body<Body, input_type, gateway_type>)
    __TBB_NOINLINE_SYM async_node(
        const node_set<Args...>& nodes, size_t concurrency, Body body,
        Policy = Policy(), node_priority_t a_priority = no_priority )
        : async_node(nodes.graph_reference(), concurrency, body, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
        __TBB_requires(async_node_body<Body, input_type, gateway_type>)
    __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
        : async_node(nodes, concurrency, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
        static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
        static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);

        fgt_multioutput_node_with_body<1>( CODEPTR(), FLOW_ASYNC_NODE,
                &this->my_graph, static_cast<receiver<input_type> *>(this),
                this->output_ports(), this->my_body );
    }

    gateway_type& gateway() {
        return my_gateway;
    }

    // Define sender< Output >

    //! Add a new successor to this node
    bool register_successor(successor_type&) override {
        __TBB_ASSERT(false, "Successors must be registered only via ports");
        return false;
    }

    //! Removes a successor from this node
    bool remove_successor(successor_type&) override {
        __TBB_ASSERT(false, "Successors must be removed only via ports");
        return false;
    }

    template<typename Body>
    Body copy_function_object() {
        typedef multifunction_body<input_type, typename base_type::output_ports_type> mfn_body_type;
        typedef async_body<Input, typename base_type::output_ports_type, gateway_type, Body> async_body_type;
        mfn_body_type &body_ref = *this->my_body;
        async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
        return ab.get_body();
    }

protected:

    void reset_node( reset_flags f) override {
        base_type::reset_node(f);
    }
};
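
// Usage sketch (illustrative only; thread handling is deliberately simplified and all
// names are hypothetical; assumes <thread> and <cstdio> are included): an async_node body
// hands work to an external activity through its gateway, and the activity later pushes
// the result back with try_put(). reserve_wait()/release_wait() keep
// graph::wait_for_all() from returning while that external work is still in flight.
//
//     using async_t = tbb::flow::async_node<int, int>;
//
//     tbb::flow::graph g;
//     async_t worker(g, tbb::flow::unlimited,
//         [](const int& input, async_t::gateway_type& gw) {
//             gw.reserve_wait();                      // keep wait_for_all() blocked
//             async_t::gateway_type* gateway = &gw;   // the gateway object outlives this body call
//             std::thread([input, gateway] {
//                 gateway->try_put(input * input);    // result from the external activity
//                 gateway->release_wait();
//             }).detach();
//         });
//     tbb::flow::function_node<int> printer(g, tbb::flow::serial,
//         [](int v) { std::printf("%d\n", v); });
//     tbb::flow::make_edge(worker, printer);
//     worker.try_put(7);
//     g.wait_for_all();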

#include "detail/_flow_graph_node_set_impl.h"

template< typename T >
class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    __TBB_NOINLINE_SYM explicit overwrite_node(graph &g)
        : graph_node(g), my_successors(this), my_buffer_is_valid(false)
    {
        fgt_node( CODEPTR(), FLOW_OVERWRITE_NODE, &this->my_graph,
                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename... Args>
    overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor; doesn't take anything from src; default won't work
    __TBB_NOINLINE_SYM overwrite_node( const overwrite_node& src ) : overwrite_node(src.my_graph) {}

    ~overwrite_node() {}

    bool register_successor( successor_type &s ) override {
        spin_mutex::scoped_lock l( my_mutex );
        if (my_buffer_is_valid && is_graph_active( my_graph )) {
            // We have a valid value that must be forwarded immediately.
            bool ret = s.try_put( my_buffer );
            if ( ret ) {
                // We add the successor that accepted our put
                my_successors.register_successor( s );
            } else {
                // A race is possible between a successor's reservation and this register_successor
                // call: a failed try_put does not mean the successor cannot accept a message soon.
                // Retrying here could spin forever, because the reserving node keeps switching the
                // edge to pull state while overwrite_node keeps switching it back to push state.
                // The loop is broken by deferring the retry to a separate task.
                small_object_allocator allocator{};
                typedef register_predecessor_task task_type;
                graph_task* t = allocator.new_object<task_type>(graph_reference(), allocator, *this, s);
                graph_reference().reserve_wait();
                spawn_in_graph_arena( my_graph, *t );
            }
        } else {
            // No valid value yet, just add as successor
            my_successors.register_successor( s );
        }
        return true;
    }

    bool remove_successor( successor_type &s ) override {
        spin_mutex::scoped_lock l( my_mutex );
        my_successors.remove_successor(s);
        return true;
    }

    bool try_get( input_type &v ) override {
        spin_mutex::scoped_lock l( my_mutex );
        if ( my_buffer_is_valid ) {
            v = my_buffer;
            return true;
        }
        return false;
    }

    //! Reserves an item
    bool try_reserve( T &v ) override {
        return try_get(v);
    }

    //! Releases the reserved item
    bool try_release() override { return true; }

    //! Consumes the reserved item
    bool try_consume() override { return true; }

    bool is_valid() {
        spin_mutex::scoped_lock l( my_mutex );
        return my_buffer_is_valid;
    }

    void clear() {
        spin_mutex::scoped_lock l( my_mutex );
        my_buffer_is_valid = false;
    }

protected:

    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    graph_task* try_put_task( const input_type &v ) override {
        spin_mutex::scoped_lock l( my_mutex );
        return try_put_task_impl(v);
    }

    graph_task* try_put_task_impl(const input_type &v) {
        my_buffer = v;
        my_buffer_is_valid = true;
        graph_task* rtask = my_successors.try_put_task(v);
        if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
        return rtask;
    }

    graph& graph_reference() const override {
        return my_graph;
    }

    //! Breaks an infinite loop between the node reservation and register_successor call
    struct register_predecessor_task : public graph_task {
        register_predecessor_task(
            graph& g, small_object_allocator& allocator, predecessor_type& owner, successor_type& succ)
            : graph_task(g, allocator), o(owner), s(succ) {}

        task* execute(execution_data& ed) override {
            // TODO revamp: investigate why qualification is needed for register_successor() call
            using tbb::detail::d1::register_predecessor;
            using tbb::detail::d1::register_successor;
            if ( !register_predecessor(s, o) ) {
                register_successor(o, s);
            }
            finalize(ed);
            return nullptr;
        }

        predecessor_type& o;
        successor_type& s;
    };

    spin_mutex my_mutex;
    broadcast_cache< input_type, null_rw_mutex > my_successors;
    input_type my_buffer;
    bool my_buffer_is_valid;

    void reset_node( reset_flags f) override {
        my_buffer_is_valid = false;
        if (f&rf_clear_edges) {
            my_successors.clear();
        }
    }
};  // overwrite_node
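
// Usage sketch (illustrative only; names are hypothetical): an overwrite_node stores the
// most recent message, broadcasts every new value to its successors, and keeps answering
// try_get with the stored value until clear() invalidates the buffer.
//
//     tbb::flow::graph g;
//     tbb::flow::overwrite_node<int> latest(g);
//     latest.try_put(1);
//     latest.try_put(2);             // overwrites the previously stored value
//     g.wait_for_all();
//     int v = 0;
//     bool ok = latest.try_get(v);   // ok == true, v == 2; the buffer remains valid
//     latest.clear();                // subsequent try_get calls return false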

template< typename T >
class write_once_node : public overwrite_node<T> {
public:
    typedef T input_type;
    typedef T output_type;
    typedef overwrite_node<T> base_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    //! Constructor
    __TBB_NOINLINE_SYM explicit write_once_node(graph& g) : base_type(g) {
        fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename... Args>
    write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: call base class copy constructor
    __TBB_NOINLINE_SYM write_once_node( const write_once_node& src ) : base_type(src) {
        fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    graph_task *try_put_task( const T &v ) override {
        spin_mutex::scoped_lock l( this->my_mutex );
        return this->my_buffer_is_valid ? nullptr : this->try_put_task_impl(v);
    }
}; // write_once_node
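
// Usage sketch (illustrative only; names are hypothetical): unlike overwrite_node,
// write_once_node keeps only the first message it receives; later puts are ignored until
// clear() resets the buffer.
//
//     tbb::flow::graph g;
//     tbb::flow::write_once_node<int> first(g);
//     first.try_put(10);        // stored
//     first.try_put(20);        // ignored: the buffer is already valid
//     g.wait_for_all();
//     int v = 0;
//     first.try_get(v);         // v == 10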

inline void set_name(const graph& g, const char *name) {
    fgt_graph_desc(&g, name);
}

template <typename Output>
inline void set_name(const input_node<Output>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename Input, typename Output, typename Policy>
inline void set_name(const function_node<Input, Output, Policy>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename Output, typename Policy>
inline void set_name(const continue_node<Output,Policy>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const broadcast_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const buffer_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const queue_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const sequencer_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T, typename Compare>
inline void set_name(const priority_queue_node<T, Compare>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T, typename DecrementType>
inline void set_name(const limiter_node<T, DecrementType>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename OutputTuple, typename JP>
inline void set_name(const join_node<OutputTuple, JP>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename... Types>
inline void set_name(const indexer_node<Types...>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const overwrite_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const write_once_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template<typename Input, typename Output, typename Policy>
inline void set_name(const multifunction_node<Input, Output, Policy>& node, const char *name) {
    fgt_multioutput_node_desc(&node, name);
}

template<typename TupleType>
inline void set_name(const split_node<TupleType>& node, const char *name) {
    fgt_multioutput_node_desc(&node, name);
}

template< typename InputTuple, typename OutputTuple >
inline void set_name(const composite_node<InputTuple, OutputTuple>& node, const char *name) {
    fgt_multiinput_multioutput_node_desc(&node, name);
}

template<typename Input, typename Output, typename Policy>
inline void set_name(const async_node<Input, Output, Policy>& node, const char *name)
{
    fgt_multioutput_node_desc(&node, name);
}
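
// Usage sketch (illustrative only; names are hypothetical): when TBB_USE_PROFILING_TOOLS
// is enabled, these overloads attach human-readable labels to graphs and nodes so that
// tracing tools (for example, Flow Graph Analyzer) can display them.
//
//     tbb::flow::graph g;
//     tbb::flow::broadcast_node<int> fan_out(g);
//     tbb::profiling::set_name(g, "my_pipeline");
//     tbb::profiling::set_name(fan_out, "fan_out");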
} // d1
} // detail
} // tbb


// Include deduction guides for node classes
#include "detail/_flow_graph_nodes_deduction.h"

namespace tbb {
namespace flow {
inline namespace v1 {
    using detail::d1::receiver;
    using detail::d1::sender;

    using detail::d1::serial;
    using detail::d1::unlimited;

    using detail::d1::reset_flags;
    using detail::d1::rf_reset_protocol;
    using detail::d1::rf_reset_bodies;
    using detail::d1::rf_clear_edges;

    using detail::d1::graph;
    using detail::d1::graph_node;
    using detail::d1::continue_msg;

    using detail::d1::input_node;
    using detail::d1::function_node;
    using detail::d1::multifunction_node;
    using detail::d1::split_node;
    using detail::d1::output_port;
    using detail::d1::indexer_node;
    using detail::d1::tagged_msg;
    using detail::d1::cast_to;
    using detail::d1::is_a;
    using detail::d1::continue_node;
    using detail::d1::overwrite_node;
    using detail::d1::write_once_node;
    using detail::d1::broadcast_node;
    using detail::d1::buffer_node;
    using detail::d1::queue_node;
    using detail::d1::sequencer_node;
    using detail::d1::priority_queue_node;
    using detail::d1::limiter_node;
    using namespace detail::d1::graph_policy_namespace;
    using detail::d1::join_node;
    using detail::d1::input_port;
    using detail::d1::copy_body;
    using detail::d1::make_edge;
    using detail::d1::remove_edge;
    using detail::d1::tag_value;
    using detail::d1::composite_node;
    using detail::d1::async_node;
    using detail::d1::node_priority_t;
    using detail::d1::no_priority;

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    using detail::d1::follows;
    using detail::d1::precedes;
    using detail::d1::make_node_set;
    using detail::d1::make_edges;
#endif

} // v1
} // flow

    using detail::d1::flow_control;

namespace profiling {
    using detail::d1::set_name;
} // profiling

} // tbb


#if TBB_USE_PROFILING_TOOLS && ( __unix__ || __APPLE__ )
   // The warning pragma is deliberately not popped here, since popping would still produce the warning on the user side
   #undef __TBB_NOINLINE_SYM
#endif

#endif // __TBB_flow_graph_H