1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_H
18 #define __TBB_flow_graph_H
19 
20 #define __TBB_flow_graph_H_include_area
21 #include "internal/_warning_suppress_enable_notice.h"
22 
23 #include "tbb_stddef.h"
24 #include "atomic.h"
25 #include "spin_mutex.h"
26 #include "null_mutex.h"
27 #include "spin_rw_mutex.h"
28 #include "null_rw_mutex.h"
29 #include "task.h"
30 #include "cache_aligned_allocator.h"
31 #include "tbb_exception.h"
32 #include "pipeline.h"
33 #include "internal/_template_helpers.h"
34 #include "internal/_aggregator_impl.h"
35 #include "tbb/internal/_allocator_traits.h"
36 #include "tbb_profiling.h"
37 #include "task_arena.h"
38 
39 #if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
40    #if __INTEL_COMPILER
41        // Disabled warning "routine is both inline and noinline"
42        #pragma warning (push)
43        #pragma warning( disable: 2196 )
44    #endif
45    #define __TBB_NOINLINE_SYM __attribute__((noinline))
46 #else
47    #define __TBB_NOINLINE_SYM
48 #endif
49 
50 #if __TBB_PREVIEW_ASYNC_MSG
51 #include <vector>    // std::vector in internal::async_storage
52 #include <memory>    // std::shared_ptr in async_msg
53 #endif
54 
55 #if __TBB_PREVIEW_STREAMING_NODE
56 // For streaming_node
57 #include <array>            // std::array
58 #include <unordered_map>    // std::unordered_map
59 #include <type_traits>      // std::decay, std::true_type, std::false_type
60 #endif // __TBB_PREVIEW_STREAMING_NODE
61 
62 #if TBB_DEPRECATED_FLOW_ENQUEUE
63 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
64 #else
65 #define FLOW_SPAWN(a) tbb::task::spawn((a))
66 #endif
67 
68 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
69 #define __TBB_DEFAULT_NODE_ALLOCATOR(T) cache_aligned_allocator<T>
70 #else
71 #define __TBB_DEFAULT_NODE_ALLOCATOR(T) null_type
72 #endif
73 
74 // use the VC10 or gcc version of tuple if it is available.
75 #if __TBB_CPP11_TUPLE_PRESENT
76     #include <tuple>
77 namespace tbb {
78     namespace flow {
79         using std::tuple;
80         using std::tuple_size;
81         using std::tuple_element;
82         using std::get;
83     }
84 }
85 #else
86     #include "compat/tuple"
87 #endif
88 
89 #include<list>
90 #include<queue>
91 
/** @file
  \brief The graph related classes and functions

  There are some applications that best express dependencies as messages
  passed between nodes in a graph.  These messages may contain data or
  simply act as signals that a predecessor has completed. The graph
  class and its associated node classes can be used to express such
  applications.
*/
101 
102 namespace tbb {
103 namespace flow {
104 
//! An enumeration that provides the two most common concurrency levels: unlimited and serial
106 enum concurrency { unlimited = 0, serial = 1 };
107 
108 namespace interface11 {
109 
110 //! A generic null type
111 struct null_type {};
112 
113 //! An empty class used for messages that mean "I'm done"
114 class continue_msg {};
115 
116 //! Forward declaration section
117 template< typename T > class sender;
118 template< typename T > class receiver;
119 class continue_receiver;
120 
121 template< typename T, typename U > class limiter_node;  // needed for resetting decrementer
122 
123 template< typename R, typename B > class run_and_put_task;
124 
125 namespace internal {
126 
127 template<typename T, typename M> class successor_cache;
128 template<typename T, typename M> class broadcast_cache;
129 template<typename T, typename M> class round_robin_cache;
130 template<typename T, typename M> class predecessor_cache;
131 template<typename T, typename M> class reservable_predecessor_cache;
132 
133 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
134 namespace order {
135 struct following;
136 struct preceding;
137 }
138 template<typename Order, typename... Args> struct node_set;
139 #endif
140 
141 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Holder of edges both for caches and for those nodes which do not have predecessor caches.
// C == receiver< ... > or sender< ... >, depending.
template<typename C>
class edge_container {

public:
    typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type;

    //! Records an edge to s.  Only a pointer is stored, so s must outlive this container.
    void add_edge(C &s) {
        built_edges.push_back(&s);
    }

    //! Removes the first recorded edge to s; a no-op if s was never recorded.
    void delete_edge(C &s) {
        for (typename edge_list_type::iterator i = built_edges.begin(); i != built_edges.end(); ++i) {
            if (*i == &s) {
                (void)built_edges.erase(i);
                return;  // only remove one predecessor per request
            }
        }
    }

    //! Copies the recorded edge list into v, replacing v's previous contents.
    void copy_edges(edge_list_type &v) {
        v = built_edges;
    }

    //! Number of edges currently recorded.
    size_t edge_count() {
        return (size_t)(built_edges.size());
    }

    //! Forgets all recorded edges (does not notify the connected nodes).
    void clear() {
        built_edges.clear();
    }

    // methods remove the statement from all predecessors/successors listed in the edge
    // container.
    template< typename S > void sender_extract(S &s);
    template< typename R > void receiver_extract(R &r);

private:
    edge_list_type built_edges;
};  // class edge_container
183 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
184 
185 } // namespace internal
186 
187 } // namespace interfaceX
188 } // namespace flow
189 } // namespace tbb
190 
191 //! The graph class
192 #include "internal/_flow_graph_impl.h"
193 
194 namespace tbb {
195 namespace flow {
196 namespace interface11 {
197 
198 // enqueue left task if necessary. Returns the non-enqueued task if there is one.
combine_tasks(graph & g,tbb::task * left,tbb::task * right)199 static inline tbb::task *combine_tasks(graph& g, tbb::task * left, tbb::task * right) {
200     // if no RHS task, don't change left.
201     if (right == NULL) return left;
202     // right != NULL
203     if (left == NULL) return right;
204     if (left == SUCCESSFULLY_ENQUEUED) return right;
205     // left contains a task
206     if (right != SUCCESSFULLY_ENQUEUED) {
207         // both are valid tasks
208         internal::spawn_in_graph_arena(g, *left);
209         return right;
210     }
211     return left;
212 }
213 
214 #if __TBB_PREVIEW_ASYNC_MSG
215 
216 template < typename T > class __TBB_DEPRECATED async_msg;
217 
218 namespace internal {
219 
220 template < typename T > class async_storage;
221 
222 template< typename T, typename = void >
223 struct async_helpers {
224     typedef async_msg<T> async_type;
225     typedef T filtered_type;
226 
227     static const bool is_async_type = false;
228 
to_void_ptrasync_helpers229     static const void* to_void_ptr(const T& t) {
230         return static_cast<const void*>(&t);
231     }
232 
to_void_ptrasync_helpers233     static void* to_void_ptr(T& t) {
234         return static_cast<void*>(&t);
235     }
236 
from_void_ptrasync_helpers237     static const T& from_void_ptr(const void* p) {
238         return *static_cast<const T*>(p);
239     }
240 
from_void_ptrasync_helpers241     static T& from_void_ptr(void* p) {
242         return *static_cast<T*>(p);
243     }
244 
try_put_task_wrapper_implasync_helpers245     static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
246         if (is_async) {
247             // This (T) is NOT async and incoming 'A<X> t' IS async
248             // Get data from async_msg
249             const async_msg<filtered_type>& msg = async_helpers< async_msg<filtered_type> >::from_void_ptr(p);
250             task* const new_task = msg.my_storage->subscribe(*this_recv, this_recv->graph_reference());
251             // finalize() must be called after subscribe() because set() can be called in finalize()
252             // and 'this_recv' client must be subscribed by this moment
253             msg.finalize();
254             return new_task;
255         }
256         else {
257             // Incoming 't' is NOT async
258             return this_recv->try_put_task(from_void_ptr(p));
259         }
260     }
261 };
262 
// Specialization selected when T derives from async_msg<T::async_msg_data_type>,
// i.e. when this endpoint itself is an async type.
template< typename T >
struct async_helpers< T, typename std::enable_if< std::is_base_of<async_msg<typename T::async_msg_data_type>, T>::value >::type > {
    typedef T async_type;
    typedef typename T::async_msg_data_type filtered_type;

    static const bool is_async_type = true;

    // Receiver-classes use const interfaces
    static const void* to_void_ptr(const T& t) {
        // Upcast to the async_msg base first so the void* always addresses the base subobject.
        return static_cast<const void*>(&static_cast<const async_msg<filtered_type>&>(t));
    }

    static void* to_void_ptr(T& t) {
        return static_cast<void*>(&static_cast<async_msg<filtered_type>&>(t));
    }

    // Sender-classes use non-const interfaces
    static const T& from_void_ptr(const void* p) {
        // Reverse of to_void_ptr: recover the base pointer, then downcast to T.
        return *static_cast<const T*>(static_cast<const async_msg<filtered_type>*>(p));
    }

    static T& from_void_ptr(void* p) {
        return *static_cast<T*>(static_cast<async_msg<filtered_type>*>(p));
    }

    // Used in receiver<T> class
    static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
        if (is_async) {
            // Both are async
            return this_recv->try_put_task(from_void_ptr(p));
        }
        else {
            // This (T) is async and incoming 'X t' is NOT async
            // Create async_msg for X
            const filtered_type& t = async_helpers<filtered_type>::from_void_ptr(p);
            const T msg(t);
            return this_recv->try_put_task(msg);
        }
    }
};
303 
304 class untyped_receiver;
305 
//! Type-erased sender base: the typed payload is passed through the
//! protected *_wrapper virtuals as a void* plus an is-async flag.
class untyped_sender {
    template< typename, typename > friend class internal::predecessor_cache;
    template< typename, typename > friend class internal::reservable_predecessor_cache;
public:
    //! The successor type for this node
    typedef untyped_receiver successor_type;

    virtual ~untyped_sender() {}

    // NOTE: Following part of PUBLIC section is copy-paste from original sender<T> class

    // TODO: Prevent untyped successor registration

    //! Add a new successor to this node
    virtual bool register_successor( successor_type &r ) = 0;

    //! Removes a successor from this node
    virtual bool remove_successor( successor_type &r ) = 0;

    //! Releases the reserved item
    virtual bool try_release( ) { return false; }

    //! Consumes the reserved item
    virtual bool try_consume( ) { return false; }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    //! interface to record edges for traversal & deletion
    typedef internal::edge_container<successor_type> built_successors_type;
    typedef built_successors_type::edge_list_type successor_list_type;
    virtual built_successors_type &built_successors()                   = 0;
    virtual void    internal_add_built_successor( successor_type & )    = 0;
    virtual void    internal_delete_built_successor( successor_type & ) = 0;
    virtual void    copy_successors( successor_list_type &)             = 0;
    virtual size_t  successor_count()                                   = 0;
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
protected:
    //! Request an item from the sender
    // Erases X to void* via async_helpers and dispatches to the virtual wrapper.
    template< typename X >
    bool try_get( X &t ) {
        return try_get_wrapper( internal::async_helpers<X>::to_void_ptr(t), internal::async_helpers<X>::is_async_type );
    }

    //! Reserves an item in the sender
    template< typename X >
    bool try_reserve( X &t ) {
        return try_reserve_wrapper( internal::async_helpers<X>::to_void_ptr(t), internal::async_helpers<X>::is_async_type );
    }

    // Typed subclasses (sender<T>) implement these to recover the payload type.
    virtual bool try_get_wrapper( void* p, bool is_async ) = 0;
    virtual bool try_reserve_wrapper( void* p, bool is_async ) = 0;
};
357 
//! Type-erased receiver base: mirrors untyped_sender for the push protocol.
class untyped_receiver  {
    template< typename, typename > friend class run_and_put_task;

    template< typename, typename > friend class internal::broadcast_cache;
    template< typename, typename > friend class internal::round_robin_cache;
    template< typename, typename > friend class internal::successor_cache;

#if __TBB_PREVIEW_OPENCL_NODE
    template< typename, typename > friend class proxy_dependency_receiver;
#endif /* __TBB_PREVIEW_OPENCL_NODE */
public:
    //! The predecessor type for this node
    typedef untyped_sender predecessor_type;

    //! Destructor
    virtual ~untyped_receiver() {}

    //! Put an item to the receiver
    // Returns true if the item was accepted; spawns the continuation task
    // unless it was already enqueued (SUCCESSFULLY_ENQUEUED marker).
    template<typename X>
    bool try_put(const X& t) {
        task *res = try_put_task(t);
        if (!res) return false;
        if (res != SUCCESSFULLY_ENQUEUED) internal::spawn_in_graph_arena(graph_reference(), *res);
        return true;
    }

    // NOTE: Following part of PUBLIC section is copy-paste from original receiver<T> class

    // TODO: Prevent untyped predecessor registration

    //! Add a predecessor to the node
    virtual bool register_predecessor( predecessor_type & ) { return false; }

    //! Remove a predecessor from the node
    virtual bool remove_predecessor( predecessor_type & ) { return false; }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    typedef internal::edge_container<predecessor_type> built_predecessors_type;
    typedef built_predecessors_type::edge_list_type predecessor_list_type;
    virtual built_predecessors_type &built_predecessors()                  = 0;
    virtual void   internal_add_built_predecessor( predecessor_type & )    = 0;
    virtual void   internal_delete_built_predecessor( predecessor_type & ) = 0;
    virtual void   copy_predecessors( predecessor_list_type & )            = 0;
    virtual size_t predecessor_count()                                     = 0;
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
protected:
    // Erases X to void* via async_helpers and dispatches to the virtual wrapper.
    template<typename X>
    task *try_put_task(const X& t) {
        return try_put_task_wrapper( internal::async_helpers<X>::to_void_ptr(t), internal::async_helpers<X>::is_async_type );
    }

    virtual task* try_put_task_wrapper( const void* p, bool is_async ) = 0;

    virtual graph& graph_reference() const = 0;

    // NOTE: Following part of PROTECTED and PRIVATE sections is copy-paste from original receiver<T> class

    //! put receiver back in initial state
    virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;

    virtual bool is_continue_receiver() { return false; }
};
420 
421 } // namespace internal
422 
423 //! Pure virtual template class that defines a sender of messages of type T
424 template< typename T >
425 class sender : public internal::untyped_sender {
426 public:
427     //! The output type of this sender
428     __TBB_DEPRECATED typedef T output_type;
429 
430     __TBB_DEPRECATED typedef typename internal::async_helpers<T>::filtered_type filtered_type;
431 
432     //! Request an item from the sender
433     virtual bool try_get( T & ) { return false; }
434 
435     //! Reserves an item in the sender
436     virtual bool try_reserve( T & ) { return false; }
437 
438 protected:
439     virtual bool try_get_wrapper( void* p, bool is_async ) __TBB_override {
440         // Both async OR both are NOT async
441         if ( internal::async_helpers<T>::is_async_type == is_async ) {
442             return try_get( internal::async_helpers<T>::from_void_ptr(p) );
443         }
444         // Else: this (T) is async OR incoming 't' is async
445         __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_get()");
446         return false;
447     }
448 
449     virtual bool try_reserve_wrapper( void* p, bool is_async ) __TBB_override {
450         // Both async OR both are NOT async
451         if ( internal::async_helpers<T>::is_async_type == is_async ) {
452             return try_reserve( internal::async_helpers<T>::from_void_ptr(p) );
453         }
454         // Else: this (T) is async OR incoming 't' is async
455         __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_reserve()");
456         return false;
457     }
458 };  // class sender<T>
459 
//! Pure virtual template class that defines a receiver of messages of type T
template< typename T >
class receiver : public internal::untyped_receiver {
    template< typename > friend class internal::async_storage;
    template< typename, typename > friend struct internal::async_helpers;
public:
    //! The input type of this receiver
    __TBB_DEPRECATED typedef T input_type;

    __TBB_DEPRECATED typedef typename internal::async_helpers<T>::filtered_type filtered_type;

    //! Put an item to the receiver
    // Accepts the plain (filtered) payload type.
    bool try_put( const typename internal::async_helpers<T>::filtered_type& t ) {
        return internal::untyped_receiver::try_put(t);
    }

    // Accepts the async_msg form of the payload.
    bool try_put( const typename internal::async_helpers<T>::async_type& t ) {
        return internal::untyped_receiver::try_put(t);
    }

protected:
    // Delegates the async/non-async dispatch to async_helpers<T>.
    virtual task* try_put_task_wrapper( const void *p, bool is_async ) __TBB_override {
        return internal::async_helpers<T>::try_put_task_wrapper_impl(this, p, is_async);
    }

    //! Put item to successor; return task to run the successor if possible.
    virtual task *try_put_task(const T& t) = 0;

}; // class receiver<T>
489 
490 #else // __TBB_PREVIEW_ASYNC_MSG
491 
//! Pure virtual template class that defines a sender of messages of type T
// Non-async variant (used when __TBB_PREVIEW_ASYNC_MSG is off).
template< typename T >
class sender {
public:
    //! The output type of this sender
    __TBB_DEPRECATED typedef T output_type;

    //! The successor type for this node
    __TBB_DEPRECATED typedef receiver<T> successor_type;

    virtual ~sender() {}

    // NOTE: Following part of PUBLIC section is partly copy-pasted in sender<T> under #if __TBB_PREVIEW_ASYNC_MSG

    //! Add a new successor to this node
    __TBB_DEPRECATED virtual bool register_successor( successor_type &r ) = 0;

    //! Removes a successor from this node
    __TBB_DEPRECATED virtual bool remove_successor( successor_type &r ) = 0;

    //! Request an item from the sender
    virtual bool try_get( T & ) { return false; }

    //! Reserves an item in the sender
    virtual bool try_reserve( T & ) { return false; }

    //! Releases the reserved item
    virtual bool try_release( ) { return false; }

    //! Consumes the reserved item
    virtual bool try_consume( ) { return false; }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    //! interface to record edges for traversal & deletion
    __TBB_DEPRECATED typedef typename  internal::edge_container<successor_type> built_successors_type;
    __TBB_DEPRECATED typedef typename  built_successors_type::edge_list_type successor_list_type;
    __TBB_DEPRECATED virtual built_successors_type &built_successors()                   = 0;
    __TBB_DEPRECATED virtual void    internal_add_built_successor( successor_type & )    = 0;
    __TBB_DEPRECATED virtual void    internal_delete_built_successor( successor_type & ) = 0;
    __TBB_DEPRECATED virtual void    copy_successors( successor_list_type &)             = 0;
    __TBB_DEPRECATED virtual size_t  successor_count()                                   = 0;
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
};  // class sender<T>
535 
//! Pure virtual template class that defines a receiver of messages of type T
// Non-async variant (used when __TBB_PREVIEW_ASYNC_MSG is off).
template< typename T >
class receiver {
public:
    //! The input type of this receiver
    __TBB_DEPRECATED typedef T input_type;

    //! The predecessor type for this node
    __TBB_DEPRECATED typedef sender<T> predecessor_type;

    //! Destructor
    virtual ~receiver() {}

    //! Put an item to the receiver
    // Returns true if accepted; spawns the continuation task unless the
    // SUCCESSFULLY_ENQUEUED marker says it was already handled.
    bool try_put( const T& t ) {
        task *res = try_put_task(t);
        if (!res) return false;
        if (res != SUCCESSFULLY_ENQUEUED) internal::spawn_in_graph_arena(graph_reference(), *res);
        return true;
    }

    //! put item to successor; return task to run the successor if possible.
protected:
    template< typename R, typename B > friend class run_and_put_task;
    template< typename X, typename Y > friend class internal::broadcast_cache;
    template< typename X, typename Y > friend class internal::round_robin_cache;
    virtual task *try_put_task(const T& t) = 0;
    virtual graph& graph_reference() const = 0;
public:
    // NOTE: Following part of PUBLIC and PROTECTED sections is copy-pasted in receiver<T> under #if __TBB_PREVIEW_ASYNC_MSG

    //! Add a predecessor to the node
    __TBB_DEPRECATED virtual bool register_predecessor( predecessor_type & ) { return false; }

    //! Remove a predecessor from the node
    __TBB_DEPRECATED virtual bool remove_predecessor( predecessor_type & ) { return false; }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    __TBB_DEPRECATED typedef typename internal::edge_container<predecessor_type> built_predecessors_type;
    __TBB_DEPRECATED typedef typename built_predecessors_type::edge_list_type predecessor_list_type;
    __TBB_DEPRECATED virtual built_predecessors_type &built_predecessors()                  = 0;
    __TBB_DEPRECATED virtual void   internal_add_built_predecessor( predecessor_type & )    = 0;
    __TBB_DEPRECATED virtual void   internal_delete_built_predecessor( predecessor_type & ) = 0;
    __TBB_DEPRECATED virtual void   copy_predecessors( predecessor_list_type & )            = 0;
    __TBB_DEPRECATED virtual size_t predecessor_count()                                     = 0;
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */

protected:
    //! put receiver back in initial state
    virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;

    template<typename TT, typename M> friend class internal::successor_cache;
    virtual bool is_continue_receiver() { return false; }

#if __TBB_PREVIEW_OPENCL_NODE
    template< typename, typename > friend class proxy_dependency_receiver;
#endif /* __TBB_PREVIEW_OPENCL_NODE */
}; // class receiver<T>
594 
595 #endif // __TBB_PREVIEW_ASYNC_MSG
596 
//! Base class for receivers of completion messages
/** These receivers automatically reset, but cannot be explicitly waited on.
    Counts incoming continue_msgs and fires execute() once all registered
    predecessors have signaled. */
class continue_receiver : public receiver< continue_msg > {
public:

    //! The input type
    __TBB_DEPRECATED typedef continue_msg input_type;

    //! The predecessor type for this node
    __TBB_DEPRECATED typedef receiver<input_type>::predecessor_type predecessor_type;

    //! Constructor
    // number_of_predecessors sets the initial trigger threshold;
    // priority is only compiled in when node priorities are enabled.
    __TBB_DEPRECATED explicit continue_receiver(
        __TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority)) {
        my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
        my_current_count = 0;
        __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = priority; )
    }

    //! Copy constructor
    // Copies thresholds but restarts the current count at zero.
    __TBB_DEPRECATED continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
        my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
        my_current_count = 0;
        __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = src.my_priority; )
    }

    //! Increments the trigger threshold
    __TBB_DEPRECATED bool register_predecessor( predecessor_type & ) __TBB_override {
        spin_mutex::scoped_lock l(my_mutex);
        ++my_predecessor_count;
        return true;
    }

    //! Decrements the trigger threshold
    /** Does not check to see if the removal of the predecessor now makes the current count
        exceed the new threshold.  So removing a predecessor while the graph is active can cause
        unexpected results. */
    __TBB_DEPRECATED bool remove_predecessor( predecessor_type & ) __TBB_override {
        spin_mutex::scoped_lock l(my_mutex);
        --my_predecessor_count;
        return true;
    }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    __TBB_DEPRECATED typedef internal::edge_container<predecessor_type> built_predecessors_type;
    __TBB_DEPRECATED typedef built_predecessors_type::edge_list_type predecessor_list_type;
    built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }

    __TBB_DEPRECATED void internal_add_built_predecessor( predecessor_type &s) __TBB_override {
        spin_mutex::scoped_lock l(my_mutex);
        my_built_predecessors.add_edge( s );
    }

    __TBB_DEPRECATED void internal_delete_built_predecessor( predecessor_type &s) __TBB_override {
        spin_mutex::scoped_lock l(my_mutex);
        my_built_predecessors.delete_edge(s);
    }

    __TBB_DEPRECATED void copy_predecessors( predecessor_list_type &v) __TBB_override {
        spin_mutex::scoped_lock l(my_mutex);
        my_built_predecessors.copy_edges(v);
    }

    __TBB_DEPRECATED size_t predecessor_count() __TBB_override {
        spin_mutex::scoped_lock l(my_mutex);
        return my_built_predecessors.edge_count();
    }

#endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    // execute body is supposed to be too small to create a task for.
    // Counts the arrival under the lock; below threshold it reports "handled"
    // via SUCCESSFULLY_ENQUEUED, at threshold it resets the count and runs
    // execute() outside the lock.
    task *try_put_task( const input_type & ) __TBB_override {
        {
            spin_mutex::scoped_lock l(my_mutex);
            if ( ++my_current_count < my_predecessor_count )
                return SUCCESSFULLY_ENQUEUED;
            else
                my_current_count = 0;
        }
        task * res = execute();
        return res? res : SUCCESSFULLY_ENQUEUED;
    }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // continue_receiver must contain its own built_predecessors because it does
    // not have a node_cache.
    built_predecessors_type my_built_predecessors;
#endif
    spin_mutex my_mutex;
    int my_predecessor_count;   // current trigger threshold
    int my_current_count;       // signals received since last trigger
    int my_initial_predecessor_count;  // threshold to restore on reset with rf_clear_edges
    __TBB_FLOW_GRAPH_PRIORITY_EXPR( node_priority_t my_priority; )
    // the friend declaration in the base class did not eliminate the "protected class"
    // error in gcc 4.1.2
    template<typename U, typename V> friend class tbb::flow::interface11::limiter_node;

    // Restores the initial state; rf_clear_edges also drops recorded edges
    // and restores the original predecessor threshold.
    void reset_receiver( reset_flags f ) __TBB_override {
        my_current_count = 0;
        if (f & rf_clear_edges) {
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
            my_built_predecessors.clear();
#endif
            my_predecessor_count = my_initial_predecessor_count;
        }
    }

    //! Does whatever should happen when the threshold is reached
    /** This should be very fast or else spawn a task.  This is
        called while the sender is blocked in the try_put(). */
    virtual task * execute() = 0;
    template<typename TT, typename M> friend class internal::successor_cache;
    bool is_continue_receiver() __TBB_override { return true; }

}; // class continue_receiver
716 
717 }  // interfaceX
718 
719 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
720     template <typename K, typename T>
721     K key_from_message( const T &t ) {
722         return t.key();
723     }
724 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
725 
726     using interface11::sender;
727     using interface11::receiver;
728     using interface11::continue_receiver;
729 }  // flow
730 }  // tbb
731 
732 #include "internal/_flow_graph_trace_impl.h"
733 #include "internal/_tbb_hash_compare_impl.h"
734 
735 namespace tbb {
736 namespace flow {
737 namespace interface11 {
738 
739 #include "internal/_flow_graph_body_impl.h"
740 #include "internal/_flow_graph_cache_impl.h"
741 #include "internal/_flow_graph_types_impl.h"
742 #if __TBB_PREVIEW_ASYNC_MSG
743 #include "internal/_flow_graph_async_msg_impl.h"
744 #endif
745 using namespace internal::graph_policy_namespace;
746 
// Constructs either a begin iterator (positioned at the head of the graph's
// node list) or, when begin==false, an end iterator (current_node == NULL).
template <typename C, typename N>
graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
{
    if (begin) current_node = my_graph->my_nodes;
    //else it is an end iterator by default
}
753 
// Dereference; asserts the iterator is not the end iterator.
template <typename C, typename N>
typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
    __TBB_ASSERT(current_node, "graph_iterator at end");
    return *operator->();
}
759 
// Member access: returns the current node pointer (NULL for the end iterator).
template <typename C, typename N>
typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
    return current_node;
}
764 
// Advances to the next node in the graph's intrusive list; no-op at end.
template <typename C, typename N>
void graph_iterator<C,N>::internal_forward() {
    if (current_node) current_node = current_node->next;
}
769 
770 } // namespace interfaceX
771 
772 namespace interface10 {
//! Constructs a graph with isolated task_group_context
inline graph::graph() : my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
    prepare_task_arena();
    // own_context==true: the destructor must delete my_context.
    own_context = true;
    cancelled = false;
    caught_exception = false;
    my_context = new task_group_context(tbb::internal::FLOW_TASKS);
    my_root_task = (new (task::allocate_root(*my_context)) empty_task);
    // Reference count 1 keeps the root task alive across wait_for_all() calls.
    my_root_task->set_ref_count(1);
    tbb::internal::fgt_graph(this);
    my_is_active = true;
}
785 
//! Constructs a graph that shares the caller-supplied task_group_context.
inline graph::graph(task_group_context& use_this_context) :
    my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
    prepare_task_arena();
    // own_context==false: the context belongs to the caller and is not deleted.
    own_context = false;
    cancelled = false;
    caught_exception = false;
    my_root_task = (new (task::allocate_root(*my_context)) empty_task);
    // Reference count 1 keeps the root task alive across wait_for_all() calls.
    my_root_task->set_ref_count(1);
    tbb::internal::fgt_graph(this);
    my_is_active = true;
}
797 
inline graph::~graph() {
    // Drain all outstanding graph tasks before tearing anything down.
    wait_for_all();
    my_root_task->set_ref_count(0);
    tbb::task::destroy(*my_root_task);
    // Delete the context only if this graph created it (default constructor).
    if (own_context) delete my_context;
    delete my_task_arena;
}
805 
// Registers an external in-flight activity: bumps the root task's reference
// count so wait_for_all() will not return until release_wait() is called.
inline void graph::reserve_wait() {
    if (my_root_task) {
        my_root_task->increment_ref_count();
        tbb::internal::fgt_reserve_wait(this);
    }
}
812 
813 inline void graph::release_wait() {
814     if (my_root_task) {
815         tbb::internal::fgt_release_wait(this);
816         my_root_task->decrement_ref_count();
817     }
818 }
819 
820 inline void graph::register_node(tbb::flow::interface11::graph_node *n) {
821     n->next = NULL;
822     {
823         spin_mutex::scoped_lock lock(nodelist_mutex);
824         n->prev = my_nodes_last;
825         if (my_nodes_last) my_nodes_last->next = n;
826         my_nodes_last = n;
827         if (!my_nodes) my_nodes = n;
828     }
829 }
830 
831 inline void graph::remove_node(tbb::flow::interface11::graph_node *n) {
832     {
833         spin_mutex::scoped_lock lock(nodelist_mutex);
834         __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
835         if (n->prev) n->prev->next = n->next;
836         if (n->next) n->next->prev = n->prev;
837         if (my_nodes_last == n) my_nodes_last = n->prev;
838         if (my_nodes == n) my_nodes = n->next;
839     }
840     n->prev = n->next = NULL;
841 }
842 
//! Resets the graph and every registered node to its initial state.
// The graph is deactivated first so nodes cannot spawn new tasks while their
// state is being reset; it is re-activated before the deferred "restart"
// tasks (collected in my_reset_task_list by reset_node calls) are spawned.
inline void graph::reset( tbb::flow::interface11::reset_flags f ) {
    // reset context
    tbb::flow::interface11::internal::deactivate_graph(*this);

    if(my_context) my_context->reset();
    cancelled = false;
    caught_exception = false;
    // reset all the nodes comprising the graph
    for(iterator ii = begin(); ii != end(); ++ii) {
        tbb::flow::interface11::graph_node *my_p = &(*ii);
        my_p->reset_node(f);
    }
    // Reattach the arena. Might be useful to run the graph in a particular task_arena
    // while not limiting graph lifetime to a single task_arena::execute() call.
    prepare_task_arena( /*reinit=*/true );
    tbb::flow::interface11::internal::activate_graph(*this);
    // now spawn the tasks necessary to start the graph
    for(task_list_type::iterator rti = my_reset_task_list.begin(); rti != my_reset_task_list.end(); ++rti) {
        tbb::flow::interface11::internal::spawn_in_graph_arena(*this, *(*rti));
    }
    my_reset_task_list.clear();
}
865 
//! Returns an iterator to the first registered node.
inline graph::iterator graph::begin() { return iterator(this, /*begin=*/true); }

//! Returns a past-the-end iterator.
inline graph::iterator graph::end() { return iterator(this, /*begin=*/false); }

//! Returns a const_iterator to the first registered node.
inline graph::const_iterator graph::begin() const { return const_iterator(this, /*begin=*/true); }

//! Returns a past-the-end const_iterator.
inline graph::const_iterator graph::end() const { return const_iterator(this, /*begin=*/false); }

//! Returns a const_iterator to the first registered node.
inline graph::const_iterator graph::cbegin() const { return const_iterator(this, /*begin=*/true); }

//! Returns a past-the-end const_iterator.
inline graph::const_iterator graph::cend() const { return const_iterator(this, /*begin=*/false); }
877 
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
//! Assigns a human-readable name to the graph for trace/profiling tools.
inline void graph::set_name(const char *name) {
    tbb::internal::fgt_graph_desc(this, name);
}
#endif
883 
884 } // namespace interface10
885 
886 namespace interface11 {
887 
//! Constructs a node and registers it with its owning graph's node list.
inline graph_node::graph_node(graph& g) : my_graph(g) {
    my_graph.register_node(this);
}
891 
//! Unregisters the node from its graph on destruction.
inline graph_node::~graph_node() {
    my_graph.remove_node(this);
}
895 
896 #include "internal/_flow_graph_node_impl.h"
897 
898 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
899 using internal::node_set;
900 #endif
901 
902 //! An executable node that acts as a source, i.e. it has no predecessors
// Thread-safety: all externally visible state transitions (activation,
// reservation, item caching) are guarded by my_mutex.  The node caches at
// most one item produced by the user body at a time.
template < typename Output >
class input_node : public graph_node, public sender< Output > {
public:
    //! The type of the output message, which is complete
    typedef Output output_type;

    //! The type of successors of this node
    typedef typename sender<output_type>::successor_type successor_type;

    // Source node has no input type
    typedef null_type input_type;

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    typedef typename sender<output_type>::built_successors_type built_successors_type;
    typedef typename sender<output_type>::successor_list_type successor_list_type;
#endif

    //! Constructor for a node with a successor
    // The node starts inactive: activate() must be called before items flow.
    // A pristine copy of the body is kept in my_init_body so that
    // reset(rf_reset_bodies) and copy construction can restore initial state.
    template< typename Body >
     __TBB_NOINLINE_SYM input_node( graph &g, Body body )
        : graph_node(g), my_active(false),
        my_body( new internal::input_body_leaf< output_type, Body>(body) ),
        my_init_body( new internal::input_body_leaf< output_type, Body>(body) ),
        my_reserved(false), my_has_cached_item(false)
    {
        my_successors.set_owner(this);
        tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Constructs the node in the graph of the given successor set and wires
    // an edge to each listed successor.
    template <typename Body, typename... Successors>
    input_node( const node_set<internal::order::preceding, Successors...>& successors, Body body )
        : input_node(successors.graph_reference(), body) {
        make_edges(*this, successors);
    }
#endif

    //! Copy constructor
    // Copies the initial body only; dynamic state (activation, reservation,
    // cached item, successor edges) is NOT copied -- the copy starts pristine.
    __TBB_NOINLINE_SYM input_node( const input_node& src ) :
        graph_node(src.my_graph), sender<Output>(),
        my_active(false),
        my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
        my_reserved(false), my_has_cached_item(false)
    {
        my_successors.set_owner(this);
        tbb::internal::fgt_node_with_body(CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! The destructor
    ~input_node() { delete my_body; delete my_init_body; }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets a name for the node, visible in trace/profiling tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

    //! Add a new successor to this node
    // If the node is already active, a task is spawned immediately so the new
    // successor can start receiving items.
    bool register_successor( successor_type &r ) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.register_successor(r);
        if ( my_active )
            spawn_put();
        return true;
    }

    //! Removes a successor from this node
    bool remove_successor( successor_type &r ) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.remove_successor(r);
        return true;
    }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION

    built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }

    void internal_add_built_successor( successor_type &r) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.internal_add_built_successor(r);
    }

    void internal_delete_built_successor( successor_type &r) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.internal_delete_built_successor(r);
    }

    size_t successor_count() __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        return my_successors.successor_count();
    }

    void copy_successors(successor_list_type &v) __TBB_override {
        spin_mutex::scoped_lock l(my_mutex);
        my_successors.copy_successors(v);
    }
#endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */

    //! Request an item from the node
    // Fails if the cached item is reserved or absent; otherwise hands out the
    // cached item and clears the cache.
    bool try_get( output_type &v ) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved )
            return false;

        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_has_cached_item = false;
            return true;
        }
        // we've been asked to provide an item, but we have none.  enqueue a task to
        // provide one.
        if ( my_active )
            spawn_put();
        return false;
    }

    //! Reserves an item.
    // Unlike try_get, a reservation keeps the item cached until it is either
    // consumed (try_consume) or released (try_release).
    bool try_reserve( output_type &v ) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved ) {
            return false;
        }

        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_reserved = true;
            return true;
        } else {
            return false;
        }
    }

    //! Release a reserved item.
    /** true = item has been released and so remains in sender, dest must request or reserve future items */
    bool try_release( ) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
        my_reserved = false;
        if(!my_successors.empty())
            spawn_put();
        return true;
    }

    //! Consumes a reserved item
    // Drops both the reservation and the cached item, then spawns a task to
    // produce the next item if anyone is listening.
    bool try_consume( ) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
        my_reserved = false;
        my_has_cached_item = false;
        if ( !my_successors.empty() ) {
            spawn_put();
        }
        return true;
    }

    //! Activates a node that was created in the inactive state
    void activate() {
        spin_mutex::scoped_lock lock(my_mutex);
        my_active = true;
        if (!my_successors.empty())
            spawn_put();
    }

    // Returns a copy of the user body.  Body must be the exact type the node
    // was constructed with; otherwise the dynamic_cast throws std::bad_cast.
    template<typename Body>
    Body copy_function_object() {
        internal::input_body<output_type> &body_ref = *this->my_body;
        return dynamic_cast< internal::input_body_leaf<output_type, Body> & >(body_ref).get_body();
    }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // Detaches the node from all successors and drops its dynamic state.
    void extract( ) __TBB_override {
        my_successors.built_successors().sender_extract(*this);   // removes "my_owner" == this from each successor
        my_active = false;
        my_reserved = false;
        if(my_has_cached_item) my_has_cached_item = false;
    }
#endif

protected:

    //! resets the input_node to its initial state
    // Deactivates the node and drops any cached/reserved item; optionally
    // clears edges and restores the body from the pristine initial copy.
    void reset_node( reset_flags f) __TBB_override {
        my_active = false;
        my_reserved = false;
        my_has_cached_item = false;

        if(f & rf_clear_edges) my_successors.clear();
        if(f & rf_reset_bodies) {
            internal::input_body<output_type> *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
    }

private:
    spin_mutex my_mutex;                                     // guards all mutable state below
    bool my_active;                                          // true after activate(); gates task spawning
    internal::input_body<output_type> *my_body;              // current (possibly stateful) user body
    internal::input_body<output_type> *my_init_body;         // pristine clone source used by reset/copy
    internal::broadcast_cache< output_type > my_successors;  // registered successor edges
    bool my_reserved;                                        // the cached item is reserved by a consumer
    bool my_has_cached_item;                                 // my_cached_item holds a valid value
    output_type my_cached_item;

    // used by apply_body_bypass, can invoke body of node.
    // Runs the body if no item is cached, then reserves the resulting item
    // for the caller.  Returns false if already reserved or the body stopped.
    bool try_reserve_apply_body(output_type &v) {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved ) {
            return false;
        }
        if ( !my_has_cached_item ) {
            tbb::internal::fgt_begin_body( my_body );

#if TBB_DEPRECATED_INPUT_NODE_BODY
            // deprecated convention: body returns true and fills the argument
            bool r = (*my_body)(my_cached_item);
            if (r) {
                my_has_cached_item = true;
            }
#else
            // current convention: body returns the item and signals the end of
            // the stream via the flow_control argument
            flow_control control;
            my_cached_item = (*my_body)(control);
            my_has_cached_item = !control.is_pipeline_stopped;
#endif
            tbb::internal::fgt_end_body( my_body );
        }
        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_reserved = true;
            return true;
        } else {
            return false;
        }
    }

    // Allocates (but does not spawn) the task that runs apply_body_bypass.
    task* create_put_task() {
        return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
                        internal:: source_task_bypass < input_node< output_type > >( *this ) );
    }

    //! Spawns a task that applies the body
    // No-op while the graph is deactivated (e.g. during graph::reset()).
    void spawn_put( ) {
        if(internal::is_graph_active(this->my_graph)) {
            internal::spawn_in_graph_arena(this->my_graph, *create_put_task());
        }
    }

    friend class internal::source_task_bypass< input_node< output_type > >;
    //! Applies the body.  Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
    // If a successor accepted the item, the reservation is consumed; otherwise
    // the item is released back to the cache for a later attempt.
    task * apply_body_bypass( ) {
        output_type v;
        if ( !try_reserve_apply_body(v) )
            return NULL;

        task *last_task = my_successors.try_put_task(v);
        if ( last_task )
            try_consume();
        else
            try_release();
        return last_task;
    }
};  // class input_node
1166 
1167 #if TBB_USE_SOURCE_NODE_AS_ALIAS
//! Deprecated alias of input_node, enabled by TBB_USE_SOURCE_NODE_AS_ALIAS.
// Forwards its constructors to input_node; note there is no is_active
// parameter in this form -- the node starts inactive like input_node.
template < typename Output >
class source_node : public input_node <Output> {
public:
    //! Constructor for a node with a successor
    template< typename Body >
     __TBB_NOINLINE_SYM source_node( graph &g, Body body )
         : input_node<Output>(g, body)
    {
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Constructs the node in the graph of the given successor set.
    template <typename Body, typename... Successors>
    source_node( const node_set<internal::order::preceding, Successors...>& successors, Body body )
        : input_node<Output>(successors, body) {
     }
#endif
};
1185 #else // TBB_USE_SOURCE_NODE_AS_ALIAS
1186 //! An executable node that acts as a source, i.e. it has no predecessors
// Differences from input_node: the constructor takes an is_active flag (the
// initial activation state is remembered in init_my_active and restored on
// reset), try_get always spawns a refill task, and reset_node re-arms an
// active node via the graph's reset task list.
template < typename Output > class
__TBB_DEPRECATED_MSG("TBB Warning: tbb::flow::source_node is deprecated, use tbb::flow::input_node." )
source_node : public graph_node, public sender< Output > {
public:
    //! The type of the output message, which is complete
    typedef Output output_type;

    //! The type of successors of this node
    typedef typename sender<output_type>::successor_type successor_type;

    // Source node has no input type
    typedef null_type input_type;

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    typedef typename sender<output_type>::built_successors_type built_successors_type;
    typedef typename sender<output_type>::successor_list_type successor_list_type;
#endif

    //! Constructor for a node with a successor
    // A pristine copy of the body is kept in my_init_body so that
    // reset(rf_reset_bodies) and copy construction can restore initial state.
    template< typename Body >
     __TBB_NOINLINE_SYM source_node( graph &g, Body body, bool is_active = true )
        : graph_node(g), my_active(is_active), init_my_active(is_active),
        my_body( new internal::source_body_leaf< output_type, Body>(body) ),
        my_init_body( new internal::source_body_leaf< output_type, Body>(body) ),
        my_reserved(false), my_has_cached_item(false)
    {
        my_successors.set_owner(this);
        tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Constructs the node in the graph of the given successor set and wires
    // an edge to each listed successor.
    template <typename Body, typename... Successors>
    source_node( const node_set<internal::order::preceding, Successors...>& successors, Body body, bool is_active = true )
        : source_node(successors.graph_reference(), body, is_active) {
        make_edges(*this, successors);
    }
#endif

    //! Copy constructor
    // Copies the initial body and initial activation state only; dynamic
    // state (cached item, reservation, edges) is NOT copied.
    __TBB_NOINLINE_SYM source_node( const source_node& src ) :
        graph_node(src.my_graph), sender<Output>(),
        my_active(src.init_my_active),
        init_my_active(src.init_my_active), my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
        my_reserved(false), my_has_cached_item(false)
    {
        my_successors.set_owner(this);
        tbb::internal::fgt_node_with_body(CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! The destructor
    ~source_node() { delete my_body; delete my_init_body; }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets a name for the node, visible in trace/profiling tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

    //! Add a new successor to this node
    // If the node is already active, a task is spawned immediately so the new
    // successor can start receiving items.
    bool register_successor( successor_type &r ) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.register_successor(r);
        if ( my_active )
            spawn_put();
        return true;
    }

    //! Removes a successor from this node
    bool remove_successor( successor_type &r ) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.remove_successor(r);
        return true;
    }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION

    built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }

    void internal_add_built_successor( successor_type &r) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.internal_add_built_successor(r);
    }

    void internal_delete_built_successor( successor_type &r) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.internal_delete_built_successor(r);
    }

    size_t successor_count() __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        return my_successors.successor_count();
    }

    void copy_successors(successor_list_type &v) __TBB_override {
        spin_mutex::scoped_lock l(my_mutex);
        my_successors.copy_successors(v);
    }
#endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */

    //! Request an item from the node
    // Fails if the cached item is reserved or absent; otherwise hands out the
    // cached item and clears the cache.  NOTE: unlike input_node, the refill
    // task is spawned regardless of my_active.
    bool try_get( output_type &v ) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved )
            return false;

        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_has_cached_item = false;
            return true;
        }
        // we've been asked to provide an item, but we have none.  enqueue a task to
        // provide one.
        spawn_put();
        return false;
    }

    //! Reserves an item.
    // Unlike try_get, a reservation keeps the item cached until it is either
    // consumed (try_consume) or released (try_release).
    bool try_reserve( output_type &v ) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved ) {
            return false;
        }

        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_reserved = true;
            return true;
        } else {
            return false;
        }
    }

    //! Release a reserved item.
    /** true = item has been released and so remains in sender, dest must request or reserve future items */
    bool try_release( ) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
        my_reserved = false;
        if(!my_successors.empty())
            spawn_put();
        return true;
    }

    //! Consumes a reserved item
    // Drops both the reservation and the cached item, then spawns a task to
    // produce the next item if anyone is listening.
    bool try_consume( ) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
        my_reserved = false;
        my_has_cached_item = false;
        if ( !my_successors.empty() ) {
            spawn_put();
        }
        return true;
    }

    //! Activates a node that was created in the inactive state
    void activate() {
        spin_mutex::scoped_lock lock(my_mutex);
        my_active = true;
        if (!my_successors.empty())
            spawn_put();
    }

    // Returns a copy of the user body.  Body must be the exact type the node
    // was constructed with; otherwise the dynamic_cast throws std::bad_cast.
    template<typename Body>
    Body copy_function_object() {
        internal::source_body<output_type> &body_ref = *this->my_body;
        return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
    }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // Detaches the node from all successors and restores the initial
    // activation state.
    void extract( ) __TBB_override {
        my_successors.built_successors().sender_extract(*this);   // removes "my_owner" == this from each successor
        my_active = init_my_active;
        my_reserved = false;
        if(my_has_cached_item) my_has_cached_item = false;
    }
#endif

protected:

    //! resets the source_node to its initial state
    // Restores the initial activation state and drops any cached/reserved
    // item; optionally clears edges and restores the body from the pristine
    // copy.  If the node is to be active again, a restart task is queued on
    // the graph's reset task list (spawned later by graph::reset()).
    void reset_node( reset_flags f) __TBB_override {
        my_active = init_my_active;
        my_reserved =false;
        if(my_has_cached_item) {
            my_has_cached_item = false;
        }
        if(f & rf_clear_edges) my_successors.clear();
        if(f & rf_reset_bodies) {
            internal::source_body<output_type> *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
        if(my_active)
            internal::add_task_to_graph_reset_list(this->my_graph, create_put_task());
    }

private:
    spin_mutex my_mutex;                                      // guards all mutable state below
    bool my_active;                                           // current activation state; gates task spawning
    bool init_my_active;                                      // activation state requested at construction
    internal::source_body<output_type> *my_body;              // current (possibly stateful) user body
    internal::source_body<output_type> *my_init_body;         // pristine clone source used by reset/copy
    internal::broadcast_cache< output_type > my_successors;   // registered successor edges
    bool my_reserved;                                         // the cached item is reserved by a consumer
    bool my_has_cached_item;                                  // my_cached_item holds a valid value
    output_type my_cached_item;

    // used by apply_body_bypass, can invoke body of node.
    // Runs the body if no item is cached (body fills the argument and returns
    // true on success), then reserves the resulting item for the caller.
    bool try_reserve_apply_body(output_type &v) {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved ) {
            return false;
        }
        if ( !my_has_cached_item ) {
            tbb::internal::fgt_begin_body( my_body );
            bool r = (*my_body)(my_cached_item);
            tbb::internal::fgt_end_body( my_body );
            if (r) {
                my_has_cached_item = true;
            }
        }
        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_reserved = true;
            return true;
        } else {
            return false;
        }
    }

    // when resetting, and if the source_node was created with my_active == true, then
    // when we reset the node we must store a task to run the node, and spawn it only
    // after the reset is complete and is_active() is again true.  This is why we don't
    // test for is_active() here.
    task* create_put_task() {
        return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
                        internal:: source_task_bypass < source_node< output_type > >( *this ) );
    }

    //! Spawns a task that applies the body
    // No-op while the graph is deactivated (e.g. during graph::reset()).
    void spawn_put( ) {
        if(internal::is_graph_active(this->my_graph)) {
            internal::spawn_in_graph_arena(this->my_graph, *create_put_task());
        }
    }

    friend class internal::source_task_bypass< source_node< output_type > >;
    //! Applies the body.  Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
    // If a successor accepted the item, the reservation is consumed; otherwise
    // the item is released back to the cache for a later attempt.
    task * apply_body_bypass( ) {
        output_type v;
        if ( !try_reserve_apply_body(v) )
            return NULL;

        task *last_task = my_successors.try_put_task(v);
        if ( last_task )
            try_consume();
        else
            try_release();
        return last_task;
    }
};  // class source_node
1451 #endif // TBB_USE_SOURCE_NODE_AS_ALIAS
1452 
1453 //! Implements a function node that supports Input -> Output
// Input handling (concurrency limit, queueing policy, priority) lives in the
// function_input base; successor broadcasting lives in the function_output
// base.  This class mostly wires the two together and adds tracing hooks.
template<typename Input, typename Output = continue_msg, typename Policy = queueing,
         typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input)>
class function_node
    : public graph_node
#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
    , public internal::function_input< Input, Output, Policy, Allocator >
#else
    , public internal::function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
#endif
    , public internal::function_output<Output> {

    // Unless the deprecated allocator interface is explicitly enabled, the
    // user-supplied Allocator must be null_type and a cache-aligned allocator
    // is used internally.
#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
    typedef Allocator internals_allocator;
#else
    typedef cache_aligned_allocator<Input> internals_allocator;

    __TBB_STATIC_ASSERT(
        (tbb::internal::is_same_type<Allocator, null_type>::value),
        "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
        "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
    );
#endif

public:
    typedef Input input_type;
    typedef Output output_type;
    typedef internal::function_input<input_type,output_type,Policy,internals_allocator> input_impl_type;
    typedef internal::function_input_queue<input_type, internals_allocator> input_queue_type;
    typedef internal::function_output<output_type> fOutput_type;
    typedef typename input_impl_type::predecessor_type predecessor_type;
    typedef typename fOutput_type::successor_type successor_type;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    typedef typename input_impl_type::predecessor_list_type predecessor_list_type;
    typedef typename fOutput_type::successor_list_type successor_list_type;
#endif
    using input_impl_type::my_predecessors;

    //! Constructor
    // input_queue_type is allocated here, but destroyed in the function_input_base.
    // TODO: pass the graph_buffer_policy to the function_input_base so it can all
    // be done in one place.  This would be an interface-breaking change.
    template< typename Body >
     __TBB_NOINLINE_SYM function_node( graph &g, size_t concurrency,
#if __TBB_CPP11_PRESENT
                   Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority ))
#else
                   __TBB_FLOW_GRAPH_PRIORITY_ARG1( Body body, node_priority_t priority = tbb::flow::internal::no_priority ))
#endif
        : graph_node(g), input_impl_type(g, concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)),
          fOutput_type(g) {
        tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
                static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
    // Convenience overload: priority without an explicit Policy argument.
    template <typename Body>
    function_node( graph& g, size_t concurrency, Body body, node_priority_t priority )
        : function_node(g, concurrency, body, Policy(), priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Constructs the node in the graph of the given node set and wires the
    // edges implied by the set's ordering.
    template <typename Body, typename... Args>
    function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
                   __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy p = Policy(), node_priority_t priority = tbb::flow::internal::no_priority ))
        : function_node(nodes.graph_reference(), concurrency, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
        make_edges_in_order(nodes, *this);
    }

#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
    template <typename Body, typename... Args>
    function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority )
        : function_node(nodes, concurrency, body, Policy(), priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Copy constructor
    // Copies body/concurrency configuration via the input base; dynamic state
    // and edges are not copied.
    __TBB_NOINLINE_SYM function_node( const function_node& src ) :
        graph_node(src.my_graph),
        input_impl_type(src),
        fOutput_type(src.my_graph) {
        tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
                static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets a name for the node, visible in trace/profiling tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // Detaches the node from all predecessors and successors.
    void extract( ) __TBB_override {
        my_predecessors.built_predecessors().receiver_extract(*this);
        successors().built_successors().sender_extract(*this);
    }
#endif

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    using input_impl_type::try_put_task;

    internal::broadcast_cache<output_type> &successors () __TBB_override { return fOutput_type::my_successors; }

    // Resets input-side state, then (optionally) clears all edges.
    void reset_node(reset_flags f) __TBB_override {
        input_impl_type::reset_function_input(f);
        // TODO: use clear() instead.
        if(f & rf_clear_edges) {
            successors().clear();
            my_predecessors.clear();
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
        __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
    }

};  // class function_node
1571 
//! implements a function node that supports Input -> (set of outputs)
// Output is a tuple of output types.  The body receives the input and a
// reference to the tuple of output ports and may emit to any subset of them.
template<typename Input, typename Output, typename Policy = queueing,
         typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input)>
class multifunction_node :
    public graph_node,
    public internal::multifunction_input
    <
        Input,
        typename internal::wrap_tuple_elements<
            tbb::flow::tuple_size<Output>::value,  // #elements in tuple
            internal::multifunction_output,  // wrap this around each element
            Output // the tuple providing the types
        >::type,
        Policy,
#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
        Allocator
#else
        cache_aligned_allocator<Input>
#endif
    > {
#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
    typedef Allocator internals_allocator;
#else
    typedef cache_aligned_allocator<Input> internals_allocator;

    __TBB_STATIC_ASSERT(
        (tbb::internal::is_same_type<Allocator, null_type>::value),
        "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
        "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
    );
#endif

protected:
    // Number of output ports: one per element of the Output tuple.
    static const int N = tbb::flow::tuple_size<Output>::value;
public:
    typedef Input input_type;
    typedef null_type output_type;
    typedef typename internal::wrap_tuple_elements<N,internal::multifunction_output, Output>::type output_ports_type;
    typedef internal::multifunction_input<
        input_type, output_ports_type, Policy, internals_allocator> input_impl_type;
    typedef internal::function_input_queue<input_type, internals_allocator> input_queue_type;
private:
    using input_impl_type::my_predecessors;
public:
    //! Constructor: registers the node and its body with the tracing layer.
    template<typename Body>
    __TBB_NOINLINE_SYM multifunction_node(
        graph &g, size_t concurrency,
#if __TBB_CPP11_PRESENT
        Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
#else
        __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body body, node_priority_t priority = tbb::flow::internal::no_priority)
#endif
    ) : graph_node(g), input_impl_type(g, concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
        tbb::internal::fgt_multioutput_node_with_body<N>(
            CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
            &this->my_graph, static_cast<receiver<input_type> *>(this),
            this->output_ports(), this->my_body
        );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
    // Convenience overload: priority without an explicit Policy argument.
    template <typename Body>
    __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t priority)
        : multifunction_node(g, concurrency, body, Policy(), priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Constructs the node in the graph owning `nodes` and wires the edges implied by the set.
    template <typename Body, typename... Args>
    __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
                       __TBB_FLOW_GRAPH_PRIORITY_ARG1(Policy p = Policy(), node_priority_t priority = tbb::flow::internal::no_priority))
        : multifunction_node(nodes.graph_reference(), concurrency, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
        make_edges_in_order(nodes, *this);
    }

#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
    template <typename Body, typename... Args>
    __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
        : multifunction_node(nodes, concurrency, body, Policy(), priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Copy constructor: re-registers the new node with the tracing layer.
    __TBB_NOINLINE_SYM multifunction_node( const multifunction_node &other) :
        graph_node(other.my_graph), input_impl_type(other) {
        tbb::internal::fgt_multioutput_node_with_body<N>( CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
                &this->my_graph, static_cast<receiver<input_type> *>(this),
                this->output_ports(), this->my_body );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_multioutput_node_desc( this, name );
    }
#endif

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    //! Disconnects recorded predecessor edges, then delegates to the input base.
    void extract( ) __TBB_override {
        my_predecessors.built_predecessors().receiver_extract(*this);
        input_impl_type::extract();
    }
#endif
    // all the guts are in multifunction_input...
protected:
    void reset_node(reset_flags f) __TBB_override { input_impl_type::reset(f); }
};  // multifunction_node
1677 
1678 //! split_node: accepts a tuple as input, forwards each element of the tuple to its
1679 //  successors.  The node has unlimited concurrency, so it does not reject inputs.
1680 template<typename TupleType, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(TupleType)>
1681 class split_node : public graph_node, public receiver<TupleType> {
1682     static const int N = tbb::flow::tuple_size<TupleType>::value;
1683     typedef receiver<TupleType> base_type;
1684 public:
1685     typedef TupleType input_type;
1686 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1687     typedef Allocator allocator_type;
1688 #else
1689     __TBB_STATIC_ASSERT(
1690         (tbb::internal::is_same_type<Allocator, null_type>::value),
1691         "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
1692         "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
1693     );
1694 #endif
1695 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1696     typedef typename base_type::predecessor_type predecessor_type;
1697     typedef typename base_type::predecessor_list_type predecessor_list_type;
1698     typedef internal::predecessor_cache<input_type, null_mutex > predecessor_cache_type;
1699     typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
1700 #endif
1701 
1702     typedef typename internal::wrap_tuple_elements<
1703             N,  // #elements in tuple
1704             internal::multifunction_output,  // wrap this around each element
1705             TupleType // the tuple providing the types
1706         >::type  output_ports_type;
1707 
1708     __TBB_NOINLINE_SYM explicit split_node(graph &g)
1709         : graph_node(g),
1710           my_output_ports(internal::init_output_ports<output_ports_type>::call(g, my_output_ports))
1711     {
1712         tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1713             static_cast<receiver<input_type> *>(this), this->output_ports());
1714     }
1715 
1716 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1717     template <typename... Args>
1718     __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
1719         make_edges_in_order(nodes, *this);
1720     }
1721 #endif
1722 
1723     __TBB_NOINLINE_SYM split_node(const split_node& other)
1724         : graph_node(other.my_graph), base_type(other),
1725           my_output_ports(internal::init_output_ports<output_ports_type>::call(other.my_graph, my_output_ports))
1726     {
1727         tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1728             static_cast<receiver<input_type> *>(this), this->output_ports());
1729     }
1730 
1731 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1732     void set_name( const char *name ) __TBB_override {
1733         tbb::internal::fgt_multioutput_node_desc( this, name );
1734     }
1735 #endif
1736 
1737     output_ports_type &output_ports() { return my_output_ports; }
1738 
1739 protected:
1740     task *try_put_task(const TupleType& t) __TBB_override {
1741         // Sending split messages in parallel is not justified, as overheads would prevail.
1742         // Also, we do not have successors here. So we just tell the task returned here is successful.
1743         return internal::emit_element<N>::emit_this(this->my_graph, t, output_ports());
1744     }
1745     void reset_node(reset_flags f) __TBB_override {
1746         if (f & rf_clear_edges)
1747             internal::clear_element<N>::clear_this(my_output_ports);
1748 
1749         __TBB_ASSERT(!(f & rf_clear_edges) || internal::clear_element<N>::this_empty(my_output_ports), "split_node reset failed");
1750     }
1751     void reset_receiver(reset_flags /*f*/) __TBB_override {}
1752     graph& graph_reference() const __TBB_override {
1753         return my_graph;
1754     }
1755 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1756 private: //! split_node doesn't use this "predecessors" functionality; so, we have "dummies" here;
1757     void extract() __TBB_override {}
1758 
1759     //! Adds to list of predecessors added by make_edge
1760     void internal_add_built_predecessor(predecessor_type&) __TBB_override {}
1761 
1762     //! removes from to list of predecessors (used by remove_edge)
1763     void internal_delete_built_predecessor(predecessor_type&) __TBB_override {}
1764 
1765     size_t predecessor_count() __TBB_override { return 0; }
1766 
1767     void copy_predecessors(predecessor_list_type&) __TBB_override {}
1768 
1769     built_predecessors_type &built_predecessors() __TBB_override { return my_predessors; }
1770 
1771     //! dummy member
1772     built_predecessors_type my_predessors;
1773 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1774 
1775 private:
1776     output_ports_type my_output_ports;
1777 };
1778 
//! Implements an executable node that supports continue_msg -> Output
//  The body runs once the expected number of continue_msg signals arrive.
template <typename Output, typename Policy = internal::Policy<void> >
class continue_node : public graph_node, public internal::continue_input<Output, Policy>,
                      public internal::function_output<Output> {
public:
    typedef continue_msg input_type;
    typedef Output output_type;
    typedef internal::continue_input<Output, Policy> input_impl_type;
    typedef internal::function_output<output_type> fOutput_type;
    typedef typename input_impl_type::predecessor_type predecessor_type;
    typedef typename fOutput_type::successor_type successor_type;

    //! Constructor for executable node with continue_msg -> Output
    template <typename Body >
    __TBB_NOINLINE_SYM continue_node(
        graph &g,
#if __TBB_CPP11_PRESENT
        Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
#else
        __TBB_FLOW_GRAPH_PRIORITY_ARG1( Body body, node_priority_t priority = tbb::flow::internal::no_priority )
#endif
    ) : graph_node(g), input_impl_type( g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority) ),
        fOutput_type(g) {
        // Register the node and its body with the tracing layer.
        tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,

                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
    // Convenience overload: priority without an explicit Policy argument.
    template <typename Body>
    continue_node( graph& g, Body body, node_priority_t priority )
        : continue_node(g, body, Policy(), priority) {}
#endif

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Constructs the node in the graph owning `nodes` and wires the edges implied by the set.
    template <typename Body, typename... Args>
    continue_node( const node_set<Args...>& nodes, Body body,
                   __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy p = Policy(), node_priority_t priority = tbb::flow::internal::no_priority))
        : continue_node(nodes.graph_reference(), body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority) ) {
        make_edges_in_order(nodes, *this);
    }
#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
    template <typename Body, typename... Args>
    continue_node( const node_set<Args...>& nodes, Body body, node_priority_t priority)
        : continue_node(nodes, body, Policy(), priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Constructor for executable node with continue_msg -> Output
    //  number_of_predecessors pre-sets the count of signals awaited before firing.
    template <typename Body >
    __TBB_NOINLINE_SYM continue_node(
        graph &g, int number_of_predecessors,
#if __TBB_CPP11_PRESENT
        Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
#else
        __TBB_FLOW_GRAPH_PRIORITY_ARG1( Body body, node_priority_t priority = tbb::flow::internal::no_priority )
#endif
    ) : graph_node(g)
      , input_impl_type(g, number_of_predecessors, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)),
        fOutput_type(g) {
        tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
    template <typename Body>
    continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t priority)
        : continue_node(g, number_of_predecessors, body, Policy(), priority) {}
#endif

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename Body, typename... Args>
    continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
                   Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy p = Policy(), node_priority_t priority = tbb::flow::internal::no_priority ))
        : continue_node(nodes.graph_reference(), number_of_predecessors, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
        make_edges_in_order(nodes, *this);
    }

#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
    template <typename Body, typename... Args>
    continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
                   Body body, node_priority_t priority )
        : continue_node(nodes, number_of_predecessors, body, Policy(), priority) {}
#endif
#endif

    //! Copy constructor
    __TBB_NOINLINE_SYM continue_node( const continue_node& src ) :
        graph_node(src.my_graph), input_impl_type(src),
        internal::function_output<Output>(src.my_graph) {
        tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    //! Disconnects this node from all recorded predecessor and successor edges.
    void extract() __TBB_override {
        input_impl_type::my_built_predecessors.receiver_extract(*this);
        successors().built_successors().sender_extract(*this);
    }
#endif

protected:
    // Grant the task-spawning helper and the successor caches access to try_put_task.
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    using input_impl_type::try_put_task;
    internal::broadcast_cache<output_type> &successors () __TBB_override { return fOutput_type::my_successors; }

    //! Resets the receiver state; drops successor edges when rf_clear_edges is set.
    void reset_node(reset_flags f) __TBB_override {
        input_impl_type::reset_receiver(f);
        if(f & rf_clear_edges)successors().clear();
        __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
    }
};  // continue_node
1902 
//! Forwards messages of type T to all successors
template <typename T>
class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
    typedef typename sender<output_type>::successor_list_type successor_list_type;
#endif
private:
    internal::broadcast_cache<input_type> my_successors;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    internal::edge_container<predecessor_type> my_built_predecessors;
    spin_mutex pred_mutex;  // serialize accesses on edge_container
#endif
public:

    //! Constructor: registers the node with the tracing layer.
    __TBB_NOINLINE_SYM explicit broadcast_node(graph& g) : graph_node(g) {
        my_successors.set_owner( this );
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
                                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Constructs the node in the graph owning `nodes` and wires the edges implied by the set.
    template <typename... Args>
    broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor
    __TBB_NOINLINE_SYM broadcast_node( const broadcast_node& src ) :
        graph_node(src.my_graph), receiver<T>(), sender<T>()
    {
        my_successors.set_owner( this );
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
                                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

    //! Adds a successor
    bool register_successor( successor_type &r ) __TBB_override {
        my_successors.register_successor( r );
        return true;
    }

    //! Removes s as a successor
    bool remove_successor( successor_type &r ) __TBB_override {
        my_successors.remove_successor( r );
        return true;
    }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // --- deprecated edge-extraction bookkeeping ---
    typedef typename sender<T>::built_successors_type built_successors_type;

    built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }

    void internal_add_built_successor(successor_type &r) __TBB_override {
        my_successors.internal_add_built_successor(r);
    }

    void internal_delete_built_successor(successor_type &r) __TBB_override {
        my_successors.internal_delete_built_successor(r);
    }

    size_t successor_count() __TBB_override {
        return my_successors.successor_count();
    }

    void copy_successors(successor_list_type &v) __TBB_override {
        my_successors.copy_successors(v);
    }

    typedef typename receiver<T>::built_predecessors_type built_predecessors_type;

    built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }

    // Predecessor bookkeeping is serialized with pred_mutex (see member declaration).
    void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
        spin_mutex::scoped_lock l(pred_mutex);
        my_built_predecessors.add_edge(p);
    }

    void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
        spin_mutex::scoped_lock l(pred_mutex);
        my_built_predecessors.delete_edge(p);
    }

    size_t predecessor_count() __TBB_override {
        spin_mutex::scoped_lock l(pred_mutex);
        return my_built_predecessors.edge_count();
    }

    void copy_predecessors(predecessor_list_type &v) __TBB_override {
        spin_mutex::scoped_lock l(pred_mutex);
        my_built_predecessors.copy_edges(v);
    }

    //! Disconnects this node from all recorded predecessor and successor edges.
    void extract() __TBB_override {
        my_built_predecessors.receiver_extract(*this);
        my_successors.built_successors().sender_extract(*this);
    }
#endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    //! build a task to run the successor if possible.  Default is old behavior.
    task *try_put_task(const T& t) __TBB_override {
        task *new_task = my_successors.try_put_task(t);
        // A broadcast never rejects input: report success even with no successor task.
        if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
        return new_task;
    }

    graph& graph_reference() const __TBB_override {
        return my_graph;
    }

    void reset_receiver(reset_flags /*f*/) __TBB_override {}

    //! Drops all edges when rf_clear_edges is set; otherwise a no-op.
    void reset_node(reset_flags f) __TBB_override {
        if (f&rf_clear_edges) {
           my_successors.clear();
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
           my_built_predecessors.clear();
#endif
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
    }
};  // broadcast_node
2041 
2042 //! Forwards messages in arbitrary order
2043 template <typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
2044 class buffer_node
2045     : public graph_node
2046 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2047     , public internal::reservable_item_buffer< T, Allocator >
2048 #else
2049     , public internal::reservable_item_buffer< T, cache_aligned_allocator<T> >
2050 #endif
2051     , public receiver<T>, public sender<T> {
2052 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2053     typedef Allocator internals_allocator;
2054 #else
2055     typedef cache_aligned_allocator<T> internals_allocator;
2056 #endif
2057 public:
2058     typedef T input_type;
2059     typedef T output_type;
2060     typedef typename receiver<input_type>::predecessor_type predecessor_type;
2061     typedef typename sender<output_type>::successor_type successor_type;
2062     typedef buffer_node<T, Allocator> class_type;
2063 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2064     typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2065     typedef typename sender<output_type>::successor_list_type successor_list_type;
2066 #endif
2067 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2068     __TBB_STATIC_ASSERT(
2069         (tbb::internal::is_same_type<Allocator, null_type>::value),
2070         "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
2071         "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
2072     );
2073 #endif
2074 
2075 protected:
2076     typedef size_t size_type;
2077     internal::round_robin_cache< T, null_rw_mutex > my_successors;
2078 
2079 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2080     internal::edge_container<predecessor_type> my_built_predecessors;
2081 #endif
2082 
2083     friend class internal::forward_task_bypass< class_type >;
2084 
    // Operation codes dispatched by the aggregator (see handle_operations_impl).
    enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
        , add_blt_succ, del_blt_succ,
        add_blt_pred, del_blt_pred,
        blt_succ_cnt, blt_pred_cnt,
        blt_succ_cpy, blt_pred_cpy   // create vector copies of preds and succs
#endif
    };

    // implements the aggregator_operation concept
    class buffer_operation : public internal::aggregated_operation< buffer_operation > {
    public:
        char type;   // an op_type value, narrowed to char
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
        task * ltask;
        // Exactly one of these is meaningful, selected by `type`.
        union {
            input_type *elem;
            successor_type *r;
            predecessor_type *p;
            size_t cnt_val;
            successor_list_type *svec;
            predecessor_list_type *pvec;
        };
#else
        T *elem;
        task * ltask;
        successor_type *r;
#endif
        buffer_operation(const T& e, op_type t) : type(char(t))
// Initializer order follows member declaration order in each configuration.
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
                                                  , ltask(NULL), elem(const_cast<T*>(&e))
#else
                                                  , elem(const_cast<T*>(&e)) , ltask(NULL)
#endif
        {}
        buffer_operation(op_type t) : type(char(t)),  ltask(NULL) {}
    };

    bool forwarder_busy;   // true while a forwarding task is outstanding
    typedef internal::aggregating_functor<class_type, buffer_operation> handler_type;
    friend class internal::aggregating_functor<class_type, buffer_operation>;
    internal::aggregator< handler_type, buffer_operation> my_aggregator;
2128 
    // Dispatches a batch of queued operations; virtual so derived buffers can customize.
    virtual void handle_operations(buffer_operation *op_list) {
        handle_operations_impl(op_list, this);
    }
2132 
    // Processes every operation in op_list in order, then — if any operation made
    // forwarding worthwhile and no forwarder is already running — spawns a
    // forwarding task and attaches it to the last handled operation's ltask.
    template<typename derived_type>
    void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
        __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");

        buffer_operation *tmp = NULL;
        bool try_forwarding = false;
        while (op_list) {
            tmp = op_list;
            op_list = op_list->next;
            switch (tmp->type) {
            case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
            case rem_succ: internal_rem_succ(tmp); break;
            case req_item: internal_pop(tmp); break;
            case res_item: internal_reserve(tmp); break;
            case rel_res:  internal_release(tmp); try_forwarding = true; break;
            case con_res:  internal_consume(tmp); try_forwarding = true; break;
            case put_item: try_forwarding = internal_push(tmp); break;
            case try_fwd_task: internal_forward_task(tmp); break;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
            // edge recording
            case add_blt_succ: internal_add_built_succ(tmp); break;
            case del_blt_succ: internal_del_built_succ(tmp); break;
            case add_blt_pred: internal_add_built_pred(tmp); break;
            case del_blt_pred: internal_del_built_pred(tmp); break;
            case blt_succ_cnt: internal_succ_cnt(tmp); break;
            case blt_pred_cnt: internal_pred_cnt(tmp); break;
            case blt_succ_cpy: internal_copy_succs(tmp); break;
            case blt_pred_cpy: internal_copy_preds(tmp); break;
#endif
            }
        }

        // Hook for derived buffers; a no-op in this class (see order()).
        derived->order();

        if (try_forwarding && !forwarder_busy) {
            if(internal::is_graph_active(this->my_graph)) {
                forwarder_busy = true;
                task *new_task = new(task::allocate_additional_child_of(*(this->my_graph.root_task()))) internal::
                    forward_task_bypass<class_type>(*this);
                // tmp should point to the last item handled by the aggregator.  This is the operation
                // the handling thread enqueued.  So modifying that record will be okay.
                // workaround for icc bug
                tbb::task *z = tmp->ltask;
                graph &g = this->my_graph;
                tmp->ltask = combine_tasks(g, z, new_task);  // in case the op generated a task
            }
        }
    }  // handle_operations
2181 
    //! Returns the task (possibly NULL) that an operation produced for forwarding.
    inline task *grab_forwarding_task( buffer_operation &op_data) {
        return op_data.ltask;
    }
2185 
2186     inline bool enqueue_forwarding_task(buffer_operation &op_data) {
2187         task *ft = grab_forwarding_task(op_data);
2188         if(ft) {
2189             internal::spawn_in_graph_arena(graph_reference(), *ft);
2190             return true;
2191         }
2192         return false;
2193     }
2194 
    //! This is executed by an enqueued task, the "forwarder"
    //  Repeatedly submits try_fwd_task operations to the aggregator until one
    //  fails, combining any successor tasks produced along the way.
    virtual task *forward_task() {
        buffer_operation op_data(try_fwd_task);
        task *last_task = NULL;
        do {
            op_data.status = internal::WAIT;
            op_data.ltask = NULL;
            my_aggregator.execute(&op_data);

            // workaround for icc bug
            tbb::task *xtask = op_data.ltask;
            graph& g = this->my_graph;
            last_task = combine_tasks(g, last_task, xtask);
        } while (op_data.status ==internal::SUCCEEDED);
        return last_task;
    }
2211 
    //! Register successor
    //  Runs inside the aggregator, so no extra locking is needed here.
    virtual void internal_reg_succ(buffer_operation *op) {
        my_successors.register_successor(*(op->r));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

    //! Remove successor
    virtual void internal_rem_succ(buffer_operation *op) {
        my_successors.remove_successor(*(op->r));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }
2223 
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // --- deprecated edge-extraction support: aggregator handlers for the
    //     built predecessor/successor edge lists ---
    typedef typename sender<T>::built_successors_type built_successors_type;

    built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }

    virtual void internal_add_built_succ(buffer_operation *op) {
        my_successors.internal_add_built_successor(*(op->r));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

    virtual void internal_del_built_succ(buffer_operation *op) {
        my_successors.internal_delete_built_successor(*(op->r));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

    typedef typename receiver<T>::built_predecessors_type built_predecessors_type;

    built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }

    virtual void internal_add_built_pred(buffer_operation *op) {
        my_built_predecessors.add_edge(*(op->p));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

    virtual void internal_del_built_pred(buffer_operation *op) {
        my_built_predecessors.delete_edge(*(op->p));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

    // Count queries return their result through op->cnt_val.
    virtual void internal_succ_cnt(buffer_operation *op) {
        op->cnt_val = my_successors.successor_count();
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

    virtual void internal_pred_cnt(buffer_operation *op) {
        op->cnt_val = my_built_predecessors.edge_count();
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

    // Copy queries fill the caller-supplied list pointed to by op->svec / op->pvec.
    virtual void internal_copy_succs(buffer_operation *op) {
        my_successors.copy_successors(*(op->svec));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

    virtual void internal_copy_preds(buffer_operation *op) {
        my_built_predecessors.copy_edges(*(op->pvec));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

#endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2274 
private:
    //! Buffer-policy hook: a plain buffer imposes no ordering, so this is a no-op.
    //  (priority_queue_node overrides this to heapify newly pushed items.)
    void order() {}

    //! True if the most recently pushed slot (my_tail - 1) holds a valid item.
    bool is_item_valid() {
        return this->my_item_valid(this->my_tail - 1);
    }

    //! Offer the item at the back of the buffer to the successors.
    /** On acceptance, accumulates the returned task into last_task and removes
        the item from the buffer; on rejection the item stays put. */
    void try_put_and_add_task(task*& last_task) {
        task *new_task = my_successors.try_put_task(this->back());
        if (new_task) {
            // workaround for icc bug
            graph& g = this->my_graph;
            last_task = combine_tasks(g, last_task, new_task);
            this->destroy_back();
        }
    }
2291 
protected:
    //! Tries to forward valid items to successors
    virtual void internal_forward_task(buffer_operation *op) {
        internal_forward_task_impl(op, this);
    }

    //! Forwarding loop shared by buffer_node and the buffer types derived from it.
    /** 'derived' is statically the same object as 'this' (asserted below); passing
        it lets each derived class supply its own is_item_valid() /
        try_put_and_add_task() policy without virtual dispatch. */
    template<typename derived_type>
    void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
        __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");

        // Nothing to forward while an item is reserved or no valid item is buffered.
        if (this->my_reserved || !derived->is_item_valid()) {
            __TBB_store_with_release(op->status, internal::FAILED);
            this->forwarder_busy = false;
            return;
        }
        // Try forwarding, giving each successor a chance
        task * last_task = NULL;
        size_type counter = my_successors.size();
        for (; counter > 0 && derived->is_item_valid(); --counter)
            derived->try_put_and_add_task(last_task);

        op->ltask = last_task;  // return task
        // SUCCEEDED only if something was forwarded and every successor got a turn;
        // otherwise clear forwarder_busy so a fresh forwarding task can be spawned.
        if (last_task && !counter) {
            __TBB_store_with_release(op->status, internal::SUCCEEDED);
        }
        else {
            __TBB_store_with_release(op->status, internal::FAILED);
            forwarder_busy = false;
        }
    }
2322 
    //! Aggregator handler: append *(op->elem) to the buffer.
    /** Always succeeds here; sequencer_node overrides this and may fail. */
    virtual bool internal_push(buffer_operation *op) {
        this->push_back(*(op->elem));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
        return true;
    }

    //! Aggregator handler: pop an item from the back into *(op->elem); FAILED if none.
    virtual void internal_pop(buffer_operation *op) {
        if(this->pop_back(*(op->elem))) {
            __TBB_store_with_release(op->status, internal::SUCCEEDED);
        }
        else {
            __TBB_store_with_release(op->status, internal::FAILED);
        }
    }

    //! Aggregator handler: reserve the front item, copying it into *(op->elem).
    virtual void internal_reserve(buffer_operation *op) {
        if(this->reserve_front(*(op->elem))) {
            __TBB_store_with_release(op->status, internal::SUCCEEDED);
        }
        else {
            __TBB_store_with_release(op->status, internal::FAILED);
        }
    }

    //! Aggregator handler: consume (permanently remove) the reserved front item.
    virtual void internal_consume(buffer_operation *op) {
        this->consume_front();
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

    //! Aggregator handler: release the reservation, keeping the item in the buffer.
    virtual void internal_release(buffer_operation *op) {
        this->release_front();
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }
2356 
public:
    //! Constructor
    __TBB_NOINLINE_SYM explicit buffer_node( graph &g )
        : graph_node(g), internal::reservable_item_buffer<T, internals_allocator>(), receiver<T>(),
          sender<T>(), forwarder_busy(false)
    {
        my_successors.set_owner(this);
        // All state-mutating operations on this node are funneled through my_aggregator.
        my_aggregator.initialize_handler(handler_type(this));
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
                                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Construct from a node_set, creating edges to/from its nodes in order.
    template <typename... Args>
    buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor
    /** Copies only the owning graph; src's buffered items, edges and reservation
        state are not copied. */
    __TBB_NOINLINE_SYM buffer_node( const buffer_node& src )
        : graph_node(src.my_graph), internal::reservable_item_buffer<T, internals_allocator>(),
          receiver<T>(), sender<T>(), forwarder_busy(false)
    {
        my_successors.set_owner(this);
        my_aggregator.initialize_handler(handler_type(this));
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
                                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the name shown for this node by trace/profiling tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
2392 
2393     //
2394     // message sender implementation
2395     //
2396 
2397     //! Adds a new successor.
2398     /** Adds successor r to the list of successors; may forward tasks.  */
2399     bool register_successor( successor_type &r ) __TBB_override {
2400         buffer_operation op_data(reg_succ);
2401         op_data.r = &r;
2402         my_aggregator.execute(&op_data);
2403         (void)enqueue_forwarding_task(op_data);
2404         return true;
2405     }
2406 
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    //! Record r as a built successor (deprecated node-extraction support).
    void internal_add_built_successor( successor_type &r) __TBB_override {
        buffer_operation op_data(add_blt_succ);
        op_data.r = &r;
        my_aggregator.execute(&op_data);
    }

    //! Remove r from the built-successor list.
    void internal_delete_built_successor( successor_type &r) __TBB_override {
        buffer_operation op_data(del_blt_succ);
        op_data.r = &r;
        my_aggregator.execute(&op_data);
    }

    //! Record p as a built predecessor.
    void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
        buffer_operation op_data(add_blt_pred);
        op_data.p = &p;
        my_aggregator.execute(&op_data);
    }

    //! Remove p from the built-predecessor list.
    void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
        buffer_operation op_data(del_blt_pred);
        op_data.p = &p;
        my_aggregator.execute(&op_data);
    }

    //! Number of recorded predecessor edges (queried through the aggregator).
    size_t predecessor_count() __TBB_override {
        buffer_operation op_data(blt_pred_cnt);
        my_aggregator.execute(&op_data);
        return op_data.cnt_val;
    }

    //! Number of registered successors (queried through the aggregator).
    size_t successor_count() __TBB_override {
        buffer_operation op_data(blt_succ_cnt);
        my_aggregator.execute(&op_data);
        return op_data.cnt_val;
    }

    //! Copy the predecessor edge list into v.
    void copy_predecessors( predecessor_list_type &v ) __TBB_override {
        buffer_operation op_data(blt_pred_cpy);
        op_data.pvec = &v;
        my_aggregator.execute(&op_data);
    }

    //! Copy the successor list into v.
    void copy_successors( successor_list_type &v ) __TBB_override {
        buffer_operation op_data(blt_succ_cpy);
        op_data.svec = &v;
        my_aggregator.execute(&op_data);
    }

#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2457 
2458     //! Removes a successor.
2459     /** Removes successor r from the list of successors.
2460         It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
2461     bool remove_successor( successor_type &r ) __TBB_override {
2462         r.remove_predecessor(*this);
2463         buffer_operation op_data(rem_succ);
2464         op_data.r = &r;
2465         my_aggregator.execute(&op_data);
2466         // even though this operation does not cause a forward, if we are the handler, and
2467         // a forward is scheduled, we may be the first to reach this point after the aggregator,
2468         // and so should check for the task.
2469         (void)enqueue_forwarding_task(op_data);
2470         return true;
2471     }
2472 
2473     //! Request an item from the buffer_node
2474     /**  true = v contains the returned item<BR>
2475          false = no item has been returned */
2476     bool try_get( T &v ) __TBB_override {
2477         buffer_operation op_data(req_item);
2478         op_data.elem = &v;
2479         my_aggregator.execute(&op_data);
2480         (void)enqueue_forwarding_task(op_data);
2481         return (op_data.status==internal::SUCCEEDED);
2482     }
2483 
2484     //! Reserves an item.
2485     /**  false = no item can be reserved<BR>
2486          true = an item is reserved */
2487     bool try_reserve( T &v ) __TBB_override {
2488         buffer_operation op_data(res_item);
2489         op_data.elem = &v;
2490         my_aggregator.execute(&op_data);
2491         (void)enqueue_forwarding_task(op_data);
2492         return (op_data.status==internal::SUCCEEDED);
2493     }
2494 
    //! Release a reserved item.
    /**  true = item has been released and so remains in sender */
    bool try_release() __TBB_override {
        buffer_operation op_data(rel_res);
        my_aggregator.execute(&op_data);
        // Releasing may make the item forwardable again, so check for a pending task.
        (void)enqueue_forwarding_task(op_data);
        return true;
    }

    //! Consumes a reserved item.
    /** true = item is removed from sender and reservation removed */
    bool try_consume() __TBB_override {
        buffer_operation op_data(con_res);
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return true;
    }
2512 
protected:

    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    //! receive an item, return a task *if possible
    /** Executes a put through the aggregator, then reconciles the operation's
        status with any forwarding task the aggregator handed back. */
    task *try_put_task(const T &t) __TBB_override {
        buffer_operation op_data(t, put_item);
        my_aggregator.execute(&op_data);
        task *ft = grab_forwarding_task(op_data);
        // sequencer_nodes can return failure (if an item has been previously inserted)
        // We have to spawn the returned task if our own operation fails.

        if(ft && op_data.status ==internal::FAILED) {
            // we haven't succeeded queueing the item, but for some reason the
            // call returned a task (if another request resulted in a successful
            // forward this could happen.)  Queue the task and reset the pointer.
            internal::spawn_in_graph_arena(graph_reference(), *ft); ft = NULL;
        }
        else if(!ft && op_data.status ==internal::SUCCEEDED) {
            // Item accepted but no task to return: signal success with the sentinel.
            ft = SUCCESSFULLY_ENQUEUED;
        }
        return ft;
    }
2537 
    //! The graph this node belongs to.
    graph& graph_reference() const __TBB_override {
        return my_graph;
    }

    //! The receiver side keeps no per-reset state; nothing to do.
    void reset_receiver(reset_flags /*f*/) __TBB_override { }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
public:
    //! Detach this node from all recorded predecessor and successor edges.
    void extract() __TBB_override {
        my_built_predecessors.receiver_extract(*this);
        my_successors.built_successors().sender_extract(*this);
    }
#endif

protected:
    //! Reset buffer contents and, when requested, clear all edges.
    void reset_node( reset_flags f) __TBB_override {
        internal::reservable_item_buffer<T, internals_allocator>::reset();
        // TODO: just clear structures
        if (f&rf_clear_edges) {
            my_successors.clear();
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
            my_built_predecessors.clear();
#endif
        }
        forwarder_busy = false;
    }
2564 };  // buffer_node
2565 
//! Forwards messages in FIFO order
/** Buffers items and emits them to successors in arrival order; derives the
    storage and aggregator machinery from buffer_node, overriding only the
    head-oriented (FIFO) operations. */
template <typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
class queue_node : public buffer_node<T, Allocator> {
#if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
    __TBB_STATIC_ASSERT(
        (tbb::internal::is_same_type<Allocator, null_type>::value),
        "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
        "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
    );
#endif
protected:
    typedef buffer_node<T, Allocator> base_type;
    typedef typename base_type::size_type size_type;
    typedef typename base_type::buffer_operation queue_operation;
    typedef queue_node class_type;

private:
    template<typename, typename> friend class buffer_node;

    //! FIFO policy: the forwardable item is the one at the head of the buffer.
    bool is_item_valid() {
        return this->my_item_valid(this->my_head);
    }

    //! Offer the head item to the successors; on acceptance remove it and
    //! accumulate the returned task into last_task.
    void try_put_and_add_task(task*& last_task) {
        task *new_task = this->my_successors.try_put_task(this->front());
        if (new_task) {
            // workaround for icc bug
            graph& graph_ref = this->graph_reference();
            last_task = combine_tasks(graph_ref, last_task, new_task);
            this->destroy_front();
        }
    }

protected:
    //! Run the shared forwarding loop with this class's FIFO policy.
    void internal_forward_task(queue_operation *op) __TBB_override {
        this->internal_forward_task_impl(op, this);
    }

    //! Pop from the head; fails if the buffer is empty or an item is reserved.
    void internal_pop(queue_operation *op) __TBB_override {
        if ( this->my_reserved || !this->my_item_valid(this->my_head)){
            __TBB_store_with_release(op->status, internal::FAILED);
        }
        else {
            this->pop_front(*(op->elem));
            __TBB_store_with_release(op->status, internal::SUCCEEDED);
        }
    }
    //! Reserve the head item; fails if already reserved or the head is invalid.
    void internal_reserve(queue_operation *op) __TBB_override {
        if (this->my_reserved || !this->my_item_valid(this->my_head)) {
            __TBB_store_with_release(op->status, internal::FAILED);
        }
        else {
            this->reserve_front(*(op->elem));
            __TBB_store_with_release(op->status, internal::SUCCEEDED);
        }
    }
    //! Consume (permanently remove) the reserved head item.
    void internal_consume(queue_operation *op) __TBB_override {
        this->consume_front();
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    //! Constructor
    __TBB_NOINLINE_SYM explicit queue_node( graph &g ) : base_type(g) {
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Construct from a node_set, creating edges to/from its nodes in order.
    template <typename... Args>
    queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor
    /** Copies only the owning graph; buffered items and edges are not copied. */
    __TBB_NOINLINE_SYM queue_node( const queue_node& src) : base_type(src) {
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the name shown for this node by trace/profiling tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

protected:
    //! Resets via the base; the queue adds no state of its own.
    void reset_node( reset_flags f) __TBB_override {
        base_type::reset_node(f);
    }
};  // queue_node
2665 
//! Forwards messages in sequence order
/** Each item is mapped to a sequence number (tag) by a user-supplied sequencer
    functor; items are buffered at the slot given by their tag and therefore
    emitted in tag order regardless of arrival order. */
template< typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
class sequencer_node : public queue_node<T, Allocator> {
    // Owned copy of the user's sequencer functor (cloned on copy, deleted in dtor).
    internal::function_body< T, size_t > *my_sequencer;
    // my_sequencer should be a benign function and must be callable
    // from a parallel context.  Does this mean it needn't be reset?
public:
#if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
    __TBB_STATIC_ASSERT(
        (tbb::internal::is_same_type<Allocator, null_type>::value),
        "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
        "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
    );
#endif
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    //! Constructor
    /** @param g  the owning graph
        @param s  functor mapping an item to its sequence number (size_t) */
    template< typename Sequencer >
    __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, Allocator>(g),
        my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Construct from a node_set, creating edges to/from its nodes in order.
    template <typename Sequencer, typename... Args>
    sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
        : sequencer_node(nodes.graph_reference(), s) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor
    /** Clones src's sequencer functor; buffered items and edges are not copied. */
    __TBB_NOINLINE_SYM sequencer_node( const sequencer_node& src ) : queue_node<T, Allocator>(src),
        my_sequencer( src.my_sequencer->clone() ) {
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

    //! Destructor
    ~sequencer_node() { delete my_sequencer; }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the name shown for this node by trace/profiling tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

protected:
    typedef typename buffer_node<T, Allocator>::size_type size_type;
    typedef typename buffer_node<T, Allocator>::buffer_operation sequencer_operation;

private:
    //! Place the item at the buffer slot determined by its sequence tag.
    /** Fails (returns false, op->status FAILED) when the tag was already emitted
        (unless TBB_DEPRECATED_SEQUENCER_DUPLICATES) or the slot is already
        occupied (place_item returns false). */
    bool internal_push(sequencer_operation *op) __TBB_override {
        size_type tag = (*my_sequencer)(*(op->elem));
#if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
        if (tag < this->my_head) {
            // have already emitted a message with this tag
            __TBB_store_with_release(op->status, internal::FAILED);
            return false;
        }
#endif
        // cannot modify this->my_tail now; the buffer would be inconsistent.
        size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;

        if (this->size(new_tail) > this->capacity()) {
            this->grow_my_array(this->size(new_tail));
        }
        this->my_tail = new_tail;

        const internal::op_stat res = this->place_item(tag, *(op->elem)) ? internal::SUCCEEDED : internal::FAILED;
        __TBB_store_with_release(op->status, res);
        return res ==internal::SUCCEEDED;
    }
};  // sequencer_node
2746 
2747 //! Forwards messages in priority order
2748 template<typename T, typename Compare = std::less<T>, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T)>
2749 class priority_queue_node : public buffer_node<T, Allocator> {
2750 public:
2751 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2752     __TBB_STATIC_ASSERT(
2753         (tbb::internal::is_same_type<Allocator, null_type>::value),
2754         "Allocator template parameter for flow graph nodes is deprecated and will removed in the future. "
2755         "To temporary enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR."
2756     );
2757 #endif
2758     typedef T input_type;
2759     typedef T output_type;
2760     typedef buffer_node<T,Allocator> base_type;
2761     typedef priority_queue_node class_type;
2762     typedef typename receiver<input_type>::predecessor_type predecessor_type;
2763     typedef typename sender<output_type>::successor_type successor_type;
2764 
2765     //! Constructor
2766     __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
2767         : buffer_node<T, Allocator>(g), compare(comp), mark(0) {
2768         tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2769                                  static_cast<receiver<input_type> *>(this),
2770                                  static_cast<sender<output_type> *>(this) );
2771     }
2772 
2773 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2774     template <typename... Args>
2775     priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
2776         : priority_queue_node(nodes.graph_reference(), comp) {
2777         make_edges_in_order(nodes, *this);
2778     }
2779 #endif
2780 
2781     //! Copy constructor
2782     __TBB_NOINLINE_SYM priority_queue_node( const priority_queue_node &src )
2783         : buffer_node<T, Allocator>(src), mark(0)
2784     {
2785         tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2786                                  static_cast<receiver<input_type> *>(this),
2787                                  static_cast<sender<output_type> *>(this) );
2788     }
2789 
2790 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2791     void set_name( const char *name ) __TBB_override {
2792         tbb::internal::fgt_node_desc( this, name );
2793     }
2794 #endif
2795 
2796 protected:
2797 
2798     void reset_node( reset_flags f) __TBB_override {
2799         mark = 0;
2800         base_type::reset_node(f);
2801     }
2802 
2803     typedef typename buffer_node<T, Allocator>::size_type size_type;
2804     typedef typename buffer_node<T, Allocator>::item_type item_type;
2805     typedef typename buffer_node<T, Allocator>::buffer_operation prio_operation;
2806 
2807     //! Tries to forward valid items to successors
2808     void internal_forward_task(prio_operation *op) __TBB_override {
2809         this->internal_forward_task_impl(op, this);
2810     }
2811 
2812     void handle_operations(prio_operation *op_list) __TBB_override {
2813         this->handle_operations_impl(op_list, this);
2814     }
2815 
2816     bool internal_push(prio_operation *op) __TBB_override {
2817         prio_push(*(op->elem));
2818         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2819         return true;
2820     }
2821 
2822     void internal_pop(prio_operation *op) __TBB_override {
2823         // if empty or already reserved, don't pop
2824         if ( this->my_reserved == true || this->my_tail == 0 ) {
2825             __TBB_store_with_release(op->status, internal::FAILED);
2826             return;
2827         }
2828 
2829         *(op->elem) = prio();
2830         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2831         prio_pop();
2832 
2833     }
2834 
2835     // pops the highest-priority item, saves copy
2836     void internal_reserve(prio_operation *op) __TBB_override {
2837         if (this->my_reserved == true || this->my_tail == 0) {
2838             __TBB_store_with_release(op->status, internal::FAILED);
2839             return;
2840         }
2841         this->my_reserved = true;
2842         *(op->elem) = prio();
2843         reserved_item = *(op->elem);
2844         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2845         prio_pop();
2846     }
2847 
2848     void internal_consume(prio_operation *op) __TBB_override {
2849         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2850         this->my_reserved = false;
2851         reserved_item = input_type();
2852     }
2853 
2854     void internal_release(prio_operation *op) __TBB_override {
2855         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2856         prio_push(reserved_item);
2857         this->my_reserved = false;
2858         reserved_item = input_type();
2859     }
2860 
2861 private:
2862     template<typename, typename> friend class buffer_node;
2863 
2864     void order() {
2865         if (mark < this->my_tail) heapify();
2866         __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
2867     }
2868 
2869     bool is_item_valid() {
2870         return this->my_tail > 0;
2871     }
2872 
2873     void try_put_and_add_task(task*& last_task) {
2874         task * new_task = this->my_successors.try_put_task(this->prio());
2875         if (new_task) {
2876             // workaround for icc bug
2877             graph& graph_ref = this->graph_reference();
2878             last_task = combine_tasks(graph_ref, last_task, new_task);
2879             prio_pop();
2880         }
2881     }
2882 
2883 private:
2884     Compare compare;
2885     size_type mark;
2886 
2887     input_type reserved_item;
2888 
2889     // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
2890     bool prio_use_tail() {
2891         __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
2892         return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
2893     }
2894 
2895     // prio_push: checks that the item will fit, expand array if necessary, put at end
2896     void prio_push(const T &src) {
2897         if ( this->my_tail >= this->my_array_size )
2898             this->grow_my_array( this->my_tail + 1 );
2899         (void) this->place_item(this->my_tail, src);
2900         ++(this->my_tail);
2901         __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
2902     }
2903 
2904     // prio_pop: deletes highest priority item from the array, and if it is item
2905     // 0, move last item to 0 and reheap.  If end of array, just destroy and decrement tail
2906     // and mark.  Assumes the array has already been tested for emptiness; no failure.
2907     void prio_pop()  {
2908         if (prio_use_tail()) {
2909             // there are newly pushed elements; last one higher than top
2910             // copy the data
2911             this->destroy_item(this->my_tail-1);
2912             --(this->my_tail);
2913             __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2914             return;
2915         }
2916         this->destroy_item(0);
2917         if(this->my_tail > 1) {
2918             // push the last element down heap
2919             __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
2920             this->move_item(0,this->my_tail - 1);
2921         }
2922         --(this->my_tail);
2923         if(mark > this->my_tail) --mark;
2924         if (this->my_tail > 1) // don't reheap for heap of size 1
2925             reheap();
2926         __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2927     }
2928 
2929     const T& prio() {
2930         return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
2931     }
2932 
2933     // turn array into heap
2934     void heapify() {
2935         if(this->my_tail == 0) {
2936             mark = 0;
2937             return;
2938         }
2939         if (!mark) mark = 1;
2940         for (; mark<this->my_tail; ++mark) { // for each unheaped element
2941             size_type cur_pos = mark;
2942             input_type to_place;
2943             this->fetch_item(mark,to_place);
2944             do { // push to_place up the heap
2945                 size_type parent = (cur_pos-1)>>1;
2946                 if (!compare(this->get_my_item(parent), to_place))
2947                     break;
2948                 this->move_item(cur_pos, parent);
2949                 cur_pos = parent;
2950             } while( cur_pos );
2951             (void) this->place_item(cur_pos, to_place);
2952         }
2953     }
2954 
2955     // otherwise heapified array with new root element; rearrange to heap
2956     void reheap() {
2957         size_type cur_pos=0, child=1;
2958         while (child < mark) {
2959             size_type target = child;
2960             if (child+1<mark &&
2961                 compare(this->get_my_item(child),
2962                         this->get_my_item(child+1)))
2963                 ++target;
2964             // target now has the higher priority child
2965             if (compare(this->get_my_item(target),
2966                         this->get_my_item(cur_pos)))
2967                 break;
2968             // swap
2969             this->swap_items(cur_pos, target);
2970             cur_pos = target;
2971             child = (cur_pos<<1)+1;
2972         }
2973     }
2974 };  // priority_queue_node
2975 
2976 } // interfaceX
2977 
2978 namespace interface11 {
2979 
2980 //! Forwards messages only if the threshold has not been reached
2981 /** This node forwards items until its threshold is reached.
2982     It contains no buffering.  If the downstream node rejects, the
2983     message is dropped. */
2984 template< typename T, typename DecrementType=continue_msg >
2985 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2986 public:
2987     typedef T input_type;
2988     typedef T output_type;
2989     typedef typename receiver<input_type>::predecessor_type predecessor_type;
2990     typedef typename sender<output_type>::successor_type successor_type;
2991 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2992     typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
2993     typedef typename sender<output_type>::built_successors_type built_successors_type;
2994     typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2995     typedef typename sender<output_type>::successor_list_type successor_list_type;
2996 #endif
2997     //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.
2998 
2999 private:
3000     size_t my_threshold;
3001     size_t my_count; //number of successful puts
3002     size_t my_tries; //number of active put attempts
3003     internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors;
3004     spin_mutex my_mutex;
3005     internal::broadcast_cache< T > my_successors;
3006     __TBB_DEPRECATED_LIMITER_EXPR( int init_decrement_predecessors; )
3007 
3008     friend class internal::forward_task_bypass< limiter_node<T,DecrementType> >;
3009 
3010     // Let decrementer call decrement_counter()
3011     friend class internal::decrementer< limiter_node<T,DecrementType>, DecrementType >;
3012 
    bool check_conditions() {  // always called under lock
        // Forwarding is possible only while completed (my_count) plus in-flight
        // (my_tries) puts stay below the threshold and both sides have edges.
        return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
    }
3016 
    // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
    /** Pull one item from a predecessor and offer it to the successors.
        my_tries is incremented under lock before the attempt and decremented
        (with the reservation released) if the attempt fails. */
    task *forward_task() {
        input_type v;
        task *rval = NULL;
        bool reserved = false;
            {
                spin_mutex::scoped_lock lock(my_mutex);
                // Claim a "try" slot: bail out unless under threshold with both
                // a predecessor to pull from and a successor to push to.
                if ( check_conditions() )
                    ++my_tries;
                else
                    return NULL;
            }

        //SUCCESS
        // if we can reserve and can put, we consume the reservation
        // we increment the count and decrement the tries
        if ( (my_predecessors.try_reserve(v)) == true ){
            reserved=true;
            if ( (rval = my_successors.try_put_task(v)) != NULL ){
                {
                    spin_mutex::scoped_lock lock(my_mutex);
                    ++my_count;
                    --my_tries;
                    my_predecessors.try_consume();
                    // More work may be possible: spawn another forwarding task.
                    if ( check_conditions() ) {
                        if ( internal::is_graph_active(this->my_graph) ) {
                            task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
                                internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
                            internal::spawn_in_graph_arena(graph_reference(), *rtask);
                        }
                    }
                }
                return rval;
            }
        }
        //FAILURE
        //if we can't reserve, we decrement the tries
        //if we can reserve but can't put, we decrement the tries and release the reservation
        {
            spin_mutex::scoped_lock lock(my_mutex);
            --my_tries;
            if (reserved) my_predecessors.try_release();
            if ( check_conditions() ) {
                if ( internal::is_graph_active(this->my_graph) ) {
                    // Hand the retry task back to the caller instead of spawning it.
                    task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
                        internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
                    __TBB_ASSERT(!rval, "Have two tasks to handle");
                    return rtask;
                }
            }
            return rval;
        }
    }
3070 
3071     void forward() {
3072         __TBB_ASSERT(false, "Should never be called");
3073         return;
3074     }
3075 
3076     task* decrement_counter( long long delta ) {
3077         {
3078             spin_mutex::scoped_lock lock(my_mutex);
3079             if( delta > 0 && size_t(delta) > my_count )
3080                 my_count = 0;
3081             else if( delta < 0 && size_t(delta) > my_threshold - my_count )
3082                 my_count = my_threshold;
3083             else
3084                 my_count -= size_t(delta); // absolute value of delta is sufficiently small
3085         }
3086         return forward_task();
3087     }
3088 
    // Shared constructor tail: points the internal predecessor/successor
    // caches and the decrement port back at this node, then registers the
    // node (with its decrement port) with the flow-graph tracing facility.
    void initialize() {
        my_predecessors.set_owner(this);
        my_successors.set_owner(this);
        decrement.set_owner(this);
        tbb::internal::fgt_node(
            CODEPTR(), tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
            static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
            static_cast<sender<output_type> *>(this)
        );
    }
3099 public:
3100     //! The internal receiver< DecrementType > that decrements the count
3101     internal::decrementer< limiter_node<T, DecrementType>, DecrementType > decrement;
3102 
3103 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
3104     __TBB_STATIC_ASSERT( (tbb::internal::is_same_type<DecrementType, continue_msg>::value),
3105                          "Deprecated interface of the limiter node can be used only in conjunction "
3106                          "with continue_msg as the type of DecrementType template parameter." );
3107 #endif // Check for incompatible interface
3108 
    //! Constructor
    /** `threshold` is the maximum number of items allowed in flight at once.
        `num_decrement_predecessors` exists only when the deprecated interface
        is selected via the __TBB_DEPRECATED_LIMITER_* macros. */
    limiter_node(graph &g,
                 __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
        : graph_node(g), my_threshold(threshold), my_count(0),
          __TBB_DEPRECATED_LIMITER_ARG4(
              my_tries(0), decrement(),
              init_decrement_predecessors(num_decrement_predecessors),
              decrement(num_decrement_predecessors)) {
        initialize();
    }
3119 
3120 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the node_set's graph, then delegates to
    //! make_edges_in_order to connect the set's nodes with this node.
    template <typename... Args>
    limiter_node(const node_set<Args...>& nodes, size_t threshold)
        : limiter_node(nodes.graph_reference(), threshold) {
        make_edges_in_order(nodes, *this);
    }
3126 #endif
3127 
    //! Copy constructor
    /** The copy shares the source's graph and threshold but starts with a
        zero count; edges are not duplicated here. */
    limiter_node( const limiter_node& src ) :
        graph_node(src.my_graph), receiver<T>(), sender<T>(),
        my_threshold(src.my_threshold), my_count(0),
        __TBB_DEPRECATED_LIMITER_ARG4(
            my_tries(0), decrement(),
            init_decrement_predecessors(src.init_decrement_predecessors),
            decrement(src.init_decrement_predecessors)) {
        initialize();
    }
3138 
3139 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the node name shown by profiling/tracing tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
3143 #endif
3144 
3145     //! Replace the current successor with this new successor
3146     bool register_successor( successor_type &r ) __TBB_override {
3147         spin_mutex::scoped_lock lock(my_mutex);
3148         bool was_empty = my_successors.empty();
3149         my_successors.register_successor(r);
3150         //spawn a forward task if this is the only successor
3151         if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
3152             if ( internal::is_graph_active(this->my_graph) ) {
3153                 task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3154                     internal::forward_task_bypass < limiter_node<T, DecrementType> >( *this );
3155                 internal::spawn_in_graph_arena(graph_reference(), *task);
3156             }
3157         }
3158         return true;
3159     }
3160 
    //! Removes a successor from this node
    /** r.remove_predecessor(*this) is also called, so the edge is severed
        from both ends. Always reports success. */
    bool remove_successor( successor_type &r ) __TBB_override {
        r.remove_predecessor(*this);
        my_successors.remove_successor(r);
        return true;
    }
3168 
3169 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    //! Deprecated extraction API: access to the recorded successor edge list.
    built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
    //! Deprecated extraction API: access to the recorded predecessor edge list.
    built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
3172 
    //! Deprecated extraction API: records an explicitly built successor edge.
    void internal_add_built_successor(successor_type &src) __TBB_override {
        my_successors.internal_add_built_successor(src);
    }
3176 
    //! Deprecated extraction API: removes a recorded successor edge.
    void internal_delete_built_successor(successor_type &src) __TBB_override {
        my_successors.internal_delete_built_successor(src);
    }
3180 
    //! Deprecated extraction API: number of recorded successor edges.
    size_t successor_count() __TBB_override { return my_successors.successor_count(); }
3182 
    //! Deprecated extraction API: copies the recorded successor edges into v.
    void copy_successors(successor_list_type &v) __TBB_override {
        my_successors.copy_successors(v);
    }
3186 
    //! Deprecated extraction API: records an explicitly built predecessor edge.
    void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
        my_predecessors.internal_add_built_predecessor(src);
    }
3190 
    //! Deprecated extraction API: removes a recorded predecessor edge.
    void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
        my_predecessors.internal_delete_built_predecessor(src);
    }
3194 
    //! Deprecated extraction API: number of recorded predecessor edges.
    size_t predecessor_count() __TBB_override { return my_predecessors.predecessor_count(); }
3196 
    //! Deprecated extraction API: copies the recorded predecessor edges into v.
    void copy_predecessors(predecessor_list_type &v) __TBB_override {
        my_predecessors.copy_predecessors(v);
    }
3200 
    //! Deprecated extraction API: disconnects this node from the graph,
    //! resetting the count and withdrawing it from all recorded edge lists
    //! (including the decrement port's own predecessor list).
    void extract() __TBB_override {
        my_count = 0;
        my_successors.built_successors().sender_extract(*this);
        my_predecessors.built_predecessors().receiver_extract(*this);
        decrement.built_predecessors().receiver_extract(decrement);
    }
3207 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3208 
3209     //! Adds src to the list of cached predecessors.
3210     bool register_predecessor( predecessor_type &src ) __TBB_override {
3211         spin_mutex::scoped_lock lock(my_mutex);
3212         my_predecessors.add( src );
3213         if ( my_count + my_tries < my_threshold && !my_successors.empty() && internal::is_graph_active(this->my_graph) ) {
3214             task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3215                 internal::forward_task_bypass < limiter_node<T, DecrementType> >( *this );
3216             internal::spawn_in_graph_arena(graph_reference(), *task);
3217         }
3218         return true;
3219     }
3220 
    //! Removes src from the list of cached predecessors.
    /** Always reports success. */
    bool remove_predecessor( predecessor_type &src ) __TBB_override {
        my_predecessors.remove( src );
        return true;
    }
3226 
3227 protected:
3228 
3229     template< typename R, typename B > friend class run_and_put_task;
3230     template<typename X, typename Y> friend class internal::broadcast_cache;
3231     template<typename X, typename Y> friend class internal::round_robin_cache;
    //! Puts an item to this receiver
    /** Claims one unit of threshold budget under the lock, then performs the
        downstream put with the lock released (so successor bodies never run
        under my_mutex). On success the "try" is converted into a held count;
        on failure it is released and a retry task may be returned instead.
        Returns the successor's task, a retry task, or NULL when at capacity. */
    task *try_put_task( const T &t ) __TBB_override {
        {
            spin_mutex::scoped_lock lock(my_mutex);
            if ( my_count + my_tries >= my_threshold )
                return NULL;    // at capacity: reject the item
            else
                ++my_tries;
        }

        task * rtask = my_successors.try_put_task(t);

        if ( !rtask ) {  // try_put_task failed.
            spin_mutex::scoped_lock lock(my_mutex);
            --my_tries;
            // Conditions may now allow forwarding a buffered predecessor
            // item; return a forwarding task for bypass execution.
            if (check_conditions() && internal::is_graph_active(this->my_graph)) {
                rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
                    internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
            }
        }
        else {
            spin_mutex::scoped_lock lock(my_mutex);
            ++my_count;   // successful put: the try becomes a held count
            --my_tries;
             }
        return rtask;
    }
3259 
    //! Returns the graph this node belongs to (used for spawning and tracing).
    graph& graph_reference() const __TBB_override { return my_graph; }
3261 
    //! Not used by limiter_node; resetting is handled by reset_node instead.
    void reset_receiver(reset_flags /*f*/) __TBB_override {
        __TBB_ASSERT(false,NULL);  // should never be called
    }
3265 
3266     void reset_node( reset_flags f) __TBB_override {
3267         my_count = 0;
3268         if(f & rf_clear_edges) {
3269             my_predecessors.clear();
3270             my_successors.clear();
3271         }
3272         else
3273         {
3274             my_predecessors.reset( );
3275         }
3276         decrement.reset_receiver(f);
3277     }
3278 };  // limiter_node
3279 
3280 #include "internal/_flow_graph_join_impl.h"
3281 
3282 using internal::reserving_port;
3283 using internal::queueing_port;
3284 using internal::key_matching_port;
3285 using internal::input_port;
3286 using internal::tag_value;
3287 
3288 template<typename OutputTuple, typename JP=queueing> class join_node;
3289 
//! join_node with the reserving policy: the node pulls one item per port via
//! reservation from upstream buffers (implemented by reserving_port /
//! unfolded_join_node) and emits the joined OutputTuple.
template<typename OutputTuple>
class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
private:
    static const int N = tbb::flow::tuple_size<OutputTuple>::value;  // number of input ports
    typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
public:
    typedef OutputTuple output_type;
    typedef typename unfolded_type::input_ports_type input_ports_type;
    //! Constructs the node in graph g and registers it with the tracing facility.
     __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs in the node_set's graph and connects the set's nodes
    //! via make_edges_in_order.
    template <typename... Args>
    __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: creates a new node from other's base state.
    __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the node name shown by profiling/tracing tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

};
3322 
//! join_node with the queueing policy: each port buffers arriving items
//! (implemented by queueing_port / unfolded_join_node) and the node emits the
//! joined OutputTuple when every port has an item.
template<typename OutputTuple>
class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
private:
    static const int N = tbb::flow::tuple_size<OutputTuple>::value;  // number of input ports
    typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
public:
    typedef OutputTuple output_type;
    typedef typename unfolded_type::input_ports_type input_ports_type;
    //! Constructs the node in graph g and registers it with the tracing facility.
     __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs in the node_set's graph and connects the set's nodes
    //! via make_edges_in_order.
    template <typename... Args>
    __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: creates a new node from other's base state.
    __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the node name shown by profiling/tracing tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

};
3355 
// template for key_matching join_node
// tag_matching join_node is a specialization of key_matching, and is source-compatible.
//! Items are matched across ports by a key of type K (hashed with KHash).
//! Each non-preview constructor takes one function object per input port
//! (presumably a key extractor — forwarded untouched to unfolded_join_node).
template<typename OutputTuple, typename K, typename KHash>
class join_node<OutputTuple, key_matching<K, KHash> > : public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value,
      key_matching_port, OutputTuple, key_matching<K,KHash> > {
private:
    static const int N = tbb::flow::tuple_size<OutputTuple>::value;  // number of input ports
    typedef typename internal::unfolded_join_node<N, key_matching_port, OutputTuple, key_matching<K,KHash> > unfolded_type;
public:
    typedef OutputTuple output_type;
    typedef typename unfolded_type::input_ports_type input_ports_type;

#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
    //! Preview interface: no per-port bodies; keys come with the messages.
    join_node(graph &g) : unfolded_type(g) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs in the node_set's graph and connects the set's nodes
    //! via make_edges_in_order.
    template <typename... Args>
    join_node(const node_set<Args...>& nodes, key_matching<K, KHash> = key_matching<K, KHash>())
        : join_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

#endif  /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */

    // Constructors for 2..__TBB_VARIADIC_MAX ports; each registers the node
    // with the tracing facility after delegating to the unfolded base.
    template<typename __TBB_B0, typename __TBB_B1>
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
            unfolded_type(g, b0, b1, b2, b3, b4) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#if __TBB_VARIADIC_MAX >= 6
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5>
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
            unfolded_type(g, b0, b1, b2, b3, b4, b5) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 7
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5, typename __TBB_B6>
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
            unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 8
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
            __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 9
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
            __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 10
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
            __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs in the node_set's graph with the given per-port bodies and
    //! connects the set's nodes via make_edges_in_order.
    template <typename... Args, typename... Bodies>
    __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
        : join_node(nodes.graph_reference(), bodies...) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: creates a new node from other's base state.
    __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the node name shown by profiling/tracing tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

};
3468 
3469 // indexer node
3470 #include "internal/_flow_graph_indexer_impl.h"
3471 
3472 // TODO: Implement interface with variadic template or tuple
3473 template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
3474                       typename T4=null_type, typename T5=null_type, typename T6=null_type,
3475                       typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
3476 
3477 //indexer node specializations
//! indexer_node with one input port: emits tagged_msg<size_t, T0>, pairing a
//! size_t tag with the arriving item.
template<typename T0>
class indexer_node<T0> : public internal::unfolded_indexer_node<tuple<T0> > {
private:
    static const int N = 1;  // number of input ports
public:
    typedef tuple<T0> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs the node in graph g and registers it with the tracing facility.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs in the node_set's graph and connects the set's nodes
    //! via make_edges_in_order.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor: creates a new node from other's base state.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the node name shown by profiling/tracing tools.
     void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
3510 
//! indexer_node with two input ports: emits tagged_msg<size_t, T0, T1>,
//! pairing a size_t tag with the arriving item.
template<typename T0, typename T1>
class indexer_node<T0, T1> : public internal::unfolded_indexer_node<tuple<T0, T1> > {
private:
    static const int N = 2;  // number of input ports
public:
    typedef tuple<T0, T1> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs the node in graph g and registers it with the tracing facility.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs in the node_set's graph and connects the set's nodes
    //! via make_edges_in_order.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor: creates a new node from other's base state.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the node name shown by profiling/tracing tools.
     void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
3543 
//! indexer_node with three input ports: emits tagged_msg<size_t, T0, T1, T2>,
//! pairing a size_t tag with the arriving item.
template<typename T0, typename T1, typename T2>
class indexer_node<T0, T1, T2> : public internal::unfolded_indexer_node<tuple<T0, T1, T2> > {
private:
    static const int N = 3;  // number of input ports
public:
    typedef tuple<T0, T1, T2> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs the node in graph g and registers it with the tracing facility.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs in the node_set's graph and connects the set's nodes
    //! via make_edges_in_order.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor: creates a new node from other's base state.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the node name shown by profiling/tracing tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
3576 
//! indexer_node with four input ports: emits tagged_msg<size_t, T0..T3>,
//! pairing a size_t tag with the arriving item.
template<typename T0, typename T1, typename T2, typename T3>
class indexer_node<T0, T1, T2, T3> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > {
private:
    static const int N = 4;  // number of input ports
public:
    typedef tuple<T0, T1, T2, T3> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs the node in graph g and registers it with the tracing facility.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs in the node_set's graph and connects the set's nodes
    //! via make_edges_in_order.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor: creates a new node from other's base state.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the node name shown by profiling/tracing tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
3609 
//! indexer_node with five input ports: emits tagged_msg<size_t, T0..T4>,
//! pairing a size_t tag with the arriving item.
template<typename T0, typename T1, typename T2, typename T3, typename T4>
class indexer_node<T0, T1, T2, T3, T4> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > {
private:
    static const int N = 5;  // number of input ports
public:
    typedef tuple<T0, T1, T2, T3, T4> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs the node in graph g and registers it with the tracing facility.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs in the node_set's graph and connects the set's nodes
    //! via make_edges_in_order.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor: creates a new node from other's base state.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the node name shown by profiling/tracing tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
3642 
3643 #if __TBB_VARIADIC_MAX >= 6
//! indexer_node with six input ports: emits tagged_msg<size_t, T0..T5>,
//! pairing a size_t tag with the arriving item.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
class indexer_node<T0, T1, T2, T3, T4, T5> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
private:
    static const int N = 6;  // number of input ports
public:
    typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs the node in graph g and registers it with the tracing facility.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs in the node_set's graph and connects the set's nodes
    //! via make_edges_in_order.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor: creates a new node from other's base state.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the node name shown by profiling/tracing tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
3676 #endif //variadic max 6
3677 
#if __TBB_VARIADIC_MAX >= 7
//! indexer_node specialization for seven input ports.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6>
class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
private:
    static const int N = 7;  // number of input ports
public:
    typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs the node as a member of graph g and registers it for tracing.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of `nodes` and wires edges to/from them.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Assigns a user-visible name to this node for tracing tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
#endif //variadic max 7
3713 
#if __TBB_VARIADIC_MAX >= 8
//! indexer_node specialization for eight input ports.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7>
class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
private:
    static const int N = 8;  // number of input ports
public:
    typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs the node as a member of graph g and registers it for tracing.
    // __TBB_NOINLINE_SYM was missing on both constructors of this arity only;
    // added for consistency with every other indexer_node specialization (the
    // macro expands to nothing unless tracing support is enabled, see the
    // definition near the top of this header).
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of `nodes` and wires edges to/from them.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Assigns a user-visible name to this node for tracing tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
#endif //variadic max 8
3749 
#if __TBB_VARIADIC_MAX >= 9
//! indexer_node specialization for nine input ports.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8>
class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
private:
    static const int N = 9;  // number of input ports
public:
    typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs the node as a member of graph g and registers it for tracing.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of `nodes` and wires edges to/from them.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Assigns a user-visible name to this node for tracing tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
#endif //variadic max 9
3785 
#if __TBB_VARIADIC_MAX >= 10
//! Primary (ten-port) indexer_node template; lower arities are handled by the
//! partial specializations above.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8, typename T9>
class indexer_node/*default*/ : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
private:
    static const int N = 10;  // number of input ports
public:
    typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs the node as a member of graph g and registers it for tracing.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of `nodes` and wires edges to/from them.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Assigns a user-visible name to this node for tracing tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
#endif //variadic max 10
3821 
#if __TBB_PREVIEW_ASYNC_MSG
//! Shared implementation behind all make_edge overloads: registers s as a
//! successor of p and notifies the tracing layer. This is the untyped variant
//! compiled when async_msg support is enabled.
inline void internal_make_edge( internal::untyped_sender &p, internal::untyped_receiver &s ) {
#else
//! Shared implementation behind all make_edge overloads: registers s as a
//! successor of p and notifies the tracing layer.
template< typename T >
inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
#endif
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // Record the edge on both endpoints so the deprecated extraction API can
    // later enumerate and tear down built edges.
    s.internal_add_built_predecessor(p);
    p.internal_add_built_successor(s);
#endif
    p.register_successor( s );
    tbb::internal::fgt_make_edge( &p, &s );
}
3835 
3836 //! Makes an edge between a single predecessor and a single successor
3837 template< typename T >
3838 inline void make_edge( sender<T> &p, receiver<T> &s ) {
3839     internal_make_edge( p, s );
3840 }
3841 
#if __TBB_PREVIEW_ASYNC_MSG
//! make_edge for untyped endpoints; enabled (via SFINAE) only when at least
//! one endpoint is the internal untyped sender/receiver type.
template< typename TS, typename TR,
    typename = typename tbb::internal::enable_if<tbb::internal::is_same_type<TS, internal::untyped_sender>::value
                                              || tbb::internal::is_same_type<TR, internal::untyped_receiver>::value>::type>
inline void make_edge( TS &p, TR &s ) {
    internal_make_edge( p, s );
}

//! Connects an async_msg sender to a receiver of the underlying data type.
template< typename T >
inline void make_edge( sender<T> &p, receiver<typename T::async_msg_data_type> &s ) {
    internal_make_edge( p, s );
}

//! Connects a sender of the underlying data type to an async_msg receiver.
template< typename T >
inline void make_edge( sender<typename T::async_msg_data_type> &p, receiver<T> &s ) {
    internal_make_edge( p, s );
}

#endif // __TBB_PREVIEW_ASYNC_MSG
3861 
#if __TBB_FLOW_GRAPH_CPP11_FEATURES
//Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
//(SFINAE: enabled only for node types exposing both output_ports_type and input_ports_type.)
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void make_edge( T& output, V& input) {
    make_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
}

//Makes an edge from port 0 of a multi-output predecessor to a receiver.
//(SFINAE: enabled only for node types exposing output_ports_type.)
template< typename T, typename R,
          typename = typename T::output_ports_type >
inline void make_edge( T& output, receiver<R>& input) {
     make_edge(get<0>(output.output_ports()), input);
}

//Makes an edge from a sender to port 0 of a multi-input successor.
//(SFINAE: enabled only for node types exposing input_ports_type.)
template< typename S,  typename V,
          typename = typename V::input_ports_type >
inline void make_edge( sender<S>& output, V& input) {
     make_edge(output, get<0>(input.input_ports()));
}
#endif
3884 
#if __TBB_PREVIEW_ASYNC_MSG
//! Shared implementation behind all remove_edge overloads: deregisters s from
//! p's successor list and notifies the tracing layer. Untyped variant used
//! when async_msg support is enabled.
inline void internal_remove_edge( internal::untyped_sender &p, internal::untyped_receiver &s ) {
#else
//! Shared implementation behind all remove_edge overloads: deregisters s from
//! p's successor list and notifies the tracing layer.
template< typename T >
inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
#endif
    p.remove_successor( s );
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // TODO: should we try to remove p from the predecessor list of s, in case the edge is reversed?
    // Also drop the edge from both endpoints' built-edge records.
    p.internal_delete_built_successor(s);
    s.internal_delete_built_predecessor(p);
#endif
    tbb::internal::fgt_remove_edge( &p, &s );
}
3899 
3900 //! Removes an edge between a single predecessor and a single successor
3901 template< typename T >
3902 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
3903     internal_remove_edge( p, s );
3904 }
3905 
#if __TBB_PREVIEW_ASYNC_MSG
//! remove_edge for untyped endpoints; enabled (via SFINAE) only when at least
//! one endpoint is the internal untyped sender/receiver type.
template< typename TS, typename TR,
    typename = typename tbb::internal::enable_if<tbb::internal::is_same_type<TS, internal::untyped_sender>::value
                                              || tbb::internal::is_same_type<TR, internal::untyped_receiver>::value>::type>
inline void remove_edge( TS &p, TR &s ) {
    internal_remove_edge( p, s );
}

//! Disconnects an async_msg sender from a receiver of the underlying data type.
template< typename T >
inline void remove_edge( sender<T> &p, receiver<typename T::async_msg_data_type> &s ) {
    internal_remove_edge( p, s );
}

//! Disconnects a sender of the underlying data type from an async_msg receiver.
template< typename T >
inline void remove_edge( sender<typename T::async_msg_data_type> &p, receiver<T> &s ) {
    internal_remove_edge( p, s );
}
#endif // __TBB_PREVIEW_ASYNC_MSG
3924 
#if __TBB_FLOW_GRAPH_CPP11_FEATURES
//Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
//(SFINAE: enabled only for node types exposing both output_ports_type and input_ports_type.)
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void remove_edge( T& output, V& input) {
    remove_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
}

//Removes an edge between port 0 of a multi-output predecessor and a receiver.
//(SFINAE: enabled only for node types exposing output_ports_type.)
template< typename T, typename R,
          typename = typename T::output_ports_type >
inline void remove_edge( T& output, receiver<R>& input) {
     remove_edge(get<0>(output.output_ports()), input);
}
//Removes an edge between a sender and port 0 of a multi-input successor.
//(SFINAE: enabled only for node types exposing input_ports_type.)
template< typename S,  typename V,
          typename = typename V::input_ports_type >
inline void remove_edge( sender<S>& output, V& input) {
     remove_edge(output, get<0>(input.input_ports()));
}
#endif
3946 
3947 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3948 template<typename C >
3949 template< typename S >
3950 void internal::edge_container<C>::sender_extract( S &s ) {
3951     edge_list_type e = built_edges;
3952     for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3953         remove_edge(s, **i);
3954     }
3955 }
3956 
3957 template<typename C >
3958 template< typename R >
3959 void internal::edge_container<C>::receiver_extract( R &r ) {
3960     edge_list_type e = built_edges;
3961     for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3962         remove_edge(**i, r);
3963     }
3964 }
3965 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3966 
//! Returns a copy of the body from a function or continue node
// Body must be the exact type the node was constructed with: the node's
// copy_function_object casts its stored body to Body (see e.g.
// async_node::copy_function_object below, whose reference dynamic_cast
// throws std::bad_cast on a mismatch).
template< typename Body, typename Node >
Body copy_body( Node &n ) {
    return n.template copy_function_object<Body>();
}
3972 
3973 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3974 
3975 //composite_node
3976 template< typename InputTuple, typename OutputTuple > class composite_node;
3977 
//! Groups a subgraph of nodes behind a fixed set of external input/output ports.
template< typename... InputTypes, typename... OutputTypes>
class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<OutputTypes...> > : public graph_node{

public:
    // Tuples of *references* to the externally visible ports; the referenced
    // ports belong to nodes nested inside the composite.
    typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
    typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;

private:
    std::unique_ptr<input_ports_type> my_input_ports;
    std::unique_ptr<output_ports_type> my_output_ports;

    static const size_t NUM_INPUTS = sizeof...(InputTypes);
    static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);

protected:
    // Nothing to reset here; the nested nodes are reset individually.
    void reset_node(reset_flags) __TBB_override {}

public:
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Constructs the composite in graph g and registers it for tracing.
    composite_node( graph &g, const char *type_name = "composite_node" ) : graph_node(g) {
        tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
        tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_name );
    }
#else
    composite_node( graph &g ) : graph_node(g) {
        tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
    }
#endif

    //! Binds the composite's external ports to ports of its internal nodes.
    // NOTE(review): both static asserts compare a pack size against a tuple
    // type built from that same pack, so they can never fire; a mismatched
    // argument tuple is instead diagnosed by the make_unique conversions
    // below. Confirm whether the asserts were meant to check T1/T2 instead.
    template<typename T1, typename T2>
    void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
        __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
        __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
        my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T1>(input_ports_tuple));
        my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T2>(output_ports_tuple));

        // The tuples are reused (as lvalues) after being forwarded above;
        // this is safe for tuples of lvalue references -- which is what the
        // port tuple types are -- since moving one only copies the references.
        tbb::internal::fgt_internal_input_alias_helper<T1, NUM_INPUTS>::alias_port( this, input_ports_tuple);
        tbb::internal::fgt_internal_output_alias_helper<T2, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
    }

    //! Registers nested nodes with this composite; the `true` flag marks them
    //! visible (see add_nodes_impl for the flag's interpretation).
    template< typename... NodeTypes >
    void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }

    //! Registers nested nodes with the visibility flag off.
    template< typename... NodeTypes >
    void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Assigns a user-visible name to this node for tracing tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_multiinput_multioutput_node_desc( this, name );
    }
#endif

    //! Access the bound input ports; requires a prior set_external_ports() call.
    input_ports_type& input_ports() {
         __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
         return *my_input_ports;
    }

    //! Access the bound output ports; requires a prior set_external_ports() call.
    output_ports_type& output_ports() {
         __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
         return *my_output_ports;
    }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    void extract() __TBB_override {
        __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
    }
#endif
};  // class composite_node
4046 
4047 //composite_node with only input ports
4048 template< typename... InputTypes>
4049 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> > : public graph_node {
4050 public:
4051     typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
4052 
4053 private:
4054     std::unique_ptr<input_ports_type> my_input_ports;
4055     static const size_t NUM_INPUTS = sizeof...(InputTypes);
4056 
4057 protected:
4058     void reset_node(reset_flags) __TBB_override {}
4059 
4060 public:
4061 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4062     composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
4063         tbb::internal::fgt_composite( CODEPTR(), this, &g );
4064         tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_name );
4065     }
4066 #else
4067     composite_node( graph &g ) : graph_node(g) {
4068         tbb::internal::fgt_composite( CODEPTR(), this, &g );
4069     }
4070 #endif
4071 
4072    template<typename T>
4073    void set_external_ports(T&& input_ports_tuple) {
4074        __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
4075 
4076        my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T>(input_ports_tuple));
4077 
4078        tbb::internal::fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, std::forward<T>(input_ports_tuple));
4079    }
4080 
4081     template< typename... NodeTypes >
4082     void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
4083 
4084     template< typename... NodeTypes >
4085     void add_nodes( const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
4086 
4087 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4088     void set_name( const char *name ) __TBB_override {
4089         tbb::internal::fgt_multiinput_multioutput_node_desc( this, name );
4090     }
4091 #endif
4092 
4093     input_ports_type& input_ports() {
4094          __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
4095          return *my_input_ports;
4096     }
4097 
4098 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4099     void extract() __TBB_override {
4100         __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
4101     }
4102 #endif
4103 
4104 };  // class composite_node
4105 
4106 //composite_nodes with only output_ports
4107 template<typename... OutputTypes>
4108 class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...> > : public graph_node {
4109 public:
4110     typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
4111 
4112 private:
4113     std::unique_ptr<output_ports_type> my_output_ports;
4114     static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
4115 
4116 protected:
4117     void reset_node(reset_flags) __TBB_override {}
4118 
4119 public:
4120 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4121     __TBB_NOINLINE_SYM composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
4122         tbb::internal::fgt_composite( CODEPTR(), this, &g );
4123         tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_name );
4124     }
4125 #else
4126     __TBB_NOINLINE_SYM composite_node( graph &g ) : graph_node(g) {
4127         tbb::internal::fgt_composite( CODEPTR(), this, &g );
4128     }
4129 #endif
4130 
4131    template<typename T>
4132    void set_external_ports(T&& output_ports_tuple) {
4133        __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
4134 
4135        my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T>(output_ports_tuple));
4136 
4137        tbb::internal::fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, std::forward<T>(output_ports_tuple));
4138    }
4139 
4140     template<typename... NodeTypes >
4141     void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
4142 
4143     template<typename... NodeTypes >
4144     void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
4145 
4146 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4147     void set_name( const char *name ) __TBB_override {
4148         tbb::internal::fgt_multiinput_multioutput_node_desc( this, name );
4149     }
4150 #endif
4151 
4152     output_ports_type& output_ports() {
4153          __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
4154          return *my_output_ports;
4155     }
4156 
4157 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4158     void extract() __TBB_override {
4159         __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
4160     }
4161 #endif
4162 
4163 };  // class composite_node
4164 
4165 #endif // __TBB_FLOW_GRAPH_CPP11_FEATURES
4166 
4167 namespace internal {
4168 
4169 template<typename Gateway>
4170 class async_body_base: tbb::internal::no_assign {
4171 public:
4172     typedef Gateway gateway_type;
4173 
4174     async_body_base(gateway_type *gateway): my_gateway(gateway) { }
4175     void set_gateway(gateway_type *gateway) {
4176         my_gateway = gateway;
4177     }
4178 
4179 protected:
4180     gateway_type *my_gateway;
4181 };
4182 
4183 template<typename Input, typename Ports, typename Gateway, typename Body>
4184 class async_body: public async_body_base<Gateway> {
4185 public:
4186     typedef async_body_base<Gateway> base_type;
4187     typedef Gateway gateway_type;
4188 
4189     async_body(const Body &body, gateway_type *gateway)
4190         : base_type(gateway), my_body(body) { }
4191 
4192     void operator()( const Input &v, Ports & ) {
4193         my_body(v, *this->my_gateway);
4194     }
4195 
4196     Body get_body() { return my_body; }
4197 
4198 private:
4199     Body my_body;
4200 };
4201 
4202 } // namespace internal
4203 
4204 } // namespace interfaceX
4205 namespace interface11 {
4206 
4207 //! Implements async node
4208 template < typename Input, typename Output,
4209            typename Policy = queueing_lightweight,
4210            typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input) >
4211 class async_node
4212     : public multifunction_node< Input, tuple< Output >, Policy, Allocator >, public sender< Output >
4213 {
4214 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
4215     __TBB_STATIC_ASSERT(
4216         (tbb::internal::is_same_type<Allocator, null_type>::value),
4217         "Allocator template parameter for flow graph nodes is deprecated and will removed in the future. "
4218         "To temporary enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR."
4219     );
4220 #endif
4221     typedef multifunction_node< Input, tuple< Output >, Policy, Allocator > base_type;
4222     typedef typename internal::multifunction_input<Input, typename base_type::output_ports_type, Policy, Allocator> mfn_input_type;
4223 
4224 public:
4225     typedef Input input_type;
4226     typedef Output output_type;
4227     typedef receiver<input_type> receiver_type;
4228     typedef typename receiver_type::predecessor_type predecessor_type;
4229     typedef typename sender<output_type>::successor_type successor_type;
4230     typedef receiver_gateway<output_type> gateway_type;
4231     typedef internal::async_body_base<gateway_type> async_body_base_type;
4232     typedef typename base_type::output_ports_type output_ports_type;
4233 
4234 private:
4235     struct try_put_functor {
4236         typedef internal::multifunction_output<Output> output_port_type;
4237         output_port_type *port;
4238         // TODO: pass value by copy since we do not want to block asynchronous thread.
4239         const Output *value;
4240         bool result;
4241         try_put_functor(output_port_type &p, const Output &v) : port(&p), value(&v), result(false) { }
4242         void operator()() {
4243             result = port->try_put(*value);
4244         }
4245     };
4246 
4247     class receiver_gateway_impl: public receiver_gateway<Output> {
4248     public:
4249         receiver_gateway_impl(async_node* node): my_node(node) {}
4250         void reserve_wait() __TBB_override {
4251             tbb::internal::fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
4252             my_node->my_graph.reserve_wait();
4253         }
4254 
4255         void release_wait() __TBB_override {
4256             my_node->my_graph.release_wait();
4257             tbb::internal::fgt_async_commit(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
4258         }
4259 
4260         //! Implements gateway_type::try_put for an external activity to submit a message to FG
4261         bool try_put(const Output &i) __TBB_override {
4262             return my_node->try_put_impl(i);
4263         }
4264 
4265     private:
4266         async_node* my_node;
4267     } my_gateway;
4268 
4269     //The substitute of 'this' for member construction, to prevent compiler warnings
4270     async_node* self() { return this; }
4271 
4272     //! Implements gateway_type::try_put for an external activity to submit a message to FG
4273     bool try_put_impl(const Output &i) {
4274         internal::multifunction_output<Output> &port_0 = internal::output_port<0>(*this);
4275         internal::broadcast_cache<output_type>& port_successors = port_0.successors();
4276         tbb::internal::fgt_async_try_put_begin(this, &port_0);
4277         task_list tasks;
4278         bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
4279         __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
4280                       "Return status is inconsistent with the method operation." );
4281 
4282         while( !tasks.empty() ) {
4283             internal::enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
4284         }
4285         tbb::internal::fgt_async_try_put_end(this, &port_0);
4286         return is_at_least_one_put_successful;
4287     }
4288 
4289 public:
4290     template<typename Body>
4291     __TBB_NOINLINE_SYM async_node(
4292         graph &g, size_t concurrency,
4293 #if __TBB_CPP11_PRESENT
4294         Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority)
4295 #else
4296         __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body body, node_priority_t priority = tbb::flow::internal::no_priority)
4297 #endif
4298     ) : base_type(
4299         g, concurrency,
4300         internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
4301         (body, &my_gateway) __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority) ), my_gateway(self()) {
4302         tbb::internal::fgt_multioutput_node_with_body<1>(
4303             CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
4304             &this->my_graph, static_cast<receiver<input_type> *>(this),
4305             this->output_ports(), this->my_body
4306         );
4307     }
4308 
4309 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
4310     template <typename Body, typename... Args>
4311     __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t priority)
4312         : async_node(g, concurrency, body, Policy(), priority) {}
4313 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4314 
4315 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4316     template <typename Body, typename... Args>
4317     __TBB_NOINLINE_SYM async_node(
4318         const node_set<Args...>& nodes, size_t concurrency, Body body,
4319         __TBB_FLOW_GRAPH_PRIORITY_ARG1(Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority)
4320     ) : async_node(nodes.graph_reference(), concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
4321         make_edges_in_order(nodes, *this);
4322     }
4323 
4324 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4325     template <typename Body, typename... Args>
4326     __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
4327         : async_node(nodes, concurrency, body, Policy(), priority) {}
4328 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4329 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4330 
4331     __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
4332         static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
4333         static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
4334 
4335         tbb::internal::fgt_multioutput_node_with_body<1>( CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
4336                 &this->my_graph, static_cast<receiver<input_type> *>(this),
4337                 this->output_ports(), this->my_body );
4338     }
4339 
4340     gateway_type& gateway() {
4341         return my_gateway;
4342     }
4343 
4344 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4345     void set_name( const char *name ) __TBB_override {
4346         tbb::internal::fgt_multioutput_node_desc( this, name );
4347     }
4348 #endif
4349 
4350     // Define sender< Output >
4351 
4352     //! Add a new successor to this node
4353     bool register_successor( successor_type &r ) __TBB_override {
4354         return internal::output_port<0>(*this).register_successor(r);
4355     }
4356 
4357     //! Removes a successor from this node
4358     bool remove_successor( successor_type &r ) __TBB_override {
4359         return internal::output_port<0>(*this).remove_successor(r);
4360     }
4361 
4362     template<typename Body>
4363     Body copy_function_object() {
4364         typedef internal::multifunction_body<input_type, typename base_type::output_ports_type> mfn_body_type;
4365         typedef internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body> async_body_type;
4366         mfn_body_type &body_ref = *this->my_body;
4367         async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< internal::multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
4368         return ab.get_body();
4369     }
4370 
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    //! interface to record edges for traversal & deletion
    // All operations delegate to output port 0, which owns the recorded
    // successor edges for this node.
    typedef typename  internal::edge_container<successor_type> built_successors_type;
    typedef typename  built_successors_type::edge_list_type successor_list_type;
    //! Returns the container of successors recorded for traversal/deletion.
    built_successors_type &built_successors() __TBB_override {
        return internal::output_port<0>(*this).built_successors();
    }

    //! Records r as a built successor without changing message routing.
    void internal_add_built_successor( successor_type &r ) __TBB_override {
        internal::output_port<0>(*this).internal_add_built_successor(r);
    }

    //! Removes r from the recorded successor edges.
    void internal_delete_built_successor( successor_type &r ) __TBB_override {
        internal::output_port<0>(*this).internal_delete_built_successor(r);
    }

    //! Copies the recorded successor edges into l.
    void copy_successors( successor_list_type &l ) __TBB_override {
        internal::output_port<0>(*this).copy_successors(l);
    }

    //! Returns the number of recorded successor edges.
    size_t  successor_count() __TBB_override {
        return internal::output_port<0>(*this).successor_count();
    }
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
4395 
4396 protected:
4397 
    //! Resets the node; all resettable state lives in the base class,
    //! so the call is simply delegated.
    void reset_node( reset_flags f) __TBB_override {
       base_type::reset_node(f);
    }
4401 };
4402 
4403 #if __TBB_PREVIEW_STREAMING_NODE
4404 #include "internal/_flow_graph_streaming_node.h"
4405 #endif // __TBB_PREVIEW_STREAMING_NODE
4406 
4407 #include "internal/_flow_graph_node_set_impl.h"
4408 
//! A buffering node that holds a single item; each new message overwrites
//! the previous one and is broadcast to all successors.
// The buffered value stays available via try_get()/try_reserve() until it is
// overwritten or cleared.  All mutable state is guarded by my_mutex.
template< typename T >
class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
    typedef typename sender<output_type>::built_successors_type built_successors_type;
    typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
    typedef typename sender<output_type>::successor_list_type successor_list_type;
#endif

    //! Constructs an overwrite_node attached to graph g with an empty buffer.
    __TBB_NOINLINE_SYM explicit overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
        my_successors.set_owner( this );
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
                                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Constructs the node in the node set's graph and wires the edges
    // implied by the set (follows/precedes).
    template <typename... Args>
    overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor; doesn't take anything from src; default won't work
    // Only graph membership is taken from src: the buffer, validity flag and
    // edges are deliberately NOT copied, so the new node starts empty.
    __TBB_NOINLINE_SYM overwrite_node( const overwrite_node& src ) :
        graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
    {
        my_successors.set_owner( this );
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
                                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

    ~overwrite_node() {}

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the node name reported by the flow-graph tracing/profiling tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

   //! Adds s as a successor; if a valid value is buffered and the graph is
   //! active, the value is pushed to s immediately.  Always returns true.
   bool register_successor( successor_type &s ) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        if (my_buffer_is_valid && internal::is_graph_active( my_graph )) {
            // We have a valid value that must be forwarded immediately.
            bool ret = s.try_put( my_buffer );
            if ( ret ) {
                // We add the successor that accepted our put
                my_successors.register_successor( s );
            } else {
                // In case of reservation a race between the moment of reservation and register_successor can appear,
                // because failed reserve does not mean that register_successor is not ready to put a message immediately.
                // We have some sort of infinite loop: reserving node tries to set pull state for the edge,
                // but overwrite_node tries to return push state back. That is why we have to break this loop with task creation.
                task *rtask = new ( task::allocate_additional_child_of( *( my_graph.root_task() ) ) )
                    register_predecessor_task( *this, s );
                internal::spawn_in_graph_arena( my_graph, *rtask );
            }
        } else {
            // No valid value yet, just add as successor
            my_successors.register_successor( s );
        }
        return true;
    }

    //! Removes s from the successor set.  Always returns true.
    bool remove_successor( successor_type &s ) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_successors.remove_successor(s);
        return true;
    }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // Deprecated edge-extraction interface: records predecessor/successor
    // edges so they can be traversed and removed; guarded by my_mutex.
    built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
    built_successors_type   &built_successors()   __TBB_override { return my_successors.built_successors(); }

    void internal_add_built_successor( successor_type &s) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_successors.internal_add_built_successor(s);
    }

    void internal_delete_built_successor( successor_type &s) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_successors.internal_delete_built_successor(s);
    }

    size_t successor_count() __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        return my_successors.successor_count();
    }

    void copy_successors(successor_list_type &v) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_successors.copy_successors(v);
    }

    void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_built_predecessors.add_edge(p);
    }

    void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_built_predecessors.delete_edge(p);
    }

    size_t predecessor_count() __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        return my_built_predecessors.edge_count();
    }

    void copy_predecessors( predecessor_list_type &v ) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_built_predecessors.copy_edges(v);
    }

    //! Invalidates the buffer and detaches all recorded edges.
    // NOTE(review): my_buffer_is_valid is written without taking my_mutex —
    // presumably extract() is only called while the graph is quiescent;
    // confirm before relying on it concurrently.
    void extract() __TBB_override {
        my_buffer_is_valid = false;
        built_successors().sender_extract(*this);
        built_predecessors().receiver_extract(*this);
    }

#endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */

    //! Copies the buffered value into v; returns false if no valid value.
    // Non-destructive: the buffer keeps its value after the get.
    bool try_get( input_type &v ) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        if ( my_buffer_is_valid ) {
            v = my_buffer;
            return true;
        }
        return false;
    }

    //! Reserves an item
    // Reservation is just a read here; the item stays in the buffer.
    bool try_reserve( T &v ) __TBB_override {
        return try_get(v);
    }

    //! Releases the reserved item
    // No-op: reservation did not remove anything, so nothing to restore.
    bool try_release() __TBB_override { return true; }

    //! Consumes the reserved item
    // No-op: the buffered value is not consumed by reservation.
    bool try_consume() __TBB_override { return true; }

    //! Returns true if the buffer currently holds a valid value.
    bool is_valid() {
       spin_mutex::scoped_lock l( my_mutex );
       return my_buffer_is_valid;
    }

    //! Invalidates the buffered value (the edges are left intact).
    void clear() {
       spin_mutex::scoped_lock l( my_mutex );
       my_buffer_is_valid = false;
    }

protected:

    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    //! Stores v (overwriting any previous value) and broadcasts it.
    task * try_put_task( const input_type &v ) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        return try_put_task_impl(v);
    }

    // Unsynchronized implementation shared with write_once_node; the caller
    // must hold my_mutex.  Never returns NULL: if no successor produced a
    // task, SUCCESSFULLY_ENQUEUED signals that the put was accepted anyway.
    task * try_put_task_impl(const input_type &v) {
        my_buffer = v;
        my_buffer_is_valid = true;
        task * rtask = my_successors.try_put_task(v);
        if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
        return rtask;
    }

    graph& graph_reference() const __TBB_override {
        return my_graph;
    }

    //! Breaks an infinite loop between the node reservation and register_successor call
    // Spawned from register_successor when an immediate push to a reserving
    // successor fails; retries the edge hookup asynchronously.
    struct register_predecessor_task : public graph_task {

        register_predecessor_task(predecessor_type& owner, successor_type& succ) :
            o(owner), s(succ) {};

        tbb::task* execute() __TBB_override {
            // Try to establish the pull edge; if the successor declines,
            // fall back to the push edge from the owner's side.
            if (!s.register_predecessor(o)) {
                o.register_successor(s);
            }
            return NULL;
        }

        predecessor_type& o;  // the overwrite_node (predecessor side)
        successor_type& s;    // the successor being connected
    };

    spin_mutex my_mutex;                   // guards buffer, flag and edge sets
    internal::broadcast_cache< input_type, null_rw_mutex > my_successors;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    internal::edge_container<predecessor_type> my_built_predecessors;
#endif
    input_type my_buffer;                  // last value put, if any
    bool my_buffer_is_valid;               // true once a value has been put
    void reset_receiver(reset_flags /*f*/) __TBB_override {}

    //! Invalidates the buffer; clears edges only when rf_clear_edges is set.
    // NOTE(review): no lock taken — presumably reset runs only while the
    // graph is quiescent; confirm.
    void reset_node( reset_flags f) __TBB_override {
        my_buffer_is_valid = false;
       if (f&rf_clear_edges) {
           my_successors.clear();
       }
    }
};  // overwrite_node
4620 
4621 template< typename T >
4622 class write_once_node : public overwrite_node<T> {
4623 public:
4624     typedef T input_type;
4625     typedef T output_type;
4626     typedef overwrite_node<T> base_type;
4627     typedef typename receiver<input_type>::predecessor_type predecessor_type;
4628     typedef typename sender<output_type>::successor_type successor_type;
4629 
4630     //! Constructor
4631     __TBB_NOINLINE_SYM explicit write_once_node(graph& g) : base_type(g) {
4632         tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
4633                                  static_cast<receiver<input_type> *>(this),
4634                                  static_cast<sender<output_type> *>(this) );
4635     }
4636 
4637 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4638     template <typename... Args>
4639     write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
4640         make_edges_in_order(nodes, *this);
4641     }
4642 #endif
4643 
4644     //! Copy constructor: call base class copy constructor
4645     __TBB_NOINLINE_SYM write_once_node( const write_once_node& src ) : base_type(src) {
4646         tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
4647                                  static_cast<receiver<input_type> *>(this),
4648                                  static_cast<sender<output_type> *>(this) );
4649     }
4650 
4651 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4652     void set_name( const char *name ) __TBB_override {
4653         tbb::internal::fgt_node_desc( this, name );
4654     }
4655 #endif
4656 
4657 protected:
4658     template< typename R, typename B > friend class run_and_put_task;
4659     template<typename X, typename Y> friend class internal::broadcast_cache;
4660     template<typename X, typename Y> friend class internal::round_robin_cache;
4661     task *try_put_task( const T &v ) __TBB_override {
4662         spin_mutex::scoped_lock l( this->my_mutex );
4663         return this->my_buffer_is_valid ? NULL : this->try_put_task_impl(v);
4664     }
4665 };
4666 
4667 } // interfaceX
4668 
4669     using interface11::reset_flags;
4670     using interface11::rf_reset_protocol;
4671     using interface11::rf_reset_bodies;
4672     using interface11::rf_clear_edges;
4673 
4674     using interface11::graph;
4675     using interface11::graph_node;
4676     using interface11::continue_msg;
4677     using interface11::source_node;
4678     using interface11::input_node;
4679     using interface11::function_node;
4680     using interface11::multifunction_node;
4681     using interface11::split_node;
4682     using interface11::internal::output_port;
4683     using interface11::indexer_node;
4684     using interface11::internal::tagged_msg;
4685     using interface11::internal::cast_to;
4686     using interface11::internal::is_a;
4687     using interface11::continue_node;
4688     using interface11::overwrite_node;
4689     using interface11::write_once_node;
4690     using interface11::broadcast_node;
4691     using interface11::buffer_node;
4692     using interface11::queue_node;
4693     using interface11::sequencer_node;
4694     using interface11::priority_queue_node;
4695     using interface11::limiter_node;
4696     using namespace interface11::internal::graph_policy_namespace;
4697     using interface11::join_node;
4698     using interface11::input_port;
4699     using interface11::copy_body;
4700     using interface11::make_edge;
4701     using interface11::remove_edge;
4702     using interface11::internal::tag_value;
4703 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
4704     using interface11::composite_node;
4705 #endif
4706     using interface11::async_node;
4707 #if __TBB_PREVIEW_ASYNC_MSG
4708     using interface11::async_msg;
4709 #endif
4710 #if __TBB_PREVIEW_STREAMING_NODE
4711     using interface11::port_ref;
4712     using interface11::streaming_node;
4713 #endif // __TBB_PREVIEW_STREAMING_NODE
4714 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4715     using internal::node_priority_t;
4716     using internal::no_priority;
4717 #endif
4718 
4719 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4720     using interface11::internal::follows;
4721     using interface11::internal::precedes;
4722     using interface11::internal::make_node_set;
4723     using interface11::internal::make_edges;
4724 #endif
4725 
4726 } // flow
4727 } // tbb
4728 
4729 // Include deduction guides for node classes
4730 #include "internal/_flow_graph_nodes_deduction.h"
4731 
4732 #undef __TBB_PFG_RESET_ARG
4733 #undef __TBB_COMMA
4734 #undef __TBB_DEFAULT_NODE_ALLOCATOR
4735 
4736 #include "internal/_warning_suppress_disable_notice.h"
4737 #undef __TBB_flow_graph_H_include_area
4738 
4739 #if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
4740    #undef __TBB_NOINLINE_SYM
4741 #endif
4742 
4743 #endif // __TBB_flow_graph_H
4744