/*
    Copyright 2005-2014 Intel Corporation.  All Rights Reserved.

    This file is part of Threading Building Blocks. Threading Building Blocks is free software;
    you can redistribute it and/or modify it under the terms of the GNU General Public License
    version 2  as  published  by  the  Free Software Foundation.  Threading Building Blocks is
    distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
    implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
    See  the GNU General Public License for more details.   You should have received a copy of
    the  GNU General Public License along with Threading Building Blocks; if not, write to the
    Free Software Foundation, Inc.,  51 Franklin St,  Fifth Floor,  Boston,  MA 02110-1301 USA

    As a special exception,  you may use this file  as part of a free software library without
    restriction.  Specifically,  if other files instantiate templates  or use macros or inline
    functions from this file, or you compile this file and link it with other files to produce
    an executable,  this file does not by itself cause the resulting executable to be covered
    by the GNU General Public License. This exception does not however invalidate any other
    reasons why the executable file might be covered by the GNU General Public License.
*/

#ifndef __TBB_flow_graph_H
#define __TBB_flow_graph_H

#include "tbb_stddef.h"
#include "atomic.h"
#include "spin_mutex.h"
#include "null_mutex.h"
#include "spin_rw_mutex.h"
#include "null_rw_mutex.h"
#include "task.h"
#include "cache_aligned_allocator.h"
#include "tbb_exception.h"
#include "internal/_aggregator_impl.h"
#include "tbb_profiling.h"

#if TBB_DEPRECATED_FLOW_ENQUEUE
#define FLOW_SPAWN(a) tbb::task::enqueue((a))
#else
#define FLOW_SPAWN(a) tbb::task::spawn((a))
#endif

// Use the VC10 or gcc version of tuple if it is available.
#if __TBB_CPP11_TUPLE_PRESENT
    #include <tuple>
namespace tbb {
    namespace flow {
        using std::tuple;
        using std::tuple_size;
        using std::tuple_element;
        using std::get;
    }
}
#else
    #include "compat/tuple"
#endif

#include <list>
#include <queue>

/** @file
  \brief The graph-related classes and functions

  There are some applications that best express dependencies as messages
  passed between nodes in a graph.  These messages may contain data or
  simply act as signals that a predecessor has completed. The graph
  class and its associated node classes can be used to express such
  applications.
*/
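
// A minimal usage sketch (illustrative only, not part of this header's interface).
// It assumes make_edge(), which is defined later in this header, and a C++11 lambda;
// a function object works equally well:
//
//     tbb::flow::graph g;
//     tbb::flow::function_node<int, int> doubler( g, tbb::flow::unlimited,
//                                                 []( int v ) { return 2 * v; } );
//     doubler.try_put( 21 );   // send a message into the graph
//     g.wait_for_all();        // block until all spawned graph work is done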

namespace tbb {
namespace flow {

//! An enumeration that provides the two most common concurrency levels: unlimited and serial
enum concurrency { unlimited = 0, serial = 1 };

namespace interface7 {

namespace internal {
    template<typename T, typename M> class successor_cache;
    template<typename T, typename M> class broadcast_cache;
    template<typename T, typename M> class round_robin_cache;
}

//! An empty class used for messages that mean "I'm done"
class continue_msg {};

template< typename T > class sender;
template< typename T > class receiver;
class continue_receiver;

//! Pure virtual template class that defines a sender of messages of type T
template< typename T >
class sender {
public:
    //! The output type of this sender
    typedef T output_type;

    //! The successor type for this node
    typedef receiver<T> successor_type;

    virtual ~sender() {}

    //! Add a new successor to this node
    virtual bool register_successor( successor_type &r ) = 0;

    //! Removes a successor from this node
    virtual bool remove_successor( successor_type &r ) = 0;

    //! Request an item from the sender
    virtual bool try_get( T & ) { return false; }

    //! Reserves an item in the sender
    virtual bool try_reserve( T & ) { return false; }

    //! Releases the reserved item
    virtual bool try_release( ) { return false; }

    //! Consumes the reserved item
    virtual bool try_consume( ) { return false; }

#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
    //! interface to record edges for traversal & deletion
    virtual void    internal_add_built_successor( successor_type & )    = 0;
    virtual void    internal_delete_built_successor( successor_type & ) = 0;
    virtual void    copy_successors( std::vector<successor_type *> &)   = 0;
    virtual size_t  successor_count()                                   = 0;
#endif
};

template< typename T > class limiter_node;  // needed for resetting decrementer
template< typename R, typename B > class run_and_put_task;

static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1;

#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
// flags to modify the behavior of the graph reset().  Can be combined.
enum reset_flags {
    rf_reset_protocol   = 0,
    rf_reset_bodies     = 1<<0,  // delete the current node body, reset to a copy of the initial node body.
    rf_extract          = 1<<1   // delete edges (extract() for single node, reset() for graph.)
};

#define __TBB_PFG_RESET_ARG(exp) exp
#define __TBB_COMMA ,
#else
#define __TBB_PFG_RESET_ARG(exp)  /* nothing */
#define __TBB_COMMA /* nothing */
#endif

// Spawn the left task via FLOW_SPAWN if both arguments are real tasks.
// Returns the task that still needs to be handled, if there is one.
static inline tbb::task *combine_tasks( tbb::task * left, tbb::task * right) {
    // if no RHS task, don't change left.
    if(right == NULL) return left;
    // right != NULL
    if(left == NULL) return right;
    if(left == SUCCESSFULLY_ENQUEUED) return right;
    // left contains a task
    if(right != SUCCESSFULLY_ENQUEUED) {
        // both are valid tasks
        FLOW_SPAWN(*left);
        return right;
    }
    return left;
}
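
// Illustrative sketch of the task-bypass convention that combine_tasks() supports (not
// part of the interface).  It mirrors what the internal successor caches do when one
// message is forwarded to several successors: keep at most one returned task to hand
// back to the scheduler and dispatch the rest along the way.  The receivers r1 and r2
// and the message msg below are hypothetical.
//
//     tbb::task *last = NULL;
//     last = combine_tasks( last, r1.try_put_task(msg) );  // keep it, or spawn the older one
//     last = combine_tasks( last, r2.try_put_task(msg) );
//     return last;   // the caller either spawns it or bypasses it further up the chain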

//! Pure virtual template class that defines a receiver of messages of type T
template< typename T >
class receiver {
public:
    //! The input type of this receiver
    typedef T input_type;

    //! The predecessor type for this node
    typedef sender<T> predecessor_type;

    //! Destructor
    virtual ~receiver() {}

    //! Put an item to the receiver
    bool try_put( const T& t ) {
        task *res = try_put_task(t);
        if(!res) return false;
        if (res != SUCCESSFULLY_ENQUEUED) FLOW_SPAWN(*res);
        return true;
    }

    //! put item to successor; return task to run the successor if possible.
protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    virtual task *try_put_task(const T& t) = 0;
public:

    //! Add a predecessor to the node
    virtual bool register_predecessor( predecessor_type & ) { return false; }

    //! Remove a predecessor from the node
    virtual bool remove_predecessor( predecessor_type & ) { return false; }

#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
    virtual void   internal_add_built_predecessor( predecessor_type & )    = 0;
    virtual void   internal_delete_built_predecessor( predecessor_type & ) = 0;
    virtual void   copy_predecessors( std::vector<predecessor_type *> & )  = 0;
    virtual size_t predecessor_count()                                     = 0;
#endif

protected:
    //! put receiver back in initial state
    template<typename U> friend class limiter_node;
    virtual void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f = rf_reset_protocol ) ) = 0;

    template<typename TT, typename M>
        friend class internal::successor_cache;
    virtual bool is_continue_receiver() { return false; }
};

#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
//* holder of edges both for caches and for those nodes which do not have predecessor caches.
// C == receiver< ... > or sender< ... >, depending.
template<typename C>
class edge_container {

public:
    typedef std::vector<C *> edge_vector;

    void add_edge( C &s) {
        built_edges.push_back( &s );
    }

    void delete_edge( C &s) {
        for ( typename edge_vector::iterator i = built_edges.begin(); i != built_edges.end(); ++i ) {
            if ( *i == &s )  {
                (void)built_edges.erase(i);
                return;  // only remove one predecessor per request
            }
        }
    }

    void copy_edges( edge_vector &v) {
        v = built_edges;
    }

    size_t edge_count() {
        return (size_t)(built_edges.size());
    }

    void clear() {
        built_edges.clear();
    }

    template< typename S > void sender_extract( S &s );
    template< typename R > void receiver_extract( R &r );

private:
    edge_vector built_edges;
};
#endif  /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */

//! Base class for receivers of completion messages
/** These receivers automatically reset, but cannot be explicitly waited on */
class continue_receiver : public receiver< continue_msg > {
public:

    //! The input type
    typedef continue_msg input_type;

    //! The predecessor type for this node
    typedef sender< continue_msg > predecessor_type;

    //! Constructor
    continue_receiver( int number_of_predecessors = 0 ) {
        my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
        my_current_count = 0;
    }

    //! Copy constructor
    continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
        my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
        my_current_count = 0;
    }

    //! Destructor
    virtual ~continue_receiver() { }

    //! Increments the trigger threshold
    /* override */ bool register_predecessor( predecessor_type & ) {
        spin_mutex::scoped_lock l(my_mutex);
        ++my_predecessor_count;
        return true;
    }

    //! Decrements the trigger threshold
    /** Does not check to see if the removal of the predecessor now makes the current count
        exceed the new threshold.  So removing a predecessor while the graph is active can cause
        unexpected results. */
    /* override */ bool remove_predecessor( predecessor_type & ) {
        spin_mutex::scoped_lock l(my_mutex);
        --my_predecessor_count;
        return true;
    }

#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
    typedef std::vector<predecessor_type *> predecessor_vector_type;

    /*override*/ void internal_add_built_predecessor( predecessor_type &s) {
        spin_mutex::scoped_lock l(my_mutex);
        my_built_predecessors.add_edge( s );
    }

    /*override*/ void internal_delete_built_predecessor( predecessor_type &s) {
        spin_mutex::scoped_lock l(my_mutex);
        my_built_predecessors.delete_edge(s);
    }

    /*override*/ void copy_predecessors( predecessor_vector_type &v) {
        spin_mutex::scoped_lock l(my_mutex);
        my_built_predecessors.copy_edges(v);
    }

    /*override*/ size_t predecessor_count() {
        spin_mutex::scoped_lock l(my_mutex);
        return my_built_predecessors.edge_count();
    }
#endif  /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    // The execute() body is supposed to be too small to be worth creating a task for.
    /* override */ task *try_put_task( const input_type & ) {
        {
            spin_mutex::scoped_lock l(my_mutex);
            if ( ++my_current_count < my_predecessor_count )
                return SUCCESSFULLY_ENQUEUED;
            else
                my_current_count = 0;
        }
        task * res = execute();
        if(!res) return SUCCESSFULLY_ENQUEUED;
        return res;
    }

#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
    edge_container<predecessor_type> my_built_predecessors;
#endif
    spin_mutex my_mutex;
    int my_predecessor_count;
    int my_current_count;
    int my_initial_predecessor_count;
    // the friend declaration in the base class did not eliminate the "protected class"
    // error in gcc 4.1.2
    template<typename U> friend class limiter_node;
    /*override*/void reset_receiver( __TBB_PFG_RESET_ARG(reset_flags f) )
    {
        my_current_count = 0;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
        if(f & rf_extract) {
            my_built_predecessors.receiver_extract(*this);
            my_predecessor_count = my_initial_predecessor_count;
        }
#endif
    }

    //! Does whatever should happen when the threshold is reached
    /** This should be very fast or else spawn a task.  This is
        called while the sender is blocked in the try_put(). */
    virtual task * execute() = 0;
    template<typename TT, typename M>
        friend class internal::successor_cache;
    /*override*/ bool is_continue_receiver() { return true; }
};
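
// Illustrative sketch of the counting protocol (not part of the interface): a
// continue_receiver constructed with two predecessors runs execute() only on the
// second try_put().  The derived type below is hypothetical, and it assumes
// continue_receiver is reachable as tbb::flow::continue_receiver via using
// declarations elsewhere in this header.
//
//     struct my_trigger : tbb::flow::continue_receiver {
//         my_trigger() : continue_receiver(2) {}
//         tbb::task *execute() { /* do the work, or return a task to run */ return NULL; }
//     };
//     my_trigger t;
//     t.try_put( tbb::flow::continue_msg() );  // first signal: counted, nothing runs
//     t.try_put( tbb::flow::continue_msg() );  // second signal: execute() is called
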
}  // interface7
}  // flow
}  // tbb

#include "internal/_flow_graph_trace_impl.h"

namespace tbb {
namespace flow {
namespace interface7 {

#include "internal/_flow_graph_types_impl.h"
#include "internal/_flow_graph_impl.h"
using namespace internal::graph_policy_namespace;

class graph;
class graph_node;

template <typename GraphContainerType, typename GraphNodeType>
class graph_iterator {
    friend class graph;
    friend class graph_node;
public:
    typedef size_t size_type;
    typedef GraphNodeType value_type;
    typedef GraphNodeType* pointer;
    typedef GraphNodeType& reference;
    typedef const GraphNodeType& const_reference;
    typedef std::forward_iterator_tag iterator_category;

    //! Default constructor
    graph_iterator() : my_graph(NULL), current_node(NULL) {}

    //! Copy constructor
    graph_iterator(const graph_iterator& other) :
        my_graph(other.my_graph), current_node(other.current_node)
    {}

    //! Assignment
    graph_iterator& operator=(const graph_iterator& other) {
        if (this != &other) {
            my_graph = other.my_graph;
            current_node = other.current_node;
        }
        return *this;
    }

    //! Dereference
    reference operator*() const;

    //! Dereference
    pointer operator->() const;

    //! Equality
    bool operator==(const graph_iterator& other) const {
        return ((my_graph == other.my_graph) && (current_node == other.current_node));
    }

    //! Inequality
    bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }

    //! Pre-increment
    graph_iterator& operator++() {
        internal_forward();
        return *this;
    }

    //! Post-increment
    graph_iterator operator++(int) {
        graph_iterator result = *this;
        operator++();
        return result;
    }

private:
    // the graph over which we are iterating
    GraphContainerType *my_graph;
    // pointer into my_graph's my_nodes list
    pointer current_node;

    //! Private initializing constructor for begin() and end() iterators
    graph_iterator(GraphContainerType *g, bool begin);
    void internal_forward();
};

//! The graph class
/** This class serves as a handle to the graph */
class graph : tbb::internal::no_copy {
    friend class graph_node;

    template< typename Body >
    class run_task : public task {
    public:
        run_task( Body& body ) : my_body(body) {}
        task *execute() {
            my_body();
            return NULL;
        }
    private:
        Body my_body;
    };

    template< typename Receiver, typename Body >
    class run_and_put_task : public task {
    public:
        run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
        task *execute() {
            task *res = my_receiver.try_put_task( my_body() );
            if(res == SUCCESSFULLY_ENQUEUED) res = NULL;
            return res;
        }
    private:
        Receiver &my_receiver;
        Body my_body;
    };

public:
    //! Constructs a graph with isolated task_group_context
    explicit graph() : my_nodes(NULL), my_nodes_last(NULL)
    {
        own_context = true;
        cancelled = false;
        caught_exception = false;
        my_context = new task_group_context();
        my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
        my_root_task->set_ref_count(1);
        tbb::internal::fgt_graph( this );
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
        my_is_active = true;
#endif
    }

    //! Constructs a graph with use_this_context as context
    explicit graph(task_group_context& use_this_context) :
    my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL)
    {
        own_context = false;
        my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
        my_root_task->set_ref_count(1);
        tbb::internal::fgt_graph( this );
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
        my_is_active = true;
#endif
    }

    //! Destroys the graph.
    /** Calls wait_for_all, then destroys the root task and context. */
    ~graph() {
        wait_for_all();
        my_root_task->set_ref_count(0);
        task::destroy( *my_root_task );
        if (own_context) delete my_context;
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    void set_name( const char *name ) {
        tbb::internal::fgt_graph_desc( this, name );
    }
#endif

    //! Used to register that an external entity may still interact with the graph.
    /** The graph will not return from wait_for_all until a matching number of
        decrement_wait_count calls has been made. */
    void increment_wait_count() {
        if (my_root_task)
            my_root_task->increment_ref_count();
    }

    //! Deregisters an external entity that may have interacted with the graph.
    /** The graph will not return from wait_for_all until the number of decrement_wait_count
        calls matches the number of increment_wait_count calls. */
    void decrement_wait_count() {
        if (my_root_task)
            my_root_task->decrement_ref_count();
    }
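
    // Illustrative sketch (not part of the interface): keeping wait_for_all() from
    // returning while an external, non-graph activity may still feed the graph.
    // start_async_producer(), make_item() and node are hypothetical.
    //
    //     g.increment_wait_count();            // register the external activity
    //     start_async_producer( [&g, &node]() {
    //         node.try_put( make_item() );     // external thread injects messages
    //         g.decrement_wait_count();        // deregister when finished
    //     } );
    //     g.wait_for_all();                    // now also waits for the producer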

    //! Spawns a task that runs a body and puts its output to a specific receiver
    /** The task is spawned as a child of the graph. This is useful for running tasks
        that need to block a wait_for_all() on the graph.  For example a one-off source. */
    template< typename Receiver, typename Body >
        void run( Receiver &r, Body body ) {
       FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *my_root_task ) )
                   run_and_put_task< Receiver, Body >( r, body )) );
    }

    //! Spawns a task that runs a function object
    /** The task is spawned as a child of the graph. This is useful for running tasks
        that need to block a wait_for_all() on the graph. For example a one-off source. */
    template< typename Body >
    void run( Body body ) {
       FLOW_SPAWN( * new ( task::allocate_additional_child_of( *my_root_task ) ) run_task< Body >( body ) );
    }
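
    // Illustrative sketch (not part of the interface): both run() overloads spawn a
    // child of the graph's root task, so wait_for_all() blocks until they finish.
    // The receiver node f, do_side_work() and the C++11 lambdas are assumptions.
    //
    //     g.run( []() { do_side_work(); } );   // just run a body
    //     g.run( f, []() { return 42; } );     // run a body and try_put() its result into f
    //     g.wait_for_all();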

    //! Wait until the graph is idle and the number of decrement_wait_count calls equals the number of increment_wait_count calls.
    /** The waiting thread will go off and steal work while it is blocked in the wait_for_all. */
    void wait_for_all() {
        cancelled = false;
        caught_exception = false;
        if (my_root_task) {
#if TBB_USE_EXCEPTIONS
            try {
#endif
                my_root_task->wait_for_all();
                cancelled = my_context->is_group_execution_cancelled();
#if TBB_USE_EXCEPTIONS
            }
            catch(...) {
                my_root_task->set_ref_count(1);
                my_context->reset();
                caught_exception = true;
                cancelled = true;
                throw;
            }
#endif
            my_context->reset();  // consistent with behavior in catch()
            my_root_task->set_ref_count(1);
        }
    }

    //! Returns the root task of the graph
    task * root_task() {
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
        if (!my_is_active)
            return NULL;
        else
#endif
            return my_root_task;
    }

#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
    void set_active(bool a = true) {
       my_is_active = a;
    }

    bool is_active() {
       return my_is_active;
    }
#endif

    // ITERATORS
    template<typename C, typename N>
    friend class graph_iterator;

    // Graph iterator typedefs
    typedef graph_iterator<graph,graph_node> iterator;
    typedef graph_iterator<const graph,const graph_node> const_iterator;

    // Graph iterator constructors
    //! start iterator
    iterator begin() { return iterator(this, true); }
    //! end iterator
    iterator end() { return iterator(this, false); }
    //! start const iterator
    const_iterator begin() const { return const_iterator(this, true); }
    //! end const iterator
    const_iterator end() const { return const_iterator(this, false); }
    //! start const iterator
    const_iterator cbegin() const { return const_iterator(this, true); }
    //! end const iterator
    const_iterator cend() const { return const_iterator(this, false); }
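
    // Illustrative sketch (not part of the interface): the iterators walk the nodes
    // registered with a graph g as graph_node references.
    //
    //     for ( tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it ) {
    //         tbb::flow::graph_node &n = *it;   // inspect each node registered with g
    //         (void)n;
    //     }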

    //! return status of graph execution
    bool is_cancelled() { return cancelled; }
    bool exception_thrown() { return caught_exception; }

    // thread-unsafe state reset.
    void reset(__TBB_PFG_RESET_ARG(reset_flags f = rf_reset_protocol));

private:
    task *my_root_task;
    task_group_context *my_context;
    bool own_context;
    bool cancelled;
    bool caught_exception;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
    bool my_is_active;
#endif


    graph_node *my_nodes, *my_nodes_last;

    spin_mutex nodelist_mutex;
    void register_node(graph_node *n);
    void remove_node(graph_node *n);

};  // class graph

template <typename C, typename N>
graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
{
    if (begin) current_node = my_graph->my_nodes;
    // else it is an end iterator by default
}

template <typename C, typename N>
typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
    __TBB_ASSERT(current_node, "graph_iterator at end");
    return *operator->();
}

template <typename C, typename N>
typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
    return current_node;
}


template <typename C, typename N>
void graph_iterator<C,N>::internal_forward() {
    if (current_node) current_node = current_node->next;
}

//! The base of all graph nodes.
class graph_node : tbb::internal::no_assign {
    friend class graph;
    template<typename C, typename N>
    friend class graph_iterator;
protected:
    graph& my_graph;
    graph_node *next, *prev;
public:
    graph_node(graph& g) : my_graph(g) {
        my_graph.register_node(this);
    }
    virtual ~graph_node() {
        my_graph.remove_node(this);
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    virtual void set_name( const char *name ) = 0;
#endif

#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
    virtual void extract( reset_flags f=rf_extract ) {
        bool a = my_graph.is_active();
        my_graph.set_active(false);
        reset((reset_flags)(f|rf_extract));
        my_graph.set_active(a);
    }
#endif

protected:
    virtual void reset(__TBB_PFG_RESET_ARG(reset_flags f=rf_reset_protocol)) = 0;
};

inline void graph::register_node(graph_node *n) {
    n->next = NULL;
    {
        spin_mutex::scoped_lock lock(nodelist_mutex);
        n->prev = my_nodes_last;
        if (my_nodes_last) my_nodes_last->next = n;
        my_nodes_last = n;
        if (!my_nodes) my_nodes = n;
    }
}

inline void graph::remove_node(graph_node *n) {
    {
        spin_mutex::scoped_lock lock(nodelist_mutex);
        __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
        if (n->prev) n->prev->next = n->next;
        if (n->next) n->next->prev = n->prev;
        if (my_nodes_last == n) my_nodes_last = n->prev;
        if (my_nodes == n) my_nodes = n->next;
    }
    n->prev = n->next = NULL;
}

inline void graph::reset( __TBB_PFG_RESET_ARG( reset_flags f )) {
    // reset context
    task *saved_my_root_task = my_root_task;
    my_root_task = NULL;
    if(my_context) my_context->reset();
    cancelled = false;
    caught_exception = false;
    // reset all the nodes comprising the graph
    for(iterator ii = begin(); ii != end(); ++ii) {
        graph_node *my_p = &(*ii);
        my_p->reset(__TBB_PFG_RESET_ARG(f));
    }
    my_root_task = saved_my_root_task;
}


#include "internal/_flow_graph_node_impl.h"

//! An executable node that acts as a source, i.e. it has no predecessors
template < typename Output >
class source_node : public graph_node, public sender< Output > {
protected:
    using graph_node::my_graph;
public:
    //! The type of the output message, which is complete
    typedef Output output_type;

    //! The type of successors of this node
    typedef receiver< Output > successor_type;

#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
    typedef std::vector<successor_type *> successor_vector_type;
#endif

    //! Constructor for a node with a body
    template< typename Body >
    source_node( graph &g, Body body, bool is_active = true )
        : graph_node(g), my_active(is_active), init_my_active(is_active),
        my_body( new internal::source_body_leaf< output_type, Body>(body) ),
        my_reserved(false), my_has_cached_item(false)
    {
        my_successors.set_owner(this);
        tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Copy constructor
    source_node( const source_node& src ) :
        graph_node(src.my_graph), sender<Output>(),
        my_active(src.init_my_active),
        init_my_active(src.init_my_active), my_body( src.my_body->clone() ),
        my_reserved(false), my_has_cached_item(false)
    {
        my_successors.set_owner(this);
        tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! The destructor
    ~source_node() { delete my_body; }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    /* override */ void set_name( const char *name ) {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

    //! Add a new successor to this node
    /* override */ bool register_successor( successor_type &r ) {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.register_successor(r);
        if ( my_active )
            spawn_put();
        return true;
    }

    //! Removes a successor from this node
    /* override */ bool remove_successor( successor_type &r ) {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.remove_successor(r);
        return true;
    }

#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
    /*override*/void internal_add_built_successor( successor_type &r) {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.internal_add_built_successor(r);
    }

    /*override*/void internal_delete_built_successor( successor_type &r) {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.internal_delete_built_successor(r);
    }

    /*override*/size_t successor_count() {
        spin_mutex::scoped_lock lock(my_mutex);
        return my_successors.successor_count();
    }

    /*override*/void copy_successors(successor_vector_type &v) {
        spin_mutex::scoped_lock l(my_mutex);
        my_successors.copy_successors(v);
    }
#endif  /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */

    //! Request an item from the node
    /* override */ bool try_get( output_type &v ) {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved )
            return false;

        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_has_cached_item = false;
            return true;
        }
        // We've been asked to provide an item, but we have none.  Schedule a task to
        // provide one.
        spawn_put();
        return false;
    }

    //! Reserves an item.
    /* override */ bool try_reserve( output_type &v ) {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved ) {
            return false;
        }

        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_reserved = true;
            return true;
        } else {
            return false;
        }
    }

    //! Release a reserved item.
    /** true = item has been released and so remains in sender, dest must request or reserve future items */
    /* override */ bool try_release( ) {
        spin_mutex::scoped_lock lock(my_mutex);
        __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
        my_reserved = false;
        if(!my_successors.empty())
            spawn_put();
        return true;
    }

    //! Consumes a reserved item
    /* override */ bool try_consume( ) {
        spin_mutex::scoped_lock lock(my_mutex);
        __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
        my_reserved = false;
        my_has_cached_item = false;
        if ( !my_successors.empty() ) {
            spawn_put();
        }
        return true;
    }

    //! Activates a node that was created in the inactive state
    void activate() {
        spin_mutex::scoped_lock lock(my_mutex);
        my_active = true;
        if ( !my_successors.empty() )
            spawn_put();
    }

    template<typename Body>
    Body copy_function_object() {
        internal::source_body<output_type> &body_ref = *this->my_body;
        return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
    }

protected:

    //! resets the source_node to its initial state
    void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
        my_active = init_my_active;
        my_reserved = false;
        if(my_has_cached_item) {
            my_has_cached_item = false;
        }
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
        my_successors.reset(f);
        if(f & rf_reset_bodies) my_body->reset_body();
#endif
    }

private:
    spin_mutex my_mutex;
    bool my_active;
    bool init_my_active;
    internal::source_body<output_type> *my_body;
    internal::broadcast_cache< output_type > my_successors;
    bool my_reserved;
    bool my_has_cached_item;
    output_type my_cached_item;

    // used by apply_body, can invoke body of node.
    bool try_reserve_apply_body(output_type &v) {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved ) {
            return false;
        }
        if ( !my_has_cached_item ) {
            tbb::internal::fgt_begin_body( my_body );
            bool r = (*my_body)(my_cached_item);
            tbb::internal::fgt_end_body( my_body );
            if (r) {
                my_has_cached_item = true;
            }
        }
        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_reserved = true;
            return true;
        } else {
            return false;
        }
    }

    //! Spawns a task that applies the body
    /* override */ void spawn_put( ) {
        task* tp = this->my_graph.root_task();
        if(tp) {
            FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp ) )
                        internal::source_task_bypass< source_node< output_type > >( *this ) ) );
        }
    }

    friend class internal::source_task_bypass< source_node< output_type > >;
    //! Applies the body.  Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
    /* override */ task * apply_body_bypass( ) {
        output_type v;
        if ( !try_reserve_apply_body(v) )
            return NULL;

        task *last_task = my_successors.try_put_task(v);
        if ( last_task )
            try_consume();
        else
            try_release();
        return last_task;
    }
};  // source_node
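
// Illustrative usage sketch for source_node (not part of the interface).  It assumes
// make_edge() and the function_node sink defined elsewhere in this header, a C++11
// lambda body, and a hypothetical consume(); the body returns false when it has no
// more items to emit.
//
//     tbb::flow::graph g;
//     int i = 0;
//     tbb::flow::source_node<int> src( g,
//         [&i]( int &v ) -> bool { if ( i < 10 ) { v = i++; return true; } return false; },
//         /*is_active=*/false );
//     tbb::flow::function_node<int> sink( g, tbb::flow::unlimited,
//         []( int v ) { consume(v); } );
//     tbb::flow::make_edge( src, sink );
//     src.activate();              // start emitting only after the edges exist
//     g.wait_for_all();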

//! Implements a function node that supports Input -> Output
template < typename Input, typename Output = continue_msg, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
class function_node : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
protected:
    using graph_node::my_graph;
public:
    typedef Input input_type;
    typedef Output output_type;
    typedef sender< input_type > predecessor_type;
    typedef receiver< output_type > successor_type;
    typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
    typedef internal::function_output<output_type> fOutput_type;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
    typedef std::vector<predecessor_type *> predecessor_vector_type;
    typedef std::vector<successor_type *> successor_vector_type;
#endif

    //! Constructor
    template< typename Body >
    function_node( graph &g, size_t concurrency, Body body ) :
        graph_node(g), internal::function_input<input_type,output_type,Allocator>(g, concurrency, body) {
        tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Copy constructor
    function_node( const function_node& src ) :
        graph_node(src.my_graph), internal::function_input<input_type,output_type,Allocator>( src ),
        fOutput_type() {
        tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph, static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    /* override */ void set_name( const char *name ) {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    using fInput_type::try_put_task;

    // override of graph_node's reset.
    /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) {
        fInput_type::reset_function_input(__TBB_PFG_RESET_ARG(f));
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
        successors().reset(f);
        __TBB_ASSERT(!(f & rf_extract) || successors().empty(), "function_node successors not empty");
        __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
#endif
    }

    /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
};
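
// Illustrative usage sketch for function_node (not part of the interface).  It assumes
// make_edge(), defined later in this header, and C++11 lambdas.  serial limits the node
// to one body invocation at a time; unlimited places no limit.
//
//     tbb::flow::graph g;
//     tbb::flow::function_node<int, int> square( g, tbb::flow::unlimited,
//                                                []( int v ) { return v * v; } );
//     tbb::flow::function_node<int, int> negate( g, tbb::flow::serial,
//                                                []( int v ) { return -v; } );
//     tbb::flow::make_edge( square, negate );
//     square.try_put( 3 );     // flows through both nodes
//     g.wait_for_all();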

//! Implements a function node that supports Input -> Output
template < typename Input, typename Output, typename Allocator >
class function_node<Input,Output,queueing,Allocator> : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
protected:
    using graph_node::my_graph;
public:
    typedef Input input_type;
    typedef Output output_type;
    typedef sender< input_type > predecessor_type;
    typedef receiver< output_type > successor_type;
    typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
    typedef internal::function_input_queue<input_type, Allocator> queue_type;
    typedef internal::function_output<output_type> fOutput_type;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
    typedef std::vector<predecessor_type *> predecessor_vector_type;
    typedef std::vector<successor_type *> successor_vector_type;
#endif

    //! Constructor
    template< typename Body >
    function_node( graph &g, size_t concurrency, Body body ) :
        graph_node(g), fInput_type( g, concurrency, body, new queue_type() ) {
        tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Copy constructor
    function_node( const function_node& src ) :
        graph_node(src.graph_node::my_graph), fInput_type( src, new queue_type() ), fOutput_type() {
        tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    /* override */ void set_name( const char *name ) {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    using fInput_type::try_put_task;

    /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
        fInput_type::reset_function_input(__TBB_PFG_RESET_ARG(f));
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
        successors().reset(f);
        __TBB_ASSERT(!(f & rf_extract) || successors().empty(), "function_node successors not empty");
        __TBB_ASSERT(!(f & rf_extract) || this->my_predecessors.empty(), "function_node predecessors not empty");
#endif
    }

    /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
};

//! Implements a function node that supports Input -> (set of outputs)
// Output is a tuple of output types.
template < typename Input, typename Output, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
class multifunction_node :
    public graph_node,
    public internal::multifunction_input
    <
        Input,
        typename internal::wrap_tuple_elements<
            tbb::flow::tuple_size<Output>::value,  // #elements in tuple
            internal::multifunction_output,  // wrap this around each element
            Output // the tuple providing the types
        >::type,
        Allocator
    > {
protected:
    using graph_node::my_graph;
private:
    static const int N = tbb::flow::tuple_size<Output>::value;
public:
    typedef Input input_type;
    typedef typename internal::wrap_tuple_elements<N,internal::multifunction_output, Output>::type output_ports_type;
private:
    typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
    typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
public:
    template<typename Body>
    multifunction_node( graph &g, size_t concurrency, Body body ) :
        graph_node(g), base_type(g,concurrency, body) {
        tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
                                                                 &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
                                                                 this->output_ports(), this->my_body );
    }

    multifunction_node( const multifunction_node &other) :
        graph_node(other.graph_node::my_graph), base_type(other) {
        tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
                                                                 &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
                                                                 this->output_ports(), this->my_body );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    /* override */ void set_name( const char *name ) {
        tbb::internal::fgt_multioutput_node_desc( this, name );
    }
#endif

    // all the guts are in multifunction_input...
protected:
    /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { base_type::reset(__TBB_PFG_RESET_ARG(f)); }
};  // multifunction_node
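
// Illustrative usage sketch for multifunction_node (not part of the interface).  The
// body receives the input plus a reference to the output ports and may put to any
// subset of them.  The C++11 lambda is an assumption; edges would be attached to the
// ports with make_edge(), defined later in this header.
//
//     typedef tbb::flow::multifunction_node< int, tbb::flow::tuple<int,int> > mf_node_t;
//     tbb::flow::graph g;
//     mf_node_t splitter( g, tbb::flow::unlimited,
//         []( const int &v, mf_node_t::output_ports_type &ports ) {
//             if ( v % 2 == 0 ) tbb::flow::get<0>( ports ).try_put( v );  // evens to port 0
//             else              tbb::flow::get<1>( ports ).try_put( v );  // odds to port 1
//         } );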

template < typename Input, typename Output, typename Allocator >
class multifunction_node<Input,Output,queueing,Allocator> : public graph_node, public internal::multifunction_input<Input,
    typename internal::wrap_tuple_elements<tbb::flow::tuple_size<Output>::value, internal::multifunction_output, Output>::type, Allocator> {
protected:
    using graph_node::my_graph;
    static const int N = tbb::flow::tuple_size<Output>::value;
public:
    typedef Input input_type;
    typedef typename internal::wrap_tuple_elements<N, internal::multifunction_output, Output>::type output_ports_type;
private:
    typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
    typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
public:
    template<typename Body>
    multifunction_node( graph &g, size_t concurrency, Body body) :
        graph_node(g), base_type(g,concurrency, body, new queue_type()) {
        tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
                                                                 &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
                                                                 this->output_ports(), this->my_body );
    }

    multifunction_node( const multifunction_node &other) :
        graph_node(other.graph_node::my_graph), base_type(other, new queue_type()) {
        tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
                                                                 &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
                                                                 this->output_ports(), this->my_body );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    /* override */ void set_name( const char *name ) {
        tbb::internal::fgt_multioutput_node_desc( this, name );
    }
#endif

    // all the guts are in multifunction_input...
protected:
    /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { base_type::reset(__TBB_PFG_RESET_ARG(f)); }
};  // multifunction_node

//! split_node: accepts a tuple as input, forwards each element of the tuple to its
//  successors.  The node has unlimited concurrency, so though it is marked as
//  "rejecting" it does not reject inputs.
template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >
class split_node : public multifunction_node<TupleType, TupleType, rejecting, Allocator> {
    static const int N = tbb::flow::tuple_size<TupleType>::value;
    typedef multifunction_node<TupleType,TupleType,rejecting,Allocator> base_type;
public:
    typedef typename base_type::output_ports_type output_ports_type;
private:
    struct splitting_body {
        void operator()(const TupleType& t, output_ports_type &p) {
            internal::emit_element<N>::emit_this(t, p);
        }
    };
public:
    typedef TupleType input_type;
    typedef Allocator allocator_type;
    split_node(graph &g) : base_type(g, unlimited, splitting_body()) {
        tbb::internal::fgt_multioutput_node<TupleType,N>( tbb::internal::FLOW_SPLIT_NODE, &this->graph_node::my_graph,
                                                          static_cast<receiver<input_type> *>(this), this->output_ports() );
    }

    split_node( const split_node & other) : base_type(other) {
        tbb::internal::fgt_multioutput_node<TupleType,N>( tbb::internal::FLOW_SPLIT_NODE, &this->graph_node::my_graph,
                                                          static_cast<receiver<input_type> *>(this), this->output_ports() );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    /* override */ void set_name( const char *name ) {
        tbb::internal::fgt_multioutput_node_desc( this, name );
    }
#endif

};
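
// Illustrative usage sketch for split_node (not part of the interface).  Each element
// of the incoming tuple is forwarded through the correspondingly numbered output port.
// make_edge() and output_port<>() are assumed to be defined later in this header, and
// the sink nodes int_sink and float_sink are hypothetical.
//
//     tbb::flow::graph g;
//     tbb::flow::split_node< tbb::flow::tuple<int,float> > s( g );
//     tbb::flow::make_edge( tbb::flow::output_port<0>( s ), int_sink );
//     tbb::flow::make_edge( tbb::flow::output_port<1>( s ), float_sink );
//     s.try_put( tbb::flow::tuple<int,float>( 1, 2.0f ) );
//     g.wait_for_all();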

//! Implements an executable node that supports continue_msg -> Output
template <typename Output>
class continue_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
protected:
    using graph_node::my_graph;
public:
    typedef continue_msg input_type;
    typedef Output output_type;
    typedef sender< input_type > predecessor_type;
    typedef receiver< output_type > successor_type;
    typedef internal::continue_input<Output> fInput_type;
    typedef internal::function_output<output_type> fOutput_type;

    //! Constructor for executable node with continue_msg -> Output
    template <typename Body >
    continue_node( graph &g, Body body ) :
        graph_node(g), internal::continue_input<output_type>( g, body ) {
        tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }


    //! Constructor for executable node with continue_msg -> Output
    template <typename Body >
    continue_node( graph &g, int number_of_predecessors, Body body ) :
        graph_node(g), internal::continue_input<output_type>( g, number_of_predecessors, body ) {
        tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Copy constructor
    continue_node( const continue_node& src ) :
        graph_node(src.graph_node::my_graph), internal::continue_input<output_type>(src),
        internal::function_output<Output>() {
        tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    /* override */ void set_name( const char *name ) {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    using fInput_type::try_put_task;

    /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) {
        fInput_type::reset_receiver(__TBB_PFG_RESET_ARG(f));
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
        successors().reset(f);
        __TBB_ASSERT(!(f & rf_extract) || successors().empty(), "continue_node not reset");
#endif
    }

    /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
};  // continue_node
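
// Illustrative usage sketch for continue_node (not part of the interface).  A node's
// body runs only after it has received a continue_msg from each registered predecessor,
// which makes it the natural building block for dependency graphs.  make_edge() is
// defined later in this header; step_a(), step_b() and the C++11 lambdas are assumptions.
//
//     tbb::flow::graph g;
//     tbb::flow::continue_node<tbb::flow::continue_msg> a( g,
//         []( const tbb::flow::continue_msg & ) { step_a(); } );
//     tbb::flow::continue_node<tbb::flow::continue_msg> b( g,
//         []( const tbb::flow::continue_msg & ) { step_b(); } );
//     tbb::flow::make_edge( a, b );                  // b runs only after a completes
//     a.try_put( tbb::flow::continue_msg() );        // start the chain
//     g.wait_for_all();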
1294 
1295 template< typename T >
1296 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
1297 protected:
1298     using graph_node::my_graph;
1299 public:
1300     typedef T input_type;
1301     typedef T output_type;
1302     typedef sender< input_type > predecessor_type;
1303     typedef receiver< output_type > successor_type;
1304 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1305     typedef std::vector<predecessor_type *> predecessor_vector_type;
1306     typedef std::vector<successor_type *> successor_vector_type;
1307 #endif
1308 
overwrite_node(graph & g)1309     overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
1310         my_successors.set_owner( this );
1311         tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
1312                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1313     }
1314 
1315     // Copy constructor; doesn't take anything from src; default won't work
overwrite_node(const overwrite_node & src)1316     overwrite_node( const overwrite_node& src ) :
1317         graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
1318     {
1319         my_successors.set_owner( this );
1320         tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
1321                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1322     }
1323 
~overwrite_node()1324     ~overwrite_node() {}
1325 
1326 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)1327     /* override */ void set_name( const char *name ) {
1328         tbb::internal::fgt_node_desc( this, name );
1329     }
1330 #endif
1331 
register_successor(successor_type & s)1332     /* override */ bool register_successor( successor_type &s ) {
1333         spin_mutex::scoped_lock l( my_mutex );
1334         task* tp = this->my_graph.root_task();  // just to test if we are resetting
1335         if (my_buffer_is_valid && tp) {
1336             // We have a valid value that must be forwarded immediately.
1337             if ( s.try_put( my_buffer ) || !s.register_predecessor( *this  ) ) {
1338                 // We add the successor: it accepted our put or it rejected it but won't let us become a predecessor
1339                 my_successors.register_successor( s );
1340             } else {
1341                 // We don't add the successor: it rejected our put and we became its predecessor instead
1342                 return false;
1343             }
1344         } else {
1345             // No valid value yet, just add as successor
1346             my_successors.register_successor( s );
1347         }
1348         return true;
1349     }
1350 
1351     /* override */ bool remove_successor( successor_type &s ) {
1352         spin_mutex::scoped_lock l( my_mutex );
1353         my_successors.remove_successor(s);
1354         return true;
1355     }
1356 
1357 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1358     /*override*/void internal_add_built_successor( successor_type &s) {
1359         spin_mutex::scoped_lock l( my_mutex );
1360         my_successors.internal_add_built_successor(s);
1361     }
1362 
1363     /*override*/void internal_delete_built_successor( successor_type &s) {
1364         spin_mutex::scoped_lock l( my_mutex );
1365         my_successors.internal_delete_built_successor(s);
1366     }
1367 
1368     /*override*/size_t successor_count() {
1369         spin_mutex::scoped_lock l( my_mutex );
1370         return my_successors.successor_count();
1371     }
1372 
1373     /*override*/ void copy_successors(successor_vector_type &v) {
1374         spin_mutex::scoped_lock l( my_mutex );
1375         my_successors.copy_successors(v);
1376     }
1377 
1378     /*override*/ void internal_add_built_predecessor( predecessor_type &p) {
1379         spin_mutex::scoped_lock l( my_mutex );
1380         my_built_predecessors.add_edge(p);
1381     }
1382 
1383     /*override*/ void internal_delete_built_predecessor( predecessor_type &p) {
1384         spin_mutex::scoped_lock l( my_mutex );
1385         my_built_predecessors.delete_edge(p);
1386     }
1387 
1388     /*override*/size_t predecessor_count() {
1389         spin_mutex::scoped_lock l( my_mutex );
1390         return my_built_predecessors.edge_count();
1391     }
1392 
1393     /*override*/void copy_predecessors(predecessor_vector_type &v) {
1394         spin_mutex::scoped_lock l( my_mutex );
1395         my_built_predecessors.copy_edges(v);
1396     }
1397 #endif  /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
1398 
1399     /* override */ bool try_get( input_type &v ) {
1400         spin_mutex::scoped_lock l( my_mutex );
1401         if ( my_buffer_is_valid ) {
1402             v = my_buffer;
1403             return true;
1404         }
1405         return false;
1406     }
1407 
1408     bool is_valid() {
1409        spin_mutex::scoped_lock l( my_mutex );
1410        return my_buffer_is_valid;
1411     }
1412 
1413     void clear() {
1414        spin_mutex::scoped_lock l( my_mutex );
1415        my_buffer_is_valid = false;
1416     }
1417 
1418 protected:
1419     template< typename R, typename B > friend class run_and_put_task;
1420     template<typename X, typename Y> friend class internal::broadcast_cache;
1421     template<typename X, typename Y> friend class internal::round_robin_cache;
1422     /* override */ task * try_put_task( const input_type &v ) {
1423         spin_mutex::scoped_lock l( my_mutex );
1424         my_buffer = v;
1425         my_buffer_is_valid = true;
1426         task * rtask = my_successors.try_put_task(v);
1427         if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
1428         return rtask;
1429     }
1430 
1431     /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
1432         my_buffer_is_valid = false;
1433 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1434         my_successors.reset(f);
1435        if (f&rf_extract) {
1436            my_built_predecessors.receiver_extract(*this);
1437        }
1438 #endif
1439     }
1440 
1441     spin_mutex my_mutex;
1442     internal::broadcast_cache< input_type, null_rw_mutex > my_successors;
1443 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1444     edge_container<sender<input_type> > my_built_predecessors;
1445 #endif
1446     input_type my_buffer;
1447     bool my_buffer_is_valid;
1448     /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/)) {}
1449 };  // overwrite_node
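// Usage note (editorial): a minimal sketch of overwrite_node behavior -- it keeps the most
// recently received value, hands a copy to late-registering successors, and serves try_get().
// The variable names below are illustrative only.
//
//     tbb::flow::graph g;
//     tbb::flow::overwrite_node<int> last_value( g );
//     last_value.try_put( 1 );
//     last_value.try_put( 2 );           // overwrites the buffered 1
//     int v = 0;
//     bool ok = last_value.try_get( v ); // ok == true, v == 2; the value stays buffered
//     last_value.clear();                // invalidates the buffer; is_valid() becomes false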
1450 
1451 template< typename T >
1452 class write_once_node : public overwrite_node<T> {
1453 public:
1454     typedef T input_type;
1455     typedef T output_type;
1456     typedef sender< input_type > predecessor_type;
1457     typedef receiver< output_type > successor_type;
1458 
1459     //! Constructor
1460     write_once_node(graph& g) : overwrite_node<T>(g) {
1461         tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
1462                                  static_cast<receiver<input_type> *>(this),
1463                                  static_cast<sender<output_type> *>(this) );
1464     }
1465 
1466     //! Copy constructor: call base class copy constructor
1467     write_once_node( const write_once_node& src ) : overwrite_node<T>(src) {
1468         tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
1469                                  static_cast<receiver<input_type> *>(this),
1470                                  static_cast<sender<output_type> *>(this) );
1471     }
1472 
1473 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1474     /* override */ void set_name( const char *name ) {
1475         tbb::internal::fgt_node_desc( this, name );
1476     }
1477 #endif
1478 
1479 protected:
1480     template< typename R, typename B > friend class run_and_put_task;
1481     template<typename X, typename Y> friend class internal::broadcast_cache;
1482     template<typename X, typename Y> friend class internal::round_robin_cache;
1483     /* override */ task *try_put_task( const T &v ) {
1484         spin_mutex::scoped_lock l( this->my_mutex );
1485         if ( this->my_buffer_is_valid ) {
1486             return NULL;
1487         } else {
1488             this->my_buffer = v;
1489             this->my_buffer_is_valid = true;
1490             task *res = this->my_successors.try_put_task(v);
1491             if(!res) res = SUCCESSFULLY_ENQUEUED;
1492             return res;
1493         }
1494     }
1495 };
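// Usage note (editorial): write_once_node accepts only the first value it receives; later puts
// are rejected until clear() is called.  A small illustrative sketch with assumed names:
//
//     tbb::flow::graph g;
//     tbb::flow::write_once_node<int> first_result( g );
//     first_result.try_put( 10 );   // accepted and buffered
//     first_result.try_put( 20 );   // rejected; the buffer still holds 10
//     int v = 0;
//     first_result.try_get( v );    // v == 10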
1496 
1497 //! Forwards messages of type T to all successors
1498 template <typename T>
1499 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1500 protected:
1501     using graph_node::my_graph;
1502 public:
1503     typedef T input_type;
1504     typedef T output_type;
1505     typedef sender< input_type > predecessor_type;
1506     typedef receiver< output_type > successor_type;
1507 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1508     typedef std::vector<predecessor_type *> predecessor_vector_type;
1509     typedef std::vector<successor_type *> successor_vector_type;
1510 #endif
1511 private:
1512     internal::broadcast_cache<input_type> my_successors;
1513 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1514     edge_container<predecessor_type> my_built_predecessors;
1515     spin_mutex pred_mutex;
1516 #endif
1517 public:
1518 
1519     broadcast_node(graph& g) : graph_node(g) {
1520         my_successors.set_owner( this );
1521         tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1522                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1523     }
1524 
1525     // Copy constructor
1526     broadcast_node( const broadcast_node& src ) :
1527         graph_node(src.my_graph), receiver<T>(), sender<T>()
1528     {
1529         my_successors.set_owner( this );
1530         tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1531                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1532     }
1533 
1534 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1535     /* override */ void set_name( const char *name ) {
1536         tbb::internal::fgt_node_desc( this, name );
1537     }
1538 #endif
1539 
1540     //! Adds a successor
1541     virtual bool register_successor( receiver<T> &r ) {
1542         my_successors.register_successor( r );
1543         return true;
1544     }
1545 
1546     //! Removes r as a successor
1547     virtual bool remove_successor( receiver<T> &r ) {
1548         my_successors.remove_successor( r );
1549         return true;
1550     }
1551 
1552 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1553     /*override*/ void internal_add_built_successor(successor_type &r) {
1554         my_successors.internal_add_built_successor(r);
1555     }
1556 
1557     /*override*/ void internal_delete_built_successor(successor_type &r) {
1558         my_successors.internal_delete_built_successor(r);
1559     }
1560 
1561     /*override*/ size_t successor_count() {
1562         return my_successors.successor_count();
1563     }
1564 
1565     /*override*/ void copy_successors(successor_vector_type &v) {
1566         my_successors.copy_successors(v);
1567     }
1568 
1569     /*override*/ void internal_add_built_predecessor( predecessor_type &p) {
1570         my_built_predecessors.add_edge(p);
1571     }
1572 
1573     /*override*/ void internal_delete_built_predecessor( predecessor_type &p) {
1574         my_built_predecessors.delete_edge(p);
1575     }
1576 
1577     /*override*/ size_t predecessor_count() {
1578         return my_built_predecessors.edge_count();
1579     }
1580 
1581     /*override*/ void copy_predecessors(predecessor_vector_type &v) {
1582         my_built_predecessors.copy_edges(v);
1583     }
1584 #endif  /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
1585 
1586 protected:
1587     template< typename R, typename B > friend class run_and_put_task;
1588     template<typename X, typename Y> friend class internal::broadcast_cache;
1589     template<typename X, typename Y> friend class internal::round_robin_cache;
1590     //! Builds a task to run the successor if possible.  The default matches the original behavior.
1591     /*override*/ task *try_put_task(const T& t) {
1592         task *new_task = my_successors.try_put_task(t);
1593         if(!new_task) new_task = SUCCESSFULLY_ENQUEUED;
1594         return new_task;
1595     }
1596 
1597     /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) {
1598 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1599         my_successors.reset(f);
1600         if (f&rf_extract) {
1601            my_built_predecessors.receiver_extract(*this);
1602         }
1603         __TBB_ASSERT(!(f & rf_extract) || my_successors.empty(), "Error resetting broadcast_node");
1604 #endif
1605     }
1606     /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/)) {}
1607 };  // broadcast_node
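// Usage note (editorial): broadcast_node simply fans each message out to all current successors
// without buffering.  A minimal sketch; node names are assumptions for illustration:
//
//     tbb::flow::graph g;
//     tbb::flow::broadcast_node<int> fan_out( g );
//     tbb::flow::queue_node<int> q1( g ), q2( g );
//     tbb::flow::make_edge( fan_out, q1 );
//     tbb::flow::make_edge( fan_out, q2 );
//     fan_out.try_put( 42 );        // both q1 and q2 receive 42
//     g.wait_for_all();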
1608 
1609 //! Forwards messages in arbitrary order
1610 template <typename T, typename A=cache_aligned_allocator<T> >
1611 class buffer_node : public graph_node, public internal::reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
1612 protected:
1613     using graph_node::my_graph;
1614 public:
1615     typedef T input_type;
1616     typedef T output_type;
1617     typedef sender< input_type > predecessor_type;
1618     typedef receiver< output_type > successor_type;
1619     typedef buffer_node<T, A> my_class;
1620 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1621     typedef std::vector<predecessor_type *> predecessor_vector_type;
1622     typedef std::vector<successor_type *> successor_vector_type;
1623 #endif
1624 protected:
1625     typedef size_t size_type;
1626     internal::round_robin_cache< T, null_rw_mutex > my_successors;
1627 
1628 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1629     edge_container<predecessor_type> my_built_predecessors;
1630 #endif
1631 
1632     friend class internal::forward_task_bypass< buffer_node< T, A > >;
1633 
1634     enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
1635 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1636         , add_blt_succ, del_blt_succ,
1637         add_blt_pred, del_blt_pred,
1638         blt_succ_cnt, blt_pred_cnt,
1639         blt_succ_cpy, blt_pred_cpy   // create vector copies of preds and succs
1640 #endif
1641     };
1642     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1643 
1644     // implements the aggregator_operation concept
1645     class buffer_operation : public internal::aggregated_operation< buffer_operation > {
1646     public:
1647         char type;
1648 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1649         task * ltask;
1650         union {
1651             input_type *elem;
1652             successor_type *r;
1653             predecessor_type *p;
1654             size_t cnt_val;
1655             successor_vector_type *svec;
1656             predecessor_vector_type *pvec;
1657         };
1658 #else
1659         T *elem;
1660         task * ltask;
1661         successor_type *r;
1662 #endif
1663         buffer_operation(const T& e, op_type t) : type(char(t))
1664 
1665 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1666                                                   , ltask(NULL), elem(const_cast<T*>(&e))
1667 #else
1668                                                   , elem(const_cast<T*>(&e)) , ltask(NULL)
1669 #endif
1670         {}
1671         buffer_operation(op_type t) : type(char(t)),  ltask(NULL) {}
1672     };
1673 
1674     bool forwarder_busy;
1675     typedef internal::aggregating_functor<my_class, buffer_operation> my_handler;
1676     friend class internal::aggregating_functor<my_class, buffer_operation>;
1677     internal::aggregator< my_handler, buffer_operation> my_aggregator;
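    // Editorial note: all public operations on this node are funneled through my_aggregator.
    // Each caller fills in a buffer_operation record and execute() processes batches of such
    // records on a single handling thread, which is why the internal_* helpers need no locks.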
1678 
1679     virtual void handle_operations(buffer_operation *op_list) {
1680         buffer_operation *tmp = NULL;
1681         bool try_forwarding=false;
1682         while (op_list) {
1683             tmp = op_list;
1684             op_list = op_list->next;
1685             switch (tmp->type) {
1686             case reg_succ: internal_reg_succ(tmp);  try_forwarding = true; break;
1687             case rem_succ: internal_rem_succ(tmp); break;
1688             case req_item: internal_pop(tmp); break;
1689             case res_item: internal_reserve(tmp); break;
1690             case rel_res:  internal_release(tmp);  try_forwarding = true; break;
1691             case con_res:  internal_consume(tmp);  try_forwarding = true; break;
1692             case put_item: internal_push(tmp);  try_forwarding = (tmp->status == SUCCEEDED); break;
1693             case try_fwd_task: internal_forward_task(tmp); break;
1694 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1695             // edge recording
1696             case add_blt_succ: internal_add_built_succ(tmp); break;
1697             case del_blt_succ: internal_del_built_succ(tmp); break;
1698             case add_blt_pred: internal_add_built_pred(tmp); break;
1699             case del_blt_pred: internal_del_built_pred(tmp); break;
1700             case blt_succ_cnt: internal_succ_cnt(tmp); break;
1701             case blt_pred_cnt: internal_pred_cnt(tmp); break;
1702             case blt_succ_cpy: internal_copy_succs(tmp); break;
1703             case blt_pred_cpy: internal_copy_preds(tmp); break;
1704 #endif
1705             }
1706         }
1707         if (try_forwarding && !forwarder_busy) {
1708             task* tp = this->my_graph.root_task();
1709             if(tp) {
1710                 forwarder_busy = true;
1711                 task *new_task = new(task::allocate_additional_child_of(*tp)) internal::
1712                         forward_task_bypass
1713                         < buffer_node<input_type, A> >(*this);
1714                 // tmp should point to the last item handled by the aggregator.  This is the operation
1715                 // the handling thread enqueued.  So modifying that record will be okay.
1716                 tbb::task *z = tmp->ltask;
1717                 tmp->ltask = combine_tasks(z, new_task);  // in case the op generated a task
1718             }
1719         }
1720     }
1721 
1722     inline task *grab_forwarding_task( buffer_operation &op_data) {
1723         return op_data.ltask;
1724     }
1725 
1726     inline bool enqueue_forwarding_task(buffer_operation &op_data) {
1727         task *ft = grab_forwarding_task(op_data);
1728         if(ft) {
1729             FLOW_SPAWN(*ft);
1730             return true;
1731         }
1732         return false;
1733     }
1734 
1735     //! This is executed by an enqueued task, the "forwarder"
1736     virtual task *forward_task() {
1737         buffer_operation op_data(try_fwd_task);
1738         task *last_task = NULL;
1739         do {
1740             op_data.status = WAIT;
1741             op_data.ltask = NULL;
1742             my_aggregator.execute(&op_data);
1743             tbb::task *xtask = op_data.ltask;
1744             last_task = combine_tasks(last_task, xtask);
1745         } while (op_data.status == SUCCEEDED);
1746         return last_task;
1747     }
1748 
1749     //! Register successor
1750     virtual void internal_reg_succ(buffer_operation *op) {
1751         my_successors.register_successor(*(op->r));
1752         __TBB_store_with_release(op->status, SUCCEEDED);
1753     }
1754 
1755     //! Remove successor
1756     virtual void internal_rem_succ(buffer_operation *op) {
1757         my_successors.remove_successor(*(op->r));
1758         __TBB_store_with_release(op->status, SUCCEEDED);
1759     }
1760 
1761 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1762     virtual void internal_add_built_succ(buffer_operation *op) {
1763         my_successors.internal_add_built_successor(*(op->r));
1764         __TBB_store_with_release(op->status, SUCCEEDED);
1765     }
1766 
1767     virtual void internal_del_built_succ(buffer_operation *op) {
1768         my_successors.internal_delete_built_successor(*(op->r));
1769         __TBB_store_with_release(op->status, SUCCEEDED);
1770     }
1771 
1772     virtual void internal_add_built_pred(buffer_operation *op) {
1773         my_built_predecessors.add_edge(*(op->p));
1774         __TBB_store_with_release(op->status, SUCCEEDED);
1775     }
1776 
1777     virtual void internal_del_built_pred(buffer_operation *op) {
1778         my_built_predecessors.delete_edge(*(op->p));
1779         __TBB_store_with_release(op->status, SUCCEEDED);
1780     }
1781 
1782     virtual void internal_succ_cnt(buffer_operation *op) {
1783         op->cnt_val = my_successors.successor_count();
1784         __TBB_store_with_release(op->status, SUCCEEDED);
1785     }
1786 
1787     virtual void internal_pred_cnt(buffer_operation *op) {
1788         op->cnt_val = my_built_predecessors.edge_count();
1789         __TBB_store_with_release(op->status, SUCCEEDED);
1790     }
1791 
1792     virtual void internal_copy_succs(buffer_operation *op) {
1793         my_successors.copy_successors(*(op->svec));
1794         __TBB_store_with_release(op->status, SUCCEEDED);
1795     }
1796 
1797     virtual void internal_copy_preds(buffer_operation *op) {
1798         my_built_predecessors.copy_edges(*(op->pvec));
1799         __TBB_store_with_release(op->status, SUCCEEDED);
1800     }
1801 #endif  /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
1802 
1803     //! Tries to forward valid items to successors
1804     virtual void internal_forward_task(buffer_operation *op) {
1805         if (this->my_reserved || !this->my_item_valid(this->my_tail-1)) {
1806             __TBB_store_with_release(op->status, FAILED);
1807             this->forwarder_busy = false;
1808             return;
1809         }
1810         T i_copy;
1811         task * last_task = NULL;
1812         size_type counter = my_successors.size();
1813         // Try forwarding, giving each successor a chance
1814         while (counter>0 && !this->buffer_empty() && this->my_item_valid(this->my_tail-1)) {
1815             this->copy_back(i_copy);
1816             task *new_task = my_successors.try_put_task(i_copy);
1817             if(new_task) {
1818                 last_task = combine_tasks(last_task, new_task);
1819                 this->destroy_back();
1820             }
1821             --counter;
1822         }
1823         op->ltask = last_task;  // return task
1824         if (last_task && !counter) {
1825             __TBB_store_with_release(op->status, SUCCEEDED);
1826         }
1827         else {
1828             __TBB_store_with_release(op->status, FAILED);
1829             forwarder_busy = false;
1830         }
1831     }
1832 
1833     virtual void internal_push(buffer_operation *op) {
1834         this->push_back(*(op->elem));
1835         __TBB_store_with_release(op->status, SUCCEEDED);
1836     }
1837 
1838     virtual void internal_pop(buffer_operation *op) {
1839         if(this->pop_back(*(op->elem))) {
1840             __TBB_store_with_release(op->status, SUCCEEDED);
1841         }
1842         else {
1843             __TBB_store_with_release(op->status, FAILED);
1844         }
1845     }
1846 
1847     virtual void internal_reserve(buffer_operation *op) {
1848         if(this->reserve_front(*(op->elem))) {
1849             __TBB_store_with_release(op->status, SUCCEEDED);
1850         }
1851         else {
1852             __TBB_store_with_release(op->status, FAILED);
1853         }
1854     }
1855 
1856     virtual void internal_consume(buffer_operation *op) {
1857         this->consume_front();
1858         __TBB_store_with_release(op->status, SUCCEEDED);
1859     }
1860 
1861     virtual void internal_release(buffer_operation *op) {
1862         this->release_front();
1863         __TBB_store_with_release(op->status, SUCCEEDED);
1864     }
1865 
1866 public:
1867     //! Constructor
1868     buffer_node( graph &g ) : graph_node(g), internal::reservable_item_buffer<T>(),
1869         forwarder_busy(false) {
1870         my_successors.set_owner(this);
1871         my_aggregator.initialize_handler(my_handler(this));
1872         tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
1873                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1874     }
1875 
1876     //! Copy constructor
1877     buffer_node( const buffer_node& src ) : graph_node(src.my_graph),
1878         internal::reservable_item_buffer<T>(), receiver<T>(), sender<T>() {
1879         forwarder_busy = false;
1880         my_successors.set_owner(this);
1881         my_aggregator.initialize_handler(my_handler(this));
1882         tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
1883                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1884     }
1885 
1886     virtual ~buffer_node() {}
1887 
1888 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1889     /* override */ void set_name( const char *name ) {
1890         tbb::internal::fgt_node_desc( this, name );
1891     }
1892 #endif
1893 
1894     //
1895     // message sender implementation
1896     //
1897 
1898     //! Adds a new successor.
1899     /** Adds successor r to the list of successors; may forward tasks.  */
1900     /* override */ bool register_successor( successor_type &r ) {
1901         buffer_operation op_data(reg_succ);
1902         op_data.r = &r;
1903         my_aggregator.execute(&op_data);
1904         (void)enqueue_forwarding_task(op_data);
1905         return true;
1906     }
1907 
1908 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1909     /*override*/ void internal_add_built_successor( successor_type &r) {
1910         buffer_operation op_data(add_blt_succ);
1911         op_data.r = &r;
1912         my_aggregator.execute(&op_data);
1913     }
1914 
1915     /*override*/ void internal_delete_built_successor( successor_type &r) {
1916         buffer_operation op_data(del_blt_succ);
1917         op_data.r = &r;
1918         my_aggregator.execute(&op_data);
1919     }
1920 
1921     /*override*/ void internal_add_built_predecessor( predecessor_type &p) {
1922         buffer_operation op_data(add_blt_pred);
1923         op_data.p = &p;
1924         my_aggregator.execute(&op_data);
1925     }
1926 
1927     /*override*/ void internal_delete_built_predecessor( predecessor_type &p) {
1928         buffer_operation op_data(del_blt_pred);
1929         op_data.p = &p;
1930         my_aggregator.execute(&op_data);
1931     }
1932 
1933     /*override*/ size_t predecessor_count() {
1934         buffer_operation op_data(blt_pred_cnt);
1935         my_aggregator.execute(&op_data);
1936         return op_data.cnt_val;
1937     }
1938 
1939     /*override*/ size_t successor_count() {
1940         buffer_operation op_data(blt_succ_cnt);
1941         my_aggregator.execute(&op_data);
1942         return op_data.cnt_val;
1943     }
1944 
1945     /*override*/ void copy_predecessors( predecessor_vector_type &v ) {
1946         buffer_operation op_data(blt_pred_cpy);
1947         op_data.pvec = &v;
1948         my_aggregator.execute(&op_data);
1949     }
1950 
1951     /*override*/ void copy_successors( successor_vector_type &v ) {
1952         buffer_operation op_data(blt_succ_cpy);
1953         op_data.svec = &v;
1954         my_aggregator.execute(&op_data);
1955     }
1956 #endif
1957 
1958     //! Removes a successor.
1959     /** Removes successor r from the list of successors.
1960         It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
1961     /* override */ bool remove_successor( successor_type &r ) {
1962         r.remove_predecessor(*this);
1963         buffer_operation op_data(rem_succ);
1964         op_data.r = &r;
1965         my_aggregator.execute(&op_data);
1966         // even though this operation does not cause a forward, if we are the handler, and
1967         // a forward is scheduled, we may be the first to reach this point after the aggregator,
1968         // and so should check for the task.
1969         (void)enqueue_forwarding_task(op_data);
1970         return true;
1971     }
1972 
1973     //! Request an item from the buffer_node
1974     /**  true = v contains the returned item<BR>
1975          false = no item has been returned */
1976     /* override */ bool try_get( T &v ) {
1977         buffer_operation op_data(req_item);
1978         op_data.elem = &v;
1979         my_aggregator.execute(&op_data);
1980         (void)enqueue_forwarding_task(op_data);
1981         return (op_data.status==SUCCEEDED);
1982     }
1983 
1984     //! Reserves an item.
1985     /**  false = no item can be reserved<BR>
1986          true = an item is reserved */
1987     /* override */ bool try_reserve( T &v ) {
1988         buffer_operation op_data(res_item);
1989         op_data.elem = &v;
1990         my_aggregator.execute(&op_data);
1991         (void)enqueue_forwarding_task(op_data);
1992         return (op_data.status==SUCCEEDED);
1993     }
1994 
1995     //! Release a reserved item.
1996     /**  true = item has been released and so remains in sender */
1997     /* override */ bool try_release() {
1998         buffer_operation op_data(rel_res);
1999         my_aggregator.execute(&op_data);
2000         (void)enqueue_forwarding_task(op_data);
2001         return true;
2002     }
2003 
2004     //! Consumes a reserved item.
2005     /** true = item is removed from sender and reservation removed */
2006     /* override */ bool try_consume() {
2007         buffer_operation op_data(con_res);
2008         my_aggregator.execute(&op_data);
2009         (void)enqueue_forwarding_task(op_data);
2010         return true;
2011     }
2012 
2013 protected:
2014 
2015     template< typename R, typename B > friend class run_and_put_task;
2016     template<typename X, typename Y> friend class internal::broadcast_cache;
2017     template<typename X, typename Y> friend class internal::round_robin_cache;
2018     //! receive an item, return a task *if possible
2019     /* override */ task *try_put_task(const T &t) {
2020         buffer_operation op_data(t, put_item);
2021         my_aggregator.execute(&op_data);
2022         task *ft = grab_forwarding_task(op_data);
2023         // sequencer_nodes can return failure (if an item has been previously inserted)
2024         // We have to spawn the returned task if our own operation fails.
2025 
2026         if(ft && op_data.status == FAILED) {
2027             // We did not succeed in queueing the item, but the call returned a task
2028             // anyway (this can happen if another request resulted in a successful
2029             // forward).  Submit the task and reset the pointer.
2030             FLOW_SPAWN(*ft); ft = NULL;
2031         }
2032         else if(!ft && op_data.status == SUCCEEDED) {
2033             ft = SUCCESSFULLY_ENQUEUED;
2034         }
2035         return ft;
2036     }
2037 
2038     /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
2039         internal::reservable_item_buffer<T, A>::reset();
2040 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
2041         my_successors.reset(f);
2042         if (f&rf_extract) {
2043             my_built_predecessors.receiver_extract(*this);
2044         }
2045 #endif
2046         forwarder_busy = false;
2047     }
2048 
2049     /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/)) { }
2050 
2051 };  // buffer_node
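// Usage note (editorial): buffer_node stores items and forwards them to successors in no
// particular order; try_get/try_reserve provide pull-style access.  Illustrative sketch only:
//
//     tbb::flow::graph g;
//     tbb::flow::buffer_node<int> buf( g );
//     buf.try_put( 1 );
//     buf.try_put( 2 );
//     int v = 0;
//     while ( buf.try_get( v ) ) { /* items arrive in arbitrary order */ }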
2052 
2053 //! Forwards messages in FIFO order
2054 template <typename T, typename A=cache_aligned_allocator<T> >
2055 class queue_node : public buffer_node<T, A> {
2056 protected:
2057     typedef buffer_node<T, A> base_type;
2058     typedef typename base_type::size_type size_type;
2059     typedef typename base_type::buffer_operation queue_operation;
2060 
2061     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
2062 
2063     /* override */ void internal_forward_task(queue_operation *op) {
2064         if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2065             __TBB_store_with_release(op->status, FAILED);
2066             this->forwarder_busy = false;
2067             return;
2068         }
2069         T i_copy;
2070         task *last_task = NULL;
2071         size_type counter = this->my_successors.size();
2072         // Keep trying to send items while there is at least one accepting successor
2073         while (counter>0 && this->my_item_valid(this->my_head)) {
2074             this->copy_front(i_copy);
2075             task *new_task = this->my_successors.try_put_task(i_copy);
2076             if(new_task) {
2077                 this->destroy_front();
2078                 last_task = combine_tasks(last_task, new_task);
2079             }
2080             --counter;
2081         }
2082         op->ltask = last_task;
2083         if (last_task && !counter)
2084             __TBB_store_with_release(op->status, SUCCEEDED);
2085         else {
2086             __TBB_store_with_release(op->status, FAILED);
2087             this->forwarder_busy = false;
2088         }
2089     }
2090 
2091     /* override */ void internal_pop(queue_operation *op) {
2092         if ( this->my_reserved || !this->my_item_valid(this->my_head)){
2093             __TBB_store_with_release(op->status, FAILED);
2094         }
2095         else {
2096             this->pop_front(*(op->elem));
2097             __TBB_store_with_release(op->status, SUCCEEDED);
2098         }
2099     }
2100     /* override */ void internal_reserve(queue_operation *op) {
2101         if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2102             __TBB_store_with_release(op->status, FAILED);
2103         }
2104         else {
2105             this->reserve_front(*(op->elem));
2106             __TBB_store_with_release(op->status, SUCCEEDED);
2107         }
2108     }
2109     /* override */ void internal_consume(queue_operation *op) {
2110         this->consume_front();
2111         __TBB_store_with_release(op->status, SUCCEEDED);
2112     }
2113 
2114 public:
2115     typedef T input_type;
2116     typedef T output_type;
2117     typedef sender< input_type > predecessor_type;
2118     typedef receiver< output_type > successor_type;
2119 
2120     //! Constructor
2121     queue_node( graph &g ) : base_type(g) {
2122         tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2123                                  static_cast<receiver<input_type> *>(this),
2124                                  static_cast<sender<output_type> *>(this) );
2125     }
2126 
2127     //! Copy constructor
2128     queue_node( const queue_node& src) : base_type(src) {
2129         tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2130                                  static_cast<receiver<input_type> *>(this),
2131                                  static_cast<sender<output_type> *>(this) );
2132     }
2133 
2134 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2135     /* override */ void set_name( const char *name ) {
2136         tbb::internal::fgt_node_desc( this, name );
2137     }
2138 #endif
2139 
2140     /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
2141         base_type::reset(__TBB_PFG_RESET_ARG(f));
2142     }
2143 };  // queue_node
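// Usage note (editorial): queue_node behaves like buffer_node but preserves FIFO order, and a
// reservation always applies to the current front item.  Sketch with assumed names:
//
//     tbb::flow::graph g;
//     tbb::flow::queue_node<int> q( g );
//     q.try_put( 1 );
//     q.try_put( 2 );
//     int v = 0;
//     q.try_get( v );   // v == 1
//     q.try_get( v );   // v == 2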
2144 
2145 //! Forwards messages in sequence order
2146 template< typename T, typename A=cache_aligned_allocator<T> >
2147 class sequencer_node : public queue_node<T, A> {
2148     internal::function_body< T, size_t > *my_sequencer;
2149     // my_sequencer should be a benign function and must be callable
2150     // from a parallel context.  Does this mean it needn't be reset?
2151 public:
2152     typedef T input_type;
2153     typedef T output_type;
2154     typedef sender< input_type > predecessor_type;
2155     typedef receiver< output_type > successor_type;
2156 
2157     //! Constructor
2158     template< typename Sequencer >
2159     sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
2160         my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {
2161         tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2162                                  static_cast<receiver<input_type> *>(this),
2163                                  static_cast<sender<output_type> *>(this) );
2164     }
2165 
2166     //! Copy constructor
2167     sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
2168         my_sequencer( src.my_sequencer->clone() ) {
2169         tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2170                                  static_cast<receiver<input_type> *>(this),
2171                                  static_cast<sender<output_type> *>(this) );
2172     }
2173 
2174     //! Destructor
2175     ~sequencer_node() { delete my_sequencer; }
2176 
2177 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2178     /* override */ void set_name( const char *name ) {
2179         tbb::internal::fgt_node_desc( this, name );
2180     }
2181 #endif
2182 
2183 protected:
2184     typedef typename buffer_node<T, A>::size_type size_type;
2185     typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;
2186 
2187     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
2188 
2189 private:
2190     /* override */ void internal_push(sequencer_operation *op) {
2191         size_type tag = (*my_sequencer)(*(op->elem));
2192 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
2193         if(tag < this->my_head) {
2194             // have already emitted a message with this tag
2195             __TBB_store_with_release(op->status, FAILED);
2196             return;
2197         }
2198 #endif
2199         // cannot modify this->my_tail now; the buffer would be inconsistent.
2200         size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
2201 
2202         if(this->size(new_tail) > this->capacity()) {
2203             this->grow_my_array(this->size(new_tail));
2204         }
2205         this->my_tail = new_tail;
2206         if(this->place_item(tag,*(op->elem))) {
2207             __TBB_store_with_release(op->status, SUCCEEDED);
2208         }
2209         else {
2210             // already have a message with this tag
2211             __TBB_store_with_release(op->status, FAILED);
2212         }
2213     }
2214 };  // sequencer_node
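// Usage note (editorial): a sequencer_node releases item i only after items 0..i-1 have been
// released; the user-supplied sequencer maps each message to its sequence number.  The struct
// `msg` below is an assumed example type, not part of this header.
//
//     struct msg { size_t id; /* payload */ };
//     tbb::flow::graph g;
//     tbb::flow::sequencer_node<msg> ordered( g,
//         []( const msg &m ) -> size_t { return m.id; } );
//     // even if puts arrive with id 2, 0, 1, successors see them in order 0, 1, 2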
2215 
2216 //! Forwards messages in priority order
2217 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
2218 class priority_queue_node : public buffer_node<T, A> {
2219 public:
2220     typedef T input_type;
2221     typedef T output_type;
2222     typedef buffer_node<T,A> base_type;
2223     typedef sender< input_type > predecessor_type;
2224     typedef receiver< output_type > successor_type;
2225 
2226     //! Constructor
2227     priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {
2228         tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2229                                  static_cast<receiver<input_type> *>(this),
2230                                  static_cast<sender<output_type> *>(this) );
2231     }
2232 
2233     //! Copy constructor
2234     priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {
2235         tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2236                                  static_cast<receiver<input_type> *>(this),
2237                                  static_cast<sender<output_type> *>(this) );
2238     }
2239 
2240 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2241     /* override */ void set_name( const char *name ) {
2242         tbb::internal::fgt_node_desc( this, name );
2243     }
2244 #endif
2245 
2246 
2247 protected:
2248 
2249     /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
2250         mark = 0;
2251         base_type::reset(__TBB_PFG_RESET_ARG(f));
2252     }
2253 
2254     typedef typename buffer_node<T, A>::size_type size_type;
2255     typedef typename buffer_node<T, A>::item_type item_type;
2256     typedef typename buffer_node<T, A>::buffer_operation prio_operation;
2257 
2258     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
2259 
2260     /* override */ void handle_operations(prio_operation *op_list) {
2261         prio_operation *tmp = op_list /*, *pop_list*/ ;
2262         bool try_forwarding=false;
2263         while (op_list) {
2264             tmp = op_list;
2265             op_list = op_list->next;
2266             switch (tmp->type) {
2267             case buffer_node<T, A>::reg_succ: this->internal_reg_succ(tmp); try_forwarding = true; break;
2268             case buffer_node<T, A>::rem_succ: this->internal_rem_succ(tmp); break;
2269             case buffer_node<T, A>::put_item: internal_push(tmp); try_forwarding = true; break;
2270             case buffer_node<T, A>::try_fwd_task: internal_forward_task(tmp); break;
2271             case buffer_node<T, A>::rel_res: internal_release(tmp); try_forwarding = true; break;
2272             case buffer_node<T, A>::con_res: internal_consume(tmp); try_forwarding = true; break;
2273             case buffer_node<T, A>::req_item: internal_pop(tmp); break;
2274             case buffer_node<T, A>::res_item: internal_reserve(tmp); break;
2275 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
2276             case buffer_node<T, A>::add_blt_succ: this->internal_add_built_succ(tmp); break;
2277             case buffer_node<T, A>::del_blt_succ: this->internal_del_built_succ(tmp); break;
2278             case buffer_node<T, A>::add_blt_pred: this->internal_add_built_pred(tmp); break;
2279             case buffer_node<T, A>::del_blt_pred: this->internal_del_built_pred(tmp); break;
2280             case buffer_node<T, A>::blt_succ_cnt: this->internal_succ_cnt(tmp); break;
2281             case buffer_node<T, A>::blt_pred_cnt: this->internal_pred_cnt(tmp); break;
2282             case buffer_node<T, A>::blt_succ_cpy: this->internal_copy_succs(tmp); break;
2283             case buffer_node<T, A>::blt_pred_cpy: this->internal_copy_preds(tmp); break;
2284 #endif
2285             }
2286         }
2287         // Process pops.  For now there is no special pop handling.
2288         if (mark<this->my_tail) heapify();
2289         if (try_forwarding && !this->forwarder_busy) {
2290             task* tp = this->my_graph.root_task();
2291             if(tp) {
2292                 this->forwarder_busy = true;
2293                 task *new_task = new(task::allocate_additional_child_of(*tp)) internal::
2294                         forward_task_bypass
2295                         < buffer_node<input_type, A> >(*this);
2296                 // tmp should point to the last item handled by the aggregator.  This is the operation
2297                 // the handling thread enqueued.  So modifying that record will be okay.
2298                 tbb::task *tmp1 = tmp->ltask;
2299                 tmp->ltask = combine_tasks(tmp1, new_task);
2300             }
2301         }
2302     }
2303 
2304     //! Tries to forward valid items to successors
2305     /* override */ void internal_forward_task(prio_operation *op) {
2306         T i_copy;
2307         task * last_task = NULL; // flagged when a successor accepts
2308         size_type counter = this->my_successors.size();
2309 
2310         if (this->my_reserved || this->my_tail == 0) {
2311             __TBB_store_with_release(op->status, FAILED);
2312             this->forwarder_busy = false;
2313             return;
2314         }
2315         // Keep trying to send while there exists an accepting successor
2316         while (counter>0 && this->my_tail > 0) {
2317             i_copy = this->get_my_item(0);
2318             task * new_task = this->my_successors.try_put_task(i_copy);
2319             if ( new_task ) {
2320                 last_task = combine_tasks(last_task, new_task);
2321                 this->destroy_item(0);  // we've forwarded this item
2322                 if (mark == this->my_tail) --mark;
2323                 if(--(this->my_tail)) { // didn't consume last item on heap
2324                     this->move_item(0,this->my_tail);
2325                 }
2326                 if (this->my_tail > 1) // don't reheap for heap of size 1
2327                     reheap();
2328             }
2329             --counter;
2330         }
2331         op->ltask = last_task;
2332         if (last_task && !counter)
2333             __TBB_store_with_release(op->status, SUCCEEDED);
2334         else {
2335             __TBB_store_with_release(op->status, FAILED);
2336             this->forwarder_busy = false;
2337         }
2338     }
2339 
2340     /* override */ void internal_push(prio_operation *op) {
2341         if ( this->my_tail >= this->my_array_size )
2342             this->grow_my_array( this->my_tail + 1 );
2343         (void) this->place_item(this->my_tail, *(op->elem));
2344         ++(this->my_tail);
2345         __TBB_store_with_release(op->status, SUCCEEDED);
2346     }
2347 
2348     /* override */ void internal_pop(prio_operation *op) {
2349         // if empty or already reserved, don't pop
2350         if ( this->my_reserved == true || this->my_tail == 0 ) {
2351             __TBB_store_with_release(op->status, FAILED);
2352             return;
2353         }
2354         if (mark<this->my_tail &&  // item pushed, no re-heap
2355             compare(this->get_my_item(0),
2356                     this->get_my_item(this->my_tail-1))) {
2357             // there are newly pushed elems; last one higher than top
2358             // copy the data
2359             this->fetch_item(this->my_tail-1, *(op->elem));
2360             __TBB_store_with_release(op->status, SUCCEEDED);
2361             --(this->my_tail);
2362             return;
2363         }
2364         // extract and push the last element down heap
2365         *(op->elem) = this->get_my_item(0); // copy the data, item 0 still valid
2366         __TBB_store_with_release(op->status, SUCCEEDED);
2367         if (mark == this->my_tail) --mark;
2368         __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
2369         if(--(this->my_tail)) {
2370             // there were two or more items in heap.  Move the
2371             // last item to the top of the heap
2372             this->set_my_item(0,this->get_my_item(this->my_tail));
2373         }
2374         this->destroy_item(this->my_tail);
2375         if (this->my_tail > 1) // don't reheap for heap of size 1
2376             reheap();
2377     }
2378 
2379     /* override */ void internal_reserve(prio_operation *op) {
2380         if (this->my_reserved == true || this->my_tail == 0) {
2381             __TBB_store_with_release(op->status, FAILED);
2382             return;
2383         }
2384         this->my_reserved = true;
2385         *(op->elem) = reserved_item = this->get_my_item(0);
2386         if (mark == this->my_tail) --mark;
2387         --(this->my_tail);
2388         __TBB_store_with_release(op->status, SUCCEEDED);
2389         this->set_my_item(0, this->get_my_item(this->my_tail));
2390         this->destroy_item(this->my_tail);
2391         if (this->my_tail > 1)
2392             reheap();
2393     }
2394 
2395     /* override */ void internal_consume(prio_operation *op) {
2396         this->my_reserved = false;
2397         __TBB_store_with_release(op->status, SUCCEEDED);
2398     }
2399     /* override */ void internal_release(prio_operation *op) {
2400         if (this->my_tail >= this->my_array_size)
2401             this->grow_my_array( this->my_tail + 1 );
2402         this->set_my_item(this->my_tail, reserved_item);
2403         ++(this->my_tail);
2404         this->my_reserved = false;
2405         __TBB_store_with_release(op->status, SUCCEEDED);
2406         heapify();
2407     }
2408 private:
2409     Compare compare;
2410     size_type mark;
2411     input_type reserved_item;
2412 
2413     // turn array into heap
2414     void heapify() {
2415         if (!mark) mark = 1;
2416         for (; mark<this->my_tail; ++mark) { // for each unheaped element
2417             size_type cur_pos = mark;
2418             input_type to_place;
2419             this->fetch_item(mark,to_place);
2420             do { // push to_place up the heap
2421                 size_type parent = (cur_pos-1)>>1;
2422                 if (!compare(this->get_my_item(parent), to_place))
2423                     break;
2424                 this->move_item(cur_pos, parent);
2425                 cur_pos = parent;
2426             } while( cur_pos );
2427             (void) this->place_item(cur_pos, to_place);
2428         }
2429     }
2430 
2431     // Restore the heap property after the root element has been replaced; the rest of the array is already a heap.
2432     void reheap() {
2433         size_type cur_pos=0, child=1;
2434         while (child < mark) {
2435             size_type target = child;
2436             if (child+1<mark &&
2437                 compare(this->get_my_item(child),
2438                         this->get_my_item(child+1)))
2439                 ++target;
2440             // target now has the higher priority child
2441             if (compare(this->get_my_item(target),
2442                         this->get_my_item(cur_pos)))
2443                 break;
2444             // swap
2445             this->swap_items(cur_pos, target);
2446             cur_pos = target;
2447             child = (cur_pos<<1)+1;
2448         }
2449     }
2450 };  // priority_queue_node
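// Usage note (editorial): priority_queue_node forwards the highest-priority item first according
// to Compare (std::less<T> by default), much like std::priority_queue.  Illustrative sketch:
//
//     tbb::flow::graph g;
//     tbb::flow::priority_queue_node<int> pq( g );
//     pq.try_put( 3 ); pq.try_put( 7 ); pq.try_put( 5 );
//     int v = 0;
//     pq.try_get( v );   // v == 7 (largest value first with the default Compare)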
2451 
2452 //! Forwards messages only if the threshold has not been reached
2453 /** This node forwards items until its threshold is reached.
2454     It contains no buffering.  If the downstream node rejects, the
2455     message is dropped. */
2456 template< typename T >
2457 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2458 protected:
2459     using graph_node::my_graph;
2460 public:
2461     typedef T input_type;
2462     typedef T output_type;
2463     typedef sender< input_type > predecessor_type;
2464     typedef receiver< output_type > successor_type;
2465 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
2466     typedef std::vector<successor_type *> successor_vector_type;
2467     typedef std::vector<predecessor_type *> predecessor_vector_type;
2468 #endif
2469 
2470 private:
2471     size_t my_threshold;
2472     size_t my_count; //number of successful puts
2473     size_t my_tries; //number of active put attempts
2474     internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors;
2475     spin_mutex my_mutex;
2476     internal::broadcast_cache< T > my_successors;
2477     int init_decrement_predecessors;
2478 
2479     friend class internal::forward_task_bypass< limiter_node<T> >;
2480 
2481     // Let decrementer call decrement_counter()
2482     friend class internal::decrementer< limiter_node<T> >;
2483 
2484     bool check_conditions() {  // always called under lock
2485         return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
2486     }
2487 
2488     // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
2489     task *forward_task() {
2490         input_type v;
2491         task *rval = NULL;
2492         bool reserved = false;
2493             {
2494                 spin_mutex::scoped_lock lock(my_mutex);
2495                 if ( check_conditions() )
2496                     ++my_tries;
2497                 else
2498                     return NULL;
2499             }
2500 
2501         //SUCCESS
2502         // if we can reserve and can put, we consume the reservation
2503         // we increment the count and decrement the tries
2504         if ( (my_predecessors.try_reserve(v)) == true ){
2505             reserved=true;
2506             if ( (rval = my_successors.try_put_task(v)) != NULL ){
2507                 {
2508                     spin_mutex::scoped_lock lock(my_mutex);
2509                     ++my_count;
2510                     --my_tries;
2511                     my_predecessors.try_consume();
2512                     if ( check_conditions() ) {
2513                         task* tp = this->my_graph.root_task();
2514                         if ( tp ) {
2515                             task *rtask = new ( task::allocate_additional_child_of( *tp ) )
2516                                 internal::forward_task_bypass< limiter_node<T> >( *this );
2517                             FLOW_SPAWN (*rtask);
2518                         }
2519                     }
2520                 }
2521                 return rval;
2522             }
2523         }
2524         //FAILURE
2525         //if we can't reserve, we decrement the tries
2526         //if we can reserve but can't put, we decrement the tries and release the reservation
2527         {
2528             spin_mutex::scoped_lock lock(my_mutex);
2529             --my_tries;
2530             if (reserved) my_predecessors.try_release();
2531             if ( check_conditions() ) {
2532                 task* tp = this->my_graph.root_task();
2533                 if ( tp ) {
2534                     task *rtask = new ( task::allocate_additional_child_of( *tp ) )
2535                         internal::forward_task_bypass< limiter_node<T> >( *this );
2536                     __TBB_ASSERT(!rval, "Have two tasks to handle");
2537                     return rtask;
2538                 }
2539             }
2540             return rval;
2541         }
2542     }
2543 
2544     void forward() {
2545         __TBB_ASSERT(false, "Should never be called");
2546         return;
2547     }
2548 
2549     task * decrement_counter() {
2550         {
2551             spin_mutex::scoped_lock lock(my_mutex);
2552             if(my_count) --my_count;
2553         }
2554         return forward_task();
2555     }
2556 
2557 public:
2558     //! The internal receiver< continue_msg > that decrements the count
2559     internal::decrementer< limiter_node<T> > decrement;
2560 
2561     //! Constructor
2562     limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0) :
2563         graph_node(g), my_threshold(threshold), my_count(0), my_tries(0),
2564         init_decrement_predecessors(num_decrement_predecessors),
2565         decrement(num_decrement_predecessors)
2566     {
2567         my_predecessors.set_owner(this);
2568         my_successors.set_owner(this);
2569         decrement.set_owner(this);
2570         tbb::internal::fgt_node( tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
2571                                  static_cast<receiver<input_type> *>(this), static_cast<receiver<continue_msg> *>(&decrement),
2572                                  static_cast<sender<output_type> *>(this) );
2573     }
2574 
2575     //! Copy constructor
2576     limiter_node( const limiter_node& src ) :
2577         graph_node(src.my_graph), receiver<T>(), sender<T>(),
2578         my_threshold(src.my_threshold), my_count(0), my_tries(0),
2579         init_decrement_predecessors(src.init_decrement_predecessors),
2580         decrement(src.init_decrement_predecessors)
2581     {
2582         my_predecessors.set_owner(this);
2583         my_successors.set_owner(this);
2584         decrement.set_owner(this);
2585         tbb::internal::fgt_node( tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
2586                                  static_cast<receiver<input_type> *>(this), static_cast<receiver<continue_msg> *>(&decrement),
2587                                  static_cast<sender<output_type> *>(this) );
2588     }
2589 
2590 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2591     /* override */ void set_name( const char *name ) {
2592         tbb::internal::fgt_node_desc( this, name );
2593     }
2594 #endif
2595 
2596     //! Adds a new successor to this node
2597     /* override */ bool register_successor( receiver<output_type> &r ) {
2598         spin_mutex::scoped_lock lock(my_mutex);
2599         bool was_empty = my_successors.empty();
2600         my_successors.register_successor(r);
2601         //spawn a forward task if this is the only successor
2602         if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
2603             task* tp = this->my_graph.root_task();
2604             if ( tp ) {
2605                 FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp ) )
2606                             internal::forward_task_bypass < limiter_node<T> >( *this ) ) );
2607             }
2608         }
2609         return true;
2610     }
2611 
2612     //! Removes a successor from this node
2613     /** r.remove_predecessor(*this) is also called. */
2614     /* override */ bool remove_successor( receiver<output_type> &r ) {
2615         r.remove_predecessor(*this);
2616         my_successors.remove_successor(r);
2617         return true;
2618     }
2619 
2620 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
2621     /*override*/void internal_add_built_successor(receiver<output_type> &src) {
2622         my_successors.internal_add_built_successor(src);
2623     }
2624 
2625     /*override*/void internal_delete_built_successor(receiver<output_type> &src) {
2626         my_successors.internal_delete_built_successor(src);
2627     }
2628 
2629     /*override*/size_t successor_count() { return my_successors.successor_count(); }
2630 
2631     /*override*/ void copy_successors(successor_vector_type &v) {
2632         my_successors.copy_successors(v);
2633     }
2634 
2635     /*override*/void internal_add_built_predecessor(sender<output_type> &src) {
2636         my_predecessors.internal_add_built_predecessor(src);
2637     }
2638 
2639     /*override*/void internal_delete_built_predecessor(sender<output_type> &src) {
2640         my_predecessors.internal_delete_built_predecessor(src);
2641     }
2642 
2643     /*override*/size_t predecessor_count() { return my_predecessors.predecessor_count(); }
2644 
2645     /*override*/ void copy_predecessors(predecessor_vector_type &v) {
2646         my_predecessors.copy_predecessors(v);
2647     }
2648 #endif  /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
2649 
2650     //! Adds src to the list of cached predecessors.
2651     /* override */ bool register_predecessor( predecessor_type &src ) {
2652         spin_mutex::scoped_lock lock(my_mutex);
2653         my_predecessors.add( src );
2654         task* tp = this->my_graph.root_task();
2655         if ( my_count + my_tries < my_threshold && !my_successors.empty() && tp ) {
2656             FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp ) )
2657                         internal::forward_task_bypass < limiter_node<T> >( *this ) ) );
2658         }
2659         return true;
2660     }
2661 
2662     //! Removes src from the list of cached predecessors.
2663     /* override */ bool remove_predecessor( predecessor_type &src ) {
2664         my_predecessors.remove( src );
2665         return true;
2666     }
2667 
2668 protected:
2669 
2670     template< typename R, typename B > friend class run_and_put_task;
2671     template<typename X, typename Y> friend class internal::broadcast_cache;
2672     template<typename X, typename Y> friend class internal::round_robin_cache;
2673     //! Puts an item to this receiver
2674     /* override */ task *try_put_task( const T &t ) {
2675         {
2676             spin_mutex::scoped_lock lock(my_mutex);
2677             if ( my_count + my_tries >= my_threshold )
2678                 return NULL;
2679             else
2680                 ++my_tries;
2681         }
2682 
2683         task * rtask = my_successors.try_put_task(t);
2684 
2685         if ( !rtask ) {  // try_put_task failed.
2686             spin_mutex::scoped_lock lock(my_mutex);
2687             --my_tries;
2688             task* tp = this->my_graph.root_task();
2689             if ( check_conditions() && tp ) {
2690                 rtask = new ( task::allocate_additional_child_of( *tp ) )
2691                     internal::forward_task_bypass< limiter_node<T> >( *this );
2692             }
2693         }
2694         else {
2695             spin_mutex::scoped_lock lock(my_mutex);
2696             ++my_count;
2697             --my_tries;
2698         }
2699         return rtask;
2700     }
2701 
2702     /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
2703         my_count = 0;
2704         my_predecessors.reset(__TBB_PFG_RESET_ARG(f));
2705         decrement.reset_receiver(__TBB_PFG_RESET_ARG(f));
2706 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
2707         my_successors.reset(f);
2708 #endif
2709     }
2710 
2711     /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f)) { my_predecessors.reset(__TBB_PFG_RESET_ARG(f)); }
2712 };  // limiter_node
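/*  Editorial note: the sketch below is an illustrative usage example only and is
    not part of the header.  It shows the common limiter_node pattern: cap the
    number of items in flight between a producer and a worker, and feed the
    worker's completion signal back into the decrement port to release capacity.
    All names (src, limit, work) are hypothetical.

    #include "tbb/flow_graph.h"
    using namespace tbb::flow;

    int main() {
        graph g;
        int i = 0;
        source_node<int> src( g, [&]( int &v ) -> bool {
            if ( i < 100 ) { v = i++; return true; }   // produce 0..99
            return false;
        }, false );                                     // start inactive
        limiter_node<int> limit( g, 4 );                // at most 4 items in flight
        function_node<int, continue_msg> work( g, unlimited, []( int ) {
            return continue_msg();                      // "process" the item
        } );
        make_edge( src, limit );
        make_edge( limit, work );
        make_edge( work, limit.decrement );             // completing an item frees a slot
        src.activate();
        g.wait_for_all();
        return 0;
    }
*/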
2713 
2714 #include "internal/_flow_graph_join_impl.h"
2715 
2716 using internal::reserving_port;
2717 using internal::queueing_port;
2718 using internal::tag_matching_port;
2719 using internal::input_port;
2720 using internal::tag_value;
2721 using internal::NO_TAG;
2722 
2723 template<typename OutputTuple, graph_buffer_policy JP=queueing> class join_node;
2724 
2725 template<typename OutputTuple>
2726 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
2727 private:
2728     static const int N = tbb::flow::tuple_size<OutputTuple>::value;
2729     typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
2730 public:
2731     typedef OutputTuple output_type;
2732     typedef typename unfolded_type::input_ports_type input_ports_type;
2733     join_node(graph &g) : unfolded_type(g) {
2734         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2735                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2736     }
2737     join_node(const join_node &other) : unfolded_type(other) {
2738         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2739                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2740     }
2741 
2742 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2743     /* override */ void set_name( const char *name ) {
2744         tbb::internal::fgt_node_desc( this, name );
2745     }
2746 #endif
2747 
2748 };
2749 
2750 template<typename OutputTuple>
2751 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
2752 private:
2753     static const int N = tbb::flow::tuple_size<OutputTuple>::value;
2754     typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
2755 public:
2756     typedef OutputTuple output_type;
2757     typedef typename unfolded_type::input_ports_type input_ports_type;
2758     join_node(graph &g) : unfolded_type(g) {
2759         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2760                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2761     }
2762     join_node(const join_node &other) : unfolded_type(other) {
2763         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2764                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2765     }
2766 
2767 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2768     /* override */ void set_name( const char *name ) {
2769         tbb::internal::fgt_node_desc( this, name );
2770     }
2771 #endif
2772 
2773 };
2774 
2775 // template for tag_matching join_node
2776 template<typename OutputTuple>
2777 class join_node<OutputTuple, tag_matching> : public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value,
2778       tag_matching_port, OutputTuple, tag_matching> {
2779 private:
2780     static const int N = tbb::flow::tuple_size<OutputTuple>::value;
2781     typedef typename internal::unfolded_join_node<N, tag_matching_port, OutputTuple, tag_matching> unfolded_type;
2782 public:
2783     typedef OutputTuple output_type;
2784     typedef typename unfolded_type::input_ports_type input_ports_type;
2785 
2786     template<typename __TBB_B0, typename __TBB_B1>
2787     join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
2788         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2789                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2790     }
2791     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
2792     join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
2793         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2794                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2795     }
2796     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
2797     join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
2798         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2799                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2800     }
2801     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
2802     join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
2803             unfolded_type(g, b0, b1, b2, b3, b4) {
2804         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2805                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2806     }
2807 #if __TBB_VARIADIC_MAX >= 6
2808     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2809         typename __TBB_B5>
2810     join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
2811             unfolded_type(g, b0, b1, b2, b3, b4, b5) {
2812         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2813                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2814     }
2815 #endif
2816 #if __TBB_VARIADIC_MAX >= 7
2817     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2818         typename __TBB_B5, typename __TBB_B6>
2819     join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
2820             unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
2821         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2822                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2823     }
2824 #endif
2825 #if __TBB_VARIADIC_MAX >= 8
2826     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2827         typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
2828     join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2829             __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
2830         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2831                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2832     }
2833 #endif
2834 #if __TBB_VARIADIC_MAX >= 9
2835     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2836         typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
2837     join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2838             __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
2839         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2840                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2841     }
2842 #endif
2843 #if __TBB_VARIADIC_MAX >= 10
2844     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2845         typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
2846     join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2847             __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
2848         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2849                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2850     }
2851 #endif
2852     join_node(const join_node &other) : unfolded_type(other) {
2853         tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2854                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2855     }
2856 
2857 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2858     /* override */ void set_name( const char *name ) {
2859         tbb::internal::fgt_node_desc( this, name );
2860     }
2861 #endif
2862 
2863 };
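/*  Editorial note: illustrative only, not part of the header.  A tag_matching
    join_node pairs messages from its ports by a user-supplied tag; the lambdas
    passed to the constructor play the role of the __TBB_B0/__TBB_B1 body
    parameters above.  The struct msg and all variable names are hypothetical.

    #include "tbb/flow_graph.h"
    using namespace tbb::flow;

    struct msg { int id; double payload; };

    int main() {
        graph g;
        join_node< tuple<msg,msg>, tag_matching > j( g,
            []( const msg &m ) { return tag_value(m.id); },    // tag for port 0
            []( const msg &m ) { return tag_value(m.id); } );  // tag for port 1
        function_node< tuple<msg,msg>, continue_msg > sink( g, serial,
            []( const tuple<msg,msg> & ) { return continue_msg(); } );
        make_edge( j, sink );
        msg a = { 42, 1.0 };
        msg b = { 42, 2.0 };
        input_port<0>(j).try_put( a );   // matched with b by id == 42
        input_port<1>(j).try_put( b );
        g.wait_for_all();
        return 0;
    }
*/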
2864 
2865 // indexer node
2866 #include "internal/_flow_graph_indexer_impl.h"
2867 
2868 struct indexer_null_type {};
2869 
2870 template<typename T0, typename T1=indexer_null_type, typename T2=indexer_null_type, typename T3=indexer_null_type,
2871                       typename T4=indexer_null_type, typename T5=indexer_null_type, typename T6=indexer_null_type,
2872                       typename T7=indexer_null_type, typename T8=indexer_null_type, typename T9=indexer_null_type> class indexer_node;
2873 
2874 //indexer node specializations
2875 template<typename T0>
2876 class indexer_node<T0> : public internal::unfolded_indexer_node<tuple<T0> > {
2877 private:
2878     static const int N = 1;
2879 public:
2880     typedef tuple<T0> InputTuple;
2881     typedef typename internal::tagged_msg<size_t, T0> output_type;
2882     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
2883     indexer_node(graph& g) : unfolded_type(g) {
2884         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2885                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2886     }
2887     // Copy constructor
2888     indexer_node( const indexer_node& other ) : unfolded_type(other) {
2889         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2890                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2891     }
2892 
2893 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2894     /* override */ void set_name( const char *name ) {
2895         tbb::internal::fgt_node_desc( this, name );
2896     }
2897 #endif
2898 };
2899 
2900 template<typename T0, typename T1>
2901 class indexer_node<T0, T1> : public internal::unfolded_indexer_node<tuple<T0, T1> > {
2902 private:
2903     static const int N = 2;
2904 public:
2905     typedef tuple<T0, T1> InputTuple;
2906     typedef typename internal::tagged_msg<size_t, T0, T1> output_type;
2907     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
2908     indexer_node(graph& g) : unfolded_type(g) {
2909         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2910                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2911     }
2912     // Copy constructor
2913     indexer_node( const indexer_node& other ) : unfolded_type(other) {
2914         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2915                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2916     }
2917 
2918 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2919     /* override */ void set_name( const char *name ) {
2920         tbb::internal::fgt_node_desc( this, name );
2921     }
2922 #endif
2923 };
2924 
2925 template<typename T0, typename T1, typename T2>
2926 class indexer_node<T0, T1, T2> : public internal::unfolded_indexer_node<tuple<T0, T1, T2> > {
2927 private:
2928     static const int N = 3;
2929 public:
2930     typedef tuple<T0, T1, T2> InputTuple;
2931     typedef typename internal::tagged_msg<size_t, T0, T1, T2> output_type;
2932     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
2933     indexer_node(graph& g) : unfolded_type(g) {
2934         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2935                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2936     }
2937     // Copy constructor
2938     indexer_node( const indexer_node& other ) : unfolded_type(other) {
2939         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2940                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2941     }
2942 
2943 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2944     /* override */ void set_name( const char *name ) {
2945         tbb::internal::fgt_node_desc( this, name );
2946     }
2947 #endif
2948 };
2949 
2950 template<typename T0, typename T1, typename T2, typename T3>
2951 class indexer_node<T0, T1, T2, T3> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > {
2952 private:
2953     static const int N = 4;
2954 public:
2955     typedef tuple<T0, T1, T2, T3> InputTuple;
2956     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3> output_type;
2957     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
2958     indexer_node(graph& g) : unfolded_type(g) {
2959         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2960                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2961     }
2962     // Copy constructor
2963     indexer_node( const indexer_node& other ) : unfolded_type(other) {
2964         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2965                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2966     }
2967 
2968 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2969     /* override */ void set_name( const char *name ) {
2970         tbb::internal::fgt_node_desc( this, name );
2971     }
2972 #endif
2973 };
2974 
2975 template<typename T0, typename T1, typename T2, typename T3, typename T4>
2976 class indexer_node<T0, T1, T2, T3, T4> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > {
2977 private:
2978     static const int N = 5;
2979 public:
2980     typedef tuple<T0, T1, T2, T3, T4> InputTuple;
2981     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
2982     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
2983     indexer_node(graph& g) : unfolded_type(g) {
2984         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2985                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2986     }
2987     // Copy constructor
2988     indexer_node( const indexer_node& other ) : unfolded_type(other) {
2989         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2990                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2991     }
2992 
2993 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2994     /* override */ void set_name( const char *name ) {
2995         tbb::internal::fgt_node_desc( this, name );
2996     }
2997 #endif
2998 };
2999 
3000 #if __TBB_VARIADIC_MAX >= 6
3001 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
3002 class indexer_node<T0, T1, T2, T3, T4, T5> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
3003 private:
3004     static const int N = 6;
3005 public:
3006     typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
3007     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5> output_type;
3008     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3009     indexer_node(graph& g) : unfolded_type(g) {
3010         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3011                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3012     }
3013     // Copy constructor
3014     indexer_node( const indexer_node& other ) : unfolded_type(other) {
3015         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3016                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3017     }
3018 
3019 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3020     /* override */ void set_name( const char *name ) {
3021         tbb::internal::fgt_node_desc( this, name );
3022     }
3023 #endif
3024 };
3025 #endif //variadic max 6
3026 
3027 #if __TBB_VARIADIC_MAX >= 7
3028 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3029          typename T6>
3030 class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
3031 private:
3032     static const int N = 7;
3033 public:
3034     typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
3035     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6> output_type;
3036     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3037     indexer_node(graph& g) : unfolded_type(g) {
3038         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3039                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3040     }
3041     // Copy constructor
3042     indexer_node( const indexer_node& other ) : unfolded_type(other) {
3043         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3044                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3045     }
3046 
3047 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3048     /* override */ void set_name( const char *name ) {
3049         tbb::internal::fgt_node_desc( this, name );
3050     }
3051 #endif
3052 };
3053 #endif //variadic max 7
3054 
3055 #if __TBB_VARIADIC_MAX >= 8
3056 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3057          typename T6, typename T7>
3058 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
3059 private:
3060     static const int N = 8;
3061 public:
3062     typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
3063     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type;
3064     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3065     indexer_node(graph& g) : unfolded_type(g) {
3066         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3067                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3068     }
3069     // Copy constructor
3070     indexer_node( const indexer_node& other ) : unfolded_type(other) {
3071         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3072                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3073     }
3074 
3075 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3076     /* override */ void set_name( const char *name ) {
3077         tbb::internal::fgt_node_desc( this, name );
3078     }
3079 #endif
3080 };
3081 #endif //variadic max 8
3082 
3083 #if __TBB_VARIADIC_MAX >= 9
3084 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3085          typename T6, typename T7, typename T8>
3086 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
3087 private:
3088     static const int N = 9;
3089 public:
3090     typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
3091     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8> output_type;
3092     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3093     indexer_node(graph& g) : unfolded_type(g) {
3094         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3095                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3096     }
3097     // Copy constructor
3098     indexer_node( const indexer_node& other ) : unfolded_type(other) {
3099         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3100                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3101     }
3102 
3103 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3104     /* override */ void set_name( const char *name ) {
3105         tbb::internal::fgt_node_desc( this, name );
3106     }
3107 #endif
3108 };
3109 #endif //variadic max 9
3110 
3111 #if __TBB_VARIADIC_MAX >= 10
3112 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3113          typename T6, typename T7, typename T8, typename T9>
3114 class indexer_node/*default*/ : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
3115 private:
3116     static const int N = 10;
3117 public:
3118     typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
3119     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> output_type;
3120     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3121     indexer_node(graph& g) : unfolded_type(g) {
3122         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3123                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3124     }
3125     // Copy constructor
3126     indexer_node( const indexer_node& other ) : unfolded_type(other) {
3127         tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3128                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3129     }
3130 
3131 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3132     /* override */ void set_name( const char *name ) {
3133         tbb::internal::fgt_node_desc( this, name );
3134     }
3135 #endif
3136 };
3137 #endif //variadic max 10
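/*  Editorial note: illustrative only, not part of the header.  An indexer_node
    merges heterogeneous input streams into one stream of tagged_msg values;
    tag() identifies the originating port and cast_to<T>() recovers the payload.
    Variable names here are hypothetical.

    #include "tbb/flow_graph.h"
    using namespace tbb::flow;

    int main() {
        graph g;
        typedef indexer_node<int, float> idx_type;
        idx_type idx( g );
        function_node< idx_type::output_type, continue_msg > sink( g, serial,
            []( const idx_type::output_type &m ) {
                if ( m.tag() == 0 ) { int   v = cast_to<int>(m);   (void)v; }   // came from port 0
                else                { float v = cast_to<float>(m); (void)v; }   // came from port 1
                return continue_msg();
            } );
        make_edge( idx, sink );
        input_port<0>(idx).try_put( 7 );
        input_port<1>(idx).try_put( 3.14f );
        g.wait_for_all();
        return 0;
    }
*/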
3138 
3139 //! Makes an edge between a single predecessor and a single successor
3140 template< typename T >
3141 inline void make_edge( sender<T> &p, receiver<T> &s ) {
3142 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
3143     s.internal_add_built_predecessor(p);
3144     p.internal_add_built_successor(s);
3145 #endif
3146     p.register_successor( s );
3147     tbb::internal::fgt_make_edge( &p, &s );
3148 }
3149 
3150 //! Removes an edge between a single predecessor and a single successor
3151 template< typename T >
3152 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
3153     p.remove_successor( s );
3154 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
3155     // TODO: should we try to remove p from the predecessor list of s, in case the edge is reversed?
3156     p.internal_delete_built_successor(s);
3157     s.internal_delete_built_predecessor(p);
3158 #endif
3159     tbb::internal::fgt_remove_edge( &p, &s );
3160 }
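/*  Editorial note: illustrative only, not part of the header.  Edges can be
    made and removed dynamically; after remove_edge the broadcaster no longer
    delivers to the queue.  Variable names are hypothetical.

    #include "tbb/flow_graph.h"
    using namespace tbb::flow;

    int main() {
        graph g;
        broadcast_node<int> b( g );
        queue_node<int> q( g );
        make_edge( b, q );
        b.try_put( 1 );          // delivered to q
        remove_edge( b, q );
        b.try_put( 2 );          // not delivered: b has no successors now
        g.wait_for_all();
        int v;
        bool got_first  = q.try_get( v );   // true, v == 1
        bool got_second = q.try_get( v );   // false
        (void)got_first; (void)got_second;
        return 0;
    }
*/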
3161 
3162 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
3163 template<typename C >
3164 template< typename S >
3165 void edge_container<C>::sender_extract( S &s ) {
3166     edge_vector e = built_edges;
3167     for ( typename edge_vector::iterator i = e.begin(); i != e.end(); ++i ) {
3168         remove_edge(s, **i);
3169     }
3170 }
3171 
3172 template<typename C >
3173 template< typename R >
3174 void edge_container<C>::receiver_extract( R &r ) {
3175     edge_vector e = built_edges;
3176     for ( typename edge_vector::iterator i = e.begin(); i != e.end(); ++i ) {
3177         remove_edge(**i, r);
3178     }
3179 }
3180 #endif
3181 
3182 //! Returns a copy of the body from a function or continue node
3183 template< typename Body, typename Node >
3184 Body copy_body( Node &n ) {
3185     return n.template copy_function_object<Body>();
3186 }
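/*  Editorial note: illustrative only, not part of the header.  copy_body
    retrieves a copy of the body object stored inside a node, e.g. to inspect
    state accumulated by a stateful functor.  counter_body and all names are
    hypothetical.

    #include "tbb/flow_graph.h"
    using namespace tbb::flow;

    struct counter_body {
        int count;
        counter_body() : count(0) {}
        int operator()( int v ) { ++count; return v; }
    };

    int main() {
        graph g;
        function_node<int,int> n( g, serial, counter_body() );
        n.try_put( 1 );
        n.try_put( 2 );
        g.wait_for_all();
        counter_body b = copy_body<counter_body>( n );   // expected to reflect the two invocations (b.count == 2)
        (void)b;
        return 0;
    }
*/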
3187 
3188 } // interface7
3189 
3190 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
3191     using interface7::reset_flags;
3192     using interface7::rf_reset_protocol;
3193     using interface7::rf_reset_bodies;
3194     using interface7::rf_extract;
3195 #endif
3196 
3197     using interface7::graph;
3198     using interface7::graph_node;
3199     using interface7::continue_msg;
3200     using interface7::sender;
3201     using interface7::receiver;
3202     using interface7::continue_receiver;
3203 
3204     using interface7::source_node;
3205     using interface7::function_node;
3206     using interface7::multifunction_node;
3207     using interface7::split_node;
3208     using interface7::internal::output_port;
3209     using interface7::indexer_node;
3210     using interface7::internal::tagged_msg;
3211     using interface7::internal::cast_to;
3212     using interface7::internal::is_a;
3213     using interface7::continue_node;
3214     using interface7::overwrite_node;
3215     using interface7::write_once_node;
3216     using interface7::broadcast_node;
3217     using interface7::buffer_node;
3218     using interface7::queue_node;
3219     using interface7::sequencer_node;
3220     using interface7::priority_queue_node;
3221     using interface7::limiter_node;
3222     using namespace interface7::internal::graph_policy_namespace;
3223     using interface7::join_node;
3224     using interface7::input_port;
3225     using interface7::copy_body;
3226     using interface7::make_edge;
3227     using interface7::remove_edge;
3228     using interface7::internal::NO_TAG;
3229     using interface7::internal::tag_value;
3230 
3231 } // flow
3232 } // tbb
3233 
3234 #undef __TBB_PFG_RESET_ARG
3235 #undef __TBB_COMMA
3236 
3237 #endif // __TBB_flow_graph_H
3238