1 /*
2 Copyright (c) 2005-2021 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #ifndef _TBB_arena_H
18 #define _TBB_arena_H
19
20 #include <atomic>
21 #include <cstring>
22
23 #include "oneapi/tbb/detail/_task.h"
24
25 #include "scheduler_common.h"
26 #include "intrusive_list.h"
27 #include "task_stream.h"
28 #include "arena_slot.h"
29 #include "rml_tbb.h"
30 #include "mailbox.h"
31 #include "market.h"
32 #include "governor.h"
33 #include "concurrent_monitor.h"
34 #include "observer_proxy.h"
35 #include "oneapi/tbb/spin_mutex.h"
36
37 namespace tbb {
38 namespace detail {
39 namespace r1 {
40
// Forward declarations of classes referenced in this header.
class task_dispatcher;
class task_group_context;
class allocate_root_with_context_proxy;

#if __TBB_ARENA_BINDING
// Only needed when arena-to-NUMA-node binding support is compiled in.
class numa_binding_observer;
#endif /*__TBB_ARENA_BINDING*/
48
49 //! Bounded coroutines cache LIFO ring buffer
50 class arena_co_cache {
51 //! Ring buffer storage
52 task_dispatcher** my_co_scheduler_cache;
53 //! Current cache index
54 unsigned my_head;
55 //! Cache capacity for arena
56 unsigned my_max_index;
57 //! Accessor lock for modification operations
58 tbb::spin_mutex my_co_cache_mutex;
59
next_index()60 unsigned next_index() {
61 return ( my_head == my_max_index ) ? 0 : my_head + 1;
62 }
63
prev_index()64 unsigned prev_index() {
65 return ( my_head == 0 ) ? my_max_index : my_head - 1;
66 }
67
internal_empty()68 bool internal_empty() {
69 return my_co_scheduler_cache[prev_index()] == nullptr;
70 }
71
internal_task_dispatcher_cleanup(task_dispatcher * to_cleanup)72 void internal_task_dispatcher_cleanup(task_dispatcher* to_cleanup) {
73 to_cleanup->~task_dispatcher();
74 cache_aligned_deallocate(to_cleanup);
75 }
76
77 public:
init(unsigned cache_capacity)78 void init(unsigned cache_capacity) {
79 std::size_t alloc_size = cache_capacity * sizeof(task_dispatcher*);
80 my_co_scheduler_cache = (task_dispatcher**)cache_aligned_allocate(alloc_size);
81 std::memset( my_co_scheduler_cache, 0, alloc_size );
82 my_head = 0;
83 my_max_index = cache_capacity - 1;
84 }
85
cleanup()86 void cleanup() {
87 while (task_dispatcher* to_cleanup = pop()) {
88 internal_task_dispatcher_cleanup(to_cleanup);
89 }
90 cache_aligned_deallocate(my_co_scheduler_cache);
91 }
92
93 //! Insert scheduler to the current available place.
94 //! Replace an old value, if necessary.
push(task_dispatcher * s)95 void push(task_dispatcher* s) {
96 task_dispatcher* to_cleanup = nullptr;
97 {
98 tbb::spin_mutex::scoped_lock lock(my_co_cache_mutex);
99 // Check if we are replacing some existing buffer entrance
100 if (my_co_scheduler_cache[my_head] != nullptr) {
101 to_cleanup = my_co_scheduler_cache[my_head];
102 }
103 // Store the cached value
104 my_co_scheduler_cache[my_head] = s;
105 // Move head index to the next slot
106 my_head = next_index();
107 }
108 // Cleanup replaced buffer if any
109 if (to_cleanup) {
110 internal_task_dispatcher_cleanup(to_cleanup);
111 }
112 }
113
114 //! Get a cached scheduler if any
pop()115 task_dispatcher* pop() {
116 tbb::spin_mutex::scoped_lock lock(my_co_cache_mutex);
117 // No cached coroutine
118 if (internal_empty()) {
119 return nullptr;
120 }
121 // Move head index to the currently available value
122 my_head = prev_index();
123 // Retrieve the value from the buffer
124 task_dispatcher* to_return = my_co_scheduler_cache[my_head];
125 // Clear the previous entrance value
126 my_co_scheduler_cache[my_head] = nullptr;
127 return to_return;
128 }
129 };
130
//! Empty marker type that can be default-constructed but not copied (or moved).
/** NOTE(review): presumably the address of a local of this type serves as a stack reference
    point — confirm at the use sites. */
struct stack_anchor_type {
    stack_anchor_type() = default;
    stack_anchor_type(stack_anchor_type const&) = delete;
};
135
136 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
//! Flag with three kinds of state: EMPTY (0), SET (1), or "busy" while try_clear_if() runs its predicate.
/** The busy value is the address of a local variable inside try_clear_if(), so it never
    equals EMPTY or SET. **/
class atomic_flag {
    static const std::uintptr_t SET = 1;
    static const std::uintptr_t EMPTY = 0;
    std::atomic<std::uintptr_t> my_state;
public:
    //! Tries to move the flag to SET.
    /** Returns true only if this call performed the EMPTY -> SET transition. If a clear is in
        progress (busy state), the call may force the state back to SET, cancelling that clear,
        and then returns false. */
    bool test_and_set() {
        std::uintptr_t observed = my_state.load(std::memory_order_acquire);
        if (observed == SET) {
            return false;
        }
        if (observed != EMPTY) {
            // Busy: a try_clear_if() transaction is in flight.
            if (my_state.compare_exchange_strong(observed, SET)) {
                // We interrupted the clear transaction.
                return false;
            }
            if (observed != EMPTY) {
                // We lost our epoch.
                return false;
            }
            // The clear finished before our CAS: we are too late but still in the same epoch,
            // so attempt the EMPTY -> SET transition below.
        }
        return my_state.compare_exchange_strong(observed, SET);
    }

    //! If the flag is SET and `pred()` returns true, moves the flag to EMPTY.
    /** Returns true only when the flag was actually cleared by this call. While `pred()` runs
        the state is "busy", so a concurrent test_and_set() can cancel the clear. */
    template <typename Pred>
    bool try_clear_if(Pred&& pred) {
        std::uintptr_t busy = reinterpret_cast<std::uintptr_t>(&busy);
        std::uintptr_t expected = my_state.load(std::memory_order_acquire);
        if (expected != SET || !my_state.compare_exchange_strong(expected, busy)) {
            return false;
        }
        if (!pred()) {
            // Restore SET; the outcome is irrelevant because false is returned regardless.
            my_state.compare_exchange_strong(busy, SET);
            return false;
        }
        return my_state.compare_exchange_strong(busy, EMPTY);
    }

    //! Unconditionally resets the flag to EMPTY.
    void clear() {
        my_state.store(EMPTY, std::memory_order_release);
    }

    //! True when the flag is SET or busy (anything other than EMPTY).
    bool test() {
        return my_state.load(std::memory_order_acquire) != EMPTY;
    }
};
182 #endif
183
//! The structure of an arena, except the array of slots.
/** Separated in order to simplify padding.
    Intrusive list node base class is used by market to form a list of arenas. **/
// TODO: Analyze arena_base cache lines placement
// NOTE: member declaration order defines the object layout; the fields marked
// "heavy use" are deliberately declared first.
struct arena_base : padded<intrusive_list_node> {
    //! The number of workers that have been marked out by the resource manager to service the arena.
    std::atomic<unsigned> my_num_workers_allotted; // heavy use in stealing loop

    //! Reference counter for the arena.
    /** Worker and external thread references are counted separately: the low-order bits
        (arena::ref_external_bits of them) count references from external threads or explicit
        task_arenas; the remaining high-order bits count the workers servicing the arena
        (see arena::num_workers_active()). */
    std::atomic<unsigned> my_references; // heavy use in stealing loop

    //! The maximal number of currently busy slots.
    /** Used by arena::steal_task() as the exclusive upper bound of candidate victim slots. */
    std::atomic<unsigned> my_limit; // heavy use in stealing loop

    //! Task pool for the tasks scheduled via task::enqueue() method.
    /** Such scheduling guarantees eventual execution even if
        - new tasks are constantly coming (by extracting scheduled tasks in
          relaxed FIFO order);
        - the enqueuing thread does not call any of wait_for_all methods. **/
    task_stream<front_accessor> my_fifo_task_stream; // heavy use in stealing loop

    //! Task pool for the tasks scheduled via tbb::resume() function.
    task_stream<front_accessor> my_resume_task_stream; // heavy use in stealing loop

#if __TBB_PREVIEW_CRITICAL_TASKS
    //! Task pool for the tasks with critical property set.
    /** Critical tasks are scheduled for execution ahead of other sources (including local task pool
        and even bypassed tasks) unless the thread already executes a critical task in an outer
        dispatch loop **/
    // used on the hot path of the task dispatch loop
    task_stream<back_nonnull_accessor> my_critical_task_stream;
#endif

    //! The number of workers requested by the external thread owning the arena.
    unsigned my_max_num_workers;

    //! The total number of workers that are requested from the resource manager.
    int my_total_num_workers_requested;

    //! The number of workers that are really requested from the resource manager.
    //! Possible values are in [0, my_max_num_workers]
    int my_num_workers_requested;

    //! The index in the array of per-priority lists of arenas that this object belongs to.
    /*const*/ unsigned my_priority_level;

    //! Whether this arena is at the top priority level in the market.
    std::atomic<bool> my_is_top_priority{false};

    //! Current task pool state and estimate of available tasks amount.
    /** The estimate is either 0 (SNAPSHOT_EMPTY) or infinity (SNAPSHOT_FULL).
        Special state is "busy" (any other unsigned value).
        Note that the implementation of arena::is_busy_or_empty() requires
        my_pool_state to be unsigned. */
    using pool_state_t = std::uintptr_t ;
    std::atomic<pool_state_t> my_pool_state;

    //! The list of local observers attached to this arena.
    observer_list my_observers;

#if __TBB_ARENA_BINDING
    //! Pointer to internal observer that allows to bind threads in arena to certain NUMA node.
    numa_binding_observer* my_numa_binding_observer;
#endif /*__TBB_ARENA_BINDING*/

    // Below are rarely modified members

    //! The market that owns this arena.
    market* my_market;

    //! ABA prevention marker.
    /** Passed together with the arena pointer to market::try_destroy_arena(). */
    std::uintptr_t my_aba_epoch;

    //! Default task group context.
    d1::task_group_context* my_default_ctx;

    //! The number of slots in the arena.
    unsigned my_num_slots;

    //! The number of reserved slots (can be occupied only by external threads).
    unsigned my_num_reserved_slots;

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    // The arena needs an extra worker despite the global limit.
    std::atomic<bool> my_global_concurrency_mode;
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */

    //! Waiting object for external threads that cannot join the arena.
    concurrent_monitor my_exit_monitors;

    //! Coroutines (task_dispatchers) cache buffer
    arena_co_cache my_co_cache;

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    // The arena needs an extra worker despite the arena limit.
    atomic_flag my_local_concurrency_flag;
    // The number of local mandatory concurrency requests.
    int my_local_concurrency_requests;
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY*/

#if TBB_USE_ASSERT
    //! Used to trap accesses to the object after its destruction.
    std::uintptr_t my_guard;
#endif /* TBB_USE_ASSERT */
}; // struct arena_base
292
//! An arena: the shared arena_base state followed by a trailing array of slots.
/** NOTE(review): allocation_size() reserves one mail_outbox, one arena_slot and one
    task_dispatcher per slot, and mailbox() reads mail_outbox objects located *before* `this`;
    confirm the exact memory layout in allocate_arena(). **/
class arena: public padded<arena_base>
{
public:
    using base_type = padded<arena_base>;

    //! Types of work advertised by advertise_new_work()
    enum new_work_type {
        work_spawned,
        wakeup,
        work_enqueued
    };

    //! Constructor
    arena ( market& m, unsigned max_num_workers, unsigned num_reserved_slots, unsigned priority_level);

    //! Allocate an instance of arena.
    static arena& allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots,
                                  unsigned priority_level );

    //! Number of slots to create for a request of `num_slots`: never fewer than 2.
    static int unsigned num_arena_slots ( unsigned num_slots ) {
        return max(2u, num_slots);
    }

    //! Bytes needed for the arena base plus, per slot, a mail_outbox, an arena_slot and a task_dispatcher.
    /** NOTE(review): the size is computed in std::size_t but returned as int. */
    static int allocation_size ( unsigned num_slots ) {
        return sizeof(base_type) + num_slots * (sizeof(mail_outbox) + sizeof(arena_slot) + sizeof(task_dispatcher));
    }

    //! Get reference to mailbox corresponding to given slot_id
    /** Mailboxes live immediately before the arena object: slot i maps to index -(i+1) from `this`. */
    mail_outbox& mailbox( d1::slot_id slot ) {
        __TBB_ASSERT( slot != d1::no_slot, "affinity should be specified" );

        return reinterpret_cast<mail_outbox*>(this)[-(int)(slot+1)]; // cast to 'int' is redundant but left for readability
    }

    //! Completes arena shutdown, destructs and deallocates it.
    void free_arena ();

    //! No tasks to steal since last snapshot was taken
    static const pool_state_t SNAPSHOT_EMPTY = 0;

    //! At least one task has been offered for stealing since the last snapshot started
    static const pool_state_t SNAPSHOT_FULL = pool_state_t(-1);

    //! The number of least significant bits for external references
    static const unsigned ref_external_bits = 12; // up to 4095 external and 1M workers

    //! Reference increment values for externals and workers
    static const unsigned ref_external = 1;
    static const unsigned ref_worker = 1 << ref_external_bits;

    //! No tasks to steal or snapshot is being taken.
    /** SNAPSHOT_FULL is the maximum pool_state_t value, so every other value (empty or busy) is smaller. */
    static bool is_busy_or_empty( pool_state_t s ) { return s < SNAPSHOT_FULL; }

    //! The number of workers active in the arena (the bits of my_references above ref_external_bits).
    unsigned num_workers_active() const {
        return my_references.load(std::memory_order_acquire) >> ref_external_bits;
    }

    //! Check if the recall is requested by the market: more workers are active than are allotted.
    bool is_recall_requested() const {
        return num_workers_active() > my_num_workers_allotted.load(std::memory_order_relaxed);
    }

    //! If necessary, raise a flag that there is new work in the arena.
    template<arena::new_work_type work_type> void advertise_new_work();

    //! Attempts to steal a task from a randomly chosen arena slot
    d1::task* steal_task(unsigned arena_index, FastRandom& frnd, execution_data_ext& ed, isolation_type isolation);

    //! Get a task from a global starvation resistant queue
    template<task_stream_accessor_type accessor>
    d1::task* get_stream_task(task_stream<accessor>& stream, unsigned& hint);

#if __TBB_PREVIEW_CRITICAL_TASKS
    //! Tries to find a critical task in global critical task stream
    d1::task* get_critical_task(unsigned& hint, isolation_type isolation);
#endif

    //! Check if there is work anywhere in the arena.
    /** Return true if no work or if arena is being cleaned up. */
    bool is_out_of_work();

    //! Enqueue a task into the starvation-resistant queue
    void enqueue_task(d1::task&, d1::task_group_context&, thread_data&);

    //! Registers the worker with the arena and enters TBB scheduler dispatch loop
    void process(thread_data&);

    //! Notification that the thread leaves its arena (drops a reference of size ref_param)
    template<unsigned ref_param>
    inline void on_thread_leaving ( );

    //! Check for the presence of enqueued tasks at all priority levels
    bool has_enqueued_tasks();

    static const std::size_t out_of_arena = ~size_t(0);
    //! Tries to occupy a slot in the arena. On success, returns the slot index; if no slot is available, returns out_of_arena.
    template <bool as_worker>
    std::size_t occupy_free_slot(thread_data&);
    //! Tries to occupy a slot in the specified range.
    std::size_t occupy_free_slot_in_range(thread_data& tls, std::size_t lower, std::size_t upper);

    // Declared here; defined elsewhere.
    std::uintptr_t calculate_stealing_threshold();

    /** Must be the last data field: the slot array extends past the declared single element. */
    arena_slot my_slots[1];
}; // class arena
400
//! Drops this thread's reference (ref_param: ref_external or ref_worker) and, if it was the last one,
//! asks the market to try to destroy the arena.
template<unsigned ref_param>
inline void arena::on_thread_leaving ( ) {
    //
    // The implementation of the arena destruction synchronization logic has contained
    // various bugs/flaws at different stages of its evolution, so below is a detailed
    // description of the issues taken into consideration in the framework of the
    // current design.
    //
    // In case of using fire-and-forget tasks (scheduled via task::enqueue())
    // the external thread is allowed to leave its arena before all its work is executed,
    // and the market may temporarily revoke all workers from this arena. Since revoked
    // workers never attempt to reset the arena state to EMPTY and cancel its request
    // to RML for threads, the arena object is destroyed only when both the last
    // thread is leaving it and the arena's state is EMPTY (that is, its external thread
    // left and it does not contain any work).
    // Thus resetting the arena to EMPTY state (as earlier TBB versions did) should not
    // be done here (or anywhere else in the external thread, for that matter); doing so
    // can result either in the arena's premature destruction (at least without
    // additional costly checks in workers) or in unnecessary arena state changes
    // (and ensuing workers migration).
    //
    // A worker that checks for work presence and transitions the arena to the EMPTY
    // state (in the snapshot taking procedure arena::is_out_of_work()) updates
    // arena::my_pool_state first and only then arena::my_num_workers_requested.
    // So the check for work absence must be done against the latter field.
    //
    // In the time window between decrementing the active threads count and checking
    // whether there is an outstanding request for workers, a new worker thread may
    // arrive, finish the remaining work, set the arena state to empty, and leave,
    // decrementing its refcount and destroying the arena. Then the current thread
    // would destroy the arena a second time. To preclude this, a local copy of the
    // outstanding request value can be stored before decrementing the active threads count.
    //
    // But this technique may cause two other problems. When the stored request is
    // zero, it is possible that the arena still has threads and they can generate new
    // tasks and thus re-establish non-zero requests. Then all the threads can be
    // revoked (as described above), leaving this thread the last one and causing
    // it to destroy a non-empty arena.
    //
    // The other problem takes place when the stored request is non-zero. Another
    // thread may complete the work, set the arena state to empty, and leave without
    // arena destruction before this thread decrements the refcount. This thread
    // cannot destroy the arena either. Thus the arena may be "orphaned".
    //
    // In both cases we cannot dereference the arena pointer after the refcount is
    // decremented, as our arena may already be destroyed.
    //
    // If this is the external thread, the market is protected by its refcount.
    // In case of workers, the market's liveness is ensured by the RML connection
    // rundown protocol, according to which the client (i.e. the market) lives
    // until the RML server notifies it about connection termination, and this
    // notification is fired only after all workers return into RML.
    //
    // Thus if we decremented the refcount to zero we ask the market to check the arena
    // state (including whether it is alive) under the lock.
    //
    // Copy everything needed after the decrement now: once our reference is released
    // the arena may already be destroyed (see above).
    std::uintptr_t aba_epoch = my_aba_epoch;
    unsigned priority_level = my_priority_level;
    market* m = my_market;
    __TBB_ASSERT(my_references.load(std::memory_order_relaxed) >= ref_param, "broken arena reference counter");
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    // When there are no workers someone must free the arena, as
    // without workers no one calls is_out_of_work().
    // Skip workerless arenas because they have no demand for workers.
    // TODO: consider more strict conditions for the cleanup,
    // because it can create the demand of workers,
    // but the arena can be already empty (and so ready for destroying)
    // TODO: Fix the race: while we check soft limit and it might be changed.
    if( ref_param==ref_external && my_num_slots != my_num_reserved_slots
        && 0 == m->my_num_workers_soft_limit.load(std::memory_order_relaxed) &&
        !my_global_concurrency_mode.load(std::memory_order_relaxed) ) {
        is_out_of_work();
        // We expect that in the worst case it's enough to have num_priority_levels-1
        // calls to restore priorities and yet another is_out_of_work() to confirm
        // that no work was found. But as market::set_active_num_workers() can be called
        // concurrently, we can't guarantee the last is_out_of_work() returns true.
    }
#endif

    // Release our reference to sync with arena destroy
    unsigned remaining_ref = my_references.fetch_sub(ref_param, std::memory_order_release) - ref_param;
    if (remaining_ref == 0) {
        // Only the locally copied values are used from here on; `this` is passed to the
        // market, which decides under its lock whether the arena can be destroyed.
        m->try_destroy_arena( this, aba_epoch, priority_level );
    }
}
486
//! Announces new work of kind `work_type` so that the market can supply workers to this arena.
/** work_enqueued and wakeup issue a seq_cst fence before reading my_pool_state; work_spawned
    does not (see the double-check comment below). The thread that moves my_pool_state from
    SNAPSHOT_EMPTY to SNAPSHOT_FULL calls market::adjust_demand() with my_max_num_workers and
    notifies the market's wait list about waiters related to this arena. **/
template<arena::new_work_type work_type>
void arena::advertise_new_work() {
    // Wait-list predicate: selects waiters whose context refers to this arena.
    auto is_related_arena = [&] (market_context context) {
        return this == context.my_arena_addr;
    };

    if( work_type == work_enqueued ) {
        atomic_fence(std::memory_order_seq_cst);
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        // With a zero worker soft limit and global concurrency mode not yet on,
        // ask the market to enable mandatory concurrency for this arena.
        if ( my_market->my_num_workers_soft_limit.load(std::memory_order_acquire) == 0 &&
            my_global_concurrency_mode.load(std::memory_order_acquire) == false )
            my_market->enable_mandatory_concurrency(this);

        // Arena with no requested workers and one reserved slot: request one mandatory
        // worker, but only on the call that moves my_local_concurrency_flag from EMPTY to SET.
        if (my_max_num_workers == 0 && my_num_reserved_slots == 1 && my_local_concurrency_flag.test_and_set()) {
            my_market->adjust_demand(*this, /* delta = */ 1, /* mandatory = */ true);
        }
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
        // Local memory fence here and below is required to avoid missed wakeups; see the comment below.
        // Starvation resistant tasks require concurrency, so missed wakeups are unacceptable.
    }
    else if( work_type == wakeup ) {
        atomic_fence(std::memory_order_seq_cst);
    }

    // Double-check idiom that, in case of spawning, is deliberately sloppy about memory fences.
    // Technically, to avoid missed wakeups, there should be a full memory fence between the point we
    // released the task pool (i.e. spawned task) and read the arena's state. However, adding such a
    // fence might hurt overall performance more than it helps, because the fence would be executed
    // on every task pool release, even when stealing does not occur. Since TBB allows parallelism,
    // but never promises parallelism, the missed wakeup is not a correctness problem.
    pool_state_t snapshot = my_pool_state.load(std::memory_order_acquire);
    if( is_busy_or_empty(snapshot) ) {
        // Attempt to mark as full. The compare_exchange_strong below is a little unusual because the
        // result is compared to a value that can be different than the comparand argument.
        pool_state_t expected_state = snapshot;
        my_pool_state.compare_exchange_strong( expected_state, SNAPSHOT_FULL );
        if( expected_state == SNAPSHOT_EMPTY ) {
            if( snapshot != SNAPSHOT_EMPTY ) {
                // This thread read "busy" into snapshot, and then another thread transitioned
                // my_pool_state to "empty" in the meantime, which caused the compare_exchange_strong above
                // to fail. Attempt to transition my_pool_state from "empty" to "full".
                expected_state = SNAPSHOT_EMPTY;
                if( !my_pool_state.compare_exchange_strong( expected_state, SNAPSHOT_FULL ) ) {
                    // Some other thread transitioned my_pool_state from "empty", and hence became
                    // responsible for waking up workers.
                    return;
                }
            }
            // This thread transitioned pool from empty to full state, and thus is responsible for
            // telling the market that there is work to do.
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            if( work_type == work_spawned ) {
                if ( my_global_concurrency_mode.load(std::memory_order_acquire) == true )
                    my_market->mandatory_concurrency_disable( this );
            }
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
            // TODO: investigate adjusting of arena's demand by a single worker.
            my_market->adjust_demand(*this, my_max_num_workers, /* mandatory = */ false);

            // Notify all sleeping threads that work has appeared in the arena.
            my_market->get_wait_list().notify(is_related_arena);
        }
    }
}
551
steal_task(unsigned arena_index,FastRandom & frnd,execution_data_ext & ed,isolation_type isolation)552 inline d1::task* arena::steal_task(unsigned arena_index, FastRandom& frnd, execution_data_ext& ed, isolation_type isolation) {
553 auto slot_num_limit = my_limit.load(std::memory_order_relaxed);
554 if (slot_num_limit == 1) {
555 // No slots to steal from
556 return nullptr;
557 }
558 // Try to steal a task from a random victim.
559 std::size_t k = frnd.get() % (slot_num_limit - 1);
560 // The following condition excludes the external thread that might have
561 // already taken our previous place in the arena from the list .
562 // of potential victims. But since such a situation can take
563 // place only in case of significant oversubscription, keeping
564 // the checks simple seems to be preferable to complicating the code.
565 if (k >= arena_index) {
566 ++k; // Adjusts random distribution to exclude self
567 }
568 arena_slot* victim = &my_slots[k];
569 d1::task **pool = victim->task_pool.load(std::memory_order_relaxed);
570 d1::task *t = nullptr;
571 if (pool == EmptyTaskPool || !(t = victim->steal_task(*this, isolation, k))) {
572 return nullptr;
573 }
574 if (task_accessor::is_proxy_task(*t)) {
575 task_proxy &tp = *(task_proxy*)t;
576 d1::slot_id slot = tp.slot;
577 t = tp.extract_task<task_proxy::pool_bit>();
578 if (!t) {
579 // Proxy was empty, so it's our responsibility to free it
580 tp.allocator.delete_object(&tp, ed);
581 return nullptr;
582 }
583 // Note affinity is called for any stolen task (proxy or general)
584 ed.affinity_slot = slot;
585 } else {
586 // Note affinity is called for any stolen task (proxy or general)
587 ed.affinity_slot = d1::any_slot;
588 }
589 // Update task owner thread id to identify stealing
590 ed.original_slot = k;
591 return t;
592 }
593
594 template<task_stream_accessor_type accessor>
get_stream_task(task_stream<accessor> & stream,unsigned & hint)595 inline d1::task* arena::get_stream_task(task_stream<accessor>& stream, unsigned& hint) {
596 if (stream.empty())
597 return nullptr;
598 return stream.pop(subsequent_lane_selector(hint));
599 }
600
601 #if __TBB_PREVIEW_CRITICAL_TASKS
602 // Retrieves critical task respecting isolation level, if provided. The rule is:
603 // 1) If no outer critical task and no isolation => take any critical task
604 // 2) If working on an outer critical task and no isolation => cannot take any critical task
605 // 3) If no outer critical task but isolated => respect isolation
606 // 4) If working on an outer critical task and isolated => respect isolation
607 // Hint is used to keep some LIFO-ness, start search with the lane that was used during push operation.
get_critical_task(unsigned & hint,isolation_type isolation)608 inline d1::task* arena::get_critical_task(unsigned& hint, isolation_type isolation) {
609 if (my_critical_task_stream.empty())
610 return nullptr;
611
612 if ( isolation != no_isolation ) {
613 return my_critical_task_stream.pop_specific( hint, isolation );
614 } else {
615 return my_critical_task_stream.pop(preceding_lane_selector(hint));
616 }
617 }
618 #endif // __TBB_PREVIEW_CRITICAL_TASKS
619
620 } // namespace r1
621 } // namespace detail
622 } // namespace tbb
623
624 #endif /* _TBB_arena_H */
625