/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef _TBB_arena_H
#define _TBB_arena_H

#include <atomic>
#include <cstring>

#include "oneapi/tbb/detail/_task.h"

#include "scheduler_common.h"
#include "intrusive_list.h"
#include "task_stream.h"
#include "arena_slot.h"
#include "rml_tbb.h"
#include "mailbox.h"
#include "market.h"
#include "governor.h"
#include "concurrent_monitor.h"
#include "observer_proxy.h"
#include "oneapi/tbb/spin_mutex.h"

namespace tbb {
namespace detail {
namespace r1 {

class task_dispatcher;
class task_group_context;
class allocate_root_with_context_proxy;

#if __TBB_ARENA_BINDING
class numa_binding_observer;
#endif /*__TBB_ARENA_BINDING*/

//! Bounded LIFO ring buffer cache for coroutines
class arena_co_cache {
    //! Ring buffer storage
    task_dispatcher** my_co_scheduler_cache;
    //! Current cache index
    unsigned my_head;
    //! Cache capacity for arena
    unsigned my_max_index;
    //! Accessor lock for modification operations
    tbb::spin_mutex my_co_cache_mutex;

    unsigned next_index() {
        return ( my_head == my_max_index ) ? 0 : my_head + 1;
    }

    unsigned prev_index() {
        return ( my_head == 0 ) ? my_max_index : my_head - 1;
    }

    bool internal_empty() {
        return my_co_scheduler_cache[prev_index()] == nullptr;
    }

    void internal_task_dispatcher_cleanup(task_dispatcher* to_cleanup) {
        to_cleanup->~task_dispatcher();
        cache_aligned_deallocate(to_cleanup);
    }

public:
    void init(unsigned cache_capacity) {
        std::size_t alloc_size = cache_capacity * sizeof(task_dispatcher*);
        my_co_scheduler_cache = (task_dispatcher**)cache_aligned_allocate(alloc_size);
        std::memset( my_co_scheduler_cache, 0, alloc_size );
        my_head = 0;
        my_max_index = cache_capacity - 1;
    }

    void cleanup() {
        while (task_dispatcher* to_cleanup = pop()) {
            internal_task_dispatcher_cleanup(to_cleanup);
        }
        cache_aligned_deallocate(my_co_scheduler_cache);
    }

    //! Insert a scheduler into the currently available slot.
    //! Replace the old value, if necessary.
    void push(task_dispatcher* s) {
        task_dispatcher* to_cleanup = nullptr;
        {
            tbb::spin_mutex::scoped_lock lock(my_co_cache_mutex);
            // Check if we are replacing an existing buffer entry
            if (my_co_scheduler_cache[my_head] != nullptr) {
                to_cleanup = my_co_scheduler_cache[my_head];
            }
            // Store the cached value
            my_co_scheduler_cache[my_head] = s;
            // Move head index to the next slot
            my_head = next_index();
        }
        // Clean up the replaced entry, if any
        if (to_cleanup) {
            internal_task_dispatcher_cleanup(to_cleanup);
        }
    }

    //! Get a cached scheduler if any
    task_dispatcher* pop() {
        tbb::spin_mutex::scoped_lock lock(my_co_cache_mutex);
        // No cached coroutine
        if (internal_empty()) {
            return nullptr;
        }
        // Move head index to the currently available value
        my_head = prev_index();
        // Retrieve the value from the buffer
        task_dispatcher* to_return = my_co_scheduler_cache[my_head];
        // Clear the previous entry
        my_co_scheduler_cache[my_head] = nullptr;
        return to_return;
    }
};
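
// A minimal usage sketch (illustrative only; some_dispatcher is a placeholder, not a real
// identifier in this codebase): the arena owns one arena_co_cache and recycles coroutine
// task_dispatchers through it, with the capacity fixed at init() time.
//
//     arena_co_cache cache;
//     cache.init(/*cache_capacity=*/4);        // fixed-size ring of 4 entries
//     cache.push(some_dispatcher);             // cache a dispatcher for later reuse
//     if (task_dispatcher* td = cache.pop()) { // LIFO: the most recently pushed comes back first
//         /* reuse td instead of allocating a new one */
//     }
//     cache.cleanup();                         // destroys any dispatchers still cached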

struct stack_anchor_type {
    stack_anchor_type() = default;
    stack_anchor_type(const stack_anchor_type&) = delete;
};

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
class atomic_flag {
    static const std::uintptr_t SET = 1;
    static const std::uintptr_t EMPTY = 0;
    std::atomic<std::uintptr_t> my_state;
public:
    bool test_and_set() {
        std::uintptr_t state = my_state.load(std::memory_order_acquire);
        switch (state) {
        case SET:
            return false;
        default: /* busy */
            if (my_state.compare_exchange_strong(state, SET)) {
                // We interrupted a clear transaction
                return false;
            }
            if (state != EMPTY) {
                // We lost our epoch
                return false;
            }
            // We are too late but still in the same epoch
            __TBB_fallthrough;
        case EMPTY:
            return my_state.compare_exchange_strong(state, SET);
        }
    }
    template <typename Pred>
    bool try_clear_if(Pred&& pred) {
        std::uintptr_t busy = std::uintptr_t(&busy);
        std::uintptr_t state = my_state.load(std::memory_order_acquire);
        if (state == SET && my_state.compare_exchange_strong(state, busy)) {
            if (pred()) {
                return my_state.compare_exchange_strong(busy, EMPTY);
            }
            // The result of the next operation is discarded; false should always be returned.
            my_state.compare_exchange_strong(busy, SET);
        }
        return false;
    }
    void clear() {
        my_state.store(EMPTY, std::memory_order_release);
    }
    bool test() {
        return my_state.load(std::memory_order_acquire) != EMPTY;
    }
};
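
// Illustrative usage sketch (an assumption drawn from the members above, not a normative API
// description): a thread claims the right to request one extra mandatory worker by setting the
// flag, and the flag is cleared only if a caller-supplied predicate still holds while the flag
// is parked in a unique "busy" value, so a concurrent test_and_set() from a newer epoch is
// detected and the clear is abandoned.
//
//     atomic_flag flag;
//     if (flag.test_and_set()) {
//         /* this thread won the right to request an extra worker */
//     }
//     flag.try_clear_if([&] { return no_more_local_work(); }); // no_more_local_work() is hypothetical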
#endif

//! The structure of an arena, except the array of slots.
/** Separated in order to simplify padding.
    Intrusive list node base class is used by market to form a list of arenas. **/
// TODO: Analyze arena_base cache lines placement
struct arena_base : padded<intrusive_list_node> {
    //! The number of workers that have been marked out by the resource manager to service the arena.
    std::atomic<unsigned> my_num_workers_allotted;   // heavy use in stealing loop

    //! Reference counter for the arena.
    /** Worker and external thread references are counted separately: the first several bits are for references
        from external threads or explicit task_arenas (see arena::ref_external_bits below);
        the rest counts the number of workers servicing the arena. */
    std::atomic<unsigned> my_references;     // heavy use in stealing loop

    //! The maximal number of currently busy slots.
    std::atomic<unsigned> my_limit;          // heavy use in stealing loop

    //! Task pool for the tasks scheduled via task::enqueue() method.
    /** Such scheduling guarantees eventual execution even if
        - new tasks are constantly coming (by extracting scheduled tasks in
          relaxed FIFO order);
        - the enqueuing thread does not call any of the wait_for_all methods. **/
    task_stream<front_accessor> my_fifo_task_stream; // heavy use in stealing loop

    //! Task pool for the tasks scheduled via tbb::resume() function.
    task_stream<front_accessor> my_resume_task_stream; // heavy use in stealing loop

#if __TBB_PREVIEW_CRITICAL_TASKS
    //! Task pool for the tasks with critical property set.
    /** Critical tasks are scheduled for execution ahead of other sources (including the local task pool
        and even bypassed tasks) unless the thread already executes a critical task in an outer
        dispatch loop. **/
    // used on the hot path of the task dispatch loop
    task_stream<back_nonnull_accessor> my_critical_task_stream;
#endif

    //! The number of workers requested by the external thread owning the arena.
    unsigned my_max_num_workers;

    //! The total number of workers that are requested from the resource manager.
    int my_total_num_workers_requested;

    //! The number of workers that are really requested from the resource manager.
    //! Possible values are in [0, my_max_num_workers].
    int my_num_workers_requested;

    //! The index in the array of per-priority lists of arenas this object is in.
    /*const*/ unsigned my_priority_level;

    //! Whether the arena is at the top priority level in the market.
    std::atomic<bool> my_is_top_priority{false};

    //! Current task pool state and estimate of available tasks amount.
    /** The estimate is either 0 (SNAPSHOT_EMPTY) or infinity (SNAPSHOT_FULL).
        Special state is "busy" (any other unsigned value).
        Note that the implementation of arena::is_busy_or_empty() requires
        my_pool_state to be unsigned. */
    using pool_state_t = std::uintptr_t;
    std::atomic<pool_state_t> my_pool_state;

    //! The list of local observers attached to this arena.
    observer_list my_observers;

#if __TBB_ARENA_BINDING
    //! Pointer to the internal observer that binds threads in the arena to a certain NUMA node.
    numa_binding_observer* my_numa_binding_observer;
#endif /*__TBB_ARENA_BINDING*/

    // Below are rarely modified members

    //! The market that owns this arena.
    market* my_market;

    //! ABA prevention marker.
    std::uintptr_t my_aba_epoch;

    //! Default task group context.
    d1::task_group_context* my_default_ctx;

    //! The number of slots in the arena.
    unsigned my_num_slots;

    //! The number of reserved slots (can be occupied only by external threads).
    unsigned my_num_reserved_slots;

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    // The arena needs an extra worker despite the global limit.
    std::atomic<bool> my_global_concurrency_mode;
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */

    //! Waiting object for external threads that cannot join the arena.
    concurrent_monitor my_exit_monitors;

    //! Coroutines (task_dispatchers) cache buffer
    arena_co_cache my_co_cache;

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    // The arena needs an extra worker despite the arena limit.
    atomic_flag my_local_concurrency_flag;
    // The number of local mandatory concurrency requests.
    int my_local_concurrency_requests;
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY*/

#if TBB_USE_ASSERT
    //! Used to trap accesses to the object after its destruction.
    std::uintptr_t my_guard;
#endif /* TBB_USE_ASSERT */
}; // struct arena_base

class arena: public padded<arena_base>
{
public:
    using base_type = padded<arena_base>;

    //! Types of work advertised by advertise_new_work()
    enum new_work_type {
        work_spawned,
        wakeup,
        work_enqueued
    };

    //! Constructor
    arena ( market& m, unsigned max_num_workers, unsigned num_reserved_slots, unsigned priority_level);

    //! Allocate an instance of arena.
    static arena& allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots,
                                  unsigned priority_level );

    static unsigned num_arena_slots ( unsigned num_slots ) {
        return max(2u, num_slots);
    }

    static int allocation_size ( unsigned num_slots ) {
        return sizeof(base_type) + num_slots * (sizeof(mail_outbox) + sizeof(arena_slot) + sizeof(task_dispatcher));
    }

    //! Get reference to mailbox corresponding to given slot_id
    mail_outbox& mailbox( d1::slot_id slot ) {
        __TBB_ASSERT( slot != d1::no_slot, "affinity should be specified" );

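        // Mailboxes are placed in the same allocation immediately before the arena object
        // (see arena::allocate_arena and allocation_size above), which is why the outbox for
        // a slot is reached by indexing backwards from 'this'.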
        return reinterpret_cast<mail_outbox*>(this)[-(int)(slot+1)]; // cast to 'int' is redundant but left for readability
    }

    //! Completes arena shutdown, destructs and deallocates it.
    void free_arena ();

    //! No tasks to steal since last snapshot was taken
    static const pool_state_t SNAPSHOT_EMPTY = 0;

    //! At least one task has been offered for stealing since the last snapshot started
    static const pool_state_t SNAPSHOT_FULL = pool_state_t(-1);

    //! The number of least significant bits for external references
    static const unsigned ref_external_bits = 12; // up to 4095 external and 1M workers

    //! Reference increment values for externals and workers
    static const unsigned ref_external = 1;
    static const unsigned ref_worker   = 1 << ref_external_bits;
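
    // For illustration: with ref_external_bits == 12, an arena referenced by one external thread
    // and three workers holds my_references == 1 * ref_external + 3 * ref_worker;
    // num_workers_active() below recovers the worker count by shifting the external bits away.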

    //! No tasks to steal or snapshot is being taken.
    static bool is_busy_or_empty( pool_state_t s ) { return s < SNAPSHOT_FULL; }

    //! The number of workers active in the arena.
    unsigned num_workers_active() const {
        return my_references.load(std::memory_order_acquire) >> ref_external_bits;
    }

    //! Check if the recall is requested by the market.
    bool is_recall_requested() const {
        return num_workers_active() > my_num_workers_allotted.load(std::memory_order_relaxed);
    }

    //! If necessary, raise a flag that there is new job in the arena.
    template<arena::new_work_type work_type> void advertise_new_work();

    //! Attempts to steal a task from a randomly chosen arena slot
    d1::task* steal_task(unsigned arena_index, FastRandom& frnd, execution_data_ext& ed, isolation_type isolation);

    //! Get a task from a global starvation-resistant queue
    template<task_stream_accessor_type accessor>
    d1::task* get_stream_task(task_stream<accessor>& stream, unsigned& hint);

#if __TBB_PREVIEW_CRITICAL_TASKS
    //! Tries to find a critical task in the global critical task stream
    d1::task* get_critical_task(unsigned& hint, isolation_type isolation);
#endif

    //! Check if there is job anywhere in the arena.
    /** Return true if no job or if the arena is being cleaned up. */
    bool is_out_of_work();

    //! Enqueue a task into the starvation-resistant queue
    void enqueue_task(d1::task&, d1::task_group_context&, thread_data&);

    //! Registers the worker with the arena and enters the TBB scheduler dispatch loop
    void process(thread_data&);

    //! Notification that the thread leaves its arena
    template<unsigned ref_param>
    inline void on_thread_leaving ( );

    //! Check for the presence of enqueued tasks at all priority levels
    bool has_enqueued_tasks();

    static const std::size_t out_of_arena = ~size_t(0);
    //! Tries to occupy a slot in the arena. On success, returns the slot index; if no slot is available, returns out_of_arena.
    template <bool as_worker>
    std::size_t occupy_free_slot(thread_data&);
    //! Tries to occupy a slot in the specified range.
    std::size_t occupy_free_slot_in_range(thread_data& tls, std::size_t lower, std::size_t upper);

    std::uintptr_t calculate_stealing_threshold();

    /** Must be the last data field */
    arena_slot my_slots[1];
}; // class arena

template<unsigned ref_param>
inline void arena::on_thread_leaving ( ) {
    //
    // The implementation of the arena destruction synchronization logic contained various
    // bugs/flaws at different stages of its evolution, so below is a detailed
    // description of the issues taken into consideration in the current design.
    //
    // When fire-and-forget tasks (scheduled via task::enqueue()) are used, an
    // external thread is allowed to leave its arena before all its work is executed,
    // and the market may temporarily revoke all workers from this arena. Since revoked
    // workers never attempt to reset the arena state to EMPTY and cancel its request
    // to RML for threads, the arena object is destroyed only when the last
    // thread is leaving it and the arena's state is EMPTY (that is, its external thread
    // left and it does not contain any work).
    // Thus resetting the arena to the EMPTY state (as earlier TBB versions did) should not
    // be done here (or anywhere else in the external thread, for that matter); doing so
    // can result either in the arena's premature destruction (at least without
    // additional costly checks in workers) or in unnecessary arena state changes
    // (and the ensuing worker migration).
    //
    // A worker that checks for work presence and transitions the arena to the EMPTY
    // state (in the snapshot-taking procedure arena::is_out_of_work()) updates
    // arena::my_pool_state first and only then arena::my_num_workers_requested.
    // So the check for work absence must be done against the latter field.
    //
    // In the time window between decrementing the active threads count and checking
    // whether there is an outstanding request for workers, a new worker thread may arrive,
    // finish the remaining work, set the arena state to empty, and leave, decrementing its
    // refcount and destroying the arena. Then the current thread would destroy the arena
    // a second time. To preclude this, a local copy of the outstanding request
    // value can be stored before decrementing the active threads count.
    //
    // But this technique may cause two other problems. When the stored request is
    // zero, it is possible that the arena still has threads and they can generate new
    // tasks and thus re-establish non-zero requests. Then all the threads can be
    // revoked (as described above), leaving this thread the last one and causing
    // it to destroy a non-empty arena.
    //
    // The other problem takes place when the stored request is non-zero. Another
    // thread may complete the work, set the arena state to empty, and leave without
    // arena destruction before this thread decrements the refcount. This thread
    // cannot destroy the arena either. Thus the arena may be "orphaned".
    //
    // In both cases we cannot dereference the arena pointer after the refcount is
    // decremented, as our arena may already be destroyed.
    //
    // If this is the external thread, the market is protected by the refcount it holds to it.
    // In case of workers, the market's liveness is ensured by the RML connection
    // rundown protocol, according to which the client (i.e. the market) lives
    // until the RML server notifies it about connection termination, and this
    // notification is fired only after all workers return into RML.
    //
    // Thus if we decremented the refcount to zero, we ask the market to check the arena
    // state (including whether it is still alive) under a lock.
    //
    std::uintptr_t aba_epoch = my_aba_epoch;
    unsigned priority_level = my_priority_level;
    market* m = my_market;
    __TBB_ASSERT(my_references.load(std::memory_order_relaxed) >= ref_param, "broken arena reference counter");
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    // When there are no workers, someone must free the arena, since
    // without workers no one calls is_out_of_work().
    // Skip workerless arenas because they have no demand for workers.
    // TODO: consider stricter conditions for the cleanup, because it can create
    // demand for workers while the arena may already be empty (and so ready for destruction).
    // TODO: Fix the race: the soft limit might change while we check it.
    if( ref_param==ref_external && my_num_slots != my_num_reserved_slots
        && 0 == m->my_num_workers_soft_limit.load(std::memory_order_relaxed) &&
        !my_global_concurrency_mode.load(std::memory_order_relaxed) ) {
        is_out_of_work();
        // We expect that in the worst case it is enough to have num_priority_levels-1
        // calls to restore priorities and yet another is_out_of_work() to confirm
        // that no work was found. But as market::set_active_num_workers() can be called
        // concurrently, we cannot guarantee that the last is_out_of_work() returns true.
    }
#endif

    // Release our reference to sync with arena destroy
    unsigned remaining_ref = my_references.fetch_sub(ref_param, std::memory_order_release) - ref_param;
    if (remaining_ref == 0) {
        m->try_destroy_arena( this, aba_epoch, priority_level );
    }
}
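
// Call-site note (an assumption based on the ref_external/ref_worker constants above, not a
// guarantee of this header): workers detaching from the arena are expected to release their
// reference with on_thread_leaving<ref_worker>(), while external threads and explicit
// task_arenas release theirs with on_thread_leaving<ref_external>().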

template<arena::new_work_type work_type>
void arena::advertise_new_work() {
    auto is_related_arena = [&] (market_context context) {
        return this == context.my_arena_addr;
    };

    if( work_type == work_enqueued ) {
        atomic_fence(std::memory_order_seq_cst);
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        if ( my_market->my_num_workers_soft_limit.load(std::memory_order_acquire) == 0 &&
            my_global_concurrency_mode.load(std::memory_order_acquire) == false )
            my_market->enable_mandatory_concurrency(this);

        if (my_max_num_workers == 0 && my_num_reserved_slots == 1 && my_local_concurrency_flag.test_and_set()) {
            my_market->adjust_demand(*this, /* delta = */ 1, /* mandatory = */ true);
        }
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
        // The local memory fence here and below is required to avoid missed wakeups; see the comment below.
        // Starvation-resistant tasks require concurrency, so missed wakeups are unacceptable.
    }
    else if( work_type == wakeup ) {
        atomic_fence(std::memory_order_seq_cst);
    }

    // Double-check idiom that, in case of spawning, is deliberately sloppy about memory fences.
    // Technically, to avoid missed wakeups, there should be a full memory fence between the point we
    // released the task pool (i.e. spawned a task) and read the arena's state.  However, adding such a
    // fence might hurt overall performance more than it helps, because the fence would be executed
    // on every task pool release, even when stealing does not occur.  Since TBB allows parallelism,
    // but never promises parallelism, the missed wakeup is not a correctness problem.
    pool_state_t snapshot = my_pool_state.load(std::memory_order_acquire);
    if( is_busy_or_empty(snapshot) ) {
        // Attempt to mark as full.  The compare_exchange_strong below is a little unusual because the
        // result is compared to a value that can be different from the comparand argument.
        pool_state_t expected_state = snapshot;
        my_pool_state.compare_exchange_strong( expected_state, SNAPSHOT_FULL );
        if( expected_state == SNAPSHOT_EMPTY ) {
            if( snapshot != SNAPSHOT_EMPTY ) {
                // This thread read "busy" into snapshot, and then another thread transitioned
                // my_pool_state to "empty" in the meantime, which caused the compare_exchange_strong above
                // to fail.  Attempt to transition my_pool_state from "empty" to "full".
                expected_state = SNAPSHOT_EMPTY;
                if( !my_pool_state.compare_exchange_strong( expected_state, SNAPSHOT_FULL ) ) {
                    // Some other thread transitioned my_pool_state from "empty", and hence became
                    // responsible for waking up workers.
                    return;
                }
            }
            // This thread transitioned the pool from the empty to the full state, and thus is responsible for
            // telling the market that there is work to do.
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            if( work_type == work_spawned ) {
                if ( my_global_concurrency_mode.load(std::memory_order_acquire) == true )
                    my_market->mandatory_concurrency_disable( this );
            }
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
            // TODO: investigate adjusting the arena's demand by a single worker.
            my_market->adjust_demand(*this, my_max_num_workers, /* mandatory = */ false);

            // Notify all sleeping threads that work has appeared in the arena.
            my_market->get_wait_list().notify(is_related_arena);
        }
    }
}
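
// Expected call sites, sketched from the template parameter names (illustrative, not exhaustive):
// code that publishes a task to a local task pool advertises it with
// advertise_new_work<work_spawned>(), enqueue paths use advertise_new_work<work_enqueued>(),
// and paths that only need sleeping workers re-examined use advertise_new_work<wakeup>().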

inline d1::task* arena::steal_task(unsigned arena_index, FastRandom& frnd, execution_data_ext& ed, isolation_type isolation) {
    auto slot_num_limit = my_limit.load(std::memory_order_relaxed);
    if (slot_num_limit == 1) {
        // No slots to steal from
        return nullptr;
    }
    // Try to steal a task from a random victim.
    std::size_t k = frnd.get() % (slot_num_limit - 1);
    // The following condition excludes from the list of potential victims the external thread
    // that might have already taken our previous place in the arena. But since such a situation
    // can take place only in case of significant oversubscription, keeping the checks simple
    // seems preferable to complicating the code.
    if (k >= arena_index) {
        ++k; // Adjusts random distribution to exclude self
    }
    arena_slot* victim = &my_slots[k];
    d1::task **pool = victim->task_pool.load(std::memory_order_relaxed);
    d1::task *t = nullptr;
    if (pool == EmptyTaskPool || !(t = victim->steal_task(*this, isolation, k))) {
        return nullptr;
    }
    if (task_accessor::is_proxy_task(*t)) {
        task_proxy &tp = *(task_proxy*)t;
        d1::slot_id slot = tp.slot;
        t = tp.extract_task<task_proxy::pool_bit>();
        if (!t) {
            // The proxy was empty, so it is our responsibility to free it
            tp.allocator.delete_object(&tp, ed);
            return nullptr;
        }
        // Note that the affinity slot is set for any stolen task (proxy or general)
        ed.affinity_slot = slot;
    } else {
        // Note that the affinity slot is set for any stolen task (proxy or general)
        ed.affinity_slot = d1::any_slot;
    }
    // Update the task owner thread id to identify stealing
    ed.original_slot = k;
    return t;
}

template<task_stream_accessor_type accessor>
inline d1::task* arena::get_stream_task(task_stream<accessor>& stream, unsigned& hint) {
    if (stream.empty())
        return nullptr;
    return stream.pop(subsequent_lane_selector(hint));
}

#if __TBB_PREVIEW_CRITICAL_TASKS
// Retrieves a critical task, respecting the isolation level if provided. The rules are:
// 1) No outer critical task and no isolation => take any critical task
// 2) Working on an outer critical task and no isolation => cannot take any critical task
// 3) No outer critical task but isolated => respect isolation
// 4) Working on an outer critical task and isolated => respect isolation
// The hint is used to keep some LIFO-ness: the search starts with the lane that was used during the push operation.
inline d1::task* arena::get_critical_task(unsigned& hint, isolation_type isolation) {
    if (my_critical_task_stream.empty())
        return nullptr;

    if ( isolation != no_isolation ) {
        return my_critical_task_stream.pop_specific( hint, isolation );
    } else {
        return my_critical_task_stream.pop(preceding_lane_selector(hint));
    }
}
#endif // __TBB_PREVIEW_CRITICAL_TASKS

} // namespace r1
} // namespace detail
} // namespace tbb

#endif /* _TBB_arena_H */