/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "arena_slot.h"
#include "arena.h"
#include "thread_data.h"

namespace tbb {
namespace detail {
namespace r1 {

//------------------------------------------------------------------------
// Arena Slot
//------------------------------------------------------------------------
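// The slot's task pool is a work-stealing deque: the owning thread operates on
// the tail (get_task), while thieves take tasks from the head (steal_task).
// When the two ends meet, the contenders arbitrate by locking the pool
// (acquire_task_pool / lock_task_pool) before deciding who gets the last task.

// Inspects the task at position T of the local task pool. Returns the task if it
// can be taken; returns nullptr and sets tasks_omitted if it must be skipped
// because of isolation, or if it is a proxy whose real task was already consumed
// through the mailbox.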
d1::task* arena_slot::get_task_impl(size_t T, execution_data_ext& ed, bool& tasks_omitted, isolation_type isolation) {
    __TBB_ASSERT(tail.load(std::memory_order_relaxed) <= T || is_local_task_pool_quiescent(),
            "Is it safe to get a task at position T?");

    d1::task* result = task_pool_ptr[T];
    __TBB_ASSERT(!is_poisoned( result ), "The poisoned task is going to be processed");

    if (!result) {
        return nullptr;
    }
    bool omit = isolation != no_isolation && isolation != task_accessor::isolation(*result);
    if (!omit && !task_accessor::is_proxy_task(*result)) {
        return result;
    } else if (omit) {
        tasks_omitted = true;
        return nullptr;
    }

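    // The task is a proxy: it was placed both into this task pool and into the mailbox
    // of the slot it has affinity to, and whichever thread extracts it first gets the
    // real task. If extraction fails here, the recipient already took it and the proxy
    // only needs to be disposed of.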
    task_proxy& tp = static_cast<task_proxy&>(*result);
    d1::slot_id aff_id = tp.slot;
    if ( d1::task *t = tp.extract_task<task_proxy::pool_bit>() ) {
        ed.affinity_slot = aff_id;
        return t;
    }
    // Proxy was empty, so it's our responsibility to free it
    tp.allocator.delete_object(&tp, ed);

    if ( tasks_omitted ) {
        task_pool_ptr[T] = nullptr;
    }
    return nullptr;
}

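// Takes the next task from the tail of the local task pool, retrying past entries
// that are skipped because of isolation or because a mailed proxy was already
// consumed. Along the way it arbitrates with thieves for the last task and, once
// all remaining entries have been inspected, either resets the pool or restores
// its bounds so the skipped tasks stay visible.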
d1::task* arena_slot::get_task(execution_data_ext& ed, isolation_type isolation) {
    __TBB_ASSERT(is_task_pool_published(), nullptr);
    // The current task position in the task pool.
    std::size_t T0 = tail.load(std::memory_order_relaxed);
    // The bounds of available tasks in the task pool. H0 is only used when the head bound is reached.
    std::size_t H0 = (std::size_t)-1, T = T0;
    d1::task* result = nullptr;
    bool task_pool_empty = false;
    bool tasks_omitted = false;
    do {
        __TBB_ASSERT( !result, nullptr );
        // The full fence is required to sync the store of `tail` with the load of `head` (write-read barrier)
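        // (The owner first moves its own index and only then reads the thief's, and
        // steal_task() does the mirror image; the two full fences guarantee that at
        // least one side sees the other's update, so both cannot claim the same task.)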
        T = --tail;
        // The acquire load of head is required to guarantee consistency of our task pool
        // when a thief rolls back the head.
        if ( (std::intptr_t)( head.load(std::memory_order_acquire) ) > (std::intptr_t)T ) {
            acquire_task_pool();
            H0 = head.load(std::memory_order_relaxed);
            if ( (std::intptr_t)H0 > (std::intptr_t)T ) {
                // The thief has not backed off - nothing to grab.
                __TBB_ASSERT( H0 == head.load(std::memory_order_relaxed)
                    && T == tail.load(std::memory_order_relaxed)
                    && H0 == T + 1, "victim/thief arbitration algorithm failure" );
                reset_task_pool_and_leave();
                // No tasks in the task pool.
                task_pool_empty = true;
                break;
            } else if ( H0 == T ) {
                // There is only one task in the task pool.
                reset_task_pool_and_leave();
                task_pool_empty = true;
            } else {
                // Release the task pool if there are still tasks in it. The tail already
                // equals T, and a thief only takes tasks at positions below the tail,
                // so it will not attempt to take the task at position T.
                release_task_pool();
            }
        }
        result = get_task_impl( T, ed, tasks_omitted, isolation );
        if ( result ) {
            poison_pointer( task_pool_ptr[T] );
            break;
        } else if ( !tasks_omitted ) {
            poison_pointer( task_pool_ptr[T] );
            __TBB_ASSERT( T0 == T+1, nullptr );
            T0 = T;
        }
    } while ( !result && !task_pool_empty );

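    // If some tasks were skipped because of isolation, they are still in the pool and
    // must stay reachable: either re-publish the pool with restored bounds (when it was
    // reset above), or restore the hole and the tail of the still-published pool.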
    if ( tasks_omitted ) {
        if ( task_pool_empty ) {
            // All tasks have been checked. The task pool should be in the reset state.
            // We just restore the bounds for the available tasks.
            // TODO: Does it make sense to move them to the beginning of the task pool?
            __TBB_ASSERT( is_quiescent_local_task_pool_reset(), nullptr );
            if ( result ) {
                // If we have a task, it should be at position H0.
                __TBB_ASSERT( H0 == T, nullptr );
                ++H0;
            }
            __TBB_ASSERT( H0 <= T0, nullptr );
            if ( H0 < T0 ) {
                // Restore the task pool if there are some tasks.
                head.store(H0, std::memory_order_relaxed);
                tail.store(T0, std::memory_order_relaxed);
                // The release fence is used in publish_task_pool.
                publish_task_pool();
                // Synchronize with snapshot as we published some tasks.
                ed.task_disp->m_thread_data->my_arena->advertise_new_work<arena::wakeup>();
            }
        } else {
            // A task has been obtained. We need to make a hole in position T.
            __TBB_ASSERT( is_task_pool_published(), nullptr );
            __TBB_ASSERT( result, nullptr );
            task_pool_ptr[T] = nullptr;
            tail.store(T0, std::memory_order_release);
            // Synchronize with snapshot as we published some tasks.
            // TODO: consider an approach that avoids calling wakeup every time, e.g. check whether the tail has reached the head.
            ed.task_disp->m_thread_data->my_arena->advertise_new_work<arena::wakeup>();
        }
    }

    __TBB_ASSERT( (std::intptr_t)tail.load(std::memory_order_relaxed) >= 0, nullptr );
    __TBB_ASSERT( result || tasks_omitted || is_quiescent_local_task_pool_reset(), nullptr );
    return result;
}

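// Steals a task from the head of this slot's task pool on behalf of the thread
// occupying slot_index of arena a. The victim pool stays locked for the whole
// attempt. Entries that cannot be taken (wrong isolation, or mailed proxies
// likely to be grabbed by their recipient) are skipped but left in the pool,
// and in that case the head is rolled back so they remain visible to the owner
// and to other thieves.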
d1::task* arena_slot::steal_task(arena& a, isolation_type isolation, std::size_t slot_index) {
    d1::task** victim_pool = lock_task_pool();
    if (!victim_pool) {
        return nullptr;
    }
    d1::task* result = nullptr;
    std::size_t H = head.load(std::memory_order_relaxed); // mirror
    std::size_t H0 = H;
    bool tasks_omitted = false;
    do {
        // The full fence is required to sync the store of `head` with the load of `tail` (write-read barrier)
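        // (Mirror image of the owner's protocol in get_task(): bump the head first,
        // then read the tail, so the thief and the owner cannot both take the last task.)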
        H = ++head;
        // The acquire load of tail is required to guarantee consistency of victim_pool
        // because the owner synchronizes task spawning via tail.
        if ((std::intptr_t)H > (std::intptr_t)(tail.load(std::memory_order_acquire))) {
            // The stealing attempt failed; the deque contents have not been changed by us.
            head.store( /*dead: H = */ H0, std::memory_order_relaxed );
            __TBB_ASSERT( !result, nullptr );
            goto unlock;
        }
        result = victim_pool[H-1];
        __TBB_ASSERT( !is_poisoned( result ), nullptr );

        if (result) {
            if (isolation == no_isolation || isolation == task_accessor::isolation(*result)) {
                if (!task_accessor::is_proxy_task(*result)) {
                    break;
                }
                task_proxy& tp = *static_cast<task_proxy*>(result);
                // If the mailed task is likely to be grabbed by its destination thread, skip it.
                if (!task_proxy::is_shared(tp.task_and_tag) || !tp.outbox->recipient_is_idle() || a.mailbox(slot_index).recipient_is_idle()) {
                    break;
                }
            }
            // The task cannot be taken, due to either isolation or proxy constraints.
            result = nullptr;
            tasks_omitted = true;
        } else if (!tasks_omitted) {
            // Clean up holes at the head of the task pool until the first task is skipped.
            __TBB_ASSERT( H0 == H-1, nullptr );
            poison_pointer( victim_pool[H0] );
            H0 = H;
        }
    } while (!result);
    __TBB_ASSERT( result, nullptr );

    // emit "task was consumed" signal
    poison_pointer( victim_pool[H-1] );
    if (tasks_omitted) {
        // Some tasks were skipped and stay in the pool, so leave a hole where the stolen task was.
        victim_pool[H-1] = nullptr;
        // The release store synchronizes the victim_pool update (the store of nullptr).
        head.store( /*dead: H = */ H0, std::memory_order_release );
    }
unlock:
    unlock_task_pool(victim_pool);

#if __TBB_PREFETCHING
    // Evict this (victim) slot's frequently modified head/tail cache lines from the thief's cache.
    __TBB_cl_evict(&head);
    __TBB_cl_evict(&tail);
#endif
    if (tasks_omitted) {
        // Synchronize with snapshot, as the head and tail can be bumped, which can falsely trigger the EMPTY state.
        a.advertise_new_work<arena::wakeup>();
    }
    return result;
}

} // namespace r1
} // namespace detail
} // namespace tbb