/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_concurrent_monitor_H
#define __TBB_concurrent_monitor_H

#include "oneapi/tbb/spin_mutex.h"
#include "oneapi/tbb/detail/_exception.h"
#include "oneapi/tbb/detail/_aligned_space.h"
#include "concurrent_monitor_mutex.h"
#include "semaphore.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

//! Circular doubly-linked list with sentinel
/** head.next points to the front and head.prev points to the back */
class circular_doubly_linked_list_with_sentinel {
public:
    struct base_node {
        base_node* next;
        base_node* prev;

        constexpr base_node(base_node* n, base_node* p) : next(n), prev(p) {}
        explicit base_node() : next((base_node*)(uintptr_t)0xcdcdcdcd), prev((base_node*)(uintptr_t)0xcdcdcdcd) {}
    };

    // ctor
    constexpr circular_doubly_linked_list_with_sentinel() : count(0), head(&head, &head) {}

    circular_doubly_linked_list_with_sentinel(const circular_doubly_linked_list_with_sentinel&) = delete;
    circular_doubly_linked_list_with_sentinel& operator=(const circular_doubly_linked_list_with_sentinel&) = delete;

    inline std::size_t size() const { return count.load(std::memory_order_relaxed); }
    inline bool empty() const { return size() == 0; }
    inline base_node* front() const { return head.next; }
    inline base_node* last() const { return head.prev; }
    inline const base_node* end() const { return &head; }

    //! Add a node to the back of the list
    inline void add( base_node* n ) {
        count.store(count.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
        n->prev = head.prev;
        n->next = &head;
        head.prev->next = n;
        head.prev = n;
    }

    //! Remove node 'n'
    inline void remove( base_node& n ) {
        __TBB_ASSERT(count.load(std::memory_order_relaxed) > 0, "attempt to remove an item from an empty list");
        count.store(count.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
        n.prev->next = n.next;
        n.next->prev = n.prev;
    }

    //! Move all elements to 'lst' and reinitialize 'this' list
    inline void flush_to( circular_doubly_linked_list_with_sentinel& lst ) {
        const std::size_t l_count = size();
        if (l_count > 0) {
            lst.count.store(l_count, std::memory_order_relaxed);
            lst.head.next = head.next;
            lst.head.prev = head.prev;
            head.next->prev = &lst.head;
            head.prev->next = &lst.head;
            clear();
        }
    }

    void clear() {
        head.next = &head;
        head.prev = &head;
        count.store(0, std::memory_order_relaxed);
    }

private:
    std::atomic<std::size_t> count;
    base_node head;
};

using base_list = circular_doubly_linked_list_with_sentinel;
using base_node = circular_doubly_linked_list_with_sentinel::base_node;
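/* A minimal usage sketch (illustration only, not part of this header). The list
   is intrusive: nodes are linked in place, so the caller must keep each
   base_node alive while it is on the list. 'my_node' is a hypothetical
   caller-owned node:

       circular_doubly_linked_list_with_sentinel list;
       circular_doubly_linked_list_with_sentinel::base_node my_node;
       list.add(&my_node);                  // my_node is now list.front()
       for (auto* n = list.front(); n != list.end(); n = n->next) {
           // visit each node; the sentinel (&head) terminates the walk
       }
       list.remove(my_node);                // unlink; my_node itself is untouched
*/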
template <typename Context>
class concurrent_monitor_base;

template <typename Context>
class wait_node : public base_node {
public:

#if __TBB_GLIBCXX_VERSION >= 40800 && __TBB_GLIBCXX_VERSION < 40900
    wait_node(Context ctx) : my_context(ctx), my_is_in_list(false) {}
#else
    wait_node(Context ctx) : my_context(ctx) {}
#endif

    virtual ~wait_node() = default;

    virtual void init() {
        __TBB_ASSERT(!my_initialized, nullptr);
        my_initialized = true;
    }

    virtual void wait() = 0;

    virtual void reset() {
        __TBB_ASSERT(my_skipped_wakeup, nullptr);
        my_skipped_wakeup = false;
    }

    virtual void notify() = 0;

protected:
    friend class concurrent_monitor_base<Context>;
    friend class thread_data;

    Context my_context{};
#if __TBB_GLIBCXX_VERSION >= 40800 && __TBB_GLIBCXX_VERSION < 40900
    std::atomic<bool> my_is_in_list;
#else
    std::atomic<bool> my_is_in_list{false};
#endif

    bool my_initialized{false};
    bool my_skipped_wakeup{false};
    bool my_aborted{false};
    unsigned my_epoch{0};
};

template <typename Context>
class sleep_node : public wait_node<Context> {
    using base_type = wait_node<Context>;
public:
    using base_type::base_type;

    // Make the destructor virtual due to an Intel Compiler warning
    virtual ~sleep_node() {
        if (this->my_initialized) {
            if (this->my_skipped_wakeup) semaphore().P();
            semaphore().~binary_semaphore();
        }
    }

    binary_semaphore& semaphore() { return *sema.begin(); }

    void init() override {
        if (!this->my_initialized) {
            new (sema.begin()) binary_semaphore;
            base_type::init();
        }
    }

    void wait() override {
        __TBB_ASSERT(this->my_initialized,
                     "Use of commit_wait() without prior prepare_wait()");
        semaphore().P();
        __TBB_ASSERT(!this->my_is_in_list.load(std::memory_order_relaxed), "Still in the queue?");
        if (this->my_aborted)
            throw_exception(exception_id::user_abort);
    }

    void reset() override {
        base_type::reset();
        semaphore().P();
    }

    void notify() override {
        semaphore().V();
    }

private:
    tbb::detail::aligned_space<binary_semaphore> sema;
};
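/* Illustration only (not part of oneTBB): wait_node is an extension point.
   A hypothetical busy-wait flavor could back wait()/notify() with an atomic
   flag instead of sleep_node's semaphore, trading CPU time for lower wakeup
   latency. Everything below ('spin_wait_node', 'my_signaled') is a sketch:

       template <typename Context>
       class spin_wait_node : public wait_node<Context> {
       public:
           using wait_node<Context>::wait_node;
           void wait() override {
               while (!my_signaled.load(std::memory_order_acquire)) { }   // spin until notified
               my_signaled.store(false, std::memory_order_relaxed);
           }
           void reset() override {
               wait_node<Context>::reset();
               my_signaled.store(false, std::memory_order_relaxed);       // consume skipped wakeup
           }
           void notify() override {
               my_signaled.store(true, std::memory_order_release);
           }
       private:
           std::atomic<bool> my_signaled{false};
       };
*/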
//! concurrent_monitor
/** Fine-grained concurrent_monitor implementation */
template <typename Context>
class concurrent_monitor_base {
public:
    //! ctor
    constexpr concurrent_monitor_base() {}
    //! dtor
    ~concurrent_monitor_base() = default;

    concurrent_monitor_base(const concurrent_monitor_base&) = delete;
    concurrent_monitor_base& operator=(const concurrent_monitor_base&) = delete;

    //! Prepare wait by inserting 'node' into the wait queue
    void prepare_wait( wait_node<Context>& node ) {
        // TODO: consider making the instantiation of the semaphore even lazier, i.e. only when
        // it is actually needed, e.g. move it into node::wait()
        if (!node.my_initialized) {
            node.init();
        }
        // this is a good place to pump a previously skipped wakeup
        else if (node.my_skipped_wakeup) {
            node.reset();
        }

        node.my_is_in_list.store(true, std::memory_order_relaxed);

        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            node.my_epoch = my_epoch.load(std::memory_order_relaxed);
            my_waitset.add(&node);
        }

        // prepare_wait() must guarantee a Write-Read memory barrier.
        // In C++ only a full fence covers this type of barrier.
        atomic_fence(std::memory_order_seq_cst);
    }

    //! Commit wait if the event count (epoch) has not changed; otherwise, cancel the wait.
    /** Returns true if committed, false if canceled. */
    inline bool commit_wait( wait_node<Context>& node ) {
        const bool do_it = node.my_epoch == my_epoch.load(std::memory_order_relaxed);
        // this check is just an optimization
        if (do_it) {
            node.wait();
        } else {
            cancel_wait(node);
        }
        return do_it;
    }

    //! Cancel the wait. Removes the node from the wait queue if it has not been removed yet.
    void cancel_wait( wait_node<Context>& node ) {
        // a possible skipped wakeup will be pumped in the following prepare_wait()
        node.my_skipped_wakeup = true;
        // try to remove the node from the waitset;
        // cancel_wait() guarantees an acquire memory barrier
        bool in_list = node.my_is_in_list.load(std::memory_order_acquire);
        if (in_list) {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            if (node.my_is_in_list.load(std::memory_order_relaxed)) {
                my_waitset.remove(node);
                // the node is removed from the waitset, so there will be no wakeup
                node.my_is_in_list.store(false, std::memory_order_relaxed);
                node.my_skipped_wakeup = false;
            }
        }
    }

    //! Wait for a condition to be satisfied, with 'node' waiting on its my_context
    template <typename NodeType, typename Pred>
    bool wait(Pred&& pred, NodeType&& node) {
        prepare_wait(node);
        while (!guarded_call(std::forward<Pred>(pred), node)) {
            if (commit_wait(node)) {
                return true;
            }

            prepare_wait(node);
        }

        cancel_wait(node);
        return false;
    }

    //! Notify one thread about the event
    void notify_one() {
        atomic_fence(std::memory_order_seq_cst);
        notify_one_relaxed();
    }

    //! Notify one thread about the event. Relaxed version.
    void notify_one_relaxed() {
        if (my_waitset.empty()) {
            return;
        }

        base_node* n;
        const base_node* end = my_waitset.end();
        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
            n = my_waitset.front();
            if (n != end) {
                my_waitset.remove(*n);
                to_wait_node(n)->my_is_in_list.store(false, std::memory_order_relaxed);
            }
        }

        if (n != end) {
            to_wait_node(n)->notify();
        }
    }
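    /* A minimal sketch of the two-phase wait protocol this class implements
       (illustration only; 'monitor', 'node', and 'condition' are hypothetical
       names). The waiter publishes itself before re-checking the condition,
       so a notifier that changes state and then calls notify_one() can never
       be missed:

           // waiter                               // notifier
           monitor.prepare_wait(node);             // ... make condition() true ...
           if (!condition()) {                     monitor.notify_one();
               monitor.commit_wait(node);  // blocks unless the epoch moved
           } else {
               monitor.cancel_wait(node);  // condition already true, don't sleep
           }

       The convenience member wait(pred, node) above wraps exactly this loop. */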
    //! Notify all waiting threads of the event
    void notify_all() {
        atomic_fence(std::memory_order_seq_cst);
        notify_all_relaxed();
    }

    //! Notify all waiting threads of the event. Relaxed version.
    void notify_all_relaxed() {
        if (my_waitset.empty()) {
            return;
        }

        base_list temp;
        const base_node* end;
        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
            // TODO: possible optimization, don't change the node state under the lock, just do the flush
            my_waitset.flush_to(temp);
            end = temp.end();
            for (base_node* n = temp.front(); n != end; n = n->next) {
                to_wait_node(n)->my_is_in_list.store(false, std::memory_order_relaxed);
            }
        }

        base_node* nxt;
        for (base_node* n = temp.front(); n != end; n = nxt) {
            nxt = n->next;
            to_wait_node(n)->notify();
        }
#if TBB_USE_ASSERT
        temp.clear();
#endif
    }

    //! Notify waiting threads whose context satisfies the given predicate
    template <typename P>
    void notify( const P& predicate ) {
        atomic_fence(std::memory_order_seq_cst);
        notify_relaxed(predicate);
    }

    //! Notify waiting threads whose context satisfies the given predicate;
    //! the predicate is called under the lock. Relaxed version.
    template <typename P>
    void notify_relaxed( const P& predicate ) {
        if (my_waitset.empty()) {
            return;
        }

        base_list temp;
        base_node* nxt;
        const base_node* end = my_waitset.end();
        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
            for (base_node* n = my_waitset.last(); n != end; n = nxt) {
                nxt = n->prev;
                auto* node = static_cast<wait_node<Context>*>(n);
                if (predicate(node->my_context)) {
                    my_waitset.remove(*n);
                    node->my_is_in_list.store(false, std::memory_order_relaxed);
                    temp.add(n);
                }
            }
        }

        end = temp.end();
        for (base_node* n = temp.front(); n != end; n = nxt) {
            nxt = n->next;
            to_wait_node(n)->notify();
        }
#if TBB_USE_ASSERT
        temp.clear();
#endif
    }

    //! Notify one waiting thread whose context satisfies the given predicate;
    //! the predicate is called under the lock. Relaxed version.
    template <typename P>
    void notify_one_relaxed( const P& predicate ) {
        if (my_waitset.empty()) {
            return;
        }

        base_node* tmp = nullptr;
        base_node* next{};
        const base_node* end = my_waitset.end();
        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
            for (base_node* n = my_waitset.last(); n != end; n = next) {
                next = n->prev;
                auto* node = static_cast<wait_node<Context>*>(n);
                if (predicate(node->my_context)) {
                    my_waitset.remove(*n);
                    node->my_is_in_list.store(false, std::memory_order_relaxed);
                    tmp = n;
                    break;
                }
            }
        }

        if (tmp) {
            to_wait_node(tmp)->notify();
        }
    }

    //! Abort any sleeping threads at the time of the call
    void abort_all() {
        atomic_fence(std::memory_order_seq_cst);
        abort_all_relaxed();
    }
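    /* Illustration only: the Context template parameter lets notifiers wake a
       subset of waiters. With Context = std::uintptr_t (as concurrent_monitor
       below instantiates it), a waiter typically stores a tag in its node and
       the notifier selects by that tag. 'TAG' is a hypothetical value:

           concurrent_monitor::thread_context node{TAG};   // waiter side
           monitor.prepare_wait(node);
           // ...
           monitor.notify([](std::uintptr_t ctx) {         // notifier side
               return ctx == TAG;                          // wake matching waiters only
           });
    */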
    //! Abort any sleeping threads at the time of the call. Relaxed version.
    void abort_all_relaxed() {
        if (my_waitset.empty()) {
            return;
        }

        base_list temp;
        const base_node* end;
        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
            my_waitset.flush_to(temp);
            end = temp.end();
            for (base_node* n = temp.front(); n != end; n = n->next) {
                to_wait_node(n)->my_is_in_list.store(false, std::memory_order_relaxed);
            }
        }

        base_node* nxt;
        for (base_node* n = temp.front(); n != end; n = nxt) {
            nxt = n->next;
            to_wait_node(n)->my_aborted = true;
            to_wait_node(n)->notify();
        }
#if TBB_USE_ASSERT
        temp.clear();
#endif
    }

    void destroy() {
        this->abort_all();
        my_mutex.destroy();
        __TBB_ASSERT(this->my_waitset.empty(), "waitset not empty?");
    }

private:
    template <typename NodeType, typename Pred>
    bool guarded_call(Pred&& predicate, NodeType& node) {
        bool res = false;
        tbb::detail::d0::try_call( [&] {
            res = std::forward<Pred>(predicate)();
        }).on_exception( [&] {
            cancel_wait(node);
        });

        return res;
    }

    concurrent_monitor_mutex my_mutex{};
    base_list my_waitset{};
    std::atomic<unsigned> my_epoch{};

    wait_node<Context>* to_wait_node( base_node* node ) { return static_cast<wait_node<Context>*>(node); }
};

class concurrent_monitor : public concurrent_monitor_base<std::uintptr_t> {
    using base_type = concurrent_monitor_base<std::uintptr_t>;
public:
    using base_type::base_type;

    ~concurrent_monitor() {
        destroy();
    }

    /** per-thread descriptor for concurrent_monitor */
    using thread_context = sleep_node<std::uintptr_t>;
};

} // namespace r1
} // namespace detail
} // namespace tbb

#endif /* __TBB_concurrent_monitor_H */