1 /*
2 Copyright 2005-2014 Intel Corporation. All Rights Reserved.
3
4 This file is part of Threading Building Blocks. Threading Building Blocks is free software;
5 you can redistribute it and/or modify it under the terms of the GNU General Public License
6 version 2 as published by the Free Software Foundation. Threading Building Blocks is
7 distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
9 See the GNU General Public License for more details. You should have received a copy of
10 the GNU General Public License along with Threading Building Blocks; if not, write to the
11 Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
12
13 As a special exception, you may use this file as part of a free software library without
14 restriction. Specifically, if other files instantiate templates or use macros or inline
15 functions from this file, or you compile this file and link it with other files to produce
16 an executable, this file does not by itself cause the resulting executable to be covered
17 by the GNU General Public License. This exception does not however invalidate any other
18 reasons why the executable file might be covered by the GNU General Public License.
19 */
20
21 #ifndef __TBB_flow_graph_H
22 #define __TBB_flow_graph_H
23
24 #include "tbb_stddef.h"
25 #include "atomic.h"
26 #include "spin_mutex.h"
27 #include "null_mutex.h"
28 #include "spin_rw_mutex.h"
29 #include "null_rw_mutex.h"
30 #include "task.h"
31 #include "cache_aligned_allocator.h"
32 #include "tbb_exception.h"
33 #include "internal/_aggregator_impl.h"
34 #include "tbb_profiling.h"
35
36 #if TBB_DEPRECATED_FLOW_ENQUEUE
37 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
38 #else
39 #define FLOW_SPAWN(a) tbb::task::spawn((a))
40 #endif
41
42 // use the VC10 or gcc version of tuple if it is available.
43 #if __TBB_CPP11_TUPLE_PRESENT
44 #include <tuple>
45 namespace tbb {
46 namespace flow {
47 using std::tuple;
48 using std::tuple_size;
49 using std::tuple_element;
50 using std::get;
51 }
52 }
53 #else
54 #include "compat/tuple"
55 #endif
56
57 #include<list>
58 #include<queue>
59
60 /** @file
61 \brief The graph related classes and functions
62
63 There are some applications that best express dependencies as messages
64 passed between nodes in a graph. These messages may contain data or
65 simply act as signals that a predecessors has completed. The graph
66 class and its associated node classes can be used to express such
67 applications.
68 */
69
70 namespace tbb {
71 namespace flow {
72
73 //! An enumeration the provides the two most common concurrency levels: unlimited and serial
74 enum concurrency { unlimited = 0, serial = 1 };
75
76 namespace interface7 {
77
78 namespace internal {
79 template<typename T, typename M> class successor_cache;
80 template<typename T, typename M> class broadcast_cache;
81 template<typename T, typename M> class round_robin_cache;
82 }
83
84 //! An empty class used for messages that mean "I'm done"
85 class continue_msg {};
86
87 template< typename T > class sender;
88 template< typename T > class receiver;
89 class continue_receiver;
90
91 //! Pure virtual template class that defines a sender of messages of type T
92 template< typename T >
93 class sender {
94 public:
95 //! The output type of this sender
96 typedef T output_type;
97
98 //! The successor type for this node
99 typedef receiver<T> successor_type;
100
~sender()101 virtual ~sender() {}
102
103 //! Add a new successor to this node
104 virtual bool register_successor( successor_type &r ) = 0;
105
106 //! Removes a successor from this node
107 virtual bool remove_successor( successor_type &r ) = 0;
108
109 //! Request an item from the sender
try_get(T &)110 virtual bool try_get( T & ) { return false; }
111
112 //! Reserves an item in the sender
try_reserve(T &)113 virtual bool try_reserve( T & ) { return false; }
114
115 //! Releases the reserved item
try_release()116 virtual bool try_release( ) { return false; }
117
118 //! Consumes the reserved item
try_consume()119 virtual bool try_consume( ) { return false; }
120
121 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
122 //! interface to record edges for traversal & deletion
123 virtual void internal_add_built_successor( successor_type & ) = 0;
124 virtual void internal_delete_built_successor( successor_type & ) = 0;
125 virtual void copy_successors( std::vector<successor_type *> &) = 0;
126 virtual size_t successor_count() = 0;
127 #endif
128 };
129
130 template< typename T > class limiter_node; // needed for resetting decrementer
131 template< typename R, typename B > class run_and_put_task;
132
133 static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1;
134
135 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
136 // flags to modify the behavior of the graph reset(). Can be combined.
137 enum reset_flags {
138 rf_reset_protocol = 0,
139 rf_reset_bodies = 1<<0, // delete the current node body, reset to a copy of the initial node body.
140 rf_extract = 1<<1 // delete edges (extract() for single node, reset() for graph.)
141 };
142
143 #define __TBB_PFG_RESET_ARG(exp) exp
144 #define __TBB_COMMA ,
145 #else
146 #define __TBB_PFG_RESET_ARG(exp) /* nothing */
147 #define __TBB_COMMA /* nothing */
148 #endif
149
150 // enqueue left task if necessary. Returns the non-enqueued task if there is one.
combine_tasks(tbb::task * left,tbb::task * right)151 static inline tbb::task *combine_tasks( tbb::task * left, tbb::task * right) {
152 // if no RHS task, don't change left.
153 if(right == NULL) return left;
154 // right != NULL
155 if(left == NULL) return right;
156 if(left == SUCCESSFULLY_ENQUEUED) return right;
157 // left contains a task
158 if(right != SUCCESSFULLY_ENQUEUED) {
159 // both are valid tasks
160 FLOW_SPAWN(*left);
161 return right;
162 }
163 return left;
164 }
165
166 //! Pure virtual template class that defines a receiver of messages of type T
167 template< typename T >
168 class receiver {
169 public:
170 //! The input type of this receiver
171 typedef T input_type;
172
173 //! The predecessor type for this node
174 typedef sender<T> predecessor_type;
175
176 //! Destructor
~receiver()177 virtual ~receiver() {}
178
179 //! Put an item to the receiver
try_put(const T & t)180 bool try_put( const T& t ) {
181 task *res = try_put_task(t);
182 if(!res) return false;
183 if (res != SUCCESSFULLY_ENQUEUED) FLOW_SPAWN(*res);
184 return true;
185 }
186
187 //! put item to successor; return task to run the successor if possible.
188 protected:
189 template< typename R, typename B > friend class run_and_put_task;
190 template<typename X, typename Y> friend class internal::broadcast_cache;
191 template<typename X, typename Y> friend class internal::round_robin_cache;
192 virtual task *try_put_task(const T& t) = 0;
193 public:
194
195 //! Add a predecessor to the node
register_predecessor(predecessor_type &)196 virtual bool register_predecessor( predecessor_type & ) { return false; }
197
198 //! Remove a predecessor from the node
remove_predecessor(predecessor_type &)199 virtual bool remove_predecessor( predecessor_type & ) { return false; }
200
201 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
202 virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
203 virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
204 virtual void copy_predecessors( std::vector<predecessor_type *> & ) = 0;
205 virtual size_t predecessor_count() = 0;
206 #endif
207
208 protected:
209 //! put receiver back in initial state
210 template<typename U> friend class limiter_node;
211 virtual void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f = rf_reset_protocol ) ) = 0;
212
213 template<typename TT, typename M>
214 friend class internal::successor_cache;
is_continue_receiver()215 virtual bool is_continue_receiver() { return false; }
216 };
217
218 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
219 //* holder of edges both for caches and for those nodes which do not have predecessor caches.
220 // C == receiver< ... > or sender< ... >, depending.
221 template<typename C>
222 class edge_container {
223
224 public:
225 typedef std::vector<C *> edge_vector;
226
add_edge(C & s)227 void add_edge( C &s) {
228 built_edges.push_back( &s );
229 }
230
delete_edge(C & s)231 void delete_edge( C &s) {
232 for ( typename edge_vector::iterator i = built_edges.begin(); i != built_edges.end(); ++i ) {
233 if ( *i == &s ) {
234 (void)built_edges.erase(i);
235 return; // only remove one predecessor per request
236 }
237 }
238 }
239
copy_edges(edge_vector & v)240 void copy_edges( edge_vector &v) {
241 v = built_edges;
242 }
243
edge_count()244 size_t edge_count() {
245 return (size_t)(built_edges.size());
246 }
247
clear()248 void clear() {
249 built_edges.clear();
250 }
251
252 template< typename S > void sender_extract( S &s );
253 template< typename R > void receiver_extract( R &r );
254
255 private:
256 edge_vector built_edges;
257 };
258 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
259
260 //! Base class for receivers of completion messages
261 /** These receivers automatically reset, but cannot be explicitly waited on */
262 class continue_receiver : public receiver< continue_msg > {
263 public:
264
265 //! The input type
266 typedef continue_msg input_type;
267
268 //! The predecessor type for this node
269 typedef sender< continue_msg > predecessor_type;
270
271 //! Constructor
272 continue_receiver( int number_of_predecessors = 0 ) {
273 my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
274 my_current_count = 0;
275 }
276
277 //! Copy constructor
continue_receiver(const continue_receiver & src)278 continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
279 my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
280 my_current_count = 0;
281 }
282
283 //! Destructor
~continue_receiver()284 virtual ~continue_receiver() { }
285
286 //! Increments the trigger threshold
register_predecessor(predecessor_type &)287 /* override */ bool register_predecessor( predecessor_type & ) {
288 spin_mutex::scoped_lock l(my_mutex);
289 ++my_predecessor_count;
290 return true;
291 }
292
293 //! Decrements the trigger threshold
294 /** Does not check to see if the removal of the predecessor now makes the current count
295 exceed the new threshold. So removing a predecessor while the graph is active can cause
296 unexpected results. */
remove_predecessor(predecessor_type &)297 /* override */ bool remove_predecessor( predecessor_type & ) {
298 spin_mutex::scoped_lock l(my_mutex);
299 --my_predecessor_count;
300 return true;
301 }
302
303 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
304 typedef std::vector<predecessor_type *> predecessor_vector_type;
305
internal_add_built_predecessor(predecessor_type & s)306 /*override*/ void internal_add_built_predecessor( predecessor_type &s) {
307 spin_mutex::scoped_lock l(my_mutex);
308 my_built_predecessors.add_edge( s );
309 }
310
internal_delete_built_predecessor(predecessor_type & s)311 /*override*/ void internal_delete_built_predecessor( predecessor_type &s) {
312 spin_mutex::scoped_lock l(my_mutex);
313 my_built_predecessors.delete_edge(s);
314 }
315
copy_predecessors(predecessor_vector_type & v)316 /*override*/ void copy_predecessors( predecessor_vector_type &v) {
317 spin_mutex::scoped_lock l(my_mutex);
318 my_built_predecessors.copy_edges(v);
319 }
320
predecessor_count()321 /*override*/ size_t predecessor_count() {
322 spin_mutex::scoped_lock l(my_mutex);
323 return my_built_predecessors.edge_count();
324 }
325 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
326
327 protected:
328 template< typename R, typename B > friend class run_and_put_task;
329 template<typename X, typename Y> friend class internal::broadcast_cache;
330 template<typename X, typename Y> friend class internal::round_robin_cache;
331 // execute body is supposed to be too small to create a task for.
try_put_task(const input_type &)332 /* override */ task *try_put_task( const input_type & ) {
333 {
334 spin_mutex::scoped_lock l(my_mutex);
335 if ( ++my_current_count < my_predecessor_count )
336 return SUCCESSFULLY_ENQUEUED;
337 else
338 my_current_count = 0;
339 }
340 task * res = execute();
341 if(!res) return SUCCESSFULLY_ENQUEUED;
342 return res;
343 }
344
345 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
346 edge_container<predecessor_type> my_built_predecessors;
347 #endif
348 spin_mutex my_mutex;
349 int my_predecessor_count;
350 int my_current_count;
351 int my_initial_predecessor_count;
352 // the friend declaration in the base class did not eliminate the "protected class"
353 // error in gcc 4.1.2
354 template<typename U> friend class limiter_node;
reset_receiver(__TBB_PFG_RESET_ARG (reset_flags f))355 /*override*/void reset_receiver( __TBB_PFG_RESET_ARG(reset_flags f) )
356 {
357 my_current_count = 0;
358 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
359 if(f & rf_extract) {
360 my_built_predecessors.receiver_extract(*this);
361 my_predecessor_count = my_initial_predecessor_count;
362 }
363 #endif
364 }
365
366 //! Does whatever should happen when the threshold is reached
367 /** This should be very fast or else spawn a task. This is
368 called while the sender is blocked in the try_put(). */
369 virtual task * execute() = 0;
370 template<typename TT, typename M>
371 friend class internal::successor_cache;
is_continue_receiver()372 /*override*/ bool is_continue_receiver() { return true; }
373 };
374 } // interface7
375 } // flow
376 } // tbb
377
378 #include "internal/_flow_graph_trace_impl.h"
379
380 namespace tbb {
381 namespace flow {
382 namespace interface7 {
383
384 #include "internal/_flow_graph_types_impl.h"
385 #include "internal/_flow_graph_impl.h"
386 using namespace internal::graph_policy_namespace;
387
388 class graph;
389 class graph_node;
390
391 template <typename GraphContainerType, typename GraphNodeType>
392 class graph_iterator {
393 friend class graph;
394 friend class graph_node;
395 public:
396 typedef size_t size_type;
397 typedef GraphNodeType value_type;
398 typedef GraphNodeType* pointer;
399 typedef GraphNodeType& reference;
400 typedef const GraphNodeType& const_reference;
401 typedef std::forward_iterator_tag iterator_category;
402
403 //! Default constructor
graph_iterator()404 graph_iterator() : my_graph(NULL), current_node(NULL) {}
405
406 //! Copy constructor
graph_iterator(const graph_iterator & other)407 graph_iterator(const graph_iterator& other) :
408 my_graph(other.my_graph), current_node(other.current_node)
409 {}
410
411 //! Assignment
412 graph_iterator& operator=(const graph_iterator& other) {
413 if (this != &other) {
414 my_graph = other.my_graph;
415 current_node = other.current_node;
416 }
417 return *this;
418 }
419
420 //! Dereference
421 reference operator*() const;
422
423 //! Dereference
424 pointer operator->() const;
425
426 //! Equality
427 bool operator==(const graph_iterator& other) const {
428 return ((my_graph == other.my_graph) && (current_node == other.current_node));
429 }
430
431 //! Inequality
432 bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
433
434 //! Pre-increment
435 graph_iterator& operator++() {
436 internal_forward();
437 return *this;
438 }
439
440 //! Post-increment
441 graph_iterator operator++(int) {
442 graph_iterator result = *this;
443 operator++();
444 return result;
445 }
446
447 private:
448 // the graph over which we are iterating
449 GraphContainerType *my_graph;
450 // pointer into my_graph's my_nodes list
451 pointer current_node;
452
453 //! Private initializing constructor for begin() and end() iterators
454 graph_iterator(GraphContainerType *g, bool begin);
455 void internal_forward();
456 };
457
458 //! The graph class
459 /** This class serves as a handle to the graph */
460 class graph : tbb::internal::no_copy {
461 friend class graph_node;
462
463 template< typename Body >
464 class run_task : public task {
465 public:
run_task(Body & body)466 run_task( Body& body ) : my_body(body) {}
execute()467 task *execute() {
468 my_body();
469 return NULL;
470 }
471 private:
472 Body my_body;
473 };
474
475 template< typename Receiver, typename Body >
476 class run_and_put_task : public task {
477 public:
run_and_put_task(Receiver & r,Body & body)478 run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
execute()479 task *execute() {
480 task *res = my_receiver.try_put_task( my_body() );
481 if(res == SUCCESSFULLY_ENQUEUED) res = NULL;
482 return res;
483 }
484 private:
485 Receiver &my_receiver;
486 Body my_body;
487 };
488
489 public:
490 //! Constructs a graph with isolated task_group_context
graph()491 explicit graph() : my_nodes(NULL), my_nodes_last(NULL)
492 {
493 own_context = true;
494 cancelled = false;
495 caught_exception = false;
496 my_context = new task_group_context();
497 my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
498 my_root_task->set_ref_count(1);
499 tbb::internal::fgt_graph( this );
500 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
501 my_is_active = true;
502 #endif
503 }
504
505 //! Constructs a graph with use_this_context as context
graph(task_group_context & use_this_context)506 explicit graph(task_group_context& use_this_context) :
507 my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL)
508 {
509 own_context = false;
510 my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
511 my_root_task->set_ref_count(1);
512 tbb::internal::fgt_graph( this );
513 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
514 my_is_active = true;
515 #endif
516 }
517
518 //! Destroys the graph.
519 /** Calls wait_for_all, then destroys the root task and context. */
~graph()520 ~graph() {
521 wait_for_all();
522 my_root_task->set_ref_count(0);
523 task::destroy( *my_root_task );
524 if (own_context) delete my_context;
525 }
526
527 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)528 void set_name( const char *name ) {
529 tbb::internal::fgt_graph_desc( this, name );
530 }
531 #endif
532
533 //! Used to register that an external entity may still interact with the graph.
534 /** The graph will not return from wait_for_all until a matching number of decrement_wait_count calls
535 is made. */
increment_wait_count()536 void increment_wait_count() {
537 if (my_root_task)
538 my_root_task->increment_ref_count();
539 }
540
541 //! Deregisters an external entity that may have interacted with the graph.
542 /** The graph will not return from wait_for_all until all the number of decrement_wait_count calls
543 matches the number of increment_wait_count calls. */
decrement_wait_count()544 void decrement_wait_count() {
545 if (my_root_task)
546 my_root_task->decrement_ref_count();
547 }
548
549 //! Spawns a task that runs a body and puts its output to a specific receiver
550 /** The task is spawned as a child of the graph. This is useful for running tasks
551 that need to block a wait_for_all() on the graph. For example a one-off source. */
552 template< typename Receiver, typename Body >
run(Receiver & r,Body body)553 void run( Receiver &r, Body body ) {
554 FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *my_root_task ) )
555 run_and_put_task< Receiver, Body >( r, body )) );
556 }
557
558 //! Spawns a task that runs a function object
559 /** The task is spawned as a child of the graph. This is useful for running tasks
560 that need to block a wait_for_all() on the graph. For example a one-off source. */
561 template< typename Body >
run(Body body)562 void run( Body body ) {
563 FLOW_SPAWN( * new ( task::allocate_additional_child_of( *my_root_task ) ) run_task< Body >( body ) );
564 }
565
566 //! Wait until graph is idle and decrement_wait_count calls equals increment_wait_count calls.
567 /** The waiting thread will go off and steal work while it is block in the wait_for_all. */
wait_for_all()568 void wait_for_all() {
569 cancelled = false;
570 caught_exception = false;
571 if (my_root_task) {
572 #if TBB_USE_EXCEPTIONS
573 try {
574 #endif
575 my_root_task->wait_for_all();
576 cancelled = my_context->is_group_execution_cancelled();
577 #if TBB_USE_EXCEPTIONS
578 }
579 catch(...) {
580 my_root_task->set_ref_count(1);
581 my_context->reset();
582 caught_exception = true;
583 cancelled = true;
584 throw;
585 }
586 #endif
587 my_context->reset(); // consistent with behavior in catch()
588 my_root_task->set_ref_count(1);
589 }
590 }
591
592 //! Returns the root task of the graph
root_task()593 task * root_task() {
594 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
595 if (!my_is_active)
596 return NULL;
597 else
598 #endif
599 return my_root_task;
600 }
601
602 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
603 void set_active(bool a = true) {
604 my_is_active = a;
605 }
606
is_active()607 bool is_active() {
608 return my_is_active;
609 }
610 #endif
611
612 // ITERATORS
613 template<typename C, typename N>
614 friend class graph_iterator;
615
616 // Graph iterator typedefs
617 typedef graph_iterator<graph,graph_node> iterator;
618 typedef graph_iterator<const graph,const graph_node> const_iterator;
619
620 // Graph iterator constructors
621 //! start iterator
begin()622 iterator begin() { return iterator(this, true); }
623 //! end iterator
end()624 iterator end() { return iterator(this, false); }
625 //! start const iterator
begin()626 const_iterator begin() const { return const_iterator(this, true); }
627 //! end const iterator
end()628 const_iterator end() const { return const_iterator(this, false); }
629 //! start const iterator
cbegin()630 const_iterator cbegin() const { return const_iterator(this, true); }
631 //! end const iterator
cend()632 const_iterator cend() const { return const_iterator(this, false); }
633
634 //! return status of graph execution
is_cancelled()635 bool is_cancelled() { return cancelled; }
exception_thrown()636 bool exception_thrown() { return caught_exception; }
637
638 // thread-unsafe state reset.
639 void reset(__TBB_PFG_RESET_ARG(reset_flags f = rf_reset_protocol));
640
641 private:
642 task *my_root_task;
643 task_group_context *my_context;
644 bool own_context;
645 bool cancelled;
646 bool caught_exception;
647 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
648 bool my_is_active;
649 #endif
650
651
652 graph_node *my_nodes, *my_nodes_last;
653
654 spin_mutex nodelist_mutex;
655 void register_node(graph_node *n);
656 void remove_node(graph_node *n);
657
658 }; // class graph
659
660 template <typename C, typename N>
graph_iterator(C * g,bool begin)661 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
662 {
663 if (begin) current_node = my_graph->my_nodes;
664 //else it is an end iterator by default
665 }
666
667 template <typename C, typename N>
668 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
669 __TBB_ASSERT(current_node, "graph_iterator at end");
670 return *operator->();
671 }
672
673 template <typename C, typename N>
674 typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
675 return current_node;
676 }
677
678
679 template <typename C, typename N>
internal_forward()680 void graph_iterator<C,N>::internal_forward() {
681 if (current_node) current_node = current_node->next;
682 }
683
684 //! The base of all graph nodes.
685 class graph_node : tbb::internal::no_assign {
686 friend class graph;
687 template<typename C, typename N>
688 friend class graph_iterator;
689 protected:
690 graph& my_graph;
691 graph_node *next, *prev;
692 public:
graph_node(graph & g)693 graph_node(graph& g) : my_graph(g) {
694 my_graph.register_node(this);
695 }
~graph_node()696 virtual ~graph_node() {
697 my_graph.remove_node(this);
698 }
699
700 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
701 virtual void set_name( const char *name ) = 0;
702 #endif
703
704 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
705 virtual void extract( reset_flags f=rf_extract ) {
706 bool a = my_graph.is_active();
707 my_graph.set_active(false);
708 reset((reset_flags)(f|rf_extract));
709 my_graph.set_active(a);
710 }
711 #endif
712
713 protected:
714 virtual void reset(__TBB_PFG_RESET_ARG(reset_flags f=rf_reset_protocol)) = 0;
715 };
716
register_node(graph_node * n)717 inline void graph::register_node(graph_node *n) {
718 n->next = NULL;
719 {
720 spin_mutex::scoped_lock lock(nodelist_mutex);
721 n->prev = my_nodes_last;
722 if (my_nodes_last) my_nodes_last->next = n;
723 my_nodes_last = n;
724 if (!my_nodes) my_nodes = n;
725 }
726 }
727
remove_node(graph_node * n)728 inline void graph::remove_node(graph_node *n) {
729 {
730 spin_mutex::scoped_lock lock(nodelist_mutex);
731 __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
732 if (n->prev) n->prev->next = n->next;
733 if (n->next) n->next->prev = n->prev;
734 if (my_nodes_last == n) my_nodes_last = n->prev;
735 if (my_nodes == n) my_nodes = n->next;
736 }
737 n->prev = n->next = NULL;
738 }
739
reset(__TBB_PFG_RESET_ARG (reset_flags f))740 inline void graph::reset( __TBB_PFG_RESET_ARG( reset_flags f )) {
741 // reset context
742 task *saved_my_root_task = my_root_task;
743 my_root_task = NULL;
744 if(my_context) my_context->reset();
745 cancelled = false;
746 caught_exception = false;
747 // reset all the nodes comprising the graph
748 for(iterator ii = begin(); ii != end(); ++ii) {
749 graph_node *my_p = &(*ii);
750 my_p->reset(__TBB_PFG_RESET_ARG(f));
751 }
752 my_root_task = saved_my_root_task;
753 }
754
755
756 #include "internal/_flow_graph_node_impl.h"
757
758 //! An executable node that acts as a source, i.e. it has no predecessors
759 template < typename Output >
760 class source_node : public graph_node, public sender< Output > {
761 protected:
762 using graph_node::my_graph;
763 public:
764 //! The type of the output message, which is complete
765 typedef Output output_type;
766
767 //! The type of successors of this node
768 typedef receiver< Output > successor_type;
769
770 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
771 typedef std::vector<successor_type *> successor_vector_type;
772 #endif
773
774 //! Constructor for a node with a successor
775 template< typename Body >
776 source_node( graph &g, Body body, bool is_active = true )
graph_node(g)777 : graph_node(g), my_active(is_active), init_my_active(is_active),
778 my_body( new internal::source_body_leaf< output_type, Body>(body) ),
779 my_reserved(false), my_has_cached_item(false)
780 {
781 my_successors.set_owner(this);
782 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
783 static_cast<sender<output_type> *>(this), this->my_body );
784 }
785
786 //! Copy constructor
source_node(const source_node & src)787 source_node( const source_node& src ) :
788 graph_node(src.my_graph), sender<Output>(),
789 my_active(src.init_my_active),
790 init_my_active(src.init_my_active), my_body( src.my_body->clone() ),
791 my_reserved(false), my_has_cached_item(false)
792 {
793 my_successors.set_owner(this);
794 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
795 static_cast<sender<output_type> *>(this), this->my_body );
796 }
797
798 //! The destructor
~source_node()799 ~source_node() { delete my_body; }
800
801 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)802 /* override */ void set_name( const char *name ) {
803 tbb::internal::fgt_node_desc( this, name );
804 }
805 #endif
806
807 //! Add a new successor to this node
register_successor(successor_type & r)808 /* override */ bool register_successor( successor_type &r ) {
809 spin_mutex::scoped_lock lock(my_mutex);
810 my_successors.register_successor(r);
811 if ( my_active )
812 spawn_put();
813 return true;
814 }
815
816 //! Removes a successor from this node
remove_successor(successor_type & r)817 /* override */ bool remove_successor( successor_type &r ) {
818 spin_mutex::scoped_lock lock(my_mutex);
819 my_successors.remove_successor(r);
820 return true;
821 }
822
823 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
internal_add_built_successor(successor_type & r)824 /*override*/void internal_add_built_successor( successor_type &r) {
825 spin_mutex::scoped_lock lock(my_mutex);
826 my_successors.internal_add_built_successor(r);
827 }
828
internal_delete_built_successor(successor_type & r)829 /*override*/void internal_delete_built_successor( successor_type &r) {
830 spin_mutex::scoped_lock lock(my_mutex);
831 my_successors.internal_delete_built_successor(r);
832 }
833
successor_count()834 /*override*/size_t successor_count() {
835 spin_mutex::scoped_lock lock(my_mutex);
836 return my_successors.successor_count();
837 }
838
copy_successors(successor_vector_type & v)839 /*override*/void copy_successors(successor_vector_type &v) {
840 spin_mutex::scoped_lock l(my_mutex);
841 my_successors.copy_successors(v);
842 }
843 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
844
845 //! Request an item from the node
try_get(output_type & v)846 /*override */ bool try_get( output_type &v ) {
847 spin_mutex::scoped_lock lock(my_mutex);
848 if ( my_reserved )
849 return false;
850
851 if ( my_has_cached_item ) {
852 v = my_cached_item;
853 my_has_cached_item = false;
854 return true;
855 }
856 // we've been asked to provide an item, but we have none. enqueue a task to
857 // provide one.
858 spawn_put();
859 return false;
860 }
861
862 //! Reserves an item.
try_reserve(output_type & v)863 /* override */ bool try_reserve( output_type &v ) {
864 spin_mutex::scoped_lock lock(my_mutex);
865 if ( my_reserved ) {
866 return false;
867 }
868
869 if ( my_has_cached_item ) {
870 v = my_cached_item;
871 my_reserved = true;
872 return true;
873 } else {
874 return false;
875 }
876 }
877
878 //! Release a reserved item.
879 /** true = item has been released and so remains in sender, dest must request or reserve future items */
try_release()880 /* override */ bool try_release( ) {
881 spin_mutex::scoped_lock lock(my_mutex);
882 __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
883 my_reserved = false;
884 if(!my_successors.empty())
885 spawn_put();
886 return true;
887 }
888
889 //! Consumes a reserved item
try_consume()890 /* override */ bool try_consume( ) {
891 spin_mutex::scoped_lock lock(my_mutex);
892 __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
893 my_reserved = false;
894 my_has_cached_item = false;
895 if ( !my_successors.empty() ) {
896 spawn_put();
897 }
898 return true;
899 }
900
901 //! Activates a node that was created in the inactive state
activate()902 void activate() {
903 spin_mutex::scoped_lock lock(my_mutex);
904 my_active = true;
905 if ( !my_successors.empty() )
906 spawn_put();
907 }
908
909 template<typename Body>
copy_function_object()910 Body copy_function_object() {
911 internal::source_body<output_type> &body_ref = *this->my_body;
912 return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
913 }
914
915 protected:
916
917 //! resets the source_node to its initial state
reset(__TBB_PFG_RESET_ARG (reset_flags f))918 void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
919 my_active = init_my_active;
920 my_reserved =false;
921 if(my_has_cached_item) {
922 my_has_cached_item = false;
923 }
924 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
925 my_successors.reset(f);
926 if(f & rf_reset_bodies) my_body->reset_body();
927 #endif
928 }
929
930 private:
931 spin_mutex my_mutex;
932 bool my_active;
933 bool init_my_active;
934 internal::source_body<output_type> *my_body;
935 internal::broadcast_cache< output_type > my_successors;
936 bool my_reserved;
937 bool my_has_cached_item;
938 output_type my_cached_item;
939
940 // used by apply_body, can invoke body of node.
try_reserve_apply_body(output_type & v)941 bool try_reserve_apply_body(output_type &v) {
942 spin_mutex::scoped_lock lock(my_mutex);
943 if ( my_reserved ) {
944 return false;
945 }
946 if ( !my_has_cached_item ) {
947 tbb::internal::fgt_begin_body( my_body );
948 bool r = (*my_body)(my_cached_item);
949 tbb::internal::fgt_end_body( my_body );
950 if (r) {
951 my_has_cached_item = true;
952 }
953 }
954 if ( my_has_cached_item ) {
955 v = my_cached_item;
956 my_reserved = true;
957 return true;
958 } else {
959 return false;
960 }
961 }
962
963 //! Spawns a task that applies the body
spawn_put()964 /* override */ void spawn_put( ) {
965 task* tp = this->my_graph.root_task();
966 if(tp) {
967 FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp ) )
968 internal:: source_task_bypass < source_node< output_type > >( *this ) ) );
969 }
970 }
971
972 friend class internal::source_task_bypass< source_node< output_type > >;
973 //! Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
apply_body_bypass()974 /* override */ task * apply_body_bypass( ) {
975 output_type v;
976 if ( !try_reserve_apply_body(v) )
977 return NULL;
978
979 task *last_task = my_successors.try_put_task(v);
980 if ( last_task )
981 try_consume();
982 else
983 try_release();
984 return last_task;
985 }
986 }; // source_node
987
988 //! Implements a function node that supports Input -> Output
989 template < typename Input, typename Output = continue_msg, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
990 class function_node : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
991 protected:
992 using graph_node::my_graph;
993 public:
994 typedef Input input_type;
995 typedef Output output_type;
996 typedef sender< input_type > predecessor_type;
997 typedef receiver< output_type > successor_type;
998 typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
999 typedef internal::function_output<output_type> fOutput_type;
1000 #if TBB_PREVIEW_FLOW_GRAPH_FEAURES
1001 typedef std::vector<predecessor_type *> predecessor_vector_type;
1002 typedef std::vector<successor_type *> successor_vector_type;
1003 #endif
1004
1005 //! Constructor
1006 template< typename Body >
function_node(graph & g,size_t concurrency,Body body)1007 function_node( graph &g, size_t concurrency, Body body ) :
1008 graph_node(g), internal::function_input<input_type,output_type,Allocator>(g, concurrency, body) {
1009 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
1010 static_cast<sender<output_type> *>(this), this->my_body );
1011 }
1012
1013 //! Copy constructor
function_node(const function_node & src)1014 function_node( const function_node& src ) :
1015 graph_node(src.my_graph), internal::function_input<input_type,output_type,Allocator>( src ),
1016 fOutput_type() {
1017 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph, static_cast<receiver<input_type> *>(this),
1018 static_cast<sender<output_type> *>(this), this->my_body );
1019 }
1020
1021 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)1022 /* override */ void set_name( const char *name ) {
1023 tbb::internal::fgt_node_desc( this, name );
1024 }
1025 #endif
1026
1027 protected:
1028 template< typename R, typename B > friend class run_and_put_task;
1029 template<typename X, typename Y> friend class internal::broadcast_cache;
1030 template<typename X, typename Y> friend class internal::round_robin_cache;
1031 using fInput_type::try_put_task;
1032
1033 // override of graph_node's reset.
reset(__TBB_PFG_RESET_ARG (reset_flags f))1034 /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) {
1035 fInput_type::reset_function_input(__TBB_PFG_RESET_ARG(f));
1036 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1037 successors().reset(f);
1038 __TBB_ASSERT(!(f & rf_extract) || successors().empty(), "function_node successors not empty");
1039 __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
1040 #endif
1041 }
1042
successors()1043 /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
1044 };
1045
1046 //! Implements a function node that supports Input -> Output
1047 template < typename Input, typename Output, typename Allocator >
1048 class function_node<Input,Output,queueing,Allocator> : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
1049 protected:
1050 using graph_node::my_graph;
1051 public:
1052 typedef Input input_type;
1053 typedef Output output_type;
1054 typedef sender< input_type > predecessor_type;
1055 typedef receiver< output_type > successor_type;
1056 typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
1057 typedef internal::function_input_queue<input_type, Allocator> queue_type;
1058 typedef internal::function_output<output_type> fOutput_type;
1059 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1060 typedef std::vector<predecessor_type *> predecessor_vector_type;
1061 typedef std::vector<successor_type *> successor_vector_type;
1062 #endif
1063
1064 //! Constructor
1065 template< typename Body >
function_node(graph & g,size_t concurrency,Body body)1066 function_node( graph &g, size_t concurrency, Body body ) :
1067 graph_node(g), fInput_type( g, concurrency, body, new queue_type() ) {
1068 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
1069 static_cast<sender<output_type> *>(this), this->my_body );
1070 }
1071
1072 //! Copy constructor
function_node(const function_node & src)1073 function_node( const function_node& src ) :
1074 graph_node(src.graph_node::my_graph), fInput_type( src, new queue_type() ), fOutput_type() {
1075 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
1076 static_cast<sender<output_type> *>(this), this->my_body );
1077 }
1078
1079 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)1080 /* override */ void set_name( const char *name ) {
1081 tbb::internal::fgt_node_desc( this, name );
1082 }
1083 #endif
1084
1085 protected:
1086 template< typename R, typename B > friend class run_and_put_task;
1087 template<typename X, typename Y> friend class internal::broadcast_cache;
1088 template<typename X, typename Y> friend class internal::round_robin_cache;
1089 using fInput_type::try_put_task;
1090
reset(__TBB_PFG_RESET_ARG (reset_flags f))1091 /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
1092 fInput_type::reset_function_input(__TBB_PFG_RESET_ARG(f));
1093 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1094 successors().reset(f);
1095 __TBB_ASSERT(!(f & rf_extract) || successors().empty(), "function_node successors not empty");
1096 __TBB_ASSERT(!(f & rf_extract) || this->my_predecessors.empty(), "function_node predecessors not empty");
1097 #endif
1098
1099 }
1100
successors()1101 /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
1102 };
1103
1104 //! implements a function node that supports Input -> (set of outputs)
1105 // Output is a tuple of output types.
1106 template < typename Input, typename Output, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
1107 class multifunction_node :
1108 public graph_node,
1109 public internal::multifunction_input
1110 <
1111 Input,
1112 typename internal::wrap_tuple_elements<
1113 tbb::flow::tuple_size<Output>::value, // #elements in tuple
1114 internal::multifunction_output, // wrap this around each element
1115 Output // the tuple providing the types
1116 >::type,
1117 Allocator
1118 > {
1119 protected:
1120 using graph_node::my_graph;
1121 private:
1122 static const int N = tbb::flow::tuple_size<Output>::value;
1123 public:
1124 typedef Input input_type;
1125 typedef typename internal::wrap_tuple_elements<N,internal::multifunction_output, Output>::type output_ports_type;
1126 private:
1127 typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
1128 typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
1129 public:
1130 template<typename Body>
multifunction_node(graph & g,size_t concurrency,Body body)1131 multifunction_node( graph &g, size_t concurrency, Body body ) :
1132 graph_node(g), base_type(g,concurrency, body) {
1133 tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
1134 &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
1135 this->output_ports(), this->my_body );
1136 }
1137
multifunction_node(const multifunction_node & other)1138 multifunction_node( const multifunction_node &other) :
1139 graph_node(other.graph_node::my_graph), base_type(other) {
1140 tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
1141 &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
1142 this->output_ports(), this->my_body );
1143 }
1144
1145 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)1146 /* override */ void set_name( const char *name ) {
1147 tbb::internal::fgt_multioutput_node_desc( this, name );
1148 }
1149 #endif
1150
1151 // all the guts are in multifunction_input...
1152 protected:
reset(__TBB_PFG_RESET_ARG (reset_flags f))1153 /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { base_type::reset(__TBB_PFG_RESET_ARG(f)); }
1154 }; // multifunction_node
1155
1156 template < typename Input, typename Output, typename Allocator >
1157 class multifunction_node<Input,Output,queueing,Allocator> : public graph_node, public internal::multifunction_input<Input,
1158 typename internal::wrap_tuple_elements<tbb::flow::tuple_size<Output>::value, internal::multifunction_output, Output>::type, Allocator> {
1159 protected:
1160 using graph_node::my_graph;
1161 static const int N = tbb::flow::tuple_size<Output>::value;
1162 public:
1163 typedef Input input_type;
1164 typedef typename internal::wrap_tuple_elements<N, internal::multifunction_output, Output>::type output_ports_type;
1165 private:
1166 typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
1167 typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
1168 public:
1169 template<typename Body>
multifunction_node(graph & g,size_t concurrency,Body body)1170 multifunction_node( graph &g, size_t concurrency, Body body) :
1171 graph_node(g), base_type(g,concurrency, body, new queue_type()) {
1172 tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
1173 &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
1174 this->output_ports(), this->my_body );
1175 }
1176
multifunction_node(const multifunction_node & other)1177 multifunction_node( const multifunction_node &other) :
1178 graph_node(other.graph_node::my_graph), base_type(other, new queue_type()) {
1179 tbb::internal::fgt_multioutput_node_with_body<Output,N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
1180 &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
1181 this->output_ports(), this->my_body );
1182 }
1183
1184 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)1185 /* override */ void set_name( const char *name ) {
1186 tbb::internal::fgt_multioutput_node_desc( this, name );
1187 }
1188 #endif
1189
1190 // all the guts are in multifunction_input...
1191 protected:
reset(__TBB_PFG_RESET_ARG (reset_flags f))1192 /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) { base_type::reset(__TBB_PFG_RESET_ARG(f)); }
1193 }; // multifunction_node
1194
1195 //! split_node: accepts a tuple as input, forwards each element of the tuple to its
1196 // successors. The node has unlimited concurrency, so though it is marked as
1197 // "rejecting" it does not reject inputs.
1198 template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >
1199 class split_node : public multifunction_node<TupleType, TupleType, rejecting, Allocator> {
1200 static const int N = tbb::flow::tuple_size<TupleType>::value;
1201 typedef multifunction_node<TupleType,TupleType,rejecting,Allocator> base_type;
1202 public:
1203 typedef typename base_type::output_ports_type output_ports_type;
1204 private:
1205 struct splitting_body {
operatorsplitting_body1206 void operator()(const TupleType& t, output_ports_type &p) {
1207 internal::emit_element<N>::emit_this(t, p);
1208 }
1209 };
1210 public:
1211 typedef TupleType input_type;
1212 typedef Allocator allocator_type;
split_node(graph & g)1213 split_node(graph &g) : base_type(g, unlimited, splitting_body()) {
1214 tbb::internal::fgt_multioutput_node<TupleType,N>( tbb::internal::FLOW_SPLIT_NODE, &this->graph_node::my_graph,
1215 static_cast<receiver<input_type> *>(this), this->output_ports() );
1216 }
1217
split_node(const split_node & other)1218 split_node( const split_node & other) : base_type(other) {
1219 tbb::internal::fgt_multioutput_node<TupleType,N>( tbb::internal::FLOW_SPLIT_NODE, &this->graph_node::my_graph,
1220 static_cast<receiver<input_type> *>(this), this->output_ports() );
1221 }
1222
1223 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)1224 /* override */ void set_name( const char *name ) {
1225 tbb::internal::fgt_multioutput_node_desc( this, name );
1226 }
1227 #endif
1228
1229 };
1230
1231 //! Implements an executable node that supports continue_msg -> Output
1232 template <typename Output>
1233 class continue_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
1234 protected:
1235 using graph_node::my_graph;
1236 public:
1237 typedef continue_msg input_type;
1238 typedef Output output_type;
1239 typedef sender< input_type > predecessor_type;
1240 typedef receiver< output_type > successor_type;
1241 typedef internal::continue_input<Output> fInput_type;
1242 typedef internal::function_output<output_type> fOutput_type;
1243
1244 //! Constructor for executable node with continue_msg -> Output
1245 template <typename Body >
continue_node(graph & g,Body body)1246 continue_node( graph &g, Body body ) :
1247 graph_node(g), internal::continue_input<output_type>( g, body ) {
1248 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1249 static_cast<receiver<input_type> *>(this),
1250 static_cast<sender<output_type> *>(this), this->my_body );
1251 }
1252
1253
1254 //! Constructor for executable node with continue_msg -> Output
1255 template <typename Body >
continue_node(graph & g,int number_of_predecessors,Body body)1256 continue_node( graph &g, int number_of_predecessors, Body body ) :
1257 graph_node(g), internal::continue_input<output_type>( g, number_of_predecessors, body ) {
1258 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1259 static_cast<receiver<input_type> *>(this),
1260 static_cast<sender<output_type> *>(this), this->my_body );
1261 }
1262
1263 //! Copy constructor
continue_node(const continue_node & src)1264 continue_node( const continue_node& src ) :
1265 graph_node(src.graph_node::my_graph), internal::continue_input<output_type>(src),
1266 internal::function_output<Output>() {
1267 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1268 static_cast<receiver<input_type> *>(this),
1269 static_cast<sender<output_type> *>(this), this->my_body );
1270 }
1271
1272 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)1273 /* override */ void set_name( const char *name ) {
1274 tbb::internal::fgt_node_desc( this, name );
1275 }
1276 #endif
1277
1278 protected:
1279 template< typename R, typename B > friend class run_and_put_task;
1280 template<typename X, typename Y> friend class internal::broadcast_cache;
1281 template<typename X, typename Y> friend class internal::round_robin_cache;
1282 using fInput_type::try_put_task;
1283
reset(__TBB_PFG_RESET_ARG (reset_flags f))1284 /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) {
1285 fInput_type::reset_receiver(__TBB_PFG_RESET_ARG(f));
1286 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1287 successors().reset(f);
1288 __TBB_ASSERT(!(f & rf_extract) || successors().empty(), "continue_node not reset");
1289 #endif
1290 }
1291
successors()1292 /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
1293 }; // continue_node
1294
1295 template< typename T >
1296 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
1297 protected:
1298 using graph_node::my_graph;
1299 public:
1300 typedef T input_type;
1301 typedef T output_type;
1302 typedef sender< input_type > predecessor_type;
1303 typedef receiver< output_type > successor_type;
1304 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1305 typedef std::vector<predecessor_type *> predecessor_vector_type;
1306 typedef std::vector<successor_type *> successor_vector_type;
1307 #endif
1308
overwrite_node(graph & g)1309 overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
1310 my_successors.set_owner( this );
1311 tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
1312 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1313 }
1314
1315 // Copy constructor; doesn't take anything from src; default won't work
overwrite_node(const overwrite_node & src)1316 overwrite_node( const overwrite_node& src ) :
1317 graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
1318 {
1319 my_successors.set_owner( this );
1320 tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
1321 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1322 }
1323
~overwrite_node()1324 ~overwrite_node() {}
1325
1326 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)1327 /* override */ void set_name( const char *name ) {
1328 tbb::internal::fgt_node_desc( this, name );
1329 }
1330 #endif
1331
register_successor(successor_type & s)1332 /* override */ bool register_successor( successor_type &s ) {
1333 spin_mutex::scoped_lock l( my_mutex );
1334 task* tp = this->my_graph.root_task(); // just to test if we are resetting
1335 if (my_buffer_is_valid && tp) {
1336 // We have a valid value that must be forwarded immediately.
1337 if ( s.try_put( my_buffer ) || !s.register_predecessor( *this ) ) {
1338 // We add the successor: it accepted our put or it rejected it but won't let us become a predecessor
1339 my_successors.register_successor( s );
1340 } else {
1341 // We don't add the successor: it rejected our put and we became its predecessor instead
1342 return false;
1343 }
1344 } else {
1345 // No valid value yet, just add as successor
1346 my_successors.register_successor( s );
1347 }
1348 return true;
1349 }
1350
remove_successor(successor_type & s)1351 /* override */ bool remove_successor( successor_type &s ) {
1352 spin_mutex::scoped_lock l( my_mutex );
1353 my_successors.remove_successor(s);
1354 return true;
1355 }
1356
1357 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
internal_add_built_successor(successor_type & s)1358 /*override*/void internal_add_built_successor( successor_type &s) {
1359 spin_mutex::scoped_lock l( my_mutex );
1360 my_successors.internal_add_built_successor(s);
1361 }
1362
internal_delete_built_successor(successor_type & s)1363 /*override*/void internal_delete_built_successor( successor_type &s) {
1364 spin_mutex::scoped_lock l( my_mutex );
1365 my_successors.internal_delete_built_successor(s);
1366 }
1367
successor_count()1368 /*override*/size_t successor_count() {
1369 spin_mutex::scoped_lock l( my_mutex );
1370 return my_successors.successor_count();
1371 }
1372
copy_successors(successor_vector_type & v)1373 /*override*/ void copy_successors(successor_vector_type &v) {
1374 spin_mutex::scoped_lock l( my_mutex );
1375 my_successors.copy_successors(v);
1376 }
1377
internal_add_built_predecessor(predecessor_type & p)1378 /*override*/ void internal_add_built_predecessor( predecessor_type &p) {
1379 spin_mutex::scoped_lock l( my_mutex );
1380 my_built_predecessors.add_edge(p);
1381 }
1382
internal_delete_built_predecessor(predecessor_type & p)1383 /*override*/ void internal_delete_built_predecessor( predecessor_type &p) {
1384 spin_mutex::scoped_lock l( my_mutex );
1385 my_built_predecessors.delete_edge(p);
1386 }
1387
predecessor_count()1388 /*override*/size_t predecessor_count() {
1389 spin_mutex::scoped_lock l( my_mutex );
1390 return my_built_predecessors.edge_count();
1391 }
1392
copy_predecessors(predecessor_vector_type & v)1393 /*override*/void copy_predecessors(predecessor_vector_type &v) {
1394 spin_mutex::scoped_lock l( my_mutex );
1395 my_built_predecessors.copy_edges(v);
1396 }
1397 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
1398
try_get(input_type & v)1399 /* override */ bool try_get( input_type &v ) {
1400 spin_mutex::scoped_lock l( my_mutex );
1401 if ( my_buffer_is_valid ) {
1402 v = my_buffer;
1403 return true;
1404 }
1405 return false;
1406 }
1407
is_valid()1408 bool is_valid() {
1409 spin_mutex::scoped_lock l( my_mutex );
1410 return my_buffer_is_valid;
1411 }
1412
clear()1413 void clear() {
1414 spin_mutex::scoped_lock l( my_mutex );
1415 my_buffer_is_valid = false;
1416 }
1417
1418 protected:
1419 template< typename R, typename B > friend class run_and_put_task;
1420 template<typename X, typename Y> friend class internal::broadcast_cache;
1421 template<typename X, typename Y> friend class internal::round_robin_cache;
try_put_task(const input_type & v)1422 /* override */ task * try_put_task( const input_type &v ) {
1423 spin_mutex::scoped_lock l( my_mutex );
1424 my_buffer = v;
1425 my_buffer_is_valid = true;
1426 task * rtask = my_successors.try_put_task(v);
1427 if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
1428 return rtask;
1429 }
1430
reset(__TBB_PFG_RESET_ARG (reset_flags f))1431 /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
1432 my_buffer_is_valid = false;
1433 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1434 my_successors.reset(f);
1435 if (f&rf_extract) {
1436 my_built_predecessors.receiver_extract(*this);
1437 }
1438 #endif
1439 }
1440
1441 spin_mutex my_mutex;
1442 internal::broadcast_cache< input_type, null_rw_mutex > my_successors;
1443 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1444 edge_container<sender<input_type> > my_built_predecessors;
1445 #endif
1446 input_type my_buffer;
1447 bool my_buffer_is_valid;
reset_receiver(__TBB_PFG_RESET_ARG (reset_flags))1448 /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/)) {}
1449 }; // overwrite_node
1450
1451 template< typename T >
1452 class write_once_node : public overwrite_node<T> {
1453 public:
1454 typedef T input_type;
1455 typedef T output_type;
1456 typedef sender< input_type > predecessor_type;
1457 typedef receiver< output_type > successor_type;
1458
1459 //! Constructor
write_once_node(graph & g)1460 write_once_node(graph& g) : overwrite_node<T>(g) {
1461 tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
1462 static_cast<receiver<input_type> *>(this),
1463 static_cast<sender<output_type> *>(this) );
1464 }
1465
1466 //! Copy constructor: call base class copy constructor
write_once_node(const write_once_node & src)1467 write_once_node( const write_once_node& src ) : overwrite_node<T>(src) {
1468 tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
1469 static_cast<receiver<input_type> *>(this),
1470 static_cast<sender<output_type> *>(this) );
1471 }
1472
1473 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)1474 /* override */ void set_name( const char *name ) {
1475 tbb::internal::fgt_node_desc( this, name );
1476 }
1477 #endif
1478
1479 protected:
1480 template< typename R, typename B > friend class run_and_put_task;
1481 template<typename X, typename Y> friend class internal::broadcast_cache;
1482 template<typename X, typename Y> friend class internal::round_robin_cache;
try_put_task(const T & v)1483 /* override */ task *try_put_task( const T &v ) {
1484 spin_mutex::scoped_lock l( this->my_mutex );
1485 if ( this->my_buffer_is_valid ) {
1486 return NULL;
1487 } else {
1488 this->my_buffer = v;
1489 this->my_buffer_is_valid = true;
1490 task *res = this->my_successors.try_put_task(v);
1491 if(!res) res = SUCCESSFULLY_ENQUEUED;
1492 return res;
1493 }
1494 }
1495 };
1496
1497 //! Forwards messages of type T to all successors
1498 template <typename T>
1499 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1500 protected:
1501 using graph_node::my_graph;
1502 public:
1503 typedef T input_type;
1504 typedef T output_type;
1505 typedef sender< input_type > predecessor_type;
1506 typedef receiver< output_type > successor_type;
1507 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1508 typedef std::vector<predecessor_type *> predecessor_vector_type;
1509 typedef std::vector<successor_type *> successor_vector_type;
1510 #endif
1511 private:
1512 internal::broadcast_cache<input_type> my_successors;
1513 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1514 edge_container<predecessor_type> my_built_predecessors;
1515 spin_mutex pred_mutex;
1516 #endif
1517 public:
1518
broadcast_node(graph & g)1519 broadcast_node(graph& g) : graph_node(g) {
1520 my_successors.set_owner( this );
1521 tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1522 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1523 }
1524
1525 // Copy constructor
broadcast_node(const broadcast_node & src)1526 broadcast_node( const broadcast_node& src ) :
1527 graph_node(src.my_graph), receiver<T>(), sender<T>()
1528 {
1529 my_successors.set_owner( this );
1530 tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1531 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1532 }
1533
1534 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)1535 /* override */ void set_name( const char *name ) {
1536 tbb::internal::fgt_node_desc( this, name );
1537 }
1538 #endif
1539
1540 //! Adds a successor
register_successor(receiver<T> & r)1541 virtual bool register_successor( receiver<T> &r ) {
1542 my_successors.register_successor( r );
1543 return true;
1544 }
1545
1546 //! Removes s as a successor
remove_successor(receiver<T> & r)1547 virtual bool remove_successor( receiver<T> &r ) {
1548 my_successors.remove_successor( r );
1549 return true;
1550 }
1551
1552 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
internal_add_built_successor(successor_type & r)1553 /*override*/ void internal_add_built_successor(successor_type &r) {
1554 my_successors.internal_add_built_successor(r);
1555 }
1556
internal_delete_built_successor(successor_type & r)1557 /*override*/ void internal_delete_built_successor(successor_type &r) {
1558 my_successors.internal_delete_built_successor(r);
1559 }
1560
successor_count()1561 /*override*/ size_t successor_count() {
1562 return my_successors.successor_count();
1563 }
1564
copy_successors(successor_vector_type & v)1565 /*override*/ void copy_successors(successor_vector_type &v) {
1566 my_successors.copy_successors(v);
1567 }
1568
internal_add_built_predecessor(predecessor_type & p)1569 /*override*/ void internal_add_built_predecessor( predecessor_type &p) {
1570 my_built_predecessors.add_edge(p);
1571 }
1572
internal_delete_built_predecessor(predecessor_type & p)1573 /*override*/ void internal_delete_built_predecessor( predecessor_type &p) {
1574 my_built_predecessors.delete_edge(p);
1575 }
1576
predecessor_count()1577 /*override*/ size_t predecessor_count() {
1578 return my_built_predecessors.edge_count();
1579 }
1580
copy_predecessors(predecessor_vector_type & v)1581 /*override*/ void copy_predecessors(predecessor_vector_type &v) {
1582 my_built_predecessors.copy_edges(v);
1583 }
1584 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
1585
1586 protected:
1587 template< typename R, typename B > friend class run_and_put_task;
1588 template<typename X, typename Y> friend class internal::broadcast_cache;
1589 template<typename X, typename Y> friend class internal::round_robin_cache;
1590 //! build a task to run the successor if possible. Default is old behavior.
try_put_task(const T & t)1591 /*override*/ task *try_put_task(const T& t) {
1592 task *new_task = my_successors.try_put_task(t);
1593 if(!new_task) new_task = SUCCESSFULLY_ENQUEUED;
1594 return new_task;
1595 }
1596
reset(__TBB_PFG_RESET_ARG (reset_flags f))1597 /*override*/void reset(__TBB_PFG_RESET_ARG(reset_flags f)) {
1598 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1599 my_successors.reset(f);
1600 if (f&rf_extract) {
1601 my_built_predecessors.receiver_extract(*this);
1602 }
1603 __TBB_ASSERT(!(f & rf_extract) || my_successors.empty(), "Error resetting broadcast_node");
1604 #endif
1605 }
reset_receiver(__TBB_PFG_RESET_ARG (reset_flags))1606 /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/)) {}
1607 }; // broadcast_node
1608
1609 //! Forwards messages in arbitrary order
1610 template <typename T, typename A=cache_aligned_allocator<T> >
1611 class buffer_node : public graph_node, public internal::reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
1612 protected:
1613 using graph_node::my_graph;
1614 public:
1615 typedef T input_type;
1616 typedef T output_type;
1617 typedef sender< input_type > predecessor_type;
1618 typedef receiver< output_type > successor_type;
1619 typedef buffer_node<T, A> my_class;
1620 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1621 typedef std::vector<predecessor_type *> predecessor_vector_type;
1622 typedef std::vector<successor_type *> successor_vector_type;
1623 #endif
1624 protected:
1625 typedef size_t size_type;
1626 internal::round_robin_cache< T, null_rw_mutex > my_successors;
1627
1628 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1629 edge_container<predecessor_type> my_built_predecessors;
1630 #endif
1631
1632 friend class internal::forward_task_bypass< buffer_node< T, A > >;
1633
1634 enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
1635 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1636 , add_blt_succ, del_blt_succ,
1637 add_blt_pred, del_blt_pred,
1638 blt_succ_cnt, blt_pred_cnt,
1639 blt_succ_cpy, blt_pred_cpy // create vector copies of preds and succs
1640 #endif
1641 };
1642 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1643
1644 // implements the aggregator_operation concept
1645 class buffer_operation : public internal::aggregated_operation< buffer_operation > {
1646 public:
1647 char type;
1648 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1649 task * ltask;
1650 union {
1651 input_type *elem;
1652 successor_type *r;
1653 predecessor_type *p;
1654 size_t cnt_val;
1655 successor_vector_type *svec;
1656 predecessor_vector_type *pvec;
1657 };
1658 #else
1659 T *elem;
1660 task * ltask;
1661 successor_type *r;
1662 #endif
buffer_operation(const T & e,op_type t)1663 buffer_operation(const T& e, op_type t) : type(char(t))
1664
1665 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1666 , ltask(NULL), elem(const_cast<T*>(&e))
1667 #else
1668 , elem(const_cast<T*>(&e)) , ltask(NULL)
1669 #endif
1670 {}
buffer_operation(op_type t)1671 buffer_operation(op_type t) : type(char(t)), ltask(NULL) {}
1672 };
1673
1674 bool forwarder_busy;
1675 typedef internal::aggregating_functor<my_class, buffer_operation> my_handler;
1676 friend class internal::aggregating_functor<my_class, buffer_operation>;
1677 internal::aggregator< my_handler, buffer_operation> my_aggregator;
1678
handle_operations(buffer_operation * op_list)1679 virtual void handle_operations(buffer_operation *op_list) {
1680 buffer_operation *tmp = NULL;
1681 bool try_forwarding=false;
1682 while (op_list) {
1683 tmp = op_list;
1684 op_list = op_list->next;
1685 switch (tmp->type) {
1686 case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
1687 case rem_succ: internal_rem_succ(tmp); break;
1688 case req_item: internal_pop(tmp); break;
1689 case res_item: internal_reserve(tmp); break;
1690 case rel_res: internal_release(tmp); try_forwarding = true; break;
1691 case con_res: internal_consume(tmp); try_forwarding = true; break;
1692 case put_item: internal_push(tmp); try_forwarding = (tmp->status == SUCCEEDED); break;
1693 case try_fwd_task: internal_forward_task(tmp); break;
1694 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1695 // edge recording
1696 case add_blt_succ: internal_add_built_succ(tmp); break;
1697 case del_blt_succ: internal_del_built_succ(tmp); break;
1698 case add_blt_pred: internal_add_built_pred(tmp); break;
1699 case del_blt_pred: internal_del_built_pred(tmp); break;
1700 case blt_succ_cnt: internal_succ_cnt(tmp); break;
1701 case blt_pred_cnt: internal_pred_cnt(tmp); break;
1702 case blt_succ_cpy: internal_copy_succs(tmp); break;
1703 case blt_pred_cpy: internal_copy_preds(tmp); break;
1704 #endif
1705 }
1706 }
1707 if (try_forwarding && !forwarder_busy) {
1708 task* tp = this->my_graph.root_task();
1709 if(tp) {
1710 forwarder_busy = true;
1711 task *new_task = new(task::allocate_additional_child_of(*tp)) internal::
1712 forward_task_bypass
1713 < buffer_node<input_type, A> >(*this);
1714 // tmp should point to the last item handled by the aggregator. This is the operation
1715 // the handling thread enqueued. So modifying that record will be okay.
1716 tbb::task *z = tmp->ltask;
1717 tmp->ltask = combine_tasks(z, new_task); // in case the op generated a task
1718 }
1719 }
1720 }
1721
grab_forwarding_task(buffer_operation & op_data)1722 inline task *grab_forwarding_task( buffer_operation &op_data) {
1723 return op_data.ltask;
1724 }
1725
enqueue_forwarding_task(buffer_operation & op_data)1726 inline bool enqueue_forwarding_task(buffer_operation &op_data) {
1727 task *ft = grab_forwarding_task(op_data);
1728 if(ft) {
1729 FLOW_SPAWN(*ft);
1730 return true;
1731 }
1732 return false;
1733 }
1734
1735 //! This is executed by an enqueued task, the "forwarder"
forward_task()1736 virtual task *forward_task() {
1737 buffer_operation op_data(try_fwd_task);
1738 task *last_task = NULL;
1739 do {
1740 op_data.status = WAIT;
1741 op_data.ltask = NULL;
1742 my_aggregator.execute(&op_data);
1743 tbb::task *xtask = op_data.ltask;
1744 last_task = combine_tasks(last_task, xtask);
1745 } while (op_data.status == SUCCEEDED);
1746 return last_task;
1747 }
1748
1749 //! Register successor
internal_reg_succ(buffer_operation * op)1750 virtual void internal_reg_succ(buffer_operation *op) {
1751 my_successors.register_successor(*(op->r));
1752 __TBB_store_with_release(op->status, SUCCEEDED);
1753 }
1754
1755 //! Remove successor
internal_rem_succ(buffer_operation * op)1756 virtual void internal_rem_succ(buffer_operation *op) {
1757 my_successors.remove_successor(*(op->r));
1758 __TBB_store_with_release(op->status, SUCCEEDED);
1759 }
1760
1761 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
internal_add_built_succ(buffer_operation * op)1762 virtual void internal_add_built_succ(buffer_operation *op) {
1763 my_successors.internal_add_built_successor(*(op->r));
1764 __TBB_store_with_release(op->status, SUCCEEDED);
1765 }
1766
internal_del_built_succ(buffer_operation * op)1767 virtual void internal_del_built_succ(buffer_operation *op) {
1768 my_successors.internal_delete_built_successor(*(op->r));
1769 __TBB_store_with_release(op->status, SUCCEEDED);
1770 }
1771
internal_add_built_pred(buffer_operation * op)1772 virtual void internal_add_built_pred(buffer_operation *op) {
1773 my_built_predecessors.add_edge(*(op->p));
1774 __TBB_store_with_release(op->status, SUCCEEDED);
1775 }
1776
internal_del_built_pred(buffer_operation * op)1777 virtual void internal_del_built_pred(buffer_operation *op) {
1778 my_built_predecessors.delete_edge(*(op->p));
1779 __TBB_store_with_release(op->status, SUCCEEDED);
1780 }
1781
internal_succ_cnt(buffer_operation * op)1782 virtual void internal_succ_cnt(buffer_operation *op) {
1783 op->cnt_val = my_successors.successor_count();
1784 __TBB_store_with_release(op->status, SUCCEEDED);
1785 }
1786
internal_pred_cnt(buffer_operation * op)1787 virtual void internal_pred_cnt(buffer_operation *op) {
1788 op->cnt_val = my_built_predecessors.edge_count();
1789 __TBB_store_with_release(op->status, SUCCEEDED);
1790 }
1791
internal_copy_succs(buffer_operation * op)1792 virtual void internal_copy_succs(buffer_operation *op) {
1793 my_successors.copy_successors(*(op->svec));
1794 __TBB_store_with_release(op->status, SUCCEEDED);
1795 }
1796
internal_copy_preds(buffer_operation * op)1797 virtual void internal_copy_preds(buffer_operation *op) {
1798 my_built_predecessors.copy_edges(*(op->pvec));
1799 __TBB_store_with_release(op->status, SUCCEEDED);
1800 }
1801 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
1802
1803 //! Tries to forward valid items to successors
internal_forward_task(buffer_operation * op)1804 virtual void internal_forward_task(buffer_operation *op) {
1805 if (this->my_reserved || !this->my_item_valid(this->my_tail-1)) {
1806 __TBB_store_with_release(op->status, FAILED);
1807 this->forwarder_busy = false;
1808 return;
1809 }
1810 T i_copy;
1811 task * last_task = NULL;
1812 size_type counter = my_successors.size();
1813 // Try forwarding, giving each successor a chance
1814 while (counter>0 && !this->buffer_empty() && this->my_item_valid(this->my_tail-1)) {
1815 this->copy_back(i_copy);
1816 task *new_task = my_successors.try_put_task(i_copy);
1817 if(new_task) {
1818 last_task = combine_tasks(last_task, new_task);
1819 this->destroy_back();
1820 }
1821 --counter;
1822 }
1823 op->ltask = last_task; // return task
1824 if (last_task && !counter) {
1825 __TBB_store_with_release(op->status, SUCCEEDED);
1826 }
1827 else {
1828 __TBB_store_with_release(op->status, FAILED);
1829 forwarder_busy = false;
1830 }
1831 }
1832
internal_push(buffer_operation * op)1833 virtual void internal_push(buffer_operation *op) {
1834 this->push_back(*(op->elem));
1835 __TBB_store_with_release(op->status, SUCCEEDED);
1836 }
1837
internal_pop(buffer_operation * op)1838 virtual void internal_pop(buffer_operation *op) {
1839 if(this->pop_back(*(op->elem))) {
1840 __TBB_store_with_release(op->status, SUCCEEDED);
1841 }
1842 else {
1843 __TBB_store_with_release(op->status, FAILED);
1844 }
1845 }
1846
internal_reserve(buffer_operation * op)1847 virtual void internal_reserve(buffer_operation *op) {
1848 if(this->reserve_front(*(op->elem))) {
1849 __TBB_store_with_release(op->status, SUCCEEDED);
1850 }
1851 else {
1852 __TBB_store_with_release(op->status, FAILED);
1853 }
1854 }
1855
internal_consume(buffer_operation * op)1856 virtual void internal_consume(buffer_operation *op) {
1857 this->consume_front();
1858 __TBB_store_with_release(op->status, SUCCEEDED);
1859 }
1860
internal_release(buffer_operation * op)1861 virtual void internal_release(buffer_operation *op) {
1862 this->release_front();
1863 __TBB_store_with_release(op->status, SUCCEEDED);
1864 }
1865
1866 public:
1867 //! Constructor
buffer_node(graph & g)1868 buffer_node( graph &g ) : graph_node(g), internal::reservable_item_buffer<T>(),
1869 forwarder_busy(false) {
1870 my_successors.set_owner(this);
1871 my_aggregator.initialize_handler(my_handler(this));
1872 tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
1873 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1874 }
1875
1876 //! Copy constructor
buffer_node(const buffer_node & src)1877 buffer_node( const buffer_node& src ) : graph_node(src.my_graph),
1878 internal::reservable_item_buffer<T>(), receiver<T>(), sender<T>() {
1879 forwarder_busy = false;
1880 my_successors.set_owner(this);
1881 my_aggregator.initialize_handler(my_handler(this));
1882 tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
1883 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1884 }
1885
~buffer_node()1886 virtual ~buffer_node() {}
1887
1888 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)1889 /* override */ void set_name( const char *name ) {
1890 tbb::internal::fgt_node_desc( this, name );
1891 }
1892 #endif
1893
1894 //
1895 // message sender implementation
1896 //
1897
1898 //! Adds a new successor.
1899 /** Adds successor r to the list of successors; may forward tasks. */
register_successor(successor_type & r)1900 /* override */ bool register_successor( successor_type &r ) {
1901 buffer_operation op_data(reg_succ);
1902 op_data.r = &r;
1903 my_aggregator.execute(&op_data);
1904 (void)enqueue_forwarding_task(op_data);
1905 return true;
1906 }
1907
1908 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
internal_add_built_successor(successor_type & r)1909 /*override*/ void internal_add_built_successor( successor_type &r) {
1910 buffer_operation op_data(add_blt_succ);
1911 op_data.r = &r;
1912 my_aggregator.execute(&op_data);
1913 }
1914
internal_delete_built_successor(successor_type & r)1915 /*override*/ void internal_delete_built_successor( successor_type &r) {
1916 buffer_operation op_data(del_blt_succ);
1917 op_data.r = &r;
1918 my_aggregator.execute(&op_data);
1919 }
1920
internal_add_built_predecessor(predecessor_type & p)1921 /*override*/ void internal_add_built_predecessor( predecessor_type &p) {
1922 buffer_operation op_data(add_blt_pred);
1923 op_data.p = &p;
1924 my_aggregator.execute(&op_data);
1925 }
1926
internal_delete_built_predecessor(predecessor_type & p)1927 /*override*/ void internal_delete_built_predecessor( predecessor_type &p) {
1928 buffer_operation op_data(del_blt_pred);
1929 op_data.p = &p;
1930 my_aggregator.execute(&op_data);
1931 }
1932
predecessor_count()1933 /*override*/ size_t predecessor_count() {
1934 buffer_operation op_data(blt_pred_cnt);
1935 my_aggregator.execute(&op_data);
1936 return op_data.cnt_val;
1937 }
1938
successor_count()1939 /*override*/ size_t successor_count() {
1940 buffer_operation op_data(blt_succ_cnt);
1941 my_aggregator.execute(&op_data);
1942 return op_data.cnt_val;
1943 }
1944
copy_predecessors(predecessor_vector_type & v)1945 /*override*/ void copy_predecessors( predecessor_vector_type &v ) {
1946 buffer_operation op_data(blt_pred_cpy);
1947 op_data.pvec = &v;
1948 my_aggregator.execute(&op_data);
1949 }
1950
copy_successors(successor_vector_type & v)1951 /*override*/ void copy_successors( successor_vector_type &v ) {
1952 buffer_operation op_data(blt_succ_cpy);
1953 op_data.svec = &v;
1954 my_aggregator.execute(&op_data);
1955 }
1956 #endif
1957
1958 //! Removes a successor.
1959 /** Removes successor r from the list of successors.
1960 It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
remove_successor(successor_type & r)1961 /* override */ bool remove_successor( successor_type &r ) {
1962 r.remove_predecessor(*this);
1963 buffer_operation op_data(rem_succ);
1964 op_data.r = &r;
1965 my_aggregator.execute(&op_data);
1966 // even though this operation does not cause a forward, if we are the handler, and
1967 // a forward is scheduled, we may be the first to reach this point after the aggregator,
1968 // and so should check for the task.
1969 (void)enqueue_forwarding_task(op_data);
1970 return true;
1971 }
1972
1973 //! Request an item from the buffer_node
1974 /** true = v contains the returned item<BR>
1975 false = no item has been returned */
try_get(T & v)1976 /* override */ bool try_get( T &v ) {
1977 buffer_operation op_data(req_item);
1978 op_data.elem = &v;
1979 my_aggregator.execute(&op_data);
1980 (void)enqueue_forwarding_task(op_data);
1981 return (op_data.status==SUCCEEDED);
1982 }
1983
1984 //! Reserves an item.
1985 /** false = no item can be reserved<BR>
1986 true = an item is reserved */
try_reserve(T & v)1987 /* override */ bool try_reserve( T &v ) {
1988 buffer_operation op_data(res_item);
1989 op_data.elem = &v;
1990 my_aggregator.execute(&op_data);
1991 (void)enqueue_forwarding_task(op_data);
1992 return (op_data.status==SUCCEEDED);
1993 }
1994
1995 //! Release a reserved item.
1996 /** true = item has been released and so remains in sender */
try_release()1997 /* override */ bool try_release() {
1998 buffer_operation op_data(rel_res);
1999 my_aggregator.execute(&op_data);
2000 (void)enqueue_forwarding_task(op_data);
2001 return true;
2002 }
2003
2004 //! Consumes a reserved item.
2005 /** true = item is removed from sender and reservation removed */
try_consume()2006 /* override */ bool try_consume() {
2007 buffer_operation op_data(con_res);
2008 my_aggregator.execute(&op_data);
2009 (void)enqueue_forwarding_task(op_data);
2010 return true;
2011 }
2012
2013 protected:
2014
2015 template< typename R, typename B > friend class run_and_put_task;
2016 template<typename X, typename Y> friend class internal::broadcast_cache;
2017 template<typename X, typename Y> friend class internal::round_robin_cache;
2018 //! receive an item, return a task *if possible
try_put_task(const T & t)2019 /* override */ task *try_put_task(const T &t) {
2020 buffer_operation op_data(t, put_item);
2021 my_aggregator.execute(&op_data);
2022 task *ft = grab_forwarding_task(op_data);
2023 // sequencer_nodes can return failure (if an item has been previously inserted)
2024 // We have to spawn the returned task if our own operation fails.
2025
2026 if(ft && op_data.status == FAILED) {
2027 // we haven't succeeded queueing the item, but for some reason the
2028 // call returned a task (if another request resulted in a successful
2029 // forward this could happen.) Queue the task and reset the pointer.
2030 FLOW_SPAWN(*ft); ft = NULL;
2031 }
2032 else if(!ft && op_data.status == SUCCEEDED) {
2033 ft = SUCCESSFULLY_ENQUEUED;
2034 }
2035 return ft;
2036 }
2037
reset(__TBB_PFG_RESET_ARG (reset_flags f))2038 /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
2039 internal::reservable_item_buffer<T, A>::reset();
2040 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
2041 my_successors.reset(f);
2042 if (f&rf_extract) {
2043 my_built_predecessors.receiver_extract(*this);
2044 }
2045 #endif
2046 forwarder_busy = false;
2047 }
2048
reset_receiver(__TBB_PFG_RESET_ARG (reset_flags))2049 /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags /*f*/)) { }
2050
2051 }; // buffer_node
2052
2053 //! Forwards messages in FIFO order
2054 template <typename T, typename A=cache_aligned_allocator<T> >
2055 class queue_node : public buffer_node<T, A> {
2056 protected:
2057 typedef buffer_node<T, A> base_type;
2058 typedef typename base_type::size_type size_type;
2059 typedef typename base_type::buffer_operation queue_operation;
2060
2061 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
2062
internal_forward_task(queue_operation * op)2063 /* override */ void internal_forward_task(queue_operation *op) {
2064 if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2065 __TBB_store_with_release(op->status, FAILED);
2066 this->forwarder_busy = false;
2067 return;
2068 }
2069 T i_copy;
2070 task *last_task = NULL;
2071 size_type counter = this->my_successors.size();
2072 // Keep trying to send items while there is at least one accepting successor
2073 while (counter>0 && this->my_item_valid(this->my_head)) {
2074 this->copy_front(i_copy);
2075 task *new_task = this->my_successors.try_put_task(i_copy);
2076 if(new_task) {
2077 this->destroy_front();
2078 last_task = combine_tasks(last_task, new_task);
2079 }
2080 --counter;
2081 }
2082 op->ltask = last_task;
2083 if (last_task && !counter)
2084 __TBB_store_with_release(op->status, SUCCEEDED);
2085 else {
2086 __TBB_store_with_release(op->status, FAILED);
2087 this->forwarder_busy = false;
2088 }
2089 }
2090
internal_pop(queue_operation * op)2091 /* override */ void internal_pop(queue_operation *op) {
2092 if ( this->my_reserved || !this->my_item_valid(this->my_head)){
2093 __TBB_store_with_release(op->status, FAILED);
2094 }
2095 else {
2096 this->pop_front(*(op->elem));
2097 __TBB_store_with_release(op->status, SUCCEEDED);
2098 }
2099 }
internal_reserve(queue_operation * op)2100 /* override */ void internal_reserve(queue_operation *op) {
2101 if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2102 __TBB_store_with_release(op->status, FAILED);
2103 }
2104 else {
2105 this->reserve_front(*(op->elem));
2106 __TBB_store_with_release(op->status, SUCCEEDED);
2107 }
2108 }
internal_consume(queue_operation * op)2109 /* override */ void internal_consume(queue_operation *op) {
2110 this->consume_front();
2111 __TBB_store_with_release(op->status, SUCCEEDED);
2112 }
2113
2114 public:
2115 typedef T input_type;
2116 typedef T output_type;
2117 typedef sender< input_type > predecessor_type;
2118 typedef receiver< output_type > successor_type;
2119
2120 //! Constructor
queue_node(graph & g)2121 queue_node( graph &g ) : base_type(g) {
2122 tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2123 static_cast<receiver<input_type> *>(this),
2124 static_cast<sender<output_type> *>(this) );
2125 }
2126
2127 //! Copy constructor
queue_node(const queue_node & src)2128 queue_node( const queue_node& src) : base_type(src) {
2129 tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2130 static_cast<receiver<input_type> *>(this),
2131 static_cast<sender<output_type> *>(this) );
2132 }
2133
2134 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)2135 /* override */ void set_name( const char *name ) {
2136 tbb::internal::fgt_node_desc( this, name );
2137 }
2138 #endif
2139
reset(__TBB_PFG_RESET_ARG (reset_flags f))2140 /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
2141 base_type::reset(__TBB_PFG_RESET_ARG(f));
2142 }
2143 }; // queue_node
2144
2145 //! Forwards messages in sequence order
2146 template< typename T, typename A=cache_aligned_allocator<T> >
2147 class sequencer_node : public queue_node<T, A> {
2148 internal::function_body< T, size_t > *my_sequencer;
2149 // my_sequencer should be a benign function and must be callable
2150 // from a parallel context. Does this mean it needn't be reset?
2151 public:
2152 typedef T input_type;
2153 typedef T output_type;
2154 typedef sender< input_type > predecessor_type;
2155 typedef receiver< output_type > successor_type;
2156
2157 //! Constructor
2158 template< typename Sequencer >
sequencer_node(graph & g,const Sequencer & s)2159 sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
2160 my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {
2161 tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2162 static_cast<receiver<input_type> *>(this),
2163 static_cast<sender<output_type> *>(this) );
2164 }
2165
2166 //! Copy constructor
sequencer_node(const sequencer_node & src)2167 sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
2168 my_sequencer( src.my_sequencer->clone() ) {
2169 tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2170 static_cast<receiver<input_type> *>(this),
2171 static_cast<sender<output_type> *>(this) );
2172 }
2173
2174 //! Destructor
~sequencer_node()2175 ~sequencer_node() { delete my_sequencer; }
2176
2177 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)2178 /* override */ void set_name( const char *name ) {
2179 tbb::internal::fgt_node_desc( this, name );
2180 }
2181 #endif
2182
2183 protected:
2184 typedef typename buffer_node<T, A>::size_type size_type;
2185 typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;
2186
2187 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
2188
2189 private:
internal_push(sequencer_operation * op)2190 /* override */ void internal_push(sequencer_operation *op) {
2191 size_type tag = (*my_sequencer)(*(op->elem));
2192 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
2193 if(tag < this->my_head) {
2194 // have already emitted a message with this tag
2195 __TBB_store_with_release(op->status, FAILED);
2196 return;
2197 }
2198 #endif
2199 // cannot modify this->my_tail now; the buffer would be inconsistent.
2200 size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
2201
2202 if(this->size(new_tail) > this->capacity()) {
2203 this->grow_my_array(this->size(new_tail));
2204 }
2205 this->my_tail = new_tail;
2206 if(this->place_item(tag,*(op->elem))) {
2207 __TBB_store_with_release(op->status, SUCCEEDED);
2208 }
2209 else {
2210 // already have a message with this tag
2211 __TBB_store_with_release(op->status, FAILED);
2212 }
2213 }
2214 }; // sequencer_node
2215
2216 //! Forwards messages in priority order
2217 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
2218 class priority_queue_node : public buffer_node<T, A> {
2219 public:
2220 typedef T input_type;
2221 typedef T output_type;
2222 typedef buffer_node<T,A> base_type;
2223 typedef sender< input_type > predecessor_type;
2224 typedef receiver< output_type > successor_type;
2225
2226 //! Constructor
priority_queue_node(graph & g)2227 priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {
2228 tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2229 static_cast<receiver<input_type> *>(this),
2230 static_cast<sender<output_type> *>(this) );
2231 }
2232
2233 //! Copy constructor
priority_queue_node(const priority_queue_node & src)2234 priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {
2235 tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2236 static_cast<receiver<input_type> *>(this),
2237 static_cast<sender<output_type> *>(this) );
2238 }
2239
2240 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)2241 /* override */ void set_name( const char *name ) {
2242 tbb::internal::fgt_node_desc( this, name );
2243 }
2244 #endif
2245
2246
2247 protected:
2248
reset(__TBB_PFG_RESET_ARG (reset_flags f))2249 /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
2250 mark = 0;
2251 base_type::reset(__TBB_PFG_RESET_ARG(f));
2252 }
2253
2254 typedef typename buffer_node<T, A>::size_type size_type;
2255 typedef typename buffer_node<T, A>::item_type item_type;
2256 typedef typename buffer_node<T, A>::buffer_operation prio_operation;
2257
2258 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
2259
handle_operations(prio_operation * op_list)2260 /* override */ void handle_operations(prio_operation *op_list) {
2261 prio_operation *tmp = op_list /*, *pop_list*/ ;
2262 bool try_forwarding=false;
2263 while (op_list) {
2264 tmp = op_list;
2265 op_list = op_list->next;
2266 switch (tmp->type) {
2267 case buffer_node<T, A>::reg_succ: this->internal_reg_succ(tmp); try_forwarding = true; break;
2268 case buffer_node<T, A>::rem_succ: this->internal_rem_succ(tmp); break;
2269 case buffer_node<T, A>::put_item: internal_push(tmp); try_forwarding = true; break;
2270 case buffer_node<T, A>::try_fwd_task: internal_forward_task(tmp); break;
2271 case buffer_node<T, A>::rel_res: internal_release(tmp); try_forwarding = true; break;
2272 case buffer_node<T, A>::con_res: internal_consume(tmp); try_forwarding = true; break;
2273 case buffer_node<T, A>::req_item: internal_pop(tmp); break;
2274 case buffer_node<T, A>::res_item: internal_reserve(tmp); break;
2275 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
2276 case buffer_node<T, A>::add_blt_succ: this->internal_add_built_succ(tmp); break;
2277 case buffer_node<T, A>::del_blt_succ: this->internal_del_built_succ(tmp); break;
2278 case buffer_node<T, A>::add_blt_pred: this->internal_add_built_pred(tmp); break;
2279 case buffer_node<T, A>::del_blt_pred: this->internal_del_built_pred(tmp); break;
2280 case buffer_node<T, A>::blt_succ_cnt: this->internal_succ_cnt(tmp); break;
2281 case buffer_node<T, A>::blt_pred_cnt: this->internal_pred_cnt(tmp); break;
2282 case buffer_node<T, A>::blt_succ_cpy: this->internal_copy_succs(tmp); break;
2283 case buffer_node<T, A>::blt_pred_cpy: this->internal_copy_preds(tmp); break;
2284 #endif
2285 }
2286 }
2287 // process pops! for now, no special pop processing
2288 if (mark<this->my_tail) heapify();
2289 if (try_forwarding && !this->forwarder_busy) {
2290 task* tp = this->my_graph.root_task();
2291 if(tp) {
2292 this->forwarder_busy = true;
2293 task *new_task = new(task::allocate_additional_child_of(*tp)) internal::
2294 forward_task_bypass
2295 < buffer_node<input_type, A> >(*this);
2296 // tmp should point to the last item handled by the aggregator. This is the operation
2297 // the handling thread enqueued. So modifying that record will be okay.
2298 tbb::task *tmp1 = tmp->ltask;
2299 tmp->ltask = combine_tasks(tmp1, new_task);
2300 }
2301 }
2302 }
2303
2304 //! Tries to forward valid items to successors
internal_forward_task(prio_operation * op)2305 /* override */ void internal_forward_task(prio_operation *op) {
2306 T i_copy;
2307 task * last_task = NULL; // flagged when a successor accepts
2308 size_type counter = this->my_successors.size();
2309
2310 if (this->my_reserved || this->my_tail == 0) {
2311 __TBB_store_with_release(op->status, FAILED);
2312 this->forwarder_busy = false;
2313 return;
2314 }
2315 // Keep trying to send while there exists an accepting successor
2316 while (counter>0 && this->my_tail > 0) {
2317 i_copy = this->get_my_item(0);
2318 task * new_task = this->my_successors.try_put_task(i_copy);
2319 if ( new_task ) {
2320 last_task = combine_tasks(last_task, new_task);
2321 this->destroy_item(0); // we've forwarded this item
2322 if (mark == this->my_tail) --mark;
2323 if(--(this->my_tail)) { // didn't consume last item on heap
2324 this->move_item(0,this->my_tail);
2325 }
2326 if (this->my_tail > 1) // don't reheap for heap of size 1
2327 reheap();
2328 }
2329 --counter;
2330 }
2331 op->ltask = last_task;
2332 if (last_task && !counter)
2333 __TBB_store_with_release(op->status, SUCCEEDED);
2334 else {
2335 __TBB_store_with_release(op->status, FAILED);
2336 this->forwarder_busy = false;
2337 }
2338 }
2339
internal_push(prio_operation * op)2340 /* override */ void internal_push(prio_operation *op) {
2341 if ( this->my_tail >= this->my_array_size )
2342 this->grow_my_array( this->my_tail + 1 );
2343 (void) this->place_item(this->my_tail, *(op->elem));
2344 ++(this->my_tail);
2345 __TBB_store_with_release(op->status, SUCCEEDED);
2346 }
2347
internal_pop(prio_operation * op)2348 /* override */ void internal_pop(prio_operation *op) {
2349 // if empty or already reserved, don't pop
2350 if ( this->my_reserved == true || this->my_tail == 0 ) {
2351 __TBB_store_with_release(op->status, FAILED);
2352 return;
2353 }
2354 if (mark<this->my_tail && // item pushed, no re-heap
2355 compare(this->get_my_item(0),
2356 this->get_my_item(this->my_tail-1))) {
2357 // there are newly pushed elems; last one higher than top
2358 // copy the data
2359 this->fetch_item(this->my_tail-1, *(op->elem));
2360 __TBB_store_with_release(op->status, SUCCEEDED);
2361 --(this->my_tail);
2362 return;
2363 }
2364 // extract and push the last element down heap
2365 *(op->elem) = this->get_my_item(0); // copy the data, item 0 still valid
2366 __TBB_store_with_release(op->status, SUCCEEDED);
2367 if (mark == this->my_tail) --mark;
2368 __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
2369 if(--(this->my_tail)) {
2370 // there were two or more items in heap. Move the
2371 // last item to the top of the heap
2372 this->set_my_item(0,this->get_my_item(this->my_tail));
2373 }
2374 this->destroy_item(this->my_tail);
2375 if (this->my_tail > 1) // don't reheap for heap of size 1
2376 reheap();
2377 }
2378
internal_reserve(prio_operation * op)2379 /* override */ void internal_reserve(prio_operation *op) {
2380 if (this->my_reserved == true || this->my_tail == 0) {
2381 __TBB_store_with_release(op->status, FAILED);
2382 return;
2383 }
2384 this->my_reserved = true;
2385 *(op->elem) = reserved_item = this->get_my_item(0);
2386 if (mark == this->my_tail) --mark;
2387 --(this->my_tail);
2388 __TBB_store_with_release(op->status, SUCCEEDED);
2389 this->set_my_item(0, this->get_my_item(this->my_tail));
2390 this->destroy_item(this->my_tail);
2391 if (this->my_tail > 1)
2392 reheap();
2393 }
2394
internal_consume(prio_operation * op)2395 /* override */ void internal_consume(prio_operation *op) {
2396 this->my_reserved = false;
2397 __TBB_store_with_release(op->status, SUCCEEDED);
2398 }
internal_release(prio_operation * op)2399 /* override */ void internal_release(prio_operation *op) {
2400 if (this->my_tail >= this->my_array_size)
2401 this->grow_my_array( this->my_tail + 1 );
2402 this->set_my_item(this->my_tail, reserved_item);
2403 ++(this->my_tail);
2404 this->my_reserved = false;
2405 __TBB_store_with_release(op->status, SUCCEEDED);
2406 heapify();
2407 }
2408 private:
2409 Compare compare;
2410 size_type mark;
2411 input_type reserved_item;
2412
2413 // turn array into heap
heapify()2414 void heapify() {
2415 if (!mark) mark = 1;
2416 for (; mark<this->my_tail; ++mark) { // for each unheaped element
2417 size_type cur_pos = mark;
2418 input_type to_place;
2419 this->fetch_item(mark,to_place);
2420 do { // push to_place up the heap
2421 size_type parent = (cur_pos-1)>>1;
2422 if (!compare(this->get_my_item(parent), to_place))
2423 break;
2424 this->move_item(cur_pos, parent);
2425 cur_pos = parent;
2426 } while( cur_pos );
2427 (void) this->place_item(cur_pos, to_place);
2428 }
2429 }
2430
2431 // otherwise heapified array with new root element; rearrange to heap
reheap()2432 void reheap() {
2433 size_type cur_pos=0, child=1;
2434 while (child < mark) {
2435 size_type target = child;
2436 if (child+1<mark &&
2437 compare(this->get_my_item(child),
2438 this->get_my_item(child+1)))
2439 ++target;
2440 // target now has the higher priority child
2441 if (compare(this->get_my_item(target),
2442 this->get_my_item(cur_pos)))
2443 break;
2444 // swap
2445 this->swap_items(cur_pos, target);
2446 cur_pos = target;
2447 child = (cur_pos<<1)+1;
2448 }
2449 }
2450 }; // priority_queue_node
2451
2452 //! Forwards messages only if the threshold has not been reached
2453 /** This node forwards items until its threshold is reached.
2454 It contains no buffering. If the downstream node rejects, the
2455 message is dropped. */
2456 template< typename T >
2457 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2458 protected:
2459 using graph_node::my_graph;
2460 public:
2461 typedef T input_type;
2462 typedef T output_type;
2463 typedef sender< input_type > predecessor_type;
2464 typedef receiver< output_type > successor_type;
2465 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
2466 typedef std::vector<successor_type *> successor_vector_type;
2467 typedef std::vector<predecessor_type *> predecessor_vector_type;
2468 #endif
2469
2470 private:
2471 size_t my_threshold;
2472 size_t my_count; //number of successful puts
2473 size_t my_tries; //number of active put attempts
2474 internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors;
2475 spin_mutex my_mutex;
2476 internal::broadcast_cache< T > my_successors;
2477 int init_decrement_predecessors;
2478
2479 friend class internal::forward_task_bypass< limiter_node<T> >;
2480
2481 // Let decrementer call decrement_counter()
2482 friend class internal::decrementer< limiter_node<T> >;
2483
check_conditions()2484 bool check_conditions() { // always called under lock
2485 return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
2486 }
2487
2488 // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
forward_task()2489 task *forward_task() {
2490 input_type v;
2491 task *rval = NULL;
2492 bool reserved = false;
2493 {
2494 spin_mutex::scoped_lock lock(my_mutex);
2495 if ( check_conditions() )
2496 ++my_tries;
2497 else
2498 return NULL;
2499 }
2500
2501 //SUCCESS
2502 // if we can reserve and can put, we consume the reservation
2503 // we increment the count and decrement the tries
2504 if ( (my_predecessors.try_reserve(v)) == true ){
2505 reserved=true;
2506 if ( (rval = my_successors.try_put_task(v)) != NULL ){
2507 {
2508 spin_mutex::scoped_lock lock(my_mutex);
2509 ++my_count;
2510 --my_tries;
2511 my_predecessors.try_consume();
2512 if ( check_conditions() ) {
2513 task* tp = this->my_graph.root_task();
2514 if ( tp ) {
2515 task *rtask = new ( task::allocate_additional_child_of( *tp ) )
2516 internal::forward_task_bypass< limiter_node<T> >( *this );
2517 FLOW_SPAWN (*rtask);
2518 }
2519 }
2520 }
2521 return rval;
2522 }
2523 }
2524 //FAILURE
2525 //if we can't reserve, we decrement the tries
2526 //if we can reserve but can't put, we decrement the tries and release the reservation
2527 {
2528 spin_mutex::scoped_lock lock(my_mutex);
2529 --my_tries;
2530 if (reserved) my_predecessors.try_release();
2531 if ( check_conditions() ) {
2532 task* tp = this->my_graph.root_task();
2533 if ( tp ) {
2534 task *rtask = new ( task::allocate_additional_child_of( *tp ) )
2535 internal::forward_task_bypass< limiter_node<T> >( *this );
2536 __TBB_ASSERT(!rval, "Have two tasks to handle");
2537 return rtask;
2538 }
2539 }
2540 return rval;
2541 }
2542 }
2543
forward()2544 void forward() {
2545 __TBB_ASSERT(false, "Should never be called");
2546 return;
2547 }
2548
decrement_counter()2549 task * decrement_counter() {
2550 {
2551 spin_mutex::scoped_lock lock(my_mutex);
2552 if(my_count) --my_count;
2553 }
2554 return forward_task();
2555 }
2556
2557 public:
2558 //! The internal receiver< continue_msg > that decrements the count
2559 internal::decrementer< limiter_node<T> > decrement;
2560
2561 //! Constructor
2562 limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0) :
graph_node(g)2563 graph_node(g), my_threshold(threshold), my_count(0), my_tries(0),
2564 init_decrement_predecessors(num_decrement_predecessors),
2565 decrement(num_decrement_predecessors)
2566 {
2567 my_predecessors.set_owner(this);
2568 my_successors.set_owner(this);
2569 decrement.set_owner(this);
2570 tbb::internal::fgt_node( tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
2571 static_cast<receiver<input_type> *>(this), static_cast<receiver<continue_msg> *>(&decrement),
2572 static_cast<sender<output_type> *>(this) );
2573 }
2574
2575 //! Copy constructor
limiter_node(const limiter_node & src)2576 limiter_node( const limiter_node& src ) :
2577 graph_node(src.my_graph), receiver<T>(), sender<T>(),
2578 my_threshold(src.my_threshold), my_count(0), my_tries(0),
2579 init_decrement_predecessors(src.init_decrement_predecessors),
2580 decrement(src.init_decrement_predecessors)
2581 {
2582 my_predecessors.set_owner(this);
2583 my_successors.set_owner(this);
2584 decrement.set_owner(this);
2585 tbb::internal::fgt_node( tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
2586 static_cast<receiver<input_type> *>(this), static_cast<receiver<continue_msg> *>(&decrement),
2587 static_cast<sender<output_type> *>(this) );
2588 }
2589
2590 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)2591 /* override */ void set_name( const char *name ) {
2592 tbb::internal::fgt_node_desc( this, name );
2593 }
2594 #endif
2595
2596 //! Replace the current successor with this new successor
register_successor(receiver<output_type> & r)2597 /* override */ bool register_successor( receiver<output_type> &r ) {
2598 spin_mutex::scoped_lock lock(my_mutex);
2599 bool was_empty = my_successors.empty();
2600 my_successors.register_successor(r);
2601 //spawn a forward task if this is the only successor
2602 if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
2603 task* tp = this->my_graph.root_task();
2604 if ( tp ) {
2605 FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp ) )
2606 internal::forward_task_bypass < limiter_node<T> >( *this ) ) );
2607 }
2608 }
2609 return true;
2610 }
2611
2612 //! Removes a successor from this node
2613 /** r.remove_predecessor(*this) is also called. */
remove_successor(receiver<output_type> & r)2614 /* override */ bool remove_successor( receiver<output_type> &r ) {
2615 r.remove_predecessor(*this);
2616 my_successors.remove_successor(r);
2617 return true;
2618 }
2619
2620 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
internal_add_built_successor(receiver<output_type> & src)2621 /*override*/void internal_add_built_successor(receiver<output_type> &src) {
2622 my_successors.internal_add_built_successor(src);
2623 }
2624
internal_delete_built_successor(receiver<output_type> & src)2625 /*override*/void internal_delete_built_successor(receiver<output_type> &src) {
2626 my_successors.internal_delete_built_successor(src);
2627 }
2628
successor_count()2629 /*override*/size_t successor_count() { return my_successors.successor_count(); }
2630
copy_successors(successor_vector_type & v)2631 /*override*/ void copy_successors(successor_vector_type &v) {
2632 my_successors.copy_successors(v);
2633 }
2634
internal_add_built_predecessor(sender<output_type> & src)2635 /*override*/void internal_add_built_predecessor(sender<output_type> &src) {
2636 my_predecessors.internal_add_built_predecessor(src);
2637 }
2638
internal_delete_built_predecessor(sender<output_type> & src)2639 /*override*/void internal_delete_built_predecessor(sender<output_type> &src) {
2640 my_predecessors.internal_delete_built_predecessor(src);
2641 }
2642
predecessor_count()2643 /*override*/size_t predecessor_count() { return my_predecessors.predecessor_count(); }
2644
copy_predecessors(predecessor_vector_type & v)2645 /*override*/ void copy_predecessors(predecessor_vector_type &v) {
2646 my_predecessors.copy_predecessors(v);
2647 }
2648 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
2649
2650 //! Adds src to the list of cached predecessors.
register_predecessor(predecessor_type & src)2651 /* override */ bool register_predecessor( predecessor_type &src ) {
2652 spin_mutex::scoped_lock lock(my_mutex);
2653 my_predecessors.add( src );
2654 task* tp = this->my_graph.root_task();
2655 if ( my_count + my_tries < my_threshold && !my_successors.empty() && tp ) {
2656 FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *tp ) )
2657 internal::forward_task_bypass < limiter_node<T> >( *this ) ) );
2658 }
2659 return true;
2660 }
2661
2662 //! Removes src from the list of cached predecessors.
remove_predecessor(predecessor_type & src)2663 /* override */ bool remove_predecessor( predecessor_type &src ) {
2664 my_predecessors.remove( src );
2665 return true;
2666 }
2667
2668 protected:
2669
2670 template< typename R, typename B > friend class run_and_put_task;
2671 template<typename X, typename Y> friend class internal::broadcast_cache;
2672 template<typename X, typename Y> friend class internal::round_robin_cache;
2673 //! Puts an item to this receiver
try_put_task(const T & t)2674 /* override */ task *try_put_task( const T &t ) {
2675 {
2676 spin_mutex::scoped_lock lock(my_mutex);
2677 if ( my_count + my_tries >= my_threshold )
2678 return NULL;
2679 else
2680 ++my_tries;
2681 }
2682
2683 task * rtask = my_successors.try_put_task(t);
2684
2685 if ( !rtask ) { // try_put_task failed.
2686 spin_mutex::scoped_lock lock(my_mutex);
2687 --my_tries;
2688 task* tp = this->my_graph.root_task();
2689 if ( check_conditions() && tp ) {
2690 rtask = new ( task::allocate_additional_child_of( *tp ) )
2691 internal::forward_task_bypass< limiter_node<T> >( *this );
2692 }
2693 }
2694 else {
2695 spin_mutex::scoped_lock lock(my_mutex);
2696 ++my_count;
2697 --my_tries;
2698 }
2699 return rtask;
2700 }
2701
reset(__TBB_PFG_RESET_ARG (reset_flags f))2702 /*override*/void reset( __TBB_PFG_RESET_ARG(reset_flags f)) {
2703 my_count = 0;
2704 my_predecessors.reset(__TBB_PFG_RESET_ARG(f));
2705 decrement.reset_receiver(__TBB_PFG_RESET_ARG(f));
2706 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
2707 my_successors.reset(f);
2708 #endif
2709 }
2710
reset_receiver(__TBB_PFG_RESET_ARG (reset_flags f))2711 /*override*/void reset_receiver(__TBB_PFG_RESET_ARG(reset_flags f)) { my_predecessors.reset(__TBB_PFG_RESET_ARG(f)); }
2712 }; // limiter_node
2713
2714 #include "internal/_flow_graph_join_impl.h"
2715
2716 using internal::reserving_port;
2717 using internal::queueing_port;
2718 using internal::tag_matching_port;
2719 using internal::input_port;
2720 using internal::tag_value;
2721 using internal::NO_TAG;
2722
2723 template<typename OutputTuple, graph_buffer_policy JP=queueing> class join_node;
2724
2725 template<typename OutputTuple>
2726 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
2727 private:
2728 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
2729 typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
2730 public:
2731 typedef OutputTuple output_type;
2732 typedef typename unfolded_type::input_ports_type input_ports_type;
join_node(graph & g)2733 join_node(graph &g) : unfolded_type(g) {
2734 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2735 this->input_ports(), static_cast< sender< output_type > *>(this) );
2736 }
join_node(const join_node & other)2737 join_node(const join_node &other) : unfolded_type(other) {
2738 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2739 this->input_ports(), static_cast< sender< output_type > *>(this) );
2740 }
2741
2742 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)2743 /* override */ void set_name( const char *name ) {
2744 tbb::internal::fgt_node_desc( this, name );
2745 }
2746 #endif
2747
2748 };
2749
2750 template<typename OutputTuple>
2751 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
2752 private:
2753 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
2754 typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
2755 public:
2756 typedef OutputTuple output_type;
2757 typedef typename unfolded_type::input_ports_type input_ports_type;
join_node(graph & g)2758 join_node(graph &g) : unfolded_type(g) {
2759 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2760 this->input_ports(), static_cast< sender< output_type > *>(this) );
2761 }
join_node(const join_node & other)2762 join_node(const join_node &other) : unfolded_type(other) {
2763 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2764 this->input_ports(), static_cast< sender< output_type > *>(this) );
2765 }
2766
2767 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)2768 /* override */ void set_name( const char *name ) {
2769 tbb::internal::fgt_node_desc( this, name );
2770 }
2771 #endif
2772
2773 };
2774
2775 // template for tag_matching join_node
2776 template<typename OutputTuple>
2777 class join_node<OutputTuple, tag_matching> : public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value,
2778 tag_matching_port, OutputTuple, tag_matching> {
2779 private:
2780 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
2781 typedef typename internal::unfolded_join_node<N, tag_matching_port, OutputTuple, tag_matching> unfolded_type;
2782 public:
2783 typedef OutputTuple output_type;
2784 typedef typename unfolded_type::input_ports_type input_ports_type;
2785
2786 template<typename __TBB_B0, typename __TBB_B1>
join_node(graph & g,__TBB_B0 b0,__TBB_B1 b1)2787 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
2788 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2789 this->input_ports(), static_cast< sender< output_type > *>(this) );
2790 }
2791 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
join_node(graph & g,__TBB_B0 b0,__TBB_B1 b1,__TBB_B2 b2)2792 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
2793 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2794 this->input_ports(), static_cast< sender< output_type > *>(this) );
2795 }
2796 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
join_node(graph & g,__TBB_B0 b0,__TBB_B1 b1,__TBB_B2 b2,__TBB_B3 b3)2797 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
2798 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2799 this->input_ports(), static_cast< sender< output_type > *>(this) );
2800 }
2801 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
join_node(graph & g,__TBB_B0 b0,__TBB_B1 b1,__TBB_B2 b2,__TBB_B3 b3,__TBB_B4 b4)2802 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
2803 unfolded_type(g, b0, b1, b2, b3, b4) {
2804 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2805 this->input_ports(), static_cast< sender< output_type > *>(this) );
2806 }
2807 #if __TBB_VARIADIC_MAX >= 6
2808 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2809 typename __TBB_B5>
join_node(graph & g,__TBB_B0 b0,__TBB_B1 b1,__TBB_B2 b2,__TBB_B3 b3,__TBB_B4 b4,__TBB_B5 b5)2810 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
2811 unfolded_type(g, b0, b1, b2, b3, b4, b5) {
2812 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2813 this->input_ports(), static_cast< sender< output_type > *>(this) );
2814 }
2815 #endif
2816 #if __TBB_VARIADIC_MAX >= 7
2817 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2818 typename __TBB_B5, typename __TBB_B6>
join_node(graph & g,__TBB_B0 b0,__TBB_B1 b1,__TBB_B2 b2,__TBB_B3 b3,__TBB_B4 b4,__TBB_B5 b5,__TBB_B6 b6)2819 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
2820 unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
2821 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2822 this->input_ports(), static_cast< sender< output_type > *>(this) );
2823 }
2824 #endif
2825 #if __TBB_VARIADIC_MAX >= 8
2826 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2827 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
join_node(graph & g,__TBB_B0 b0,__TBB_B1 b1,__TBB_B2 b2,__TBB_B3 b3,__TBB_B4 b4,__TBB_B5 b5,__TBB_B6 b6,__TBB_B7 b7)2828 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2829 __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
2830 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2831 this->input_ports(), static_cast< sender< output_type > *>(this) );
2832 }
2833 #endif
2834 #if __TBB_VARIADIC_MAX >= 9
2835 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2836 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
join_node(graph & g,__TBB_B0 b0,__TBB_B1 b1,__TBB_B2 b2,__TBB_B3 b3,__TBB_B4 b4,__TBB_B5 b5,__TBB_B6 b6,__TBB_B7 b7,__TBB_B8 b8)2837 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2838 __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
2839 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2840 this->input_ports(), static_cast< sender< output_type > *>(this) );
2841 }
2842 #endif
2843 #if __TBB_VARIADIC_MAX >= 10
2844 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2845 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
join_node(graph & g,__TBB_B0 b0,__TBB_B1 b1,__TBB_B2 b2,__TBB_B3 b3,__TBB_B4 b4,__TBB_B5 b5,__TBB_B6 b6,__TBB_B7 b7,__TBB_B8 b8,__TBB_B9 b9)2846 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2847 __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
2848 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2849 this->input_ports(), static_cast< sender< output_type > *>(this) );
2850 }
2851 #endif
join_node(const join_node & other)2852 join_node(const join_node &other) : unfolded_type(other) {
2853 tbb::internal::fgt_multiinput_node<OutputTuple,N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2854 this->input_ports(), static_cast< sender< output_type > *>(this) );
2855 }
2856
2857 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)2858 /* override */ void set_name( const char *name ) {
2859 tbb::internal::fgt_node_desc( this, name );
2860 }
2861 #endif
2862
2863 };
2864
2865 // indexer node
2866 #include "internal/_flow_graph_indexer_impl.h"
2867
2868 struct indexer_null_type {};
2869
2870 template<typename T0, typename T1=indexer_null_type, typename T2=indexer_null_type, typename T3=indexer_null_type,
2871 typename T4=indexer_null_type, typename T5=indexer_null_type, typename T6=indexer_null_type,
2872 typename T7=indexer_null_type, typename T8=indexer_null_type, typename T9=indexer_null_type> class indexer_node;
2873
2874 //indexer node specializations
2875 template<typename T0>
2876 class indexer_node<T0> : public internal::unfolded_indexer_node<tuple<T0> > {
2877 private:
2878 static const int N = 1;
2879 public:
2880 typedef tuple<T0> InputTuple;
2881 typedef typename internal::tagged_msg<size_t, T0> output_type;
2882 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)2883 indexer_node(graph& g) : unfolded_type(g) {
2884 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2885 this->input_ports(), static_cast< sender< output_type > *>(this) );
2886 }
2887 // Copy constructor
indexer_node(const indexer_node & other)2888 indexer_node( const indexer_node& other ) : unfolded_type(other) {
2889 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2890 this->input_ports(), static_cast< sender< output_type > *>(this) );
2891 }
2892
2893 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)2894 void set_name( const char *name ) {
2895 tbb::internal::fgt_node_desc( this, name );
2896 }
2897 #endif
2898 };
2899
2900 template<typename T0, typename T1>
2901 class indexer_node<T0, T1> : public internal::unfolded_indexer_node<tuple<T0, T1> > {
2902 private:
2903 static const int N = 2;
2904 public:
2905 typedef tuple<T0, T1> InputTuple;
2906 typedef typename internal::tagged_msg<size_t, T0, T1> output_type;
2907 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)2908 indexer_node(graph& g) : unfolded_type(g) {
2909 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2910 this->input_ports(), static_cast< sender< output_type > *>(this) );
2911 }
2912 // Copy constructor
indexer_node(const indexer_node & other)2913 indexer_node( const indexer_node& other ) : unfolded_type(other) {
2914 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2915 this->input_ports(), static_cast< sender< output_type > *>(this) );
2916 }
2917
2918 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)2919 void set_name( const char *name ) {
2920 tbb::internal::fgt_node_desc( this, name );
2921 }
2922 #endif
2923 };
2924
2925 template<typename T0, typename T1, typename T2>
2926 class indexer_node<T0, T1, T2> : public internal::unfolded_indexer_node<tuple<T0, T1, T2> > {
2927 private:
2928 static const int N = 3;
2929 public:
2930 typedef tuple<T0, T1, T2> InputTuple;
2931 typedef typename internal::tagged_msg<size_t, T0, T1, T2> output_type;
2932 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)2933 indexer_node(graph& g) : unfolded_type(g) {
2934 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2935 this->input_ports(), static_cast< sender< output_type > *>(this) );
2936 }
2937 // Copy constructor
indexer_node(const indexer_node & other)2938 indexer_node( const indexer_node& other ) : unfolded_type(other) {
2939 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2940 this->input_ports(), static_cast< sender< output_type > *>(this) );
2941 }
2942
2943 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)2944 void set_name( const char *name ) {
2945 tbb::internal::fgt_node_desc( this, name );
2946 }
2947 #endif
2948 };
2949
2950 template<typename T0, typename T1, typename T2, typename T3>
2951 class indexer_node<T0, T1, T2, T3> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > {
2952 private:
2953 static const int N = 4;
2954 public:
2955 typedef tuple<T0, T1, T2, T3> InputTuple;
2956 typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3> output_type;
2957 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)2958 indexer_node(graph& g) : unfolded_type(g) {
2959 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2960 this->input_ports(), static_cast< sender< output_type > *>(this) );
2961 }
2962 // Copy constructor
indexer_node(const indexer_node & other)2963 indexer_node( const indexer_node& other ) : unfolded_type(other) {
2964 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2965 this->input_ports(), static_cast< sender< output_type > *>(this) );
2966 }
2967
2968 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)2969 /* override */ void set_name( const char *name ) {
2970 tbb::internal::fgt_node_desc( this, name );
2971 }
2972 #endif
2973 };
2974
2975 template<typename T0, typename T1, typename T2, typename T3, typename T4>
2976 class indexer_node<T0, T1, T2, T3, T4> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > {
2977 private:
2978 static const int N = 5;
2979 public:
2980 typedef tuple<T0, T1, T2, T3, T4> InputTuple;
2981 typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
2982 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)2983 indexer_node(graph& g) : unfolded_type(g) {
2984 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2985 this->input_ports(), static_cast< sender< output_type > *>(this) );
2986 }
2987 // Copy constructor
indexer_node(const indexer_node & other)2988 indexer_node( const indexer_node& other ) : unfolded_type(other) {
2989 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2990 this->input_ports(), static_cast< sender< output_type > *>(this) );
2991 }
2992
2993 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)2994 /* override */ void set_name( const char *name ) {
2995 tbb::internal::fgt_node_desc( this, name );
2996 }
2997 #endif
2998 };
2999
3000 #if __TBB_VARIADIC_MAX >= 6
3001 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
3002 class indexer_node<T0, T1, T2, T3, T4, T5> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
3003 private:
3004 static const int N = 6;
3005 public:
3006 typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
3007 typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5> output_type;
3008 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)3009 indexer_node(graph& g) : unfolded_type(g) {
3010 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3011 this->input_ports(), static_cast< sender< output_type > *>(this) );
3012 }
3013 // Copy constructor
indexer_node(const indexer_node & other)3014 indexer_node( const indexer_node& other ) : unfolded_type(other) {
3015 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3016 this->input_ports(), static_cast< sender< output_type > *>(this) );
3017 }
3018
3019 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)3020 /* override */ void set_name( const char *name ) {
3021 tbb::internal::fgt_node_desc( this, name );
3022 }
3023 #endif
3024 };
3025 #endif //variadic max 6
3026
3027 #if __TBB_VARIADIC_MAX >= 7
3028 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3029 typename T6>
3030 class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
3031 private:
3032 static const int N = 7;
3033 public:
3034 typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
3035 typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6> output_type;
3036 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)3037 indexer_node(graph& g) : unfolded_type(g) {
3038 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3039 this->input_ports(), static_cast< sender< output_type > *>(this) );
3040 }
3041 // Copy constructor
indexer_node(const indexer_node & other)3042 indexer_node( const indexer_node& other ) : unfolded_type(other) {
3043 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3044 this->input_ports(), static_cast< sender< output_type > *>(this) );
3045 }
3046
3047 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)3048 /* override */ void set_name( const char *name ) {
3049 tbb::internal::fgt_node_desc( this, name );
3050 }
3051 #endif
3052 };
3053 #endif //variadic max 7
3054
3055 #if __TBB_VARIADIC_MAX >= 8
3056 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3057 typename T6, typename T7>
3058 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
3059 private:
3060 static const int N = 8;
3061 public:
3062 typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
3063 typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type;
3064 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)3065 indexer_node(graph& g) : unfolded_type(g) {
3066 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3067 this->input_ports(), static_cast< sender< output_type > *>(this) );
3068 }
3069 // Copy constructor
indexer_node(const indexer_node & other)3070 indexer_node( const indexer_node& other ) : unfolded_type(other) {
3071 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3072 this->input_ports(), static_cast< sender< output_type > *>(this) );
3073 }
3074
3075 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)3076 /* override */ void set_name( const char *name ) {
3077 tbb::internal::fgt_node_desc( this, name );
3078 }
3079 #endif
3080 };
3081 #endif //variadic max 8
3082
3083 #if __TBB_VARIADIC_MAX >= 9
3084 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3085 typename T6, typename T7, typename T8>
3086 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
3087 private:
3088 static const int N = 9;
3089 public:
3090 typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
3091 typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8> output_type;
3092 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)3093 indexer_node(graph& g) : unfolded_type(g) {
3094 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3095 this->input_ports(), static_cast< sender< output_type > *>(this) );
3096 }
3097 // Copy constructor
indexer_node(const indexer_node & other)3098 indexer_node( const indexer_node& other ) : unfolded_type(other) {
3099 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3100 this->input_ports(), static_cast< sender< output_type > *>(this) );
3101 }
3102
3103 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)3104 /* override */ void set_name( const char *name ) {
3105 tbb::internal::fgt_node_desc( this, name );
3106 }
3107 #endif
3108 };
3109 #endif //variadic max 9
3110
3111 #if __TBB_VARIADIC_MAX >= 10
3112 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3113 typename T6, typename T7, typename T8, typename T9>
3114 class indexer_node/*default*/ : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
3115 private:
3116 static const int N = 10;
3117 public:
3118 typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
3119 typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> output_type;
3120 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)3121 indexer_node(graph& g) : unfolded_type(g) {
3122 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3123 this->input_ports(), static_cast< sender< output_type > *>(this) );
3124 }
3125 // Copy constructor
indexer_node(const indexer_node & other)3126 indexer_node( const indexer_node& other ) : unfolded_type(other) {
3127 tbb::internal::fgt_multiinput_node<InputTuple,N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3128 this->input_ports(), static_cast< sender< output_type > *>(this) );
3129 }
3130
3131 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
set_name(const char * name)3132 /* override */ void set_name( const char *name ) {
3133 tbb::internal::fgt_node_desc( this, name );
3134 }
3135 #endif
3136 };
3137 #endif //variadic max 10
3138
3139 //! Makes an edge between a single predecessor and a single successor
3140 template< typename T >
make_edge(sender<T> & p,receiver<T> & s)3141 inline void make_edge( sender<T> &p, receiver<T> &s ) {
3142 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
3143 s.internal_add_built_predecessor(p);
3144 p.internal_add_built_successor(s);
3145 #endif
3146 p.register_successor( s );
3147 tbb::internal::fgt_make_edge( &p, &s );
3148 }
3149
3150 //! Makes an edge between a single predecessor and a single successor
3151 template< typename T >
remove_edge(sender<T> & p,receiver<T> & s)3152 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
3153 p.remove_successor( s );
3154 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
3155 // TODO: should we try to remove p from the predecessor list of s, in case the edge is reversed?
3156 p.internal_delete_built_successor(s);
3157 s.internal_delete_built_predecessor(p);
3158 #endif
3159 tbb::internal::fgt_remove_edge( &p, &s );
3160 }
3161
3162 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
3163 template<typename C >
3164 template< typename S >
sender_extract(S & s)3165 void edge_container<C>::sender_extract( S &s ) {
3166 edge_vector e = built_edges;
3167 for ( typename edge_vector::iterator i = e.begin(); i != e.end(); ++i ) {
3168 remove_edge(s, **i);
3169 }
3170 }
3171
3172 template<typename C >
3173 template< typename R >
receiver_extract(R & r)3174 void edge_container<C>::receiver_extract( R &r ) {
3175 edge_vector e = built_edges;
3176 for ( typename edge_vector::iterator i = e.begin(); i != e.end(); ++i ) {
3177 remove_edge(**i, r);
3178 }
3179 }
3180 #endif
3181
3182 //! Returns a copy of the body from a function or continue node
3183 template< typename Body, typename Node >
copy_body(Node & n)3184 Body copy_body( Node &n ) {
3185 return n.template copy_function_object<Body>();
3186 }
3187
3188 } // interface7
3189
3190 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
3191 using interface7::reset_flags;
3192 using interface7::rf_reset_protocol;
3193 using interface7::rf_reset_bodies;
3194 using interface7::rf_extract;
3195 #endif
3196
3197 using interface7::graph;
3198 using interface7::graph_node;
3199 using interface7::continue_msg;
3200 using interface7::sender;
3201 using interface7::receiver;
3202 using interface7::continue_receiver;
3203
3204 using interface7::source_node;
3205 using interface7::function_node;
3206 using interface7::multifunction_node;
3207 using interface7::split_node;
3208 using interface7::internal::output_port;
3209 using interface7::indexer_node;
3210 using interface7::internal::tagged_msg;
3211 using interface7::internal::cast_to;
3212 using interface7::internal::is_a;
3213 using interface7::continue_node;
3214 using interface7::overwrite_node;
3215 using interface7::write_once_node;
3216 using interface7::broadcast_node;
3217 using interface7::buffer_node;
3218 using interface7::queue_node;
3219 using interface7::sequencer_node;
3220 using interface7::priority_queue_node;
3221 using interface7::limiter_node;
3222 using namespace interface7::internal::graph_policy_namespace;
3223 using interface7::join_node;
3224 using interface7::input_port;
3225 using interface7::copy_body;
3226 using interface7::make_edge;
3227 using interface7::remove_edge;
3228 using interface7::internal::NO_TAG;
3229 using interface7::internal::tag_value;
3230
3231 } // flow
3232 } // tbb
3233
3234 #undef __TBB_PFG_RESET_ARG
3235 #undef __TBB_COMMA
3236
3237 #endif // __TBB_flow_graph_H
3238