/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB__flow_graph_cache_impl_H
#define __TBB__flow_graph_cache_impl_H

#ifndef __TBB_flow_graph_H
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

// included in namespace tbb::flow::interfaceX (in flow_graph.h)

namespace internal {

//! A node_cache maintains a std::queue of elements of type T.  Each operation is protected by a lock.
29 template< typename T, typename M=spin_mutex > 30 class node_cache { 31 public: 32 33 typedef size_t size_type; 34 empty()35 bool empty() { 36 typename mutex_type::scoped_lock lock( my_mutex ); 37 return internal_empty(); 38 } 39 add(T & n)40 void add( T &n ) { 41 typename mutex_type::scoped_lock lock( my_mutex ); 42 internal_push(n); 43 } 44 remove(T & n)45 void remove( T &n ) { 46 typename mutex_type::scoped_lock lock( my_mutex ); 47 for ( size_t i = internal_size(); i != 0; --i ) { 48 T &s = internal_pop(); 49 if ( &s == &n ) return; // only remove one predecessor per request 50 internal_push(s); 51 } 52 } 53 clear()54 void clear() { 55 while( !my_q.empty()) (void)my_q.pop(); 56 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 57 my_built_predecessors.clear(); 58 #endif 59 } 60 61 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 62 typedef edge_container<T> built_predecessors_type; built_predecessors()63 built_predecessors_type &built_predecessors() { return my_built_predecessors; } 64 65 typedef typename edge_container<T>::edge_list_type predecessor_list_type; internal_add_built_predecessor(T & n)66 void internal_add_built_predecessor( T &n ) { 67 typename mutex_type::scoped_lock lock( my_mutex ); 68 my_built_predecessors.add_edge(n); 69 } 70 internal_delete_built_predecessor(T & n)71 void internal_delete_built_predecessor( T &n ) { 72 typename mutex_type::scoped_lock lock( my_mutex ); 73 my_built_predecessors.delete_edge(n); 74 } 75 copy_predecessors(predecessor_list_type & v)76 void copy_predecessors( predecessor_list_type &v) { 77 typename mutex_type::scoped_lock lock( my_mutex ); 78 my_built_predecessors.copy_edges(v); 79 } 80 predecessor_count()81 size_t predecessor_count() { 82 typename mutex_type::scoped_lock lock(my_mutex); 83 return (size_t)(my_built_predecessors.edge_count()); 84 } 85 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */ 86 87 protected: 88 89 typedef M mutex_type; 90 mutex_type my_mutex; 91 std::queue< T * > my_q; 92 #if 
TBB_DEPRECATED_FLOW_NODE_EXTRACTION 93 built_predecessors_type my_built_predecessors; 94 #endif 95 96 // Assumes lock is held internal_empty()97 inline bool internal_empty( ) { 98 return my_q.empty(); 99 } 100 101 // Assumes lock is held internal_size()102 inline size_type internal_size( ) { 103 return my_q.size(); 104 } 105 106 // Assumes lock is held internal_push(T & n)107 inline void internal_push( T &n ) { 108 my_q.push(&n); 109 } 110 111 // Assumes lock is held internal_pop()112 inline T &internal_pop() { 113 T *v = my_q.front(); 114 my_q.pop(); 115 return *v; 116 } 117 118 }; 119 120 //! A cache of predecessors that only supports try_get 121 template< typename T, typename M=spin_mutex > 122 #if __TBB_PREVIEW_ASYNC_MSG 123 // TODO: make predecessor_cache type T-independent when async_msg becomes regular feature 124 class predecessor_cache : public node_cache< untyped_sender, M > { 125 #else 126 class predecessor_cache : public node_cache< sender<T>, M > { 127 #endif // __TBB_PREVIEW_ASYNC_MSG 128 public: 129 typedef M mutex_type; 130 typedef T output_type; 131 #if __TBB_PREVIEW_ASYNC_MSG 132 typedef untyped_sender predecessor_type; 133 typedef untyped_receiver successor_type; 134 #else 135 typedef sender<output_type> predecessor_type; 136 typedef receiver<output_type> successor_type; 137 #endif // __TBB_PREVIEW_ASYNC_MSG 138 predecessor_cache()139 predecessor_cache( ) : my_owner( NULL ) { } 140 set_owner(successor_type * owner)141 void set_owner( successor_type *owner ) { my_owner = owner; } 142 get_item(output_type & v)143 bool get_item( output_type &v ) { 144 145 bool msg = false; 146 147 do { 148 predecessor_type *src; 149 { 150 typename mutex_type::scoped_lock lock(this->my_mutex); 151 if ( this->internal_empty() ) { 152 break; 153 } 154 src = &this->internal_pop(); 155 } 156 157 // Try to get from this sender 158 msg = src->try_get( v ); 159 160 if (msg == false) { 161 // Relinquish ownership of the edge 162 if (my_owner) 163 src->register_successor( 
*my_owner ); 164 } else { 165 // Retain ownership of the edge 166 this->add(*src); 167 } 168 } while ( msg == false ); 169 return msg; 170 } 171 172 // If we are removing arcs (rf_clear_edges), call clear() rather than reset(). reset()173 void reset() { 174 if (my_owner) { 175 for(;;) { 176 predecessor_type *src; 177 { 178 if (this->internal_empty()) break; 179 src = &this->internal_pop(); 180 } 181 src->register_successor( *my_owner ); 182 } 183 } 184 } 185 186 protected: 187 188 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 189 using node_cache< predecessor_type, M >::my_built_predecessors; 190 #endif 191 successor_type *my_owner; 192 }; 193 194 //! An cache of predecessors that supports requests and reservations 195 // TODO: make reservable_predecessor_cache type T-independent when async_msg becomes regular feature 196 template< typename T, typename M=spin_mutex > 197 class reservable_predecessor_cache : public predecessor_cache< T, M > { 198 public: 199 typedef M mutex_type; 200 typedef T output_type; 201 #if __TBB_PREVIEW_ASYNC_MSG 202 typedef untyped_sender predecessor_type; 203 typedef untyped_receiver successor_type; 204 #else 205 typedef sender<T> predecessor_type; 206 typedef receiver<T> successor_type; 207 #endif // __TBB_PREVIEW_ASYNC_MSG 208 reservable_predecessor_cache()209 reservable_predecessor_cache( ) : reserved_src(NULL) { } 210 211 bool try_reserve(output_type & v)212 try_reserve( output_type &v ) { 213 bool msg = false; 214 215 do { 216 { 217 typename mutex_type::scoped_lock lock(this->my_mutex); 218 if ( reserved_src || this->internal_empty() ) 219 return false; 220 221 reserved_src = &this->internal_pop(); 222 } 223 224 // Try to get from this sender 225 msg = reserved_src->try_reserve( v ); 226 227 if (msg == false) { 228 typename mutex_type::scoped_lock lock(this->my_mutex); 229 // Relinquish ownership of the edge 230 reserved_src->register_successor( *this->my_owner ); 231 reserved_src = NULL; 232 } else { 233 // Retain ownership of the edge 234 
this->add( *reserved_src ); 235 } 236 } while ( msg == false ); 237 238 return msg; 239 } 240 241 bool try_release()242 try_release( ) { 243 reserved_src->try_release( ); 244 reserved_src = NULL; 245 return true; 246 } 247 248 bool try_consume()249 try_consume( ) { 250 reserved_src->try_consume( ); 251 reserved_src = NULL; 252 return true; 253 } 254 reset()255 void reset( ) { 256 reserved_src = NULL; 257 predecessor_cache<T,M>::reset( ); 258 } 259 clear()260 void clear() { 261 reserved_src = NULL; 262 predecessor_cache<T,M>::clear(); 263 } 264 265 private: 266 predecessor_type *reserved_src; 267 }; 268 269 270 //! An abstract cache of successors 271 // TODO: make successor_cache type T-independent when async_msg becomes regular feature 272 template<typename T, typename M=spin_rw_mutex > 273 class successor_cache : tbb::internal::no_copy { 274 protected: 275 276 typedef M mutex_type; 277 mutex_type my_mutex; 278 279 #if __TBB_PREVIEW_ASYNC_MSG 280 typedef untyped_receiver successor_type; 281 typedef untyped_receiver *pointer_type; 282 typedef untyped_sender owner_type; 283 #else 284 typedef receiver<T> successor_type; 285 typedef receiver<T> *pointer_type; 286 typedef sender<T> owner_type; 287 #endif // __TBB_PREVIEW_ASYNC_MSG 288 typedef std::list< pointer_type > successors_type; 289 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 290 edge_container<successor_type> my_built_successors; 291 #endif 292 successors_type my_successors; 293 294 owner_type *my_owner; 295 296 public: 297 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 298 typedef typename edge_container<successor_type>::edge_list_type successor_list_type; 299 built_successors()300 edge_container<successor_type> &built_successors() { return my_built_successors; } 301 internal_add_built_successor(successor_type & r)302 void internal_add_built_successor( successor_type &r) { 303 typename mutex_type::scoped_lock l(my_mutex, true); 304 my_built_successors.add_edge( r ); 305 } 306 internal_delete_built_successor(successor_type 
& r)307 void internal_delete_built_successor( successor_type &r) { 308 typename mutex_type::scoped_lock l(my_mutex, true); 309 my_built_successors.delete_edge(r); 310 } 311 copy_successors(successor_list_type & v)312 void copy_successors( successor_list_type &v) { 313 typename mutex_type::scoped_lock l(my_mutex, false); 314 my_built_successors.copy_edges(v); 315 } 316 successor_count()317 size_t successor_count() { 318 typename mutex_type::scoped_lock l(my_mutex,false); 319 return my_built_successors.edge_count(); 320 } 321 322 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */ 323 successor_cache()324 successor_cache( ) : my_owner(NULL) {} 325 set_owner(owner_type * owner)326 void set_owner( owner_type *owner ) { my_owner = owner; } 327 ~successor_cache()328 virtual ~successor_cache() {} 329 register_successor(successor_type & r)330 void register_successor( successor_type &r ) { 331 typename mutex_type::scoped_lock l(my_mutex, true); 332 my_successors.push_back( &r ); 333 } 334 remove_successor(successor_type & r)335 void remove_successor( successor_type &r ) { 336 typename mutex_type::scoped_lock l(my_mutex, true); 337 for ( typename successors_type::iterator i = my_successors.begin(); 338 i != my_successors.end(); ++i ) { 339 if ( *i == & r ) { 340 my_successors.erase(i); 341 break; 342 } 343 } 344 } 345 empty()346 bool empty() { 347 typename mutex_type::scoped_lock l(my_mutex, false); 348 return my_successors.empty(); 349 } 350 clear()351 void clear() { 352 my_successors.clear(); 353 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 354 my_built_successors.clear(); 355 #endif 356 } 357 358 #if !__TBB_PREVIEW_ASYNC_MSG 359 virtual task * try_put_task( const T &t ) = 0; 360 #endif // __TBB_PREVIEW_ASYNC_MSG 361 }; // successor_cache<T> 362 363 //! 
An abstract cache of successors, specialized to continue_msg 364 template<typename M> 365 class successor_cache< continue_msg, M > : tbb::internal::no_copy { 366 protected: 367 368 typedef M mutex_type; 369 mutex_type my_mutex; 370 371 #if __TBB_PREVIEW_ASYNC_MSG 372 typedef untyped_receiver successor_type; 373 typedef untyped_receiver *pointer_type; 374 #else 375 typedef receiver<continue_msg> successor_type; 376 typedef receiver<continue_msg> *pointer_type; 377 #endif // __TBB_PREVIEW_ASYNC_MSG 378 typedef std::list< pointer_type > successors_type; 379 successors_type my_successors; 380 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 381 edge_container<successor_type> my_built_successors; 382 typedef edge_container<successor_type>::edge_list_type successor_list_type; 383 #endif 384 385 sender<continue_msg> *my_owner; 386 387 public: 388 389 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 390 built_successors()391 edge_container<successor_type> &built_successors() { return my_built_successors; } 392 internal_add_built_successor(successor_type & r)393 void internal_add_built_successor( successor_type &r) { 394 typename mutex_type::scoped_lock l(my_mutex, true); 395 my_built_successors.add_edge( r ); 396 } 397 internal_delete_built_successor(successor_type & r)398 void internal_delete_built_successor( successor_type &r) { 399 typename mutex_type::scoped_lock l(my_mutex, true); 400 my_built_successors.delete_edge(r); 401 } 402 copy_successors(successor_list_type & v)403 void copy_successors( successor_list_type &v) { 404 typename mutex_type::scoped_lock l(my_mutex, false); 405 my_built_successors.copy_edges(v); 406 } 407 successor_count()408 size_t successor_count() { 409 typename mutex_type::scoped_lock l(my_mutex,false); 410 return my_built_successors.edge_count(); 411 } 412 413 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */ 414 successor_cache()415 successor_cache( ) : my_owner(NULL) {} 416 set_owner(sender<continue_msg> * owner)417 void set_owner( sender<continue_msg> *owner ) 
{ my_owner = owner; } 418 ~successor_cache()419 virtual ~successor_cache() {} 420 register_successor(successor_type & r)421 void register_successor( successor_type &r ) { 422 typename mutex_type::scoped_lock l(my_mutex, true); 423 my_successors.push_back( &r ); 424 if ( my_owner && r.is_continue_receiver() ) { 425 r.register_predecessor( *my_owner ); 426 } 427 } 428 remove_successor(successor_type & r)429 void remove_successor( successor_type &r ) { 430 typename mutex_type::scoped_lock l(my_mutex, true); 431 for ( successors_type::iterator i = my_successors.begin(); 432 i != my_successors.end(); ++i ) { 433 if ( *i == & r ) { 434 // TODO: Check if we need to test for continue_receiver before 435 // removing from r. 436 if ( my_owner ) 437 r.remove_predecessor( *my_owner ); 438 my_successors.erase(i); 439 break; 440 } 441 } 442 } 443 empty()444 bool empty() { 445 typename mutex_type::scoped_lock l(my_mutex, false); 446 return my_successors.empty(); 447 } 448 clear()449 void clear() { 450 my_successors.clear(); 451 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 452 my_built_successors.clear(); 453 #endif 454 } 455 456 #if !__TBB_PREVIEW_ASYNC_MSG 457 virtual task * try_put_task( const continue_msg &t ) = 0; 458 #endif // __TBB_PREVIEW_ASYNC_MSG 459 460 }; // successor_cache< continue_msg > 461 462 //! 
A cache of successors that are broadcast to 463 // TODO: make broadcast_cache type T-independent when async_msg becomes regular feature 464 template<typename T, typename M=spin_rw_mutex> 465 class broadcast_cache : public successor_cache<T, M> { 466 typedef M mutex_type; 467 typedef typename successor_cache<T,M>::successors_type successors_type; 468 469 public: 470 broadcast_cache()471 broadcast_cache( ) {} 472 473 // as above, but call try_put_task instead, and return the last task we received (if any) 474 #if __TBB_PREVIEW_ASYNC_MSG 475 template<typename X> try_put_task(const X & t)476 task * try_put_task( const X &t ) { 477 #else 478 task * try_put_task( const T &t ) __TBB_override { 479 #endif // __TBB_PREVIEW_ASYNC_MSG 480 task * last_task = NULL; 481 bool upgraded = true; 482 typename mutex_type::scoped_lock l(this->my_mutex, upgraded); 483 typename successors_type::iterator i = this->my_successors.begin(); 484 while ( i != this->my_successors.end() ) { 485 task *new_task = (*i)->try_put_task(t); 486 // workaround for icc bug 487 graph& graph_ref = (*i)->graph_reference(); 488 last_task = combine_tasks(graph_ref, last_task, new_task); // enqueue if necessary 489 if(new_task) { 490 ++i; 491 } 492 else { // failed 493 if ( (*i)->register_predecessor(*this->my_owner) ) { 494 if (!upgraded) { 495 l.upgrade_to_writer(); 496 upgraded = true; 497 } 498 i = this->my_successors.erase(i); 499 } else { 500 ++i; 501 } 502 } 503 } 504 return last_task; 505 } 506 507 // call try_put_task and return list of received tasks 508 #if __TBB_PREVIEW_ASYNC_MSG 509 template<typename X> 510 bool gather_successful_try_puts( const X &t, task_list &tasks ) { 511 #else 512 bool gather_successful_try_puts( const T &t, task_list &tasks ) { 513 #endif // __TBB_PREVIEW_ASYNC_MSG 514 bool upgraded = true; 515 bool is_at_least_one_put_successful = false; 516 typename mutex_type::scoped_lock l(this->my_mutex, upgraded); 517 typename successors_type::iterator i = this->my_successors.begin(); 
518 while ( i != this->my_successors.end() ) { 519 task * new_task = (*i)->try_put_task(t); 520 if(new_task) { 521 ++i; 522 if(new_task != SUCCESSFULLY_ENQUEUED) { 523 tasks.push_back(*new_task); 524 } 525 is_at_least_one_put_successful = true; 526 } 527 else { // failed 528 if ( (*i)->register_predecessor(*this->my_owner) ) { 529 if (!upgraded) { 530 l.upgrade_to_writer(); 531 upgraded = true; 532 } 533 i = this->my_successors.erase(i); 534 } else { 535 ++i; 536 } 537 } 538 } 539 return is_at_least_one_put_successful; 540 } 541 }; 542 543 //! A cache of successors that are put in a round-robin fashion 544 // TODO: make round_robin_cache type T-independent when async_msg becomes regular feature 545 template<typename T, typename M=spin_rw_mutex > 546 class round_robin_cache : public successor_cache<T, M> { 547 typedef size_t size_type; 548 typedef M mutex_type; 549 typedef typename successor_cache<T,M>::successors_type successors_type; 550 551 public: 552 553 round_robin_cache( ) {} 554 555 size_type size() { 556 typename mutex_type::scoped_lock l(this->my_mutex, false); 557 return this->my_successors.size(); 558 } 559 560 #if __TBB_PREVIEW_ASYNC_MSG 561 template<typename X> 562 task * try_put_task( const X &t ) { 563 #else 564 task *try_put_task( const T &t ) __TBB_override { 565 #endif // __TBB_PREVIEW_ASYNC_MSG 566 bool upgraded = true; 567 typename mutex_type::scoped_lock l(this->my_mutex, upgraded); 568 typename successors_type::iterator i = this->my_successors.begin(); 569 while ( i != this->my_successors.end() ) { 570 task *new_task = (*i)->try_put_task(t); 571 if ( new_task ) { 572 return new_task; 573 } else { 574 if ( (*i)->register_predecessor(*this->my_owner) ) { 575 if (!upgraded) { 576 l.upgrade_to_writer(); 577 upgraded = true; 578 } 579 i = this->my_successors.erase(i); 580 } 581 else { 582 ++i; 583 } 584 } 585 } 586 return NULL; 587 } 588 }; 589 590 } // namespace internal 591 592 #endif // __TBB__flow_graph_cache_impl_H 593