/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB__flow_graph_cache_impl_H
#define __TBB__flow_graph_cache_impl_H

#ifndef __TBB_flow_graph_H
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

// included in namespace tbb::flow::interfaceX (in flow_graph.h)

namespace internal {

//! A node_cache maintains a std::queue of elements of type T.  Each operation is protected by a lock.
template< typename T, typename M=spin_mutex >
class node_cache {
    public:

    typedef size_t size_type;

    bool empty() {
        typename mutex_type::scoped_lock lock( my_mutex );
        return internal_empty();
    }

    void add( T &n ) {
        typename mutex_type::scoped_lock lock( my_mutex );
        internal_push(n);
    }

    void remove( T &n ) {
        typename mutex_type::scoped_lock lock( my_mutex );
        for ( size_t i = internal_size(); i != 0; --i ) {
            T &s = internal_pop();
            if ( &s == &n )  return;  // only remove one predecessor per request
            internal_push(s);
        }
    }

    void clear() {
        while( !my_q.empty()) (void)my_q.pop();
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
        my_built_predecessors.clear();
#endif
    }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    typedef edge_container<T> built_predecessors_type;
    built_predecessors_type &built_predecessors() { return my_built_predecessors; }

    typedef typename edge_container<T>::edge_list_type predecessor_list_type;
    void internal_add_built_predecessor( T &n ) {
        typename mutex_type::scoped_lock lock( my_mutex );
        my_built_predecessors.add_edge(n);
    }

    void internal_delete_built_predecessor( T &n ) {
        typename mutex_type::scoped_lock lock( my_mutex );
        my_built_predecessors.delete_edge(n);
    }

    void copy_predecessors( predecessor_list_type &v) {
        typename mutex_type::scoped_lock lock( my_mutex );
        my_built_predecessors.copy_edges(v);
    }

    size_t predecessor_count() {
        typename mutex_type::scoped_lock lock(my_mutex);
        return (size_t)(my_built_predecessors.edge_count());
    }
#endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */

protected:

    typedef M mutex_type;
    mutex_type my_mutex;
    std::queue< T * > my_q;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    built_predecessors_type my_built_predecessors;
#endif

    // Assumes lock is held
    inline bool internal_empty( )  {
        return my_q.empty();
    }

    // Assumes lock is held
    inline size_type internal_size( )  {
        return my_q.size();
    }

    // Assumes lock is held
    inline void internal_push( T &n )  {
        my_q.push(&n);
    }

    // Assumes lock is held
    inline T &internal_pop() {
        T *v = my_q.front();
        my_q.pop();
        return *v;
    }

};
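
// Illustrative note (not part of the original header): node_cache stores raw
// pointers to the elements it is given, so callers must keep the referenced
// objects alive while they are cached.  A minimal usage sketch, assuming a
// hypothetical concrete sender<int> instance `s`:
//
//     node_cache< sender<int> > cache;
//     cache.add(s);               // pushes &s onto the internal queue
//     if ( !cache.empty() )
//         cache.remove(s);        // pops entries until &s is found, re-pushing the rest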

//! A cache of predecessors that only supports try_get
template< typename T, typename M=spin_mutex >
#if __TBB_PREVIEW_ASYNC_MSG
// TODO: make predecessor_cache type T-independent when async_msg becomes regular feature
class predecessor_cache : public node_cache< untyped_sender, M > {
#else
class predecessor_cache : public node_cache< sender<T>, M > {
#endif // __TBB_PREVIEW_ASYNC_MSG
public:
    typedef M mutex_type;
    typedef T output_type;
#if __TBB_PREVIEW_ASYNC_MSG
    typedef untyped_sender predecessor_type;
    typedef untyped_receiver successor_type;
#else
    typedef sender<output_type> predecessor_type;
    typedef receiver<output_type> successor_type;
#endif // __TBB_PREVIEW_ASYNC_MSG

    predecessor_cache( ) : my_owner( NULL ) { }

    void set_owner( successor_type *owner ) { my_owner = owner; }

    bool get_item( output_type &v ) {

        bool msg = false;

        do {
            predecessor_type *src;
            {
                typename mutex_type::scoped_lock lock(this->my_mutex);
                if ( this->internal_empty() ) {
                    break;
                }
                src = &this->internal_pop();
            }

            // Try to get from this sender
            msg = src->try_get( v );

            if (msg == false) {
                // Relinquish ownership of the edge
                if (my_owner)
                    src->register_successor( *my_owner );
            } else {
                // Retain ownership of the edge
                this->add(*src);
            }
        } while ( msg == false );
        return msg;
    }

    // If we are removing arcs (rf_clear_edges), call clear() rather than reset().
    void reset() {
        if (my_owner) {
            for(;;) {
                predecessor_type *src;
                {
                    if (this->internal_empty()) break;
                    src = &this->internal_pop();
                }
                src->register_successor( *my_owner );
            }
        }
    }

protected:

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    using node_cache< predecessor_type, M >::my_built_predecessors;
#endif
    successor_type *my_owner;
};
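
// Illustrative note (not part of the original header): get_item() implements a
// pull protocol.  It repeatedly pops a cached predecessor and calls try_get on
// it; on success the predecessor is re-added so the edge stays in pull mode, on
// failure ownership of the edge is relinquished by registering the owning
// receiver back as that predecessor's successor.  A rough consumer-side sketch,
// assuming `cache` is a predecessor_cache<int> whose owner has been set:
//
//     int item;
//     if ( cache.get_item(item) ) {
//         // item was pulled from one of the cached predecessors
//     }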

//! A cache of predecessors that supports requests and reservations
// TODO: make reservable_predecessor_cache type T-independent when async_msg becomes regular feature
template< typename T, typename M=spin_mutex >
class reservable_predecessor_cache : public predecessor_cache< T, M > {
public:
    typedef M mutex_type;
    typedef T output_type;
#if __TBB_PREVIEW_ASYNC_MSG
    typedef untyped_sender predecessor_type;
    typedef untyped_receiver successor_type;
#else
    typedef sender<T> predecessor_type;
    typedef receiver<T> successor_type;
#endif // __TBB_PREVIEW_ASYNC_MSG

    reservable_predecessor_cache( ) : reserved_src(NULL) { }

    bool
    try_reserve( output_type &v ) {
        bool msg = false;

        do {
            {
                typename mutex_type::scoped_lock lock(this->my_mutex);
                if ( reserved_src || this->internal_empty() )
                    return false;

                reserved_src = &this->internal_pop();
            }

            // Try to get from this sender
            msg = reserved_src->try_reserve( v );

            if (msg == false) {
                typename mutex_type::scoped_lock lock(this->my_mutex);
                // Relinquish ownership of the edge
                reserved_src->register_successor( *this->my_owner );
                reserved_src = NULL;
            } else {
                // Retain ownership of the edge
                this->add( *reserved_src );
            }
        } while ( msg == false );

        return msg;
    }

    bool
    try_release( ) {
        reserved_src->try_release( );
        reserved_src = NULL;
        return true;
    }

    bool
    try_consume( ) {
        reserved_src->try_consume( );
        reserved_src = NULL;
        return true;
    }

    void reset( ) {
        reserved_src = NULL;
        predecessor_cache<T,M>::reset( );
    }

    void clear() {
        reserved_src = NULL;
        predecessor_cache<T,M>::clear();
    }

private:
    predecessor_type *reserved_src;
};
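
// Illustrative note (not part of the original header): the reserve/release/
// consume trio mirrors the sender interface.  A rough sketch of the intended
// call sequence, assuming `cache` is a reservable_predecessor_cache<int>:
//
//     int item;
//     if ( cache.try_reserve(item) ) {     // pins one predecessor as reserved_src
//         /* ... use item ... */
//         cache.try_consume();             // commit the reservation
//         // or cache.try_release();       // abandon it instead
//     }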


//! An abstract cache of successors
// TODO: make successor_cache type T-independent when async_msg becomes regular feature
template<typename T, typename M=spin_rw_mutex >
class successor_cache : tbb::internal::no_copy {
protected:

    typedef M mutex_type;
    mutex_type my_mutex;

#if __TBB_PREVIEW_ASYNC_MSG
    typedef untyped_receiver successor_type;
    typedef untyped_receiver *pointer_type;
    typedef untyped_sender owner_type;
#else
    typedef receiver<T> successor_type;
    typedef receiver<T> *pointer_type;
    typedef sender<T> owner_type;
#endif // __TBB_PREVIEW_ASYNC_MSG
    typedef std::list< pointer_type > successors_type;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    edge_container<successor_type> my_built_successors;
#endif
    successors_type my_successors;

    owner_type *my_owner;

public:
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    typedef typename edge_container<successor_type>::edge_list_type successor_list_type;

    edge_container<successor_type> &built_successors() { return my_built_successors; }

    void internal_add_built_successor( successor_type &r) {
        typename mutex_type::scoped_lock l(my_mutex, true);
        my_built_successors.add_edge( r );
    }

    void internal_delete_built_successor( successor_type &r) {
        typename mutex_type::scoped_lock l(my_mutex, true);
        my_built_successors.delete_edge(r);
    }

    void copy_successors( successor_list_type &v) {
        typename mutex_type::scoped_lock l(my_mutex, false);
        my_built_successors.copy_edges(v);
    }

    size_t successor_count() {
        typename mutex_type::scoped_lock l(my_mutex,false);
        return my_built_successors.edge_count();
    }

#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */

    successor_cache( ) : my_owner(NULL) {}

    void set_owner( owner_type *owner ) { my_owner = owner; }

    virtual ~successor_cache() {}

    void register_successor( successor_type &r ) {
        typename mutex_type::scoped_lock l(my_mutex, true);
        my_successors.push_back( &r );
    }

    void remove_successor( successor_type &r ) {
        typename mutex_type::scoped_lock l(my_mutex, true);
        for ( typename successors_type::iterator i = my_successors.begin();
              i != my_successors.end(); ++i ) {
            if ( *i == & r ) {
                my_successors.erase(i);
                break;
            }
        }
    }

    bool empty() {
        typename mutex_type::scoped_lock l(my_mutex, false);
        return my_successors.empty();
    }

    void clear() {
        my_successors.clear();
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
        my_built_successors.clear();
#endif
    }

#if !__TBB_PREVIEW_ASYNC_MSG
    virtual task * try_put_task( const T &t ) = 0;
#endif // __TBB_PREVIEW_ASYNC_MSG
 };  // successor_cache<T>
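
// Illustrative note (not part of the original header): successor_cache is an
// abstract base; the concrete caches below (broadcast_cache, round_robin_cache)
// supply try_put_task.  The second argument of scoped_lock selects the lock
// mode on the read-write mutex: `true` acquires it as a writer (register_successor,
// remove_successor), `false` as a reader (empty, successor_count).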

//! An abstract cache of successors, specialized to continue_msg
template<typename M>
class successor_cache< continue_msg, M > : tbb::internal::no_copy {
protected:

    typedef M mutex_type;
    mutex_type my_mutex;

#if __TBB_PREVIEW_ASYNC_MSG
    typedef untyped_receiver successor_type;
    typedef untyped_receiver *pointer_type;
#else
    typedef receiver<continue_msg> successor_type;
    typedef receiver<continue_msg> *pointer_type;
#endif // __TBB_PREVIEW_ASYNC_MSG
    typedef std::list< pointer_type > successors_type;
    successors_type my_successors;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    edge_container<successor_type> my_built_successors;
    typedef edge_container<successor_type>::edge_list_type successor_list_type;
#endif

    sender<continue_msg> *my_owner;

public:

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION

    edge_container<successor_type> &built_successors() { return my_built_successors; }

    void internal_add_built_successor( successor_type &r) {
        typename mutex_type::scoped_lock l(my_mutex, true);
        my_built_successors.add_edge( r );
    }

    void internal_delete_built_successor( successor_type &r) {
        typename mutex_type::scoped_lock l(my_mutex, true);
        my_built_successors.delete_edge(r);
    }

    void copy_successors( successor_list_type &v) {
        typename mutex_type::scoped_lock l(my_mutex, false);
        my_built_successors.copy_edges(v);
    }

    size_t successor_count() {
        typename mutex_type::scoped_lock l(my_mutex,false);
        return my_built_successors.edge_count();
    }

#endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */

    successor_cache( ) : my_owner(NULL) {}

    void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }

    virtual ~successor_cache() {}

    void register_successor( successor_type &r ) {
        typename mutex_type::scoped_lock l(my_mutex, true);
        my_successors.push_back( &r );
        if ( my_owner && r.is_continue_receiver() ) {
            r.register_predecessor( *my_owner );
        }
    }

    void remove_successor( successor_type &r ) {
        typename mutex_type::scoped_lock l(my_mutex, true);
        for ( successors_type::iterator i = my_successors.begin();
              i != my_successors.end(); ++i ) {
            if ( *i == & r ) {
                // TODO: Check if we need to test for continue_receiver before
                // removing from r.
                if ( my_owner )
                    r.remove_predecessor( *my_owner );
                my_successors.erase(i);
                break;
            }
        }
    }

    bool empty() {
        typename mutex_type::scoped_lock l(my_mutex, false);
        return my_successors.empty();
    }

    void clear() {
        my_successors.clear();
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
        my_built_successors.clear();
#endif
    }

#if !__TBB_PREVIEW_ASYNC_MSG
    virtual task * try_put_task( const continue_msg &t ) = 0;
#endif // __TBB_PREVIEW_ASYNC_MSG

};  // successor_cache< continue_msg >
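
// Illustrative note (not part of the original header): the continue_msg
// specialization differs from the primary template in that register_successor
// also registers the owning sender as a predecessor of any continue_receiver,
// so the receiver can count how many predecessors must signal it before it
// fires; remove_successor undoes that registration.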

//! A cache of successors that are broadcast to
// TODO: make broadcast_cache type T-independent when async_msg becomes regular feature
template<typename T, typename M=spin_rw_mutex>
class broadcast_cache : public successor_cache<T, M> {
    typedef M mutex_type;
    typedef typename successor_cache<T,M>::successors_type successors_type;

public:

    broadcast_cache( ) {}

    // Calls try_put_task on each successor and returns the last task received (if any)
#if __TBB_PREVIEW_ASYNC_MSG
    template<typename X>
    task * try_put_task( const X &t ) {
#else
    task * try_put_task( const T &t ) __TBB_override {
#endif // __TBB_PREVIEW_ASYNC_MSG
        task * last_task = NULL;
        bool upgraded = true;
        typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
        typename successors_type::iterator i = this->my_successors.begin();
        while ( i != this->my_successors.end() ) {
            task *new_task = (*i)->try_put_task(t);
            // workaround for icc bug
            graph& graph_ref = (*i)->graph_reference();
            last_task = combine_tasks(graph_ref, last_task, new_task);  // enqueue if necessary
            if(new_task) {
                ++i;
            }
            else {  // failed
                if ( (*i)->register_predecessor(*this->my_owner) ) {
                    if (!upgraded) {
                        l.upgrade_to_writer();
                        upgraded = true;
                    }
                    i = this->my_successors.erase(i);
                } else {
                    ++i;
                }
            }
        }
        return last_task;
    }

    // call try_put_task and return list of received tasks
#if __TBB_PREVIEW_ASYNC_MSG
    template<typename X>
    bool gather_successful_try_puts( const X &t, task_list &tasks ) {
#else
    bool gather_successful_try_puts( const T &t, task_list &tasks ) {
#endif // __TBB_PREVIEW_ASYNC_MSG
        bool upgraded = true;
        bool is_at_least_one_put_successful = false;
        typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
        typename successors_type::iterator i = this->my_successors.begin();
        while ( i != this->my_successors.end() ) {
            task * new_task = (*i)->try_put_task(t);
            if(new_task) {
                ++i;
                if(new_task != SUCCESSFULLY_ENQUEUED) {
                    tasks.push_back(*new_task);
                }
                is_at_least_one_put_successful = true;
            }
            else {  // failed
                if ( (*i)->register_predecessor(*this->my_owner) ) {
                    if (!upgraded) {
                        l.upgrade_to_writer();
                        upgraded = true;
                    }
                    i = this->my_successors.erase(i);
                } else {
                    ++i;
                }
            }
        }
        return is_at_least_one_put_successful;
    }
};
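
// Illustrative note (not part of the original header): broadcast_cache tries to
// deliver one message to every registered successor, while round_robin_cache
// below stops at the first successor that accepts it.  In both caches a
// successor that rejects the message and successfully registers the owner as a
// predecessor (switching the edge to pull mode) is erased from the successor
// list while the writer lock is held.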

//! A cache of successors that are put in a round-robin fashion
// TODO: make round_robin_cache type T-independent when async_msg becomes regular feature
template<typename T, typename M=spin_rw_mutex >
class round_robin_cache : public successor_cache<T, M> {
    typedef size_t size_type;
    typedef M mutex_type;
    typedef typename successor_cache<T,M>::successors_type successors_type;

public:

    round_robin_cache( ) {}

    size_type size() {
        typename mutex_type::scoped_lock l(this->my_mutex, false);
        return this->my_successors.size();
    }

#if __TBB_PREVIEW_ASYNC_MSG
    template<typename X>
    task * try_put_task( const X &t ) {
#else
    task *try_put_task( const T &t ) __TBB_override {
#endif // __TBB_PREVIEW_ASYNC_MSG
        bool upgraded = true;
        typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
        typename successors_type::iterator i = this->my_successors.begin();
        while ( i != this->my_successors.end() ) {
            task *new_task = (*i)->try_put_task(t);
            if ( new_task ) {
                return new_task;
            } else {
               if ( (*i)->register_predecessor(*this->my_owner) ) {
                   if (!upgraded) {
                       l.upgrade_to_writer();
                       upgraded = true;
                   }
                   i = this->my_successors.erase(i);
               }
               else {
                   ++i;
               }
            }
        }
        return NULL;
    }
};

} // namespace internal

#endif // __TBB__flow_graph_cache_impl_H