/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef _TBB_scheduler_common_H
#define _TBB_scheduler_common_H

#include "oneapi/tbb/detail/_utils.h"
#include "oneapi/tbb/detail/_template_helpers.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/detail/_machine.h"
#include "oneapi/tbb/task_group.h"
#include "oneapi/tbb/cache_aligned_allocator.h"
#include "itt_notify.h"
#include "co_context.h"
#include "misc.h"
#include "governor.h"

#ifndef __TBB_SCHEDULER_MUTEX_TYPE
#define __TBB_SCHEDULER_MUTEX_TYPE tbb::spin_mutex
#endif
// TODO: add conditional inclusion based on specified type
#include "oneapi/tbb/spin_mutex.h"
#include "oneapi/tbb/mutex.h"

#if TBB_USE_ASSERT
#include <atomic>
#endif

#include <cstdint>
#include <exception>

//! Mutex type for global locks in the scheduler
using scheduler_mutex_type = __TBB_SCHEDULER_MUTEX_TYPE;

#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Workaround for overzealous compiler warnings
    // These particular warnings are so ubiquitous that no attempt is made to narrow
    // the scope of the warnings.
    #pragma warning (disable: 4100 4127 4312 4244 4267 4706)
#endif

namespace tbb {
namespace detail {
namespace r1 {

class arena;
class mail_inbox;
class mail_outbox;
class market;
class observer_proxy;

enum task_stream_accessor_type { front_accessor = 0, back_nonnull_accessor };
template<task_stream_accessor_type> class task_stream;

using isolation_type = std::intptr_t;
constexpr isolation_type no_isolation = 0;

//------------------------------------------------------------------------
// Extended execute data
//------------------------------------------------------------------------

//! Execute data used on the task dispatcher side; reflects the current execution state
struct execution_data_ext : d1::execution_data {
    task_dispatcher* task_disp{};
    isolation_type isolation{};
    d1::wait_context* wait_ctx{};
};

//------------------------------------------------------------------------
// Task accessor
//------------------------------------------------------------------------

//! Interpretation of reserved task fields inside a task dispatcher
struct task_accessor {
    static constexpr std::uint64_t proxy_task_trait = 1;
    static constexpr std::uint64_t resume_task_trait = 2;
    static d1::task_group_context*& context(d1::task& t) {
        task_group_context** tgc = reinterpret_cast<task_group_context**>(&t.m_reserved[0]);
        return *tgc;
    }
    static isolation_type& isolation(d1::task& t) {
        isolation_type* tag = reinterpret_cast<isolation_type*>(&t.m_reserved[2]);
        return *tag;
    }
    static void set_proxy_trait(d1::task& t) {
        // TODO: refactor proxy tasks not to work on uninitialized memory.
        //__TBB_ASSERT((t.m_version_and_traits & proxy_task_trait) == 0, nullptr);
        t.m_version_and_traits |= proxy_task_trait;
    }
    static bool is_proxy_task(d1::task& t) {
        return (t.m_version_and_traits & proxy_task_trait) != 0;
    }
    static void set_resume_trait(d1::task& t) {
        __TBB_ASSERT((t.m_version_and_traits & resume_task_trait) == 0, nullptr);
        t.m_version_and_traits |= resume_task_trait;
    }
    static bool is_resume_task(d1::task& t) {
        return (t.m_version_and_traits & resume_task_trait) != 0;
    }
};
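
// Illustrative usage sketch (not part of the library interface; `t` is a task owned by the
// dispatcher and `ctx` a d1::task_group_context). The accessor reinterprets the reserved
// task fields, e.g.:
//
//     task_accessor::context(t) = &ctx;               // stored in m_reserved[0]
//     task_accessor::isolation(t) = no_isolation;     // stored in m_reserved[2]
//     task_accessor::set_proxy_trait(t);
//     __TBB_ASSERT(task_accessor::is_proxy_task(t), nullptr);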

//------------------------------------------------------------------------
//! Extended variant of the standard offsetof macro
/** The standard offsetof macro is not sufficient for TBB as it can be used for
    POD-types only. The constant 0x1000 (not NULL) is necessary to appease GCC. **/
#define __TBB_offsetof(class_name, member_name) \
    ((ptrdiff_t)&(reinterpret_cast<class_name*>(0x1000)->member_name) - 0x1000)

//! Returns address of the object containing a member with the given name and address
#define __TBB_get_object_ref(class_name, member_name, member_addr) \
    (*reinterpret_cast<class_name*>((char*)member_addr - __TBB_offsetof(class_name, member_name)))
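
// Usage sketch for the macros above (illustration only; `node` is a hypothetical type):
//
//     struct node { int key; std::atomic<int> ref_count; };
//     node n;
//     std::atomic<int>* member = &n.ref_count;
//     node& owner = __TBB_get_object_ref(node, ref_count, member);   // yields a reference to n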

//! Helper class for tracking floating point context and task group context switches
/** Assuming presence of an itt collector, in addition to keeping track of floating
    point context, this class emits itt events to indicate begin and end of task group
    context execution **/
template <bool report_tasks>
class context_guard_helper {
    const d1::task_group_context* curr_ctx;
    d1::cpu_ctl_env guard_cpu_ctl_env;
    d1::cpu_ctl_env curr_cpu_ctl_env;
public:
    context_guard_helper() : curr_ctx(NULL) {
        guard_cpu_ctl_env.get_env();
        curr_cpu_ctl_env = guard_cpu_ctl_env;
    }
    ~context_guard_helper() {
        if (curr_cpu_ctl_env != guard_cpu_ctl_env)
            guard_cpu_ctl_env.set_env();
        if (report_tasks && curr_ctx)
            ITT_TASK_END;
    }
    // The function is called from the bypass dispatch loop on the hot path.
    // Consider performance issues when refactoring.
    void set_ctx(const d1::task_group_context* ctx) {
        if (!ctx)
            return;
        const d1::cpu_ctl_env* ctl = reinterpret_cast<const d1::cpu_ctl_env*>(&ctx->my_cpu_ctl_env);
        // Compare the FPU settings directly because the context can be reused between parallel algorithms.
        if (*ctl != curr_cpu_ctl_env) {
            curr_cpu_ctl_env = *ctl;
            curr_cpu_ctl_env.set_env();
        }
        if (report_tasks && ctx != curr_ctx) {
            // If a task group context was active, report the end of the current execution frame.
            if (curr_ctx)
                ITT_TASK_END;
            // Report the beginning of a new task group context execution frame.
            // The address of the task group context object is used to group tasks (parent).
            // The id of the task execution frame is NULL and reserved for future use.
            ITT_TASK_BEGIN(ctx, ctx->my_name, NULL);
            curr_ctx = ctx;
        }
    }
#if _WIN64
    void restore_default() {
        if (curr_cpu_ctl_env != guard_cpu_ctl_env) {
            guard_cpu_ctl_env.set_env();
            curr_cpu_ctl_env = guard_cpu_ctl_env;
        }
    }
#endif // _WIN64
};
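
// Typical usage sketch of context_guard_helper inside a dispatch loop (illustration only;
// next_task() is a hypothetical stand-in for the real task selection logic):
//
//     context_guard_helper</*report_tasks=*/false> context_guard;
//     while (d1::task* t = next_task()) {
//         context_guard.set_ctx(task_accessor::context(*t));  // switches FPU settings only if they differ
//         t->execute(ed);
//     }
//     // the destructor restores the captured FPU control state and closes the ITT frame, if any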

#if (_WIN32 || _WIN64 || __unix__) && (__TBB_x86_32 || __TBB_x86_64)
#if _MSC_VER
#pragma intrinsic(__rdtsc)
#endif
inline std::uint64_t machine_time_stamp() {
#if __INTEL_COMPILER
    return _rdtsc();
#elif _MSC_VER
    return __rdtsc();
#else
    std::uint32_t hi, lo;
    __asm__ __volatile__("rdtsc" : "=d"(hi), "=a"(lo));
    return (std::uint64_t(hi) << 32) | lo;
#endif
}

inline void prolonged_pause_impl() {
    // Assumption based on practice: 1000-2000 ticks seems to be a suitable invariant for the
    // majority of platforms. Currently, skip platforms that define __TBB_STEALING_PAUSE
    // because these platforms require very careful tuning.
    std::uint64_t prev = machine_time_stamp();
    const std::uint64_t finish = prev + 1000;
    atomic_backoff backoff;
    do {
        backoff.bounded_pause();
        std::uint64_t curr = machine_time_stamp();
        if (curr <= prev)
            // Possibly, the current logical thread was moved to another hardware thread, or an overflow occurred.
            break;
        prev = curr;
    } while (prev < finish);
}
#else
inline void prolonged_pause_impl() {
#ifdef __TBB_ipf
    static const long PauseTime = 1500;
#else
    static const long PauseTime = 80;
#endif
    // TODO IDEA: Update PauseTime adaptively?
    machine_pause(PauseTime);
}
#endif

inline void prolonged_pause() {
#if __TBB_WAITPKG_INTRINSICS_PRESENT && (_WIN32 || _WIN64 || __unix__) && (__TBB_x86_32 || __TBB_x86_64)
    if (governor::wait_package_enabled()) {
        std::uint64_t time_stamp = machine_time_stamp();
        // _tpause function directs the processor to enter an implementation-dependent optimized state
        // until the Time Stamp Counter reaches or exceeds the value specified in second parameter.
        // Constant "700" is ticks to wait for.
        // First parameter 0 selects between a lower power (cleared) or faster wakeup (set) optimized state.
        _tpause(0, time_stamp + 700);
    }
    else
#endif
    prolonged_pause_impl();
}
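
// Minimal spin-wait sketch built on prolonged_pause() (illustration only;
// `flag` is a hypothetical std::atomic<bool> set by another thread):
//
//     while (!flag.load(std::memory_order_acquire))
//         prolonged_pause();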

// TODO: investigate the possibility of working with a number of CPU cycles (e.g. via rdtsc),
// because for different configurations the same number of pauses + yields
// translates into a different number of CPU cycles.
class stealing_loop_backoff {
    const int my_pause_threshold;
    const int my_yield_threshold;
    int my_pause_count;
    int my_yield_count;
public:
    // my_yield_threshold = 100 is an experimental value. Ideally, once we start calling __TBB_Yield(),
    // the time spent spinning before calling is_out_of_work() should be approximately
    // the time it takes for a thread to be woken up. Doing so would guarantee that we do
    // no worse than 2x the optimal spin time. Or perhaps a time-slice quantum is the right amount.
    stealing_loop_backoff(int num_workers, int yields_multiplier)
        : my_pause_threshold{ 2 * (num_workers + 1) }
#if __APPLE__
        // threshold value tuned separately for macOS due to high cost of sched_yield there
        , my_yield_threshold{10 * yields_multiplier}
#else
        , my_yield_threshold{100 * yields_multiplier}
#endif
        , my_pause_count{}
        , my_yield_count{}
    {}
    bool pause() {
        prolonged_pause();
        if (my_pause_count++ >= my_pause_threshold) {
            my_pause_count = my_pause_threshold;
            d0::yield();
            if (my_yield_count++ >= my_yield_threshold) {
                my_yield_count = my_yield_threshold;
                return true;
            }
        }
        return false;
    }
    void reset_wait() {
        my_pause_count = my_yield_count = 0;
    }
};
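
// Stealing-loop sketch showing the intended backoff protocol (illustration only;
// try_steal() and out_of_work() are hypothetical stand-ins for the real arena routines):
//
//     stealing_loop_backoff backoff(num_workers, /*yields_multiplier=*/1);
//     for (;;) {
//         if (d1::task* t = try_steal()) { backoff.reset_wait(); return t; }
//         // pause() spins, then yields, and finally returns true once both counters saturate
//         if (backoff.pause() && out_of_work()) return nullptr;
//     }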

//------------------------------------------------------------------------
// Exception support
//------------------------------------------------------------------------
//! Task group state change propagation global epoch
/** Together with generic_scheduler::my_context_state_propagation_epoch, forms a
    cross-thread signaling mechanism that makes it possible to avoid locking on the hot
    path of the normal execution flow.

    When a descendant task group context is registered or unregistered, the global
    and local epochs are compared. If they differ, a state change is being propagated,
    and thus the registration/deregistration routines take a slower branch that may block
    (at most one thread of the pool can be blocked at any moment). Otherwise the
    control path is lock-free and fast. **/
extern std::atomic<std::uintptr_t> the_context_state_propagation_epoch;

//! Mutex guarding state change propagation across task groups forest.
/** Also protects modification of related data structures. **/
typedef scheduler_mutex_type context_state_propagation_mutex_type;
extern context_state_propagation_mutex_type the_context_state_propagation_mutex;

class tbb_exception_ptr {
    std::exception_ptr my_ptr;
public:
    static tbb_exception_ptr* allocate() noexcept;

    //! Destroys this object
    /** Note that objects of this type can be created only by the allocate() method. **/
    void destroy() noexcept;

    //! Throws the contained exception.
    void throw_self();

private:
    tbb_exception_ptr(const std::exception_ptr& src) : my_ptr(src) {}
}; // class tbb_exception_ptr
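
// Lifetime sketch (illustration only, assuming allocate() captures std::current_exception()
// inside a catch block): the exception captured on the producing thread is re-thrown on the
// thread that completes the wait, and the holder is released with destroy() by its owner:
//
//     tbb_exception_ptr* eptr = nullptr;
//     try { run_user_body(); }                          // run_user_body() is hypothetical
//     catch (...) { eptr = tbb_exception_ptr::allocate(); }
//     ...
//     if (eptr) eptr->throw_self();                     // later released via eptr->destroy()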

//------------------------------------------------------------------------
// Debugging support
//------------------------------------------------------------------------

#if TBB_USE_ASSERT
static const std::uintptr_t venom = tbb::detail::select_size_t_constant<0xDEADBEEFU, 0xDDEEAADDDEADBEEFULL>::value;

inline void poison_value(std::uintptr_t& val) { val = venom; }

inline void poison_value(std::atomic<std::uintptr_t>& val) { val.store(venom, std::memory_order_relaxed); }

/** Expected to be used in assertions only, thus no empty form is defined. **/
inline bool is_alive(std::uintptr_t v) { return v != venom; }

/** Logically, this method should be a member of class task.
    But we do not want to publish it, so it is here instead. */
inline void assert_task_valid(const d1::task* t) {
    assert_pointer_valid(t);
}
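
// Poisoning sketch (debug builds only, illustration; `my_epoch` is a hypothetical
// std::uintptr_t member): a word-sized field is stamped with `venom` when its owner
// dies so that later assertions can detect use-after-free:
//
//     poison_value(my_epoch);                               // e.g. in a destructor
//     __TBB_ASSERT(is_alive(my_epoch), "used after free");  // e.g. in an accessor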
#else /* !TBB_USE_ASSERT */

/** In contrast to the debug version, poison_value() is a macro here because
    the variable used as its argument may be undefined in release builds. **/
#define poison_value(g) ((void)0)

inline void assert_task_valid(const d1::task*) {}

#endif /* !TBB_USE_ASSERT */

struct suspend_point_type {
#if __TBB_RESUMABLE_TASKS
    //! The arena related to this task_dispatcher
    arena* m_arena{ nullptr };
    //! The random number generator for the resume task
    FastRandom m_random;
    //! The flag is raised when the original owner should return to this task dispatcher.
    std::atomic<bool> m_is_owner_recalled{ false };
    //! Indicates whether the resume task should be placed into the critical task stream.
    bool m_is_critical{ false };
    //! Associated coroutine
    co_context m_co_context;

    struct resume_task final : public d1::task {
        task_dispatcher& m_target;
        explicit resume_task(task_dispatcher& target) : m_target(target) {
            task_accessor::set_resume_trait(*this);
        }
        d1::task* execute(d1::execution_data& ed) override;
        d1::task* cancel(d1::execution_data&) override {
            __TBB_ASSERT(false, "The resume task cannot be canceled");
            return nullptr;
        }
    } m_resume_task;

    suspend_point_type(arena* a, std::size_t stack_size, task_dispatcher& target);
#endif /*__TBB_RESUMABLE_TASKS */
};

#if _MSC_VER && !defined(__INTEL_COMPILER)
// structure was padded due to alignment specifier
#pragma warning( push )
#pragma warning( disable: 4324 )
#endif

class alignas (max_nfs_size) task_dispatcher {
public:
    // TODO: reconsider low level design to better organize dependencies and files.
    friend class thread_data;
    friend class arena_slot;
    friend class nested_arena_context;
    friend class delegated_task;
    friend struct base_waiter;

    //! The data of the current thread attached to this task_dispatcher
    thread_data* m_thread_data{ nullptr };

    //! The current execution data
    execution_data_ext m_execute_data_ext;

    //! Properties
    struct properties {
        bool outermost{ true };
        bool fifo_tasks_allowed{ true };
        bool critical_task_allowed{ true };
    } m_properties;

    //! Position in the call stack when stealing is still allowed.
    std::uintptr_t m_stealing_threshold{};

    //! Suspend point (null if this task dispatcher has never been suspended)
    suspend_point_type* m_suspend_point{ nullptr };

    //! Attempt to get a task from the mailbox.
    /** Gets a task only if it has not been executed by its sender or a thief
        that has stolen it from the sender's task pool. Otherwise returns NULL.
        This method is intended to be used only by the thread extracting the proxy
        from its mailbox. (In contrast to local task pool, mailbox can be read only
        by its owner). **/
    d1::task* get_mailbox_task(mail_inbox& my_inbox, execution_data_ext& ed, isolation_type isolation);

    d1::task* get_critical_task(d1::task*, execution_data_ext&, isolation_type, bool);

    template <bool ITTPossible, typename Waiter>
    d1::task* receive_or_steal_task(thread_data& tls, execution_data_ext& ed, Waiter& waiter,
                                isolation_type isolation, bool outermost, bool criticality_absence);

    template <bool ITTPossible, typename Waiter>
    d1::task* local_wait_for_all(d1::task * t, Waiter& waiter);

    task_dispatcher(const task_dispatcher&) = delete;

    bool can_steal();
public:
    task_dispatcher(arena* a);

    ~task_dispatcher() {
        if (m_suspend_point) {
            m_suspend_point->~suspend_point_type();
            cache_aligned_deallocate(m_suspend_point);
        }
        poison_pointer(m_thread_data);
        poison_pointer(m_suspend_point);
    }

    template <typename Waiter>
    d1::task* local_wait_for_all(d1::task* t, Waiter& waiter);

    bool allow_fifo_task(bool new_state) {
        bool old_state = m_properties.fifo_tasks_allowed;
        m_properties.fifo_tasks_allowed = new_state;
        return old_state;
    }

    isolation_type set_isolation(isolation_type isolation) {
        isolation_type prev = m_execute_data_ext.isolation;
        m_execute_data_ext.isolation = isolation;
        return prev;
    }

    thread_data& get_thread_data() {
        __TBB_ASSERT(m_thread_data, nullptr);
        return *m_thread_data;
    }

    static void execute_and_wait(d1::task* t, d1::wait_context& wait_ctx, d1::task_group_context& w_ctx);

    void set_stealing_threshold(std::uintptr_t stealing_threshold) {
        bool assert_condition = (stealing_threshold == 0 && m_stealing_threshold != 0) ||
                                (stealing_threshold != 0 && m_stealing_threshold == 0);
        __TBB_ASSERT_EX( assert_condition, nullptr );
        m_stealing_threshold = stealing_threshold;
    }

    d1::task* get_inbox_or_critical_task(execution_data_ext&, mail_inbox&, isolation_type, bool);
    d1::task* get_stream_or_critical_task(execution_data_ext&, arena&, task_stream<front_accessor>&,
                                      unsigned& /*hint_for_stream*/, isolation_type,
                                      bool /*critical_allowed*/);
    d1::task* steal_or_get_critical(execution_data_ext&, arena&, unsigned /*arena_index*/, FastRandom&,
                                isolation_type, bool /*critical_allowed*/);

#if __TBB_RESUMABLE_TASKS
    /* [[noreturn]] */ void co_local_wait_for_all() noexcept;
    void suspend(suspend_callback_type suspend_callback, void* user_callback);
    bool resume(task_dispatcher& target);
    suspend_point_type* get_suspend_point();
    void init_suspend_point(arena* a, std::size_t stack_size);
    friend void internal_resume(suspend_point_type*);
    void recall_point();
#endif /* __TBB_RESUMABLE_TASKS */
};

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif

inline std::uintptr_t calculate_stealing_threshold(std::uintptr_t base, std::size_t stack_size) {
    return base - stack_size / 2;
}
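
// Worked example (illustration only): for a worker whose stack base is `base` and whose
// stack is 4 MB, stealing remains allowed only while the stack pointer stays above the
// returned threshold, i.e. while less than half of the stack is consumed:
//
//     std::uintptr_t threshold = calculate_stealing_threshold(base, 4u << 20);
//     // threshold == base - 2 MB (the stack grows downward toward the threshold)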

struct task_group_context_impl {
    static void destroy(d1::task_group_context&);
    static void initialize(d1::task_group_context&);
    static void register_with(d1::task_group_context&, thread_data*);
    static void bind_to_impl(d1::task_group_context&, thread_data*);
    static void bind_to(d1::task_group_context&, thread_data*);
    template <typename T>
    static void propagate_task_group_state(d1::task_group_context&, std::atomic<T> d1::task_group_context::*, d1::task_group_context&, T);
    static bool cancel_group_execution(d1::task_group_context&);
    static bool is_group_execution_cancelled(const d1::task_group_context&);
    static void reset(d1::task_group_context&);
    static void capture_fp_settings(d1::task_group_context&);
    static void copy_fp_settings(d1::task_group_context& ctx, const d1::task_group_context& src);
};


//! Forward declaration for scheduler entities
bool gcc_rethrow_exception_broken();
void fix_broken_rethrow();
//! Forward declaration: throws std::runtime_error with what() returning error_code description prefixed with aux_info
void handle_perror(int error_code, const char* aux_info);

} // namespace r1
} // namespace detail
} // namespace tbb

#endif /* _TBB_scheduler_common_H */