1 /*
2 Copyright (c) 2005-2017 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15
16
17
18
19 */
20
21 #include "tbb/global_control.h" // thread_stack_size
22
23 #include "scheduler.h"
24 #include "governor.h"
25 #include "arena.h"
26 #include "itt_notify.h"
27 #include "semaphore.h"
28 #include "tbb/internal/_flow_graph_impl.h"
29
30 #include <functional>
31
32 #if __TBB_STATISTICS_STDOUT
33 #include <cstdio>
34 #endif
35
36 namespace tbb {
37 namespace internal {
38
39 // Defined here to enable the compiler to inline it into arena::process and nested_arena_entry.
40 void generic_scheduler::attach_arena( arena* a, size_t index, bool is_master ) {
41 __TBB_ASSERT( a->my_market == my_market, NULL );
42 my_arena = a;
43 my_arena_index = index;
44 my_arena_slot = a->my_slots + index;
45 attach_mailbox( affinity_id(index+1) );
46 if ( is_master && my_inbox.is_idle_state( true ) ) {
47 // A master enters the arena with its own task to execute, which means it is not
48 // going to enter the stealing loop and take affinity tasks.
49 my_inbox.set_is_idle( false );
50 }
51 #if __TBB_TASK_GROUP_CONTEXT
52 // Context to be used by root tasks by default (if the user has not specified one).
53 if( !is_master )
54 my_dummy_task->prefix().context = a->my_default_ctx;
55 #endif /* __TBB_TASK_GROUP_CONTEXT */
56 #if __TBB_TASK_PRIORITY
57 // In the current implementation master threads continue processing even when
58 // there are other masters with higher priority. Only TBB worker threads are
59 // redistributed between arenas based on the arenas' priorities. Thus master
60 // threads use the arena's top priority as a reference point (in contrast to workers,
61 // which use my_market->my_global_top_priority).
62 if( is_master ) {
63 my_ref_top_priority = &a->my_top_priority;
64 my_ref_reload_epoch = &a->my_reload_epoch;
65 }
66 my_local_reload_epoch = *my_ref_reload_epoch;
67 __TBB_ASSERT( !my_offloaded_tasks, NULL );
68 #endif /* __TBB_TASK_PRIORITY */
69 }
70
71 inline static bool occupy_slot( generic_scheduler*& slot, generic_scheduler& s ) {
72 return !slot && as_atomic( slot ).compare_and_swap( &s, NULL ) == NULL;
73 }
74
75 size_t arena::occupy_free_slot_in_range( generic_scheduler& s, size_t lower, size_t upper ) {
76 if ( lower >= upper ) return out_of_arena;
77 // Start search for an empty slot from the one we occupied the last time
78 size_t index = s.my_arena_index;
79 if ( index < lower || index >= upper ) index = s.my_random.get() % (upper - lower) + lower;
80 __TBB_ASSERT( index >= lower && index < upper, NULL );
81 // Find a free slot
82 for ( size_t i = index; i < upper; ++i )
83 if ( occupy_slot(my_slots[i].my_scheduler, s) ) return i;
84 for ( size_t i = lower; i < index; ++i )
85 if ( occupy_slot(my_slots[i].my_scheduler, s) ) return i;
86 return out_of_arena;
87 }
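// Editorial note (not part of the original source): the two loops above implement a
// wrapped linear probe that starts from the previously occupied (or randomly chosen)
// slot. For example, with lower == 1, upper == 8 and index == 5, the slots are
// visited in the order 5, 6, 7, 1, 2, 3, 4. A minimal sketch of the same probing
// order, assuming only the bounds and the start index:
//
//     for ( size_t i = index; i < upper; ++i ) visit(i);   // index .. upper-1
//     for ( size_t i = lower; i < index; ++i ) visit(i);   // lower .. index-1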
88
89 template <bool as_worker>
90 size_t arena::occupy_free_slot( generic_scheduler& s ) {
91 // Firstly, masters try to occupy reserved slots
92 size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( s, 0, my_num_reserved_slots );
93 if ( index == out_of_arena ) {
94 // Secondly, all threads try to occupy all non-reserved slots
95 index = occupy_free_slot_in_range( s, my_num_reserved_slots, my_num_slots );
96 // Likely this arena is already saturated
97 if ( index == out_of_arena )
98 return out_of_arena;
99 }
100
101 ITT_NOTIFY(sync_acquired, my_slots + index);
102 atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
103 return index;
104 }
105
106 void arena::process( generic_scheduler& s ) {
107 __TBB_ASSERT( is_alive(my_guard), NULL );
108 __TBB_ASSERT( governor::is_set(&s), NULL );
109 __TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL );
110 __TBB_ASSERT( s.worker_outermost_level(), NULL );
111
112 __TBB_ASSERT( my_num_slots > 1, NULL );
113
114 size_t index = occupy_free_slot</*as_worker*/true>( s );
115 if ( index == out_of_arena )
116 goto quit;
117
118 __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
119 s.attach_arena( this, index, /*is_master*/false );
120
121 #if !__TBB_FP_CONTEXT
122 my_cpu_ctl_env.set_env();
123 #endif
124
125 #if __TBB_ARENA_OBSERVER
126 __TBB_ASSERT( !s.my_last_local_observer, "There cannot be notified local observers when entering arena" );
127 my_observers.notify_entry_observers( s.my_last_local_observer, /*worker=*/true );
128 #endif /* __TBB_ARENA_OBSERVER */
129
130 // Task pool can be marked as non-empty if the worker occupies the slot left by a master.
131 if ( s.my_arena_slot->task_pool != EmptyTaskPool ) {
132 __TBB_ASSERT( s.my_inbox.is_idle_state(false), NULL );
133 s.local_wait_for_all( *s.my_dummy_task, NULL );
134 __TBB_ASSERT( s.my_inbox.is_idle_state(true), NULL );
135 }
136
137 for ( ;; ) {
138 __TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL );
139 __TBB_ASSERT( s.worker_outermost_level(), NULL );
140 __TBB_ASSERT( is_alive(my_guard), NULL );
141 __TBB_ASSERT( s.is_quiescent_local_task_pool_reset(),
142 "Worker cannot leave arena while its task pool is not reset" );
143 __TBB_ASSERT( s.my_arena_slot->task_pool == EmptyTaskPool, "Empty task pool is not marked appropriately" );
144 // This check prevents relinquishing more workers than necessary because
145 // of the non-atomicity of the decision-making procedure
146 if ( num_workers_active() > my_num_workers_allotted
147 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
148 || recall_by_mandatory_request()
149 #endif
150 )
151 break;
152 // Try to steal a task.
153 // Passing reference count is technically unnecessary in this context,
154 // but omitting it here would add checks inside the function.
155 task* t = s.receive_or_steal_task( __TBB_ISOLATION_ARG( s.my_dummy_task->prefix().ref_count, no_isolation ) );
156 if (t) {
157 // A side effect of receive_or_steal_task is that my_innermost_running_task can be set.
158 // But for the outermost dispatch loop it has to be a dummy task.
159 s.my_innermost_running_task = s.my_dummy_task;
160 s.local_wait_for_all(*s.my_dummy_task,t);
161 }
162 }
163 #if __TBB_ARENA_OBSERVER
164 my_observers.notify_exit_observers( s.my_last_local_observer, /*worker=*/true );
165 s.my_last_local_observer = NULL;
166 #endif /* __TBB_ARENA_OBSERVER */
167 #if __TBB_TASK_PRIORITY
168 if ( s.my_offloaded_tasks )
169 orphan_offloaded_tasks( s );
170 #endif /* __TBB_TASK_PRIORITY */
171 #if __TBB_STATISTICS
172 ++s.my_counters.arena_roundtrips;
173 *my_slots[index].my_counters += s.my_counters;
174 s.my_counters.reset();
175 #endif /* __TBB_STATISTICS */
176 __TBB_store_with_release( my_slots[index].my_scheduler, (generic_scheduler*)NULL );
177 s.my_arena_slot = 0; // detached from slot
178 s.my_inbox.detach();
179 __TBB_ASSERT( s.my_inbox.is_idle_state(true), NULL );
180 __TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL );
181 __TBB_ASSERT( s.worker_outermost_level(), NULL );
182 __TBB_ASSERT( is_alive(my_guard), NULL );
183 quit:
184 // In contrast to earlier versions of TBB (before 3.0 U5), the arena may now be
185 // temporarily left without any threads. See comments in
186 // arena::on_thread_leaving() for more details.
187 on_thread_leaving<ref_worker>();
188 }
189
190 arena::arena ( market& m, unsigned num_slots, unsigned num_reserved_slots ) {
191 __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
192 __TBB_ASSERT( sizeof(my_slots[0]) % NFS_GetLineSize()==0, "arena::slot size not multiple of cache line size" );
193 __TBB_ASSERT( (uintptr_t)this % NFS_GetLineSize()==0, "arena misaligned" );
194 #if __TBB_TASK_PRIORITY
195 __TBB_ASSERT( !my_reload_epoch && !my_orphaned_tasks && !my_skipped_fifo_priority, "New arena object is not zeroed" );
196 #endif /* __TBB_TASK_PRIORITY */
197 my_market = &m;
198 my_limit = 1;
199 // Two slots are mandatory: one for the master, and one for a worker (required to support starvation-resistant tasks).
200 my_num_slots = num_arena_slots(num_slots);
201 my_num_reserved_slots = num_reserved_slots;
202 my_max_num_workers = num_slots-num_reserved_slots;
203 my_references = ref_external; // accounts for the master
204 #if __TBB_TASK_PRIORITY
205 my_bottom_priority = normalized_normal_priority;
206 my_top_priority = normalized_normal_priority;
207 #endif /* __TBB_TASK_PRIORITY */
208 my_aba_epoch = m.my_arenas_aba_epoch;
209 #if __TBB_ARENA_OBSERVER
210 my_observers.my_arena = this;
211 #endif
212 __TBB_ASSERT ( my_max_num_workers <= my_num_slots, NULL );
213 // Construct slots. Mark internal synchronization elements for the tools.
214 for( unsigned i = 0; i < my_num_slots; ++i ) {
215 __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, NULL );
216 __TBB_ASSERT( !my_slots[i].task_pool_ptr, NULL );
217 __TBB_ASSERT( !my_slots[i].my_task_pool_size, NULL );
218 ITT_SYNC_CREATE(my_slots + i, SyncType_Scheduler, SyncObj_WorkerTaskPool);
219 mailbox(i+1).construct();
220 ITT_SYNC_CREATE(&mailbox(i+1), SyncType_Scheduler, SyncObj_Mailbox);
221 my_slots[i].hint_for_pop = i;
222 #if __TBB_STATISTICS
223 my_slots[i].my_counters = new ( NFS_Allocate(1, sizeof(statistics_counters), NULL) ) statistics_counters;
224 #endif /* __TBB_STATISTICS */
225 }
226 my_task_stream.initialize(my_num_slots);
227 ITT_SYNC_CREATE(&my_task_stream, SyncType_Scheduler, SyncObj_TaskStream);
228 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
229 my_concurrency_mode = cm_normal;
230 #endif
231 #if !__TBB_FP_CONTEXT
232 my_cpu_ctl_env.get_env();
233 #endif
234 }
235
236 arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots ) {
237 __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
238 __TBB_ASSERT( sizeof(base_type) % NFS_GetLineSize() == 0, "arena slots area misaligned: wrong padding" );
239 __TBB_ASSERT( sizeof(mail_outbox) == NFS_MaxLineSize, "Mailbox padding is wrong" );
240 size_t n = allocation_size(num_arena_slots(num_slots));
241 unsigned char* storage = (unsigned char*)NFS_Allocate( 1, n, NULL );
242 // Zero all slots to indicate that they are empty
243 memset( storage, 0, n );
244 return *new( storage + num_arena_slots(num_slots) * sizeof(mail_outbox) ) arena(m, num_slots, num_reserved_slots);
245 }
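// Editorial sketch (inferred from the placement new above and the assertions at the
// top of allocate_arena; exact sizes come from allocation_size() in the header):
//
//     storage:  [ num_arena_slots(num_slots) mail_outbox objects (mailboxes) ]
//               [ arena_base + my_slots[0] (the arena object itself) ]
//               [ the remaining arena_slot objects ]
//
// The mailboxes precede the arena object, so mailbox(i) is presumably reached at a
// negative offset from `this` (the accessor itself lives in the companion header).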
246
247 void arena::free_arena () {
248 __TBB_ASSERT( is_alive(my_guard), NULL );
249 __TBB_ASSERT( !my_references, "There are threads in the dying arena" );
250 __TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
251 __TBB_ASSERT( my_pool_state == SNAPSHOT_EMPTY || !my_max_num_workers, "Inconsistent state of a dying arena" );
252 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
253 __TBB_ASSERT( my_concurrency_mode != cm_enforced_global, NULL );
254 #endif
255 #if !__TBB_STATISTICS_EARLY_DUMP
256 GATHER_STATISTIC( dump_arena_statistics() );
257 #endif
258 poison_value( my_guard );
259 intptr_t drained = 0;
260 for ( unsigned i = 0; i < my_num_slots; ++i ) {
261 __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
262 // TODO: understand the assertion and modify
263 // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, NULL );
264 __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, NULL ); // TODO: replace by is_quiescent_local_task_pool_empty
265 my_slots[i].free_task_pool();
266 #if __TBB_STATISTICS
267 NFS_Free( my_slots[i].my_counters );
268 #endif /* __TBB_STATISTICS */
269 drained += mailbox(i+1).drain();
270 }
271 __TBB_ASSERT( my_task_stream.drain()==0, "Not all enqueued tasks were executed");
272 #if __TBB_COUNT_TASK_NODES
273 my_market->update_task_node_count( -drained );
274 #endif /* __TBB_COUNT_TASK_NODES */
275 // remove an internal reference
276 my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );
277 #if __TBB_TASK_GROUP_CONTEXT
278 __TBB_ASSERT( my_default_ctx, "Master thread never entered the arena?" );
279 my_default_ctx->~task_group_context();
280 NFS_Free(my_default_ctx);
281 #endif /* __TBB_TASK_GROUP_CONTEXT */
282 #if __TBB_ARENA_OBSERVER
283 if ( !my_observers.empty() )
284 my_observers.clear();
285 #endif /* __TBB_ARENA_OBSERVER */
286 void* storage = &mailbox(my_num_slots);
287 __TBB_ASSERT( my_references == 0, NULL );
288 __TBB_ASSERT( my_pool_state == SNAPSHOT_EMPTY || !my_max_num_workers, NULL );
289 this->~arena();
290 #if TBB_USE_ASSERT > 1
291 memset( storage, 0, allocation_size(my_num_slots) );
292 #endif /* TBB_USE_ASSERT */
293 NFS_Free( storage );
294 }
295
296 #if __TBB_STATISTICS
297 void arena::dump_arena_statistics () {
298 statistics_counters total;
299 for( unsigned i = 0; i < my_num_slots; ++i ) {
300 #if __TBB_STATISTICS_EARLY_DUMP
301 generic_scheduler* s = my_slots[i].my_scheduler;
302 if ( s )
303 *my_slots[i].my_counters += s->my_counters;
304 #else
305 __TBB_ASSERT( !my_slots[i].my_scheduler, NULL );
306 #endif
307 if ( i != 0 ) {
308 total += *my_slots[i].my_counters;
309 dump_statistics( *my_slots[i].my_counters, i );
310 }
311 }
312 dump_statistics( *my_slots[0].my_counters, 0 );
313 #if __TBB_STATISTICS_STDOUT
314 #if !__TBB_STATISTICS_TOTALS_ONLY
315 printf( "----------------------------------------------\n" );
316 #endif
317 dump_statistics( total, workers_counters_total );
318 total += *my_slots[0].my_counters;
319 dump_statistics( total, arena_counters_total );
320 #if !__TBB_STATISTICS_TOTALS_ONLY
321 printf( "==============================================\n" );
322 #endif
323 #endif /* __TBB_STATISTICS_STDOUT */
324 }
325 #endif /* __TBB_STATISTICS */
326
327 #if __TBB_TASK_PRIORITY
328 // The method inspects a scheduler to determine:
329 // 1. if it has tasks that can be retrieved and executed (via the return value);
330 // 2. if it has any tasks at all, including those of lower priority (via tasks_present);
331 // 3. if it is able to work with enqueued tasks (via dequeuing_possible).
332 inline bool arena::may_have_tasks ( generic_scheduler* s, bool& tasks_present, bool& dequeuing_possible ) {
333 if ( !s || s->my_arena != this )
334 return false;
335 dequeuing_possible |= s->worker_outermost_level();
336 if ( s->my_pool_reshuffling_pending ) {
337 // This primary task pool is nonempty and may contain tasks at the current
338 // priority level. Its owner is winnowing lower priority tasks at the moment.
339 tasks_present = true;
340 return true;
341 }
342 if ( s->my_offloaded_tasks ) {
343 tasks_present = true;
344 if ( s->my_local_reload_epoch < *s->my_ref_reload_epoch ) {
345 // This scheduler's offload area is nonempty and may contain tasks at the
346 // current priority level.
347 return true;
348 }
349 }
350 return false;
351 }
352
353 void arena::orphan_offloaded_tasks(generic_scheduler& s) {
354 __TBB_ASSERT( s.my_offloaded_tasks, NULL );
355 GATHER_STATISTIC( ++s.my_counters.prio_orphanings );
356 ++my_abandonment_epoch;
357 __TBB_ASSERT( s.my_offloaded_task_list_tail_link && !*s.my_offloaded_task_list_tail_link, NULL );
358 task* orphans;
359 do {
360 orphans = const_cast<task*>(my_orphaned_tasks);
361 *s.my_offloaded_task_list_tail_link = orphans;
362 } while ( as_atomic(my_orphaned_tasks).compare_and_swap(s.my_offloaded_tasks, orphans) != orphans );
363 s.my_offloaded_tasks = NULL;
364 #if TBB_USE_ASSERT
365 s.my_offloaded_task_list_tail_link = NULL;
366 #endif /* TBB_USE_ASSERT */
367 }
368 #endif /* __TBB_TASK_PRIORITY */
369
370 bool arena::has_enqueued_tasks() {
371 // Look for enqueued tasks at all priority levels
372 for ( int p = 0; p < num_priority_levels; ++p )
373 if ( !my_task_stream.empty(p) )
374 return true;
375 return false;
376 }
377
378 void arena::restore_priority_if_need() {
379 // Check for the presence of enqueued tasks "lost" on some of
380 // priority levels because updating arena priority and switching
381 // arena into "populated" (FULL) state happen non-atomically.
382 // Imposing atomicity would require task::enqueue() to use a lock,
383 // which is unacceptable.
384 if ( has_enqueued_tasks() ) {
385 advertise_new_work<work_enqueued>();
386 #if __TBB_TASK_PRIORITY
387 // update_arena_priority() expects non-zero arena::my_num_workers_requested,
388 // so must be called after advertise_new_work<work_enqueued>()
389 for ( int p = 0; p < num_priority_levels; ++p )
390 if ( !my_task_stream.empty(p) ) {
391 if ( p < my_bottom_priority || p > my_top_priority )
392 my_market->update_arena_priority(*this, p);
393 }
394 #endif
395 }
396 }
397
398 bool arena::is_out_of_work() {
399 // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
400 for(;;) {
401 pool_state_t snapshot = my_pool_state;
402 switch( snapshot ) {
403 case SNAPSHOT_EMPTY:
404 return true;
405 case SNAPSHOT_FULL: {
406 // Use unique id for "busy" in order to avoid ABA problems.
407 const pool_state_t busy = pool_state_t(&busy);
408 // Request permission to take snapshot
409 if( my_pool_state.compare_and_swap( busy, SNAPSHOT_FULL )==SNAPSHOT_FULL ) {
410 // Got permission. Take the snapshot.
411 // NOTE: This is not a lock, as the state can be set to FULL at
412 // any moment by a thread that spawns/enqueues a new task.
413 size_t n = my_limit;
414 // Make local copies of volatile parameters. A change in them during the
415 // snapshot-taking procedure invalidates the attempt and returns
416 // this thread to the dispatch loop.
417 #if __TBB_TASK_PRIORITY
418 uintptr_t reload_epoch = __TBB_load_with_acquire( my_reload_epoch );
419 intptr_t top_priority = my_top_priority;
420 // Inspect primary task pools first
421 #endif /* __TBB_TASK_PRIORITY */
422 size_t k;
423 for( k=0; k<n; ++k ) {
424 if( my_slots[k].task_pool != EmptyTaskPool &&
425 __TBB_load_relaxed(my_slots[k].head) < __TBB_load_relaxed(my_slots[k].tail) )
426 {
427 // k-th primary task pool is nonempty and does contain tasks.
428 break;
429 }
430 if( my_pool_state!=busy )
431 return false; // the work was published
432 }
433 __TBB_ASSERT( k <= n, NULL );
434 bool work_absent = k == n;
435 #if __TBB_TASK_PRIORITY
436 // Variable tasks_present indicates presence of tasks at any priority
437 // level, while work_absent refers only to the current priority.
438 bool tasks_present = !work_absent || my_orphaned_tasks;
439 bool dequeuing_possible = false;
440 if ( work_absent ) {
441 // Check for the possibility that recent priority changes
442 // brought some tasks to the current priority level
443
444 uintptr_t abandonment_epoch = my_abandonment_epoch;
445 // Master thread's scheduler needs special handling as it
446 // may be destroyed at any moment (workers' schedulers are
447 // guaranteed to be alive while at least one thread is in arena).
448 // The lock below excludes concurrency with task group state change
449 // propagation and guarantees lifetime of the master thread.
450 the_context_state_propagation_mutex.lock();
451 work_absent = !may_have_tasks( my_slots[0].my_scheduler, tasks_present, dequeuing_possible );
452 the_context_state_propagation_mutex.unlock();
453 // The following loop is subject to data races. While the k-th slot's
454 // scheduler is being examined, the corresponding worker can either
455 // leave for RML or migrate to another arena.
456 // The races are not prevented because all of them are benign.
457 // First, the code relies on the fact that worker thread's scheduler
458 // object persists until the whole library is deinitialized.
459 // Second, in the worst case the races can only cause another
460 // round of stealing attempts to be undertaken. Introducing complex
461 // synchronization into this coldest part of the scheduler's control
462 // flow does not seem to make sense because it both is unlikely to
463 // ever have any observable performance effect, and will require
464 // additional synchronization code on the hotter paths.
465 for( k = 1; work_absent && k < n; ++k ) {
466 if( my_pool_state!=busy )
467 return false; // the work was published
468 work_absent = !may_have_tasks( my_slots[k].my_scheduler, tasks_present, dequeuing_possible );
469 }
470 // Preclude switching the arena off prematurely because of a race in the previous loop.
471 work_absent = work_absent
472 && !__TBB_load_with_acquire(my_orphaned_tasks)
473 && abandonment_epoch == my_abandonment_epoch;
474 }
475 #endif /* __TBB_TASK_PRIORITY */
476 // Test and test-and-set.
477 if( my_pool_state==busy ) {
478 #if __TBB_TASK_PRIORITY
479 bool no_fifo_tasks = my_task_stream.empty(top_priority);
480 work_absent = work_absent && (!dequeuing_possible || no_fifo_tasks)
481 && top_priority == my_top_priority && reload_epoch == my_reload_epoch;
482 #else
483 bool no_fifo_tasks = my_task_stream.empty(0);
484 work_absent = work_absent && no_fifo_tasks;
485 #endif /* __TBB_TASK_PRIORITY */
486 if( work_absent ) {
487 #if __TBB_TASK_PRIORITY
488 if ( top_priority > my_bottom_priority ) {
489 if ( my_market->lower_arena_priority(*this, top_priority - 1, reload_epoch)
490 && !my_task_stream.empty(top_priority) )
491 {
492 atomic_update( my_skipped_fifo_priority, top_priority, std::less<intptr_t>());
493 }
494 }
495 else if ( !tasks_present && !my_orphaned_tasks && no_fifo_tasks ) {
496 #endif /* __TBB_TASK_PRIORITY */
497 // Save the current demand value before setting SNAPSHOT_EMPTY
498 // to avoid a race with advertise_new_work.
499 int current_demand = (int)my_max_num_workers;
500 if( my_pool_state.compare_and_swap( SNAPSHOT_EMPTY, busy )==busy ) {
501 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
502 if( my_concurrency_mode==cm_enforced_global ) {
503 // adjust_demand() called inside, if needed
504 my_market->mandatory_concurrency_disable( this );
505 } else
506 #endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
507 {
508 // This thread transitioned the pool to the empty state, and thus is
509 // responsible for telling the market that there is no work to do.
510 my_market->adjust_demand( *this, -current_demand );
511 }
512 restore_priority_if_need();
513 return true;
514 }
515 return false;
516 #if __TBB_TASK_PRIORITY
517 }
518 #endif /* __TBB_TASK_PRIORITY */
519 }
520 // Undo previous transition SNAPSHOT_FULL-->busy, unless another thread undid it.
521 my_pool_state.compare_and_swap( SNAPSHOT_FULL, busy );
522 }
523 }
524 return false;
525 }
526 default:
527 // Another thread is taking a snapshot.
528 return false;
529 }
530 }
531 }
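// Editorial summary of the pool-state protocol used by is_out_of_work() (not part of
// the original source). The transitions, assuming the names above, are roughly:
//
//     SNAPSHOT_FULL --CAS--> busy (a per-call unique value)
//     busy --CAS--> SNAPSHOT_EMPTY   // no work found: worker demand is withdrawn
//     busy --CAS--> SNAPSHOT_FULL    // work (or a possible race) detected: undo
//
// Any thread that publishes new work may overwrite the state with SNAPSHOT_FULL at
// any time, which is why the scanning thread repeatedly re-checks my_pool_state == busy.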
532
533 #if __TBB_COUNT_TASK_NODES
534 intptr_t arena::workers_task_node_count() {
535 intptr_t result = 0;
536 for( unsigned i = 1; i < my_num_slots; ++i ) {
537 generic_scheduler* s = my_slots[i].my_scheduler;
538 if( s )
539 result += s->my_task_node_count;
540 }
541 return result;
542 }
543 #endif /* __TBB_COUNT_TASK_NODES */
544
545 void arena::enqueue_task( task& t, intptr_t prio, FastRandom &random )
546 {
547 #if __TBB_RECYCLE_TO_ENQUEUE
548 __TBB_ASSERT( t.state()==task::allocated || t.state()==task::to_enqueue, "attempt to enqueue task with inappropriate state" );
549 #else
550 __TBB_ASSERT( t.state()==task::allocated, "attempt to enqueue task that is not in 'allocated' state" );
551 #endif
552 t.prefix().state = task::ready;
553 t.prefix().extra_state |= es_task_enqueued; // enqueued task marker
554
555 #if TBB_USE_ASSERT
556 if( task* parent = t.parent() ) {
557 internal::reference_count ref_count = parent->prefix().ref_count;
558 __TBB_ASSERT( ref_count!=0, "attempt to enqueue task whose parent has a ref_count==0 (forgot to set_ref_count?)" );
559 __TBB_ASSERT( ref_count>0, "attempt to enqueue task whose parent has a ref_count<0" );
560 parent->prefix().extra_state |= es_ref_count_active;
561 }
562 __TBB_ASSERT(t.prefix().affinity==affinity_id(0), "affinity is ignored for enqueued tasks");
563 #endif /* TBB_USE_ASSERT */
564
565 ITT_NOTIFY(sync_releasing, &my_task_stream);
566 #if __TBB_TASK_PRIORITY
567 intptr_t p = prio ? normalize_priority(priority_t(prio)) : normalized_normal_priority;
568 assert_priority_valid(p);
569 my_task_stream.push( &t, p, random );
570 if ( p != my_top_priority )
571 my_market->update_arena_priority( *this, p );
572 #else /* !__TBB_TASK_PRIORITY */
573 __TBB_ASSERT_EX(prio == 0, "the library is not configured to respect the task priority");
574 my_task_stream.push( &t, 0, random );
575 #endif /* !__TBB_TASK_PRIORITY */
576 advertise_new_work<work_enqueued>();
577 #if __TBB_TASK_PRIORITY
578 if ( p != my_top_priority )
579 my_market->update_arena_priority( *this, p );
580 #endif /* __TBB_TASK_PRIORITY */
581 }
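// Editorial example (not code from this file): a client typically reaches
// arena::enqueue_task through the public tbb::task::enqueue entry point. MyTask
// below is a hypothetical task type:
//
//     tbb::task& t = *new( tbb::task::allocate_root() ) MyTask;
//     tbb::task::enqueue( t );                        // prio == 0, i.e. normal priority
//     // with __TBB_TASK_PRIORITY enabled, e.g.:
//     // tbb::task::enqueue( t, tbb::priority_high );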
582
583 class nested_arena_context : no_copy {
584 public:
585 nested_arena_context(generic_scheduler *s, arena* a, size_t slot_index, bool type, bool same)
586 : my_scheduler(*s), my_orig_ctx(NULL), same_arena(same) {
587 if (same_arena) {
588 my_orig_state.my_properties = my_scheduler.my_properties;
589 my_orig_state.my_innermost_running_task = my_scheduler.my_innermost_running_task;
590 mimic_outermost_level(a, type);
591 } else {
592 my_orig_state = *s;
593 mimic_outermost_level(a, type);
594 s->nested_arena_entry(a, slot_index);
595 }
596 }
597 ~nested_arena_context() {
598 #if __TBB_TASK_GROUP_CONTEXT
599 my_scheduler.my_dummy_task->prefix().context = my_orig_ctx; // restore context of dummy task
600 #endif
601 if (same_arena) {
602 my_scheduler.my_properties = my_orig_state.my_properties;
603 my_scheduler.my_innermost_running_task = my_orig_state.my_innermost_running_task;
604 } else {
605 my_scheduler.nested_arena_exit();
606 static_cast<scheduler_state&>(my_scheduler) = my_orig_state; // restore arena settings
607 #if __TBB_TASK_PRIORITY
608 my_scheduler.my_local_reload_epoch = *my_orig_state.my_ref_reload_epoch;
609 #endif
610 governor::assume_scheduler(&my_scheduler);
611 }
612 }
613
614 private:
615 generic_scheduler &my_scheduler;
616 scheduler_state my_orig_state;
617 task_group_context *my_orig_ctx;
618 const bool same_arena;
619
620 void mimic_outermost_level(arena* a, bool type) {
621 my_scheduler.my_properties.outermost = true;
622 my_scheduler.my_properties.type = type;
623 my_scheduler.my_innermost_running_task = my_scheduler.my_dummy_task;
624 #if __TBB_TASK_GROUP_CONTEXT
625 // Save dummy's context and replace it by arena's context
626 my_orig_ctx = my_scheduler.my_dummy_task->prefix().context;
627 my_scheduler.my_dummy_task->prefix().context = a->my_default_ctx;
628 #endif
629 }
630 };
631
632 void generic_scheduler::nested_arena_entry(arena* a, size_t slot_index) {
633 __TBB_ASSERT( is_alive(a->my_guard), NULL );
634 __TBB_ASSERT( a!=my_arena, NULL);
635
636 // overwrite arena settings
637 #if __TBB_TASK_PRIORITY
638 if ( my_offloaded_tasks )
639 my_arena->orphan_offloaded_tasks( *this );
640 my_offloaded_tasks = NULL;
641 #endif /* __TBB_TASK_PRIORITY */
642 attach_arena( a, slot_index, /*is_master*/true );
643 __TBB_ASSERT( my_arena == a, NULL );
644 governor::assume_scheduler( this );
645 // TODO? ITT_NOTIFY(sync_acquired, a->my_slots + index);
646 // TODO: it requires market to have P workers (not P-1)
647 // TODO: a preempted worker should be excluded from assignment to other arenas e.g. my_slack--
648 if( !is_worker() && slot_index >= my_arena->my_num_reserved_slots )
649 my_arena->my_market->adjust_demand(*my_arena, -1);
650 #if __TBB_ARENA_OBSERVER
651 my_last_local_observer = 0; // TODO: try optimize number of calls
652 my_arena->my_observers.notify_entry_observers( my_last_local_observer, /*worker=*/false );
653 #endif
654 }
655
656 void generic_scheduler::nested_arena_exit() {
657 #if __TBB_ARENA_OBSERVER
658 my_arena->my_observers.notify_exit_observers( my_last_local_observer, /*worker=*/false );
659 #endif /* __TBB_ARENA_OBSERVER */
660 #if __TBB_TASK_PRIORITY
661 if ( my_offloaded_tasks )
662 my_arena->orphan_offloaded_tasks( *this );
663 #endif
664 if( !is_worker() && my_arena_index >= my_arena->my_num_reserved_slots )
665 my_arena->my_market->adjust_demand(*my_arena, 1);
666 // Free the master slot.
667 __TBB_ASSERT(my_arena->my_slots[my_arena_index].my_scheduler, "A slot is already empty");
668 __TBB_store_with_release(my_arena->my_slots[my_arena_index].my_scheduler, (generic_scheduler*)NULL);
669 my_arena->my_exit_monitors.notify_one(); // do not relax!
670 }
671
672 void generic_scheduler::wait_until_empty() {
673 my_dummy_task->prefix().ref_count++; // prevents exit from local_wait_for_all when local work is done, forcing it to keep stealing
674 while( my_arena->my_pool_state != arena::SNAPSHOT_EMPTY )
675 local_wait_for_all(*my_dummy_task, NULL);
676 my_dummy_task->prefix().ref_count--;
677 }
678
679 } // namespace internal
680 } // namespace tbb
681
682 #include "scheduler_utility.h"
683 #include "tbb/task_arena.h" // task_arena_base
684
685 namespace tbb {
686 namespace interface7 {
687 namespace internal {
688
689 void task_arena_base::internal_initialize( ) {
690 governor::one_time_init();
691 if( my_max_concurrency < 1 )
692 my_max_concurrency = (int)governor::default_num_threads();
693 __TBB_ASSERT( my_master_slots <= (unsigned)my_max_concurrency, "Number of slots reserved for master should not exceed arena concurrency");
694 arena* new_arena = market::create_arena( my_max_concurrency, my_master_slots, 0 );
695 // add an internal market reference; a public reference was added in create_arena
696 market &m = market::global_market( /*is_public=*/false );
697 // allocate default context for task_arena
698 #if __TBB_TASK_GROUP_CONTEXT
699 new_arena->my_default_ctx = new ( NFS_Allocate(1, sizeof(task_group_context), NULL) )
700 task_group_context( task_group_context::isolated, task_group_context::default_traits );
701 #if __TBB_FP_CONTEXT
702 new_arena->my_default_ctx->capture_fp_settings();
703 #endif
704 #endif /* __TBB_TASK_GROUP_CONTEXT */
705 // threads might race to initialize the arena
706 if(as_atomic(my_arena).compare_and_swap(new_arena, NULL) != NULL) {
707 __TBB_ASSERT(my_arena, NULL); // another thread won the race
708 // release public market reference
709 m.release( /*is_public=*/true, /*blocking_terminate=*/false );
710 new_arena->on_thread_leaving<arena::ref_external>(); // destroy unneeded arena
711 #if __TBB_TASK_GROUP_CONTEXT
712 spin_wait_while_eq(my_context, (task_group_context*)NULL);
713 } else {
714 new_arena->my_default_ctx->my_version_and_traits |= my_version_and_traits & exact_exception_flag;
715 as_atomic(my_context) = new_arena->my_default_ctx;
716 #endif
717 }
718 // TODO: should it trigger automatic initialization of this thread?
719 governor::local_scheduler_weak();
720 }
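// Illustrative client-side usage that drives internal_initialize() (editorial example;
// it uses the public tbb::task_arena API rather than code from this translation unit):
//
//     tbb::task_arena a( /*max_concurrency=*/4, /*reserved_for_masters=*/1 );
//     a.initialize();   // lazy initialization; ends up in internal_initialize()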
721
722 void task_arena_base::internal_terminate( ) {
723 if( my_arena ) {// task_arena was initialized
724 my_arena->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false );
725 my_arena->on_thread_leaving<arena::ref_external>();
726 my_arena = 0;
727 #if __TBB_TASK_GROUP_CONTEXT
728 my_context = 0;
729 #endif
730 }
731 }
732
733 void task_arena_base::internal_attach( ) {
734 __TBB_ASSERT(!my_arena, NULL);
735 generic_scheduler* s = governor::local_scheduler_if_initialized();
736 if( s && s->my_arena ) {
737 // There is an active arena to attach to.
738 // It's still used by s, so won't be destroyed right away.
739 my_arena = s->my_arena;
740 __TBB_ASSERT( my_arena->my_references > 0, NULL );
741 my_arena->my_references += arena::ref_external;
742 #if __TBB_TASK_GROUP_CONTEXT
743 my_context = my_arena->my_default_ctx;
744 my_version_and_traits |= my_context->my_version_and_traits & exact_exception_flag;
745 #endif
746 my_master_slots = my_arena->my_num_reserved_slots;
747 my_max_concurrency = my_master_slots + my_arena->my_max_num_workers;
748 __TBB_ASSERT(arena::num_arena_slots(my_max_concurrency)==my_arena->my_num_slots, NULL);
749 // increases market's ref count for task_arena
750 market::global_market( /*is_public=*/true );
751 }
752 }
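// Editorial example, assuming the task_arena::attach constructor tag that accompanies
// internal_attach() in this TBB generation: a thread already executing inside an arena
// can bind a task_arena object to that arena instead of creating a new one:
//
//     tbb::task_arena a( tbb::task_arena::attach() );   // resolves to internal_attach()
//     a.execute( []{ /* runs in the already existing arena */ } );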
753
754 void task_arena_base::internal_enqueue( task& t, intptr_t prio ) const {
755 __TBB_ASSERT(my_arena, NULL);
756 generic_scheduler* s = governor::local_scheduler_if_initialized();
757 __TBB_ASSERT(s, "Scheduler is not initialized"); // we allocated a task so can expect the scheduler
758 #if __TBB_TASK_GROUP_CONTEXT
759 __TBB_ASSERT(my_arena->my_default_ctx == t.prefix().context, NULL);
760 __TBB_ASSERT(!my_arena->my_default_ctx->is_group_execution_cancelled(), // TODO: any better idea?
761 "The task will not be executed because default task_group_context of task_arena is cancelled. Has previously enqueued task thrown an exception?");
762 #endif
763 my_arena->enqueue_task( t, prio, s->my_random );
764 }
765
766 class delegated_task : public task {
767 internal::delegate_base & my_delegate;
768 concurrent_monitor & my_monitor;
769 task * my_root;
770 task* execute() __TBB_override {
771 generic_scheduler& s = *(generic_scheduler*)prefix().owner;
772 __TBB_ASSERT(s.outermost_level(), "expected to be enqueued and received on the outermost level");
773 struct outermost_context : internal::no_copy {
774 delegated_task * t;
775 generic_scheduler & s;
776 task * orig_dummy;
777 task_group_context * orig_ctx;
778 scheduler_properties orig_props;
779 outermost_context(delegated_task *_t, generic_scheduler &_s)
780 : t(_t), s(_s), orig_dummy(s.my_dummy_task), orig_props(s.my_properties) {
781 __TBB_ASSERT(s.my_innermost_running_task == t, NULL);
782 #if __TBB_TASK_GROUP_CONTEXT
783 orig_ctx = t->prefix().context;
784 t->prefix().context = s.my_arena->my_default_ctx;
785 #endif
786 // Mimics outermost master
787 s.my_dummy_task = t;
788 s.my_properties.type = scheduler_properties::master;
789 }
790 ~outermost_context() {
791 #if __TBB_TASK_GROUP_CONTEXT
792 // Restore context for sake of registering potential exception
793 t->prefix().context = orig_ctx;
794 #endif
795 s.my_properties = orig_props;
796 s.my_dummy_task = orig_dummy;
797 }
798 } scope(this, s);
799 my_delegate();
800 return NULL;
801 }
802 ~delegated_task() {
803 // A potential exception has already been registered. It must happen before the notification.
804 __TBB_ASSERT(my_root->ref_count()==2, NULL);
805 __TBB_store_with_release(my_root->prefix().ref_count, 1); // must precede the wakeup
806 my_monitor.notify(*this); // do not relax, it needs a fence!
807 }
808 public:
809 delegated_task( internal::delegate_base & d, concurrent_monitor & s, task * t )
810 : my_delegate(d), my_monitor(s), my_root(t) {}
811 // predicate for concurrent_monitor notification
812 bool operator()(uintptr_t ctx) const { return (void*)ctx == (void*)&my_delegate; }
813 };
814
815 void task_arena_base::internal_execute(internal::delegate_base& d) const {
816 __TBB_ASSERT(my_arena, NULL);
817 generic_scheduler* s = governor::local_scheduler_weak();
818 __TBB_ASSERT(s, "Scheduler is not initialized");
819
820 bool same_arena = s->my_arena == my_arena;
821 size_t index1 = s->my_arena_index;
822 if (!same_arena) {
823 index1 = my_arena->occupy_free_slot</* as_worker*/false>(*s);
824 if (index1 == arena::out_of_arena) {
825
826 #if __TBB_USE_OPTIONAL_RTTI
827 // Workaround for a bug inside the flow graph. If the thread cannot occupy an arena slot during task_arena::execute()
828 // and all aggregator operations depend on this task's completion (all other threads are already inside the arena),
829 // a deadlock appears because the enqueued task will never enter the arena.
830 // Workaround: check via RTTI whether the task came from the flow graph (by casting to graph::spawn_functor)
831 // and, if so, enqueue this task with the non-blocking internal_enqueue method.
832 // TODO: the behaviour has to be changed in the next GOLD release (maybe by adding a new library entry point - try_execute)
833 typedef tbb::flow::interface10::graph::spawn_functor graph_funct;
834 internal::delegated_function< graph_funct, void >* deleg_funct =
835 dynamic_cast< internal::delegated_function< graph_funct, void>* >(&d);
836
837 if (deleg_funct) {
838 internal_enqueue(*new(task::allocate_root(*my_context))
839 internal::function_task< internal::strip< graph_funct >::type >
840 (internal::forward< graph_funct >(deleg_funct->my_func)), 0);
841 return;
842 } else {
843 #endif
844 concurrent_monitor::thread_context waiter;
845 #if __TBB_TASK_GROUP_CONTEXT
846 task_group_context exec_context(task_group_context::isolated, my_version_and_traits & exact_exception_flag);
847 #if __TBB_FP_CONTEXT
848 exec_context.copy_fp_settings(*my_context);
849 #endif
850 #endif
851 auto_empty_task root(__TBB_CONTEXT_ARG(s, &exec_context));
852 root.prefix().ref_count = 2;
853 my_arena->enqueue_task(*new(task::allocate_root(__TBB_CONTEXT_ARG1(exec_context)))
854 delegated_task(d, my_arena->my_exit_monitors, &root),
855 0, s->my_random); // TODO: priority?
856 size_t index2 = arena::out_of_arena;
857 do {
858 my_arena->my_exit_monitors.prepare_wait(waiter, (uintptr_t)&d);
859 if (__TBB_load_with_acquire(root.prefix().ref_count) < 2) {
860 my_arena->my_exit_monitors.cancel_wait(waiter);
861 break;
862 }
863 index2 = my_arena->occupy_free_slot</*as_worker*/false>(*s);
864 if (index2 != arena::out_of_arena) {
865 my_arena->my_exit_monitors.cancel_wait(waiter);
866 nested_arena_context scope(s, my_arena, index2, scheduler_properties::master, same_arena);
867 s->local_wait_for_all(root, NULL);
868 #if TBB_USE_EXCEPTIONS
869 __TBB_ASSERT(!exec_context.my_exception, NULL); // exception can be thrown above, not deferred
870 #endif
871 __TBB_ASSERT(root.prefix().ref_count == 0, NULL);
872 break;
873 }
874 my_arena->my_exit_monitors.commit_wait(waiter);
875 } while (__TBB_load_with_acquire(root.prefix().ref_count) == 2);
876 if (index2 == arena::out_of_arena) {
877 // notify a waiting thread even if this thread did not enter arena,
878 // in case it was woken by a leaving thread but did not need to enter
879 my_arena->my_exit_monitors.notify_one(); // do not relax!
880 }
881 #if TBB_USE_EXCEPTIONS
882 // process possible exception
883 if (task_group_context::exception_container_type *pe = exec_context.my_exception)
884 TbbRethrowException(pe);
885 #endif
886 return;
887 #if __TBB_USE_OPTIONAL_RTTI
888 } // if task came from graph
889 #endif
890 } // if (index1 == arena::out_of_arena)
891 } // if (!same_arena)
892
893 cpu_ctl_env_helper cpu_ctl_helper;
894 cpu_ctl_helper.set_env(__TBB_CONTEXT_ARG1(my_context));
895 #if TBB_USE_EXCEPTIONS
896 try {
897 #endif
898 //TODO: replace dummy tasks for workers as well to avoid using the_dummy_context
899 nested_arena_context scope(s, my_arena, index1, scheduler_properties::master, same_arena);
900 d();
901 #if TBB_USE_EXCEPTIONS
902 }
903 catch (...) {
904 cpu_ctl_helper.restore_default(); // TODO: is it needed on Windows?
905 if (my_version_and_traits & exact_exception_flag) throw;
906 else {
907 task_group_context exception_container(task_group_context::isolated,
908 task_group_context::default_traits & ~task_group_context::exact_exception);
909 exception_container.register_pending_exception();
910 __TBB_ASSERT(exception_container.my_exception, NULL);
911 TbbRethrowException(exception_container.my_exception);
912 }
913 }
914 #endif
915 }
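// Illustrative usage of the execute path above (editorial example; it relies on the
// public task_arena::execute template rather than code from this file):
//
//     tbb::task_arena a( 2 );
//     a.execute( []{
//         tbb::parallel_for( 0, 100, []( int ) { /* work */ } );
//     } );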
916
917 // This wait task is a temporary approach to waiting for arena emptiness on behalf of masters without slots.
918 // TODO: rework it to use a single source of notification from is_out_of_work
919 class wait_task : public task {
920 binary_semaphore & my_signal;
921 task* execute() __TBB_override {
922 generic_scheduler* s = governor::local_scheduler_if_initialized();
923 __TBB_ASSERT( s, NULL );
924 __TBB_ASSERT( s->outermost_level(), "The enqueued task can be processed only on outermost level" );
925 if ( s->is_worker() ) {
926 __TBB_ASSERT( s->my_innermost_running_task == this, NULL );
927 // Mimic worker on outermost level to run remaining tasks
928 s->my_innermost_running_task = s->my_dummy_task;
929 s->local_wait_for_all( *s->my_dummy_task, NULL );
930 s->my_innermost_running_task = this;
931 } else s->my_arena->is_out_of_work(); // avoids starvation of internal_wait: issuing this task makes arena full
932 my_signal.V();
933 return NULL;
934 }
935 public:
936 wait_task ( binary_semaphore & sema ) : my_signal(sema) {}
937 };
938
939 void task_arena_base::internal_wait() const {
940 __TBB_ASSERT(my_arena, NULL);
941 generic_scheduler* s = governor::local_scheduler_weak();
942 __TBB_ASSERT(s, "Scheduler is not initialized");
943 __TBB_ASSERT(s->my_arena != my_arena || s->my_arena_index == 0, "task_arena::wait_until_empty() is not supported within a worker context" );
944 if( s->my_arena == my_arena ) {
945 // unsupported, but try to do something for the outermost master
946 __TBB_ASSERT(s->master_outermost_level(), "unsupported");
947 if( !s->my_arena_index )
948 while( my_arena->num_workers_active() )
949 s->wait_until_empty();
950 } else for(;;) {
951 while( my_arena->my_pool_state != arena::SNAPSHOT_EMPTY ) {
952 if( !__TBB_load_with_acquire(my_arena->my_slots[0].my_scheduler) // TODO TEMP: one master, make more masters
953 && as_atomic(my_arena->my_slots[0].my_scheduler).compare_and_swap(s, NULL) == NULL ) {
954 nested_arena_context a(s, my_arena, 0, scheduler_properties::worker, false);
955 s->wait_until_empty();
956 } else {
957 binary_semaphore waiter; // TODO: replace by a single event notification from is_out_of_work
958 internal_enqueue( *new( task::allocate_root(__TBB_CONTEXT_ARG1(*my_context)) ) wait_task(waiter), 0 ); // TODO: priority?
959 waiter.P(); // TODO: concurrent_monitor
960 }
961 }
962 if( !my_arena->num_workers_active() && !my_arena->my_slots[0].my_scheduler) // no activity
963 break; // stop spinning once there is no activity, but avoid spinning in a worker
964 __TBB_Yield(); // wait until workers and master leave
965 }
966 }
967
968 /*static*/ int task_arena_base::internal_current_slot() {
969 generic_scheduler* s = governor::local_scheduler_if_initialized();
970 return s? int(s->my_arena_index) : -1;
971 }
972
973 #if __TBB_TASK_ISOLATION
974 class isolation_guard : tbb::internal::no_copy {
975 isolation_tag &guarded;
976 isolation_tag previous_value;
977 public:
978 isolation_guard( isolation_tag &isolation ) : guarded( isolation ), previous_value( isolation ) {}
979 ~isolation_guard() {
980 guarded = previous_value;
981 }
982 };
983
984 void isolate_within_arena( delegate_base& d, intptr_t reserved ) {
985 (void)reserved;
986 __TBB_ASSERT( reserved == 0, NULL );
987 // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
988 generic_scheduler* s = governor::local_scheduler_weak();
989 __TBB_ASSERT( s, "this_task_arena::isolate() needs an initialized scheduler" );
990 // Theoretically, we can keep the current isolation in the scheduler; however, it makes sense to store it in innermost
991 // running task because it can in principle be queried via task::self().
992 isolation_tag& current_isolation = s->my_innermost_running_task->prefix().isolation;
993 // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
994 isolation_guard guard( current_isolation );
995 current_isolation = reinterpret_cast<isolation_tag>(&d);
996 d();
997 }
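// Illustrative usage (editorial example, assuming this_task_arena::isolate is the
// public wrapper over isolate_within_arena in this TBB generation):
//
//     tbb::this_task_arena::isolate( []{
//         // While waiting inside this region, the calling thread will not pick up
//         // tasks spawned outside of it.
//         tbb::parallel_for( 0, 100, []( int ) { /* work */ } );
//     } );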
998 #endif /* __TBB_TASK_ISOLATION */
999
1000 int task_arena_base::internal_max_concurrency(const task_arena *ta) {
1001 arena* a = NULL;
1002 if( ta ) // for special cases of ta->max_concurrency()
1003 a = ta->my_arena;
1004 else if( generic_scheduler* s = governor::local_scheduler_if_initialized() )
1005 a = s->my_arena; // the current arena if any
1006
1007 if( a ) { // Get parameters from the arena
1008 __TBB_ASSERT( !ta || ta->my_max_concurrency==1, NULL );
1009 return a->my_num_reserved_slots + a->my_max_num_workers;
1010 } else {
1011 __TBB_ASSERT( !ta || ta->my_max_concurrency==automatic, NULL );
1012 return int(governor::default_num_threads());
1013 }
1014 }
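// Editorial note: the public queries that funnel into internal_max_concurrency() are
// assumed to be task_arena::max_concurrency() (passing a non-NULL task_arena* in its
// special cases) and this_task_arena::max_concurrency() (passing NULL), e.g.:
//
//     tbb::task_arena a;                                   // automatic concurrency
//     int per_arena = a.max_concurrency();
//     int current   = tbb::this_task_arena::max_concurrency();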
1015 } // tbb::interfaceX::internal
1016 } // tbb::interfaceX
1017 } // tbb
1018