1 /*
2     Copyright (c) 2005-2017 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #include "tbb/global_control.h" // thread_stack_size
22 
23 #include "scheduler.h"
24 #include "governor.h"
25 #include "arena.h"
26 #include "itt_notify.h"
27 #include "semaphore.h"
28 #include "tbb/internal/_flow_graph_impl.h"
29 
30 #include <functional>
31 
32 #if __TBB_STATISTICS_STDOUT
33 #include <cstdio>
34 #endif
35 
36 namespace tbb {
37 namespace internal {
38 
39 // put it here in order to enable compiler to inline it into arena::process and nested_arena_entry
attach_arena(arena * a,size_t index,bool is_master)40 void generic_scheduler::attach_arena( arena* a, size_t index, bool is_master ) {
41     __TBB_ASSERT( a->my_market == my_market, NULL );
42     my_arena = a;
43     my_arena_index = index;
44     my_arena_slot = a->my_slots + index;
45     attach_mailbox( affinity_id(index+1) );
46     if ( is_master && my_inbox.is_idle_state( true ) ) {
47         // Master enters an arena with its own task to be executed. It means that master is not
48         // going to enter stealing loop and take affinity tasks.
49         my_inbox.set_is_idle( false );
50     }
51 #if __TBB_TASK_GROUP_CONTEXT
52     // Context to be used by root tasks by default (if the user has not specified one).
53     if( !is_master )
54         my_dummy_task->prefix().context = a->my_default_ctx;
55 #endif /* __TBB_TASK_GROUP_CONTEXT */
56 #if __TBB_TASK_PRIORITY
57     // In the current implementation master threads continue processing even when
58     // there are other masters with higher priority. Only TBB worker threads are
59     // redistributed between arenas based on the latters' priority. Thus master
60     // threads use arena's top priority as a reference point (in contrast to workers
61     // that use my_market->my_global_top_priority).
62     if( is_master ) {
63         my_ref_top_priority = &a->my_top_priority;
64         my_ref_reload_epoch = &a->my_reload_epoch;
65     }
66     my_local_reload_epoch = *my_ref_reload_epoch;
67     __TBB_ASSERT( !my_offloaded_tasks, NULL );
68 #endif /* __TBB_TASK_PRIORITY */
69 }
70 
occupy_slot(generic_scheduler * & slot,generic_scheduler & s)71 inline static bool occupy_slot( generic_scheduler*& slot, generic_scheduler& s ) {
72     return !slot && as_atomic( slot ).compare_and_swap( &s, NULL ) == NULL;
73 }
74 
occupy_free_slot_in_range(generic_scheduler & s,size_t lower,size_t upper)75 size_t arena::occupy_free_slot_in_range( generic_scheduler& s, size_t lower, size_t upper ) {
76     if ( lower >= upper ) return out_of_arena;
77     // Start search for an empty slot from the one we occupied the last time
78     size_t index = s.my_arena_index;
79     if ( index < lower || index >= upper ) index = s.my_random.get() % (upper - lower) + lower;
80     __TBB_ASSERT( index >= lower && index < upper, NULL );
81     // Find a free slot
82     for ( size_t i = index; i < upper; ++i )
83         if ( occupy_slot(my_slots[i].my_scheduler, s) ) return i;
84     for ( size_t i = lower; i < index; ++i )
85         if ( occupy_slot(my_slots[i].my_scheduler, s) ) return i;
86     return out_of_arena;
87 }
88 
89 template <bool as_worker>
occupy_free_slot(generic_scheduler & s)90 size_t arena::occupy_free_slot( generic_scheduler& s ) {
91     // Firstly, masters try to occupy reserved slots
92     size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( s, 0, my_num_reserved_slots );
93     if ( index == out_of_arena ) {
94         // Secondly, all threads try to occupy all non-reserved slots
95         index = occupy_free_slot_in_range( s, my_num_reserved_slots, my_num_slots );
96         // Likely this arena is already saturated
97         if ( index == out_of_arena )
98             return out_of_arena;
99     }
100 
101     ITT_NOTIFY(sync_acquired, my_slots + index);
102     atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
103     return index;
104 }
105 
process(generic_scheduler & s)106 void arena::process( generic_scheduler& s ) {
107     __TBB_ASSERT( is_alive(my_guard), NULL );
108     __TBB_ASSERT( governor::is_set(&s), NULL );
109     __TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL );
110     __TBB_ASSERT( s.worker_outermost_level(), NULL );
111 
112     __TBB_ASSERT( my_num_slots > 1, NULL );
113 
114     size_t index = occupy_free_slot</*as_worker*/true>( s );
115     if ( index == out_of_arena )
116         goto quit;
117 
118     __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
119     s.attach_arena( this, index, /*is_master*/false );
120 
121 #if !__TBB_FP_CONTEXT
122     my_cpu_ctl_env.set_env();
123 #endif
124 
125 #if __TBB_ARENA_OBSERVER
126     __TBB_ASSERT( !s.my_last_local_observer, "There cannot be notified local observers when entering arena" );
127     my_observers.notify_entry_observers( s.my_last_local_observer, /*worker=*/true );
128 #endif /* __TBB_ARENA_OBSERVER */
129 
130     // Task pool can be marked as non-empty if the worker occupies the slot left by a master.
131     if ( s.my_arena_slot->task_pool != EmptyTaskPool ) {
132         __TBB_ASSERT( s.my_inbox.is_idle_state(false), NULL );
133         s.local_wait_for_all( *s.my_dummy_task, NULL );
134         __TBB_ASSERT( s.my_inbox.is_idle_state(true), NULL );
135     }
136 
137     for ( ;; ) {
138         __TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL );
139         __TBB_ASSERT( s.worker_outermost_level(), NULL );
140         __TBB_ASSERT( is_alive(my_guard), NULL );
141         __TBB_ASSERT( s.is_quiescent_local_task_pool_reset(),
142                       "Worker cannot leave arena while its task pool is not reset" );
143         __TBB_ASSERT( s.my_arena_slot->task_pool == EmptyTaskPool, "Empty task pool is not marked appropriately" );
144         // This check prevents relinquishing more than necessary workers because
145         // of the non-atomicity of the decision making procedure
146         if ( num_workers_active() > my_num_workers_allotted
147 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
148              || recall_by_mandatory_request()
149 #endif
150             )
151             break;
152         // Try to steal a task.
153         // Passing reference count is technically unnecessary in this context,
154         // but omitting it here would add checks inside the function.
155         task* t = s.receive_or_steal_task( __TBB_ISOLATION_ARG( s.my_dummy_task->prefix().ref_count, no_isolation ) );
156         if (t) {
157             // A side effect of receive_or_steal_task is that my_innermost_running_task can be set.
158             // But for the outermost dispatch loop it has to be a dummy task.
159             s.my_innermost_running_task = s.my_dummy_task;
160             s.local_wait_for_all(*s.my_dummy_task,t);
161         }
162     }
163 #if __TBB_ARENA_OBSERVER
164     my_observers.notify_exit_observers( s.my_last_local_observer, /*worker=*/true );
165     s.my_last_local_observer = NULL;
166 #endif /* __TBB_ARENA_OBSERVER */
167 #if __TBB_TASK_PRIORITY
168     if ( s.my_offloaded_tasks )
169         orphan_offloaded_tasks( s );
170 #endif /* __TBB_TASK_PRIORITY */
171 #if __TBB_STATISTICS
172     ++s.my_counters.arena_roundtrips;
173     *my_slots[index].my_counters += s.my_counters;
174     s.my_counters.reset();
175 #endif /* __TBB_STATISTICS */
176     __TBB_store_with_release( my_slots[index].my_scheduler, (generic_scheduler*)NULL );
177     s.my_arena_slot = 0; // detached from slot
178     s.my_inbox.detach();
179     __TBB_ASSERT( s.my_inbox.is_idle_state(true), NULL );
180     __TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL );
181     __TBB_ASSERT( s.worker_outermost_level(), NULL );
182     __TBB_ASSERT( is_alive(my_guard), NULL );
183 quit:
184     // In contrast to earlier versions of TBB (before 3.0 U5) now it is possible
185     // that arena may be temporarily left unpopulated by threads. See comments in
186     // arena::on_thread_leaving() for more details.
187     on_thread_leaving<ref_worker>();
188 }
189 
arena(market & m,unsigned num_slots,unsigned num_reserved_slots)190 arena::arena ( market& m, unsigned num_slots, unsigned num_reserved_slots ) {
191     __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
192     __TBB_ASSERT( sizeof(my_slots[0]) % NFS_GetLineSize()==0, "arena::slot size not multiple of cache line size" );
193     __TBB_ASSERT( (uintptr_t)this % NFS_GetLineSize()==0, "arena misaligned" );
194 #if __TBB_TASK_PRIORITY
195     __TBB_ASSERT( !my_reload_epoch && !my_orphaned_tasks && !my_skipped_fifo_priority, "New arena object is not zeroed" );
196 #endif /* __TBB_TASK_PRIORITY */
197     my_market = &m;
198     my_limit = 1;
199     // Two slots are mandatory: for the master, and for 1 worker (required to support starvation resistant tasks).
200     my_num_slots = num_arena_slots(num_slots);
201     my_num_reserved_slots = num_reserved_slots;
202     my_max_num_workers = num_slots-num_reserved_slots;
203     my_references = ref_external; // accounts for the master
204 #if __TBB_TASK_PRIORITY
205     my_bottom_priority = normalized_normal_priority;
206     my_top_priority = normalized_normal_priority;
207 #endif /* __TBB_TASK_PRIORITY */
208     my_aba_epoch = m.my_arenas_aba_epoch;
209 #if __TBB_ARENA_OBSERVER
210     my_observers.my_arena = this;
211 #endif
212     __TBB_ASSERT ( my_max_num_workers <= my_num_slots, NULL );
213     // Construct slots. Mark internal synchronization elements for the tools.
214     for( unsigned i = 0; i < my_num_slots; ++i ) {
215         __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, NULL );
216         __TBB_ASSERT( !my_slots[i].task_pool_ptr, NULL );
217         __TBB_ASSERT( !my_slots[i].my_task_pool_size, NULL );
218         ITT_SYNC_CREATE(my_slots + i, SyncType_Scheduler, SyncObj_WorkerTaskPool);
219         mailbox(i+1).construct();
220         ITT_SYNC_CREATE(&mailbox(i+1), SyncType_Scheduler, SyncObj_Mailbox);
221         my_slots[i].hint_for_pop = i;
222 #if __TBB_STATISTICS
223         my_slots[i].my_counters = new ( NFS_Allocate(1, sizeof(statistics_counters), NULL) ) statistics_counters;
224 #endif /* __TBB_STATISTICS */
225     }
226     my_task_stream.initialize(my_num_slots);
227     ITT_SYNC_CREATE(&my_task_stream, SyncType_Scheduler, SyncObj_TaskStream);
228 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
229     my_concurrency_mode = cm_normal;
230 #endif
231 #if !__TBB_FP_CONTEXT
232     my_cpu_ctl_env.get_env();
233 #endif
234 }
235 
allocate_arena(market & m,unsigned num_slots,unsigned num_reserved_slots)236 arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots ) {
237     __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
238     __TBB_ASSERT( sizeof(base_type) % NFS_GetLineSize() == 0, "arena slots area misaligned: wrong padding" );
239     __TBB_ASSERT( sizeof(mail_outbox) == NFS_MaxLineSize, "Mailbox padding is wrong" );
240     size_t n = allocation_size(num_arena_slots(num_slots));
241     unsigned char* storage = (unsigned char*)NFS_Allocate( 1, n, NULL );
242     // Zero all slots to indicate that they are empty
243     memset( storage, 0, n );
244     return *new( storage + num_arena_slots(num_slots) * sizeof(mail_outbox) ) arena(m, num_slots, num_reserved_slots);
245 }
246 
free_arena()247 void arena::free_arena () {
248     __TBB_ASSERT( is_alive(my_guard), NULL );
249     __TBB_ASSERT( !my_references, "There are threads in the dying arena" );
250     __TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
251     __TBB_ASSERT( my_pool_state == SNAPSHOT_EMPTY || !my_max_num_workers, "Inconsistent state of a dying arena" );
252 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
253     __TBB_ASSERT( my_concurrency_mode != cm_enforced_global, NULL );
254 #endif
255 #if !__TBB_STATISTICS_EARLY_DUMP
256     GATHER_STATISTIC( dump_arena_statistics() );
257 #endif
258     poison_value( my_guard );
259     intptr_t drained = 0;
260     for ( unsigned i = 0; i < my_num_slots; ++i ) {
261         __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
262         // TODO: understand the assertion and modify
263         // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, NULL );
264         __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, NULL ); // TODO: replace by is_quiescent_local_task_pool_empty
265         my_slots[i].free_task_pool();
266 #if __TBB_STATISTICS
267         NFS_Free( my_slots[i].my_counters );
268 #endif /* __TBB_STATISTICS */
269         drained += mailbox(i+1).drain();
270     }
271     __TBB_ASSERT( my_task_stream.drain()==0, "Not all enqueued tasks were executed");
272 #if __TBB_COUNT_TASK_NODES
273     my_market->update_task_node_count( -drained );
274 #endif /* __TBB_COUNT_TASK_NODES */
275     // remove an internal reference
276     my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );
277 #if __TBB_TASK_GROUP_CONTEXT
278     __TBB_ASSERT( my_default_ctx, "Master thread never entered the arena?" );
279     my_default_ctx->~task_group_context();
280     NFS_Free(my_default_ctx);
281 #endif /* __TBB_TASK_GROUP_CONTEXT */
282 #if __TBB_ARENA_OBSERVER
283     if ( !my_observers.empty() )
284         my_observers.clear();
285 #endif /* __TBB_ARENA_OBSERVER */
286     void* storage  = &mailbox(my_num_slots);
287     __TBB_ASSERT( my_references == 0, NULL );
288     __TBB_ASSERT( my_pool_state == SNAPSHOT_EMPTY || !my_max_num_workers, NULL );
289     this->~arena();
290 #if TBB_USE_ASSERT > 1
291     memset( storage, 0, allocation_size(my_num_slots) );
292 #endif /* TBB_USE_ASSERT */
293     NFS_Free( storage );
294 }
295 
296 #if __TBB_STATISTICS
dump_arena_statistics()297 void arena::dump_arena_statistics () {
298     statistics_counters total;
299     for( unsigned i = 0; i < my_num_slots; ++i ) {
300 #if __TBB_STATISTICS_EARLY_DUMP
301         generic_scheduler* s = my_slots[i].my_scheduler;
302         if ( s )
303             *my_slots[i].my_counters += s->my_counters;
304 #else
305         __TBB_ASSERT( !my_slots[i].my_scheduler, NULL );
306 #endif
307         if ( i != 0 ) {
308             total += *my_slots[i].my_counters;
309             dump_statistics( *my_slots[i].my_counters, i );
310         }
311     }
312     dump_statistics( *my_slots[0].my_counters, 0 );
313 #if __TBB_STATISTICS_STDOUT
314 #if !__TBB_STATISTICS_TOTALS_ONLY
315     printf( "----------------------------------------------\n" );
316 #endif
317     dump_statistics( total, workers_counters_total );
318     total += *my_slots[0].my_counters;
319     dump_statistics( total, arena_counters_total );
320 #if !__TBB_STATISTICS_TOTALS_ONLY
321     printf( "==============================================\n" );
322 #endif
323 #endif /* __TBB_STATISTICS_STDOUT */
324 }
325 #endif /* __TBB_STATISTICS */
326 
327 #if __TBB_TASK_PRIORITY
328 // The method inspects a scheduler to determine:
329 // 1. if it has tasks that can be retrieved and executed (via the return value);
330 // 2. if it has any tasks at all, including those of lower priority (via tasks_present);
331 // 3. if it is able to work with enqueued tasks (via dequeuing_possible).
may_have_tasks(generic_scheduler * s,bool & tasks_present,bool & dequeuing_possible)332 inline bool arena::may_have_tasks ( generic_scheduler* s, bool& tasks_present, bool& dequeuing_possible ) {
333     if ( !s || s->my_arena != this )
334         return false;
335     dequeuing_possible |= s->worker_outermost_level();
336     if ( s->my_pool_reshuffling_pending ) {
337         // This primary task pool is nonempty and may contain tasks at the current
338         // priority level. Its owner is winnowing lower priority tasks at the moment.
339         tasks_present = true;
340         return true;
341     }
342     if ( s->my_offloaded_tasks ) {
343         tasks_present = true;
344         if ( s->my_local_reload_epoch < *s->my_ref_reload_epoch ) {
345             // This scheduler's offload area is nonempty and may contain tasks at the
346             // current priority level.
347             return true;
348         }
349     }
350     return false;
351 }
352 
orphan_offloaded_tasks(generic_scheduler & s)353 void arena::orphan_offloaded_tasks(generic_scheduler& s) {
354     __TBB_ASSERT( s.my_offloaded_tasks, NULL );
355     GATHER_STATISTIC( ++s.my_counters.prio_orphanings );
356     ++my_abandonment_epoch;
357     __TBB_ASSERT( s.my_offloaded_task_list_tail_link && !*s.my_offloaded_task_list_tail_link, NULL );
358     task* orphans;
359     do {
360         orphans = const_cast<task*>(my_orphaned_tasks);
361         *s.my_offloaded_task_list_tail_link = orphans;
362     } while ( as_atomic(my_orphaned_tasks).compare_and_swap(s.my_offloaded_tasks, orphans) != orphans );
363     s.my_offloaded_tasks = NULL;
364 #if TBB_USE_ASSERT
365     s.my_offloaded_task_list_tail_link = NULL;
366 #endif /* TBB_USE_ASSERT */
367 }
368 #endif /* __TBB_TASK_PRIORITY */
369 
has_enqueued_tasks()370 bool arena::has_enqueued_tasks() {
371     // Look for enqueued tasks at all priority levels
372     for ( int p = 0; p < num_priority_levels; ++p )
373         if ( !my_task_stream.empty(p) )
374             return true;
375     return false;
376 }
377 
restore_priority_if_need()378 void arena::restore_priority_if_need() {
379     // Check for the presence of enqueued tasks "lost" on some of
380     // priority levels because updating arena priority and switching
381     // arena into "populated" (FULL) state happen non-atomically.
382     // Imposing atomicity would require task::enqueue() to use a lock,
383     // which is unacceptable.
384     if ( has_enqueued_tasks() ) {
385         advertise_new_work<work_enqueued>();
386 #if __TBB_TASK_PRIORITY
387         // update_arena_priority() expects non-zero arena::my_num_workers_requested,
388         // so must be called after advertise_new_work<work_enqueued>()
389         for ( int p = 0; p < num_priority_levels; ++p )
390             if ( !my_task_stream.empty(p) ) {
391                 if ( p < my_bottom_priority || p > my_top_priority )
392                     my_market->update_arena_priority(*this, p);
393             }
394 #endif
395     }
396 }
397 
is_out_of_work()398 bool arena::is_out_of_work() {
399     // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
400     for(;;) {
401         pool_state_t snapshot = my_pool_state;
402         switch( snapshot ) {
403             case SNAPSHOT_EMPTY:
404                 return true;
405             case SNAPSHOT_FULL: {
406                 // Use unique id for "busy" in order to avoid ABA problems.
407                 const pool_state_t busy = pool_state_t(&busy);
408                 // Request permission to take snapshot
409                 if( my_pool_state.compare_and_swap( busy, SNAPSHOT_FULL )==SNAPSHOT_FULL ) {
410                     // Got permission. Take the snapshot.
411                     // NOTE: This is not a lock, as the state can be set to FULL at
412                     //       any moment by a thread that spawns/enqueues new task.
413                     size_t n = my_limit;
414                     // Make local copies of volatile parameters. Their change during
415                     // snapshot taking procedure invalidates the attempt, and returns
416                     // this thread into the dispatch loop.
417 #if __TBB_TASK_PRIORITY
418                     uintptr_t reload_epoch = __TBB_load_with_acquire( my_reload_epoch );
419                     intptr_t top_priority = my_top_priority;
420                     // Inspect primary task pools first
421 #endif /* __TBB_TASK_PRIORITY */
422                     size_t k;
423                     for( k=0; k<n; ++k ) {
424                         if( my_slots[k].task_pool != EmptyTaskPool &&
425                             __TBB_load_relaxed(my_slots[k].head) < __TBB_load_relaxed(my_slots[k].tail) )
426                         {
427                             // k-th primary task pool is nonempty and does contain tasks.
428                             break;
429                         }
430                         if( my_pool_state!=busy )
431                             return false; // the work was published
432                     }
433                     __TBB_ASSERT( k <= n, NULL );
434                     bool work_absent = k == n;
435 #if __TBB_TASK_PRIORITY
436                     // Variable tasks_present indicates presence of tasks at any priority
437                     // level, while work_absent refers only to the current priority.
438                     bool tasks_present = !work_absent || my_orphaned_tasks;
439                     bool dequeuing_possible = false;
440                     if ( work_absent ) {
441                         // Check for the possibility that recent priority changes
442                         // brought some tasks to the current priority level
443 
444                         uintptr_t abandonment_epoch = my_abandonment_epoch;
445                         // Master thread's scheduler needs special handling as it
446                         // may be destroyed at any moment (workers' schedulers are
447                         // guaranteed to be alive while at least one thread is in arena).
448                         // The lock below excludes concurrency with task group state change
449                         // propagation and guarantees lifetime of the master thread.
450                         the_context_state_propagation_mutex.lock();
451                         work_absent = !may_have_tasks( my_slots[0].my_scheduler, tasks_present, dequeuing_possible );
452                         the_context_state_propagation_mutex.unlock();
453                         // The following loop is subject to data races. While k-th slot's
454                         // scheduler is being examined, corresponding worker can either
455                         // leave to RML or migrate to another arena.
456                         // But the races are not prevented because all of them are benign.
457                         // First, the code relies on the fact that worker thread's scheduler
458                         // object persists until the whole library is deinitialized.
459                         // Second, in the worst case the races can only cause another
460                         // round of stealing attempts to be undertaken. Introducing complex
461                         // synchronization into this coldest part of the scheduler's control
462                         // flow does not seem to make sense because it both is unlikely to
463                         // ever have any observable performance effect, and will require
464                         // additional synchronization code on the hotter paths.
465                         for( k = 1; work_absent && k < n; ++k ) {
466                             if( my_pool_state!=busy )
467                                 return false; // the work was published
468                             work_absent = !may_have_tasks( my_slots[k].my_scheduler, tasks_present, dequeuing_possible );
469                         }
470                         // Preclude premature switching arena off because of a race in the previous loop.
471                         work_absent = work_absent
472                                       && !__TBB_load_with_acquire(my_orphaned_tasks)
473                                       && abandonment_epoch == my_abandonment_epoch;
474                     }
475 #endif /* __TBB_TASK_PRIORITY */
476                     // Test and test-and-set.
477                     if( my_pool_state==busy ) {
478 #if __TBB_TASK_PRIORITY
479                         bool no_fifo_tasks = my_task_stream.empty(top_priority);
480                         work_absent = work_absent && (!dequeuing_possible || no_fifo_tasks)
481                                       && top_priority == my_top_priority && reload_epoch == my_reload_epoch;
482 #else
483                         bool no_fifo_tasks = my_task_stream.empty(0);
484                         work_absent = work_absent && no_fifo_tasks;
485 #endif /* __TBB_TASK_PRIORITY */
486                         if( work_absent ) {
487 #if __TBB_TASK_PRIORITY
488                             if ( top_priority > my_bottom_priority ) {
489                                 if ( my_market->lower_arena_priority(*this, top_priority - 1, reload_epoch)
490                                      && !my_task_stream.empty(top_priority) )
491                                 {
492                                     atomic_update( my_skipped_fifo_priority, top_priority, std::less<intptr_t>());
493                                 }
494                             }
495                             else if ( !tasks_present && !my_orphaned_tasks && no_fifo_tasks ) {
496 #endif /* __TBB_TASK_PRIORITY */
497                                 // save current demand value before setting SNAPSHOT_EMPTY,
498                                 // to avoid race with advertise_new_work.
499                                 int current_demand = (int)my_max_num_workers;
500                                 if( my_pool_state.compare_and_swap( SNAPSHOT_EMPTY, busy )==busy ) {
501 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
502                                     if( my_concurrency_mode==cm_enforced_global  ) {
503                                         // adjust_demand() called inside, if needed
504                                         my_market->mandatory_concurrency_disable( this );
505                                     } else
506 #endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
507                                     {
508                                         // This thread transitioned pool to empty state, and thus is
509                                         // responsible for telling the market that there is no work to do.
510                                         my_market->adjust_demand( *this, -current_demand );
511                                     }
512                                     restore_priority_if_need();
513                                     return true;
514                                 }
515                                 return false;
516 #if __TBB_TASK_PRIORITY
517                             }
518 #endif /* __TBB_TASK_PRIORITY */
519                         }
520                         // Undo previous transition SNAPSHOT_FULL-->busy, unless another thread undid it.
521                         my_pool_state.compare_and_swap( SNAPSHOT_FULL, busy );
522                     }
523                 }
524                 return false;
525             }
526             default:
527                 // Another thread is taking a snapshot.
528                 return false;
529         }
530     }
531 }
532 
533 #if __TBB_COUNT_TASK_NODES
workers_task_node_count()534 intptr_t arena::workers_task_node_count() {
535     intptr_t result = 0;
536     for( unsigned i = 1; i < my_num_slots; ++i ) {
537         generic_scheduler* s = my_slots[i].my_scheduler;
538         if( s )
539             result += s->my_task_node_count;
540     }
541     return result;
542 }
543 #endif /* __TBB_COUNT_TASK_NODES */
544 
enqueue_task(task & t,intptr_t prio,FastRandom & random)545 void arena::enqueue_task( task& t, intptr_t prio, FastRandom &random )
546 {
547 #if __TBB_RECYCLE_TO_ENQUEUE
548     __TBB_ASSERT( t.state()==task::allocated || t.state()==task::to_enqueue, "attempt to enqueue task with inappropriate state" );
549 #else
550     __TBB_ASSERT( t.state()==task::allocated, "attempt to enqueue task that is not in 'allocated' state" );
551 #endif
552     t.prefix().state = task::ready;
553     t.prefix().extra_state |= es_task_enqueued; // enqueued task marker
554 
555 #if TBB_USE_ASSERT
556     if( task* parent = t.parent() ) {
557         internal::reference_count ref_count = parent->prefix().ref_count;
558         __TBB_ASSERT( ref_count!=0, "attempt to enqueue task whose parent has a ref_count==0 (forgot to set_ref_count?)" );
559         __TBB_ASSERT( ref_count>0, "attempt to enqueue task whose parent has a ref_count<0" );
560         parent->prefix().extra_state |= es_ref_count_active;
561     }
562     __TBB_ASSERT(t.prefix().affinity==affinity_id(0), "affinity is ignored for enqueued tasks");
563 #endif /* TBB_USE_ASSERT */
564 
565     ITT_NOTIFY(sync_releasing, &my_task_stream);
566 #if __TBB_TASK_PRIORITY
567     intptr_t p = prio ? normalize_priority(priority_t(prio)) : normalized_normal_priority;
568     assert_priority_valid(p);
569     my_task_stream.push( &t, p, random );
570     if ( p != my_top_priority )
571         my_market->update_arena_priority( *this, p );
572 #else /* !__TBB_TASK_PRIORITY */
573     __TBB_ASSERT_EX(prio == 0, "the library is not configured to respect the task priority");
574     my_task_stream.push( &t, 0, random );
575 #endif /* !__TBB_TASK_PRIORITY */
576     advertise_new_work<work_enqueued>();
577 #if __TBB_TASK_PRIORITY
578     if ( p != my_top_priority )
579         my_market->update_arena_priority( *this, p );
580 #endif /* __TBB_TASK_PRIORITY */
581 }
582 
583 class nested_arena_context : no_copy {
584 public:
nested_arena_context(generic_scheduler * s,arena * a,size_t slot_index,bool type,bool same)585     nested_arena_context(generic_scheduler *s, arena* a, size_t slot_index, bool type, bool same)
586         : my_scheduler(*s), my_orig_ctx(NULL), same_arena(same) {
587         if (same_arena) {
588             my_orig_state.my_properties = my_scheduler.my_properties;
589             my_orig_state.my_innermost_running_task = my_scheduler.my_innermost_running_task;
590             mimic_outermost_level(a, type);
591         } else {
592             my_orig_state = *s;
593             mimic_outermost_level(a, type);
594             s->nested_arena_entry(a, slot_index);
595         }
596     }
~nested_arena_context()597     ~nested_arena_context() {
598 #if __TBB_TASK_GROUP_CONTEXT
599         my_scheduler.my_dummy_task->prefix().context = my_orig_ctx; // restore context of dummy task
600 #endif
601         if (same_arena) {
602             my_scheduler.my_properties = my_orig_state.my_properties;
603             my_scheduler.my_innermost_running_task = my_orig_state.my_innermost_running_task;
604         } else {
605             my_scheduler.nested_arena_exit();
606             static_cast<scheduler_state&>(my_scheduler) = my_orig_state; // restore arena settings
607 #if __TBB_TASK_PRIORITY
608             my_scheduler.my_local_reload_epoch = *my_orig_state.my_ref_reload_epoch;
609 #endif
610             governor::assume_scheduler(&my_scheduler);
611         }
612     }
613 
614 private:
615     generic_scheduler &my_scheduler;
616     scheduler_state my_orig_state;
617     task_group_context *my_orig_ctx;
618     const bool same_arena;
619 
mimic_outermost_level(arena * a,bool type)620     void mimic_outermost_level(arena* a, bool type) {
621         my_scheduler.my_properties.outermost = true;
622         my_scheduler.my_properties.type = type;
623         my_scheduler.my_innermost_running_task = my_scheduler.my_dummy_task;
624 #if __TBB_TASK_GROUP_CONTEXT
625         // Save dummy's context and replace it by arena's context
626         my_orig_ctx = my_scheduler.my_dummy_task->prefix().context;
627         my_scheduler.my_dummy_task->prefix().context = a->my_default_ctx;
628 #endif
629     }
630 };
631 
nested_arena_entry(arena * a,size_t slot_index)632 void generic_scheduler::nested_arena_entry(arena* a, size_t slot_index) {
633     __TBB_ASSERT( is_alive(a->my_guard), NULL );
634     __TBB_ASSERT( a!=my_arena, NULL);
635 
636     // overwrite arena settings
637 #if __TBB_TASK_PRIORITY
638     if ( my_offloaded_tasks )
639         my_arena->orphan_offloaded_tasks( *this );
640     my_offloaded_tasks = NULL;
641 #endif /* __TBB_TASK_PRIORITY */
642     attach_arena( a, slot_index, /*is_master*/true );
643     __TBB_ASSERT( my_arena == a, NULL );
644     governor::assume_scheduler( this );
645     // TODO? ITT_NOTIFY(sync_acquired, a->my_slots + index);
646     // TODO: it requires market to have P workers (not P-1)
647     // TODO: a preempted worker should be excluded from assignment to other arenas e.g. my_slack--
648     if( !is_worker() && slot_index >= my_arena->my_num_reserved_slots )
649         my_arena->my_market->adjust_demand(*my_arena, -1);
650 #if __TBB_ARENA_OBSERVER
651     my_last_local_observer = 0; // TODO: try optimize number of calls
652     my_arena->my_observers.notify_entry_observers( my_last_local_observer, /*worker=*/false );
653 #endif
654 }
655 
nested_arena_exit()656 void generic_scheduler::nested_arena_exit() {
657 #if __TBB_ARENA_OBSERVER
658     my_arena->my_observers.notify_exit_observers( my_last_local_observer, /*worker=*/false );
659 #endif /* __TBB_ARENA_OBSERVER */
660 #if __TBB_TASK_PRIORITY
661     if ( my_offloaded_tasks )
662         my_arena->orphan_offloaded_tasks( *this );
663 #endif
664     if( !is_worker() && my_arena_index >= my_arena->my_num_reserved_slots )
665         my_arena->my_market->adjust_demand(*my_arena, 1);
666     // Free the master slot.
667     __TBB_ASSERT(my_arena->my_slots[my_arena_index].my_scheduler, "A slot is already empty");
668     __TBB_store_with_release(my_arena->my_slots[my_arena_index].my_scheduler, (generic_scheduler*)NULL);
669     my_arena->my_exit_monitors.notify_one(); // do not relax!
670 }
671 
wait_until_empty()672 void generic_scheduler::wait_until_empty() {
673     my_dummy_task->prefix().ref_count++; // prevents exit from local_wait_for_all when local work is done enforcing the stealing
674     while( my_arena->my_pool_state != arena::SNAPSHOT_EMPTY )
675         local_wait_for_all(*my_dummy_task, NULL);
676     my_dummy_task->prefix().ref_count--;
677 }
678 
679 } // namespace internal
680 } // namespace tbb
681 
682 #include "scheduler_utility.h"
683 #include "tbb/task_arena.h" // task_arena_base
684 
685 namespace tbb {
686 namespace interface7 {
687 namespace internal {
688 
internal_initialize()689 void task_arena_base::internal_initialize( ) {
690     governor::one_time_init();
691     if( my_max_concurrency < 1 )
692         my_max_concurrency = (int)governor::default_num_threads();
693     __TBB_ASSERT( my_master_slots <= (unsigned)my_max_concurrency, "Number of slots reserved for master should not exceed arena concurrency");
694     arena* new_arena = market::create_arena( my_max_concurrency, my_master_slots, 0 );
695     // add an internal market reference; a public reference was added in create_arena
696     market &m = market::global_market( /*is_public=*/false );
697     // allocate default context for task_arena
698 #if __TBB_TASK_GROUP_CONTEXT
699     new_arena->my_default_ctx = new ( NFS_Allocate(1, sizeof(task_group_context), NULL) )
700             task_group_context( task_group_context::isolated, task_group_context::default_traits );
701 #if __TBB_FP_CONTEXT
702     new_arena->my_default_ctx->capture_fp_settings();
703 #endif
704 #endif /* __TBB_TASK_GROUP_CONTEXT */
705     // threads might race to initialize the arena
706     if(as_atomic(my_arena).compare_and_swap(new_arena, NULL) != NULL) {
707         __TBB_ASSERT(my_arena, NULL); // another thread won the race
708         // release public market reference
709         m.release( /*is_public=*/true, /*blocking_terminate=*/false );
710         new_arena->on_thread_leaving<arena::ref_external>(); // destroy unneeded arena
711 #if __TBB_TASK_GROUP_CONTEXT
712         spin_wait_while_eq(my_context, (task_group_context*)NULL);
713     } else {
714         new_arena->my_default_ctx->my_version_and_traits |= my_version_and_traits & exact_exception_flag;
715         as_atomic(my_context) = new_arena->my_default_ctx;
716 #endif
717     }
718     // TODO: should it trigger automatic initialization of this thread?
719     governor::local_scheduler_weak();
720 }
721 
internal_terminate()722 void task_arena_base::internal_terminate( ) {
723     if( my_arena ) {// task_arena was initialized
724         my_arena->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false );
725         my_arena->on_thread_leaving<arena::ref_external>();
726         my_arena = 0;
727 #if __TBB_TASK_GROUP_CONTEXT
728         my_context = 0;
729 #endif
730     }
731 }
732 
internal_attach()733 void task_arena_base::internal_attach( ) {
734     __TBB_ASSERT(!my_arena, NULL);
735     generic_scheduler* s = governor::local_scheduler_if_initialized();
736     if( s && s->my_arena ) {
737         // There is an active arena to attach to.
738         // It's still used by s, so won't be destroyed right away.
739         my_arena = s->my_arena;
740         __TBB_ASSERT( my_arena->my_references > 0, NULL );
741         my_arena->my_references += arena::ref_external;
742 #if __TBB_TASK_GROUP_CONTEXT
743         my_context = my_arena->my_default_ctx;
744         my_version_and_traits |= my_context->my_version_and_traits & exact_exception_flag;
745 #endif
746         my_master_slots = my_arena->my_num_reserved_slots;
747         my_max_concurrency = my_master_slots + my_arena->my_max_num_workers;
748         __TBB_ASSERT(arena::num_arena_slots(my_max_concurrency)==my_arena->my_num_slots, NULL);
749         // increases market's ref count for task_arena
750         market::global_market( /*is_public=*/true );
751     }
752 }
753 
internal_enqueue(task & t,intptr_t prio) const754 void task_arena_base::internal_enqueue( task& t, intptr_t prio ) const {
755     __TBB_ASSERT(my_arena, NULL);
756     generic_scheduler* s = governor::local_scheduler_if_initialized();
757     __TBB_ASSERT(s, "Scheduler is not initialized"); // we allocated a task so can expect the scheduler
758 #if __TBB_TASK_GROUP_CONTEXT
759     __TBB_ASSERT(my_arena->my_default_ctx == t.prefix().context, NULL);
760     __TBB_ASSERT(!my_arena->my_default_ctx->is_group_execution_cancelled(), // TODO: any better idea?
761                  "The task will not be executed because default task_group_context of task_arena is cancelled. Has previously enqueued task thrown an exception?");
762 #endif
763     my_arena->enqueue_task( t, prio, s->my_random );
764 }
765 
766 class delegated_task : public task {
767     internal::delegate_base & my_delegate;
768     concurrent_monitor & my_monitor;
769     task * my_root;
execute()770     task* execute() __TBB_override {
771         generic_scheduler& s = *(generic_scheduler*)prefix().owner;
772         __TBB_ASSERT(s.outermost_level(), "expected to be enqueued and received on the outermost level");
773         struct outermost_context : internal::no_copy {
774             delegated_task * t;
775             generic_scheduler & s;
776             task * orig_dummy;
777             task_group_context * orig_ctx;
778             scheduler_properties orig_props;
779             outermost_context(delegated_task *_t, generic_scheduler &_s)
780                 : t(_t), s(_s), orig_dummy(s.my_dummy_task), orig_props(s.my_properties) {
781                 __TBB_ASSERT(s.my_innermost_running_task == t, NULL);
782 #if __TBB_TASK_GROUP_CONTEXT
783                 orig_ctx = t->prefix().context;
784                 t->prefix().context = s.my_arena->my_default_ctx;
785 #endif
786                 // Mimics outermost master
787                 s.my_dummy_task = t;
788                 s.my_properties.type = scheduler_properties::master;
789             }
790             ~outermost_context() {
791 #if __TBB_TASK_GROUP_CONTEXT
792                 // Restore context for sake of registering potential exception
793                 t->prefix().context = orig_ctx;
794 #endif
795                 s.my_properties = orig_props;
796                 s.my_dummy_task = orig_dummy;
797             }
798         } scope(this, s);
799         my_delegate();
800         return NULL;
801     }
~delegated_task()802     ~delegated_task() {
803         // potential exception was already registered. It must happen before the notification
804         __TBB_ASSERT(my_root->ref_count()==2, NULL);
805         __TBB_store_with_release(my_root->prefix().ref_count, 1); // must precede the wakeup
806         my_monitor.notify(*this); // do not relax, it needs a fence!
807     }
808 public:
delegated_task(internal::delegate_base & d,concurrent_monitor & s,task * t)809     delegated_task( internal::delegate_base & d, concurrent_monitor & s, task * t )
810         : my_delegate(d), my_monitor(s), my_root(t) {}
811     // predicate for concurrent_monitor notification
operator ()(uintptr_t ctx) const812     bool operator()(uintptr_t ctx) const { return (void*)ctx == (void*)&my_delegate; }
813 };
814 
internal_execute(internal::delegate_base & d) const815 void task_arena_base::internal_execute(internal::delegate_base& d) const {
816     __TBB_ASSERT(my_arena, NULL);
817     generic_scheduler* s = governor::local_scheduler_weak();
818     __TBB_ASSERT(s, "Scheduler is not initialized");
819 
820     bool same_arena = s->my_arena == my_arena;
821     size_t index1 = s->my_arena_index;
822     if (!same_arena) {
823         index1 = my_arena->occupy_free_slot</* as_worker*/false>(*s);
824         if (index1 == arena::out_of_arena) {
825 
826 #if __TBB_USE_OPTIONAL_RTTI
827             // Workaround for the bug inside graph. If the thread can not occupy arena slot during task_arena::execute()
828             // and all aggregator operations depend on this task completion (all other threads are inside arena already)
829             // deadlock appears, because enqueued task will never enter arena.
830             // Workaround: check if the task came from graph via RTTI (casting to graph::spawn_functor)
831             // and enqueue this task with non-blocking internal_enqueue method.
832             // TODO: have to change behaviour later in next GOLD release (maybe to add new library entry point - try_execute)
833             typedef tbb::flow::interface10::graph::spawn_functor graph_funct;
834             internal::delegated_function< graph_funct, void >* deleg_funct =
835                     dynamic_cast< internal::delegated_function< graph_funct, void>* >(&d);
836 
837             if (deleg_funct) {
838                 internal_enqueue(*new(task::allocate_root(*my_context))
839                     internal::function_task< internal::strip< graph_funct >::type >
840                         (internal::forward< graph_funct >(deleg_funct->my_func)), 0);
841                 return;
842             } else {
843 #endif
844                 concurrent_monitor::thread_context waiter;
845 #if __TBB_TASK_GROUP_CONTEXT
846                 task_group_context exec_context(task_group_context::isolated, my_version_and_traits & exact_exception_flag);
847 #if __TBB_FP_CONTEXT
848                 exec_context.copy_fp_settings(*my_context);
849 #endif
850 #endif
851                 auto_empty_task root(__TBB_CONTEXT_ARG(s, &exec_context));
852                 root.prefix().ref_count = 2;
853                 my_arena->enqueue_task(*new(task::allocate_root(__TBB_CONTEXT_ARG1(exec_context)))
854                     delegated_task(d, my_arena->my_exit_monitors, &root),
855                     0, s->my_random); // TODO: priority?
856                 size_t index2 = arena::out_of_arena;
857                 do {
858                     my_arena->my_exit_monitors.prepare_wait(waiter, (uintptr_t)&d);
859                     if (__TBB_load_with_acquire(root.prefix().ref_count) < 2) {
860                         my_arena->my_exit_monitors.cancel_wait(waiter);
861                         break;
862                     }
863                     index2 = my_arena->occupy_free_slot</*as_worker*/false>(*s);
864                     if (index2 != arena::out_of_arena) {
865                         my_arena->my_exit_monitors.cancel_wait(waiter);
866                         nested_arena_context scope(s, my_arena, index2, scheduler_properties::master, same_arena);
867                         s->local_wait_for_all(root, NULL);
868 #if TBB_USE_EXCEPTIONS
869                         __TBB_ASSERT(!exec_context.my_exception, NULL); // exception can be thrown above, not deferred
870 #endif
871                         __TBB_ASSERT(root.prefix().ref_count == 0, NULL);
872                         break;
873                     }
874                     my_arena->my_exit_monitors.commit_wait(waiter);
875                 } while (__TBB_load_with_acquire(root.prefix().ref_count) == 2);
876                 if (index2 == arena::out_of_arena) {
877                     // notify a waiting thread even if this thread did not enter arena,
878                     // in case it was woken by a leaving thread but did not need to enter
879                     my_arena->my_exit_monitors.notify_one(); // do not relax!
880                 }
881 #if TBB_USE_EXCEPTIONS
882                 // process possible exception
883                 if (task_group_context::exception_container_type *pe = exec_context.my_exception)
884                     TbbRethrowException(pe);
885 #endif
886                 return;
887 #if __TBB_USE_OPTIONAL_RTTI
888             } // if task came from graph
889 #endif
890         } // if (index1 == arena::out_of_arena)
891     } // if (!same_arena)
892 
893     cpu_ctl_env_helper cpu_ctl_helper;
894     cpu_ctl_helper.set_env(__TBB_CONTEXT_ARG1(my_context));
895 #if TBB_USE_EXCEPTIONS
896     try {
897 #endif
898         //TODO: replace dummy tasks for workers as well to avoid using of the_dummy_context
899         nested_arena_context scope(s, my_arena, index1, scheduler_properties::master, same_arena);
900         d();
901 #if TBB_USE_EXCEPTIONS
902     }
903     catch (...) {
904         cpu_ctl_helper.restore_default(); // TODO: is it needed on Windows?
905         if (my_version_and_traits & exact_exception_flag) throw;
906         else {
907             task_group_context exception_container(task_group_context::isolated,
908                 task_group_context::default_traits & ~task_group_context::exact_exception);
909             exception_container.register_pending_exception();
910             __TBB_ASSERT(exception_container.my_exception, NULL);
911             TbbRethrowException(exception_container.my_exception);
912         }
913     }
914 #endif
915 }
916 
917 // this wait task is a temporary approach to wait for arena emptiness for masters without slots
918 // TODO: it will be rather reworked for one source of notification from is_out_of_work
919 class wait_task : public task {
920     binary_semaphore & my_signal;
execute()921     task* execute() __TBB_override {
922         generic_scheduler* s = governor::local_scheduler_if_initialized();
923         __TBB_ASSERT( s, NULL );
924         __TBB_ASSERT( s->outermost_level(), "The enqueued task can be processed only on outermost level" );
925         if ( s->is_worker() ) {
926             __TBB_ASSERT( s->my_innermost_running_task == this, NULL );
927             // Mimic worker on outermost level to run remaining tasks
928             s->my_innermost_running_task = s->my_dummy_task;
929             s->local_wait_for_all( *s->my_dummy_task, NULL );
930             s->my_innermost_running_task = this;
931         } else s->my_arena->is_out_of_work(); // avoids starvation of internal_wait: issuing this task makes arena full
932         my_signal.V();
933         return NULL;
934     }
935 public:
wait_task(binary_semaphore & sema)936     wait_task ( binary_semaphore & sema ) : my_signal(sema) {}
937 };
938 
internal_wait() const939 void task_arena_base::internal_wait() const {
940     __TBB_ASSERT(my_arena, NULL);
941     generic_scheduler* s = governor::local_scheduler_weak();
942     __TBB_ASSERT(s, "Scheduler is not initialized");
943     __TBB_ASSERT(s->my_arena != my_arena || s->my_arena_index == 0, "task_arena::wait_until_empty() is not supported within a worker context" );
944     if( s->my_arena == my_arena ) {
945         //unsupported, but try do something for outermost master
946         __TBB_ASSERT(s->master_outermost_level(), "unsupported");
947         if( !s->my_arena_index )
948             while( my_arena->num_workers_active() )
949                 s->wait_until_empty();
950     } else for(;;) {
951         while( my_arena->my_pool_state != arena::SNAPSHOT_EMPTY ) {
952             if( !__TBB_load_with_acquire(my_arena->my_slots[0].my_scheduler) // TODO TEMP: one master, make more masters
953                 && as_atomic(my_arena->my_slots[0].my_scheduler).compare_and_swap(s, NULL) == NULL ) {
954                 nested_arena_context a(s, my_arena, 0, scheduler_properties::worker, false);
955                 s->wait_until_empty();
956             } else {
957                 binary_semaphore waiter; // TODO: replace by a single event notification from is_out_of_work
958                 internal_enqueue( *new( task::allocate_root(__TBB_CONTEXT_ARG1(*my_context)) ) wait_task(waiter), 0 ); // TODO: priority?
959                 waiter.P(); // TODO: concurrent_monitor
960             }
961         }
962         if( !my_arena->num_workers_active() && !my_arena->my_slots[0].my_scheduler) // no activity
963             break; // spin until workers active but avoid spinning in a worker
964         __TBB_Yield(); // wait until workers and master leave
965     }
966 }
967 
internal_current_slot()968 /*static*/ int task_arena_base::internal_current_slot() {
969     generic_scheduler* s = governor::local_scheduler_if_initialized();
970     return s? int(s->my_arena_index) : -1;
971 }
972 
973 #if __TBB_TASK_ISOLATION
974 class isolation_guard : tbb::internal::no_copy {
975     isolation_tag &guarded;
976     isolation_tag previous_value;
977 public:
isolation_guard(isolation_tag & isolation)978     isolation_guard( isolation_tag &isolation ) : guarded( isolation ), previous_value( isolation ) {}
~isolation_guard()979     ~isolation_guard() {
980         guarded = previous_value;
981     }
982 };
983 
isolate_within_arena(delegate_base & d,intptr_t reserved)984 void isolate_within_arena( delegate_base& d, intptr_t reserved ) {
985     (void)reserved;
986     __TBB_ASSERT( reserved == 0, NULL );
987     // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
988     generic_scheduler* s = governor::local_scheduler_weak();
989     __TBB_ASSERT( s, "this_task_arena::isolate() needs an initialized scheduler" );
990     // Theoretically, we can keep the current isolation in the scheduler; however, it makes sense to store it in innermost
991     // running task because it can in principle be queried via task::self().
992     isolation_tag& current_isolation = s->my_innermost_running_task->prefix().isolation;
993     // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
994     isolation_guard guard( current_isolation );
995     current_isolation = reinterpret_cast<isolation_tag>(&d);
996     d();
997 }
998 #endif /* __TBB_TASK_ISOLATION */
999 
internal_max_concurrency(const task_arena * ta)1000 int task_arena_base::internal_max_concurrency(const task_arena *ta) {
1001     arena* a = NULL;
1002     if( ta ) // for special cases of ta->max_concurrency()
1003         a = ta->my_arena;
1004     else if( generic_scheduler* s = governor::local_scheduler_if_initialized() )
1005         a = s->my_arena; // the current arena if any
1006 
1007     if( a ) { // Get parameters from the arena
1008         __TBB_ASSERT( !ta || ta->my_max_concurrency==1, NULL );
1009         return a->my_num_reserved_slots + a->my_max_num_workers;
1010     } else {
1011         __TBB_ASSERT( !ta || ta->my_max_concurrency==automatic, NULL );
1012         return int(governor::default_num_threads());
1013     }
1014 }
1015 } // tbb::interfaceX::internal
1016 } // tbb::interfaceX
1017 } // tbb
1018