/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef _TBB_arena_slot_H
#define _TBB_arena_slot_H

#include "oneapi/tbb/detail/_config.h"
#include "oneapi/tbb/detail/_utils.h"
#include "oneapi/tbb/detail/_template_helpers.h"
#include "oneapi/tbb/detail/_task.h"

#include "oneapi/tbb/cache_aligned_allocator.h"

#include "misc.h"
#include "mailbox.h"
#include "scheduler_common.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

class arena;
class task_group_context;

//--------------------------------------------------------------------------------------------------------
// Arena Slot
//--------------------------------------------------------------------------------------------------------

static d1::task** const EmptyTaskPool  = nullptr;
static d1::task** const LockedTaskPool = reinterpret_cast<d1::task**>(~std::intptr_t(0));
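// At a glance, the shared task_pool pointer declared below cycles through three states
// (a summary of the comments elsewhere in this file, not additional machinery):
//   EmptyTaskPool  (nullptr)  - the slot owner has not published a task pool; there is nothing to steal;
//   LockedTaskPool (all ones) - the pool is temporarily locked by the owner or by a thief;
//   task_pool_ptr             - the published deque; thieves may lock it and take tasks from its head.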

struct alignas(max_nfs_size) arena_slot_shared_state {
    //! Scheduler of the thread attached to the slot
    /** Marks the slot as busy, and is used to iterate through the schedulers belonging to this arena **/
    std::atomic<bool> my_is_occupied;

    //! Synchronization of access to the task pool
    /** It is also used to specify whether the slot is empty or locked:
         0 - empty
        -1 - locked **/
    std::atomic<d1::task**> task_pool;

    //! Index of the first ready task in the deque.
    /** Modified by thieves, and by the owner during compaction/reallocation. **/
    std::atomic<std::size_t> head;
};
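
// Note on the split between this struct and the next one: fields written by other threads
// (my_is_occupied, task_pool, head) live in arena_slot_shared_state, while fields written only
// by the slot owner (tail, the stream hints, task_pool_ptr) live in arena_slot_private_state.
// Each part is alignas(max_nfs_size), so the two groups land on separate cache lines and a thief
// advancing 'head' does not invalidate the line holding the owner's hot 'tail' (the usual
// false-sharing precaution; max_nfs_size presumably denotes the "no false sharing" granularity).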

struct alignas(max_nfs_size) arena_slot_private_state {
    //! Hint provided for operations with the container of starvation-resistant tasks.
    /** Modified by the owner thread (during these operations). **/
    unsigned hint_for_fifo_stream;

#if __TBB_PREVIEW_CRITICAL_TASKS
    //! Similar to 'hint_for_fifo_stream' but for critical tasks.
    unsigned hint_for_critical_stream;
#endif

    //! Similar to 'hint_for_fifo_stream' but for resume tasks.
    unsigned hint_for_resume_stream;

    //! Index of the element following the last ready task in the deque.
    /** Modified by the owner thread. **/
    std::atomic<std::size_t> tail;

    //! Capacity of the primary task pool (number of elements, i.e. pointers to task).
    std::size_t my_task_pool_size;

    //! Task pool of the scheduler that owns this slot
    // TODO: previously was task** __TBB_atomic, but it seems it is not accessed by other threads
    d1::task** task_pool_ptr;
};
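
// Taken together, the two structs describe a classic work-stealing deque over task_pool_ptr
// (a descriptive sketch of the invariants documented above, not new state): ready tasks occupy
// the half-open range [head, tail); thieves advance 'head', while the owner grows and shrinks
// 'tail'; entries outside that range hold no valid task pointers (and are poisoned in debug
// builds by fill_with_canary_pattern below).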

class arena_slot : private arena_slot_shared_state, private arena_slot_private_state {
    friend class arena;
    friend class outermost_worker_waiter;
    friend class task_dispatcher;
    friend class thread_data;
    friend class nested_arena_context;

    //! The original task dispatcher associated with this slot
    task_dispatcher* my_default_task_dispatcher;

#if TBB_USE_ASSERT
    void fill_with_canary_pattern ( std::size_t first, std::size_t last ) {
        for ( std::size_t i = first; i < last; ++i )
            poison_pointer(task_pool_ptr[i]);
    }
#else
    void fill_with_canary_pattern ( std::size_t, std::size_t ) {}
#endif /* TBB_USE_ASSERT */

    static constexpr std::size_t min_task_pool_size = 64;

    void allocate_task_pool( std::size_t n ) {
        std::size_t byte_size = ((n * sizeof(d1::task*) + max_nfs_size - 1) / max_nfs_size) * max_nfs_size;
        my_task_pool_size = byte_size / sizeof(d1::task*);
        task_pool_ptr = (d1::task**)cache_aligned_allocate(byte_size);
        // No need to clear the fresh deque since valid items are designated by the head and tail members.
        // But fill it with a canary pattern in the high vigilance debug mode.
        fill_with_canary_pattern( 0, my_task_pool_size );
    }
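
    // Illustration of the rounding above (hypothetical sizes, assuming 8-byte task pointers and
    // max_nfs_size == 128): allocate_task_pool(10) requests 80 bytes, which is rounded up to one
    // 128-byte line, so my_task_pool_size becomes 16; allocate_task_pool(64) requests 512 bytes,
    // already a multiple of 128, so my_task_pool_size stays 64. Because the byte size is only
    // ever rounded up, the resulting capacity is never smaller than the requested n.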

public:
    //! Deallocate task pool that was allocated by means of allocate_task_pool.
    void free_task_pool() {
        // TODO: understand the assertion and modify
        // __TBB_ASSERT( !task_pool /* TODO: == EmptyTaskPool */, NULL);
        if( task_pool_ptr ) {
            __TBB_ASSERT( my_task_pool_size, NULL);
            cache_aligned_deallocate( task_pool_ptr );
            task_pool_ptr = NULL;
            my_task_pool_size = 0;
        }
    }

    //! Get a task from the local pool.
    /** Called only by the pool owner.
        Returns the pointer to the task or NULL if a suitable task is not found.
        Resets the pool if it is empty. **/
    d1::task* get_task(execution_data_ext&, isolation_type);

    //! Steal a task from this slot's ready pool.
    d1::task* steal_task(arena&, isolation_type, std::size_t);

    //! Some thread is now the owner of this slot
    void occupy() {
        __TBB_ASSERT(!my_is_occupied.load(std::memory_order_relaxed), nullptr);
        my_is_occupied.store(true, std::memory_order_release);
    }

    //! Try to occupy the slot
    /** Returns true if this thread became the owner. The relaxed is_occupied() pre-check
        avoids the atomic exchange on slots that are already taken. **/
    bool try_occupy() {
        return !is_occupied() && my_is_occupied.exchange(true) == false;
    }

    //! The owner thread releases the slot
    void release() {
        __TBB_ASSERT(my_is_occupied.load(std::memory_order_relaxed), nullptr);
        my_is_occupied.store(false, std::memory_order_release);
    }

    //! Spawn newly created tasks
    void spawn(d1::task& t) {
        std::size_t T = prepare_task_pool(1);
        __TBB_ASSERT(is_poisoned(task_pool_ptr[T]), NULL);
        task_pool_ptr[T] = &t;
        commit_spawned_tasks(T + 1);
        if (!is_task_pool_published()) {
            publish_task_pool();
        }
    }
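
    // The ordering in spawn() above is deliberate (a descriptive note, not extra machinery):
    // the task pointer is written into a deque slot no thief can see yet, the release store in
    // commit_spawned_tasks() makes it visible behind the new 'tail', and only then, on the first
    // spawn, publish_task_pool() release-stores the task_pool pointer so other workers can start
    // stealing. A rough caller-side sketch (illustrative only; the actual call sites live in the
    // dispatcher sources):
    //
    //     arena_slot& slot = /* the slot this thread occupies */;
    //     slot.spawn(*new_task);   // the task becomes stealable once tail (and task_pool) are published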

    bool is_task_pool_published() const {
        return task_pool.load(std::memory_order_relaxed) != EmptyTaskPool;
    }

    bool is_occupied() const {
        return my_is_occupied.load(std::memory_order_relaxed);
    }

    task_dispatcher& default_task_dispatcher() {
        __TBB_ASSERT(my_default_task_dispatcher != nullptr, nullptr);
        return *my_default_task_dispatcher;
    }

    void init_task_streams(unsigned h) {
        hint_for_fifo_stream = h;
#if __TBB_RESUMABLE_TASKS
        hint_for_resume_stream = h;
#endif
#if __TBB_PREVIEW_CRITICAL_TASKS
        hint_for_critical_stream = h;
#endif
    }

#if __TBB_PREVIEW_CRITICAL_TASKS
    unsigned& critical_hint() {
        return hint_for_critical_stream;
    }
#endif
private:
    //! Get a task from the local pool at the specified location T.
    /** Returns the pointer to the task or NULL if the task cannot be executed,
        e.g. the proxy has been deallocated or the isolation constraint is not met.
        tasks_omitted tells whether some tasks have been omitted.
        Called only by the pool owner. The caller should guarantee that
        position T is not available to a thief. **/
    d1::task* get_task_impl(std::size_t T, execution_data_ext& ed, bool& tasks_omitted, isolation_type isolation);

    //! Makes sure that the task pool can accommodate at least num_tasks more elements.
    /** If necessary, relocates existing task pointers or grows the ready task deque.
     *  Returns the (possibly updated) tail index (not accounting for num_tasks). **/
    std::size_t prepare_task_pool(std::size_t num_tasks) {
        std::size_t T = tail.load(std::memory_order_relaxed); // mirror
        if ( T + num_tasks <= my_task_pool_size ) {
            return T;
        }

        std::size_t new_size = num_tasks;
        if ( !my_task_pool_size ) {
            __TBB_ASSERT( !is_task_pool_published() && is_quiescent_local_task_pool_reset(), NULL );
            __TBB_ASSERT( !task_pool_ptr, NULL );
            if ( num_tasks < min_task_pool_size ) new_size = min_task_pool_size;
            allocate_task_pool( new_size );
            return 0;
        }
        acquire_task_pool();
        std::size_t H = head.load(std::memory_order_relaxed); // mirror
        d1::task** new_task_pool = task_pool_ptr;
        __TBB_ASSERT( my_task_pool_size >= min_task_pool_size, NULL );
        // Count non-skipped tasks. Consider using std::count_if.
        for ( std::size_t i = H; i < T; ++i )
            if ( new_task_pool[i] ) ++new_size;
        // If the free space at the beginning of the task pool is too short, we
        // are likely facing a pathological single-producer-multiple-consumers
        // scenario, and thus it is better to expand the task pool.
        bool allocate = new_size > my_task_pool_size - min_task_pool_size/4;
        if ( allocate ) {
            // Grow the task pool. As this operation is rare, and its cost is asymptotically
            // amortizable, we can tolerate a new task pool allocation done under the lock.
            if ( new_size < 2 * my_task_pool_size )
                new_size = 2 * my_task_pool_size;
            allocate_task_pool( new_size ); // updates my_task_pool_size
        }
        // Filter out skipped tasks. Consider using std::copy_if.
        std::size_t T1 = 0;
        for ( std::size_t i = H; i < T; ++i ) {
            if ( new_task_pool[i] ) {
                task_pool_ptr[T1++] = new_task_pool[i];
            }
        }
        // Deallocate the previous task pool if a new one has been allocated.
        if ( allocate )
            cache_aligned_deallocate( new_task_pool );
        else
            fill_with_canary_pattern( T1, tail );
        // Publish the new state.
        commit_relocated_tasks( T1 );
        // assert_task_pool_valid();
        return T1;
    }
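
    // Worked example with hypothetical numbers: suppose my_task_pool_size == 64, head == 2,
    // tail == 64, and a single new task is being spawned. Since 64 + 1 > 64, the slow path runs.
    // If all 62 entries in [head, tail) are still live, new_size becomes 63 > 64 - 16, so the
    // pool doubles to 128 entries, the 62 live pointers are compacted to [0, 62), and 62 is
    // returned as the new tail. If instead only 10 entries are live (the rest were taken or
    // skipped), new_size is 11 <= 48, so the existing pool is reused: the 10 pointers move to
    // [0, 10), the remainder is re-poisoned in debug builds, and 10 is returned.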

    //! Makes newly spawned tasks visible to thieves
    void commit_spawned_tasks(std::size_t new_tail) {
        __TBB_ASSERT (new_tail <= my_task_pool_size, "task deque end was overwritten");
        // emit "task was released" signal
        // A release fence is necessary to make sure that previously stored task pointers
        // are visible to thieves.
        tail.store(new_tail, std::memory_order_release);
    }

    //! Used by workers to enter the task pool
    /** Does not lock the task pool if the arena slot has been successfully grabbed. **/
    void publish_task_pool() {
        __TBB_ASSERT ( task_pool == EmptyTaskPool, "someone else grabbed my arena slot?" );
        __TBB_ASSERT ( head.load(std::memory_order_relaxed) < tail.load(std::memory_order_relaxed),
                "entering arena without tasks to share" );
        // Release signal on behalf of previously spawned tasks (when this thread was not in the arena yet)
        task_pool.store(task_pool_ptr, std::memory_order_release);
    }

    //! Locks the local task pool
    /** Garbles task_pool for the duration of the lock. Requires a correctly set task_pool_ptr.
        ATTENTION: This method is mostly the same as generic_scheduler::lock_task_pool(), with
        slightly different logic of slot state checks (the slot is either locked or points
        to our task pool). Thus if either of them is changed, consider changing the counterpart as well. **/
    void acquire_task_pool() {
        if (!is_task_pool_published()) {
            return; // we are not in the arena - nothing to lock
        }
        bool sync_prepare_done = false;
        for( atomic_backoff b;;b.pause() ) {
#if TBB_USE_ASSERT
            // A local copy of the arena slot task pool pointer is necessary for the next
            // assertion to work correctly, excluding the effect of an asynchronous state transition.
            d1::task** tp = task_pool.load(std::memory_order_relaxed);
            __TBB_ASSERT( tp == LockedTaskPool || tp == task_pool_ptr, "slot ownership corrupt?" );
#endif
            d1::task** expected = task_pool_ptr;
            if( task_pool.load(std::memory_order_relaxed) != LockedTaskPool &&
                task_pool.compare_exchange_strong(expected, LockedTaskPool) ) {
                // We acquired our own slot
                break;
            } else if( !sync_prepare_done ) {
                // Start waiting
                sync_prepare_done = true;
            }
            // Someone else acquired the lock, so pause and do exponential backoff.
        }
        __TBB_ASSERT( task_pool.load(std::memory_order_relaxed) == LockedTaskPool, "not really acquired task pool" );
    }

    //! Unlocks the local task pool
    /** Restores task_pool munged by acquire_task_pool. Requires
        a correctly set task_pool_ptr. **/
    void release_task_pool() {
        if ( task_pool.load(std::memory_order_relaxed) == EmptyTaskPool )
            return; // we are not in the arena - nothing to unlock
        __TBB_ASSERT( task_pool.load(std::memory_order_relaxed) == LockedTaskPool, "arena slot is not locked" );
        task_pool.store( task_pool_ptr, std::memory_order_release );
    }
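
    // The acquire/release pair above is used by the owner exactly like a spinlock around
    // structural changes to its own deque, e.g. (simplified sketch of what prepare_task_pool()
    // does; not a separate API):
    //
    //     acquire_task_pool();      // task_pool becomes LockedTaskPool, thieves back off
    //     /* ... compact or relocate task_pool_ptr, adjust head/tail ... */
    //     release_task_pool();      // task_pool points to task_pool_ptr again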

    //! Locks the victim's task pool and returns a pointer to it. The pointer can be NULL.
    /** Garbles victim_arena_slot->task_pool for the duration of the lock. **/
    d1::task** lock_task_pool() {
        d1::task** victim_task_pool;
        for ( atomic_backoff backoff;; /*backoff pause embedded in the loop*/) {
            victim_task_pool = task_pool.load(std::memory_order_relaxed);
            // Microbenchmarks demonstrated that aborting a stealing attempt when the
            // victim's task pool is locked degrades performance.
            // NOTE: Do not use a comparison of the head and tail indices to check for
            // the presence of work in the victim's task pool, as they may give an
            // incorrect indication because of task pool relocations and resizes.
            if (victim_task_pool == EmptyTaskPool) {
                break;
            }
            d1::task** expected = victim_task_pool;
            if (victim_task_pool != LockedTaskPool && task_pool.compare_exchange_strong(expected, LockedTaskPool)) {
                // We've locked the victim's task pool
                break;
            }
            // Someone else acquired the lock, so pause and do exponential backoff.
            backoff.pause();
        }
        __TBB_ASSERT(victim_task_pool == EmptyTaskPool ||
                    (task_pool.load(std::memory_order_relaxed) == LockedTaskPool &&
                    victim_task_pool != LockedTaskPool), "not really locked victim's task pool?");
        return victim_task_pool;
    }

    //! Unlocks the victim's task pool
    /** Restores victim_arena_slot->task_pool munged by lock_task_pool. **/
    void unlock_task_pool(d1::task** victim_task_pool) {
        __TBB_ASSERT(task_pool.load(std::memory_order_relaxed) == LockedTaskPool, "victim arena slot is not locked");
        __TBB_ASSERT(victim_task_pool != LockedTaskPool, NULL);
        task_pool.store(victim_task_pool, std::memory_order_release);
    }
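
    // A thief uses the pair above roughly as follows (simplified sketch of the stealing protocol
    // implemented by steal_task() in the .cpp; not a separate API):
    //
    //     d1::task** pool = victim_slot.lock_task_pool();
    //     if (pool != EmptyTaskPool) {
    //         /* ... take a task from the head of the locked deque ... */
    //         victim_slot.unlock_task_pool(pool);
    //     }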

#if TBB_USE_ASSERT
    bool is_local_task_pool_quiescent() const {
        d1::task** tp = task_pool.load(std::memory_order_relaxed);
        return tp == EmptyTaskPool || tp == LockedTaskPool;
    }

    bool is_quiescent_local_task_pool_empty() const {
        __TBB_ASSERT(is_local_task_pool_quiescent(), "Task pool is not quiescent");
        return head.load(std::memory_order_relaxed) == tail.load(std::memory_order_relaxed);
    }

    bool is_quiescent_local_task_pool_reset() const {
        __TBB_ASSERT(is_local_task_pool_quiescent(), "Task pool is not quiescent");
        return head.load(std::memory_order_relaxed) == 0 && tail.load(std::memory_order_relaxed) == 0;
    }
#endif // TBB_USE_ASSERT

    //! Leave the task pool
    /** Leaving the task pool automatically releases it if it is locked. **/
    void leave_task_pool() {
        __TBB_ASSERT(is_task_pool_published(), "Not in arena");
        // Do not reset my_arena_index. It will be used to (attempt to) re-acquire the slot next time
        __TBB_ASSERT(task_pool.load(std::memory_order_relaxed) == LockedTaskPool, "Task pool must be locked when leaving arena");
        __TBB_ASSERT(is_quiescent_local_task_pool_empty(), "Cannot leave arena when the task pool is not empty");
        // No release fence is necessary here as this assignment precludes external
        // accesses to the local task pool once it becomes visible. Thus it is harmless
        // if it gets hoisted above preceding local bookkeeping manipulations.
        task_pool.store(EmptyTaskPool, std::memory_order_relaxed);
    }

    //! Resets the head and tail indices to 0, and leaves the task pool
    /** The task pool must be locked by the owner (via acquire_task_pool). **/
    void reset_task_pool_and_leave() {
        __TBB_ASSERT(task_pool.load(std::memory_order_relaxed) == LockedTaskPool, "Task pool must be locked when resetting task pool");
        tail.store(0, std::memory_order_relaxed);
        head.store(0, std::memory_order_relaxed);
        leave_task_pool();
    }

    //! Makes relocated tasks visible to thieves and releases the local task pool.
    /** Obviously, the task pool must be locked when calling this method. **/
    void commit_relocated_tasks(std::size_t new_tail) {
        __TBB_ASSERT(is_local_task_pool_quiescent(), "Task pool must be locked when calling commit_relocated_tasks()");
        head.store(0, std::memory_order_relaxed);
        // Tail is updated last to minimize the probability that a thread taking an arena
        // snapshot is misguided into thinking that this task pool is empty.
        tail.store(new_tail, std::memory_order_release);
        release_task_pool();
    }
};

} // namespace r1
} // namespace detail
} // namespace tbb

#endif // _TBB_arena_slot_H