/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "task_dispatcher.h"
#include "governor.h"
#include "arena.h"
#include "itt_notify.h"
#include "semaphore.h"
#include "waiters.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/info.h"
#include "oneapi/tbb/tbb_allocator.h"

#include <atomic>
#include <cstring>
#include <functional>

namespace tbb {
namespace detail {
namespace r1 {

#if __TBB_ARENA_BINDING
class numa_binding_observer : public tbb::task_scheduler_observer {
    binding_handler* my_binding_handler;
public:
    numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core )
        : task_scheduler_observer(*ta)
        , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core))
    {}

    void on_scheduler_entry( bool ) override {
        apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    void on_scheduler_exit( bool ) override {
        restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    ~numa_binding_observer(){
        destroy_binding_handler(my_binding_handler);
    }
};

numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) {
    numa_binding_observer* binding_observer = nullptr;
    if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) {
        binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core);
        __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
        binding_observer->observe(true);
    }
    return binding_observer;
}

void destroy_binding_observer( numa_binding_observer* binding_observer ) {
    __TBB_ASSERT(binding_observer, "Trying to deallocate NULL pointer");
    binding_observer->observe(false);
    binding_observer->~numa_binding_observer();
    deallocate_memory(binding_observer);
}
#endif /*!__TBB_ARENA_BINDING*/

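// Searches [lower, upper) for a free slot, starting from the slot this thread occupied
// last time (or from a random index inside the range) and wrapping around.
// Returns the index of the occupied slot, or out_of_arena if every slot in the range is taken.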
std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
    if ( lower >= upper ) return out_of_arena;
    // Start search for an empty slot from the one we occupied the last time
    std::size_t index = tls.my_arena_index;
    if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
    __TBB_ASSERT( index >= lower && index < upper, NULL );
    // Find a free slot
    for ( std::size_t i = index; i < upper; ++i )
        if (my_slots[i].try_occupy()) return i;
    for ( std::size_t i = lower; i < index; ++i )
        if (my_slots[i].try_occupy()) return i;
    return out_of_arena;
}

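// Slot acquisition policy: external threads first try the reserved slots [0, my_num_reserved_slots),
// while workers skip them; both fall back to the non-reserved range. On success my_limit is raised
// (monotonically, via atomic_update) so that stealing and the out-of-work snapshot cover the new slot.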
template <bool as_worker>
std::size_t arena::occupy_free_slot(thread_data& tls) {
    // Firstly, external threads try to occupy reserved slots
    std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls, 0, my_num_reserved_slots );
    if ( index == out_of_arena ) {
        // Secondly, all threads try to occupy all non-reserved slots
        index = occupy_free_slot_in_range( tls, my_num_reserved_slots, my_num_slots );
        // Likely this arena is already saturated
        if ( index == out_of_arena )
            return out_of_arena;
    }

    atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
    return index;
}

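// A minimal sketch of the intent (inferred from the call below): the threshold is derived from the
// address of a local stack anchor and the market's worker stack size, and the task dispatcher uses it
// to stop stealing once its stack has grown too deep (see the can_steal() assertion in arena::process).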
std::uintptr_t arena::calculate_stealing_threshold() {
    stack_anchor_type anchor;
    return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_market->worker_stack_size());
}

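// Worker entry point for this arena: occupy a slot, run the outermost dispatch loop until the
// arena runs out of work, then detach from the slot and drop the worker reference (which may
// trigger arena destruction inside on_thread_leaving).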
void arena::process(thread_data& tls) {
    governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( my_num_slots > 1, nullptr);

    std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
    if (index == out_of_arena) {
        on_thread_leaving<ref_worker>();
        return;
    }
    __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
    tls.attach_arena(*this, index);
    // The worker thread enters the dispatch loop to look for work.
    tls.my_inbox.set_is_idle(true);
    if (tls.my_arena_slot->is_task_pool_published()) {
        tls.my_inbox.set_is_idle(false);
    }
    task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
    task_disp.set_stealing_threshold(calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);
    tls.attach_task_dispatcher(task_disp);

    __TBB_ASSERT( !tls.my_last_observer, "There cannot be notified local observers when entering arena" );
    my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);

    // Waiting on special object tied to this arena
    outermost_worker_waiter waiter(*this);
    d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
    // For purposes of affinity support, the slot's mailbox is considered idle while no thread is
    // attached to it.
    tls.my_inbox.set_is_idle(true);

    __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
    __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
    __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);

    my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
    tls.my_last_observer = nullptr;

    task_disp.set_stealing_threshold(0);
    tls.detach_task_dispatcher();

    // Arena slot detach (arena may be used in market::process)
    // TODO: Consider moving several calls below into a new method (e.g. detach_arena).
    tls.my_arena_slot->release();
    tls.my_arena_slot = nullptr;
    tls.my_inbox.detach();
    __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
    __TBB_ASSERT(is_alive(my_guard), nullptr);

    // In contrast to earlier versions of TBB (before 3.0 U5) now it is possible
    // that arena may be temporarily left unpopulated by threads. See comments in
    // arena::on_thread_leaving() for more details.
    on_thread_leaving<ref_worker>();
    __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
}

arena::arena ( market& m, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level )
{
    __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
    __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" );
    __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
    my_market = &m;
    my_limit = 1;
    // Two slots are mandatory: for the external thread, and for 1 worker (required to support starvation resistant tasks).
    my_num_slots = num_arena_slots(num_slots);
    my_num_reserved_slots = num_reserved_slots;
    my_max_num_workers = num_slots-num_reserved_slots;
    my_priority_level = priority_level;
    my_references = ref_external; // accounts for the external thread
    my_aba_epoch = m.my_arenas_aba_epoch.load(std::memory_order_relaxed);
    my_observers.my_arena = this;
    my_co_cache.init(4 * num_slots);
    __TBB_ASSERT ( my_max_num_workers <= my_num_slots, NULL );
    // Initialize the default context. It should be allocated before task_dispatcher construction.
    my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
        d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
    // Construct slots. Mark internal synchronization elements for the tools.
    task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
    for( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, NULL );
        __TBB_ASSERT( !my_slots[i].task_pool_ptr, NULL );
        __TBB_ASSERT( !my_slots[i].my_task_pool_size, NULL );
        mailbox(i).construct();
        my_slots[i].init_task_streams(i);
        my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
        my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
    }
    my_fifo_task_stream.initialize(my_num_slots);
    my_resume_task_stream.initialize(my_num_slots);
#if __TBB_PREVIEW_CRITICAL_TASKS
    my_critical_task_stream.initialize(my_num_slots);
#endif
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    my_local_concurrency_requests = 0;
    my_local_concurrency_flag.clear();
    my_global_concurrency_mode.store(false, std::memory_order_relaxed);
#endif
}

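// Note: the arena lives in a single cache-aligned allocation. Judging from the pointer arithmetic
// here and in the constructor, the layout is roughly
//   [one mail_outbox per slot][arena_base + slots][one task_dispatcher per slot],
// with the arena object placement-constructed right after the mailboxes.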
arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots,
                              unsigned priority_level )
{
    __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
    __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
    __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
    std::size_t n = allocation_size(num_arena_slots(num_slots));
    unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
    // Zero all slots to indicate that they are empty
    std::memset( storage, 0, n );
    return *new( storage + num_arena_slots(num_slots) * sizeof(mail_outbox) )
        arena(m, num_slots, num_reserved_slots, priority_level);
}

void arena::free_arena () {
    __TBB_ASSERT( is_alive(my_guard), NULL );
    __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
    __TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
    __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers,
                  "Inconsistent state of a dying arena" );
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    __TBB_ASSERT( !my_global_concurrency_mode, NULL );
#endif
#if __TBB_ARENA_BINDING
    if (my_numa_binding_observer != nullptr) {
        destroy_binding_observer(my_numa_binding_observer);
        my_numa_binding_observer = nullptr;
    }
#endif /*__TBB_ARENA_BINDING*/
    poison_value( my_guard );
    for ( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
        // TODO: understand the assertion and modify
        // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, NULL );
        __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, NULL ); // TODO: replace by is_quiescent_local_task_pool_empty
        my_slots[i].free_task_pool();
        mailbox(i).drain();
        my_slots[i].my_default_task_dispatcher->~task_dispatcher();
    }
    __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
    __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
    // Cleanup coroutines/schedulers cache
    my_co_cache.cleanup();
    my_default_ctx->~task_group_context();
    cache_aligned_deallocate(my_default_ctx);
#if __TBB_PREVIEW_CRITICAL_TASKS
    __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
#endif
    // remove an internal reference
    my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );

    // Clearing the observer list enforces synchronization with observe(false)
    my_observers.clear();

    void* storage = &mailbox(my_num_slots-1);
    __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, NULL );
    __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers, NULL );
    this->~arena();
#if TBB_USE_ASSERT > 1
    std::memset( storage, 0, allocation_size(my_num_slots) );
#endif /* TBB_USE_ASSERT */
    cache_aligned_deallocate( storage );
}

bool arena::has_enqueued_tasks() {
    return !my_fifo_task_stream.empty();
}

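// Out-of-work detection is a small state machine over my_pool_state:
// SNAPSHOT_FULL -> busy (a token unique to this call) -> SNAPSHOT_EMPTY.
// The thread that wins the CAS to 'busy' scans the task pools and stream queues; if anything
// publishes work meanwhile, the state flips back to FULL and the snapshot is abandoned. Only the
// thread that completes the FULL->EMPTY transition asks the market to withdraw the workers.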
bool arena::is_out_of_work() {
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    if (my_local_concurrency_flag.try_clear_if([this] {
        return !has_enqueued_tasks();
    })) {
        my_market->adjust_demand(*this, /* delta = */ -1, /* mandatory = */ true);
    }
#endif

    // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
    switch (my_pool_state.load(std::memory_order_acquire)) {
    case SNAPSHOT_EMPTY:
        return true;
    case SNAPSHOT_FULL: {
        // Use a unique id for "busy" in order to avoid ABA problems.
        const pool_state_t busy = pool_state_t(&busy);
        // Helper for CAS execution
        pool_state_t expected_state;

        // Request permission to take snapshot
        expected_state = SNAPSHOT_FULL;
        if (my_pool_state.compare_exchange_strong(expected_state, busy)) {
            // Got permission. Take the snapshot.
            // NOTE: This is not a lock, as the state can be set to FULL at
            // any moment by a thread that spawns/enqueues a new task.
            std::size_t n = my_limit.load(std::memory_order_acquire);
            // Make local copies of volatile parameters. Their change during
            // the snapshot-taking procedure invalidates the attempt, and returns
            // this thread into the dispatch loop.
            std::size_t k;
            for (k = 0; k < n; ++k) {
                if (my_slots[k].task_pool.load(std::memory_order_relaxed) != EmptyTaskPool &&
                    my_slots[k].head.load(std::memory_order_relaxed) < my_slots[k].tail.load(std::memory_order_relaxed))
                {
                    // The k-th primary task pool is nonempty and does contain tasks.
                    break;
                }
                if (my_pool_state.load(std::memory_order_acquire) != busy)
                    return false; // the work was published
            }
            bool work_absent = k == n;
            // Test and test-and-set.
            if (my_pool_state.load(std::memory_order_acquire) == busy) {
                bool no_stream_tasks = !has_enqueued_tasks() && my_resume_task_stream.empty();
#if __TBB_PREVIEW_CRITICAL_TASKS
                no_stream_tasks = no_stream_tasks && my_critical_task_stream.empty();
#endif
                work_absent = work_absent && no_stream_tasks;
                if (work_absent) {
                    // Save the current demand value before setting SNAPSHOT_EMPTY,
                    // to avoid a race with advertise_new_work.
                    int current_demand = (int)my_max_num_workers;
                    expected_state = busy;
                    if (my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_EMPTY)) {
                        // This thread transitioned pool to empty state, and thus is
                        // responsible for telling the market that there is no work to do.
                        my_market->adjust_demand(*this, -current_demand, /* mandatory = */ false);
                        return true;
                    }
                    return false;
                }
                // Undo the previous SNAPSHOT_FULL-->busy transition, unless another thread undid it.
                expected_state = busy;
                my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_FULL);
            }
        }
        return false;
    }
    default:
        // Another thread is taking a snapshot.
        return false;
    }
}

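// Enqueued (fifo) tasks are bound to the given context, carry no isolation tag, and are pushed
// into a random lane of my_fifo_task_stream; advertise_new_work<work_enqueued>() then signals
// that new work is available so that workers can be requested.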
void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
    task_group_context_impl::bind_to(ctx, &td);
    task_accessor::context(t) = &ctx;
    task_accessor::isolation(t) = no_isolation;
    my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
    advertise_new_work<work_enqueued>();
}

} // namespace r1
} // namespace detail
} // namespace tbb

// Enable task_arena.h
#include "oneapi/tbb/task_arena.h" // task_arena_base

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
    bool is_arena_priority_correct =
        a_priority == tbb::task_arena::priority::high ||
        a_priority == tbb::task_arena::priority::normal ||
        a_priority == tbb::task_arena::priority::low;
    __TBB_ASSERT( is_arena_priority_correct,
                  "Task arena priority should be equal to one of the predefined values." );
}
#else
void assert_arena_priority_valid( tbb::task_arena::priority ) {}
#endif

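// task_arena::priority values are multiples of d1::priority_stride; the two helpers below convert
// between these user-facing values and the market's internal priority levels (higher priority
// values map to smaller level numbers, as follows from the formulas).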
unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
    assert_arena_priority_valid( a_priority );
    return market::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
}

tbb::task_arena::priority arena_priority( unsigned priority_level ) {
    auto priority = tbb::task_arena::priority(
        (market::num_priority_levels - priority_level) * d1::priority_stride
    );
    assert_arena_priority_valid( priority );
    return priority;
}

struct task_arena_impl {
    static void initialize(d1::task_arena_base&);
    static void terminate(d1::task_arena_base&);
    static bool attach(d1::task_arena_base&);
    static void execute(d1::task_arena_base&, d1::delegate_base&);
    static void wait(d1::task_arena_base&);
    static int max_concurrency(const d1::task_arena_base*);
    static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
};

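// The __TBB_EXPORTED_FUNC entry points below are what the inline tbb::task_arena methods bind to.
// A minimal user-level sketch (illustrative only; the public API is declared in oneapi/tbb/task_arena.h):
//     tbb::task_arena arena(4);                  // limit concurrency to 4 threads
//     arena.execute([]{ /* runs inside the arena */ });
//     arena.enqueue([]{ /* fire-and-forget work */ });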
void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
    task_arena_impl::initialize(ta);
}
void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
    task_arena_impl::terminate(ta);
}
bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
    return task_arena_impl::attach(ta);
}
void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    task_arena_impl::execute(ta, d);
}
void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
    task_arena_impl::wait(ta);
}

int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
    return task_arena_impl::max_concurrency(ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, nullptr, ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_group_context& ctx, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, &ctx, ta);
}

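// initialize() resolves the 'automatic' concurrency value (honoring NUMA/core-type constraints when
// arena binding is enabled), creates the arena through the market, and, with __TBB_ARENA_BINDING,
// attaches a numa_binding_observer that applies an affinity mask to threads entering the arena.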
void task_arena_impl::initialize(d1::task_arena_base& ta) {
    // Enforce global market initialization to properly initialize soft limit
    (void)governor::get_thread_data();
    if (ta.my_max_concurrency < 1) {
#if __TBB_ARENA_BINDING

#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
        d1::constraints arena_constraints = d1::constraints{}
            .set_core_type(ta.core_type())
            .set_max_threads_per_core(ta.max_threads_per_core())
            .set_numa_id(ta.my_numa_id);
        ta.my_max_concurrency = (int)default_concurrency(arena_constraints);
#else /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
        ta.my_max_concurrency = (int)default_concurrency(ta.my_numa_id);
#endif /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/

#else /*!__TBB_ARENA_BINDING*/
        ta.my_max_concurrency = (int)governor::default_num_threads();
#endif /*!__TBB_ARENA_BINDING*/
    }

    __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
    unsigned priority_level = arena_priority_level(ta.my_priority);
    arena* a = market::create_arena(ta.my_max_concurrency, ta.my_num_reserved_slots, priority_level, /* stack_size = */ 0);
    ta.my_arena.store(a, std::memory_order_release);
    // add an internal market reference; a public reference was added in create_arena
    market::global_market( /*is_public=*/false);
#if __TBB_ARENA_BINDING
    a->my_numa_binding_observer = construct_binding_observer(
        static_cast<d1::task_arena*>(&ta), a->my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
#endif /*__TBB_ARENA_BINDING*/
}

void task_arena_impl::terminate(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    assert_pointer_valid(a);
    a->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false );
    a->on_thread_leaving<arena::ref_external>();
    ta.my_arena.store(nullptr, std::memory_order_relaxed);
}

bool task_arena_impl::attach(d1::task_arena_base& ta) {
    __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr);
    thread_data* td = governor::get_thread_data_if_initialized();
    if( td && td->my_arena ) {
        arena* a = td->my_arena;
        // There is an active arena to attach to.
        // It is still referenced by the current thread, so it won't be destroyed right away.
        __TBB_ASSERT(a->my_references > 0, NULL );
        a->my_references += arena::ref_external;
        ta.my_num_reserved_slots = a->my_num_reserved_slots;
        ta.my_priority = arena_priority(a->my_priority_level);
        ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers;
        __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency) == a->my_num_slots, NULL);
        ta.my_arena.store(a, std::memory_order_release);
        // increases market's ref count for task_arena
        market::global_market( /*is_public=*/true );
        return true;
    }
    return false;
}

void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_arena_base* ta) {
    thread_data* td = governor::get_thread_data(); // thread data is only needed for FastRandom instance
    assert_pointer_valid(td, "thread_data pointer should not be null");
    arena* a = ta ?
        ta->my_arena.load(std::memory_order_relaxed)
        : td->my_arena
    ;
    assert_pointer_valid(a, "arena pointer should not be null");
    auto* ctx = c ? c : a->my_default_ctx;
    assert_pointer_valid(ctx, "context pointer should not be null");
    // Is there a better place for checking the state of ctx?
    __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
                 "The task will not be executed because its task_group_context is cancelled.");
    a->enqueue_task(t, *ctx, *td);
}

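// RAII guard used by task_arena::execute(): it temporarily re-attaches the calling thread (arena
// slot, task dispatcher, and execution data) to the target arena and restores the original bindings
// in the destructor. When the thread takes a non-reserved slot, worker demand is adjusted accordingly.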
class nested_arena_context : no_copy {
public:
    nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
        : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
    {
        if (td.my_arena != &nested_arena) {
            m_orig_arena = td.my_arena;
            m_orig_slot_index = td.my_arena_index;
            m_orig_last_observer = td.my_last_observer;

            td.detach_task_dispatcher();
            td.attach_arena(nested_arena, slot_index);
            if (td.my_inbox.is_idle_state(true))
                td.my_inbox.set_is_idle(false);
            task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
            task_disp.set_stealing_threshold(m_orig_execute_data_ext.task_disp->m_stealing_threshold);
            td.attach_task_dispatcher(task_disp);

            // If the calling thread occupies a slot outside the external thread reserve, we need to
            // notify the market that this arena requires one worker less.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ -1, /* mandatory = */ false);
            }

            td.my_last_observer = nullptr;
            // The task_arena::execute method considers each calling thread as an external thread.
            td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /* worker*/false);
        }

        m_task_dispatcher = td.my_task_dispatcher;
        m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
        m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
        m_task_dispatcher->m_properties.critical_task_allowed = true;

        execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
        ed_ext.context = td.my_arena->my_default_ctx;
        ed_ext.original_slot = td.my_arena_index;
        ed_ext.affinity_slot = d1::no_slot;
        ed_ext.task_disp = td.my_task_dispatcher;
        ed_ext.isolation = no_isolation;

        __TBB_ASSERT(td.my_arena_slot, nullptr);
        __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
        __TBB_ASSERT(td.my_task_dispatcher, nullptr);
    }
    ~nested_arena_context() {
        thread_data& td = *m_task_dispatcher->m_thread_data;
        __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
        m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
        m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
        if (m_orig_arena) {
            td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /*worker*/ false);
            td.my_last_observer = m_orig_last_observer;

            // Notify the market that this thread is releasing one slot
            // that can be used by a worker thread.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ 1, /* mandatory = */ false);
            }

            td.my_task_dispatcher->set_stealing_threshold(0);
            td.detach_task_dispatcher();
            td.my_arena_slot->release();
            td.my_arena->my_exit_monitors.notify_one(); // do not relax!

            td.attach_arena(*m_orig_arena, m_orig_slot_index);
            td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
            __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
        }
        td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
    }

private:
    execution_data_ext m_orig_execute_data_ext{};
    arena* m_orig_arena{ nullptr };
    observer_proxy* m_orig_last_observer{ nullptr };
    task_dispatcher* m_task_dispatcher{ nullptr };
    unsigned m_orig_slot_index{};
    bool m_orig_fifo_tasks_allowed{};
    bool m_orig_critical_task_allowed{};
};

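// delegated_task runs a d1::delegate_base inside the target arena on behalf of a thread that could
// not join the arena directly. finalize() releases the wait_context before notifying the exit
// monitor, and the destructor spins on m_completed so the task object stays alive until the
// notification has actually been issued.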
class delegated_task : public d1::task {
    d1::delegate_base& m_delegate;
    concurrent_monitor& m_monitor;
    d1::wait_context& m_wait_ctx;
    std::atomic<bool> m_completed;
    d1::task* execute(d1::execution_data& ed) override {
        const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
        execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
        __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
                     "The execute data shall point to the current task dispatcher execute data");
        __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);

        ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
        bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
        try_call([&] {
            m_delegate();
        }).on_completion([&] {
            ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
            ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
        });

        finalize();
        return nullptr;
    }
    d1::task* cancel(d1::execution_data&) override {
        finalize();
        return nullptr;
    }
    void finalize() {
        m_wait_ctx.release(); // must precede the wakeup
        m_monitor.notify([this](std::uintptr_t ctx) {
            return ctx == std::uintptr_t(&m_delegate);
        }); // do not relax, it needs a fence!
        m_completed.store(true, std::memory_order_release);
    }
public:
    delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
        : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false } {}
    ~delegated_task() {
        // The destructor can be called before m_monitor is notified
        // because the waiting thread can be released after m_wait_ctx.release_wait.
        // To close that race we wait for the m_completed signal.
        spin_wait_until_eq(m_completed, true);
    }
};

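// execute() has three paths: run the delegate directly when the thread is already in the target
// arena; otherwise occupy a free external slot and run under a nested_arena_context; and when no
// slot is available, enqueue a delegated_task and block on the arena's exit monitor until it is done.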
void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();

    bool same_arena = td->my_arena == a;
    std::size_t index1 = td->my_arena_index;
    if (!same_arena) {
        index1 = a->occupy_free_slot</*as_worker*/false>(*td);
        if (index1 == arena::out_of_arena) {
            concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
            d1::wait_context wo(1);
            d1::task_group_context exec_context(d1::task_group_context::isolated);
            task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx);

            delegated_task dt(d, a->my_exit_monitors, wo);
            a->enqueue_task(dt, exec_context, *td);
            size_t index2 = arena::out_of_arena;
            do {
                a->my_exit_monitors.prepare_wait(waiter);
                if (!wo.continue_execution()) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    break;
                }
                index2 = a->occupy_free_slot</*as_worker*/false>(*td);
                if (index2 != arena::out_of_arena) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    nested_arena_context scope(*td, *a, index2);
                    r1::wait(wo, exec_context);
                    __TBB_ASSERT(!exec_context.my_exception.load(std::memory_order_relaxed), nullptr); // exception can be thrown above, not deferred
                    break;
                }
                a->my_exit_monitors.commit_wait(waiter);
            } while (wo.continue_execution());
            if (index2 == arena::out_of_arena) {
                // Notify a waiting thread even if this thread did not enter the arena,
                // in case it was woken by a leaving thread but did not need to enter.
                a->my_exit_monitors.notify_one(); // do not relax!
            }
            // process a possible exception
            auto exception = exec_context.my_exception.load(std::memory_order_acquire);
            if (exception) {
                __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be cancelled.");
                exception->throw_self();
            }
            __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
            return;
        } // if (index1 == arena::out_of_arena)
    } // if (!same_arena)

    context_guard_helper</*report_tasks=*/false> context_guard;
    context_guard.set_ctx(a->my_default_ctx);
    nested_arena_context scope(*td, *a, index1);
#if _WIN64
    try {
#endif
        d();
        __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
#if _WIN64
    } catch (...) {
        context_guard.restore_default();
        throw;
    }
#endif
}

void task_arena_impl::wait(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();
    __TBB_ASSERT_EX(td, "Scheduler is not initialized");
    __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context" );
    if (a->my_max_num_workers != 0) {
        while (a->num_workers_active() || a->my_pool_state.load(std::memory_order_acquire) != arena::SNAPSHOT_EMPTY) {
            yield();
        }
    }
}

int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) {
    arena* a = nullptr;
    if( ta ) // for special cases of ta->max_concurrency()
        a = ta->my_arena.load(std::memory_order_relaxed);
    else if( thread_data* td = governor::get_thread_data_if_initialized() )
        a = td->my_arena; // the current arena if any

    if( a ) { // Get parameters from the arena
        __TBB_ASSERT( !ta || ta->my_max_concurrency==1, NULL );
        return a->my_num_reserved_slots + a->my_max_num_workers
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            + (a->my_local_concurrency_flag.test() ? 1 : 0)
#endif
            ;
    }

    if (ta && ta->my_max_concurrency == 1) {
        return 1;
    }

#if __TBB_ARENA_BINDING
    if (ta) {
#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
        d1::constraints arena_constraints = d1::constraints{}
            .set_numa_id(ta->my_numa_id)
            .set_core_type(ta->core_type())
            .set_max_threads_per_core(ta->max_threads_per_core());
        return (int)default_concurrency(arena_constraints);
#else /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
        return (int)default_concurrency(ta->my_numa_id);
#endif /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
    }
#endif /*!__TBB_ARENA_BINDING*/

    __TBB_ASSERT(!ta || ta->my_max_concurrency==d1::task_arena_base::automatic, NULL );
    return int(governor::default_num_threads());
}

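// Backend of this_task_arena::isolate(): temporarily replaces the isolation tag of the currently
// running task so that work spawned inside the callable is executed only under the same tag; the
// previous tag is restored on completion, even if the callable throws (see try_call/on_completion).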
void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
    // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
    thread_data* tls = governor::get_thread_data();
    assert_pointers_valid(tls, tls->my_task_dispatcher);
    task_dispatcher* dispatcher = tls->my_task_dispatcher;
    isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
    try_call([&] {
        // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
        isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
        // Save the current isolation value and set the new one
        previous_isolation = dispatcher->set_isolation(current_isolation);
        // Isolation within this callable
        d();
    }).on_completion([&] {
        __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, NULL);
        dispatcher->set_isolation(previous_isolation);
    });
}

} // namespace r1
} // namespace detail
} // namespace tbb