/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/detail/_config.h"
#include "oneapi/tbb/detail/_utils.h"

#include "observer_proxy.h"
#include "arena.h"
#include "main.h"
#include "thread_data.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
extern std::atomic<int> the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */

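// Constructs a proxy for the given observer. The initial reference is owned by
// the observer itself and is released when the observer is disabled.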
observer_proxy::observer_proxy( d1::task_scheduler_observer& tso )
    : my_ref_count(1), my_list(nullptr), my_next(nullptr), my_prev(nullptr), my_observer(&tso)
{
#if TBB_USE_ASSERT
    ++the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */
}

observer_proxy::~observer_proxy() {
    __TBB_ASSERT( !my_ref_count, "Attempt to destroy proxy still in use" );
    poison_value(my_ref_count);
    poison_pointer(my_prev);
    poison_pointer(my_next);
#if TBB_USE_ASSERT
    --the_observer_proxy_count;
#endif /* TBB_USE_ASSERT */
}

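// Removes and destroys all proxies in the list. If observe(false) runs
// concurrently, waits until those proxies have been removed as well.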
void observer_list::clear() {
    {
        scoped_lock lock(mutex(), /*is_writer=*/true);
        observer_proxy *next = my_head.load(std::memory_order_relaxed);
        while ( observer_proxy *p = next ) {
            next = p->my_next;
            // Both the proxy p and the observer p->my_observer (if non-null) are
            // guaranteed to be alive while the list is locked.
            d1::task_scheduler_observer *obs = p->my_observer;
            // Make sure that possible concurrent observer destruction does not
            // conflict with the proxy list cleanup.
            if (!obs || !(p = obs->my_proxy.exchange(nullptr))) {
                continue;
            }
            // Accessing 'obs' after detaching obs->my_proxy would race with
            // concurrent observer destruction.
            __TBB_ASSERT(!next || p == next->my_prev, nullptr);
            __TBB_ASSERT(is_alive(p->my_ref_count), "Observer's proxy died prematurely");
            __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed) == 1, "Reference for observer is missing");
            poison_pointer(p->my_observer);
            remove(p);
            --p->my_ref_count;
            delete p;
        }
    }

    // If observe(false) is called concurrently with the destruction of the arena,
    // we need to wait until all proxies are removed.
    for (atomic_backoff backoff; ; backoff.pause()) {
        scoped_lock lock(mutex(), /*is_writer=*/false);
        if (my_head.load(std::memory_order_relaxed) == nullptr) {
            break;
        }
    }

    __TBB_ASSERT(my_head.load(std::memory_order_relaxed) == nullptr && my_tail.load(std::memory_order_relaxed) == nullptr, nullptr);
}

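// Appends the proxy to the tail of the list under the writer lock.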
void observer_list::insert( observer_proxy* p ) {
    scoped_lock lock(mutex(), /*is_writer=*/true);
    if (my_head.load(std::memory_order_relaxed)) {
        p->my_prev = my_tail.load(std::memory_order_relaxed);
        my_tail.load(std::memory_order_relaxed)->my_next = p;
    } else {
        my_head.store(p, std::memory_order_relaxed);
    }
    my_tail.store(p, std::memory_order_relaxed);
}

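// Unlinks the proxy from the list; the caller must hold the writer lock.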
void observer_list::remove(observer_proxy* p) {
    __TBB_ASSERT(my_head.load(std::memory_order_relaxed), "Attempt to remove an item from an empty list");
    __TBB_ASSERT(!my_tail.load(std::memory_order_relaxed)->my_next, "Last item's my_next must be NULL");
    if (p == my_tail.load(std::memory_order_relaxed)) {
        __TBB_ASSERT(!p->my_next, nullptr);
        my_tail.store(p->my_prev, std::memory_order_relaxed);
    } else {
        __TBB_ASSERT(p->my_next, nullptr);
        p->my_next->my_prev = p->my_prev;
    }
    if (p == my_head.load(std::memory_order_relaxed)) {
        __TBB_ASSERT(!p->my_prev, nullptr);
        my_head.store(p->my_next, std::memory_order_relaxed);
    } else {
        __TBB_ASSERT(p->my_prev, nullptr);
        p->my_prev->my_next = p->my_next;
    }
    __TBB_ASSERT((my_head.load(std::memory_order_relaxed) && my_tail.load(std::memory_order_relaxed)) ||
                 (!my_head.load(std::memory_order_relaxed) && !my_tail.load(std::memory_order_relaxed)), nullptr);
}

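// Releases one reference to the proxy. The final decrement is performed under
// the writer lock so that a thread concurrently walking the list cannot
// resurrect the proxy; at zero the proxy is unlinked and deleted.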
void observer_list::remove_ref(observer_proxy* p) {
    std::uintptr_t r = p->my_ref_count.load(std::memory_order_acquire);
    __TBB_ASSERT(is_alive(r), nullptr);
    while (r > 1) {
        if (p->my_ref_count.compare_exchange_strong(r, r - 1)) {
            return;
        }
    }
    __TBB_ASSERT(r == 1, nullptr);
    // Reference count might go to zero
    {
        // Use the lock to avoid resurrection by a thread concurrently walking the list
        observer_list::scoped_lock lock(mutex(), /*is_writer=*/true);
        r = --p->my_ref_count;
        if (!r) {
            remove(p);
        }
    }
    __TBB_ASSERT(r || !p->my_ref_count, nullptr);
    if (!r) {
        delete p;
    }
}

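// Calls on_scheduler_entry for observers added after the one pointed to by
// 'last'. On return, 'last' points at the list tail, which is kept pinned by
// an extra reference until the next notification pass.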
void observer_list::do_notify_entry_observers(observer_proxy*& last, bool worker) {
    // Pointer p marches through the list from 'last' (exclusive) to the end.
    observer_proxy* p = last, * prev = p;
    for (;;) {
        d1::task_scheduler_observer* tso = nullptr;
        // Hold the lock on the list only long enough to advance to the next proxy.
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if (p) {
                    // We were already processing the list.
                    if (observer_proxy* q = p->my_next) {
                        if (p == prev) {
                            remove_ref_fast(prev); // sets prev to nullptr if successful
                        }
                        p = q;
                    } else {
                        // Reached the end of the list.
                        if (p == prev) {
                            // Keep the reference as we store the 'last' pointer in the scheduler
                            __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)) >= 1 + (p->my_observer ? 1 : 0), nullptr);
                        } else {
                            // The last few proxies were empty
                            __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)), nullptr);
                            ++p->my_ref_count;
                            if (prev) {
                                lock.release();
                                remove_ref(prev);
                            }
                        }
                        last = p;
                        return;
                    }
                } else {
                    // Starting a pass through the list
                    p = my_head.load(std::memory_order_relaxed);
                    if (!p) {
                        return;
                    }
                }
                tso = p->my_observer;
            } while (!tso);
            ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT(!prev || p != prev, nullptr);
        // Release the proxy pinned before p
        if (prev) {
            remove_ref(prev);
        }
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
        tso->on_scheduler_entry(worker);
        __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed), nullptr);
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed");
        prev = p;
    }
}

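// Calls on_scheduler_exit for observers from the head of the list up to
// 'last' (inclusive), and releases the reference that entry notification
// left on 'last'.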
void observer_list::do_notify_exit_observers(observer_proxy* last, bool worker) {
    // Pointer p marches through the list from the beginning to 'last' (inclusive).
    observer_proxy* p = nullptr, * prev = nullptr;
    for (;;) {
        d1::task_scheduler_observer* tso = nullptr;
        // Hold the lock on the list only long enough to advance to the next proxy.
        {
            scoped_lock lock(mutex(), /*is_writer=*/false);
            do {
                if (p) {
                    // We were already processing the list.
                    if (p != last) {
                        __TBB_ASSERT(p->my_next, "List items before 'last' must have valid my_next pointer");
                        if (p == prev)
                            remove_ref_fast(prev); // sets prev to nullptr if successful
                        p = p->my_next;
                    } else {
                        // Remove the reference from the last item
                        remove_ref_fast(p);
                        if (p) {
                            lock.release();
                            if (p != prev && prev) {
                                remove_ref(prev);
                            }
                            remove_ref(p);
                        }
                        return;
                    }
                } else {
                    // Starting a pass through the list
                    p = my_head.load(std::memory_order_relaxed);
                    __TBB_ASSERT(p, "Nonzero 'last' must guarantee that the global list is non-empty");
                }
                tso = p->my_observer;
            } while (!tso);
            // The item is already refcounted
            if (p != last) // 'last' is already referenced since entry notification
                ++p->my_ref_count;
            ++tso->my_busy_count;
        }
        __TBB_ASSERT(!prev || p != prev, nullptr);
        if (prev)
            remove_ref(prev);
        // Do not hold any locks on the list while calling user's code.
        // Do not intercept any exceptions that may escape the callback so that
        // they are either handled by the TBB scheduler or passed to the debugger.
        tso->on_scheduler_exit(worker);
        __TBB_ASSERT(p->my_ref_count || p == last, nullptr);
        intptr_t bc = --tso->my_busy_count;
        __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed");
        prev = p;
    }
}

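// Implements task_scheduler_observer::observe(). Enabling creates a proxy and
// links it into the observer list of the target arena; disabling detaches the
// proxy and waits until all in-flight callbacks have completed.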
void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer& tso, bool enable) {
    if (enable) {
        if (!tso.my_proxy.load(std::memory_order_relaxed)) {
            observer_proxy* p = new observer_proxy(tso);
            tso.my_proxy.store(p, std::memory_order_relaxed);
            tso.my_busy_count.store(0, std::memory_order_relaxed);

            thread_data* td = governor::get_thread_data_if_initialized();
            if (p->my_observer->my_task_arena == nullptr) {
                if (!(td && td->my_arena)) {
                    td = governor::get_thread_data();
                }
                __TBB_ASSERT(__TBB_InitOnce::initialization_done(), nullptr);
                __TBB_ASSERT(td && td->my_arena, nullptr);
                p->my_list = &td->my_arena->my_observers;
            } else {
                d1::task_arena* ta = p->my_observer->my_task_arena;
                arena* a = ta->my_arena.load(std::memory_order_acquire);
                if (a == nullptr) { // Avoid recursion during arena initialization
                    ta->initialize();
                    a = ta->my_arena.load(std::memory_order_relaxed);
                }
                __TBB_ASSERT(a != nullptr, nullptr);
                p->my_list = &a->my_observers;
            }
            p->my_list->insert(p);
            // Notify the newly activated observer and other pending ones if it belongs to the current arena
            if (td && td->my_arena && &td->my_arena->my_observers == p->my_list) {
                p->my_list->notify_entry_observers(td->my_last_observer, td->my_is_worker);
            }
        }
    } else {
        // Make sure that possible concurrent proxy list cleanup does not conflict
        // with the observer destruction here.
        if (observer_proxy* proxy = tso.my_proxy.exchange(nullptr)) {
            // List destruction should not touch this proxy after we've won the above interlocked exchange.
            __TBB_ASSERT( proxy->my_observer == &tso, nullptr );
            __TBB_ASSERT( is_alive(proxy->my_ref_count.load(std::memory_order_relaxed)), "Observer's proxy died prematurely" );
            __TBB_ASSERT( proxy->my_ref_count.load(std::memory_order_relaxed) >= 1, "Reference for observer is missing" );
            observer_list& list = *proxy->my_list;
            {
                // Ensure that none of the list walkers relies on observer pointer validity
                observer_list::scoped_lock lock(list.mutex(), /*is_writer=*/true);
                proxy->my_observer = nullptr;
                // Proxy may still be held by other threads (to track the last notified observer)
                if (!--proxy->my_ref_count) { // nobody can increase it under the exclusive lock
                    list.remove(proxy);
                    __TBB_ASSERT(!proxy->my_ref_count, nullptr);
                    delete proxy;
                }
            }
            spin_wait_until_eq(tso.my_busy_count, 0); // wait while other threads are still accessing the callback
        }
    }
}
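
// A minimal client-side usage sketch (illustration only; 'pinning_observer' is
// a hypothetical name, not part of the library): user code derives from
// tbb::task_scheduler_observer, and its observe() calls land in r1::observe() above.
//
//     class pinning_observer : public tbb::task_scheduler_observer {
//     public:
//         pinning_observer() { observe(true); }            // creates and links the proxy
//         ~pinning_observer() override { observe(false); } // detaches the proxy, waits for callbacks
//         void on_scheduler_entry(bool is_worker) override { /* e.g. pin the thread */ }
//         void on_scheduler_exit(bool is_worker) override { /* undo per-thread setup */ }
//     };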

} // namespace r1
} // namespace detail
} // namespace tbb