1 /*
2 Copyright (c) 2020-2021 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #ifndef _TBB_task_dispatcher_H
18 #define _TBB_task_dispatcher_H
19
20 #include "oneapi/tbb/detail/_utils.h"
21 #include "oneapi/tbb/detail/_task.h"
22 #include "oneapi/tbb/global_control.h"
23
24 #include "scheduler_common.h"
25 #include "waiters.h"
26 #include "arena_slot.h"
27 #include "arena.h"
28 #include "thread_data.h"
29 #include "mailbox.h"
30 #include "itt_notify.h"
31 #include "concurrent_monitor.h"
32
33 #include <atomic>
34
35 #if !__TBB_CPU_CTL_ENV_PRESENT
36 #include <fenv.h> //
37 #endif
38
39 namespace tbb {
40 namespace detail {
41 namespace r1 {
42
get_self_recall_task(arena_slot & slot)43 inline d1::task* get_self_recall_task(arena_slot& slot) {
44 suppress_unused_warning(slot);
45 d1::task* t = nullptr;
46 #if __TBB_RESUMABLE_TASKS
47 suspend_point_type* sp = slot.default_task_dispatcher().m_suspend_point;
48 if (sp && sp->m_is_owner_recalled.load(std::memory_order_acquire)) {
49 t = &sp->m_resume_task;
50 __TBB_ASSERT(sp->m_resume_task.m_target.m_thread_data == nullptr, nullptr);
51 }
52 #endif /* __TBB_RESUMABLE_TASKS */
53 return t;
54 }
55
56 // Defined in exception.cpp
57 /*[[noreturn]]*/void do_throw_noexcept(void (*throw_exception)()) noexcept;
58
59 //------------------------------------------------------------------------
60 // Suspend point
61 //------------------------------------------------------------------------
62 #if __TBB_RESUMABLE_TASKS
63
execute(d1::execution_data & ed)64 inline d1::task* suspend_point_type::resume_task::execute(d1::execution_data& ed) {
65 execution_data_ext& ed_ext = static_cast<execution_data_ext&>(ed);
66
67 if (ed_ext.wait_ctx) {
68 market_concurrent_monitor::resume_context monitor_node{{std::uintptr_t(ed_ext.wait_ctx), nullptr}, ed_ext, m_target};
69 // The wait_ctx is present only in external_waiter. In that case we leave the current stack
70 // in the abandoned state to resume when waiting completes.
71 thread_data* td = ed_ext.task_disp->m_thread_data;
72 td->set_post_resume_action(thread_data::post_resume_action::register_waiter, &monitor_node);
73
74 market_concurrent_monitor& wait_list = td->my_arena->my_market->get_wait_list();
75
76 if (wait_list.wait([&] { return !ed_ext.wait_ctx->continue_execution(); }, monitor_node)) {
77 return nullptr;
78 }
79
80 td->clear_post_resume_action();
81 td->set_post_resume_action(thread_data::post_resume_action::resume, ed_ext.task_disp->get_suspend_point());
82 } else {
83 // If wait_ctx is null, it can be only a worker thread on outermost level because
84 // coroutine_waiter interrupts bypass loop before the resume_task execution.
85 ed_ext.task_disp->m_thread_data->set_post_resume_action(thread_data::post_resume_action::notify,
86 ed_ext.task_disp->get_suspend_point());
87 }
88 // Do not access this task because it might be destroyed
89 ed_ext.task_disp->resume(m_target);
90 return nullptr;
91 }
92
suspend_point_type(arena * a,size_t stack_size,task_dispatcher & task_disp)93 inline suspend_point_type::suspend_point_type(arena* a, size_t stack_size, task_dispatcher& task_disp)
94 : m_arena(a)
95 , m_random(this)
96 , m_co_context(stack_size, &task_disp)
97 , m_resume_task(task_disp)
98 {
99 assert_pointer_valid(m_arena);
100 assert_pointer_valid(m_arena->my_default_ctx);
101 task_accessor::context(m_resume_task) = m_arena->my_default_ctx;
102 task_accessor::isolation(m_resume_task) = no_isolation;
103 // Initialize the itt_caller for the context of the resume task.
104 // It will be bound to the stack of the first suspend call.
105 task_group_context_impl::bind_to(*task_accessor::context(m_resume_task), task_disp.m_thread_data);
106 }
107
108 #endif /* __TBB_RESUMABLE_TASKS */
109
110 //------------------------------------------------------------------------
111 // Task Dispatcher
112 //------------------------------------------------------------------------
task_dispatcher(arena * a)113 inline task_dispatcher::task_dispatcher(arena* a) {
114 m_execute_data_ext.context = a->my_default_ctx;
115 m_execute_data_ext.task_disp = this;
116 }
117
can_steal()118 inline bool task_dispatcher::can_steal() {
119 __TBB_ASSERT(m_stealing_threshold != 0, nullptr);
120 stack_anchor_type anchor{};
121 return reinterpret_cast<std::uintptr_t>(&anchor) > m_stealing_threshold;
122 }
123
get_inbox_or_critical_task(execution_data_ext & ed,mail_inbox & inbox,isolation_type isolation,bool critical_allowed)124 inline d1::task* task_dispatcher::get_inbox_or_critical_task(
125 execution_data_ext& ed, mail_inbox& inbox, isolation_type isolation, bool critical_allowed)
126 {
127 if (inbox.empty())
128 return nullptr;
129 d1::task* result = get_critical_task(nullptr, ed, isolation, critical_allowed);
130 if (result)
131 return result;
132 // Check if there are tasks mailed to this thread via task-to-thread affinity mechanism.
133 result = get_mailbox_task(inbox, ed, isolation);
134 // There is a race with a thread adding a new task (possibly with suitable isolation)
135 // to our mailbox, so the below conditions might result in a false positive.
136 // Then set_is_idle(false) allows that task to be stolen; it's OK.
137 if (isolation != no_isolation && !result && !inbox.empty() && inbox.is_idle_state(true)) {
138 // We have proxy tasks in our mailbox but the isolation blocks their execution.
139 // So publish the proxy tasks in mailbox to be available for stealing from owner's task pool.
140 inbox.set_is_idle( false );
141 }
142 return result;
143 }
144
get_stream_or_critical_task(execution_data_ext & ed,arena & a,task_stream<front_accessor> & stream,unsigned & hint,isolation_type isolation,bool critical_allowed)145 inline d1::task* task_dispatcher::get_stream_or_critical_task(
146 execution_data_ext& ed, arena& a, task_stream<front_accessor>& stream, unsigned& hint,
147 isolation_type isolation, bool critical_allowed)
148 {
149 if (stream.empty())
150 return nullptr;
151 d1::task* result = get_critical_task(nullptr, ed, isolation, critical_allowed);
152 if (result)
153 return result;
154 return a.get_stream_task(stream, hint);
155 }
156
steal_or_get_critical(execution_data_ext & ed,arena & a,unsigned arena_index,FastRandom & random,isolation_type isolation,bool critical_allowed)157 inline d1::task* task_dispatcher::steal_or_get_critical(
158 execution_data_ext& ed, arena& a, unsigned arena_index, FastRandom& random,
159 isolation_type isolation, bool critical_allowed)
160 {
161 if (d1::task* t = a.steal_task(arena_index, random, ed, isolation)) {
162 ed.context = task_accessor::context(*t);
163 ed.isolation = task_accessor::isolation(*t);
164 return get_critical_task(t, ed, isolation, critical_allowed);
165 }
166 return nullptr;
167 }
168
169 template <bool ITTPossible, typename Waiter>
receive_or_steal_task(thread_data & tls,execution_data_ext & ed,Waiter & waiter,isolation_type isolation,bool fifo_allowed,bool critical_allowed)170 d1::task* task_dispatcher::receive_or_steal_task(
171 thread_data& tls, execution_data_ext& ed, Waiter& waiter, isolation_type isolation,
172 bool fifo_allowed, bool critical_allowed)
173 {
174 __TBB_ASSERT(governor::is_thread_data_set(&tls), NULL);
175 // Task to return
176 d1::task* t = nullptr;
177 // Get tls data (again)
178 arena& a = *tls.my_arena;
179 arena_slot& slot = *tls.my_arena_slot;
180 unsigned arena_index = tls.my_arena_index;
181 mail_inbox& inbox = tls.my_inbox;
182 task_stream<front_accessor>& resume_stream = a.my_resume_task_stream;
183 unsigned& resume_hint = slot.hint_for_resume_stream;
184 task_stream<front_accessor>& fifo_stream = a.my_fifo_task_stream;
185 unsigned& fifo_hint = slot.hint_for_fifo_stream;
186
187 waiter.reset_wait();
188 // Thread is in idle state now
189 inbox.set_is_idle(true);
190
191 bool stealing_is_allowed = can_steal();
192
193 // Stealing loop mailbox/enqueue/other_slots
194 for (;;) {
195 __TBB_ASSERT(t == nullptr, nullptr);
196 // Check if the resource manager requires our arena to relinquish some threads
197 // For the external thread restore idle state to true after dispatch loop
198 if (!waiter.continue_execution(slot, t)) {
199 __TBB_ASSERT(t == nullptr, nullptr);
200 break;
201 }
202 // Start searching
203 if (t != nullptr) {
204 // continue_execution returned a task
205 }
206 else if ((t = get_inbox_or_critical_task(ed, inbox, isolation, critical_allowed))) {
207 // Successfully got the task from mailbox or critical task
208 }
209 else if ((t = get_stream_or_critical_task(ed, a, resume_stream, resume_hint, isolation, critical_allowed))) {
210 // Successfully got the resume or critical task
211 }
212 else if (fifo_allowed && isolation == no_isolation
213 && (t = get_stream_or_critical_task(ed, a, fifo_stream, fifo_hint, isolation, critical_allowed))) {
214 // Checked if there are tasks in starvation-resistant stream. Only allowed at the outermost dispatch level without isolation.
215 }
216 else if (stealing_is_allowed
217 && (t = steal_or_get_critical(ed, a, arena_index, tls.my_random, isolation, critical_allowed))) {
218 // Stole a task from a random arena slot
219 }
220 else {
221 t = get_critical_task(t, ed, isolation, critical_allowed);
222 }
223
224 if (t != nullptr) {
225 ed.context = task_accessor::context(*t);
226 ed.isolation = task_accessor::isolation(*t);
227 a.my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);
228 break; // Stealing success, end of stealing attempt
229 }
230 // Nothing to do, pause a little.
231 waiter.pause(slot);
232 } // end of nonlocal task retrieval loop
233
234 __TBB_ASSERT(is_alive(a.my_guard), nullptr);
235 if (inbox.is_idle_state(true)) {
236 inbox.set_is_idle(false);
237 }
238 return t;
239 }
240
241 template <bool ITTPossible, typename Waiter>
local_wait_for_all(d1::task * t,Waiter & waiter)242 d1::task* task_dispatcher::local_wait_for_all(d1::task* t, Waiter& waiter ) {
243 assert_pointer_valid(m_thread_data);
244 __TBB_ASSERT(m_thread_data->my_task_dispatcher == this, nullptr);
245
246 // Guard an outer/default execution state
247 struct dispatch_loop_guard {
248 task_dispatcher& task_disp;
249 execution_data_ext old_execute_data_ext;
250 properties old_properties;
251
252 ~dispatch_loop_guard() {
253 task_disp.m_execute_data_ext = old_execute_data_ext;
254 task_disp.m_properties = old_properties;
255
256 __TBB_ASSERT(task_disp.m_thread_data && governor::is_thread_data_set(task_disp.m_thread_data), nullptr);
257 __TBB_ASSERT(task_disp.m_thread_data->my_task_dispatcher == &task_disp, nullptr);
258 }
259 } dl_guard{ *this, m_execute_data_ext, m_properties };
260
261 // The context guard to track fp setting and itt tasks.
262 context_guard_helper</*report_tasks=*/ITTPossible> context_guard;
263
264 // Current isolation context
265 const isolation_type isolation = dl_guard.old_execute_data_ext.isolation;
266
267 // Critical work inflection point. Once turned false current execution context has taken
268 // critical task on the previous stack frame and cannot take more until that critical path is
269 // finished.
270 bool critical_allowed = dl_guard.old_properties.critical_task_allowed;
271
272 // Extended execution data that is used for dispatching.
273 // Base version is passed to the task::execute method.
274 execution_data_ext& ed = m_execute_data_ext;
275 ed.context = t ? task_accessor::context(*t) : nullptr;
276 ed.original_slot = m_thread_data->my_arena_index;
277 ed.affinity_slot = d1::no_slot;
278 ed.task_disp = this;
279 ed.wait_ctx = waiter.wait_ctx();
280
281 m_properties.outermost = false;
282 m_properties.fifo_tasks_allowed = false;
283
284 t = get_critical_task(t, ed, isolation, critical_allowed);
285 if (t && m_thread_data->my_inbox.is_idle_state(true)) {
286 // The thread has a work to do. Therefore, marking its inbox as not idle so that
287 // affinitized tasks can be stolen from it.
288 m_thread_data->my_inbox.set_is_idle(false);
289 }
290
291 // Infinite exception loop
292 for (;;) {
293 try {
294 // Main execution loop
295 do {
296 // We assume that bypass tasks are from the same task group.
297 context_guard.set_ctx(ed.context);
298 // Inner level evaluates tasks coming from nesting loops and those returned
299 // by just executed tasks (bypassing spawn or enqueue calls).
300 while (t != nullptr) {
301 assert_task_valid(t);
302 assert_pointer_valid</*alignment = */alignof(void*)>(ed.context);
303 __TBB_ASSERT(ed.context->my_lifetime_state == d1::task_group_context::lifetime_state::bound ||
304 ed.context->my_lifetime_state == d1::task_group_context::lifetime_state::isolated, nullptr);
305 __TBB_ASSERT(m_thread_data->my_inbox.is_idle_state(false), nullptr);
306 __TBB_ASSERT(task_accessor::is_resume_task(*t) || isolation == no_isolation || isolation == ed.isolation, nullptr);
307 // Check premature leave
308 if (Waiter::postpone_execution(*t)) {
309 __TBB_ASSERT(task_accessor::is_resume_task(*t) && dl_guard.old_properties.outermost,
310 "Currently, the bypass loop can be interrupted only for resume task on outermost level");
311 return t;
312 }
313 // Copy itt_caller to a stack because the context might be destroyed after t->execute.
314 void* itt_caller = ed.context->my_itt_caller;
315 suppress_unused_warning(itt_caller);
316
317 ITT_CALLEE_ENTER(ITTPossible, t, itt_caller);
318
319 if (ed.context->is_group_execution_cancelled()) {
320 t = t->cancel(ed);
321 } else {
322 t = t->execute(ed);
323 }
324
325 ITT_CALLEE_LEAVE(ITTPossible, itt_caller);
326
327 // The task affinity in execution data is set for affinitized tasks.
328 // So drop it after the task execution.
329 ed.affinity_slot = d1::no_slot;
330 // Reset task owner id for bypassed task
331 ed.original_slot = m_thread_data->my_arena_index;
332 t = get_critical_task(t, ed, isolation, critical_allowed);
333 }
334 __TBB_ASSERT(m_thread_data && governor::is_thread_data_set(m_thread_data), nullptr);
335 __TBB_ASSERT(m_thread_data->my_task_dispatcher == this, nullptr);
336 // When refactoring, pay attention that m_thread_data can be changed after t->execute()
337 __TBB_ASSERT(m_thread_data->my_arena_slot != nullptr, nullptr);
338 arena_slot& slot = *m_thread_data->my_arena_slot;
339 if (!waiter.continue_execution(slot, t)) {
340 break;
341 }
342 // Retrieve the task from local task pool
343 if (t || (slot.is_task_pool_published() && (t = slot.get_task(ed, isolation)))) {
344 __TBB_ASSERT(ed.original_slot == m_thread_data->my_arena_index, NULL);
345 ed.context = task_accessor::context(*t);
346 ed.isolation = task_accessor::isolation(*t);
347 continue;
348 }
349 // Retrieve the task from global sources
350 t = receive_or_steal_task<ITTPossible>(
351 *m_thread_data, ed, waiter, isolation, dl_guard.old_properties.fifo_tasks_allowed,
352 critical_allowed
353 );
354 } while (t != nullptr); // main dispatch loop
355 break; // Exit exception loop;
356 } catch (...) {
357 if (global_control::active_value(global_control::terminate_on_exception) == 1) {
358 do_throw_noexcept([] { throw; });
359 }
360 if (ed.context->cancel_group_execution()) {
361 /* We are the first to signal cancellation, so store the exception that caused it. */
362 ed.context->my_exception.store(tbb_exception_ptr::allocate(), std::memory_order_release);
363 }
364 }
365 } // Infinite exception loop
366 __TBB_ASSERT(t == nullptr, nullptr);
367
368
369 #if __TBB_RESUMABLE_TASKS
370 if (dl_guard.old_properties.outermost) {
371 recall_point();
372 }
373 #endif /* __TBB_RESUMABLE_TASKS */
374
375 return nullptr;
376 }
377
378 #if __TBB_RESUMABLE_TASKS
recall_point()379 inline void task_dispatcher::recall_point() {
380 if (this != &m_thread_data->my_arena_slot->default_task_dispatcher()) {
381 __TBB_ASSERT(m_suspend_point != nullptr, nullptr);
382 __TBB_ASSERT(m_suspend_point->m_is_owner_recalled.load(std::memory_order_relaxed) == false, nullptr);
383 d1::suspend([](suspend_point_type* sp) {
384 sp->m_is_owner_recalled.store(true, std::memory_order_release);
385 auto is_related_suspend_point = [sp] (market_context context) {
386 std::uintptr_t sp_addr = std::uintptr_t(sp);
387 return sp_addr == context.my_uniq_addr;
388 };
389 sp->m_arena->my_market->get_wait_list().notify(is_related_suspend_point);
390 });
391
392 if (m_thread_data->my_inbox.is_idle_state(true)) {
393 m_thread_data->my_inbox.set_is_idle(false);
394 }
395 }
396 }
397 #endif /* __TBB_RESUMABLE_TASKS */
398
399 #if __TBB_PREVIEW_CRITICAL_TASKS
get_critical_task(d1::task * t,execution_data_ext & ed,isolation_type isolation,bool critical_allowed)400 inline d1::task* task_dispatcher::get_critical_task(d1::task* t, execution_data_ext& ed, isolation_type isolation, bool critical_allowed) {
401 __TBB_ASSERT( critical_allowed || !m_properties.critical_task_allowed, nullptr );
402
403 if (!critical_allowed) {
404 // The stack is already in the process of critical path execution. Cannot take another
405 // critical work until finish with the current one.
406 __TBB_ASSERT(!m_properties.critical_task_allowed, nullptr);
407 return t;
408 }
409
410 assert_pointers_valid(m_thread_data, m_thread_data->my_arena, m_thread_data->my_arena_slot);
411 thread_data& td = *m_thread_data;
412 arena& a = *td.my_arena;
413 arena_slot& slot = *td.my_arena_slot;
414
415 d1::task* crit_t = a.get_critical_task(slot.hint_for_critical_stream, isolation);
416 if (crit_t != nullptr) {
417 assert_task_valid(crit_t);
418 if (t != nullptr) {
419 assert_pointer_valid</*alignment = */alignof(void*)>(ed.context);
420 r1::spawn(*t, *ed.context);
421 }
422 ed.context = task_accessor::context(*crit_t);
423 ed.isolation = task_accessor::isolation(*crit_t);
424
425 // We cannot execute more than one critical task on the same stack.
426 // In other words, we prevent nested critical tasks.
427 m_properties.critical_task_allowed = false;
428
429 // TODO: add a test that the observer is called when critical task is taken.
430 a.my_observers.notify_entry_observers(td.my_last_observer, td.my_is_worker);
431 t = crit_t;
432 } else {
433 // Was unable to find critical work in the queue. Allow inspecting the queue in nested
434 // invocations. Handles the case when critical task has been just completed.
435 m_properties.critical_task_allowed = true;
436 }
437 return t;
438 }
439 #else
get_critical_task(d1::task * t,execution_data_ext &,isolation_type,bool)440 inline d1::task* task_dispatcher::get_critical_task(d1::task* t, execution_data_ext&, isolation_type, bool /*critical_allowed*/) {
441 return t;
442 }
443 #endif
444
get_mailbox_task(mail_inbox & my_inbox,execution_data_ext & ed,isolation_type isolation)445 inline d1::task* task_dispatcher::get_mailbox_task(mail_inbox& my_inbox, execution_data_ext& ed, isolation_type isolation) {
446 while (task_proxy* const tp = my_inbox.pop(isolation)) {
447 if (d1::task* result = tp->extract_task<task_proxy::mailbox_bit>()) {
448 ed.original_slot = (unsigned short)(-2);
449 ed.affinity_slot = ed.task_disp->m_thread_data->my_arena_index;
450 return result;
451 }
452 // We have exclusive access to the proxy, and can destroy it.
453 tp->allocator.delete_object(tp, ed);
454 }
455 return NULL;
456 }
457
458 template <typename Waiter>
local_wait_for_all(d1::task * t,Waiter & waiter)459 d1::task* task_dispatcher::local_wait_for_all(d1::task* t, Waiter& waiter) {
460 if (governor::is_itt_present()) {
461 return local_wait_for_all</*ITTPossible = */ true>(t, waiter);
462 } else {
463 return local_wait_for_all</*ITTPossible = */ false>(t, waiter);
464 }
465 }
466
467 } // namespace r1
468 } // namespace detail
469 } // namespace tbb
470
471 #endif // _TBB_task_dispatcher_H
472
473