/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_concurrent_monitor_H
#define __TBB_concurrent_monitor_H

#include "oneapi/tbb/spin_mutex.h"
#include "oneapi/tbb/detail/_exception.h"
#include "oneapi/tbb/detail/_aligned_space.h"
#include "concurrent_monitor_mutex.h"
#include "semaphore.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

//! Circular doubly-linked list with sentinel
/** head.next points to the front and head.prev points to the back */
class circular_doubly_linked_list_with_sentinel {
public:
    struct base_node {
        base_node* next;
        base_node* prev;

        constexpr base_node(base_node* n, base_node* p) : next(n), prev(p) {}
        explicit base_node() : next((base_node*)(uintptr_t)0xcdcdcdcd), prev((base_node*)(uintptr_t)0xcdcdcdcd) {}
    };

    // ctor
    constexpr circular_doubly_linked_list_with_sentinel() : count(0), head(&head, &head) {}

    circular_doubly_linked_list_with_sentinel(const circular_doubly_linked_list_with_sentinel&) = delete;
    circular_doubly_linked_list_with_sentinel& operator=(const circular_doubly_linked_list_with_sentinel&) = delete;

    inline std::size_t size() const { return count.load(std::memory_order_relaxed); }
    inline bool empty() const { return size() == 0; }
    inline base_node* front() const { return head.next; }
    inline base_node* last() const { return head.prev; }
    inline const base_node* end() const { return &head; }

    //! add to the back of the list
    inline void add( base_node* n ) {
        count.store(count.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
        n->prev = head.prev;
        n->next = &head;
        head.prev->next = n;
        head.prev = n;
    }

    //! remove node 'n'
    inline void remove( base_node& n ) {
        __TBB_ASSERT(count.load(std::memory_order_relaxed) > 0, "attempt to remove an item from an empty list");
        count.store(count.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
        n.prev->next = n.next;
        n.next->prev = n.prev;
    }

    //! move all elements to 'lst' and reset 'this' list to empty
    inline void flush_to( circular_doubly_linked_list_with_sentinel& lst ) {
        const std::size_t l_count = size();
        if (l_count > 0) {
            lst.count.store(l_count, std::memory_order_relaxed);
            lst.head.next = head.next;
            lst.head.prev = head.prev;
            head.next->prev = &lst.head;
            head.prev->next = &lst.head;
            clear();
        }
    }

    void clear() {
        head.next = &head;
        head.prev = &head;
        count.store(0, std::memory_order_relaxed);
    }
private:
    std::atomic<std::size_t> count;
    base_node head;
};

using base_list = circular_doubly_linked_list_with_sentinel;
using base_node = circular_doubly_linked_list_with_sentinel::base_node;
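
// A minimal usage sketch of the intrusive list above (illustrative comment only,
// not part of the interface): nodes are owned by the caller and linked in place,
// and traversal stops at the sentinel returned by end().
//
//     base_list list;
//     base_node a, b;
//     list.add(&a);                 // 'a' becomes both front and back
//     list.add(&b);                 // 'b' is appended at the back
//     for (base_node* n = list.front(); n != list.end(); n = n->next) {
//         /* visit n */
//     }
//     list.remove(a);               // unlink 'a'; 'b' stays in the list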

template <typename Context>
class concurrent_monitor_base;

template <typename Context>
class wait_node : public base_node {
public:

#if __TBB_GLIBCXX_VERSION >= 40800 && __TBB_GLIBCXX_VERSION < 40900
    wait_node(Context ctx) : my_context(ctx), my_is_in_list(false) {}
#else
    wait_node(Context ctx) : my_context(ctx) {}
#endif

    virtual ~wait_node() = default;

    virtual void init() {
        __TBB_ASSERT(!my_initialized, nullptr);
        my_initialized = true;
    }

    virtual void wait() = 0;

    virtual void reset() {
        __TBB_ASSERT(my_skipped_wakeup, nullptr);
        my_skipped_wakeup = false;
    }

    virtual void notify() = 0;

protected:
    friend class concurrent_monitor_base<Context>;
    friend class thread_data;

    Context my_context{};
#if __TBB_GLIBCXX_VERSION >= 40800 && __TBB_GLIBCXX_VERSION < 40900
    std::atomic<bool> my_is_in_list;
#else
    std::atomic<bool> my_is_in_list{false};
#endif

    bool my_initialized{false};
    bool my_skipped_wakeup{false};
    bool my_aborted{false};
    unsigned my_epoch{0};
};

template <typename Context>
class sleep_node : public wait_node<Context> {
    using base_type = wait_node<Context>;
public:
    using base_type::base_type;

    // Make the destructor virtual to avoid an Intel Compiler warning
    virtual ~sleep_node() {
        if (this->my_initialized) {
            if (this->my_skipped_wakeup) semaphore().P();
            semaphore().~binary_semaphore();
        }
    }

    binary_semaphore& semaphore() { return *sema.begin(); }

    void init() override {
        if (!this->my_initialized) {
            new (sema.begin()) binary_semaphore;
            base_type::init();
        }
    }

    void wait() override {
        __TBB_ASSERT(this->my_initialized,
            "Use of commit_wait() without prior prepare_wait()");
        semaphore().P();
        __TBB_ASSERT(!this->my_is_in_list.load(std::memory_order_relaxed), "Still in the queue?");
        if (this->my_aborted)
            throw_exception(exception_id::user_abort);
    }

    void reset() override {
        base_type::reset();
        semaphore().P();
    }

    void notify() override {
        semaphore().V();
    }

private:
    tbb::detail::aligned_space<binary_semaphore> sema;
};
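
// Informal sketch of how the binary semaphore tokens pair up (descriptive comment,
// not normative): every notify() posts one V() and every committed wait consumes it
// with P().  If the waiter cancels after a notifier has already removed it from the
// waitset, the V() stays pending as a "skipped wakeup"; it is drained by the extra
// P() in reset() on the next prepare_wait(), or by the destructor above.
//
//     notify()                  -> semaphore().V()   // post one token
//     commit_wait() -> wait()   -> semaphore().P()   // consume the token
//     cancel, then wait again   -> reset() does P()  // drain the skipped wakeup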

//! concurrent_monitor
/** fine-grained concurrent_monitor implementation */
template <typename Context>
class concurrent_monitor_base {
public:
    //! ctor
    constexpr concurrent_monitor_base() {}
    //! dtor
    ~concurrent_monitor_base() = default;

    concurrent_monitor_base(const concurrent_monitor_base&) = delete;
    concurrent_monitor_base& operator=(const concurrent_monitor_base&) = delete;

    //! prepare wait by inserting 'node' into the wait queue
    void prepare_wait( wait_node<Context>& node ) {
        // TODO: consider an even lazier instantiation of the semaphore, i.e. construct it
        // only when it is actually needed, e.g. by moving the construction into node::wait()
        if (!node.my_initialized) {
            node.init();
        }
        // this is a good place to pump a previously skipped wakeup
        else if (node.my_skipped_wakeup) {
            node.reset();
        }

        node.my_is_in_list.store(true, std::memory_order_relaxed);

        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            node.my_epoch = my_epoch.load(std::memory_order_relaxed);
            my_waitset.add(&node);
        }

        // prepare_wait guarantees a Write-Read memory barrier.
        // In C++, only a full fence provides this type of barrier.
        atomic_fence(std::memory_order_seq_cst);
    }
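
    // Why the full fence above is needed (informal sketch): the waiter stores itself
    // into the waitset and then re-reads the caller's condition, while a notifier first
    // updates that condition and then reads the waitset (via notify_one()/notify_all()).
    // Each side must have its store visible before its subsequent load, i.e. the classic
    // store-load ordering that only a seq_cst fence provides:
    //
    //     waiter:   add node to waitset;  fence;  if (condition) cancel_wait(node);
    //     notifier: condition = true;     fence;  if (!waitset.empty()) wake a waiter;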

    //! Commit wait if event count has not changed; otherwise, cancel wait.
    /** Returns true if committed, false if canceled. */
    inline bool commit_wait( wait_node<Context>& node ) {
        const bool do_it = node.my_epoch == my_epoch.load(std::memory_order_relaxed);
        // this check is just an optimization
        if (do_it) {
            node.wait();
        } else {
            cancel_wait( node );
        }
        return do_it;
    }

    //! Cancel the wait. Removes the thread from the wait queue if not removed yet.
    void cancel_wait( wait_node<Context>& node ) {
        // a possible skipped wakeup will be pumped in the following prepare_wait()
        node.my_skipped_wakeup = true;
        // try to remove the node from the waitset;
        // cancel_wait guarantees an acquire memory barrier
        bool in_list = node.my_is_in_list.load(std::memory_order_acquire);
        if (in_list) {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            if (node.my_is_in_list.load(std::memory_order_relaxed)) {
                my_waitset.remove(node);
                // the node is removed from the waitset, so there will be no wakeup
                node.my_is_in_list.store(false, std::memory_order_relaxed);
                node.my_skipped_wakeup = false;
            }
        }
    }
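
    // The three members above compose into the usual two-phase wait protocol
    // (informal sketch; the wait() helper below packages exactly this loop, and
    // 'condition_is_satisfied' is a placeholder for the caller's predicate):
    //
    //     monitor.prepare_wait(node);
    //     while (!condition_is_satisfied()) {
    //         if (monitor.commit_wait(node)) return;  // slept and was notified
    //         monitor.prepare_wait(node);             // epoch changed: re-arm and re-check
    //     }
    //     monitor.cancel_wait(node);                  // condition already holds, do not sleep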

    //! Wait for a condition to be satisfied, with 'node' as the waiting-on context.
    /** Returns true if the wait was committed (the thread actually blocked);
        false if the predicate was satisfied without blocking and the wait was canceled. */
    template <typename NodeType, typename Pred>
    bool wait(Pred&& pred, NodeType&& node) {
        prepare_wait(node);
        while (!guarded_call(std::forward<Pred>(pred), node)) {
            if (commit_wait(node)) {
                return true;
            }

            prepare_wait(node);
        }

        cancel_wait(node);
        return false;
    }

    //! Notify one thread about the event
    void notify_one() {
        atomic_fence(std::memory_order_seq_cst);
        notify_one_relaxed();
    }

    //! Notify one thread about the event. Relaxed version.
    void notify_one_relaxed() {
        if (my_waitset.empty()) {
            return;
        }

        base_node* n;
        const base_node* end = my_waitset.end();
        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
            n = my_waitset.front();
            if (n != end) {
                my_waitset.remove(*n);
                to_wait_node(n)->my_is_in_list.store(false, std::memory_order_relaxed);
            }
        }

        if (n != end) {
            to_wait_node(n)->notify();
        }
    }

    //! Notify all waiting threads of the event
    void notify_all() {
        atomic_fence(std::memory_order_seq_cst);
        notify_all_relaxed();
    }

    //! Notify all waiting threads of the event. Relaxed version.
    void notify_all_relaxed() {
        if (my_waitset.empty()) {
            return;
        }

        base_list temp;
        const base_node* end;
        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
            // TODO: possible optimization: do not change the node state under the lock, just do the flush
            my_waitset.flush_to(temp);
            end = temp.end();
            for (base_node* n = temp.front(); n != end; n = n->next) {
                to_wait_node(n)->my_is_in_list.store(false, std::memory_order_relaxed);
            }
        }

        base_node* nxt;
        for (base_node* n = temp.front(); n != end; n = nxt) {
            // grab the next pointer before notify(): the woken thread may immediately reuse or destroy 'n'
            nxt = n->next;
            to_wait_node(n)->notify();
        }
#if TBB_USE_ASSERT
        temp.clear();
#endif
    }

    //! Notify waiting threads of the event that satisfies the given predicate
    template <typename P>
    void notify( const P& predicate ) {
        atomic_fence(std::memory_order_seq_cst);
        notify_relaxed( predicate );
    }

    //! Notify waiting threads of the event that satisfies the given predicate;
    //! the predicate is called under the lock. Relaxed version.
    template <typename P>
    void notify_relaxed( const P& predicate ) {
        if (my_waitset.empty()) {
            return;
        }

        base_list temp;
        base_node* nxt;
        const base_node* end = my_waitset.end();
        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
            for (base_node* n = my_waitset.last(); n != end; n = nxt) {
                nxt = n->prev;
                auto* node = static_cast<wait_node<Context>*>(n);
                if (predicate(node->my_context)) {
                    my_waitset.remove(*n);
                    node->my_is_in_list.store(false, std::memory_order_relaxed);
                    temp.add(n);
                }
            }
        }

        end = temp.end();
        for (base_node* n = temp.front(); n != end; n = nxt) {
            nxt = n->next;
            to_wait_node(n)->notify();
        }
#if TBB_USE_ASSERT
        temp.clear();
#endif
    }

    //! Notify one waiting thread whose context satisfies the given predicate;
    //! the predicate is called under the lock. Relaxed version.
    template <typename P>
    void notify_one_relaxed( const P& predicate ) {
        if (my_waitset.empty()) {
            return;
        }

        base_node* tmp = nullptr;
        base_node* next{};
        const base_node* end = my_waitset.end();
        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
            for (base_node* n = my_waitset.last(); n != end; n = next) {
                next = n->prev;
                auto* node = static_cast<wait_node<Context>*>(n);
                if (predicate(node->my_context)) {
                    my_waitset.remove(*n);
                    node->my_is_in_list.store(false, std::memory_order_relaxed);
                    tmp = n;
                    break;
                }
            }
        }

        if (tmp) {
            to_wait_node(tmp)->notify();
        }
    }
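
    // Example of predicate-based notification (hypothetical caller code; assumes the
    // monitor's Context is an integer-like tag stored by the waiters):
    //
    //     // wake every waiter whose context equals 42
    //     monitor.notify([] (std::uintptr_t tag) { return tag == 42; });
    //     // wake at most one such waiter, skipping the preceding full fence
    //     monitor.notify_one_relaxed([] (std::uintptr_t tag) { return tag == 42; });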

    //! Abort any sleeping threads at the time of the call
    void abort_all() {
        atomic_fence(std::memory_order_seq_cst);
        abort_all_relaxed();
    }

    //! Abort any sleeping threads at the time of the call. Relaxed version.
    void abort_all_relaxed() {
        if (my_waitset.empty()) {
            return;
        }

        base_list temp;
        const base_node* end;
        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
            my_waitset.flush_to(temp);
            end = temp.end();
            for (base_node* n = temp.front(); n != end; n = n->next) {
                to_wait_node(n)->my_is_in_list.store(false, std::memory_order_relaxed);
            }
        }

        base_node* nxt;
        for (base_node* n = temp.front(); n != end; n = nxt) {
            nxt = n->next;
            to_wait_node(n)->my_aborted = true;
            to_wait_node(n)->notify();
        }
#if TBB_USE_ASSERT
        temp.clear();
#endif
    }

    void destroy() {
        this->abort_all();
        my_mutex.destroy();
        __TBB_ASSERT(this->my_waitset.empty(), "waitset not empty?");
    }

private:
    template <typename NodeType, typename Pred>
    bool guarded_call(Pred&& predicate, NodeType& node) {
        bool res = false;
        tbb::detail::d0::try_call( [&] {
            res = std::forward<Pred>(predicate)();
        }).on_exception( [&] {
            cancel_wait(node);
        });

        return res;
    }

    concurrent_monitor_mutex my_mutex{};
    base_list my_waitset{};
    std::atomic<unsigned> my_epoch{};

    wait_node<Context>* to_wait_node( base_node* node ) { return static_cast<wait_node<Context>*>(node); }
};

class concurrent_monitor : public concurrent_monitor_base<std::uintptr_t> {
    using base_type = concurrent_monitor_base<std::uintptr_t>;
public:
    using base_type::base_type;

    ~concurrent_monitor() {
        destroy();
    }

    /** per-thread descriptor for concurrent_monitor */
    using thread_context = sleep_node<std::uintptr_t>;
};
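
// End-to-end usage sketch (hypothetical caller code, for illustration only;
// 'monitor', 'ready' and the context value are placeholders):
//
//     concurrent_monitor monitor;
//     std::atomic<bool> ready{false};
//
//     // waiting thread: sleeps unless 'ready' is already observed as true,
//     // and wakes when a notification arrives
//     concurrent_monitor::thread_context node{/*ctx*/ 0};
//     monitor.wait([&] { return ready.load(std::memory_order_relaxed); }, node);
//
//     // signaling thread: publish the state change, then wake one waiter;
//     // notify_one() issues the full fence that pairs with prepare_wait()'s fence
//     ready.store(true, std::memory_order_relaxed);
//     monitor.notify_one();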

} // namespace r1
} // namespace detail
} // namespace tbb

#endif /* __TBB_concurrent_monitor_H */