/*
Copyright (c) 2005-2020 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
17 #ifndef __TBB_concurrent_monitor_H
18 #define __TBB_concurrent_monitor_H
19
20 #include "tbb/tbb_stddef.h"
21 #include "tbb/atomic.h"
22 #include "tbb/spin_mutex.h"
23 #include "tbb/tbb_exception.h"
24 #include "tbb/aligned_space.h"
25
26 #include "semaphore.h"
27
28 namespace tbb {
29 namespace internal {
30
31 //! Circular doubly-linked list with sentinel
32 /** head.next points to the front and head.prev points to the back */
33 class circular_doubly_linked_list_with_sentinel : no_copy {
34 public:
35 struct node_t {
36 node_t* next;
37 node_t* prev;
node_tnode_t38 explicit node_t() : next((node_t*)(uintptr_t)0xcdcdcdcd), prev((node_t*)(uintptr_t)0xcdcdcdcd) {}
39 };
40
41 // ctor
circular_doubly_linked_list_with_sentinel()42 circular_doubly_linked_list_with_sentinel() {clear();}
43 // dtor
~circular_doubly_linked_list_with_sentinel()44 ~circular_doubly_linked_list_with_sentinel() {__TBB_ASSERT( head.next==&head && head.prev==&head, "the list is not empty" );}
45
size()46 inline size_t size() const {return count;}
empty()47 inline bool empty() const {return size()==0;}
front()48 inline node_t* front() const {return head.next;}
last()49 inline node_t* last() const {return head.prev;}
begin()50 inline node_t* begin() const {return front();}
end()51 inline const node_t* end() const {return &head;}
52
53 //! add to the back of the list
add(node_t * n)54 inline void add( node_t* n ) {
55 __TBB_store_relaxed(count, __TBB_load_relaxed(count) + 1);
56 n->prev = head.prev;
57 n->next = &head;
58 head.prev->next = n;
59 head.prev = n;
60 }
61
62 //! remove node 'n'
remove(node_t & n)63 inline void remove( node_t& n ) {
64 __TBB_ASSERT( count > 0, "attempt to remove an item from an empty list" );
65 __TBB_store_relaxed(count, __TBB_load_relaxed(count) - 1);
66 n.prev->next = n.next;
67 n.next->prev = n.prev;
68 }
69
70 //! move all elements to 'lst' and initialize the 'this' list
flush_to(circular_doubly_linked_list_with_sentinel & lst)71 inline void flush_to( circular_doubly_linked_list_with_sentinel& lst ) {
72 if( const size_t l_count = __TBB_load_relaxed(count) ) {
73 __TBB_store_relaxed(lst.count, l_count);
74 lst.head.next = head.next;
75 lst.head.prev = head.prev;
76 head.next->prev = &lst.head;
77 head.prev->next = &lst.head;
78 clear();
79 }
80 }
81
clear()82 void clear() {head.next = head.prev = &head; __TBB_store_relaxed(count, 0);}
83 private:
84 __TBB_atomic size_t count;
85 node_t head;
86 };
87
//! List of threads waiting on a concurrent_monitor.
typedef circular_doubly_linked_list_with_sentinel waitset_t;
//! Link node that a waiting thread contributes to a waitset_t.
typedef circular_doubly_linked_list_with_sentinel::node_t waitset_node_t;
90
91 //! concurrent_monitor
92 /** fine-grained concurrent_monitor implementation */
//! concurrent_monitor
/** fine-grained concurrent_monitor implementation */
class concurrent_monitor : no_copy {
public:
    /** per-thread descriptor for concurrent_monitor */
    class thread_context : waitset_node_t, no_copy {
        friend class concurrent_monitor;
    public:
        thread_context() : skipped_wakeup(false), aborted(false), ready(false), context(0) {
            epoch = 0;
            in_waitset = false;
        }
        ~thread_context() {
            // 'ready' means init() ran and the semaphore was constructed in 'sema'.
            if (ready) {
                // If a notification was posted but never consumed by a P(),
                // absorb it here so the semaphore is in a destructible state.
                if( skipped_wakeup ) semaphore().P();
                // Placement-constructed in aligned_space, so destroy explicitly.
                semaphore().~binary_semaphore();
            }
        }
        //! Access the lazily constructed per-thread semaphore (valid only after init()).
        binary_semaphore& semaphore() { return *sema.begin(); }
    private:
        //! The method for lazy initialization of the thread_context's semaphore.
        // Inlining of the method is undesirable, due to extra instructions for
        // exception support added at caller side.
        __TBB_NOINLINE( void init() );
        tbb::aligned_space<binary_semaphore> sema;   // raw storage; constructed by init()
        __TBB_atomic unsigned epoch;                 // monitor epoch observed at prepare_wait()
        tbb::atomic<bool> in_waitset;                // true while linked into the monitor's waitset
        bool skipped_wakeup;                         // a V() was posted after this thread gave up waiting
        bool aborted;                                // woken by abort_all(); commit_wait() throws
        bool ready;                                  // semaphore has been constructed
        uintptr_t context;                           // user-supplied wait context, tested by notify(predicate)
    };

    //! ctor
    concurrent_monitor() {__TBB_store_relaxed(epoch, 0);}

    //! dtor
    ~concurrent_monitor() ;

    //! prepare wait by inserting 'thr' into the wait queue
    void prepare_wait( thread_context& thr, uintptr_t ctx = 0 );

    //! Commit wait if event count has not changed; otherwise, cancel wait.
    /** Returns true if committed, false if canceled. */
    inline bool commit_wait( thread_context& thr ) {
        // If the epoch advanced since prepare_wait(), a notification already
        // happened, so skip the sleep entirely.
        const bool do_it = thr.epoch == __TBB_load_relaxed(epoch);
        // this check is just an optimization
        if( do_it ) {
            __TBB_ASSERT( thr.ready, "use of commit_wait() without prior prepare_wait()");
            thr.semaphore().P();   // block until a notifier posts V()
            __TBB_ASSERT( !thr.in_waitset, "still in the queue?" );
            if( thr.aborted )
                throw_exception( eid_user_abort );
        } else {
            cancel_wait( thr );
        }
        return do_it;
    }
    //! Cancel the wait. Removes the thread from the wait queue if not removed yet.
    void cancel_wait( thread_context& thr );

    //! Wait for a condition to be satisfied with waiting-on context
    template<typename WaitUntil, typename Context>
    void wait( WaitUntil until, Context on );

    //! Notify one thread about the event
    void notify_one() {atomic_fence(); notify_one_relaxed();}

    //! Notify one thread about the event. Relaxed version.
    void notify_one_relaxed();

    //! Notify all waiting threads of the event
    void notify_all() {atomic_fence(); notify_all_relaxed();}

    //! Notify all waiting threads of the event; Relaxed version
    void notify_all_relaxed();

    //! Notify waiting threads of the event that satisfies the given predicate
    template<typename P> void notify( const P& predicate ) {atomic_fence(); notify_relaxed( predicate );}

    //! Notify waiting threads of the event that satisfies the given predicate; Relaxed version
    template<typename P> void notify_relaxed( const P& predicate );

    //! Abort any sleeping threads at the time of the call
    void abort_all() {atomic_fence(); abort_all_relaxed(); }

    //! Abort any sleeping threads at the time of the call; Relaxed version
    void abort_all_relaxed();

private:
    tbb::spin_mutex mutex_ec;      // protects waitset_ec and epoch updates
    waitset_t waitset_ec;          // queue of threads currently prepared to wait
    __TBB_atomic unsigned epoch;   // bumped by every notification; detects missed wakeups
    //! Downcast a waitset node back to its owning thread_context.
    // Safe because only thread_context objects are ever linked into waitset_ec.
    thread_context* to_thread_context( waitset_node_t* n ) { return static_cast<thread_context*>(n); }
};
186
187 template<typename WaitUntil, typename Context>
wait(WaitUntil until,Context on)188 void concurrent_monitor::wait( WaitUntil until, Context on )
189 {
190 bool slept = false;
191 thread_context thr_ctx;
192 prepare_wait( thr_ctx, on() );
193 while( !until() ) {
194 if( (slept = commit_wait( thr_ctx ) )==true )
195 if( until() ) break;
196 slept = false;
197 prepare_wait( thr_ctx, on() );
198 }
199 if( !slept )
200 cancel_wait( thr_ctx );
201 }
202
//! Wake every waiter whose context satisfies 'predicate'; no fence is issued.
/** Matching waiters are unlinked under the lock and moved to a private list,
    then their semaphores are released outside the lock to keep the critical
    section short. */
template<typename P>
void concurrent_monitor::notify_relaxed( const P& predicate ) {
    // Unsynchronized fast-path check; a racing prepare_wait() is covered by
    // the epoch bump below on the next notification.
    if( waitset_ec.empty() )
        return;
    waitset_t temp;
    waitset_node_t* nxt;
    const waitset_node_t* end = waitset_ec.end();
    {
        tbb::spin_mutex::scoped_lock l( mutex_ec );
        // Advance the epoch so threads between prepare_wait() and
        // commit_wait() detect this notification and skip sleeping.
        __TBB_store_relaxed(epoch, __TBB_load_relaxed(epoch) + 1);
        // Walk back-to-front; save the predecessor first because remove()
        // unlinks the current node.
        for( waitset_node_t* n=waitset_ec.last(); n!=end; n=nxt ) {
            nxt = n->prev;
            thread_context* thr = to_thread_context( n );
            if( predicate( thr->context ) ) {
                waitset_ec.remove( *n );
                thr->in_waitset = false;
                temp.add( n );
            }
        }
    }

    // Release the semaphores outside the lock. Read the next link before
    // V(): once released, the woken thread owns the node and may destroy it.
    end = temp.end();
    for( waitset_node_t* n=temp.front(); n!=end; n=nxt ) {
        nxt = n->next;
        to_thread_context(n)->semaphore().V();
    }
#if TBB_USE_ASSERT
    // Debug builds: reset 'temp' so its destructor's emptiness assert passes.
    temp.clear();
#endif
}
233
234 } // namespace internal
235 } // namespace tbb
236
237 #endif /* __TBB_concurrent_monitor_H */
238