1 /*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #ifndef __TBB_flow_graph_H
18 #define __TBB_flow_graph_H
19
20 #define __TBB_flow_graph_H_include_area
21 #include "internal/_warning_suppress_enable_notice.h"
22
23 #include "tbb_stddef.h"
24 #include "atomic.h"
25 #include "spin_mutex.h"
26 #include "null_mutex.h"
27 #include "spin_rw_mutex.h"
28 #include "null_rw_mutex.h"
29 #include "task.h"
30 #include "cache_aligned_allocator.h"
31 #include "tbb_exception.h"
32 #include "pipeline.h"
33 #include "internal/_template_helpers.h"
34 #include "internal/_aggregator_impl.h"
35 #include "tbb/internal/_allocator_traits.h"
36 #include "tbb_profiling.h"
37 #include "task_arena.h"
38
39 #if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
40 #if __INTEL_COMPILER
41 // Disabled warning "routine is both inline and noinline"
42 #pragma warning (push)
43 #pragma warning( disable: 2196 )
44 #endif
45 #define __TBB_NOINLINE_SYM __attribute__((noinline))
46 #else
47 #define __TBB_NOINLINE_SYM
48 #endif
49
50 #if __TBB_PREVIEW_ASYNC_MSG
51 #include <vector> // std::vector in internal::async_storage
52 #include <memory> // std::shared_ptr in async_msg
53 #endif
54
55 #if __TBB_PREVIEW_STREAMING_NODE
56 // For streaming_node
57 #include <array> // std::array
58 #include <unordered_map> // std::unordered_map
59 #include <type_traits> // std::decay, std::true_type, std::false_type
60 #endif // __TBB_PREVIEW_STREAMING_NODE
61
62 #if TBB_DEPRECATED_FLOW_ENQUEUE
63 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
64 #else
65 #define FLOW_SPAWN(a) tbb::task::spawn((a))
66 #endif
67
68 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
69 #define __TBB_DEFAULT_NODE_ALLOCATOR(T) cache_aligned_allocator<T>
70 #else
71 #define __TBB_DEFAULT_NODE_ALLOCATOR(T) null_type
72 #endif
73
74 // use the VC10 or gcc version of tuple if it is available.
75 #if __TBB_CPP11_TUPLE_PRESENT
76 #include <tuple>
77 namespace tbb {
78 namespace flow {
79 using std::tuple;
80 using std::tuple_size;
81 using std::tuple_element;
82 using std::get;
83 }
84 }
85 #else
86 #include "compat/tuple"
87 #endif
88
89 #include<list>
90 #include<queue>
91
92 /** @file
93 \brief The graph related classes and functions
94
95 There are some applications that best express dependencies as messages
96 passed between nodes in a graph. These messages may contain data or
97 simply act as signals that a predecessors has completed. The graph
98 class and its associated node classes can be used to express such
99 applications.
100 */
101
102 namespace tbb {
103 namespace flow {
104
105 //! An enumeration the provides the two most common concurrency levels: unlimited and serial
106 enum concurrency { unlimited = 0, serial = 1 };
107
108 namespace interface11 {
109
110 //! A generic null type
111 struct null_type {};
112
113 //! An empty class used for messages that mean "I'm done"
114 class continue_msg {};
115
116 //! Forward declaration section
117 template< typename T > class sender;
118 template< typename T > class receiver;
119 class continue_receiver;
120
121 template< typename T, typename U > class limiter_node; // needed for resetting decrementer
122
123 template< typename R, typename B > class run_and_put_task;
124
125 namespace internal {
126
127 template<typename T, typename M> class successor_cache;
128 template<typename T, typename M> class broadcast_cache;
129 template<typename T, typename M> class round_robin_cache;
130 template<typename T, typename M> class predecessor_cache;
131 template<typename T, typename M> class reservable_predecessor_cache;
132
133 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
134 namespace order {
135 struct following;
136 struct preceding;
137 }
138 template<typename Order, typename... Args> struct node_set;
139 #endif
140
141 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
142 // Holder of edges both for caches and for those nodes which do not have predecessor caches.
143 // C == receiver< ... > or sender< ... >, depending.
144 template<typename C>
145 class edge_container {
146
147 public:
148 typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type;
149
add_edge(C & s)150 void add_edge(C &s) {
151 built_edges.push_back(&s);
152 }
153
delete_edge(C & s)154 void delete_edge(C &s) {
155 for (typename edge_list_type::iterator i = built_edges.begin(); i != built_edges.end(); ++i) {
156 if (*i == &s) {
157 (void)built_edges.erase(i);
158 return; // only remove one predecessor per request
159 }
160 }
161 }
162
copy_edges(edge_list_type & v)163 void copy_edges(edge_list_type &v) {
164 v = built_edges;
165 }
166
edge_count()167 size_t edge_count() {
168 return (size_t)(built_edges.size());
169 }
170
clear()171 void clear() {
172 built_edges.clear();
173 }
174
175 // methods remove the statement from all predecessors/successors liste in the edge
176 // container.
177 template< typename S > void sender_extract(S &s);
178 template< typename R > void receiver_extract(R &r);
179
180 private:
181 edge_list_type built_edges;
182 }; // class edge_container
183 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
184
185 } // namespace internal
186
187 } // namespace interfaceX
188 } // namespace flow
189 } // namespace tbb
190
191 //! The graph class
192 #include "internal/_flow_graph_impl.h"
193
194 namespace tbb {
195 namespace flow {
196 namespace interface11 {
197
198 // enqueue left task if necessary. Returns the non-enqueued task if there is one.
combine_tasks(graph & g,tbb::task * left,tbb::task * right)199 static inline tbb::task *combine_tasks(graph& g, tbb::task * left, tbb::task * right) {
200 // if no RHS task, don't change left.
201 if (right == NULL) return left;
202 // right != NULL
203 if (left == NULL) return right;
204 if (left == SUCCESSFULLY_ENQUEUED) return right;
205 // left contains a task
206 if (right != SUCCESSFULLY_ENQUEUED) {
207 // both are valid tasks
208 internal::spawn_in_graph_arena(g, *left);
209 return right;
210 }
211 return left;
212 }
213
214 #if __TBB_PREVIEW_ASYNC_MSG
215
216 template < typename T > class __TBB_DEPRECATED async_msg;
217
218 namespace internal {
219
220 template < typename T > class async_storage;
221
222 template< typename T, typename = void >
223 struct async_helpers {
224 typedef async_msg<T> async_type;
225 typedef T filtered_type;
226
227 static const bool is_async_type = false;
228
to_void_ptrasync_helpers229 static const void* to_void_ptr(const T& t) {
230 return static_cast<const void*>(&t);
231 }
232
to_void_ptrasync_helpers233 static void* to_void_ptr(T& t) {
234 return static_cast<void*>(&t);
235 }
236
from_void_ptrasync_helpers237 static const T& from_void_ptr(const void* p) {
238 return *static_cast<const T*>(p);
239 }
240
from_void_ptrasync_helpers241 static T& from_void_ptr(void* p) {
242 return *static_cast<T*>(p);
243 }
244
try_put_task_wrapper_implasync_helpers245 static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
246 if (is_async) {
247 // This (T) is NOT async and incoming 'A<X> t' IS async
248 // Get data from async_msg
249 const async_msg<filtered_type>& msg = async_helpers< async_msg<filtered_type> >::from_void_ptr(p);
250 task* const new_task = msg.my_storage->subscribe(*this_recv, this_recv->graph_reference());
251 // finalize() must be called after subscribe() because set() can be called in finalize()
252 // and 'this_recv' client must be subscribed by this moment
253 msg.finalize();
254 return new_task;
255 }
256 else {
257 // Incoming 't' is NOT async
258 return this_recv->try_put_task(from_void_ptr(p));
259 }
260 }
261 };
262
263 template< typename T >
264 struct async_helpers< T, typename std::enable_if< std::is_base_of<async_msg<typename T::async_msg_data_type>, T>::value >::type > {
265 typedef T async_type;
266 typedef typename T::async_msg_data_type filtered_type;
267
268 static const bool is_async_type = true;
269
270 // Receiver-classes use const interfaces
271 static const void* to_void_ptr(const T& t) {
272 return static_cast<const void*>(&static_cast<const async_msg<filtered_type>&>(t));
273 }
274
275 static void* to_void_ptr(T& t) {
276 return static_cast<void*>(&static_cast<async_msg<filtered_type>&>(t));
277 }
278
279 // Sender-classes use non-const interfaces
280 static const T& from_void_ptr(const void* p) {
281 return *static_cast<const T*>(static_cast<const async_msg<filtered_type>*>(p));
282 }
283
284 static T& from_void_ptr(void* p) {
285 return *static_cast<T*>(static_cast<async_msg<filtered_type>*>(p));
286 }
287
288 // Used in receiver<T> class
289 static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
290 if (is_async) {
291 // Both are async
292 return this_recv->try_put_task(from_void_ptr(p));
293 }
294 else {
295 // This (T) is async and incoming 'X t' is NOT async
296 // Create async_msg for X
297 const filtered_type& t = async_helpers<filtered_type>::from_void_ptr(p);
298 const T msg(t);
299 return this_recv->try_put_task(msg);
300 }
301 }
302 };
303
304 class untyped_receiver;
305
306 class untyped_sender {
307 template< typename, typename > friend class internal::predecessor_cache;
308 template< typename, typename > friend class internal::reservable_predecessor_cache;
309 public:
310 //! The successor type for this node
311 typedef untyped_receiver successor_type;
312
313 virtual ~untyped_sender() {}
314
315 // NOTE: Following part of PUBLIC section is copy-paste from original sender<T> class
316
317 // TODO: Prevent untyped successor registration
318
319 //! Add a new successor to this node
320 virtual bool register_successor( successor_type &r ) = 0;
321
322 //! Removes a successor from this node
323 virtual bool remove_successor( successor_type &r ) = 0;
324
325 //! Releases the reserved item
326 virtual bool try_release( ) { return false; }
327
328 //! Consumes the reserved item
329 virtual bool try_consume( ) { return false; }
330
331 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
332 //! interface to record edges for traversal & deletion
333 typedef internal::edge_container<successor_type> built_successors_type;
334 typedef built_successors_type::edge_list_type successor_list_type;
335 virtual built_successors_type &built_successors() = 0;
336 virtual void internal_add_built_successor( successor_type & ) = 0;
337 virtual void internal_delete_built_successor( successor_type & ) = 0;
338 virtual void copy_successors( successor_list_type &) = 0;
339 virtual size_t successor_count() = 0;
340 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
341 protected:
342 //! Request an item from the sender
343 template< typename X >
344 bool try_get( X &t ) {
345 return try_get_wrapper( internal::async_helpers<X>::to_void_ptr(t), internal::async_helpers<X>::is_async_type );
346 }
347
348 //! Reserves an item in the sender
349 template< typename X >
350 bool try_reserve( X &t ) {
351 return try_reserve_wrapper( internal::async_helpers<X>::to_void_ptr(t), internal::async_helpers<X>::is_async_type );
352 }
353
354 virtual bool try_get_wrapper( void* p, bool is_async ) = 0;
355 virtual bool try_reserve_wrapper( void* p, bool is_async ) = 0;
356 };
357
358 class untyped_receiver {
359 template< typename, typename > friend class run_and_put_task;
360
361 template< typename, typename > friend class internal::broadcast_cache;
362 template< typename, typename > friend class internal::round_robin_cache;
363 template< typename, typename > friend class internal::successor_cache;
364
365 #if __TBB_PREVIEW_OPENCL_NODE
366 template< typename, typename > friend class proxy_dependency_receiver;
367 #endif /* __TBB_PREVIEW_OPENCL_NODE */
368 public:
369 //! The predecessor type for this node
370 typedef untyped_sender predecessor_type;
371
372 //! Destructor
373 virtual ~untyped_receiver() {}
374
375 //! Put an item to the receiver
376 template<typename X>
377 bool try_put(const X& t) {
378 task *res = try_put_task(t);
379 if (!res) return false;
380 if (res != SUCCESSFULLY_ENQUEUED) internal::spawn_in_graph_arena(graph_reference(), *res);
381 return true;
382 }
383
384 // NOTE: Following part of PUBLIC section is copy-paste from original receiver<T> class
385
386 // TODO: Prevent untyped predecessor registration
387
388 //! Add a predecessor to the node
389 virtual bool register_predecessor( predecessor_type & ) { return false; }
390
391 //! Remove a predecessor from the node
392 virtual bool remove_predecessor( predecessor_type & ) { return false; }
393
394 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
395 typedef internal::edge_container<predecessor_type> built_predecessors_type;
396 typedef built_predecessors_type::edge_list_type predecessor_list_type;
397 virtual built_predecessors_type &built_predecessors() = 0;
398 virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
399 virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
400 virtual void copy_predecessors( predecessor_list_type & ) = 0;
401 virtual size_t predecessor_count() = 0;
402 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
403 protected:
404 template<typename X>
405 task *try_put_task(const X& t) {
406 return try_put_task_wrapper( internal::async_helpers<X>::to_void_ptr(t), internal::async_helpers<X>::is_async_type );
407 }
408
409 virtual task* try_put_task_wrapper( const void* p, bool is_async ) = 0;
410
411 virtual graph& graph_reference() const = 0;
412
413 // NOTE: Following part of PROTECTED and PRIVATE sections is copy-paste from original receiver<T> class
414
415 //! put receiver back in initial state
416 virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;
417
418 virtual bool is_continue_receiver() { return false; }
419 };
420
421 } // namespace internal
422
423 //! Pure virtual template class that defines a sender of messages of type T
424 template< typename T >
425 class sender : public internal::untyped_sender {
426 public:
427 //! The output type of this sender
428 __TBB_DEPRECATED typedef T output_type;
429
430 __TBB_DEPRECATED typedef typename internal::async_helpers<T>::filtered_type filtered_type;
431
432 //! Request an item from the sender
433 virtual bool try_get( T & ) { return false; }
434
435 //! Reserves an item in the sender
436 virtual bool try_reserve( T & ) { return false; }
437
438 protected:
439 virtual bool try_get_wrapper( void* p, bool is_async ) __TBB_override {
440 // Both async OR both are NOT async
441 if ( internal::async_helpers<T>::is_async_type == is_async ) {
442 return try_get( internal::async_helpers<T>::from_void_ptr(p) );
443 }
444 // Else: this (T) is async OR incoming 't' is async
445 __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_get()");
446 return false;
447 }
448
449 virtual bool try_reserve_wrapper( void* p, bool is_async ) __TBB_override {
450 // Both async OR both are NOT async
451 if ( internal::async_helpers<T>::is_async_type == is_async ) {
452 return try_reserve( internal::async_helpers<T>::from_void_ptr(p) );
453 }
454 // Else: this (T) is async OR incoming 't' is async
455 __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_reserve()");
456 return false;
457 }
458 }; // class sender<T>
459
460 //! Pure virtual template class that defines a receiver of messages of type T
461 template< typename T >
462 class receiver : public internal::untyped_receiver {
463 template< typename > friend class internal::async_storage;
464 template< typename, typename > friend struct internal::async_helpers;
465 public:
466 //! The input type of this receiver
467 __TBB_DEPRECATED typedef T input_type;
468
469 __TBB_DEPRECATED typedef typename internal::async_helpers<T>::filtered_type filtered_type;
470
471 //! Put an item to the receiver
472 bool try_put( const typename internal::async_helpers<T>::filtered_type& t ) {
473 return internal::untyped_receiver::try_put(t);
474 }
475
476 bool try_put( const typename internal::async_helpers<T>::async_type& t ) {
477 return internal::untyped_receiver::try_put(t);
478 }
479
480 protected:
481 virtual task* try_put_task_wrapper( const void *p, bool is_async ) __TBB_override {
482 return internal::async_helpers<T>::try_put_task_wrapper_impl(this, p, is_async);
483 }
484
485 //! Put item to successor; return task to run the successor if possible.
486 virtual task *try_put_task(const T& t) = 0;
487
488 }; // class receiver<T>
489
490 #else // __TBB_PREVIEW_ASYNC_MSG
491
492 //! Pure virtual template class that defines a sender of messages of type T
493 template< typename T >
494 class sender {
495 public:
496 //! The output type of this sender
497 __TBB_DEPRECATED typedef T output_type;
498
499 //! The successor type for this node
500 __TBB_DEPRECATED typedef receiver<T> successor_type;
501
502 virtual ~sender() {}
503
504 // NOTE: Following part of PUBLIC section is partly copy-pasted in sender<T> under #if __TBB_PREVIEW_ASYNC_MSG
505
506 //! Add a new successor to this node
507 __TBB_DEPRECATED virtual bool register_successor( successor_type &r ) = 0;
508
509 //! Removes a successor from this node
510 __TBB_DEPRECATED virtual bool remove_successor( successor_type &r ) = 0;
511
512 //! Request an item from the sender
513 virtual bool try_get( T & ) { return false; }
514
515 //! Reserves an item in the sender
516 virtual bool try_reserve( T & ) { return false; }
517
518 //! Releases the reserved item
519 virtual bool try_release( ) { return false; }
520
521 //! Consumes the reserved item
522 virtual bool try_consume( ) { return false; }
523
524 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
525 //! interface to record edges for traversal & deletion
526 __TBB_DEPRECATED typedef typename internal::edge_container<successor_type> built_successors_type;
527 __TBB_DEPRECATED typedef typename built_successors_type::edge_list_type successor_list_type;
528 __TBB_DEPRECATED virtual built_successors_type &built_successors() = 0;
529 __TBB_DEPRECATED virtual void internal_add_built_successor( successor_type & ) = 0;
530 __TBB_DEPRECATED virtual void internal_delete_built_successor( successor_type & ) = 0;
531 __TBB_DEPRECATED virtual void copy_successors( successor_list_type &) = 0;
532 __TBB_DEPRECATED virtual size_t successor_count() = 0;
533 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
534 }; // class sender<T>
535
536 //! Pure virtual template class that defines a receiver of messages of type T
537 template< typename T >
538 class receiver {
539 public:
540 //! The input type of this receiver
541 __TBB_DEPRECATED typedef T input_type;
542
543 //! The predecessor type for this node
544 __TBB_DEPRECATED typedef sender<T> predecessor_type;
545
546 //! Destructor
547 virtual ~receiver() {}
548
549 //! Put an item to the receiver
550 bool try_put( const T& t ) {
551 task *res = try_put_task(t);
552 if (!res) return false;
553 if (res != SUCCESSFULLY_ENQUEUED) internal::spawn_in_graph_arena(graph_reference(), *res);
554 return true;
555 }
556
557 //! put item to successor; return task to run the successor if possible.
558 protected:
559 template< typename R, typename B > friend class run_and_put_task;
560 template< typename X, typename Y > friend class internal::broadcast_cache;
561 template< typename X, typename Y > friend class internal::round_robin_cache;
562 virtual task *try_put_task(const T& t) = 0;
563 virtual graph& graph_reference() const = 0;
564 public:
565 // NOTE: Following part of PUBLIC and PROTECTED sections is copy-pasted in receiver<T> under #if __TBB_PREVIEW_ASYNC_MSG
566
567 //! Add a predecessor to the node
568 __TBB_DEPRECATED virtual bool register_predecessor( predecessor_type & ) { return false; }
569
570 //! Remove a predecessor from the node
571 __TBB_DEPRECATED virtual bool remove_predecessor( predecessor_type & ) { return false; }
572
573 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
574 __TBB_DEPRECATED typedef typename internal::edge_container<predecessor_type> built_predecessors_type;
575 __TBB_DEPRECATED typedef typename built_predecessors_type::edge_list_type predecessor_list_type;
576 __TBB_DEPRECATED virtual built_predecessors_type &built_predecessors() = 0;
577 __TBB_DEPRECATED virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
578 __TBB_DEPRECATED virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
579 __TBB_DEPRECATED virtual void copy_predecessors( predecessor_list_type & ) = 0;
580 __TBB_DEPRECATED virtual size_t predecessor_count() = 0;
581 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
582
583 protected:
584 //! put receiver back in initial state
585 virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;
586
587 template<typename TT, typename M> friend class internal::successor_cache;
588 virtual bool is_continue_receiver() { return false; }
589
590 #if __TBB_PREVIEW_OPENCL_NODE
591 template< typename, typename > friend class proxy_dependency_receiver;
592 #endif /* __TBB_PREVIEW_OPENCL_NODE */
593 }; // class receiver<T>
594
595 #endif // __TBB_PREVIEW_ASYNC_MSG
596
597 //! Base class for receivers of completion messages
598 /** These receivers automatically reset, but cannot be explicitly waited on */
599 class continue_receiver : public receiver< continue_msg > {
600 public:
601
602 //! The input type
603 __TBB_DEPRECATED typedef continue_msg input_type;
604
605 //! The predecessor type for this node
606 __TBB_DEPRECATED typedef receiver<input_type>::predecessor_type predecessor_type;
607
608 //! Constructor
609 __TBB_DEPRECATED explicit continue_receiver(
610 __TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority)) {
611 my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
612 my_current_count = 0;
613 __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = priority; )
614 }
615
616 //! Copy constructor
617 __TBB_DEPRECATED continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
618 my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
619 my_current_count = 0;
620 __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = src.my_priority; )
621 }
622
623 //! Increments the trigger threshold
624 __TBB_DEPRECATED bool register_predecessor( predecessor_type & ) __TBB_override {
625 spin_mutex::scoped_lock l(my_mutex);
626 ++my_predecessor_count;
627 return true;
628 }
629
630 //! Decrements the trigger threshold
631 /** Does not check to see if the removal of the predecessor now makes the current count
632 exceed the new threshold. So removing a predecessor while the graph is active can cause
633 unexpected results. */
634 __TBB_DEPRECATED bool remove_predecessor( predecessor_type & ) __TBB_override {
635 spin_mutex::scoped_lock l(my_mutex);
636 --my_predecessor_count;
637 return true;
638 }
639
640 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
641 __TBB_DEPRECATED typedef internal::edge_container<predecessor_type> built_predecessors_type;
642 __TBB_DEPRECATED typedef built_predecessors_type::edge_list_type predecessor_list_type;
643 built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
644
645 __TBB_DEPRECATED void internal_add_built_predecessor( predecessor_type &s) __TBB_override {
646 spin_mutex::scoped_lock l(my_mutex);
647 my_built_predecessors.add_edge( s );
648 }
649
650 __TBB_DEPRECATED void internal_delete_built_predecessor( predecessor_type &s) __TBB_override {
651 spin_mutex::scoped_lock l(my_mutex);
652 my_built_predecessors.delete_edge(s);
653 }
654
655 __TBB_DEPRECATED void copy_predecessors( predecessor_list_type &v) __TBB_override {
656 spin_mutex::scoped_lock l(my_mutex);
657 my_built_predecessors.copy_edges(v);
658 }
659
660 __TBB_DEPRECATED size_t predecessor_count() __TBB_override {
661 spin_mutex::scoped_lock l(my_mutex);
662 return my_built_predecessors.edge_count();
663 }
664
665 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
666
667 protected:
668 template< typename R, typename B > friend class run_and_put_task;
669 template<typename X, typename Y> friend class internal::broadcast_cache;
670 template<typename X, typename Y> friend class internal::round_robin_cache;
671 // execute body is supposed to be too small to create a task for.
672 task *try_put_task( const input_type & ) __TBB_override {
673 {
674 spin_mutex::scoped_lock l(my_mutex);
675 if ( ++my_current_count < my_predecessor_count )
676 return SUCCESSFULLY_ENQUEUED;
677 else
678 my_current_count = 0;
679 }
680 task * res = execute();
681 return res? res : SUCCESSFULLY_ENQUEUED;
682 }
683
684 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
685 // continue_receiver must contain its own built_predecessors because it does
686 // not have a node_cache.
687 built_predecessors_type my_built_predecessors;
688 #endif
689 spin_mutex my_mutex;
690 int my_predecessor_count;
691 int my_current_count;
692 int my_initial_predecessor_count;
693 __TBB_FLOW_GRAPH_PRIORITY_EXPR( node_priority_t my_priority; )
694 // the friend declaration in the base class did not eliminate the "protected class"
695 // error in gcc 4.1.2
696 template<typename U, typename V> friend class tbb::flow::interface11::limiter_node;
697
698 void reset_receiver( reset_flags f ) __TBB_override {
699 my_current_count = 0;
700 if (f & rf_clear_edges) {
701 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
702 my_built_predecessors.clear();
703 #endif
704 my_predecessor_count = my_initial_predecessor_count;
705 }
706 }
707
708 //! Does whatever should happen when the threshold is reached
709 /** This should be very fast or else spawn a task. This is
710 called while the sender is blocked in the try_put(). */
711 virtual task * execute() = 0;
712 template<typename TT, typename M> friend class internal::successor_cache;
713 bool is_continue_receiver() __TBB_override { return true; }
714
715 }; // class continue_receiver
716
717 } // interfaceX
718
719 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
720 template <typename K, typename T>
721 K key_from_message( const T &t ) {
722 return t.key();
723 }
724 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
725
726 using interface11::sender;
727 using interface11::receiver;
728 using interface11::continue_receiver;
729 } // flow
730 } // tbb
731
732 #include "internal/_flow_graph_trace_impl.h"
733 #include "internal/_tbb_hash_compare_impl.h"
734
735 namespace tbb {
736 namespace flow {
737 namespace interface11 {
738
739 #include "internal/_flow_graph_body_impl.h"
740 #include "internal/_flow_graph_cache_impl.h"
741 #include "internal/_flow_graph_types_impl.h"
742 #if __TBB_PREVIEW_ASYNC_MSG
743 #include "internal/_flow_graph_async_msg_impl.h"
744 #endif
745 using namespace internal::graph_policy_namespace;
746
747 template <typename C, typename N>
748 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
749 {
750 if (begin) current_node = my_graph->my_nodes;
751 //else it is an end iterator by default
752 }
753
754 template <typename C, typename N>
755 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
756 __TBB_ASSERT(current_node, "graph_iterator at end");
757 return *operator->();
758 }
759
760 template <typename C, typename N>
761 typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
762 return current_node;
763 }
764
765 template <typename C, typename N>
766 void graph_iterator<C,N>::internal_forward() {
767 if (current_node) current_node = current_node->next;
768 }
769
770 } // namespace interfaceX
771
772 namespace interface10 {
773 //! Constructs a graph with isolated task_group_context
774 inline graph::graph() : my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
775 prepare_task_arena();
776 own_context = true;
777 cancelled = false;
778 caught_exception = false;
779 my_context = new task_group_context(tbb::internal::FLOW_TASKS);
780 my_root_task = (new (task::allocate_root(*my_context)) empty_task);
781 my_root_task->set_ref_count(1);
782 tbb::internal::fgt_graph(this);
783 my_is_active = true;
784 }
785
786 inline graph::graph(task_group_context& use_this_context) :
787 my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
788 prepare_task_arena();
789 own_context = false;
790 cancelled = false;
791 caught_exception = false;
792 my_root_task = (new (task::allocate_root(*my_context)) empty_task);
793 my_root_task->set_ref_count(1);
794 tbb::internal::fgt_graph(this);
795 my_is_active = true;
796 }
797
798 inline graph::~graph() {
799 wait_for_all();
800 my_root_task->set_ref_count(0);
801 tbb::task::destroy(*my_root_task);
802 if (own_context) delete my_context;
803 delete my_task_arena;
804 }
805
806 inline void graph::reserve_wait() {
807 if (my_root_task) {
808 my_root_task->increment_ref_count();
809 tbb::internal::fgt_reserve_wait(this);
810 }
811 }
812
813 inline void graph::release_wait() {
814 if (my_root_task) {
815 tbb::internal::fgt_release_wait(this);
816 my_root_task->decrement_ref_count();
817 }
818 }
819
820 inline void graph::register_node(tbb::flow::interface11::graph_node *n) {
821 n->next = NULL;
822 {
823 spin_mutex::scoped_lock lock(nodelist_mutex);
824 n->prev = my_nodes_last;
825 if (my_nodes_last) my_nodes_last->next = n;
826 my_nodes_last = n;
827 if (!my_nodes) my_nodes = n;
828 }
829 }
830
831 inline void graph::remove_node(tbb::flow::interface11::graph_node *n) {
832 {
833 spin_mutex::scoped_lock lock(nodelist_mutex);
834 __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
835 if (n->prev) n->prev->next = n->next;
836 if (n->next) n->next->prev = n->prev;
837 if (my_nodes_last == n) my_nodes_last = n->prev;
838 if (my_nodes == n) my_nodes = n->next;
839 }
840 n->prev = n->next = NULL;
841 }
842
843 inline void graph::reset( tbb::flow::interface11::reset_flags f ) {
844 // reset context
845 tbb::flow::interface11::internal::deactivate_graph(*this);
846
847 if(my_context) my_context->reset();
848 cancelled = false;
849 caught_exception = false;
850 // reset all the nodes comprising the graph
851 for(iterator ii = begin(); ii != end(); ++ii) {
852 tbb::flow::interface11::graph_node *my_p = &(*ii);
853 my_p->reset_node(f);
854 }
855 // Reattach the arena. Might be useful to run the graph in a particular task_arena
856 // while not limiting graph lifetime to a single task_arena::execute() call.
857 prepare_task_arena( /*reinit=*/true );
858 tbb::flow::interface11::internal::activate_graph(*this);
859 // now spawn the tasks necessary to start the graph
860 for(task_list_type::iterator rti = my_reset_task_list.begin(); rti != my_reset_task_list.end(); ++rti) {
861 tbb::flow::interface11::internal::spawn_in_graph_arena(*this, *(*rti));
862 }
863 my_reset_task_list.clear();
864 }
865
866 inline graph::iterator graph::begin() { return iterator(this, true); }
867
868 inline graph::iterator graph::end() { return iterator(this, false); }
869
870 inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }
871
872 inline graph::const_iterator graph::end() const { return const_iterator(this, false); }
873
874 inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }
875
876 inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
877
878 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
879 inline void graph::set_name(const char *name) {
880 tbb::internal::fgt_graph_desc(this, name);
881 }
882 #endif
883
884 } // namespace interface10
885
886 namespace interface11 {
887
888 inline graph_node::graph_node(graph& g) : my_graph(g) {
889 my_graph.register_node(this);
890 }
891
892 inline graph_node::~graph_node() {
893 my_graph.remove_node(this);
894 }
895
896 #include "internal/_flow_graph_node_impl.h"
897
898 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
899 using internal::node_set;
900 #endif
901
902 //! An executable node that acts as a source, i.e. it has no predecessors
903 template < typename Output >
904 class input_node : public graph_node, public sender< Output > {
905 public:
906 //! The type of the output message, which is complete
907 typedef Output output_type;
908
909 //! The type of successors of this node
910 typedef typename sender<output_type>::successor_type successor_type;
911
912 //Source node has no input type
913 typedef null_type input_type;
914
915 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
916 typedef typename sender<output_type>::built_successors_type built_successors_type;
917 typedef typename sender<output_type>::successor_list_type successor_list_type;
918 #endif
919
920 //! Constructor for a node with a successor
921 template< typename Body >
922 __TBB_NOINLINE_SYM input_node( graph &g, Body body )
923 : graph_node(g), my_active(false),
924 my_body( new internal::input_body_leaf< output_type, Body>(body) ),
925 my_init_body( new internal::input_body_leaf< output_type, Body>(body) ),
926 my_reserved(false), my_has_cached_item(false)
927 {
928 my_successors.set_owner(this);
929 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
930 static_cast<sender<output_type> *>(this), this->my_body );
931 }
932
933 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
934 template <typename Body, typename... Successors>
935 input_node( const node_set<internal::order::preceding, Successors...>& successors, Body body )
936 : input_node(successors.graph_reference(), body) {
937 make_edges(*this, successors);
938 }
939 #endif
940
941 //! Copy constructor
942 __TBB_NOINLINE_SYM input_node( const input_node& src ) :
943 graph_node(src.my_graph), sender<Output>(),
944 my_active(false),
945 my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
946 my_reserved(false), my_has_cached_item(false)
947 {
948 my_successors.set_owner(this);
949 tbb::internal::fgt_node_with_body(CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
950 static_cast<sender<output_type> *>(this), this->my_body );
951 }
952
953 //! The destructor
954 ~input_node() { delete my_body; delete my_init_body; }
955
956 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
957 void set_name( const char *name ) __TBB_override {
958 tbb::internal::fgt_node_desc( this, name );
959 }
960 #endif
961
962 //! Add a new successor to this node
963 bool register_successor( successor_type &r ) __TBB_override {
964 spin_mutex::scoped_lock lock(my_mutex);
965 my_successors.register_successor(r);
966 if ( my_active )
967 spawn_put();
968 return true;
969 }
970
971 //! Removes a successor from this node
972 bool remove_successor( successor_type &r ) __TBB_override {
973 spin_mutex::scoped_lock lock(my_mutex);
974 my_successors.remove_successor(r);
975 return true;
976 }
977
978 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
979
980 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
981
982 void internal_add_built_successor( successor_type &r) __TBB_override {
983 spin_mutex::scoped_lock lock(my_mutex);
984 my_successors.internal_add_built_successor(r);
985 }
986
987 void internal_delete_built_successor( successor_type &r) __TBB_override {
988 spin_mutex::scoped_lock lock(my_mutex);
989 my_successors.internal_delete_built_successor(r);
990 }
991
992 size_t successor_count() __TBB_override {
993 spin_mutex::scoped_lock lock(my_mutex);
994 return my_successors.successor_count();
995 }
996
997 void copy_successors(successor_list_type &v) __TBB_override {
998 spin_mutex::scoped_lock l(my_mutex);
999 my_successors.copy_successors(v);
1000 }
1001 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1002
1003 //! Request an item from the node
1004 bool try_get( output_type &v ) __TBB_override {
1005 spin_mutex::scoped_lock lock(my_mutex);
1006 if ( my_reserved )
1007 return false;
1008
1009 if ( my_has_cached_item ) {
1010 v = my_cached_item;
1011 my_has_cached_item = false;
1012 return true;
1013 }
1014 // we've been asked to provide an item, but we have none. enqueue a task to
1015 // provide one.
1016 if ( my_active )
1017 spawn_put();
1018 return false;
1019 }
1020
1021 //! Reserves an item.
1022 bool try_reserve( output_type &v ) __TBB_override {
1023 spin_mutex::scoped_lock lock(my_mutex);
1024 if ( my_reserved ) {
1025 return false;
1026 }
1027
1028 if ( my_has_cached_item ) {
1029 v = my_cached_item;
1030 my_reserved = true;
1031 return true;
1032 } else {
1033 return false;
1034 }
1035 }
1036
1037 //! Release a reserved item.
1038 /** true = item has been released and so remains in sender, dest must request or reserve future items */
1039 bool try_release( ) __TBB_override {
1040 spin_mutex::scoped_lock lock(my_mutex);
1041 __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
1042 my_reserved = false;
1043 if(!my_successors.empty())
1044 spawn_put();
1045 return true;
1046 }
1047
1048 //! Consumes a reserved item
1049 bool try_consume( ) __TBB_override {
1050 spin_mutex::scoped_lock lock(my_mutex);
1051 __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1052 my_reserved = false;
1053 my_has_cached_item = false;
1054 if ( !my_successors.empty() ) {
1055 spawn_put();
1056 }
1057 return true;
1058 }
1059
1060 //! Activates a node that was created in the inactive state
1061 void activate() {
1062 spin_mutex::scoped_lock lock(my_mutex);
1063 my_active = true;
1064 if (!my_successors.empty())
1065 spawn_put();
1066 }
1067
1068 template<typename Body>
1069 Body copy_function_object() {
1070 internal::input_body<output_type> &body_ref = *this->my_body;
1071 return dynamic_cast< internal::input_body_leaf<output_type, Body> & >(body_ref).get_body();
1072 }
1073
1074 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1075 void extract( ) __TBB_override {
1076 my_successors.built_successors().sender_extract(*this); // removes "my_owner" == this from each successor
1077 my_active = false;
1078 my_reserved = false;
1079 if(my_has_cached_item) my_has_cached_item = false;
1080 }
1081 #endif
1082
1083 protected:
1084
1085 //! resets the input_node to its initial state
1086 void reset_node( reset_flags f) __TBB_override {
1087 my_active = false;
1088 my_reserved = false;
1089 my_has_cached_item = false;
1090
1091 if(f & rf_clear_edges) my_successors.clear();
1092 if(f & rf_reset_bodies) {
1093 internal::input_body<output_type> *tmp = my_init_body->clone();
1094 delete my_body;
1095 my_body = tmp;
1096 }
1097 }
1098
1099 private:
1100 spin_mutex my_mutex;
1101 bool my_active;
1102 internal::input_body<output_type> *my_body;
1103 internal::input_body<output_type> *my_init_body;
1104 internal::broadcast_cache< output_type > my_successors;
1105 bool my_reserved;
1106 bool my_has_cached_item;
1107 output_type my_cached_item;
1108
1109 // used by apply_body_bypass, can invoke body of node.
1110 bool try_reserve_apply_body(output_type &v) {
1111 spin_mutex::scoped_lock lock(my_mutex);
1112 if ( my_reserved ) {
1113 return false;
1114 }
1115 if ( !my_has_cached_item ) {
1116 tbb::internal::fgt_begin_body( my_body );
1117
1118 #if TBB_DEPRECATED_INPUT_NODE_BODY
1119 bool r = (*my_body)(my_cached_item);
1120 if (r) {
1121 my_has_cached_item = true;
1122 }
1123 #else
1124 flow_control control;
1125 my_cached_item = (*my_body)(control);
1126 my_has_cached_item = !control.is_pipeline_stopped;
1127 #endif
1128 tbb::internal::fgt_end_body( my_body );
1129 }
1130 if ( my_has_cached_item ) {
1131 v = my_cached_item;
1132 my_reserved = true;
1133 return true;
1134 } else {
1135 return false;
1136 }
1137 }
1138
1139 task* create_put_task() {
1140 return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1141 internal:: source_task_bypass < input_node< output_type > >( *this ) );
1142 }
1143
1144 //! Spawns a task that applies the body
1145 void spawn_put( ) {
1146 if(internal::is_graph_active(this->my_graph)) {
1147 internal::spawn_in_graph_arena(this->my_graph, *create_put_task());
1148 }
1149 }
1150
1151 friend class internal::source_task_bypass< input_node< output_type > >;
1152 //! Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
1153 task * apply_body_bypass( ) {
1154 output_type v;
1155 if ( !try_reserve_apply_body(v) )
1156 return NULL;
1157
1158 task *last_task = my_successors.try_put_task(v);
1159 if ( last_task )
1160 try_consume();
1161 else
1162 try_release();
1163 return last_task;
1164 }
1165 }; // class input_node
1166
1167 #if TBB_USE_SOURCE_NODE_AS_ALIAS
1168 template < typename Output >
1169 class source_node : public input_node <Output> {
1170 public:
1171 //! Constructor for a node with a successor
1172 template< typename Body >
1173 __TBB_NOINLINE_SYM source_node( graph &g, Body body )
1174 : input_node<Output>(g, body)
1175 {
1176 }
1177
1178 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1179 template <typename Body, typename... Successors>
1180 source_node( const node_set<internal::order::preceding, Successors...>& successors, Body body )
1181 : input_node<Output>(successors, body) {
1182 }
1183 #endif
1184 };
1185 #else // TBB_USE_SOURCE_NODE_AS_ALIAS
1186 //! An executable node that acts as a source, i.e. it has no predecessors
1187 template < typename Output > class
1188 __TBB_DEPRECATED_MSG("TBB Warning: tbb::flow::source_node is deprecated, use tbb::flow::input_node." )
1189 source_node : public graph_node, public sender< Output > {
1190 public:
1191 //! The type of the output message, which is complete
1192 typedef Output output_type;
1193
1194 //! The type of successors of this node
1195 typedef typename sender<output_type>::successor_type successor_type;
1196
1197 //Source node has no input type
1198 typedef null_type input_type;
1199
1200 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1201 typedef typename sender<output_type>::built_successors_type built_successors_type;
1202 typedef typename sender<output_type>::successor_list_type successor_list_type;
1203 #endif
1204
1205 //! Constructor for a node with a successor
1206 template< typename Body >
1207 __TBB_NOINLINE_SYM source_node( graph &g, Body body, bool is_active = true )
1208 : graph_node(g), my_active(is_active), init_my_active(is_active),
1209 my_body( new internal::source_body_leaf< output_type, Body>(body) ),
1210 my_init_body( new internal::source_body_leaf< output_type, Body>(body) ),
1211 my_reserved(false), my_has_cached_item(false)
1212 {
1213 my_successors.set_owner(this);
1214 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
1215 static_cast<sender<output_type> *>(this), this->my_body );
1216 }
1217
1218 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1219 template <typename Body, typename... Successors>
1220 source_node( const node_set<internal::order::preceding, Successors...>& successors, Body body, bool is_active = true )
1221 : source_node(successors.graph_reference(), body, is_active) {
1222 make_edges(*this, successors);
1223 }
1224 #endif
1225
1226 //! Copy constructor
1227 __TBB_NOINLINE_SYM source_node( const source_node& src ) :
1228 graph_node(src.my_graph), sender<Output>(),
1229 my_active(src.init_my_active),
1230 init_my_active(src.init_my_active), my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
1231 my_reserved(false), my_has_cached_item(false)
1232 {
1233 my_successors.set_owner(this);
1234 tbb::internal::fgt_node_with_body(CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
1235 static_cast<sender<output_type> *>(this), this->my_body );
1236 }
1237
1238 //! The destructor
1239 ~source_node() { delete my_body; delete my_init_body; }
1240
1241 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1242 void set_name( const char *name ) __TBB_override {
1243 tbb::internal::fgt_node_desc( this, name );
1244 }
1245 #endif
1246
1247 //! Add a new successor to this node
1248 bool register_successor( successor_type &r ) __TBB_override {
1249 spin_mutex::scoped_lock lock(my_mutex);
1250 my_successors.register_successor(r);
1251 if ( my_active )
1252 spawn_put();
1253 return true;
1254 }
1255
1256 //! Removes a successor from this node
1257 bool remove_successor( successor_type &r ) __TBB_override {
1258 spin_mutex::scoped_lock lock(my_mutex);
1259 my_successors.remove_successor(r);
1260 return true;
1261 }
1262
1263 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1264
1265 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1266
1267 void internal_add_built_successor( successor_type &r) __TBB_override {
1268 spin_mutex::scoped_lock lock(my_mutex);
1269 my_successors.internal_add_built_successor(r);
1270 }
1271
1272 void internal_delete_built_successor( successor_type &r) __TBB_override {
1273 spin_mutex::scoped_lock lock(my_mutex);
1274 my_successors.internal_delete_built_successor(r);
1275 }
1276
1277 size_t successor_count() __TBB_override {
1278 spin_mutex::scoped_lock lock(my_mutex);
1279 return my_successors.successor_count();
1280 }
1281
1282 void copy_successors(successor_list_type &v) __TBB_override {
1283 spin_mutex::scoped_lock l(my_mutex);
1284 my_successors.copy_successors(v);
1285 }
1286 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1287
1288 //! Request an item from the node
1289 bool try_get( output_type &v ) __TBB_override {
1290 spin_mutex::scoped_lock lock(my_mutex);
1291 if ( my_reserved )
1292 return false;
1293
1294 if ( my_has_cached_item ) {
1295 v = my_cached_item;
1296 my_has_cached_item = false;
1297 return true;
1298 }
1299 // we've been asked to provide an item, but we have none. enqueue a task to
1300 // provide one.
1301 spawn_put();
1302 return false;
1303 }
1304
1305 //! Reserves an item.
1306 bool try_reserve( output_type &v ) __TBB_override {
1307 spin_mutex::scoped_lock lock(my_mutex);
1308 if ( my_reserved ) {
1309 return false;
1310 }
1311
1312 if ( my_has_cached_item ) {
1313 v = my_cached_item;
1314 my_reserved = true;
1315 return true;
1316 } else {
1317 return false;
1318 }
1319 }
1320
1321 //! Release a reserved item.
1322 /** true = item has been released and so remains in sender, dest must request or reserve future items */
1323 bool try_release( ) __TBB_override {
1324 spin_mutex::scoped_lock lock(my_mutex);
1325 __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
1326 my_reserved = false;
1327 if(!my_successors.empty())
1328 spawn_put();
1329 return true;
1330 }
1331
1332 //! Consumes a reserved item
1333 bool try_consume( ) __TBB_override {
1334 spin_mutex::scoped_lock lock(my_mutex);
1335 __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1336 my_reserved = false;
1337 my_has_cached_item = false;
1338 if ( !my_successors.empty() ) {
1339 spawn_put();
1340 }
1341 return true;
1342 }
1343
1344 //! Activates a node that was created in the inactive state
1345 void activate() {
1346 spin_mutex::scoped_lock lock(my_mutex);
1347 my_active = true;
1348 if (!my_successors.empty())
1349 spawn_put();
1350 }
1351
1352 template<typename Body>
1353 Body copy_function_object() {
1354 internal::source_body<output_type> &body_ref = *this->my_body;
1355 return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
1356 }
1357
1358 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1359 void extract( ) __TBB_override {
1360 my_successors.built_successors().sender_extract(*this); // removes "my_owner" == this from each successor
1361 my_active = init_my_active;
1362 my_reserved = false;
1363 if(my_has_cached_item) my_has_cached_item = false;
1364 }
1365 #endif
1366
1367 protected:
1368
1369 //! resets the source_node to its initial state
1370 void reset_node( reset_flags f) __TBB_override {
1371 my_active = init_my_active;
1372 my_reserved =false;
1373 if(my_has_cached_item) {
1374 my_has_cached_item = false;
1375 }
1376 if(f & rf_clear_edges) my_successors.clear();
1377 if(f & rf_reset_bodies) {
1378 internal::source_body<output_type> *tmp = my_init_body->clone();
1379 delete my_body;
1380 my_body = tmp;
1381 }
1382 if(my_active)
1383 internal::add_task_to_graph_reset_list(this->my_graph, create_put_task());
1384 }
1385
1386 private:
1387 spin_mutex my_mutex;
1388 bool my_active;
1389 bool init_my_active;
1390 internal::source_body<output_type> *my_body;
1391 internal::source_body<output_type> *my_init_body;
1392 internal::broadcast_cache< output_type > my_successors;
1393 bool my_reserved;
1394 bool my_has_cached_item;
1395 output_type my_cached_item;
1396
1397 // used by apply_body_bypass, can invoke body of node.
1398 bool try_reserve_apply_body(output_type &v) {
1399 spin_mutex::scoped_lock lock(my_mutex);
1400 if ( my_reserved ) {
1401 return false;
1402 }
1403 if ( !my_has_cached_item ) {
1404 tbb::internal::fgt_begin_body( my_body );
1405 bool r = (*my_body)(my_cached_item);
1406 tbb::internal::fgt_end_body( my_body );
1407 if (r) {
1408 my_has_cached_item = true;
1409 }
1410 }
1411 if ( my_has_cached_item ) {
1412 v = my_cached_item;
1413 my_reserved = true;
1414 return true;
1415 } else {
1416 return false;
1417 }
1418 }
1419
1420 // when resetting, and if the source_node was created with my_active == true, then
1421 // when we reset the node we must store a task to run the node, and spawn it only
1422 // after the reset is complete and is_active() is again true. This is why we don't
1423 // test for is_active() here.
1424 task* create_put_task() {
1425 return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1426 internal:: source_task_bypass < source_node< output_type > >( *this ) );
1427 }
1428
1429 //! Spawns a task that applies the body
1430 void spawn_put( ) {
1431 if(internal::is_graph_active(this->my_graph)) {
1432 internal::spawn_in_graph_arena(this->my_graph, *create_put_task());
1433 }
1434 }
1435
1436 friend class internal::source_task_bypass< source_node< output_type > >;
1437 //! Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
1438 task * apply_body_bypass( ) {
1439 output_type v;
1440 if ( !try_reserve_apply_body(v) )
1441 return NULL;
1442
1443 task *last_task = my_successors.try_put_task(v);
1444 if ( last_task )
1445 try_consume();
1446 else
1447 try_release();
1448 return last_task;
1449 }
1450 }; // class source_node
1451 #endif // TBB_USE_SOURCE_NODE_AS_ALIAS
1452
1453 //! Implements a function node that supports Input -> Output
1454 template<typename Input, typename Output = continue_msg, typename Policy = queueing,
1455 typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input)>
1456 class function_node
1457 : public graph_node
1458 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1459 , public internal::function_input< Input, Output, Policy, Allocator >
1460 #else
1461 , public internal::function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
1462 #endif
1463 , public internal::function_output<Output> {
1464
1465 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1466 typedef Allocator internals_allocator;
1467 #else
1468 typedef cache_aligned_allocator<Input> internals_allocator;
1469
1470 __TBB_STATIC_ASSERT(
1471 (tbb::internal::is_same_type<Allocator, null_type>::value),
1472 "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
1473 "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
1474 );
1475 #endif
1476
1477 public:
1478 typedef Input input_type;
1479 typedef Output output_type;
1480 typedef internal::function_input<input_type,output_type,Policy,internals_allocator> input_impl_type;
1481 typedef internal::function_input_queue<input_type, internals_allocator> input_queue_type;
1482 typedef internal::function_output<output_type> fOutput_type;
1483 typedef typename input_impl_type::predecessor_type predecessor_type;
1484 typedef typename fOutput_type::successor_type successor_type;
1485 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1486 typedef typename input_impl_type::predecessor_list_type predecessor_list_type;
1487 typedef typename fOutput_type::successor_list_type successor_list_type;
1488 #endif
1489 using input_impl_type::my_predecessors;
1490
1491 //! Constructor
1492 // input_queue_type is allocated here, but destroyed in the function_input_base.
1493 // TODO: pass the graph_buffer_policy to the function_input_base so it can all
1494 // be done in one place. This would be an interface-breaking change.
1495 template< typename Body >
1496 __TBB_NOINLINE_SYM function_node( graph &g, size_t concurrency,
1497 #if __TBB_CPP11_PRESENT
1498 Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority ))
1499 #else
1500 __TBB_FLOW_GRAPH_PRIORITY_ARG1( Body body, node_priority_t priority = tbb::flow::internal::no_priority ))
1501 #endif
1502 : graph_node(g), input_impl_type(g, concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)),
1503 fOutput_type(g) {
1504 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1505 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1506 }
1507
1508 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1509 template <typename Body>
1510 function_node( graph& g, size_t concurrency, Body body, node_priority_t priority )
1511 : function_node(g, concurrency, body, Policy(), priority) {}
1512 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1513
1514 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1515 template <typename Body, typename... Args>
1516 function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
1517 __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy p = Policy(), node_priority_t priority = tbb::flow::internal::no_priority ))
1518 : function_node(nodes.graph_reference(), concurrency, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
1519 make_edges_in_order(nodes, *this);
1520 }
1521
1522 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1523 template <typename Body, typename... Args>
1524 function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority )
1525 : function_node(nodes, concurrency, body, Policy(), priority) {}
1526 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1527 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1528
1529 //! Copy constructor
1530 __TBB_NOINLINE_SYM function_node( const function_node& src ) :
1531 graph_node(src.my_graph),
1532 input_impl_type(src),
1533 fOutput_type(src.my_graph) {
1534 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1535 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1536 }
1537
1538 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1539 void set_name( const char *name ) __TBB_override {
1540 tbb::internal::fgt_node_desc( this, name );
1541 }
1542 #endif
1543
1544 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1545 void extract( ) __TBB_override {
1546 my_predecessors.built_predecessors().receiver_extract(*this);
1547 successors().built_successors().sender_extract(*this);
1548 }
1549 #endif
1550
1551 protected:
1552 template< typename R, typename B > friend class run_and_put_task;
1553 template<typename X, typename Y> friend class internal::broadcast_cache;
1554 template<typename X, typename Y> friend class internal::round_robin_cache;
1555 using input_impl_type::try_put_task;
1556
1557 internal::broadcast_cache<output_type> &successors () __TBB_override { return fOutput_type::my_successors; }
1558
1559 void reset_node(reset_flags f) __TBB_override {
1560 input_impl_type::reset_function_input(f);
1561 // TODO: use clear() instead.
1562 if(f & rf_clear_edges) {
1563 successors().clear();
1564 my_predecessors.clear();
1565 }
1566 __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
1567 __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
1568 }
1569
1570 }; // class function_node
1571
1572 //! implements a function node that supports Input -> (set of outputs)
1573 // Output is a tuple of output types.
1574 template<typename Input, typename Output, typename Policy = queueing,
1575 typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input)>
1576 class multifunction_node :
1577 public graph_node,
1578 public internal::multifunction_input
1579 <
1580 Input,
1581 typename internal::wrap_tuple_elements<
1582 tbb::flow::tuple_size<Output>::value, // #elements in tuple
1583 internal::multifunction_output, // wrap this around each element
1584 Output // the tuple providing the types
1585 >::type,
1586 Policy,
1587 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1588 Allocator
1589 #else
1590 cache_aligned_allocator<Input>
1591 #endif
1592 > {
1593 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1594 typedef Allocator internals_allocator;
1595 #else
1596 typedef cache_aligned_allocator<Input> internals_allocator;
1597
1598 __TBB_STATIC_ASSERT(
1599 (tbb::internal::is_same_type<Allocator, null_type>::value),
1600 "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
1601 "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
1602 );
1603 #endif
1604
1605 protected:
1606 static const int N = tbb::flow::tuple_size<Output>::value;
1607 public:
1608 typedef Input input_type;
1609 typedef null_type output_type;
1610 typedef typename internal::wrap_tuple_elements<N,internal::multifunction_output, Output>::type output_ports_type;
1611 typedef internal::multifunction_input<
1612 input_type, output_ports_type, Policy, internals_allocator> input_impl_type;
1613 typedef internal::function_input_queue<input_type, internals_allocator> input_queue_type;
1614 private:
1615 using input_impl_type::my_predecessors;
1616 public:
1617 template<typename Body>
1618 __TBB_NOINLINE_SYM multifunction_node(
1619 graph &g, size_t concurrency,
1620 #if __TBB_CPP11_PRESENT
1621 Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
1622 #else
1623 __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body body, node_priority_t priority = tbb::flow::internal::no_priority)
1624 #endif
1625 ) : graph_node(g), input_impl_type(g, concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
1626 tbb::internal::fgt_multioutput_node_with_body<N>(
1627 CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
1628 &this->my_graph, static_cast<receiver<input_type> *>(this),
1629 this->output_ports(), this->my_body
1630 );
1631 }
1632
1633 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1634 template <typename Body>
1635 __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t priority)
1636 : multifunction_node(g, concurrency, body, Policy(), priority) {}
1637 #endif // TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1638
1639 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1640 template <typename Body, typename... Args>
1641 __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
1642 __TBB_FLOW_GRAPH_PRIORITY_ARG1(Policy p = Policy(), node_priority_t priority = tbb::flow::internal::no_priority))
1643 : multifunction_node(nodes.graph_reference(), concurrency, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
1644 make_edges_in_order(nodes, *this);
1645 }
1646
1647 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1648 template <typename Body, typename... Args>
1649 __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
1650 : multifunction_node(nodes, concurrency, body, Policy(), priority) {}
1651 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1652 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1653
1654 __TBB_NOINLINE_SYM multifunction_node( const multifunction_node &other) :
1655 graph_node(other.my_graph), input_impl_type(other) {
1656 tbb::internal::fgt_multioutput_node_with_body<N>( CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
1657 &this->my_graph, static_cast<receiver<input_type> *>(this),
1658 this->output_ports(), this->my_body );
1659 }
1660
1661 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1662 void set_name( const char *name ) __TBB_override {
1663 tbb::internal::fgt_multioutput_node_desc( this, name );
1664 }
1665 #endif
1666
1667 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1668 void extract( ) __TBB_override {
1669 my_predecessors.built_predecessors().receiver_extract(*this);
1670 input_impl_type::extract();
1671 }
1672 #endif
1673 // all the guts are in multifunction_input...
1674 protected:
1675 void reset_node(reset_flags f) __TBB_override { input_impl_type::reset(f); }
1676 }; // multifunction_node
1677
1678 //! split_node: accepts a tuple as input, forwards each element of the tuple to its
1679 // successors. The node has unlimited concurrency, so it does not reject inputs.
1680 template<typename TupleType, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(TupleType)>
1681 class split_node : public graph_node, public receiver<TupleType> {
1682 static const int N = tbb::flow::tuple_size<TupleType>::value;
1683 typedef receiver<TupleType> base_type;
1684 public:
1685 typedef TupleType input_type;
1686 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1687 typedef Allocator allocator_type;
1688 #else
1689 __TBB_STATIC_ASSERT(
1690 (tbb::internal::is_same_type<Allocator, null_type>::value),
1691 "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
1692 "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
1693 );
1694 #endif
1695 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1696 typedef typename base_type::predecessor_type predecessor_type;
1697 typedef typename base_type::predecessor_list_type predecessor_list_type;
1698 typedef internal::predecessor_cache<input_type, null_mutex > predecessor_cache_type;
1699 typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
1700 #endif
1701
1702 typedef typename internal::wrap_tuple_elements<
1703 N, // #elements in tuple
1704 internal::multifunction_output, // wrap this around each element
1705 TupleType // the tuple providing the types
1706 >::type output_ports_type;
1707
1708 __TBB_NOINLINE_SYM explicit split_node(graph &g)
1709 : graph_node(g),
1710 my_output_ports(internal::init_output_ports<output_ports_type>::call(g, my_output_ports))
1711 {
1712 tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1713 static_cast<receiver<input_type> *>(this), this->output_ports());
1714 }
1715
1716 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1717 template <typename... Args>
1718 __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
1719 make_edges_in_order(nodes, *this);
1720 }
1721 #endif
1722
1723 __TBB_NOINLINE_SYM split_node(const split_node& other)
1724 : graph_node(other.my_graph), base_type(other),
1725 my_output_ports(internal::init_output_ports<output_ports_type>::call(other.my_graph, my_output_ports))
1726 {
1727 tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1728 static_cast<receiver<input_type> *>(this), this->output_ports());
1729 }
1730
1731 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1732 void set_name( const char *name ) __TBB_override {
1733 tbb::internal::fgt_multioutput_node_desc( this, name );
1734 }
1735 #endif
1736
1737 output_ports_type &output_ports() { return my_output_ports; }
1738
1739 protected:
1740 task *try_put_task(const TupleType& t) __TBB_override {
1741 // Sending split messages in parallel is not justified, as overheads would prevail.
1742 // Also, we do not have successors here. So we just tell the task returned here is successful.
1743 return internal::emit_element<N>::emit_this(this->my_graph, t, output_ports());
1744 }
1745 void reset_node(reset_flags f) __TBB_override {
1746 if (f & rf_clear_edges)
1747 internal::clear_element<N>::clear_this(my_output_ports);
1748
1749 __TBB_ASSERT(!(f & rf_clear_edges) || internal::clear_element<N>::this_empty(my_output_ports), "split_node reset failed");
1750 }
1751 void reset_receiver(reset_flags /*f*/) __TBB_override {}
1752 graph& graph_reference() const __TBB_override {
1753 return my_graph;
1754 }
1755 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1756 private: //! split_node doesn't use this "predecessors" functionality; so, we have "dummies" here;
1757 void extract() __TBB_override {}
1758
1759 //! Adds to list of predecessors added by make_edge
1760 void internal_add_built_predecessor(predecessor_type&) __TBB_override {}
1761
1762 //! removes from to list of predecessors (used by remove_edge)
1763 void internal_delete_built_predecessor(predecessor_type&) __TBB_override {}
1764
1765 size_t predecessor_count() __TBB_override { return 0; }
1766
1767 void copy_predecessors(predecessor_list_type&) __TBB_override {}
1768
1769 built_predecessors_type &built_predecessors() __TBB_override { return my_predessors; }
1770
1771 //! dummy member
1772 built_predecessors_type my_predessors;
1773 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1774
1775 private:
1776 output_ports_type my_output_ports;
1777 };
1778
1779 //! Implements an executable node that supports continue_msg -> Output
1780 template <typename Output, typename Policy = internal::Policy<void> >
1781 class continue_node : public graph_node, public internal::continue_input<Output, Policy>,
1782 public internal::function_output<Output> {
1783 public:
1784 typedef continue_msg input_type;
1785 typedef Output output_type;
1786 typedef internal::continue_input<Output, Policy> input_impl_type;
1787 typedef internal::function_output<output_type> fOutput_type;
1788 typedef typename input_impl_type::predecessor_type predecessor_type;
1789 typedef typename fOutput_type::successor_type successor_type;
1790
1791 //! Constructor for executable node with continue_msg -> Output
1792 template <typename Body >
1793 __TBB_NOINLINE_SYM continue_node(
1794 graph &g,
1795 #if __TBB_CPP11_PRESENT
1796 Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
1797 #else
1798 __TBB_FLOW_GRAPH_PRIORITY_ARG1( Body body, node_priority_t priority = tbb::flow::internal::no_priority )
1799 #endif
1800 ) : graph_node(g), input_impl_type( g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority) ),
1801 fOutput_type(g) {
1802 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1803
1804 static_cast<receiver<input_type> *>(this),
1805 static_cast<sender<output_type> *>(this), this->my_body );
1806 }
1807
1808 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1809 template <typename Body>
1810 continue_node( graph& g, Body body, node_priority_t priority )
1811 : continue_node(g, body, Policy(), priority) {}
1812 #endif
1813
1814 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1815 template <typename Body, typename... Args>
1816 continue_node( const node_set<Args...>& nodes, Body body,
1817 __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy p = Policy(), node_priority_t priority = tbb::flow::internal::no_priority))
1818 : continue_node(nodes.graph_reference(), body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority) ) {
1819 make_edges_in_order(nodes, *this);
1820 }
1821 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1822 template <typename Body, typename... Args>
1823 continue_node( const node_set<Args...>& nodes, Body body, node_priority_t priority)
1824 : continue_node(nodes, body, Policy(), priority) {}
1825 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1826 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1827
1828 //! Constructor for executable node with continue_msg -> Output
1829 template <typename Body >
1830 __TBB_NOINLINE_SYM continue_node(
1831 graph &g, int number_of_predecessors,
1832 #if __TBB_CPP11_PRESENT
1833 Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
1834 #else
1835 __TBB_FLOW_GRAPH_PRIORITY_ARG1( Body body, node_priority_t priority = tbb::flow::internal::no_priority )
1836 #endif
1837 ) : graph_node(g)
1838 , input_impl_type(g, number_of_predecessors, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)),
1839 fOutput_type(g) {
1840 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1841 static_cast<receiver<input_type> *>(this),
1842 static_cast<sender<output_type> *>(this), this->my_body );
1843 }
1844
1845 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1846 template <typename Body>
1847 continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t priority)
1848 : continue_node(g, number_of_predecessors, body, Policy(), priority) {}
1849 #endif
1850
1851 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1852 template <typename Body, typename... Args>
1853 continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1854 Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy p = Policy(), node_priority_t priority = tbb::flow::internal::no_priority ))
1855 : continue_node(nodes.graph_reference(), number_of_predecessors, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
1856 make_edges_in_order(nodes, *this);
1857 }
1858
1859 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1860 template <typename Body, typename... Args>
1861 continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1862 Body body, node_priority_t priority )
1863 : continue_node(nodes, number_of_predecessors, body, Policy(), priority) {}
1864 #endif
1865 #endif
1866
1867 //! Copy constructor
1868 __TBB_NOINLINE_SYM continue_node( const continue_node& src ) :
1869 graph_node(src.my_graph), input_impl_type(src),
1870 internal::function_output<Output>(src.my_graph) {
1871 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1872 static_cast<receiver<input_type> *>(this),
1873 static_cast<sender<output_type> *>(this), this->my_body );
1874 }
1875
1876 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1877 void set_name( const char *name ) __TBB_override {
1878 tbb::internal::fgt_node_desc( this, name );
1879 }
1880 #endif
1881
1882 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1883 void extract() __TBB_override {
1884 input_impl_type::my_built_predecessors.receiver_extract(*this);
1885 successors().built_successors().sender_extract(*this);
1886 }
1887 #endif
1888
1889 protected:
1890 template< typename R, typename B > friend class run_and_put_task;
1891 template<typename X, typename Y> friend class internal::broadcast_cache;
1892 template<typename X, typename Y> friend class internal::round_robin_cache;
1893 using input_impl_type::try_put_task;
1894 internal::broadcast_cache<output_type> &successors () __TBB_override { return fOutput_type::my_successors; }
1895
1896 void reset_node(reset_flags f) __TBB_override {
1897 input_impl_type::reset_receiver(f);
1898 if(f & rf_clear_edges)successors().clear();
1899 __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
1900 }
1901 }; // continue_node
1902
1903 //! Forwards messages of type T to all successors
1904 template <typename T>
1905 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1906 public:
1907 typedef T input_type;
1908 typedef T output_type;
1909 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1910 typedef typename sender<output_type>::successor_type successor_type;
1911 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1912 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1913 typedef typename sender<output_type>::successor_list_type successor_list_type;
1914 #endif
1915 private:
1916 internal::broadcast_cache<input_type> my_successors;
1917 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1918 internal::edge_container<predecessor_type> my_built_predecessors;
1919 spin_mutex pred_mutex; // serialize accesses on edge_container
1920 #endif
1921 public:
1922
1923 __TBB_NOINLINE_SYM explicit broadcast_node(graph& g) : graph_node(g) {
1924 my_successors.set_owner( this );
1925 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1926 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1927 }
1928
1929 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1930 template <typename... Args>
1931 broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
1932 make_edges_in_order(nodes, *this);
1933 }
1934 #endif
1935
1936 // Copy constructor
1937 __TBB_NOINLINE_SYM broadcast_node( const broadcast_node& src ) :
1938 graph_node(src.my_graph), receiver<T>(), sender<T>()
1939 {
1940 my_successors.set_owner( this );
1941 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1942 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1943 }
1944
1945 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1946 void set_name( const char *name ) __TBB_override {
1947 tbb::internal::fgt_node_desc( this, name );
1948 }
1949 #endif
1950
1951 //! Adds a successor
1952 bool register_successor( successor_type &r ) __TBB_override {
1953 my_successors.register_successor( r );
1954 return true;
1955 }
1956
1957 //! Removes s as a successor
1958 bool remove_successor( successor_type &r ) __TBB_override {
1959 my_successors.remove_successor( r );
1960 return true;
1961 }
1962
1963 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1964 typedef typename sender<T>::built_successors_type built_successors_type;
1965
1966 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1967
1968 void internal_add_built_successor(successor_type &r) __TBB_override {
1969 my_successors.internal_add_built_successor(r);
1970 }
1971
1972 void internal_delete_built_successor(successor_type &r) __TBB_override {
1973 my_successors.internal_delete_built_successor(r);
1974 }
1975
1976 size_t successor_count() __TBB_override {
1977 return my_successors.successor_count();
1978 }
1979
1980 void copy_successors(successor_list_type &v) __TBB_override {
1981 my_successors.copy_successors(v);
1982 }
1983
1984 typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
1985
1986 built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
1987
1988 void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
1989 spin_mutex::scoped_lock l(pred_mutex);
1990 my_built_predecessors.add_edge(p);
1991 }
1992
1993 void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
1994 spin_mutex::scoped_lock l(pred_mutex);
1995 my_built_predecessors.delete_edge(p);
1996 }
1997
1998 size_t predecessor_count() __TBB_override {
1999 spin_mutex::scoped_lock l(pred_mutex);
2000 return my_built_predecessors.edge_count();
2001 }
2002
2003 void copy_predecessors(predecessor_list_type &v) __TBB_override {
2004 spin_mutex::scoped_lock l(pred_mutex);
2005 my_built_predecessors.copy_edges(v);
2006 }
2007
2008 void extract() __TBB_override {
2009 my_built_predecessors.receiver_extract(*this);
2010 my_successors.built_successors().sender_extract(*this);
2011 }
2012 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2013
2014 protected:
2015 template< typename R, typename B > friend class run_and_put_task;
2016 template<typename X, typename Y> friend class internal::broadcast_cache;
2017 template<typename X, typename Y> friend class internal::round_robin_cache;
2018 //! build a task to run the successor if possible. Default is old behavior.
2019 task *try_put_task(const T& t) __TBB_override {
2020 task *new_task = my_successors.try_put_task(t);
2021 if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
2022 return new_task;
2023 }
2024
2025 graph& graph_reference() const __TBB_override {
2026 return my_graph;
2027 }
2028
2029 void reset_receiver(reset_flags /*f*/) __TBB_override {}
2030
2031 void reset_node(reset_flags f) __TBB_override {
2032 if (f&rf_clear_edges) {
2033 my_successors.clear();
2034 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2035 my_built_predecessors.clear();
2036 #endif
2037 }
2038 __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
2039 }
2040 }; // broadcast_node
2041
2042 //! Forwards messages in arbitrary order
2043 template <typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
2044 class buffer_node
2045 : public graph_node
2046 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2047 , public internal::reservable_item_buffer< T, Allocator >
2048 #else
2049 , public internal::reservable_item_buffer< T, cache_aligned_allocator<T> >
2050 #endif
2051 , public receiver<T>, public sender<T> {
2052 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2053 typedef Allocator internals_allocator;
2054 #else
2055 typedef cache_aligned_allocator<T> internals_allocator;
2056 #endif
2057 public:
2058 typedef T input_type;
2059 typedef T output_type;
2060 typedef typename receiver<input_type>::predecessor_type predecessor_type;
2061 typedef typename sender<output_type>::successor_type successor_type;
2062 typedef buffer_node<T, Allocator> class_type;
2063 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2064 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2065 typedef typename sender<output_type>::successor_list_type successor_list_type;
2066 #endif
2067 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2068 __TBB_STATIC_ASSERT(
2069 (tbb::internal::is_same_type<Allocator, null_type>::value),
2070 "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
2071 "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
2072 );
2073 #endif
2074
2075 protected:
2076 typedef size_t size_type;
2077 internal::round_robin_cache< T, null_rw_mutex > my_successors;
2078
2079 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2080 internal::edge_container<predecessor_type> my_built_predecessors;
2081 #endif
2082
2083 friend class internal::forward_task_bypass< class_type >;
2084
2085 enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
2086 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2087 , add_blt_succ, del_blt_succ,
2088 add_blt_pred, del_blt_pred,
2089 blt_succ_cnt, blt_pred_cnt,
2090 blt_succ_cpy, blt_pred_cpy // create vector copies of preds and succs
2091 #endif
2092 };
2093
2094 // implements the aggregator_operation concept
2095 class buffer_operation : public internal::aggregated_operation< buffer_operation > {
2096 public:
2097 char type;
2098 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2099 task * ltask;
2100 union {
2101 input_type *elem;
2102 successor_type *r;
2103 predecessor_type *p;
2104 size_t cnt_val;
2105 successor_list_type *svec;
2106 predecessor_list_type *pvec;
2107 };
2108 #else
2109 T *elem;
2110 task * ltask;
2111 successor_type *r;
2112 #endif
2113 buffer_operation(const T& e, op_type t) : type(char(t))
2114
2115 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2116 , ltask(NULL), elem(const_cast<T*>(&e))
2117 #else
2118 , elem(const_cast<T*>(&e)) , ltask(NULL)
2119 #endif
2120 {}
2121 buffer_operation(op_type t) : type(char(t)), ltask(NULL) {}
2122 };
2123
2124 bool forwarder_busy;
2125 typedef internal::aggregating_functor<class_type, buffer_operation> handler_type;
2126 friend class internal::aggregating_functor<class_type, buffer_operation>;
2127 internal::aggregator< handler_type, buffer_operation> my_aggregator;
2128
2129 virtual void handle_operations(buffer_operation *op_list) {
2130 handle_operations_impl(op_list, this);
2131 }
2132
2133 template<typename derived_type>
2134 void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
2135 __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
2136
2137 buffer_operation *tmp = NULL;
2138 bool try_forwarding = false;
2139 while (op_list) {
2140 tmp = op_list;
2141 op_list = op_list->next;
2142 switch (tmp->type) {
2143 case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
2144 case rem_succ: internal_rem_succ(tmp); break;
2145 case req_item: internal_pop(tmp); break;
2146 case res_item: internal_reserve(tmp); break;
2147 case rel_res: internal_release(tmp); try_forwarding = true; break;
2148 case con_res: internal_consume(tmp); try_forwarding = true; break;
2149 case put_item: try_forwarding = internal_push(tmp); break;
2150 case try_fwd_task: internal_forward_task(tmp); break;
2151 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2152 // edge recording
2153 case add_blt_succ: internal_add_built_succ(tmp); break;
2154 case del_blt_succ: internal_del_built_succ(tmp); break;
2155 case add_blt_pred: internal_add_built_pred(tmp); break;
2156 case del_blt_pred: internal_del_built_pred(tmp); break;
2157 case blt_succ_cnt: internal_succ_cnt(tmp); break;
2158 case blt_pred_cnt: internal_pred_cnt(tmp); break;
2159 case blt_succ_cpy: internal_copy_succs(tmp); break;
2160 case blt_pred_cpy: internal_copy_preds(tmp); break;
2161 #endif
2162 }
2163 }
2164
2165 derived->order();
2166
2167 if (try_forwarding && !forwarder_busy) {
2168 if(internal::is_graph_active(this->my_graph)) {
2169 forwarder_busy = true;
2170 task *new_task = new(task::allocate_additional_child_of(*(this->my_graph.root_task()))) internal::
2171 forward_task_bypass<class_type>(*this);
2172 // tmp should point to the last item handled by the aggregator. This is the operation
2173 // the handling thread enqueued. So modifying that record will be okay.
2174 // workaround for icc bug
2175 tbb::task *z = tmp->ltask;
2176 graph &g = this->my_graph;
2177 tmp->ltask = combine_tasks(g, z, new_task); // in case the op generated a task
2178 }
2179 }
2180 } // handle_operations
2181
2182 inline task *grab_forwarding_task( buffer_operation &op_data) {
2183 return op_data.ltask;
2184 }
2185
2186 inline bool enqueue_forwarding_task(buffer_operation &op_data) {
2187 task *ft = grab_forwarding_task(op_data);
2188 if(ft) {
2189 internal::spawn_in_graph_arena(graph_reference(), *ft);
2190 return true;
2191 }
2192 return false;
2193 }
2194
2195 //! This is executed by an enqueued task, the "forwarder"
2196 virtual task *forward_task() {
2197 buffer_operation op_data(try_fwd_task);
2198 task *last_task = NULL;
2199 do {
2200 op_data.status = internal::WAIT;
2201 op_data.ltask = NULL;
2202 my_aggregator.execute(&op_data);
2203
2204 // workaround for icc bug
2205 tbb::task *xtask = op_data.ltask;
2206 graph& g = this->my_graph;
2207 last_task = combine_tasks(g, last_task, xtask);
2208 } while (op_data.status ==internal::SUCCEEDED);
2209 return last_task;
2210 }
2211
2212 //! Register successor
2213 virtual void internal_reg_succ(buffer_operation *op) {
2214 my_successors.register_successor(*(op->r));
2215 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2216 }
2217
2218 //! Remove successor
2219 virtual void internal_rem_succ(buffer_operation *op) {
2220 my_successors.remove_successor(*(op->r));
2221 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2222 }
2223
2224 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2225 typedef typename sender<T>::built_successors_type built_successors_type;
2226
2227 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
2228
2229 virtual void internal_add_built_succ(buffer_operation *op) {
2230 my_successors.internal_add_built_successor(*(op->r));
2231 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2232 }
2233
2234 virtual void internal_del_built_succ(buffer_operation *op) {
2235 my_successors.internal_delete_built_successor(*(op->r));
2236 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2237 }
2238
2239 typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
2240
2241 built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
2242
2243 virtual void internal_add_built_pred(buffer_operation *op) {
2244 my_built_predecessors.add_edge(*(op->p));
2245 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2246 }
2247
2248 virtual void internal_del_built_pred(buffer_operation *op) {
2249 my_built_predecessors.delete_edge(*(op->p));
2250 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2251 }
2252
2253 virtual void internal_succ_cnt(buffer_operation *op) {
2254 op->cnt_val = my_successors.successor_count();
2255 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2256 }
2257
2258 virtual void internal_pred_cnt(buffer_operation *op) {
2259 op->cnt_val = my_built_predecessors.edge_count();
2260 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2261 }
2262
2263 virtual void internal_copy_succs(buffer_operation *op) {
2264 my_successors.copy_successors(*(op->svec));
2265 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2266 }
2267
2268 virtual void internal_copy_preds(buffer_operation *op) {
2269 my_built_predecessors.copy_edges(*(op->pvec));
2270 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2271 }
2272
2273 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2274
2275 private:
2276 void order() {}
2277
2278 bool is_item_valid() {
2279 return this->my_item_valid(this->my_tail - 1);
2280 }
2281
2282 void try_put_and_add_task(task*& last_task) {
2283 task *new_task = my_successors.try_put_task(this->back());
2284 if (new_task) {
2285 // workaround for icc bug
2286 graph& g = this->my_graph;
2287 last_task = combine_tasks(g, last_task, new_task);
2288 this->destroy_back();
2289 }
2290 }
2291
2292 protected:
2293 //! Tries to forward valid items to successors
2294 virtual void internal_forward_task(buffer_operation *op) {
2295 internal_forward_task_impl(op, this);
2296 }
2297
2298 template<typename derived_type>
2299 void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
2300 __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
2301
2302 if (this->my_reserved || !derived->is_item_valid()) {
2303 __TBB_store_with_release(op->status, internal::FAILED);
2304 this->forwarder_busy = false;
2305 return;
2306 }
2307 // Try forwarding, giving each successor a chance
2308 task * last_task = NULL;
2309 size_type counter = my_successors.size();
2310 for (; counter > 0 && derived->is_item_valid(); --counter)
2311 derived->try_put_and_add_task(last_task);
2312
2313 op->ltask = last_task; // return task
2314 if (last_task && !counter) {
2315 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2316 }
2317 else {
2318 __TBB_store_with_release(op->status, internal::FAILED);
2319 forwarder_busy = false;
2320 }
2321 }
2322
2323 virtual bool internal_push(buffer_operation *op) {
2324 this->push_back(*(op->elem));
2325 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2326 return true;
2327 }
2328
2329 virtual void internal_pop(buffer_operation *op) {
2330 if(this->pop_back(*(op->elem))) {
2331 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2332 }
2333 else {
2334 __TBB_store_with_release(op->status, internal::FAILED);
2335 }
2336 }
2337
2338 virtual void internal_reserve(buffer_operation *op) {
2339 if(this->reserve_front(*(op->elem))) {
2340 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2341 }
2342 else {
2343 __TBB_store_with_release(op->status, internal::FAILED);
2344 }
2345 }
2346
2347 virtual void internal_consume(buffer_operation *op) {
2348 this->consume_front();
2349 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2350 }
2351
2352 virtual void internal_release(buffer_operation *op) {
2353 this->release_front();
2354 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2355 }
2356
2357 public:
2358 //! Constructor
2359 __TBB_NOINLINE_SYM explicit buffer_node( graph &g )
2360 : graph_node(g), internal::reservable_item_buffer<T, internals_allocator>(), receiver<T>(),
2361 sender<T>(), forwarder_busy(false)
2362 {
2363 my_successors.set_owner(this);
2364 my_aggregator.initialize_handler(handler_type(this));
2365 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
2366 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
2367 }
2368
2369 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2370 template <typename... Args>
2371 buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
2372 make_edges_in_order(nodes, *this);
2373 }
2374 #endif
2375
2376 //! Copy constructor
2377 __TBB_NOINLINE_SYM buffer_node( const buffer_node& src )
2378 : graph_node(src.my_graph), internal::reservable_item_buffer<T, internals_allocator>(),
2379 receiver<T>(), sender<T>(), forwarder_busy(false)
2380 {
2381 my_successors.set_owner(this);
2382 my_aggregator.initialize_handler(handler_type(this));
2383 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
2384 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
2385 }
2386
2387 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2388 void set_name( const char *name ) __TBB_override {
2389 tbb::internal::fgt_node_desc( this, name );
2390 }
2391 #endif
2392
2393 //
2394 // message sender implementation
2395 //
2396
2397 //! Adds a new successor.
2398 /** Adds successor r to the list of successors; may forward tasks. */
2399 bool register_successor( successor_type &r ) __TBB_override {
2400 buffer_operation op_data(reg_succ);
2401 op_data.r = &r;
2402 my_aggregator.execute(&op_data);
2403 (void)enqueue_forwarding_task(op_data);
2404 return true;
2405 }
2406
2407 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2408 void internal_add_built_successor( successor_type &r) __TBB_override {
2409 buffer_operation op_data(add_blt_succ);
2410 op_data.r = &r;
2411 my_aggregator.execute(&op_data);
2412 }
2413
2414 void internal_delete_built_successor( successor_type &r) __TBB_override {
2415 buffer_operation op_data(del_blt_succ);
2416 op_data.r = &r;
2417 my_aggregator.execute(&op_data);
2418 }
2419
2420 void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
2421 buffer_operation op_data(add_blt_pred);
2422 op_data.p = &p;
2423 my_aggregator.execute(&op_data);
2424 }
2425
2426 void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
2427 buffer_operation op_data(del_blt_pred);
2428 op_data.p = &p;
2429 my_aggregator.execute(&op_data);
2430 }
2431
2432 size_t predecessor_count() __TBB_override {
2433 buffer_operation op_data(blt_pred_cnt);
2434 my_aggregator.execute(&op_data);
2435 return op_data.cnt_val;
2436 }
2437
2438 size_t successor_count() __TBB_override {
2439 buffer_operation op_data(blt_succ_cnt);
2440 my_aggregator.execute(&op_data);
2441 return op_data.cnt_val;
2442 }
2443
2444 void copy_predecessors( predecessor_list_type &v ) __TBB_override {
2445 buffer_operation op_data(blt_pred_cpy);
2446 op_data.pvec = &v;
2447 my_aggregator.execute(&op_data);
2448 }
2449
2450 void copy_successors( successor_list_type &v ) __TBB_override {
2451 buffer_operation op_data(blt_succ_cpy);
2452 op_data.svec = &v;
2453 my_aggregator.execute(&op_data);
2454 }
2455
2456 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2457
2458 //! Removes a successor.
2459 /** Removes successor r from the list of successors.
2460 It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
2461 bool remove_successor( successor_type &r ) __TBB_override {
2462 r.remove_predecessor(*this);
2463 buffer_operation op_data(rem_succ);
2464 op_data.r = &r;
2465 my_aggregator.execute(&op_data);
2466 // even though this operation does not cause a forward, if we are the handler, and
2467 // a forward is scheduled, we may be the first to reach this point after the aggregator,
2468 // and so should check for the task.
2469 (void)enqueue_forwarding_task(op_data);
2470 return true;
2471 }
2472
2473 //! Request an item from the buffer_node
2474 /** true = v contains the returned item<BR>
2475 false = no item has been returned */
2476 bool try_get( T &v ) __TBB_override {
2477 buffer_operation op_data(req_item);
2478 op_data.elem = &v;
2479 my_aggregator.execute(&op_data);
2480 (void)enqueue_forwarding_task(op_data);
2481 return (op_data.status==internal::SUCCEEDED);
2482 }
2483
2484 //! Reserves an item.
2485 /** false = no item can be reserved<BR>
2486 true = an item is reserved */
2487 bool try_reserve( T &v ) __TBB_override {
2488 buffer_operation op_data(res_item);
2489 op_data.elem = &v;
2490 my_aggregator.execute(&op_data);
2491 (void)enqueue_forwarding_task(op_data);
2492 return (op_data.status==internal::SUCCEEDED);
2493 }
2494
2495 //! Release a reserved item.
2496 /** true = item has been released and so remains in sender */
2497 bool try_release() __TBB_override {
2498 buffer_operation op_data(rel_res);
2499 my_aggregator.execute(&op_data);
2500 (void)enqueue_forwarding_task(op_data);
2501 return true;
2502 }
2503
2504 //! Consumes a reserved item.
2505 /** true = item is removed from sender and reservation removed */
2506 bool try_consume() __TBB_override {
2507 buffer_operation op_data(con_res);
2508 my_aggregator.execute(&op_data);
2509 (void)enqueue_forwarding_task(op_data);
2510 return true;
2511 }
2512
2513 protected:
2514
2515 template< typename R, typename B > friend class run_and_put_task;
2516 template<typename X, typename Y> friend class internal::broadcast_cache;
2517 template<typename X, typename Y> friend class internal::round_robin_cache;
2518 //! receive an item, return a task *if possible
2519 task *try_put_task(const T &t) __TBB_override {
2520 buffer_operation op_data(t, put_item);
2521 my_aggregator.execute(&op_data);
2522 task *ft = grab_forwarding_task(op_data);
2523 // sequencer_nodes can return failure (if an item has been previously inserted)
2524 // We have to spawn the returned task if our own operation fails.
2525
2526 if(ft && op_data.status ==internal::FAILED) {
2527 // we haven't succeeded queueing the item, but for some reason the
2528 // call returned a task (if another request resulted in a successful
2529 // forward this could happen.) Queue the task and reset the pointer.
2530 internal::spawn_in_graph_arena(graph_reference(), *ft); ft = NULL;
2531 }
2532 else if(!ft && op_data.status ==internal::SUCCEEDED) {
2533 ft = SUCCESSFULLY_ENQUEUED;
2534 }
2535 return ft;
2536 }
2537
2538 graph& graph_reference() const __TBB_override {
2539 return my_graph;
2540 }
2541
2542 void reset_receiver(reset_flags /*f*/) __TBB_override { }
2543
2544 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2545 public:
2546 void extract() __TBB_override {
2547 my_built_predecessors.receiver_extract(*this);
2548 my_successors.built_successors().sender_extract(*this);
2549 }
2550 #endif
2551
2552 protected:
2553 void reset_node( reset_flags f) __TBB_override {
2554 internal::reservable_item_buffer<T, internals_allocator>::reset();
2555 // TODO: just clear structures
2556 if (f&rf_clear_edges) {
2557 my_successors.clear();
2558 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2559 my_built_predecessors.clear();
2560 #endif
2561 }
2562 forwarder_busy = false;
2563 }
2564 }; // buffer_node
2565
2566 //! Forwards messages in FIFO order
2567 template <typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
2568 class queue_node : public buffer_node<T, Allocator> {
2569 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2570 __TBB_STATIC_ASSERT(
2571 (tbb::internal::is_same_type<Allocator, null_type>::value),
2572 "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
2573 "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
2574 );
2575 #endif
2576 protected:
2577 typedef buffer_node<T, Allocator> base_type;
2578 typedef typename base_type::size_type size_type;
2579 typedef typename base_type::buffer_operation queue_operation;
2580 typedef queue_node class_type;
2581
2582 private:
2583 template<typename, typename> friend class buffer_node;
2584
2585 bool is_item_valid() {
2586 return this->my_item_valid(this->my_head);
2587 }
2588
2589 void try_put_and_add_task(task*& last_task) {
2590 task *new_task = this->my_successors.try_put_task(this->front());
2591 if (new_task) {
2592 // workaround for icc bug
2593 graph& graph_ref = this->graph_reference();
2594 last_task = combine_tasks(graph_ref, last_task, new_task);
2595 this->destroy_front();
2596 }
2597 }
2598
2599 protected:
2600 void internal_forward_task(queue_operation *op) __TBB_override {
2601 this->internal_forward_task_impl(op, this);
2602 }
2603
2604 void internal_pop(queue_operation *op) __TBB_override {
2605 if ( this->my_reserved || !this->my_item_valid(this->my_head)){
2606 __TBB_store_with_release(op->status, internal::FAILED);
2607 }
2608 else {
2609 this->pop_front(*(op->elem));
2610 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2611 }
2612 }
2613 void internal_reserve(queue_operation *op) __TBB_override {
2614 if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2615 __TBB_store_with_release(op->status, internal::FAILED);
2616 }
2617 else {
2618 this->reserve_front(*(op->elem));
2619 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2620 }
2621 }
2622 void internal_consume(queue_operation *op) __TBB_override {
2623 this->consume_front();
2624 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2625 }
2626
2627 public:
2628 typedef T input_type;
2629 typedef T output_type;
2630 typedef typename receiver<input_type>::predecessor_type predecessor_type;
2631 typedef typename sender<output_type>::successor_type successor_type;
2632
2633 //! Constructor
2634 __TBB_NOINLINE_SYM explicit queue_node( graph &g ) : base_type(g) {
2635 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2636 static_cast<receiver<input_type> *>(this),
2637 static_cast<sender<output_type> *>(this) );
2638 }
2639
2640 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2641 template <typename... Args>
2642 queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
2643 make_edges_in_order(nodes, *this);
2644 }
2645 #endif
2646
2647 //! Copy constructor
2648 __TBB_NOINLINE_SYM queue_node( const queue_node& src) : base_type(src) {
2649 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2650 static_cast<receiver<input_type> *>(this),
2651 static_cast<sender<output_type> *>(this) );
2652 }
2653
2654 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2655 void set_name( const char *name ) __TBB_override {
2656 tbb::internal::fgt_node_desc( this, name );
2657 }
2658 #endif
2659
2660 protected:
2661 void reset_node( reset_flags f) __TBB_override {
2662 base_type::reset_node(f);
2663 }
2664 }; // queue_node
2665
2666 //! Forwards messages in sequence order
2667 template< typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
2668 class sequencer_node : public queue_node<T, Allocator> {
2669 internal::function_body< T, size_t > *my_sequencer;
2670 // my_sequencer should be a benign function and must be callable
2671 // from a parallel context. Does this mean it needn't be reset?
2672 public:
2673 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2674 __TBB_STATIC_ASSERT(
2675 (tbb::internal::is_same_type<Allocator, null_type>::value),
2676 "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
2677 "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
2678 );
2679 #endif
2680 typedef T input_type;
2681 typedef T output_type;
2682 typedef typename receiver<input_type>::predecessor_type predecessor_type;
2683 typedef typename sender<output_type>::successor_type successor_type;
2684
2685 //! Constructor
2686 template< typename Sequencer >
2687 __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, Allocator>(g),
2688 my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {
2689 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2690 static_cast<receiver<input_type> *>(this),
2691 static_cast<sender<output_type> *>(this) );
2692 }
2693
2694 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2695 template <typename Sequencer, typename... Args>
2696 sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
2697 : sequencer_node(nodes.graph_reference(), s) {
2698 make_edges_in_order(nodes, *this);
2699 }
2700 #endif
2701
2702 //! Copy constructor
2703 __TBB_NOINLINE_SYM sequencer_node( const sequencer_node& src ) : queue_node<T, Allocator>(src),
2704 my_sequencer( src.my_sequencer->clone() ) {
2705 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2706 static_cast<receiver<input_type> *>(this),
2707 static_cast<sender<output_type> *>(this) );
2708 }
2709
2710 //! Destructor
2711 ~sequencer_node() { delete my_sequencer; }
2712
2713 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2714 void set_name( const char *name ) __TBB_override {
2715 tbb::internal::fgt_node_desc( this, name );
2716 }
2717 #endif
2718
2719 protected:
2720 typedef typename buffer_node<T, Allocator>::size_type size_type;
2721 typedef typename buffer_node<T, Allocator>::buffer_operation sequencer_operation;
2722
2723 private:
2724 bool internal_push(sequencer_operation *op) __TBB_override {
2725 size_type tag = (*my_sequencer)(*(op->elem));
2726 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
2727 if (tag < this->my_head) {
2728 // have already emitted a message with this tag
2729 __TBB_store_with_release(op->status, internal::FAILED);
2730 return false;
2731 }
2732 #endif
2733 // cannot modify this->my_tail now; the buffer would be inconsistent.
2734 size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
2735
2736 if (this->size(new_tail) > this->capacity()) {
2737 this->grow_my_array(this->size(new_tail));
2738 }
2739 this->my_tail = new_tail;
2740
2741 const internal::op_stat res = this->place_item(tag, *(op->elem)) ? internal::SUCCEEDED : internal::FAILED;
2742 __TBB_store_with_release(op->status, res);
2743 return res ==internal::SUCCEEDED;
2744 }
2745 }; // sequencer_node
2746
2747 //! Forwards messages in priority order
2748 template<typename T, typename Compare = std::less<T>, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T)>
2749 class priority_queue_node : public buffer_node<T, Allocator> {
2750 public:
2751 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2752 __TBB_STATIC_ASSERT(
2753 (tbb::internal::is_same_type<Allocator, null_type>::value),
2754 "Allocator template parameter for flow graph nodes is deprecated and will removed in the future. "
2755 "To temporary enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR."
2756 );
2757 #endif
2758 typedef T input_type;
2759 typedef T output_type;
2760 typedef buffer_node<T,Allocator> base_type;
2761 typedef priority_queue_node class_type;
2762 typedef typename receiver<input_type>::predecessor_type predecessor_type;
2763 typedef typename sender<output_type>::successor_type successor_type;
2764
2765 //! Constructor
2766 __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
2767 : buffer_node<T, Allocator>(g), compare(comp), mark(0) {
2768 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2769 static_cast<receiver<input_type> *>(this),
2770 static_cast<sender<output_type> *>(this) );
2771 }
2772
2773 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2774 template <typename... Args>
2775 priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
2776 : priority_queue_node(nodes.graph_reference(), comp) {
2777 make_edges_in_order(nodes, *this);
2778 }
2779 #endif
2780
2781 //! Copy constructor
2782 __TBB_NOINLINE_SYM priority_queue_node( const priority_queue_node &src )
2783 : buffer_node<T, Allocator>(src), mark(0)
2784 {
2785 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2786 static_cast<receiver<input_type> *>(this),
2787 static_cast<sender<output_type> *>(this) );
2788 }
2789
2790 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2791 void set_name( const char *name ) __TBB_override {
2792 tbb::internal::fgt_node_desc( this, name );
2793 }
2794 #endif
2795
2796 protected:
2797
2798 void reset_node( reset_flags f) __TBB_override {
2799 mark = 0;
2800 base_type::reset_node(f);
2801 }
2802
2803 typedef typename buffer_node<T, Allocator>::size_type size_type;
2804 typedef typename buffer_node<T, Allocator>::item_type item_type;
2805 typedef typename buffer_node<T, Allocator>::buffer_operation prio_operation;
2806
2807 //! Tries to forward valid items to successors
2808 void internal_forward_task(prio_operation *op) __TBB_override {
2809 this->internal_forward_task_impl(op, this);
2810 }
2811
2812 void handle_operations(prio_operation *op_list) __TBB_override {
2813 this->handle_operations_impl(op_list, this);
2814 }
2815
2816 bool internal_push(prio_operation *op) __TBB_override {
2817 prio_push(*(op->elem));
2818 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2819 return true;
2820 }
2821
2822 void internal_pop(prio_operation *op) __TBB_override {
2823 // if empty or already reserved, don't pop
2824 if ( this->my_reserved == true || this->my_tail == 0 ) {
2825 __TBB_store_with_release(op->status, internal::FAILED);
2826 return;
2827 }
2828
2829 *(op->elem) = prio();
2830 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2831 prio_pop();
2832
2833 }
2834
2835 // pops the highest-priority item, saves copy
2836 void internal_reserve(prio_operation *op) __TBB_override {
2837 if (this->my_reserved == true || this->my_tail == 0) {
2838 __TBB_store_with_release(op->status, internal::FAILED);
2839 return;
2840 }
2841 this->my_reserved = true;
2842 *(op->elem) = prio();
2843 reserved_item = *(op->elem);
2844 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2845 prio_pop();
2846 }
2847
2848 void internal_consume(prio_operation *op) __TBB_override {
2849 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2850 this->my_reserved = false;
2851 reserved_item = input_type();
2852 }
2853
2854 void internal_release(prio_operation *op) __TBB_override {
2855 __TBB_store_with_release(op->status, internal::SUCCEEDED);
2856 prio_push(reserved_item);
2857 this->my_reserved = false;
2858 reserved_item = input_type();
2859 }
2860
2861 private:
2862 template<typename, typename> friend class buffer_node;
2863
2864 void order() {
2865 if (mark < this->my_tail) heapify();
2866 __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
2867 }
2868
2869 bool is_item_valid() {
2870 return this->my_tail > 0;
2871 }
2872
2873 void try_put_and_add_task(task*& last_task) {
2874 task * new_task = this->my_successors.try_put_task(this->prio());
2875 if (new_task) {
2876 // workaround for icc bug
2877 graph& graph_ref = this->graph_reference();
2878 last_task = combine_tasks(graph_ref, last_task, new_task);
2879 prio_pop();
2880 }
2881 }
2882
2883 private:
2884 Compare compare;
2885 size_type mark;
2886
2887 input_type reserved_item;
2888
2889 // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
2890 bool prio_use_tail() {
2891 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
2892 return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
2893 }
2894
2895 // prio_push: checks that the item will fit, expand array if necessary, put at end
2896 void prio_push(const T &src) {
2897 if ( this->my_tail >= this->my_array_size )
2898 this->grow_my_array( this->my_tail + 1 );
2899 (void) this->place_item(this->my_tail, src);
2900 ++(this->my_tail);
2901 __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
2902 }
2903
2904 // prio_pop: deletes highest priority item from the array, and if it is item
2905 // 0, move last item to 0 and reheap. If end of array, just destroy and decrement tail
2906 // and mark. Assumes the array has already been tested for emptiness; no failure.
2907 void prio_pop() {
2908 if (prio_use_tail()) {
2909 // there are newly pushed elements; last one higher than top
2910 // copy the data
2911 this->destroy_item(this->my_tail-1);
2912 --(this->my_tail);
2913 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2914 return;
2915 }
2916 this->destroy_item(0);
2917 if(this->my_tail > 1) {
2918 // push the last element down heap
2919 __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
2920 this->move_item(0,this->my_tail - 1);
2921 }
2922 --(this->my_tail);
2923 if(mark > this->my_tail) --mark;
2924 if (this->my_tail > 1) // don't reheap for heap of size 1
2925 reheap();
2926 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2927 }
2928
2929 const T& prio() {
2930 return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
2931 }
2932
2933 // turn array into heap
2934 void heapify() {
2935 if(this->my_tail == 0) {
2936 mark = 0;
2937 return;
2938 }
2939 if (!mark) mark = 1;
2940 for (; mark<this->my_tail; ++mark) { // for each unheaped element
2941 size_type cur_pos = mark;
2942 input_type to_place;
2943 this->fetch_item(mark,to_place);
2944 do { // push to_place up the heap
2945 size_type parent = (cur_pos-1)>>1;
2946 if (!compare(this->get_my_item(parent), to_place))
2947 break;
2948 this->move_item(cur_pos, parent);
2949 cur_pos = parent;
2950 } while( cur_pos );
2951 (void) this->place_item(cur_pos, to_place);
2952 }
2953 }
2954
2955 // otherwise heapified array with new root element; rearrange to heap
2956 void reheap() {
2957 size_type cur_pos=0, child=1;
2958 while (child < mark) {
2959 size_type target = child;
2960 if (child+1<mark &&
2961 compare(this->get_my_item(child),
2962 this->get_my_item(child+1)))
2963 ++target;
2964 // target now has the higher priority child
2965 if (compare(this->get_my_item(target),
2966 this->get_my_item(cur_pos)))
2967 break;
2968 // swap
2969 this->swap_items(cur_pos, target);
2970 cur_pos = target;
2971 child = (cur_pos<<1)+1;
2972 }
2973 }
2974 }; // priority_queue_node
2975
2976 } // interfaceX
2977
2978 namespace interface11 {
2979
2980 //! Forwards messages only if the threshold has not been reached
2981 /** This node forwards items until its threshold is reached.
2982 It contains no buffering. If the downstream node rejects, the
2983 message is dropped. */
2984 template< typename T, typename DecrementType=continue_msg >
2985 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2986 public:
2987 typedef T input_type;
2988 typedef T output_type;
2989 typedef typename receiver<input_type>::predecessor_type predecessor_type;
2990 typedef typename sender<output_type>::successor_type successor_type;
2991 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2992 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
2993 typedef typename sender<output_type>::built_successors_type built_successors_type;
2994 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2995 typedef typename sender<output_type>::successor_list_type successor_list_type;
2996 #endif
2997 //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.
2998
2999 private:
3000 size_t my_threshold;
3001 size_t my_count; //number of successful puts
3002 size_t my_tries; //number of active put attempts
3003 internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors;
3004 spin_mutex my_mutex;
3005 internal::broadcast_cache< T > my_successors;
3006 __TBB_DEPRECATED_LIMITER_EXPR( int init_decrement_predecessors; )
3007
3008 friend class internal::forward_task_bypass< limiter_node<T,DecrementType> >;
3009
3010 // Let decrementer call decrement_counter()
3011 friend class internal::decrementer< limiter_node<T,DecrementType>, DecrementType >;
3012
3013 bool check_conditions() { // always called under lock
3014 return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
3015 }
3016
3017 // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
3018 task *forward_task() {
3019 input_type v;
3020 task *rval = NULL;
3021 bool reserved = false;
3022 {
3023 spin_mutex::scoped_lock lock(my_mutex);
3024 if ( check_conditions() )
3025 ++my_tries;
3026 else
3027 return NULL;
3028 }
3029
3030 //SUCCESS
3031 // if we can reserve and can put, we consume the reservation
3032 // we increment the count and decrement the tries
3033 if ( (my_predecessors.try_reserve(v)) == true ){
3034 reserved=true;
3035 if ( (rval = my_successors.try_put_task(v)) != NULL ){
3036 {
3037 spin_mutex::scoped_lock lock(my_mutex);
3038 ++my_count;
3039 --my_tries;
3040 my_predecessors.try_consume();
3041 if ( check_conditions() ) {
3042 if ( internal::is_graph_active(this->my_graph) ) {
3043 task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3044 internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
3045 internal::spawn_in_graph_arena(graph_reference(), *rtask);
3046 }
3047 }
3048 }
3049 return rval;
3050 }
3051 }
3052 //FAILURE
3053 //if we can't reserve, we decrement the tries
3054 //if we can reserve but can't put, we decrement the tries and release the reservation
3055 {
3056 spin_mutex::scoped_lock lock(my_mutex);
3057 --my_tries;
3058 if (reserved) my_predecessors.try_release();
3059 if ( check_conditions() ) {
3060 if ( internal::is_graph_active(this->my_graph) ) {
3061 task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3062 internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
3063 __TBB_ASSERT(!rval, "Have two tasks to handle");
3064 return rtask;
3065 }
3066 }
3067 return rval;
3068 }
3069 }
3070
3071 void forward() {
3072 __TBB_ASSERT(false, "Should never be called");
3073 return;
3074 }
3075
3076 task* decrement_counter( long long delta ) {
3077 {
3078 spin_mutex::scoped_lock lock(my_mutex);
3079 if( delta > 0 && size_t(delta) > my_count )
3080 my_count = 0;
3081 else if( delta < 0 && size_t(delta) > my_threshold - my_count )
3082 my_count = my_threshold;
3083 else
3084 my_count -= size_t(delta); // absolute value of delta is sufficiently small
3085 }
3086 return forward_task();
3087 }
3088
3089 void initialize() {
3090 my_predecessors.set_owner(this);
3091 my_successors.set_owner(this);
3092 decrement.set_owner(this);
3093 tbb::internal::fgt_node(
3094 CODEPTR(), tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
3095 static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
3096 static_cast<sender<output_type> *>(this)
3097 );
3098 }
3099 public:
3100 //! The internal receiver< DecrementType > that decrements the count
3101 internal::decrementer< limiter_node<T, DecrementType>, DecrementType > decrement;
3102
3103 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
3104 __TBB_STATIC_ASSERT( (tbb::internal::is_same_type<DecrementType, continue_msg>::value),
3105 "Deprecated interface of the limiter node can be used only in conjunction "
3106 "with continue_msg as the type of DecrementType template parameter." );
3107 #endif // Check for incompatible interface
3108
3109 //! Constructor
3110 limiter_node(graph &g,
3111 __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
3112 : graph_node(g), my_threshold(threshold), my_count(0),
3113 __TBB_DEPRECATED_LIMITER_ARG4(
3114 my_tries(0), decrement(),
3115 init_decrement_predecessors(num_decrement_predecessors),
3116 decrement(num_decrement_predecessors)) {
3117 initialize();
3118 }
3119
3120 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3121 template <typename... Args>
3122 limiter_node(const node_set<Args...>& nodes, size_t threshold)
3123 : limiter_node(nodes.graph_reference(), threshold) {
3124 make_edges_in_order(nodes, *this);
3125 }
3126 #endif
3127
3128 //! Copy constructor
3129 limiter_node( const limiter_node& src ) :
3130 graph_node(src.my_graph), receiver<T>(), sender<T>(),
3131 my_threshold(src.my_threshold), my_count(0),
3132 __TBB_DEPRECATED_LIMITER_ARG4(
3133 my_tries(0), decrement(),
3134 init_decrement_predecessors(src.init_decrement_predecessors),
3135 decrement(src.init_decrement_predecessors)) {
3136 initialize();
3137 }
3138
3139 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3140 void set_name( const char *name ) __TBB_override {
3141 tbb::internal::fgt_node_desc( this, name );
3142 }
3143 #endif
3144
3145 //! Replace the current successor with this new successor
3146 bool register_successor( successor_type &r ) __TBB_override {
3147 spin_mutex::scoped_lock lock(my_mutex);
3148 bool was_empty = my_successors.empty();
3149 my_successors.register_successor(r);
3150 //spawn a forward task if this is the only successor
3151 if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
3152 if ( internal::is_graph_active(this->my_graph) ) {
3153 task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3154 internal::forward_task_bypass < limiter_node<T, DecrementType> >( *this );
3155 internal::spawn_in_graph_arena(graph_reference(), *task);
3156 }
3157 }
3158 return true;
3159 }
3160
3161 //! Removes a successor from this node
3162 /** r.remove_predecessor(*this) is also called. */
3163 bool remove_successor( successor_type &r ) __TBB_override {
3164 r.remove_predecessor(*this);
3165 my_successors.remove_successor(r);
3166 return true;
3167 }
3168
3169 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3170 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
3171 built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
3172
3173 void internal_add_built_successor(successor_type &src) __TBB_override {
3174 my_successors.internal_add_built_successor(src);
3175 }
3176
3177 void internal_delete_built_successor(successor_type &src) __TBB_override {
3178 my_successors.internal_delete_built_successor(src);
3179 }
3180
3181 size_t successor_count() __TBB_override { return my_successors.successor_count(); }
3182
3183 void copy_successors(successor_list_type &v) __TBB_override {
3184 my_successors.copy_successors(v);
3185 }
3186
3187 void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
3188 my_predecessors.internal_add_built_predecessor(src);
3189 }
3190
3191 void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
3192 my_predecessors.internal_delete_built_predecessor(src);
3193 }
3194
3195 size_t predecessor_count() __TBB_override { return my_predecessors.predecessor_count(); }
3196
3197 void copy_predecessors(predecessor_list_type &v) __TBB_override {
3198 my_predecessors.copy_predecessors(v);
3199 }
3200
3201 void extract() __TBB_override {
3202 my_count = 0;
3203 my_successors.built_successors().sender_extract(*this);
3204 my_predecessors.built_predecessors().receiver_extract(*this);
3205 decrement.built_predecessors().receiver_extract(decrement);
3206 }
3207 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3208
3209 //! Adds src to the list of cached predecessors.
3210 bool register_predecessor( predecessor_type &src ) __TBB_override {
3211 spin_mutex::scoped_lock lock(my_mutex);
3212 my_predecessors.add( src );
3213 if ( my_count + my_tries < my_threshold && !my_successors.empty() && internal::is_graph_active(this->my_graph) ) {
3214 task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3215 internal::forward_task_bypass < limiter_node<T, DecrementType> >( *this );
3216 internal::spawn_in_graph_arena(graph_reference(), *task);
3217 }
3218 return true;
3219 }
3220
3221 //! Removes src from the list of cached predecessors.
3222 bool remove_predecessor( predecessor_type &src ) __TBB_override {
3223 my_predecessors.remove( src );
3224 return true;
3225 }
3226
3227 protected:
3228
3229 template< typename R, typename B > friend class run_and_put_task;
3230 template<typename X, typename Y> friend class internal::broadcast_cache;
3231 template<typename X, typename Y> friend class internal::round_robin_cache;
3232 //! Puts an item to this receiver
3233 task *try_put_task( const T &t ) __TBB_override {
3234 {
3235 spin_mutex::scoped_lock lock(my_mutex);
3236 if ( my_count + my_tries >= my_threshold )
3237 return NULL;
3238 else
3239 ++my_tries;
3240 }
3241
3242 task * rtask = my_successors.try_put_task(t);
3243
3244 if ( !rtask ) { // try_put_task failed.
3245 spin_mutex::scoped_lock lock(my_mutex);
3246 --my_tries;
3247 if (check_conditions() && internal::is_graph_active(this->my_graph)) {
3248 rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3249 internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
3250 }
3251 }
3252 else {
3253 spin_mutex::scoped_lock lock(my_mutex);
3254 ++my_count;
3255 --my_tries;
3256 }
3257 return rtask;
3258 }
3259
3260 graph& graph_reference() const __TBB_override { return my_graph; }
3261
3262 void reset_receiver(reset_flags /*f*/) __TBB_override {
3263 __TBB_ASSERT(false,NULL); // should never be called
3264 }
3265
3266 void reset_node( reset_flags f) __TBB_override {
3267 my_count = 0;
3268 if(f & rf_clear_edges) {
3269 my_predecessors.clear();
3270 my_successors.clear();
3271 }
3272 else
3273 {
3274 my_predecessors.reset( );
3275 }
3276 decrement.reset_receiver(f);
3277 }
3278 }; // limiter_node
3279
3280 #include "internal/_flow_graph_join_impl.h"
3281
3282 using internal::reserving_port;
3283 using internal::queueing_port;
3284 using internal::key_matching_port;
3285 using internal::input_port;
3286 using internal::tag_value;
3287
3288 template<typename OutputTuple, typename JP=queueing> class join_node;
3289
3290 template<typename OutputTuple>
3291 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
3292 private:
3293 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
3294 typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
3295 public:
3296 typedef OutputTuple output_type;
3297 typedef typename unfolded_type::input_ports_type input_ports_type;
3298 __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
3299 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
3300 this->input_ports(), static_cast< sender< output_type > *>(this) );
3301 }
3302
3303 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3304 template <typename... Args>
3305 __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
3306 make_edges_in_order(nodes, *this);
3307 }
3308 #endif
3309
3310 __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
3311 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
3312 this->input_ports(), static_cast< sender< output_type > *>(this) );
3313 }
3314
3315 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3316 void set_name( const char *name ) __TBB_override {
3317 tbb::internal::fgt_node_desc( this, name );
3318 }
3319 #endif
3320
3321 };
3322
3323 template<typename OutputTuple>
3324 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
3325 private:
3326 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
3327 typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
3328 public:
3329 typedef OutputTuple output_type;
3330 typedef typename unfolded_type::input_ports_type input_ports_type;
3331 __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
3332 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
3333 this->input_ports(), static_cast< sender< output_type > *>(this) );
3334 }
3335
3336 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3337 template <typename... Args>
3338 __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
3339 make_edges_in_order(nodes, *this);
3340 }
3341 #endif
3342
3343 __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
3344 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
3345 this->input_ports(), static_cast< sender< output_type > *>(this) );
3346 }
3347
3348 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3349 void set_name( const char *name ) __TBB_override {
3350 tbb::internal::fgt_node_desc( this, name );
3351 }
3352 #endif
3353
3354 };
3355
3356 // template for key_matching join_node
3357 // tag_matching join_node is a specialization of key_matching, and is source-compatible.
3358 template<typename OutputTuple, typename K, typename KHash>
3359 class join_node<OutputTuple, key_matching<K, KHash> > : public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value,
3360 key_matching_port, OutputTuple, key_matching<K,KHash> > {
3361 private:
3362 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
3363 typedef typename internal::unfolded_join_node<N, key_matching_port, OutputTuple, key_matching<K,KHash> > unfolded_type;
3364 public:
3365 typedef OutputTuple output_type;
3366 typedef typename unfolded_type::input_ports_type input_ports_type;
3367
3368 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
3369 join_node(graph &g) : unfolded_type(g) {}
3370
3371 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3372 template <typename... Args>
3373 join_node(const node_set<Args...>& nodes, key_matching<K, KHash> = key_matching<K, KHash>())
3374 : join_node(nodes.graph_reference()) {
3375 make_edges_in_order(nodes, *this);
3376 }
3377 #endif
3378
3379 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
3380
3381 template<typename __TBB_B0, typename __TBB_B1>
3382 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
3383 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3384 this->input_ports(), static_cast< sender< output_type > *>(this) );
3385 }
3386 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
3387 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
3388 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3389 this->input_ports(), static_cast< sender< output_type > *>(this) );
3390 }
3391 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
3392 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
3393 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3394 this->input_ports(), static_cast< sender< output_type > *>(this) );
3395 }
3396 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
3397 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
3398 unfolded_type(g, b0, b1, b2, b3, b4) {
3399 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3400 this->input_ports(), static_cast< sender< output_type > *>(this) );
3401 }
3402 #if __TBB_VARIADIC_MAX >= 6
3403 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3404 typename __TBB_B5>
3405 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
3406 unfolded_type(g, b0, b1, b2, b3, b4, b5) {
3407 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3408 this->input_ports(), static_cast< sender< output_type > *>(this) );
3409 }
3410 #endif
3411 #if __TBB_VARIADIC_MAX >= 7
3412 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3413 typename __TBB_B5, typename __TBB_B6>
3414 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
3415 unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
3416 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3417 this->input_ports(), static_cast< sender< output_type > *>(this) );
3418 }
3419 #endif
3420 #if __TBB_VARIADIC_MAX >= 8
3421 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3422 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
3423 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3424 __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
3425 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3426 this->input_ports(), static_cast< sender< output_type > *>(this) );
3427 }
3428 #endif
3429 #if __TBB_VARIADIC_MAX >= 9
3430 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3431 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
3432 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3433 __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
3434 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3435 this->input_ports(), static_cast< sender< output_type > *>(this) );
3436 }
3437 #endif
3438 #if __TBB_VARIADIC_MAX >= 10
3439 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3440 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
3441 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3442 __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
3443 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3444 this->input_ports(), static_cast< sender< output_type > *>(this) );
3445 }
3446 #endif
3447
3448 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3449 template <typename... Args, typename... Bodies>
3450 __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
3451 : join_node(nodes.graph_reference(), bodies...) {
3452 make_edges_in_order(nodes, *this);
3453 }
3454 #endif
3455
3456 __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
3457 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3458 this->input_ports(), static_cast< sender< output_type > *>(this) );
3459 }
3460
3461 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3462 void set_name( const char *name ) __TBB_override {
3463 tbb::internal::fgt_node_desc( this, name );
3464 }
3465 #endif
3466
3467 };
3468
3469 // indexer node
3470 #include "internal/_flow_graph_indexer_impl.h"
3471
3472 // TODO: Implement interface with variadic template or tuple
3473 template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
3474 typename T4=null_type, typename T5=null_type, typename T6=null_type,
3475 typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
3476
3477 //indexer node specializations
3478 template<typename T0>
3479 class indexer_node<T0> : public internal::unfolded_indexer_node<tuple<T0> > {
3480 private:
3481 static const int N = 1;
3482 public:
3483 typedef tuple<T0> InputTuple;
3484 typedef typename internal::tagged_msg<size_t, T0> output_type;
3485 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3486 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3487 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3488 this->input_ports(), static_cast< sender< output_type > *>(this) );
3489 }
3490
3491 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3492 template <typename... Args>
3493 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3494 make_edges_in_order(nodes, *this);
3495 }
3496 #endif
3497
3498 // Copy constructor
3499 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3500 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3501 this->input_ports(), static_cast< sender< output_type > *>(this) );
3502 }
3503
3504 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3505 void set_name( const char *name ) __TBB_override {
3506 tbb::internal::fgt_node_desc( this, name );
3507 }
3508 #endif
3509 };
3510
3511 template<typename T0, typename T1>
3512 class indexer_node<T0, T1> : public internal::unfolded_indexer_node<tuple<T0, T1> > {
3513 private:
3514 static const int N = 2;
3515 public:
3516 typedef tuple<T0, T1> InputTuple;
3517 typedef typename internal::tagged_msg<size_t, T0, T1> output_type;
3518 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3519 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3520 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3521 this->input_ports(), static_cast< sender< output_type > *>(this) );
3522 }
3523
3524 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3525 template <typename... Args>
3526 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3527 make_edges_in_order(nodes, *this);
3528 }
3529 #endif
3530
3531 // Copy constructor
3532 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3533 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3534 this->input_ports(), static_cast< sender< output_type > *>(this) );
3535 }
3536
3537 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3538 void set_name( const char *name ) __TBB_override {
3539 tbb::internal::fgt_node_desc( this, name );
3540 }
3541 #endif
3542 };
3543
3544 template<typename T0, typename T1, typename T2>
3545 class indexer_node<T0, T1, T2> : public internal::unfolded_indexer_node<tuple<T0, T1, T2> > {
3546 private:
3547 static const int N = 3;
3548 public:
3549 typedef tuple<T0, T1, T2> InputTuple;
3550 typedef typename internal::tagged_msg<size_t, T0, T1, T2> output_type;
3551 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3552 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3553 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3554 this->input_ports(), static_cast< sender< output_type > *>(this) );
3555 }
3556
3557 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3558 template <typename... Args>
3559 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3560 make_edges_in_order(nodes, *this);
3561 }
3562 #endif
3563
3564 // Copy constructor
3565 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3566 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3567 this->input_ports(), static_cast< sender< output_type > *>(this) );
3568 }
3569
3570 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3571 void set_name( const char *name ) __TBB_override {
3572 tbb::internal::fgt_node_desc( this, name );
3573 }
3574 #endif
3575 };
3576
3577 template<typename T0, typename T1, typename T2, typename T3>
3578 class indexer_node<T0, T1, T2, T3> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > {
3579 private:
3580 static const int N = 4;
3581 public:
3582 typedef tuple<T0, T1, T2, T3> InputTuple;
3583 typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3> output_type;
3584 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3585 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3586 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3587 this->input_ports(), static_cast< sender< output_type > *>(this) );
3588 }
3589
3590 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3591 template <typename... Args>
3592 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3593 make_edges_in_order(nodes, *this);
3594 }
3595 #endif
3596
3597 // Copy constructor
3598 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3599 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3600 this->input_ports(), static_cast< sender< output_type > *>(this) );
3601 }
3602
3603 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3604 void set_name( const char *name ) __TBB_override {
3605 tbb::internal::fgt_node_desc( this, name );
3606 }
3607 #endif
3608 };
3609
3610 template<typename T0, typename T1, typename T2, typename T3, typename T4>
3611 class indexer_node<T0, T1, T2, T3, T4> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > {
3612 private:
3613 static const int N = 5;
3614 public:
3615 typedef tuple<T0, T1, T2, T3, T4> InputTuple;
3616 typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
3617 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3618 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3619 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3620 this->input_ports(), static_cast< sender< output_type > *>(this) );
3621 }
3622
3623 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3624 template <typename... Args>
3625 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3626 make_edges_in_order(nodes, *this);
3627 }
3628 #endif
3629
3630 // Copy constructor
3631 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3632 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3633 this->input_ports(), static_cast< sender< output_type > *>(this) );
3634 }
3635
3636 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3637 void set_name( const char *name ) __TBB_override {
3638 tbb::internal::fgt_node_desc( this, name );
3639 }
3640 #endif
3641 };
3642
3643 #if __TBB_VARIADIC_MAX >= 6
3644 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
3645 class indexer_node<T0, T1, T2, T3, T4, T5> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
3646 private:
3647 static const int N = 6;
3648 public:
3649 typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
3650 typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5> output_type;
3651 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3652 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3653 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3654 this->input_ports(), static_cast< sender< output_type > *>(this) );
3655 }
3656
3657 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3658 template <typename... Args>
3659 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3660 make_edges_in_order(nodes, *this);
3661 }
3662 #endif
3663
3664 // Copy constructor
3665 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3666 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3667 this->input_ports(), static_cast< sender< output_type > *>(this) );
3668 }
3669
3670 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3671 void set_name( const char *name ) __TBB_override {
3672 tbb::internal::fgt_node_desc( this, name );
3673 }
3674 #endif
3675 };
3676 #endif //variadic max 6
3677
3678 #if __TBB_VARIADIC_MAX >= 7
3679 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3680 typename T6>
3681 class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
3682 private:
3683 static const int N = 7;
3684 public:
3685 typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
3686 typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6> output_type;
3687 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3688 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3689 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3690 this->input_ports(), static_cast< sender< output_type > *>(this) );
3691 }
3692
3693 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3694 template <typename... Args>
3695 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3696 make_edges_in_order(nodes, *this);
3697 }
3698 #endif
3699
3700 // Copy constructor
3701 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3702 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3703 this->input_ports(), static_cast< sender< output_type > *>(this) );
3704 }
3705
3706 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3707 void set_name( const char *name ) __TBB_override {
3708 tbb::internal::fgt_node_desc( this, name );
3709 }
3710 #endif
3711 };
3712 #endif //variadic max 7
3713
3714 #if __TBB_VARIADIC_MAX >= 8
3715 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3716 typename T6, typename T7>
3717 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
3718 private:
3719 static const int N = 8;
3720 public:
3721 typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
3722 typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type;
3723 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3724 indexer_node(graph& g) : unfolded_type(g) {
3725 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3726 this->input_ports(), static_cast< sender< output_type > *>(this) );
3727 }
3728
3729 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3730 template <typename... Args>
3731 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3732 make_edges_in_order(nodes, *this);
3733 }
3734 #endif
3735
3736 // Copy constructor
3737 indexer_node( const indexer_node& other ) : unfolded_type(other) {
3738 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3739 this->input_ports(), static_cast< sender< output_type > *>(this) );
3740 }
3741
3742 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3743 void set_name( const char *name ) __TBB_override {
3744 tbb::internal::fgt_node_desc( this, name );
3745 }
3746 #endif
3747 };
3748 #endif //variadic max 8
3749
3750 #if __TBB_VARIADIC_MAX >= 9
3751 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3752 typename T6, typename T7, typename T8>
3753 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
3754 private:
3755 static const int N = 9;
3756 public:
3757 typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
3758 typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8> output_type;
3759 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3760 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3761 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3762 this->input_ports(), static_cast< sender< output_type > *>(this) );
3763 }
3764
3765 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3766 template <typename... Args>
3767 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3768 make_edges_in_order(nodes, *this);
3769 }
3770 #endif
3771
3772 // Copy constructor
3773 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3774 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3775 this->input_ports(), static_cast< sender< output_type > *>(this) );
3776 }
3777
3778 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3779 void set_name( const char *name ) __TBB_override {
3780 tbb::internal::fgt_node_desc( this, name );
3781 }
3782 #endif
3783 };
3784 #endif //variadic max 9
3785
3786 #if __TBB_VARIADIC_MAX >= 10
3787 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3788 typename T6, typename T7, typename T8, typename T9>
3789 class indexer_node/*default*/ : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
3790 private:
3791 static const int N = 10;
3792 public:
3793 typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
3794 typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> output_type;
3795 typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3796 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3797 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3798 this->input_ports(), static_cast< sender< output_type > *>(this) );
3799 }
3800
3801 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3802 template <typename... Args>
3803 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3804 make_edges_in_order(nodes, *this);
3805 }
3806 #endif
3807
3808 // Copy constructor
3809 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3810 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3811 this->input_ports(), static_cast< sender< output_type > *>(this) );
3812 }
3813
3814 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3815 void set_name( const char *name ) __TBB_override {
3816 tbb::internal::fgt_node_desc( this, name );
3817 }
3818 #endif
3819 };
3820 #endif //variadic max 10
3821
3822 #if __TBB_PREVIEW_ASYNC_MSG
3823 inline void internal_make_edge( internal::untyped_sender &p, internal::untyped_receiver &s ) {
3824 #else
3825 template< typename T >
3826 inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
3827 #endif
3828 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3829 s.internal_add_built_predecessor(p);
3830 p.internal_add_built_successor(s);
3831 #endif
3832 p.register_successor( s );
3833 tbb::internal::fgt_make_edge( &p, &s );
3834 }
3835
3836 //! Makes an edge between a single predecessor and a single successor
3837 template< typename T >
3838 inline void make_edge( sender<T> &p, receiver<T> &s ) {
3839 internal_make_edge( p, s );
3840 }
3841
3842 #if __TBB_PREVIEW_ASYNC_MSG
3843 template< typename TS, typename TR,
3844 typename = typename tbb::internal::enable_if<tbb::internal::is_same_type<TS, internal::untyped_sender>::value
3845 || tbb::internal::is_same_type<TR, internal::untyped_receiver>::value>::type>
3846 inline void make_edge( TS &p, TR &s ) {
3847 internal_make_edge( p, s );
3848 }
3849
3850 template< typename T >
3851 inline void make_edge( sender<T> &p, receiver<typename T::async_msg_data_type> &s ) {
3852 internal_make_edge( p, s );
3853 }
3854
3855 template< typename T >
3856 inline void make_edge( sender<typename T::async_msg_data_type> &p, receiver<T> &s ) {
3857 internal_make_edge( p, s );
3858 }
3859
3860 #endif // __TBB_PREVIEW_ASYNC_MSG
3861
3862 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3863 //Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
3864 template< typename T, typename V,
3865 typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3866 inline void make_edge( T& output, V& input) {
3867 make_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3868 }
3869
3870 //Makes an edge from port 0 of a multi-output predecessor to a receiver.
3871 template< typename T, typename R,
3872 typename = typename T::output_ports_type >
3873 inline void make_edge( T& output, receiver<R>& input) {
3874 make_edge(get<0>(output.output_ports()), input);
3875 }
3876
3877 //Makes an edge from a sender to port 0 of a multi-input successor.
3878 template< typename S, typename V,
3879 typename = typename V::input_ports_type >
3880 inline void make_edge( sender<S>& output, V& input) {
3881 make_edge(output, get<0>(input.input_ports()));
3882 }
3883 #endif
3884
3885 #if __TBB_PREVIEW_ASYNC_MSG
3886 inline void internal_remove_edge( internal::untyped_sender &p, internal::untyped_receiver &s ) {
3887 #else
3888 template< typename T >
3889 inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
3890 #endif
3891 p.remove_successor( s );
3892 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3893 // TODO: should we try to remove p from the predecessor list of s, in case the edge is reversed?
3894 p.internal_delete_built_successor(s);
3895 s.internal_delete_built_predecessor(p);
3896 #endif
3897 tbb::internal::fgt_remove_edge( &p, &s );
3898 }
3899
3900 //! Removes an edge between a single predecessor and a single successor
3901 template< typename T >
3902 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
3903 internal_remove_edge( p, s );
3904 }
3905
3906 #if __TBB_PREVIEW_ASYNC_MSG
3907 template< typename TS, typename TR,
3908 typename = typename tbb::internal::enable_if<tbb::internal::is_same_type<TS, internal::untyped_sender>::value
3909 || tbb::internal::is_same_type<TR, internal::untyped_receiver>::value>::type>
3910 inline void remove_edge( TS &p, TR &s ) {
3911 internal_remove_edge( p, s );
3912 }
3913
3914 template< typename T >
3915 inline void remove_edge( sender<T> &p, receiver<typename T::async_msg_data_type> &s ) {
3916 internal_remove_edge( p, s );
3917 }
3918
3919 template< typename T >
3920 inline void remove_edge( sender<typename T::async_msg_data_type> &p, receiver<T> &s ) {
3921 internal_remove_edge( p, s );
3922 }
3923 #endif // __TBB_PREVIEW_ASYNC_MSG
3924
3925 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3926 //Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
3927 template< typename T, typename V,
3928 typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3929 inline void remove_edge( T& output, V& input) {
3930 remove_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3931 }
3932
3933 //Removes an edge between port 0 of a multi-output predecessor and a receiver.
3934 template< typename T, typename R,
3935 typename = typename T::output_ports_type >
3936 inline void remove_edge( T& output, receiver<R>& input) {
3937 remove_edge(get<0>(output.output_ports()), input);
3938 }
3939 //Removes an edge between a sender and port 0 of a multi-input successor.
3940 template< typename S, typename V,
3941 typename = typename V::input_ports_type >
3942 inline void remove_edge( sender<S>& output, V& input) {
3943 remove_edge(output, get<0>(input.input_ports()));
3944 }
3945 #endif
3946
3947 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3948 template<typename C >
3949 template< typename S >
3950 void internal::edge_container<C>::sender_extract( S &s ) {
3951 edge_list_type e = built_edges;
3952 for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3953 remove_edge(s, **i);
3954 }
3955 }
3956
3957 template<typename C >
3958 template< typename R >
3959 void internal::edge_container<C>::receiver_extract( R &r ) {
3960 edge_list_type e = built_edges;
3961 for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3962 remove_edge(**i, r);
3963 }
3964 }
3965 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3966
3967 //! Returns a copy of the body from a function or continue node
3968 template< typename Body, typename Node >
3969 Body copy_body( Node &n ) {
3970 return n.template copy_function_object<Body>();
3971 }
3972
3973 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3974
3975 //composite_node
3976 template< typename InputTuple, typename OutputTuple > class composite_node;
3977
3978 template< typename... InputTypes, typename... OutputTypes>
3979 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<OutputTypes...> > : public graph_node{
3980
3981 public:
3982 typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3983 typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3984
3985 private:
3986 std::unique_ptr<input_ports_type> my_input_ports;
3987 std::unique_ptr<output_ports_type> my_output_ports;
3988
3989 static const size_t NUM_INPUTS = sizeof...(InputTypes);
3990 static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3991
3992 protected:
3993 void reset_node(reset_flags) __TBB_override {}
3994
3995 public:
3996 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3997 composite_node( graph &g, const char *type_name = "composite_node" ) : graph_node(g) {
3998 tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
3999 tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_name );
4000 }
4001 #else
4002 composite_node( graph &g ) : graph_node(g) {
4003 tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
4004 }
4005 #endif
4006
4007 template<typename T1, typename T2>
4008 void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
4009 __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
4010 __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
4011 my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T1>(input_ports_tuple));
4012 my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T2>(output_ports_tuple));
4013
4014 tbb::internal::fgt_internal_input_alias_helper<T1, NUM_INPUTS>::alias_port( this, input_ports_tuple);
4015 tbb::internal::fgt_internal_output_alias_helper<T2, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
4016 }
4017
4018 template< typename... NodeTypes >
4019 void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
4020
4021 template< typename... NodeTypes >
4022 void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
4023
4024 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4025 void set_name( const char *name ) __TBB_override {
4026 tbb::internal::fgt_multiinput_multioutput_node_desc( this, name );
4027 }
4028 #endif
4029
4030 input_ports_type& input_ports() {
4031 __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
4032 return *my_input_ports;
4033 }
4034
4035 output_ports_type& output_ports() {
4036 __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
4037 return *my_output_ports;
4038 }
4039
4040 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4041 void extract() __TBB_override {
4042 __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
4043 }
4044 #endif
4045 }; // class composite_node
4046
4047 //composite_node with only input ports
4048 template< typename... InputTypes>
4049 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> > : public graph_node {
4050 public:
4051 typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
4052
4053 private:
4054 std::unique_ptr<input_ports_type> my_input_ports;
4055 static const size_t NUM_INPUTS = sizeof...(InputTypes);
4056
4057 protected:
4058 void reset_node(reset_flags) __TBB_override {}
4059
4060 public:
4061 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4062 composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
4063 tbb::internal::fgt_composite( CODEPTR(), this, &g );
4064 tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_name );
4065 }
4066 #else
4067 composite_node( graph &g ) : graph_node(g) {
4068 tbb::internal::fgt_composite( CODEPTR(), this, &g );
4069 }
4070 #endif
4071
4072 template<typename T>
4073 void set_external_ports(T&& input_ports_tuple) {
4074 __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
4075
4076 my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T>(input_ports_tuple));
4077
4078 tbb::internal::fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, std::forward<T>(input_ports_tuple));
4079 }
4080
4081 template< typename... NodeTypes >
4082 void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
4083
4084 template< typename... NodeTypes >
4085 void add_nodes( const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
4086
4087 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4088 void set_name( const char *name ) __TBB_override {
4089 tbb::internal::fgt_multiinput_multioutput_node_desc( this, name );
4090 }
4091 #endif
4092
4093 input_ports_type& input_ports() {
4094 __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
4095 return *my_input_ports;
4096 }
4097
4098 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4099 void extract() __TBB_override {
4100 __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
4101 }
4102 #endif
4103
4104 }; // class composite_node
4105
4106 //composite_nodes with only output_ports
4107 template<typename... OutputTypes>
4108 class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...> > : public graph_node {
4109 public:
4110 typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
4111
4112 private:
4113 std::unique_ptr<output_ports_type> my_output_ports;
4114 static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
4115
4116 protected:
4117 void reset_node(reset_flags) __TBB_override {}
4118
4119 public:
4120 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4121 __TBB_NOINLINE_SYM composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
4122 tbb::internal::fgt_composite( CODEPTR(), this, &g );
4123 tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_name );
4124 }
4125 #else
4126 __TBB_NOINLINE_SYM composite_node( graph &g ) : graph_node(g) {
4127 tbb::internal::fgt_composite( CODEPTR(), this, &g );
4128 }
4129 #endif
4130
4131 template<typename T>
4132 void set_external_ports(T&& output_ports_tuple) {
4133 __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
4134
4135 my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T>(output_ports_tuple));
4136
4137 tbb::internal::fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, std::forward<T>(output_ports_tuple));
4138 }
4139
4140 template<typename... NodeTypes >
4141 void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
4142
4143 template<typename... NodeTypes >
4144 void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
4145
4146 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4147 void set_name( const char *name ) __TBB_override {
4148 tbb::internal::fgt_multiinput_multioutput_node_desc( this, name );
4149 }
4150 #endif
4151
4152 output_ports_type& output_ports() {
4153 __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
4154 return *my_output_ports;
4155 }
4156
4157 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4158 void extract() __TBB_override {
4159 __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
4160 }
4161 #endif
4162
4163 }; // class composite_node
4164
4165 #endif // __TBB_FLOW_GRAPH_CPP11_FEATURES
4166
4167 namespace internal {
4168
4169 template<typename Gateway>
4170 class async_body_base: tbb::internal::no_assign {
4171 public:
4172 typedef Gateway gateway_type;
4173
4174 async_body_base(gateway_type *gateway): my_gateway(gateway) { }
4175 void set_gateway(gateway_type *gateway) {
4176 my_gateway = gateway;
4177 }
4178
4179 protected:
4180 gateway_type *my_gateway;
4181 };
4182
4183 template<typename Input, typename Ports, typename Gateway, typename Body>
4184 class async_body: public async_body_base<Gateway> {
4185 public:
4186 typedef async_body_base<Gateway> base_type;
4187 typedef Gateway gateway_type;
4188
4189 async_body(const Body &body, gateway_type *gateway)
4190 : base_type(gateway), my_body(body) { }
4191
4192 void operator()( const Input &v, Ports & ) {
4193 my_body(v, *this->my_gateway);
4194 }
4195
4196 Body get_body() { return my_body; }
4197
4198 private:
4199 Body my_body;
4200 };
4201
4202 } // namespace internal
4203
4204 } // namespace interfaceX
4205 namespace interface11 {
4206
4207 //! Implements async node
4208 template < typename Input, typename Output,
4209 typename Policy = queueing_lightweight,
4210 typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input) >
4211 class async_node
4212 : public multifunction_node< Input, tuple< Output >, Policy, Allocator >, public sender< Output >
4213 {
4214 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
4215 __TBB_STATIC_ASSERT(
4216 (tbb::internal::is_same_type<Allocator, null_type>::value),
4217 "Allocator template parameter for flow graph nodes is deprecated and will removed in the future. "
4218 "To temporary enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR."
4219 );
4220 #endif
4221 typedef multifunction_node< Input, tuple< Output >, Policy, Allocator > base_type;
4222 typedef typename internal::multifunction_input<Input, typename base_type::output_ports_type, Policy, Allocator> mfn_input_type;
4223
4224 public:
4225 typedef Input input_type;
4226 typedef Output output_type;
4227 typedef receiver<input_type> receiver_type;
4228 typedef typename receiver_type::predecessor_type predecessor_type;
4229 typedef typename sender<output_type>::successor_type successor_type;
4230 typedef receiver_gateway<output_type> gateway_type;
4231 typedef internal::async_body_base<gateway_type> async_body_base_type;
4232 typedef typename base_type::output_ports_type output_ports_type;
4233
4234 private:
4235 struct try_put_functor {
4236 typedef internal::multifunction_output<Output> output_port_type;
4237 output_port_type *port;
4238 // TODO: pass value by copy since we do not want to block asynchronous thread.
4239 const Output *value;
4240 bool result;
4241 try_put_functor(output_port_type &p, const Output &v) : port(&p), value(&v), result(false) { }
4242 void operator()() {
4243 result = port->try_put(*value);
4244 }
4245 };
4246
4247 class receiver_gateway_impl: public receiver_gateway<Output> {
4248 public:
4249 receiver_gateway_impl(async_node* node): my_node(node) {}
4250 void reserve_wait() __TBB_override {
4251 tbb::internal::fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
4252 my_node->my_graph.reserve_wait();
4253 }
4254
4255 void release_wait() __TBB_override {
4256 my_node->my_graph.release_wait();
4257 tbb::internal::fgt_async_commit(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
4258 }
4259
4260 //! Implements gateway_type::try_put for an external activity to submit a message to FG
4261 bool try_put(const Output &i) __TBB_override {
4262 return my_node->try_put_impl(i);
4263 }
4264
4265 private:
4266 async_node* my_node;
4267 } my_gateway;
4268
4269 //The substitute of 'this' for member construction, to prevent compiler warnings
4270 async_node* self() { return this; }
4271
4272 //! Implements gateway_type::try_put for an external activity to submit a message to FG
4273 bool try_put_impl(const Output &i) {
4274 internal::multifunction_output<Output> &port_0 = internal::output_port<0>(*this);
4275 internal::broadcast_cache<output_type>& port_successors = port_0.successors();
4276 tbb::internal::fgt_async_try_put_begin(this, &port_0);
4277 task_list tasks;
4278 bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
4279 __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
4280 "Return status is inconsistent with the method operation." );
4281
4282 while( !tasks.empty() ) {
4283 internal::enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
4284 }
4285 tbb::internal::fgt_async_try_put_end(this, &port_0);
4286 return is_at_least_one_put_successful;
4287 }
4288
4289 public:
4290 template<typename Body>
4291 __TBB_NOINLINE_SYM async_node(
4292 graph &g, size_t concurrency,
4293 #if __TBB_CPP11_PRESENT
4294 Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority)
4295 #else
4296 __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body body, node_priority_t priority = tbb::flow::internal::no_priority)
4297 #endif
4298 ) : base_type(
4299 g, concurrency,
4300 internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
4301 (body, &my_gateway) __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority) ), my_gateway(self()) {
4302 tbb::internal::fgt_multioutput_node_with_body<1>(
4303 CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
4304 &this->my_graph, static_cast<receiver<input_type> *>(this),
4305 this->output_ports(), this->my_body
4306 );
4307 }
4308
4309 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
4310 template <typename Body, typename... Args>
4311 __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t priority)
4312 : async_node(g, concurrency, body, Policy(), priority) {}
4313 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4314
4315 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4316 template <typename Body, typename... Args>
4317 __TBB_NOINLINE_SYM async_node(
4318 const node_set<Args...>& nodes, size_t concurrency, Body body,
4319 __TBB_FLOW_GRAPH_PRIORITY_ARG1(Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority)
4320 ) : async_node(nodes.graph_reference(), concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
4321 make_edges_in_order(nodes, *this);
4322 }
4323
4324 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4325 template <typename Body, typename... Args>
4326 __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
4327 : async_node(nodes, concurrency, body, Policy(), priority) {}
4328 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4329 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4330
4331 __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
4332 static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
4333 static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
4334
4335 tbb::internal::fgt_multioutput_node_with_body<1>( CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
4336 &this->my_graph, static_cast<receiver<input_type> *>(this),
4337 this->output_ports(), this->my_body );
4338 }
4339
4340 gateway_type& gateway() {
4341 return my_gateway;
4342 }
4343
4344 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4345 void set_name( const char *name ) __TBB_override {
4346 tbb::internal::fgt_multioutput_node_desc( this, name );
4347 }
4348 #endif
4349
4350 // Define sender< Output >
4351
4352 //! Add a new successor to this node
4353 bool register_successor( successor_type &r ) __TBB_override {
4354 return internal::output_port<0>(*this).register_successor(r);
4355 }
4356
4357 //! Removes a successor from this node
4358 bool remove_successor( successor_type &r ) __TBB_override {
4359 return internal::output_port<0>(*this).remove_successor(r);
4360 }
4361
4362 template<typename Body>
4363 Body copy_function_object() {
4364 typedef internal::multifunction_body<input_type, typename base_type::output_ports_type> mfn_body_type;
4365 typedef internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body> async_body_type;
4366 mfn_body_type &body_ref = *this->my_body;
4367 async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< internal::multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
4368 return ab.get_body();
4369 }
4370
4371 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4372 //! interface to record edges for traversal & deletion
4373 typedef typename internal::edge_container<successor_type> built_successors_type;
4374 typedef typename built_successors_type::edge_list_type successor_list_type;
4375 built_successors_type &built_successors() __TBB_override {
4376 return internal::output_port<0>(*this).built_successors();
4377 }
4378
4379 void internal_add_built_successor( successor_type &r ) __TBB_override {
4380 internal::output_port<0>(*this).internal_add_built_successor(r);
4381 }
4382
4383 void internal_delete_built_successor( successor_type &r ) __TBB_override {
4384 internal::output_port<0>(*this).internal_delete_built_successor(r);
4385 }
4386
4387 void copy_successors( successor_list_type &l ) __TBB_override {
4388 internal::output_port<0>(*this).copy_successors(l);
4389 }
4390
4391 size_t successor_count() __TBB_override {
4392 return internal::output_port<0>(*this).successor_count();
4393 }
4394 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
4395
4396 protected:
4397
4398 void reset_node( reset_flags f) __TBB_override {
4399 base_type::reset_node(f);
4400 }
4401 };
4402
4403 #if __TBB_PREVIEW_STREAMING_NODE
4404 #include "internal/_flow_graph_streaming_node.h"
4405 #endif // __TBB_PREVIEW_STREAMING_NODE
4406
4407 #include "internal/_flow_graph_node_set_impl.h"
4408
4409 template< typename T >
4410 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
4411 public:
4412 typedef T input_type;
4413 typedef T output_type;
4414 typedef typename receiver<input_type>::predecessor_type predecessor_type;
4415 typedef typename sender<output_type>::successor_type successor_type;
4416 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4417 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
4418 typedef typename sender<output_type>::built_successors_type built_successors_type;
4419 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
4420 typedef typename sender<output_type>::successor_list_type successor_list_type;
4421 #endif
4422
4423 __TBB_NOINLINE_SYM explicit overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
4424 my_successors.set_owner( this );
4425 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
4426 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
4427 }
4428
4429 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4430 template <typename... Args>
4431 overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
4432 make_edges_in_order(nodes, *this);
4433 }
4434 #endif
4435
4436 //! Copy constructor; doesn't take anything from src; default won't work
4437 __TBB_NOINLINE_SYM overwrite_node( const overwrite_node& src ) :
4438 graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
4439 {
4440 my_successors.set_owner( this );
4441 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
4442 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
4443 }
4444
4445 ~overwrite_node() {}
4446
4447 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4448 void set_name( const char *name ) __TBB_override {
4449 tbb::internal::fgt_node_desc( this, name );
4450 }
4451 #endif
4452
4453 bool register_successor( successor_type &s ) __TBB_override {
4454 spin_mutex::scoped_lock l( my_mutex );
4455 if (my_buffer_is_valid && internal::is_graph_active( my_graph )) {
4456 // We have a valid value that must be forwarded immediately.
4457 bool ret = s.try_put( my_buffer );
4458 if ( ret ) {
4459 // We add the successor that accepted our put
4460 my_successors.register_successor( s );
4461 } else {
4462 // In case of reservation a race between the moment of reservation and register_successor can appear,
4463 // because failed reserve does not mean that register_successor is not ready to put a message immediately.
4464 // We have some sort of infinite loop: reserving node tries to set pull state for the edge,
4465 // but overwrite_node tries to return push state back. That is why we have to break this loop with task creation.
4466 task *rtask = new ( task::allocate_additional_child_of( *( my_graph.root_task() ) ) )
4467 register_predecessor_task( *this, s );
4468 internal::spawn_in_graph_arena( my_graph, *rtask );
4469 }
4470 } else {
4471 // No valid value yet, just add as successor
4472 my_successors.register_successor( s );
4473 }
4474 return true;
4475 }
4476
4477 bool remove_successor( successor_type &s ) __TBB_override {
4478 spin_mutex::scoped_lock l( my_mutex );
4479 my_successors.remove_successor(s);
4480 return true;
4481 }
4482
4483 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4484 built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
4485 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
4486
4487 void internal_add_built_successor( successor_type &s) __TBB_override {
4488 spin_mutex::scoped_lock l( my_mutex );
4489 my_successors.internal_add_built_successor(s);
4490 }
4491
4492 void internal_delete_built_successor( successor_type &s) __TBB_override {
4493 spin_mutex::scoped_lock l( my_mutex );
4494 my_successors.internal_delete_built_successor(s);
4495 }
4496
4497 size_t successor_count() __TBB_override {
4498 spin_mutex::scoped_lock l( my_mutex );
4499 return my_successors.successor_count();
4500 }
4501
4502 void copy_successors(successor_list_type &v) __TBB_override {
4503 spin_mutex::scoped_lock l( my_mutex );
4504 my_successors.copy_successors(v);
4505 }
4506
4507 void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
4508 spin_mutex::scoped_lock l( my_mutex );
4509 my_built_predecessors.add_edge(p);
4510 }
4511
4512 void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
4513 spin_mutex::scoped_lock l( my_mutex );
4514 my_built_predecessors.delete_edge(p);
4515 }
4516
4517 size_t predecessor_count() __TBB_override {
4518 spin_mutex::scoped_lock l( my_mutex );
4519 return my_built_predecessors.edge_count();
4520 }
4521
4522 void copy_predecessors( predecessor_list_type &v ) __TBB_override {
4523 spin_mutex::scoped_lock l( my_mutex );
4524 my_built_predecessors.copy_edges(v);
4525 }
4526
4527 void extract() __TBB_override {
4528 my_buffer_is_valid = false;
4529 built_successors().sender_extract(*this);
4530 built_predecessors().receiver_extract(*this);
4531 }
4532
4533 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
4534
4535 bool try_get( input_type &v ) __TBB_override {
4536 spin_mutex::scoped_lock l( my_mutex );
4537 if ( my_buffer_is_valid ) {
4538 v = my_buffer;
4539 return true;
4540 }
4541 return false;
4542 }
4543
4544 //! Reserves an item
4545 bool try_reserve( T &v ) __TBB_override {
4546 return try_get(v);
4547 }
4548
4549 //! Releases the reserved item
4550 bool try_release() __TBB_override { return true; }
4551
4552 //! Consumes the reserved item
4553 bool try_consume() __TBB_override { return true; }
4554
4555 bool is_valid() {
4556 spin_mutex::scoped_lock l( my_mutex );
4557 return my_buffer_is_valid;
4558 }
4559
4560 void clear() {
4561 spin_mutex::scoped_lock l( my_mutex );
4562 my_buffer_is_valid = false;
4563 }
4564
4565 protected:
4566
4567 template< typename R, typename B > friend class run_and_put_task;
4568 template<typename X, typename Y> friend class internal::broadcast_cache;
4569 template<typename X, typename Y> friend class internal::round_robin_cache;
4570 task * try_put_task( const input_type &v ) __TBB_override {
4571 spin_mutex::scoped_lock l( my_mutex );
4572 return try_put_task_impl(v);
4573 }
4574
4575 task * try_put_task_impl(const input_type &v) {
4576 my_buffer = v;
4577 my_buffer_is_valid = true;
4578 task * rtask = my_successors.try_put_task(v);
4579 if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
4580 return rtask;
4581 }
4582
4583 graph& graph_reference() const __TBB_override {
4584 return my_graph;
4585 }
4586
4587 //! Breaks an infinite loop between the node reservation and register_successor call
4588 struct register_predecessor_task : public graph_task {
4589
4590 register_predecessor_task(predecessor_type& owner, successor_type& succ) :
4591 o(owner), s(succ) {};
4592
4593 tbb::task* execute() __TBB_override {
4594 if (!s.register_predecessor(o)) {
4595 o.register_successor(s);
4596 }
4597 return NULL;
4598 }
4599
4600 predecessor_type& o;
4601 successor_type& s;
4602 };
4603
4604 spin_mutex my_mutex;
4605 internal::broadcast_cache< input_type, null_rw_mutex > my_successors;
4606 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4607 internal::edge_container<predecessor_type> my_built_predecessors;
4608 #endif
4609 input_type my_buffer;
4610 bool my_buffer_is_valid;
4611 void reset_receiver(reset_flags /*f*/) __TBB_override {}
4612
4613 void reset_node( reset_flags f) __TBB_override {
4614 my_buffer_is_valid = false;
4615 if (f&rf_clear_edges) {
4616 my_successors.clear();
4617 }
4618 }
4619 }; // overwrite_node
4620
4621 template< typename T >
4622 class write_once_node : public overwrite_node<T> {
4623 public:
4624 typedef T input_type;
4625 typedef T output_type;
4626 typedef overwrite_node<T> base_type;
4627 typedef typename receiver<input_type>::predecessor_type predecessor_type;
4628 typedef typename sender<output_type>::successor_type successor_type;
4629
4630 //! Constructor
4631 __TBB_NOINLINE_SYM explicit write_once_node(graph& g) : base_type(g) {
4632 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
4633 static_cast<receiver<input_type> *>(this),
4634 static_cast<sender<output_type> *>(this) );
4635 }
4636
4637 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4638 template <typename... Args>
4639 write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
4640 make_edges_in_order(nodes, *this);
4641 }
4642 #endif
4643
4644 //! Copy constructor: call base class copy constructor
4645 __TBB_NOINLINE_SYM write_once_node( const write_once_node& src ) : base_type(src) {
4646 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
4647 static_cast<receiver<input_type> *>(this),
4648 static_cast<sender<output_type> *>(this) );
4649 }
4650
4651 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4652 void set_name( const char *name ) __TBB_override {
4653 tbb::internal::fgt_node_desc( this, name );
4654 }
4655 #endif
4656
4657 protected:
4658 template< typename R, typename B > friend class run_and_put_task;
4659 template<typename X, typename Y> friend class internal::broadcast_cache;
4660 template<typename X, typename Y> friend class internal::round_robin_cache;
4661 task *try_put_task( const T &v ) __TBB_override {
4662 spin_mutex::scoped_lock l( this->my_mutex );
4663 return this->my_buffer_is_valid ? NULL : this->try_put_task_impl(v);
4664 }
4665 };
4666
4667 } // interfaceX
4668
4669 using interface11::reset_flags;
4670 using interface11::rf_reset_protocol;
4671 using interface11::rf_reset_bodies;
4672 using interface11::rf_clear_edges;
4673
4674 using interface11::graph;
4675 using interface11::graph_node;
4676 using interface11::continue_msg;
4677 using interface11::source_node;
4678 using interface11::input_node;
4679 using interface11::function_node;
4680 using interface11::multifunction_node;
4681 using interface11::split_node;
4682 using interface11::internal::output_port;
4683 using interface11::indexer_node;
4684 using interface11::internal::tagged_msg;
4685 using interface11::internal::cast_to;
4686 using interface11::internal::is_a;
4687 using interface11::continue_node;
4688 using interface11::overwrite_node;
4689 using interface11::write_once_node;
4690 using interface11::broadcast_node;
4691 using interface11::buffer_node;
4692 using interface11::queue_node;
4693 using interface11::sequencer_node;
4694 using interface11::priority_queue_node;
4695 using interface11::limiter_node;
4696 using namespace interface11::internal::graph_policy_namespace;
4697 using interface11::join_node;
4698 using interface11::input_port;
4699 using interface11::copy_body;
4700 using interface11::make_edge;
4701 using interface11::remove_edge;
4702 using interface11::internal::tag_value;
4703 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
4704 using interface11::composite_node;
4705 #endif
4706 using interface11::async_node;
4707 #if __TBB_PREVIEW_ASYNC_MSG
4708 using interface11::async_msg;
4709 #endif
4710 #if __TBB_PREVIEW_STREAMING_NODE
4711 using interface11::port_ref;
4712 using interface11::streaming_node;
4713 #endif // __TBB_PREVIEW_STREAMING_NODE
4714 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4715 using internal::node_priority_t;
4716 using internal::no_priority;
4717 #endif
4718
4719 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4720 using interface11::internal::follows;
4721 using interface11::internal::precedes;
4722 using interface11::internal::make_node_set;
4723 using interface11::internal::make_edges;
4724 #endif
4725
4726 } // flow
4727 } // tbb
4728
4729 // Include deduction guides for node classes
4730 #include "internal/_flow_graph_nodes_deduction.h"
4731
4732 #undef __TBB_PFG_RESET_ARG
4733 #undef __TBB_COMMA
4734 #undef __TBB_DEFAULT_NODE_ALLOCATOR
4735
4736 #include "internal/_warning_suppress_disable_notice.h"
4737 #undef __TBB_flow_graph_H_include_area
4738
4739 #if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
4740 #undef __TBB_NOINLINE_SYM
4741 #endif
4742
4743 #endif // __TBB_flow_graph_H
4744