/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "task_dispatcher.h"
#include "governor.h"
#include "arena.h"
#include "itt_notify.h"
#include "semaphore.h"
#include "waiters.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/info.h"
#include "oneapi/tbb/tbb_allocator.h"

#include <atomic>
#include <cstring>
#include <functional>

namespace tbb {
namespace detail {
namespace r1 {

#if __TBB_ARENA_BINDING
class numa_binding_observer : public tbb::task_scheduler_observer {
    binding_handler* my_binding_handler;
public:
    numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core )
        : task_scheduler_observer(*ta)
        , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core))
    {}

    void on_scheduler_entry( bool ) override {
        apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    void on_scheduler_exit( bool ) override {
        restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    ~numa_binding_observer(){
        destroy_binding_handler(my_binding_handler);
    }
};

numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) {
    numa_binding_observer* binding_observer = nullptr;
    if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) {
        binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core);
        __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
        binding_observer->observe(true);
    }
    return binding_observer;
}

void destroy_binding_observer( numa_binding_observer* binding_observer ) {
    __TBB_ASSERT(binding_observer, "Trying to deallocate NULL pointer");
    binding_observer->observe(false);
    binding_observer->~numa_binding_observer();
    deallocate_memory(binding_observer);
}
#endif /*!__TBB_ARENA_BINDING*/

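// Slot search helpers: a thread first probes the slot it occupied last time (or a random
// slot within [lower, upper) if that index is out of range) and then scans the range in a
// wrap-around fashion, claiming the first slot whose try_occupy() succeeds.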
std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
    if ( lower >= upper ) return out_of_arena;
    // Start search for an empty slot from the one we occupied the last time
    std::size_t index = tls.my_arena_index;
    if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
    __TBB_ASSERT( index >= lower && index < upper, NULL );
    // Find a free slot
    for ( std::size_t i = index; i < upper; ++i )
        if (my_slots[i].try_occupy()) return i;
    for ( std::size_t i = lower; i < index; ++i )
        if (my_slots[i].try_occupy()) return i;
    return out_of_arena;
}

template <bool as_worker>
std::size_t arena::occupy_free_slot(thread_data& tls) {
    // Firstly, external threads try to occupy reserved slots
    std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls,  0, my_num_reserved_slots );
    if ( index == out_of_arena ) {
        // Secondly, all threads try to occupy all non-reserved slots
        index = occupy_free_slot_in_range(tls, my_num_reserved_slots, my_num_slots );
        // Likely this arena is already saturated
        if ( index == out_of_arena )
            return out_of_arena;
    }

    atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
    return index;
}

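// Derives a stack-depth bound for this thread: the address of a local anchor variable is
// combined with the configured worker stack size, and the resulting threshold is later used
// by the task dispatcher (see can_steal()) to keep nested stealing from overflowing the stack.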
std::uintptr_t arena::calculate_stealing_threshold() {
    stack_anchor_type anchor;
    return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_market->worker_stack_size());
}

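// Worker thread entry point for this arena: occupy a free non-reserved slot, attach the
// per-slot task dispatcher, run the outermost dispatch loop until the arena runs out of
// work, then detach and drop the worker reference.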
void arena::process(thread_data& tls) {
    governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( my_num_slots > 1, nullptr);

    std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
    if (index == out_of_arena) {
        on_thread_leaving<ref_worker>();
        return;
    }
    __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
    tls.attach_arena(*this, index);
    // The worker thread enters the dispatch loop to look for work
    tls.my_inbox.set_is_idle(true);
    if (tls.my_arena_slot->is_task_pool_published()) {
        tls.my_inbox.set_is_idle(false);
    }

    task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
    task_disp.set_stealing_threshold(calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);
    tls.attach_task_dispatcher(task_disp);

    __TBB_ASSERT( !tls.my_last_observer, "There cannot be notified local observers when entering arena" );
    my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);

    // Waiting on special object tied to this arena
    outermost_worker_waiter waiter(*this);
    d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
    // For purposes of affinity support, the slot's mailbox is considered idle while no thread is
    // attached to it.
    tls.my_inbox.set_is_idle(true);

    __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
    __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
    __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);

    my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
    tls.my_last_observer = nullptr;

    task_disp.set_stealing_threshold(0);
    tls.detach_task_dispatcher();

    // Arena slot detach (arena may be used in market::process)
    // TODO: Consider moving several calls below into a new method (e.g. detach_arena).
    tls.my_arena_slot->release();
    tls.my_arena_slot = nullptr;
    tls.my_inbox.detach();
    __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
    __TBB_ASSERT(is_alive(my_guard), nullptr);

    // In contrast to earlier versions of TBB (before 3.0 U5), it is now possible
    // that the arena may be temporarily left unpopulated by threads. See the comments
    // in arena::on_thread_leaving() for more details.
    on_thread_leaving<ref_worker>();
    __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
}

arena::arena ( market& m, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level )
{
    __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
    __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" );
    __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
    my_market = &m;
    my_limit = 1;
    // Two slots are mandatory: for the external thread, and for 1 worker (required to support starvation resistant tasks).
    my_num_slots = num_arena_slots(num_slots);
    my_num_reserved_slots = num_reserved_slots;
    my_max_num_workers = num_slots-num_reserved_slots;
    my_priority_level = priority_level;
    my_references = ref_external; // accounts for the external thread
    my_aba_epoch = m.my_arenas_aba_epoch.load(std::memory_order_relaxed);
    my_observers.my_arena = this;
    my_co_cache.init(4 * num_slots);
    __TBB_ASSERT ( my_max_num_workers <= my_num_slots, NULL );
    // Initialize the default context. It should be allocated before task_dispatcher construction.
    my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
        d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
    // Construct slots. Mark internal synchronization elements for the tools.
    task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
    for( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, NULL );
        __TBB_ASSERT( !my_slots[i].task_pool_ptr, NULL );
        __TBB_ASSERT( !my_slots[i].my_task_pool_size, NULL );
        mailbox(i).construct();
        my_slots[i].init_task_streams(i);
        my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
        my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
    }
    my_fifo_task_stream.initialize(my_num_slots);
    my_resume_task_stream.initialize(my_num_slots);
#if __TBB_PREVIEW_CRITICAL_TASKS
    my_critical_task_stream.initialize(my_num_slots);
#endif
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    my_local_concurrency_requests = 0;
    my_local_concurrency_flag.clear();
    my_global_concurrency_mode.store(false, std::memory_order_relaxed);
#endif
}

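// The arena and its auxiliary data share one cache-aligned allocation, laid out roughly as
// [mail_outbox array][arena object with its slots][per-slot task_dispatchers]; the asserts
// below and the pointer arithmetic in this function and in the constructor define the exact
// offsets (see also allocation_size() and arena::mailbox()).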
arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots,
                              unsigned priority_level )
{
    __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
    __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
    __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
    std::size_t n = allocation_size(num_arena_slots(num_slots));
    unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
    // Zero all slots to indicate that they are empty
    std::memset( storage, 0, n );
    return *new( storage + num_arena_slots(num_slots) * sizeof(mail_outbox) )
        arena(m, num_slots, num_reserved_slots, priority_level);
}

void arena::free_arena () {
    __TBB_ASSERT( is_alive(my_guard), NULL );
    __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
    __TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
    __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers,
                  "Inconsistent state of a dying arena" );
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    __TBB_ASSERT( !my_global_concurrency_mode, NULL );
#endif
#if __TBB_ARENA_BINDING
    if (my_numa_binding_observer != nullptr) {
        destroy_binding_observer(my_numa_binding_observer);
        my_numa_binding_observer = nullptr;
    }
#endif /*__TBB_ARENA_BINDING*/
    poison_value( my_guard );
    for ( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
        // TODO: understand the assertion and modify
        // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, NULL );
        __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, NULL ); // TODO: replace by is_quiescent_local_task_pool_empty
        my_slots[i].free_task_pool();
        mailbox(i).drain();
        my_slots[i].my_default_task_dispatcher->~task_dispatcher();
    }
    __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
    __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
    // Cleanup coroutines/schedulers cache
    my_co_cache.cleanup();
    my_default_ctx->~task_group_context();
    cache_aligned_deallocate(my_default_ctx);
#if __TBB_PREVIEW_CRITICAL_TASKS
    __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
#endif
    // remove an internal reference
    my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );

    // clear() enforces synchronization with observe(false)
    my_observers.clear();

    void* storage  = &mailbox(my_num_slots-1);
    __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, NULL );
    __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers, NULL );
    this->~arena();
#if TBB_USE_ASSERT > 1
    std::memset( storage, 0, allocation_size(my_num_slots) );
#endif /* TBB_USE_ASSERT */
    cache_aligned_deallocate( storage );
}

bool arena::has_enqueued_tasks() {
    return !my_fifo_task_stream.empty();
}

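// Checks whether the arena has run out of work and, if so, lets the market withdraw the
// workers assigned to it. The check is a two-phase snapshot: the pool state is moved from
// SNAPSHOT_FULL to a unique "busy" marker while the task pools and streams are inspected,
// and only then to SNAPSHOT_EMPTY; any concurrent spawn/enqueue sets the state back to
// SNAPSHOT_FULL and invalidates the attempt.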
bool arena::is_out_of_work() {
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    if (my_local_concurrency_flag.try_clear_if([this] {
        return !has_enqueued_tasks();
    })) {
        my_market->adjust_demand(*this, /* delta = */ -1, /* mandatory = */ true);
    }
#endif

    // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
    switch (my_pool_state.load(std::memory_order_acquire)) {
    case SNAPSHOT_EMPTY:
        return true;
    case SNAPSHOT_FULL: {
        // Use unique id for "busy" in order to avoid ABA problems.
        const pool_state_t busy = pool_state_t(&busy);
        // Helper for CAS execution
        pool_state_t expected_state;

        // Request permission to take snapshot
        expected_state = SNAPSHOT_FULL;
        if (my_pool_state.compare_exchange_strong(expected_state, busy)) {
            // Got permission. Take the snapshot.
            // NOTE: This is not a lock, as the state can be set to FULL at
            //       any moment by a thread that spawns/enqueues new task.
            std::size_t n = my_limit.load(std::memory_order_acquire);
            // Make local copies of volatile parameters. Their change during
            // snapshot taking procedure invalidates the attempt, and returns
            // this thread into the dispatch loop.
            std::size_t k;
            for (k = 0; k < n; ++k) {
                if (my_slots[k].task_pool.load(std::memory_order_relaxed) != EmptyTaskPool &&
                    my_slots[k].head.load(std::memory_order_relaxed) < my_slots[k].tail.load(std::memory_order_relaxed))
                {
                    // k-th primary task pool is nonempty and does contain tasks.
                    break;
                }
                if (my_pool_state.load(std::memory_order_acquire) != busy)
                    return false; // the work was published
            }
            bool work_absent = k == n;
            // Test and test-and-set.
            if (my_pool_state.load(std::memory_order_acquire) == busy) {
                bool no_stream_tasks = !has_enqueued_tasks() && my_resume_task_stream.empty();
#if __TBB_PREVIEW_CRITICAL_TASKS
                no_stream_tasks = no_stream_tasks && my_critical_task_stream.empty();
#endif
                work_absent = work_absent && no_stream_tasks;
                if (work_absent) {
                    // save current demand value before setting SNAPSHOT_EMPTY,
                    // to avoid race with advertise_new_work.
                    int current_demand = (int)my_max_num_workers;
                    expected_state = busy;
                    if (my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_EMPTY)) {
                        // This thread transitioned pool to empty state, and thus is
                        // responsible for telling the market that there is no work to do.
                        my_market->adjust_demand(*this, -current_demand, /* mandatory = */ false);
                        return true;
                    }
                    return false;
                }
                // Undo previous transition SNAPSHOT_FULL-->busy, unless another thread undid it.
                expected_state = busy;
                my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_FULL);
            }
        }
        return false;
    }
    default:
        // Another thread is taking a snapshot.
        return false;
    }
}

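// Places an externally submitted task into the arena's FIFO (starvation-resistant) stream
// and advertises the new work to the market so that workers can be woken up.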
void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
    task_group_context_impl::bind_to(ctx, &td);
    task_accessor::context(t) = &ctx;
    task_accessor::isolation(t) = no_isolation;
    my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
    advertise_new_work<work_enqueued>();
}

} // namespace r1
} // namespace detail
} // namespace tbb

// Enable task_arena.h
#include "oneapi/tbb/task_arena.h" // task_arena_base

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
    bool is_arena_priority_correct =
        a_priority == tbb::task_arena::priority::high   ||
        a_priority == tbb::task_arena::priority::normal ||
        a_priority == tbb::task_arena::priority::low;
    __TBB_ASSERT( is_arena_priority_correct,
                  "Task arena priority should be equal to one of the predefined values." );
}
#else
void assert_arena_priority_valid( tbb::task_arena::priority ) {}
#endif

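// Translate between the public task_arena::priority values and the market's internal
// priority levels. The public values are defined as multiples of d1::priority_stride,
// so a higher priority maps to a numerically smaller level; arena_priority() below is
// the inverse of arena_priority_level().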
unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
    assert_arena_priority_valid( a_priority );
    return market::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
}

tbb::task_arena::priority arena_priority( unsigned priority_level ) {
    auto priority = tbb::task_arena::priority(
        (market::num_priority_levels - priority_level) * d1::priority_stride
    );
    assert_arena_priority_valid( priority );
    return priority;
}

struct task_arena_impl {
    static void initialize(d1::task_arena_base&);
    static void terminate(d1::task_arena_base&);
    static bool attach(d1::task_arena_base&);
    static void execute(d1::task_arena_base&, d1::delegate_base&);
    static void wait(d1::task_arena_base&);
    static int max_concurrency(const d1::task_arena_base*);
    static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
};

void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
    task_arena_impl::initialize(ta);
}
void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
    task_arena_impl::terminate(ta);
}
bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
    return task_arena_impl::attach(ta);
}
void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    task_arena_impl::execute(ta, d);
}
void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
    task_arena_impl::wait(ta);
}

int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
    return task_arena_impl::max_concurrency(ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, nullptr, ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_group_context& ctx, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, &ctx, ta);
}

void task_arena_impl::initialize(d1::task_arena_base& ta) {
    // Enforce global market initialization to properly initialize soft limit
    (void)governor::get_thread_data();
    if (ta.my_max_concurrency < 1) {
#if __TBB_ARENA_BINDING

#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
        d1::constraints arena_constraints = d1::constraints{}
            .set_core_type(ta.core_type())
            .set_max_threads_per_core(ta.max_threads_per_core())
            .set_numa_id(ta.my_numa_id);
        ta.my_max_concurrency = (int)default_concurrency(arena_constraints);
#else /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
        ta.my_max_concurrency = (int)default_concurrency(ta.my_numa_id);
#endif /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/

#else /*!__TBB_ARENA_BINDING*/
        ta.my_max_concurrency = (int)governor::default_num_threads();
#endif /*!__TBB_ARENA_BINDING*/
    }

    __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
    unsigned priority_level = arena_priority_level(ta.my_priority);
    arena* a = market::create_arena(ta.my_max_concurrency, ta.my_num_reserved_slots, priority_level, /* stack_size = */ 0);
    ta.my_arena.store(a, std::memory_order_release);
    // add an internal market reference; a public reference was added in create_arena
    market::global_market( /*is_public=*/false);
#if __TBB_ARENA_BINDING
    a->my_numa_binding_observer = construct_binding_observer(
        static_cast<d1::task_arena*>(&ta), a->my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
#endif /*__TBB_ARENA_BINDING*/
}

void task_arena_impl::terminate(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    assert_pointer_valid(a);
    a->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false );
    a->on_thread_leaving<arena::ref_external>();
    ta.my_arena.store(nullptr, std::memory_order_relaxed);
}

bool task_arena_impl::attach(d1::task_arena_base& ta) {
    __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr);
    thread_data* td = governor::get_thread_data_if_initialized();
    if( td && td->my_arena ) {
        arena* a = td->my_arena;
        // There is an active arena to attach to.
        // It is still used by the calling thread (td), so it won't be destroyed right away.
        __TBB_ASSERT(a->my_references > 0, NULL );
        a->my_references += arena::ref_external;
        ta.my_num_reserved_slots = a->my_num_reserved_slots;
        ta.my_priority = arena_priority(a->my_priority_level);
        ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers;
        __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency) == a->my_num_slots, NULL);
        ta.my_arena.store(a, std::memory_order_release);
        // increases market's ref count for task_arena
        market::global_market( /*is_public=*/true );
        return true;
    }
    return false;
}

void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_arena_base* ta) {
    thread_data* td = governor::get_thread_data();  // thread data is only needed for FastRandom instance
    assert_pointer_valid(td, "thread_data pointer should not be null");
    arena* a = ta ?
              ta->my_arena.load(std::memory_order_relaxed)
            : td->my_arena
    ;
    assert_pointer_valid(a, "arena pointer should not be null");
    auto* ctx = c ? c : a->my_default_ctx;
    assert_pointer_valid(ctx, "context pointer should not be null");
    // Is there a better place for checking the state of ctx?
    __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
                 "The task will not be executed because its task_group_context is cancelled.");
    a->enqueue_task(t, *ctx, *td);
}

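// RAII helper for task_arena::execute(): temporarily re-binds the calling thread to the
// target arena (slot, task dispatcher, observers, execution data) and restores the original
// bindings on destruction. When the thread enters a different arena it also adjusts the
// market demand, since the slot it occupies could otherwise have been given to a worker.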
class nested_arena_context : no_copy {
public:
    nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
        : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
    {
        if (td.my_arena != &nested_arena) {
            m_orig_arena = td.my_arena;
            m_orig_slot_index = td.my_arena_index;
            m_orig_last_observer = td.my_last_observer;

            td.detach_task_dispatcher();
            td.attach_arena(nested_arena, slot_index);
            if (td.my_inbox.is_idle_state(true))
                td.my_inbox.set_is_idle(false);
            task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
            task_disp.set_stealing_threshold(m_orig_execute_data_ext.task_disp->m_stealing_threshold);
            td.attach_task_dispatcher(task_disp);

            // If the calling thread occupies a slot outside the external thread reserve, we need
            // to notify the market that this arena requires one worker less.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ -1, /* mandatory = */ false);
            }

            td.my_last_observer = nullptr;
            // The task_arena::execute method considers each calling thread as an external thread.
            td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /* worker*/false);
        }

        m_task_dispatcher = td.my_task_dispatcher;
        m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
        m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
        m_task_dispatcher->m_properties.critical_task_allowed = true;

        execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
        ed_ext.context = td.my_arena->my_default_ctx;
        ed_ext.original_slot = td.my_arena_index;
        ed_ext.affinity_slot = d1::no_slot;
        ed_ext.task_disp = td.my_task_dispatcher;
        ed_ext.isolation = no_isolation;

        __TBB_ASSERT(td.my_arena_slot, nullptr);
        __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
        __TBB_ASSERT(td.my_task_dispatcher, nullptr);
    }
    ~nested_arena_context() {
        thread_data& td = *m_task_dispatcher->m_thread_data;
        __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
        m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
        m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
        if (m_orig_arena) {
            td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /*worker*/ false);
            td.my_last_observer = m_orig_last_observer;

            // Notify the market that this thread is releasing a slot
            // that can be used by a worker thread.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ 1, /* mandatory = */ false);
            }

            td.my_task_dispatcher->set_stealing_threshold(0);
            td.detach_task_dispatcher();
            td.my_arena_slot->release();
            td.my_arena->my_exit_monitors.notify_one(); // do not relax!

            td.attach_arena(*m_orig_arena, m_orig_slot_index);
            td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
            __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
        }
        td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
    }

private:
    execution_data_ext    m_orig_execute_data_ext{};
    arena*              m_orig_arena{ nullptr };
    observer_proxy*     m_orig_last_observer{ nullptr };
    task_dispatcher*    m_task_dispatcher{ nullptr };
    unsigned            m_orig_slot_index{};
    bool                m_orig_fifo_tasks_allowed{};
    bool                m_orig_critical_task_allowed{};
};

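// Task used by task_arena::execute() when the calling thread cannot occupy a slot in the
// target arena: the delegate is wrapped into a task, enqueued into that arena, and executed
// there by some other thread while the caller waits on the arena's exit monitor.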
class delegated_task : public d1::task {
    d1::delegate_base&  m_delegate;
    concurrent_monitor& m_monitor;
    d1::wait_context&   m_wait_ctx;
    std::atomic<bool>   m_completed;
    d1::task* execute(d1::execution_data& ed) override {
        const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
        execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
        __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
            "The execute data shall point to the current task dispatcher execute data");
        __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);

        ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
        bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
        try_call([&] {
            m_delegate();
        }).on_completion([&] {
            ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
            ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
        });

        finalize();
        return nullptr;
    }
    d1::task* cancel(d1::execution_data&) override {
        finalize();
        return nullptr;
    }
    void finalize() {
        m_wait_ctx.release(); // must precede the wakeup
        m_monitor.notify([this](std::uintptr_t ctx) {
            return ctx == std::uintptr_t(&m_delegate);
        }); // do not relax, it needs a fence!
        m_completed.store(true, std::memory_order_release);
    }
public:
    delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
        : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false }{}
    ~delegated_task() {
        // The destructor can be called before m_monitor is notified, because the waiting
        // thread can be released as soon as m_wait_ctx is released. To close that race
        // we wait for the m_completed signal.
        spin_wait_until_eq(m_completed, true);
    }
};

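// Implements task_arena::execute(): if the calling thread already belongs to the arena
// (or can occupy a free slot), the delegate runs directly under a nested_arena_context;
// otherwise it is enqueued as a delegated_task and the caller blocks on the arena's exit
// monitor until the delegate completes there. Reached from user code such as
// (illustrative only):
//     tbb::task_arena a(4);
//     a.execute([] { /* runs with the arena's concurrency constraints */ });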
void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();

    bool same_arena = td->my_arena == a;
    std::size_t index1 = td->my_arena_index;
    if (!same_arena) {
        index1 = a->occupy_free_slot</*as_worker */false>(*td);
        if (index1 == arena::out_of_arena) {
            concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
            d1::wait_context wo(1);
            d1::task_group_context exec_context(d1::task_group_context::isolated);
            task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx);

            delegated_task dt(d, a->my_exit_monitors, wo);
            a->enqueue_task( dt, exec_context, *td);
            size_t index2 = arena::out_of_arena;
            do {
                a->my_exit_monitors.prepare_wait(waiter);
                if (!wo.continue_execution()) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    break;
                }
                index2 = a->occupy_free_slot</*as_worker*/false>(*td);
                if (index2 != arena::out_of_arena) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    nested_arena_context scope(*td, *a, index2 );
                    r1::wait(wo, exec_context);
                    __TBB_ASSERT(!exec_context.my_exception.load(std::memory_order_relaxed), nullptr); // exception can be thrown above, not deferred
                    break;
                }
                a->my_exit_monitors.commit_wait(waiter);
            } while (wo.continue_execution());
            if (index2 == arena::out_of_arena) {
                // notify a waiting thread even if this thread did not enter arena,
                // in case it was woken by a leaving thread but did not need to enter
                a->my_exit_monitors.notify_one(); // do not relax!
            }
            // process possible exception
            auto exception = exec_context.my_exception.load(std::memory_order_acquire);
            if (exception) {
                __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
                exception->throw_self();
            }
            __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
            return;
        } // if (index1 == arena::out_of_arena)
    } // if (!same_arena)

    context_guard_helper</*report_tasks=*/false> context_guard;
    context_guard.set_ctx(a->my_default_ctx);
    nested_arena_context scope(*td, *a, index1);
#if _WIN64
    try {
#endif
        d();
        __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
#if _WIN64
    } catch (...) {
        context_guard.restore_default();
        throw;
    }
#endif
}

void task_arena_impl::wait(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();
    __TBB_ASSERT_EX(td, "Scheduler is not initialized");
    __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context" );
    if (a->my_max_num_workers != 0) {
        while (a->num_workers_active() || a->my_pool_state.load(std::memory_order_acquire) != arena::SNAPSHOT_EMPTY) {
            yield();
        }
    }
}

int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) {
    arena* a = nullptr;
    if( ta ) // for special cases of ta->max_concurrency()
        a = ta->my_arena.load(std::memory_order_relaxed);
    else if( thread_data* td = governor::get_thread_data_if_initialized() )
        a = td->my_arena; // the current arena if any

    if( a ) { // Get parameters from the arena
        __TBB_ASSERT( !ta || ta->my_max_concurrency==1, NULL );
        return a->my_num_reserved_slots + a->my_max_num_workers
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            + (a->my_local_concurrency_flag.test() ? 1 : 0)
#endif
            ;
    }

    if (ta && ta->my_max_concurrency == 1) {
        return 1;
    }

#if __TBB_ARENA_BINDING
    if (ta) {
#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
        d1::constraints arena_constraints = d1::constraints{}
            .set_numa_id(ta->my_numa_id)
            .set_core_type(ta->core_type())
            .set_max_threads_per_core(ta->max_threads_per_core());
        return (int)default_concurrency(arena_constraints);
#else /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
        return (int)default_concurrency(ta->my_numa_id);
#endif /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
    }
#endif /*!__TBB_ARENA_BINDING*/

    __TBB_ASSERT(!ta || ta->my_max_concurrency==d1::task_arena_base::automatic, NULL );
    return int(governor::default_num_threads());
}

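// Backs tbb::this_task_arena::isolate(): tasks spawned inside the delegate get the given
// isolation tag (or the delegate's address when no explicit tag is passed), so the calling
// thread will not pick up unrelated outer tasks while it waits inside the isolated region.
// Illustrative user-level call:
//     tbb::this_task_arena::isolate([] { /* nested parallel work */ });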
void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
    // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
    thread_data* tls = governor::get_thread_data();
    assert_pointers_valid(tls, tls->my_task_dispatcher);
    task_dispatcher* dispatcher = tls->my_task_dispatcher;
    isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
    try_call([&] {
        // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
        isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
        // Save the current isolation value and set new one
        previous_isolation = dispatcher->set_isolation(current_isolation);
        // Isolation within this callable
        d();
    }).on_completion([&] {
        __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, NULL);
        dispatcher->set_isolation(previous_isolation);
    });
}

} // namespace r1
} // namespace detail
} // namespace tbb