1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_concurrent_monitor_H
18 #define __TBB_concurrent_monitor_H
19 
20 #include "tbb/tbb_stddef.h"
21 #include "tbb/atomic.h"
22 #include "tbb/spin_mutex.h"
23 #include "tbb/tbb_exception.h"
24 #include "tbb/aligned_space.h"
25 
26 #include "semaphore.h"
27 
28 namespace tbb {
29 namespace internal {
30 
31 //! Circular doubly-linked list with sentinel
32 /** head.next points to the front and head.prev points to the back */
33 class circular_doubly_linked_list_with_sentinel : no_copy {
34 public:
35     struct node_t {
36         node_t* next;
37         node_t* prev;
node_tnode_t38         explicit node_t() : next((node_t*)(uintptr_t)0xcdcdcdcd), prev((node_t*)(uintptr_t)0xcdcdcdcd) {}
39     };
40 
41     // ctor
circular_doubly_linked_list_with_sentinel()42     circular_doubly_linked_list_with_sentinel() {clear();}
43     // dtor
~circular_doubly_linked_list_with_sentinel()44     ~circular_doubly_linked_list_with_sentinel() {__TBB_ASSERT( head.next==&head && head.prev==&head, "the list is not empty" );}
45 
size()46     inline size_t  size()  const {return count;}
empty()47     inline bool    empty() const {return size()==0;}
front()48     inline node_t* front() const {return head.next;}
last()49     inline node_t* last()  const {return head.prev;}
begin()50     inline node_t* begin() const {return front();}
end()51     inline const node_t* end() const {return &head;}
52 
53     //! add to the back of the list
add(node_t * n)54     inline void add( node_t* n ) {
55         __TBB_store_relaxed(count, __TBB_load_relaxed(count) + 1);
56         n->prev = head.prev;
57         n->next = &head;
58         head.prev->next = n;
59         head.prev = n;
60     }
61 
62     //! remove node 'n'
remove(node_t & n)63     inline void remove( node_t& n ) {
64         __TBB_ASSERT( count > 0, "attempt to remove an item from an empty list" );
65         __TBB_store_relaxed(count, __TBB_load_relaxed(count) - 1);
66         n.prev->next = n.next;
67         n.next->prev = n.prev;
68     }
69 
70     //! move all elements to 'lst' and initialize the 'this' list
flush_to(circular_doubly_linked_list_with_sentinel & lst)71     inline void flush_to( circular_doubly_linked_list_with_sentinel& lst ) {
72         if( const size_t l_count = __TBB_load_relaxed(count) ) {
73             __TBB_store_relaxed(lst.count, l_count);
74             lst.head.next = head.next;
75             lst.head.prev = head.prev;
76             head.next->prev = &lst.head;
77             head.prev->next = &lst.head;
78             clear();
79         }
80     }
81 
clear()82     void clear() {head.next = head.prev = &head; __TBB_store_relaxed(count, 0);}
83 private:
84     __TBB_atomic size_t count;
85     node_t head;
86 };
87 
//! List type used by concurrent_monitor for its wait set
typedef circular_doubly_linked_list_with_sentinel waitset_t;
//! Node type of the wait set; concurrent_monitor::thread_context derives from it
typedef circular_doubly_linked_list_with_sentinel::node_t waitset_node_t;
90 
91 //! concurrent_monitor
92 /** fine-grained concurrent_monitor implementation */
93 class concurrent_monitor : no_copy {
94 public:
95     /** per-thread descriptor for concurrent_monitor */
96     class thread_context : waitset_node_t, no_copy {
97         friend class concurrent_monitor;
98     public:
thread_context()99         thread_context() : skipped_wakeup(false), aborted(false), ready(false), context(0) {
100             epoch = 0;
101             in_waitset = false;
102         }
~thread_context()103         ~thread_context() {
104             if (ready) {
105                 if( skipped_wakeup ) semaphore().P();
106                 semaphore().~binary_semaphore();
107             }
108         }
semaphore()109         binary_semaphore& semaphore() { return *sema.begin(); }
110     private:
111         //! The method for lazy initialization of the thread_context's semaphore.
112         //  Inlining of the method is undesirable, due to extra instructions for
113         //  exception support added at caller side.
114         __TBB_NOINLINE( void init() );
115         tbb::aligned_space<binary_semaphore> sema;
116         __TBB_atomic unsigned epoch;
117         tbb::atomic<bool> in_waitset;
118         bool  skipped_wakeup;
119         bool  aborted;
120         bool  ready;
121         uintptr_t context;
122     };
123 
124     //! ctor
concurrent_monitor()125     concurrent_monitor() {__TBB_store_relaxed(epoch, 0);}
126 
127     //! dtor
128     ~concurrent_monitor() ;
129 
130     //! prepare wait by inserting 'thr' into the wait queue
131     void prepare_wait( thread_context& thr, uintptr_t ctx = 0 );
132 
133     //! Commit wait if event count has not changed; otherwise, cancel wait.
134     /** Returns true if committed, false if canceled. */
commit_wait(thread_context & thr)135     inline bool commit_wait( thread_context& thr ) {
136         const bool do_it = thr.epoch == __TBB_load_relaxed(epoch);
137         // this check is just an optimization
138         if( do_it ) {
139             __TBB_ASSERT( thr.ready, "use of commit_wait() without prior prepare_wait()");
140             thr.semaphore().P();
141             __TBB_ASSERT( !thr.in_waitset, "still in the queue?" );
142             if( thr.aborted )
143                 throw_exception( eid_user_abort );
144         } else {
145             cancel_wait( thr );
146         }
147         return do_it;
148     }
149     //! Cancel the wait. Removes the thread from the wait queue if not removed yet.
150     void cancel_wait( thread_context& thr );
151 
152     //! Wait for a condition to be satisfied with waiting-on context
153     template<typename WaitUntil, typename Context>
154     void wait( WaitUntil until, Context on );
155 
156     //! Notify one thread about the event
notify_one()157     void notify_one() {atomic_fence(); notify_one_relaxed();}
158 
159     //! Notify one thread about the event. Relaxed version.
160     void notify_one_relaxed();
161 
162     //! Notify all waiting threads of the event
notify_all()163     void notify_all() {atomic_fence(); notify_all_relaxed();}
164 
165     //! Notify all waiting threads of the event; Relaxed version
166     void notify_all_relaxed();
167 
168     //! Notify waiting threads of the event that satisfies the given predicate
notify(const P & predicate)169     template<typename P> void notify( const P& predicate ) {atomic_fence(); notify_relaxed( predicate );}
170 
171     //! Notify waiting threads of the event that satisfies the given predicate; Relaxed version
172     template<typename P> void notify_relaxed( const P& predicate );
173 
174     //! Abort any sleeping threads at the time of the call
abort_all()175     void abort_all() {atomic_fence(); abort_all_relaxed(); }
176 
177     //! Abort any sleeping threads at the time of the call; Relaxed version
178     void abort_all_relaxed();
179 
180 private:
181     tbb::spin_mutex mutex_ec;
182     waitset_t       waitset_ec;
183     __TBB_atomic unsigned epoch;
to_thread_context(waitset_node_t * n)184     thread_context* to_thread_context( waitset_node_t* n ) { return static_cast<thread_context*>(n); }
185 };
186 
187 template<typename WaitUntil, typename Context>
wait(WaitUntil until,Context on)188 void concurrent_monitor::wait( WaitUntil until, Context on )
189 {
190     bool slept = false;
191     thread_context thr_ctx;
192     prepare_wait( thr_ctx, on() );
193     while( !until() ) {
194         if( (slept = commit_wait( thr_ctx ) )==true )
195             if( until() ) break;
196         slept = false;
197         prepare_wait( thr_ctx, on() );
198     }
199     if( !slept )
200         cancel_wait( thr_ctx );
201 }
202 
203 template<typename P>
notify_relaxed(const P & predicate)204 void concurrent_monitor::notify_relaxed( const P& predicate ) {
205         if( waitset_ec.empty() )
206             return;
207         waitset_t temp;
208         waitset_node_t* nxt;
209         const waitset_node_t* end = waitset_ec.end();
210         {
211             tbb::spin_mutex::scoped_lock l( mutex_ec );
212             __TBB_store_relaxed(epoch, __TBB_load_relaxed(epoch) + 1);
213             for( waitset_node_t* n=waitset_ec.last(); n!=end; n=nxt ) {
214                 nxt = n->prev;
215                 thread_context* thr = to_thread_context( n );
216                 if( predicate( thr->context ) ) {
217                     waitset_ec.remove( *n );
218                     thr->in_waitset = false;
219                     temp.add( n );
220                 }
221             }
222         }
223 
224         end = temp.end();
225         for( waitset_node_t* n=temp.front(); n!=end; n=nxt ) {
226             nxt = n->next;
227             to_thread_context(n)->semaphore().V();
228         }
229 #if TBB_USE_ASSERT
230         temp.clear();
231 #endif
232 }
233 
234 } // namespace internal
235 } // namespace tbb
236 
237 #endif /* __TBB_concurrent_monitor_H */
238