/*
    Copyright (c) 2020-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef _TBB_task_dispatcher_H
#define _TBB_task_dispatcher_H

#include "oneapi/tbb/detail/_utils.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/global_control.h"

#include "scheduler_common.h"
#include "waiters.h"
#include "arena_slot.h"
#include "arena.h"
#include "thread_data.h"
#include "mailbox.h"
#include "itt_notify.h"
#include "concurrent_monitor.h"

#include <atomic>

#if !__TBB_CPU_CTL_ENV_PRESENT
#include <fenv.h>
#endif

namespace tbb {
namespace detail {
namespace r1 {

inline d1::task* get_self_recall_task(arena_slot& slot) {
    suppress_unused_warning(slot);
    d1::task* t = nullptr;
#if __TBB_RESUMABLE_TASKS
    suspend_point_type* sp = slot.default_task_dispatcher().m_suspend_point;
    if (sp && sp->m_is_owner_recalled.load(std::memory_order_acquire)) {
        t = &sp->m_resume_task;
        __TBB_ASSERT(sp->m_resume_task.m_target.m_thread_data == nullptr, nullptr);
    }
#endif /* __TBB_RESUMABLE_TASKS */
    return t;
}

// Defined in exception.cpp
/*[[noreturn]]*/ void do_throw_noexcept(void (*throw_exception)()) noexcept;

//------------------------------------------------------------------------
// Suspend point
//------------------------------------------------------------------------
#if __TBB_RESUMABLE_TASKS

inline d1::task* suspend_point_type::resume_task::execute(d1::execution_data& ed) {
    execution_data_ext& ed_ext = static_cast<execution_data_ext&>(ed);

    if (ed_ext.wait_ctx) {
        market_concurrent_monitor::resume_context monitor_node{{std::uintptr_t(ed_ext.wait_ctx), nullptr}, ed_ext, m_target};
        // The wait_ctx is present only in external_waiter. In that case we leave the current stack
        // in the abandoned state and resume it when the waiting completes.
        thread_data* td = ed_ext.task_disp->m_thread_data;
        td->set_post_resume_action(thread_data::post_resume_action::register_waiter, &monitor_node);

        market_concurrent_monitor& wait_list = td->my_arena->my_market->get_wait_list();

        if (wait_list.wait([&] { return !ed_ext.wait_ctx->continue_execution(); }, monitor_node)) {
            return nullptr;
        }

        td->clear_post_resume_action();
        td->set_post_resume_action(thread_data::post_resume_action::resume, ed_ext.task_disp->get_suspend_point());
    } else {
        // If wait_ctx is null, this can only be a worker thread on the outermost level because
        // coroutine_waiter interrupts the bypass loop before the resume_task execution.
        ed_ext.task_disp->m_thread_data->set_post_resume_action(thread_data::post_resume_action::notify,
            ed_ext.task_disp->get_suspend_point());
    }
    // Do not access this task because it might be destroyed
    ed_ext.task_disp->resume(m_target);
    return nullptr;
}

inline suspend_point_type::suspend_point_type(arena* a, size_t stack_size, task_dispatcher& task_disp)
    : m_arena(a)
    , m_random(this)
    , m_co_context(stack_size, &task_disp)
    , m_resume_task(task_disp)
{
    assert_pointer_valid(m_arena);
    assert_pointer_valid(m_arena->my_default_ctx);
    task_accessor::context(m_resume_task) = m_arena->my_default_ctx;
    task_accessor::isolation(m_resume_task) = no_isolation;
    // Initialize the itt_caller for the context of the resume task.
    // It will be bound to the stack of the first suspend call.
    task_group_context_impl::bind_to(*task_accessor::context(m_resume_task), task_disp.m_thread_data);
}

#endif /* __TBB_RESUMABLE_TASKS */

//------------------------------------------------------------------------
// Task Dispatcher
//------------------------------------------------------------------------
inline task_dispatcher::task_dispatcher(arena* a) {
    m_execute_data_ext.context = a->my_default_ctx;
    m_execute_data_ext.task_disp = this;
}

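// Checks whether stealing is allowed at the current stack depth: the address of a local
// anchor is compared against the precomputed stealing threshold to guard against running
// out of stack in deeply nested dispatch.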
inline bool task_dispatcher::can_steal() {
    __TBB_ASSERT(m_stealing_threshold != 0, nullptr);
    stack_anchor_type anchor{};
    return reinterpret_cast<std::uintptr_t>(&anchor) > m_stealing_threshold;
}

inline d1::task* task_dispatcher::get_inbox_or_critical_task(
    execution_data_ext& ed, mail_inbox& inbox, isolation_type isolation, bool critical_allowed)
{
    if (inbox.empty())
        return nullptr;
    d1::task* result = get_critical_task(nullptr, ed, isolation, critical_allowed);
    if (result)
        return result;
    // Check if there are tasks mailed to this thread via the task-to-thread affinity mechanism.
    result = get_mailbox_task(inbox, ed, isolation);
    // There is a race with a thread adding a new task (possibly with suitable isolation)
    // to our mailbox, so the conditions below might result in a false positive.
    // Then set_is_idle(false) allows that task to be stolen; it's OK.
    if (isolation != no_isolation && !result && !inbox.empty() && inbox.is_idle_state(true)) {
        // We have proxy tasks in our mailbox but the isolation blocks their execution.
        // So publish the proxy tasks in the mailbox to make them available for stealing from the owner's task pool.
        inbox.set_is_idle( false );
    }
    return result;
}

inline d1::task* task_dispatcher::get_stream_or_critical_task(
    execution_data_ext& ed, arena& a, task_stream<front_accessor>& stream, unsigned& hint,
    isolation_type isolation, bool critical_allowed)
{
    if (stream.empty())
        return nullptr;
    d1::task* result = get_critical_task(nullptr, ed, isolation, critical_allowed);
    if (result)
        return result;
    return a.get_stream_task(stream, hint);
}

inline d1::task* task_dispatcher::steal_or_get_critical(
    execution_data_ext& ed, arena& a, unsigned arena_index, FastRandom& random,
    isolation_type isolation, bool critical_allowed)
{
    if (d1::task* t = a.steal_task(arena_index, random, ed, isolation)) {
        ed.context = task_accessor::context(*t);
        ed.isolation = task_accessor::isolation(*t);
        return get_critical_task(t, ed, isolation, critical_allowed);
    }
    return nullptr;
}

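// Looks for work outside the local task pool: critical tasks, the mailbox, the resume and
// FIFO streams, and other arena slots (random stealing). Pauses in the waiter between
// unsuccessful attempts and returns nullptr if the waiter decides to stop the search.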
template <bool ITTPossible, typename Waiter>
d1::task* task_dispatcher::receive_or_steal_task(
    thread_data& tls, execution_data_ext& ed, Waiter& waiter, isolation_type isolation,
    bool fifo_allowed, bool critical_allowed)
{
    __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
    // Task to return
    d1::task* t = nullptr;
    // Get tls data (again)
    arena& a = *tls.my_arena;
    arena_slot& slot = *tls.my_arena_slot;
    unsigned arena_index = tls.my_arena_index;
    mail_inbox& inbox = tls.my_inbox;
    task_stream<front_accessor>& resume_stream = a.my_resume_task_stream;
    unsigned& resume_hint = slot.hint_for_resume_stream;
    task_stream<front_accessor>& fifo_stream = a.my_fifo_task_stream;
    unsigned& fifo_hint = slot.hint_for_fifo_stream;

    waiter.reset_wait();
    // The thread is in the idle state now
    inbox.set_is_idle(true);

    bool stealing_is_allowed = can_steal();

    // Stealing loop: mailbox/enqueue/other slots
    for (;;) {
        __TBB_ASSERT(t == nullptr, nullptr);
        // Check if the resource manager requires our arena to relinquish some threads.
        // For the external thread, restore the idle state to true after the dispatch loop.
        if (!waiter.continue_execution(slot, t)) {
            __TBB_ASSERT(t == nullptr, nullptr);
            break;
        }
        // Start searching
        if (t != nullptr) {
            // continue_execution returned a task
        }
        else if ((t = get_inbox_or_critical_task(ed, inbox, isolation, critical_allowed))) {
            // Successfully got a task from the mailbox or a critical task
        }
        else if ((t = get_stream_or_critical_task(ed, a, resume_stream, resume_hint, isolation, critical_allowed))) {
            // Successfully got a resume or critical task
        }
        else if (fifo_allowed && isolation == no_isolation
                 && (t = get_stream_or_critical_task(ed, a, fifo_stream, fifo_hint, isolation, critical_allowed))) {
            // Got a task from the starvation-resistant stream. Only allowed at the outermost dispatch level without isolation.
        }
        else if (stealing_is_allowed
                 && (t = steal_or_get_critical(ed, a, arena_index, tls.my_random, isolation, critical_allowed))) {
            // Stole a task from a random arena slot
        }
        else {
            t = get_critical_task(t, ed, isolation, critical_allowed);
        }

        if (t != nullptr) {
            ed.context = task_accessor::context(*t);
            ed.isolation = task_accessor::isolation(*t);
            a.my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);
            break; // Stealing success, end of stealing attempt
        }
        // Nothing to do, pause a little.
        waiter.pause(slot);
    } // end of nonlocal task retrieval loop

    __TBB_ASSERT(is_alive(a.my_guard), nullptr);
    if (inbox.is_idle_state(true)) {
        inbox.set_is_idle(false);
    }
    return t;
}

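// The main dispatch loop: executes the given task and the chain of tasks it bypasses,
// then drains the local task pool, and finally falls back to receive_or_steal_task.
// An exception thrown from a task cancels its task group and is recorded in the context.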
template <bool ITTPossible, typename Waiter>
d1::task* task_dispatcher::local_wait_for_all(d1::task* t, Waiter& waiter) {
    assert_pointer_valid(m_thread_data);
    __TBB_ASSERT(m_thread_data->my_task_dispatcher == this, nullptr);

    // Guard an outer/default execution state
    struct dispatch_loop_guard {
        task_dispatcher& task_disp;
        execution_data_ext old_execute_data_ext;
        properties old_properties;

        ~dispatch_loop_guard() {
            task_disp.m_execute_data_ext = old_execute_data_ext;
            task_disp.m_properties = old_properties;

            __TBB_ASSERT(task_disp.m_thread_data && governor::is_thread_data_set(task_disp.m_thread_data), nullptr);
            __TBB_ASSERT(task_disp.m_thread_data->my_task_dispatcher == &task_disp, nullptr);
        }
    } dl_guard{ *this, m_execute_data_ext, m_properties };

    // The context guard to track fp settings and itt tasks.
    context_guard_helper</*report_tasks=*/ITTPossible> context_guard;

    // Current isolation context
    const isolation_type isolation = dl_guard.old_execute_data_ext.isolation;

    // Critical work inflection point. Once turned false, the current execution context has taken
    // a critical task on a previous stack frame and cannot take more until that critical path is
    // finished.
    bool critical_allowed = dl_guard.old_properties.critical_task_allowed;

    // Extended execution data that is used for dispatching.
    // The base version is passed to the task::execute method.
    execution_data_ext& ed = m_execute_data_ext;
    ed.context = t ? task_accessor::context(*t) : nullptr;
    ed.original_slot = m_thread_data->my_arena_index;
    ed.affinity_slot = d1::no_slot;
    ed.task_disp = this;
    ed.wait_ctx = waiter.wait_ctx();

    m_properties.outermost = false;
    m_properties.fifo_tasks_allowed = false;

    t = get_critical_task(t, ed, isolation, critical_allowed);
    if (t && m_thread_data->my_inbox.is_idle_state(true)) {
        // The thread has work to do. Therefore, mark its inbox as not idle so that
        // affinitized tasks can be stolen from it.
        m_thread_data->my_inbox.set_is_idle(false);
    }

    // Infinite exception loop
    for (;;) {
        try {
            // Main execution loop
            do {
                // We assume that bypass tasks are from the same task group.
                context_guard.set_ctx(ed.context);
                // The inner level evaluates tasks coming from nesting loops and those returned
                // by just executed tasks (bypassing spawn or enqueue calls).
                while (t != nullptr) {
                    assert_task_valid(t);
                    assert_pointer_valid</*alignment = */alignof(void*)>(ed.context);
                    __TBB_ASSERT(ed.context->my_lifetime_state == d1::task_group_context::lifetime_state::bound ||
                        ed.context->my_lifetime_state == d1::task_group_context::lifetime_state::isolated, nullptr);
                    __TBB_ASSERT(m_thread_data->my_inbox.is_idle_state(false), nullptr);
                    __TBB_ASSERT(task_accessor::is_resume_task(*t) || isolation == no_isolation || isolation == ed.isolation, nullptr);
                    // Check premature leave
                    if (Waiter::postpone_execution(*t)) {
                        __TBB_ASSERT(task_accessor::is_resume_task(*t) && dl_guard.old_properties.outermost,
                            "Currently, the bypass loop can be interrupted only for a resume task on the outermost level");
                        return t;
                    }
                    // Copy itt_caller to the stack because the context might be destroyed after t->execute.
                    void* itt_caller = ed.context->my_itt_caller;
                    suppress_unused_warning(itt_caller);

                    ITT_CALLEE_ENTER(ITTPossible, t, itt_caller);

                    if (ed.context->is_group_execution_cancelled()) {
                        t = t->cancel(ed);
                    } else {
                        t = t->execute(ed);
                    }

                    ITT_CALLEE_LEAVE(ITTPossible, itt_caller);

                    // The task affinity in execution data is set for affinitized tasks.
                    // So drop it after the task execution.
                    ed.affinity_slot = d1::no_slot;
                    // Reset the task owner id for the bypassed task
                    ed.original_slot = m_thread_data->my_arena_index;
                    t = get_critical_task(t, ed, isolation, critical_allowed);
                }
                __TBB_ASSERT(m_thread_data && governor::is_thread_data_set(m_thread_data), nullptr);
                __TBB_ASSERT(m_thread_data->my_task_dispatcher == this, nullptr);
                // When refactoring, pay attention that m_thread_data can be changed after t->execute()
                __TBB_ASSERT(m_thread_data->my_arena_slot != nullptr, nullptr);
                arena_slot& slot = *m_thread_data->my_arena_slot;
                if (!waiter.continue_execution(slot, t)) {
                    break;
                }
                // Retrieve a task from the local task pool
                if (t || (slot.is_task_pool_published() && (t = slot.get_task(ed, isolation)))) {
                    __TBB_ASSERT(ed.original_slot == m_thread_data->my_arena_index, nullptr);
                    ed.context = task_accessor::context(*t);
                    ed.isolation = task_accessor::isolation(*t);
                    continue;
                }
                // Retrieve a task from global sources
                t = receive_or_steal_task<ITTPossible>(
                    *m_thread_data, ed, waiter, isolation, dl_guard.old_properties.fifo_tasks_allowed,
                    critical_allowed
                );
            } while (t != nullptr); // main dispatch loop
            break; // Exit the exception loop
        } catch (...) {
            if (global_control::active_value(global_control::terminate_on_exception) == 1) {
                do_throw_noexcept([] { throw; });
            }
            if (ed.context->cancel_group_execution()) {
                /* We are the first to signal cancellation, so store the exception that caused it. */
                ed.context->my_exception.store(tbb_exception_ptr::allocate(), std::memory_order_release);
            }
        }
    } // Infinite exception loop
    __TBB_ASSERT(t == nullptr, nullptr);

#if __TBB_RESUMABLE_TASKS
    if (dl_guard.old_properties.outermost) {
        recall_point();
    }
#endif /* __TBB_RESUMABLE_TASKS */

    return nullptr;
}

#if __TBB_RESUMABLE_TASKS
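// Called at the outermost dispatch level. If this dispatcher is not the one bound to the
// current arena slot, suspend the current stack: mark its suspend point as recalled and
// notify the market wait list entry associated with that suspend point.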
inline void task_dispatcher::recall_point() {
    if (this != &m_thread_data->my_arena_slot->default_task_dispatcher()) {
        __TBB_ASSERT(m_suspend_point != nullptr, nullptr);
        __TBB_ASSERT(m_suspend_point->m_is_owner_recalled.load(std::memory_order_relaxed) == false, nullptr);
        d1::suspend([](suspend_point_type* sp) {
            sp->m_is_owner_recalled.store(true, std::memory_order_release);
            auto is_related_suspend_point = [sp] (market_context context) {
                std::uintptr_t sp_addr = std::uintptr_t(sp);
                return sp_addr == context.my_uniq_addr;
            };
            sp->m_arena->my_market->get_wait_list().notify(is_related_suspend_point);
        });

        if (m_thread_data->my_inbox.is_idle_state(true)) {
            m_thread_data->my_inbox.set_is_idle(false);
        }
    }
}
#endif /* __TBB_RESUMABLE_TASKS */

#if __TBB_PREVIEW_CRITICAL_TASKS
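// Tries to take a task from the arena's critical task stream. If a critical task is found,
// the bypassed task t (if any) is respawned and the critical task is returned instead;
// otherwise t is returned unchanged.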
inline d1::task* task_dispatcher::get_critical_task(d1::task* t, execution_data_ext& ed, isolation_type isolation, bool critical_allowed) {
    __TBB_ASSERT( critical_allowed || !m_properties.critical_task_allowed, nullptr );

    if (!critical_allowed) {
        // The stack is already in the process of critical path execution. Cannot take more
        // critical work until the current critical task is finished.
        __TBB_ASSERT(!m_properties.critical_task_allowed, nullptr);
        return t;
    }

    assert_pointers_valid(m_thread_data, m_thread_data->my_arena, m_thread_data->my_arena_slot);
    thread_data& td = *m_thread_data;
    arena& a = *td.my_arena;
    arena_slot& slot = *td.my_arena_slot;

    d1::task* crit_t = a.get_critical_task(slot.hint_for_critical_stream, isolation);
    if (crit_t != nullptr) {
        assert_task_valid(crit_t);
        if (t != nullptr) {
            assert_pointer_valid</*alignment = */alignof(void*)>(ed.context);
            r1::spawn(*t, *ed.context);
        }
        ed.context = task_accessor::context(*crit_t);
        ed.isolation = task_accessor::isolation(*crit_t);

        // We cannot execute more than one critical task on the same stack.
        // In other words, we prevent nested critical tasks.
        m_properties.critical_task_allowed = false;

        // TODO: add a test that the observer is called when a critical task is taken.
        a.my_observers.notify_entry_observers(td.my_last_observer, td.my_is_worker);
        t = crit_t;
    } else {
        // Was unable to find critical work in the queue. Allow inspecting the queue in nested
        // invocations. Handles the case when a critical task has just been completed.
        m_properties.critical_task_allowed = true;
    }
    return t;
}
#else
inline d1::task* task_dispatcher::get_critical_task(d1::task* t, execution_data_ext&, isolation_type, bool /*critical_allowed*/) {
    return t;
}
#endif

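// Pops task proxies from the mailbox until a task is successfully extracted or the mailbox is
// drained. Proxies whose task was already claimed through the task pool are destroyed here.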
inline d1::task* task_dispatcher::get_mailbox_task(mail_inbox& my_inbox, execution_data_ext& ed, isolation_type isolation) {
    while (task_proxy* const tp = my_inbox.pop(isolation)) {
        if (d1::task* result = tp->extract_task<task_proxy::mailbox_bit>()) {
            ed.original_slot = (unsigned short)(-2);
            ed.affinity_slot = ed.task_disp->m_thread_data->my_arena_index;
            return result;
        }
        // We have exclusive access to the proxy, and can destroy it.
        tp->allocator.delete_object(tp, ed);
    }
    return nullptr;
}

template <typename Waiter>
d1::task* task_dispatcher::local_wait_for_all(d1::task* t, Waiter& waiter) {
    if (governor::is_itt_present()) {
        return local_wait_for_all</*ITTPossible = */ true>(t, waiter);
    } else {
        return local_wait_for_all</*ITTPossible = */ false>(t, waiter);
    }
}

} // namespace r1
} // namespace detail
} // namespace tbb

#endif // _TBB_task_dispatcher_H