1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "oneapi/tbb/detail/_config.h"
18 #include "oneapi/tbb/detail/_utils.h"
19 
20 #include "observer_proxy.h"
21 #include "arena.h"
22 #include "main.h"
23 #include "thread_data.h"
24 
25 #include <atomic>
26 
27 namespace tbb {
28 namespace detail {
29 namespace r1 {
30 
31 #if TBB_USE_ASSERT
32 extern std::atomic<int> the_observer_proxy_count;
33 #endif /* TBB_USE_ASSERT */
34 
observer_proxy(d1::task_scheduler_observer & tso)35 observer_proxy::observer_proxy( d1::task_scheduler_observer& tso )
36     : my_ref_count(1), my_list(NULL), my_next(NULL), my_prev(NULL), my_observer(&tso)
37 {
38 #if TBB_USE_ASSERT
39     ++the_observer_proxy_count;
40 #endif /* TBB_USE_ASSERT */
41 }
42 
~observer_proxy()43 observer_proxy::~observer_proxy() {
44     __TBB_ASSERT( !my_ref_count, "Attempt to destroy proxy still in use" );
45     poison_value(my_ref_count);
46     poison_pointer(my_prev);
47     poison_pointer(my_next);
48 #if TBB_USE_ASSERT
49     --the_observer_proxy_count;
50 #endif /* TBB_USE_ASSERT */
51 }
52 
clear()53 void observer_list::clear() {
54     {
55         scoped_lock lock(mutex(), /*is_writer=*/true);
56         observer_proxy *next = my_head.load(std::memory_order_relaxed);
57         while ( observer_proxy *p = next ) {
58             next = p->my_next;
59             // Both proxy p and observer p->my_observer (if non-null) are guaranteed
60             // to be alive while the list is locked.
61             d1::task_scheduler_observer *obs = p->my_observer;
62             // Make sure that possible concurrent observer destruction does not
63             // conflict with the proxy list cleanup.
64             if (!obs || !(p = obs->my_proxy.exchange(nullptr))) {
65                 continue;
66             }
67             // accessing 'obs' after detaching of obs->my_proxy leads to the race with observer destruction
68             __TBB_ASSERT(!next || p == next->my_prev, nullptr);
69             __TBB_ASSERT(is_alive(p->my_ref_count), "Observer's proxy died prematurely");
70             __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed) == 1, "Reference for observer is missing");
71             poison_pointer(p->my_observer);
72             remove(p);
73             --p->my_ref_count;
74             delete p;
75         }
76     }
77 
78     // If observe(false) is called concurrently with the destruction of the arena,
79     // need to wait until all proxies are removed.
80     for (atomic_backoff backoff; ; backoff.pause()) {
81         scoped_lock lock(mutex(), /*is_writer=*/false);
82         if (my_head.load(std::memory_order_relaxed) == nullptr) {
83             break;
84         }
85     }
86 
87     __TBB_ASSERT(my_head.load(std::memory_order_relaxed) == nullptr && my_tail.load(std::memory_order_relaxed) == nullptr, nullptr);
88 }
89 
insert(observer_proxy * p)90 void observer_list::insert( observer_proxy* p ) {
91     scoped_lock lock(mutex(), /*is_writer=*/true);
92     if (my_head.load(std::memory_order_relaxed)) {
93         p->my_prev = my_tail.load(std::memory_order_relaxed);
94         my_tail.load(std::memory_order_relaxed)->my_next = p;
95     } else {
96         my_head.store(p, std::memory_order_relaxed);
97     }
98     my_tail.store(p, std::memory_order_relaxed);
99 }
100 
remove(observer_proxy * p)101 void observer_list::remove(observer_proxy* p) {
102     __TBB_ASSERT(my_head.load(std::memory_order_relaxed), "Attempt to remove an item from an empty list");
103     __TBB_ASSERT(!my_tail.load(std::memory_order_relaxed)->my_next, "Last item's my_next must be NULL");
104     if (p == my_tail.load(std::memory_order_relaxed)) {
105         __TBB_ASSERT(!p->my_next, nullptr);
106         my_tail.store(p->my_prev, std::memory_order_relaxed);
107     } else {
108         __TBB_ASSERT(p->my_next, nullptr);
109         p->my_next->my_prev = p->my_prev;
110     }
111     if (p == my_head.load(std::memory_order_relaxed)) {
112         __TBB_ASSERT(!p->my_prev, nullptr);
113         my_head.store(p->my_next, std::memory_order_relaxed);
114     } else {
115         __TBB_ASSERT(p->my_prev, nullptr);
116         p->my_prev->my_next = p->my_next;
117     }
118     __TBB_ASSERT((my_head.load(std::memory_order_relaxed) && my_tail.load(std::memory_order_relaxed)) ||
119         (!my_head.load(std::memory_order_relaxed) && !my_tail.load(std::memory_order_relaxed)), nullptr);
120 }
121 
remove_ref(observer_proxy * p)122 void observer_list::remove_ref(observer_proxy* p) {
123     std::uintptr_t r = p->my_ref_count.load(std::memory_order_acquire);
124     __TBB_ASSERT(is_alive(r), nullptr);
125     while (r > 1) {
126         if (p->my_ref_count.compare_exchange_strong(r, r - 1)) {
127             return;
128         }
129     }
130     __TBB_ASSERT(r == 1, nullptr);
131     // Reference count might go to zero
132     {
133         // Use lock to avoid resurrection by a thread concurrently walking the list
134         observer_list::scoped_lock lock(mutex(), /*is_writer=*/true);
135         r = --p->my_ref_count;
136         if (!r) {
137             remove(p);
138         }
139     }
140     __TBB_ASSERT(r || !p->my_ref_count, nullptr);
141     if (!r) {
142         delete p;
143     }
144 }
145 
do_notify_entry_observers(observer_proxy * & last,bool worker)146 void observer_list::do_notify_entry_observers(observer_proxy*& last, bool worker) {
147     // Pointer p marches though the list from last (exclusively) to the end.
148     observer_proxy* p = last, * prev = p;
149     for (;;) {
150         d1::task_scheduler_observer* tso = nullptr;
151         // Hold lock on list only long enough to advance to the next proxy in the list.
152         {
153             scoped_lock lock(mutex(), /*is_writer=*/false);
154             do {
155                 if (p) {
156                     // We were already processing the list.
157                     if (observer_proxy* q = p->my_next) {
158                         if (p == prev) {
159                             remove_ref_fast(prev); // sets prev to NULL if successful
160                         }
161                         p = q;
162                     } else {
163                         // Reached the end of the list.
164                         if (p == prev) {
165                             // Keep the reference as we store the 'last' pointer in scheduler
166                             __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)) >= 1 + (p->my_observer ? 1 : 0), nullptr);
167                         } else {
168                             // The last few proxies were empty
169                             __TBB_ASSERT(int(p->my_ref_count.load(std::memory_order_relaxed)), nullptr);
170                             ++p->my_ref_count;
171                             if (prev) {
172                                 lock.release();
173                                 remove_ref(prev);
174                             }
175                         }
176                         last = p;
177                         return;
178                     }
179                 } else {
180                     // Starting pass through the list
181                     p = my_head.load(std::memory_order_relaxed);
182                     if (!p) {
183                         return;
184                     }
185                 }
186                 tso = p->my_observer;
187             } while (!tso);
188             ++p->my_ref_count;
189             ++tso->my_busy_count;
190         }
191         __TBB_ASSERT(!prev || p != prev, nullptr);
192         // Release the proxy pinned before p
193         if (prev) {
194             remove_ref(prev);
195         }
196         // Do not hold any locks on the list while calling user's code.
197         // Do not intercept any exceptions that may escape the callback so that
198         // they are either handled by the TBB scheduler or passed to the debugger.
199         tso->on_scheduler_entry(worker);
200         __TBB_ASSERT(p->my_ref_count.load(std::memory_order_relaxed), nullptr);
201         intptr_t bc = --tso->my_busy_count;
202         __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed");
203         prev = p;
204     }
205 }
206 
do_notify_exit_observers(observer_proxy * last,bool worker)207 void observer_list::do_notify_exit_observers(observer_proxy* last, bool worker) {
208     // Pointer p marches though the list from the beginning to last (inclusively).
209     observer_proxy* p = nullptr, * prev = nullptr;
210     for (;;) {
211         d1::task_scheduler_observer* tso = nullptr;
212         // Hold lock on list only long enough to advance to the next proxy in the list.
213         {
214             scoped_lock lock(mutex(), /*is_writer=*/false);
215             do {
216                 if (p) {
217                     // We were already processing the list.
218                     if (p != last) {
219                         __TBB_ASSERT(p->my_next, "List items before 'last' must have valid my_next pointer");
220                         if (p == prev)
221                             remove_ref_fast(prev); // sets prev to NULL if successful
222                         p = p->my_next;
223                     } else {
224                         // remove the reference from the last item
225                         remove_ref_fast(p);
226                         if (p) {
227                             lock.release();
228                             if (p != prev && prev) {
229                                 remove_ref(prev);
230                             }
231                             remove_ref(p);
232                         }
233                         return;
234                     }
235                 } else {
236                     // Starting pass through the list
237                     p = my_head.load(std::memory_order_relaxed);
238                     __TBB_ASSERT(p, "Nonzero 'last' must guarantee that the global list is non-empty");
239                 }
240                 tso = p->my_observer;
241             } while (!tso);
242             // The item is already refcounted
243             if (p != last) // the last is already referenced since entry notification
244                 ++p->my_ref_count;
245             ++tso->my_busy_count;
246         }
247         __TBB_ASSERT(!prev || p != prev, nullptr);
248         if (prev)
249             remove_ref(prev);
250         // Do not hold any locks on the list while calling user's code.
251         // Do not intercept any exceptions that may escape the callback so that
252         // they are either handled by the TBB scheduler or passed to the debugger.
253         tso->on_scheduler_exit(worker);
254         __TBB_ASSERT(p->my_ref_count || p == last, nullptr);
255         intptr_t bc = --tso->my_busy_count;
256         __TBB_ASSERT_EX(bc >= 0, "my_busy_count underflowed");
257         prev = p;
258     }
259 }
260 
observe(d1::task_scheduler_observer & tso,bool enable)261 void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer &tso, bool enable) {
262     if( enable ) {
263         if( !tso.my_proxy.load(std::memory_order_relaxed) ) {
264             observer_proxy* p = new observer_proxy(tso);
265             tso.my_proxy.store(p, std::memory_order_relaxed);
266             tso.my_busy_count.store(0, std::memory_order_relaxed);
267 
268             thread_data* td = governor::get_thread_data_if_initialized();
269             if (p->my_observer->my_task_arena == nullptr) {
270                 if (!(td && td->my_arena)) {
271                     td = governor::get_thread_data();
272                 }
273                 __TBB_ASSERT(__TBB_InitOnce::initialization_done(), nullptr);
274                 __TBB_ASSERT(td && td->my_arena, nullptr);
275                 p->my_list = &td->my_arena->my_observers;
276             } else {
277                 d1::task_arena* ta = p->my_observer->my_task_arena;
278                 arena* a = ta->my_arena.load(std::memory_order_acquire);
279                 if (a == nullptr) { // Avoid recursion during arena initialization
280                     ta->initialize();
281                     a = ta->my_arena.load(std::memory_order_relaxed);
282                 }
283                 __TBB_ASSERT(a != nullptr, nullptr);
284                 p->my_list = &a->my_observers;
285             }
286             p->my_list->insert(p);
287             // Notify newly activated observer and other pending ones if it belongs to current arena
288             if (td && td->my_arena && &td->my_arena->my_observers == p->my_list) {
289                 p->my_list->notify_entry_observers(td->my_last_observer, td->my_is_worker);
290             }
291         }
292     } else {
293         // Make sure that possible concurrent proxy list cleanup does not conflict
294         // with the observer destruction here.
295         if ( observer_proxy* proxy = tso.my_proxy.exchange(nullptr) ) {
296             // List destruction should not touch this proxy after we've won the above interlocked exchange.
297             __TBB_ASSERT( proxy->my_observer == &tso, nullptr);
298             __TBB_ASSERT( is_alive(proxy->my_ref_count.load(std::memory_order_relaxed)), "Observer's proxy died prematurely" );
299             __TBB_ASSERT( proxy->my_ref_count.load(std::memory_order_relaxed) >= 1, "reference for observer missing" );
300             observer_list &list = *proxy->my_list;
301             {
302                 // Ensure that none of the list walkers relies on observer pointer validity
303                 observer_list::scoped_lock lock(list.mutex(), /*is_writer=*/true);
304                 proxy->my_observer = nullptr;
305                 // Proxy may still be held by other threads (to track the last notified observer)
306                 if( !--proxy->my_ref_count ) {// nobody can increase it under exclusive lock
307                     list.remove(proxy);
308                     __TBB_ASSERT( !proxy->my_ref_count, NULL );
309                     delete proxy;
310                 }
311             }
312             spin_wait_until_eq(tso.my_busy_count, 0); // other threads are still accessing the callback
313         }
314     }
315 }
316 
317 } // namespace r1
318 } // namespace detail
319 } // namespace tbb
320