/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef _TBB_scheduler_common_H
#define _TBB_scheduler_common_H

#include "oneapi/tbb/detail/_utils.h"
#include "oneapi/tbb/detail/_template_helpers.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/detail/_machine.h"
#include "oneapi/tbb/task_group.h"
#include "oneapi/tbb/cache_aligned_allocator.h"
#include "itt_notify.h"
#include "co_context.h"
#include "misc.h"
#include "governor.h"

#ifndef __TBB_SCHEDULER_MUTEX_TYPE
#define __TBB_SCHEDULER_MUTEX_TYPE tbb::spin_mutex
#endif
// TODO: add conditional inclusion based on specified type
#include "oneapi/tbb/spin_mutex.h"
#include "oneapi/tbb/mutex.h"

#if TBB_USE_ASSERT
#include <atomic>
#endif

#include <cstdint>
#include <exception>

//! Mutex type for global locks in the scheduler
using scheduler_mutex_type = __TBB_SCHEDULER_MUTEX_TYPE;

#if _MSC_VER && !defined(__INTEL_COMPILER)
// Workaround for overzealous compiler warnings
// These particular warnings are so ubiquitous that no attempt is made to narrow
// the scope of the warnings.
#pragma warning (disable: 4100 4127 4312 4244 4267 4706)
#endif

namespace tbb {
namespace detail {
namespace r1 {

class arena;
class mail_inbox;
class mail_outbox;
class market;
class observer_proxy;

enum task_stream_accessor_type { front_accessor = 0, back_nonnull_accessor };
template<task_stream_accessor_type> class task_stream;

using isolation_type = std::intptr_t;
constexpr isolation_type no_isolation = 0;

//------------------------------------------------------------------------
// Extended execute data
//------------------------------------------------------------------------

//! Execute data used on the task dispatcher side; reflects the current execution state
struct execution_data_ext : d1::execution_data {
    task_dispatcher* task_disp{};
    isolation_type isolation{};
    d1::wait_context* wait_ctx{};
};

//------------------------------------------------------------------------
// Task accessor
//------------------------------------------------------------------------

//! Interpretation of reserved task fields inside a task dispatcher
struct task_accessor {
    static constexpr std::uint64_t proxy_task_trait = 1;
    static constexpr std::uint64_t resume_task_trait = 2;
    static d1::task_group_context*& context(d1::task& t) {
        task_group_context** tgc = reinterpret_cast<task_group_context**>(&t.m_reserved[0]);
        return *tgc;
    }
    static isolation_type& isolation(d1::task& t) {
        isolation_type* tag = reinterpret_cast<isolation_type*>(&t.m_reserved[2]);
        return *tag;
    }
    static void set_proxy_trait(d1::task& t) {
        // TODO: refactor proxy tasks not to work on uninitialized memory.
        //__TBB_ASSERT((t.m_version_and_traits & proxy_task_trait) == 0, nullptr);
        t.m_version_and_traits |= proxy_task_trait;
    }
    static bool is_proxy_task(d1::task& t) {
        return (t.m_version_and_traits & proxy_task_trait) != 0;
    }
    static void set_resume_trait(d1::task& t) {
        __TBB_ASSERT((t.m_version_and_traits & resume_task_trait) == 0, nullptr);
        t.m_version_and_traits |= resume_task_trait;
    }
    static bool is_resume_task(d1::task& t) {
        return (t.m_version_and_traits & resume_task_trait) != 0;
    }
};
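
// Illustrative sketch only, not part of the scheduler: how the trait bits above are
// meant to be queried on the dispatch path. Dispatch code branches on
// is_proxy_task()/is_resume_task() instead of using RTTI; this hypothetical helper
// merely demonstrates the calls.
inline bool example_task_needs_special_handling(d1::task& t) {
    // A proxy task stands for a mailed task that may already have been executed or
    // stolen; a resume task switches to a suspended coroutine instead of running a
    // user body.
    return task_accessor::is_proxy_task(t) || task_accessor::is_resume_task(t);
}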

//------------------------------------------------------------------------
//! Extended variant of the standard offsetof macro
/** The standard offsetof macro is not sufficient for TBB as it can be used for
    POD-types only. The constant 0x1000 (not NULL) is necessary to appease GCC. **/
#define __TBB_offsetof(class_name, member_name) \
    ((ptrdiff_t)&(reinterpret_cast<class_name*>(0x1000)->member_name) - 0x1000)

//! Returns a reference to the object containing a member with the given name and address
#define __TBB_get_object_ref(class_name, member_name, member_addr) \
    (*reinterpret_cast<class_name*>((char*)member_addr - __TBB_offsetof(class_name, member_name)))
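
// Illustrative sketch only: recovering an enclosing object from the address of one
// of its members using the macros above. The example_node type and helper are
// hypothetical.
struct example_node {
    int key;
    long payload;
};
inline example_node& example_owner_of(long* payload_addr) {
    // __TBB_offsetof works even though example_node need not be a POD type;
    // subtracting the member offset from the member address recovers the owner.
    return __TBB_get_object_ref(example_node, payload, payload_addr);
}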

//! Helper class for tracking floating point context and task group context switches
/** Assuming presence of an itt collector, in addition to keeping track of floating
    point context, this class emits itt events to indicate begin and end of task group
    context execution **/
template <bool report_tasks>
class context_guard_helper {
    const d1::task_group_context* curr_ctx;
    d1::cpu_ctl_env guard_cpu_ctl_env;
    d1::cpu_ctl_env curr_cpu_ctl_env;
public:
    context_guard_helper() : curr_ctx(NULL) {
        guard_cpu_ctl_env.get_env();
        curr_cpu_ctl_env = guard_cpu_ctl_env;
    }
    ~context_guard_helper() {
        if (curr_cpu_ctl_env != guard_cpu_ctl_env)
            guard_cpu_ctl_env.set_env();
        if (report_tasks && curr_ctx)
            ITT_TASK_END;
    }
    // The function is called from the bypass dispatch loop on the hot path.
    // Consider performance issues when refactoring.
    void set_ctx(const d1::task_group_context* ctx) {
        if (!ctx)
            return;
        const d1::cpu_ctl_env* ctl = reinterpret_cast<const d1::cpu_ctl_env*>(&ctx->my_cpu_ctl_env);
        // Compare the FPU settings directly because the context can be reused between parallel algorithms.
        if (*ctl != curr_cpu_ctl_env) {
            curr_cpu_ctl_env = *ctl;
            curr_cpu_ctl_env.set_env();
        }
        if (report_tasks && ctx != curr_ctx) {
            // If a task group context was active, report the end of the current execution frame.
            if (curr_ctx)
                ITT_TASK_END;
            // Report the beginning of a new task group context execution frame.
            // The address of the task group context object is used to group tasks (parent).
            // The id of the task execution frame is NULL and reserved for future use.
            ITT_TASK_BEGIN(ctx, ctx->my_name, NULL);
            curr_ctx = ctx;
        }
    }
#if _WIN64
    void restore_default() {
        if (curr_cpu_ctl_env != guard_cpu_ctl_env) {
            guard_cpu_ctl_env.set_env();
            curr_cpu_ctl_env = guard_cpu_ctl_env;
        }
    }
#endif // _WIN64
};
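
// Illustrative sketch only: the intended RAII usage of context_guard_helper in a
// dispatch loop. The guard captures the current FPU settings on construction and
// restores them on destruction; set_ctx() switches settings only when the incoming
// context actually differs. The function below is hypothetical.
inline void example_execute_with_context(d1::task& t, d1::execution_data& ed) {
    context_guard_helper</*report_tasks=*/false> context_guard;
    context_guard.set_ctx(task_accessor::context(t));
    t.execute(ed); // runs under the context's FPU settings
}   // leaving scope restores the settings captured at construction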

#if (_WIN32 || _WIN64 || __unix__) && (__TBB_x86_32 || __TBB_x86_64)
#if _MSC_VER
#pragma intrinsic(__rdtsc)
#endif
inline std::uint64_t machine_time_stamp() {
#if __INTEL_COMPILER
    return _rdtsc();
#elif _MSC_VER
    return __rdtsc();
#else
    std::uint32_t hi, lo;
    __asm__ __volatile__("rdtsc" : "=d"(hi), "=a"(lo));
    return (std::uint64_t(hi) << 32) | lo;
#endif
}

inline void prolonged_pause_impl() {
    // Assumption based on practice: 1000-2000 ticks seems to be a suitable invariant for the
    // majority of platforms. Currently, skip platforms that define __TBB_STEALING_PAUSE
    // because these platforms require very careful tuning.
    std::uint64_t prev = machine_time_stamp();
    const std::uint64_t finish = prev + 1000;
    atomic_backoff backoff;
    do {
        backoff.bounded_pause();
        std::uint64_t curr = machine_time_stamp();
        if (curr <= prev)
            // Possibly, the current logical thread was moved to another hardware thread, or the counter overflowed.
            break;
        prev = curr;
    } while (prev < finish);
}
#else
inline void prolonged_pause_impl() {
#ifdef __TBB_ipf
    static const long PauseTime = 1500;
#else
    static const long PauseTime = 80;
#endif
    // TODO IDEA: Update PauseTime adaptively?
    machine_pause(PauseTime);
}
#endif

inline void prolonged_pause() {
#if __TBB_WAITPKG_INTRINSICS_PRESENT && (_WIN32 || _WIN64 || __unix__) && (__TBB_x86_32 || __TBB_x86_64)
    if (governor::wait_package_enabled()) {
        std::uint64_t time_stamp = machine_time_stamp();
        // The _tpause function directs the processor to enter an implementation-dependent optimized state
        // until the Time Stamp Counter reaches or exceeds the value specified in the second parameter.
        // The constant "700" is the number of ticks to wait.
        // The first parameter selects between a lower-power (cleared) and a faster-wakeup (set) optimized state.
        _tpause(0, time_stamp + 700);
    }
    else
#endif
    prolonged_pause_impl();
}

// TODO: investigate the possibility of working with the number of CPU cycles directly,
// because for different configurations the same number of pauses + yields
// corresponds to a different number of CPU cycles
// (for example, use rdtsc for it)
class stealing_loop_backoff {
    const int my_pause_threshold;
    const int my_yield_threshold;
    int my_pause_count;
    int my_yield_count;
public:
    // my_yield_threshold = 100 is an experimental value. Ideally, once we start calling __TBB_Yield(),
    // the time spent spinning before calling is_out_of_work() should be approximately
    // the time it takes for a thread to be woken up. Doing so would guarantee that we do
    // no worse than 2x the optimal spin time. Or perhaps a time-slice quantum is the right amount.
    stealing_loop_backoff(int num_workers, int yields_multiplier)
        : my_pause_threshold{ 2 * (num_workers + 1) }
#if __APPLE__
        // threshold value tuned separately for macOS due to high cost of sched_yield there
        , my_yield_threshold{10 * yields_multiplier}
#else
        , my_yield_threshold{100 * yields_multiplier}
#endif
        , my_pause_count{}
        , my_yield_count{}
    {}
    bool pause() {
        prolonged_pause();
        if (my_pause_count++ >= my_pause_threshold) {
            my_pause_count = my_pause_threshold;
            d0::yield();
            if (my_yield_count++ >= my_yield_threshold) {
                my_yield_count = my_yield_threshold;
                return true;
            }
        }
        return false;
    }
    void reset_wait() {
        my_pause_count = my_yield_count = 0;
    }
};
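
// Illustrative sketch only: the intended spin-wait pattern around the class above.
// pause() escalates from prolonged_pause() to yielding and returns true once the
// yield threshold is crossed, signaling the caller to run its (more expensive)
// out-of-work check. The predicate parameter is hypothetical.
template <typename Predicate>
inline void example_spin_wait(int num_workers, Predicate out_of_work) {
    stealing_loop_backoff backoff(num_workers, /*yields_multiplier=*/1);
    for (;;) {
        if (backoff.pause() && out_of_work())
            return;      // nothing left to steal; leave the stealing loop
        // a real stealing loop would also attempt to steal here and call
        // backoff.reset_wait() after any successful steal
    }
}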

//------------------------------------------------------------------------
// Exception support
//------------------------------------------------------------------------
//! Task group state change propagation global epoch
/** Together with generic_scheduler::my_context_state_propagation_epoch, forms a
    cross-thread signaling mechanism that avoids locking on the hot path of the
    normal execution flow.

    When a descendant task group context is registered or unregistered, the global
    and local epochs are compared. If they differ, a state change is being propagated,
    and the registration/deregistration routines take a slower branch that may block
    (at most one thread of the pool can be blocked at any moment). Otherwise the
    control path is lock-free and fast. **/
extern std::atomic<std::uintptr_t> the_context_state_propagation_epoch;

//! Mutex guarding state change propagation across the task group forest.
/** Also protects modification of related data structures. **/
typedef scheduler_mutex_type context_state_propagation_mutex_type;
extern context_state_propagation_mutex_type the_context_state_propagation_mutex;
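
// Illustrative sketch only: the fast-path check described above. Each thread keeps a
// local snapshot of the epoch; context registration takes the mutex only when the
// snapshot lags behind the global counter. The local_epoch parameter is hypothetical.
inline bool example_must_take_slow_path(std::uintptr_t local_epoch) {
    // A relaxed load suffices for the comparison; the slow path, taken under
    // the_context_state_propagation_mutex, re-synchronizes the local snapshot.
    return local_epoch != the_context_state_propagation_epoch.load(std::memory_order_relaxed);
}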

class tbb_exception_ptr {
    std::exception_ptr my_ptr;
public:
    static tbb_exception_ptr* allocate() noexcept;

    //! Destroys this object
    /** Note that objects of this type can be created only by the allocate() method. **/
    void destroy() noexcept;

    //! Throws the contained exception.
    void throw_self();

private:
    tbb_exception_ptr(const std::exception_ptr& src) : my_ptr(src) {}
}; // class tbb_exception_ptr
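
// Illustrative sketch only: the intended lifecycle of the class above. allocate()
// captures std::current_exception(), so it must run inside a catch block; the
// consuming thread rethrows via throw_self() and disposes of the object with
// destroy(). The body parameter is hypothetical.
template <typename F>
inline tbb_exception_ptr* example_capture_exception(F&& user_body) noexcept {
    try {
        user_body();
    } catch (...) {
        return tbb_exception_ptr::allocate(); // non-null unless allocation failed
    }
    return nullptr; // completed without an exception
}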

//------------------------------------------------------------------------
// Debugging support
//------------------------------------------------------------------------

#if TBB_USE_ASSERT
static const std::uintptr_t venom = tbb::detail::select_size_t_constant<0xDEADBEEFU, 0xDDEEAADDDEADBEEFULL>::value;

inline void poison_value(std::uintptr_t& val) { val = venom; }

inline void poison_value(std::atomic<std::uintptr_t>& val) { val.store(venom, std::memory_order_relaxed); }

/** Expected to be used in assertions only, thus no empty form is defined. **/
inline bool is_alive(std::uintptr_t v) { return v != venom; }
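
// Illustrative sketch only: the intended debug pattern. A destructor poisons a guard
// word; later accesses assert that the word is still alive, catching
// use-after-destruction in debug builds. The example_resource type is hypothetical.
struct example_resource {
    std::uintptr_t m_guard{0};
    ~example_resource() { poison_value(m_guard); }
    void use() { __TBB_ASSERT(is_alive(m_guard), "object used after destruction"); }
};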

/** Logically, this method should be a member of class task.
    But we do not want to publish it, so it is here instead. */
inline void assert_task_valid(const d1::task* t) {
    assert_pointer_valid(t);
}
#else /* !TBB_USE_ASSERT */

/** In contrast to the debug version, poison_value() is a macro here because
    the variable used as its argument may be undefined in release builds. **/
#define poison_value(g) ((void)0)

inline void assert_task_valid(const d1::task*) {}

#endif /* !TBB_USE_ASSERT */

struct suspend_point_type {
#if __TBB_RESUMABLE_TASKS
    //! The arena related to this task_dispatcher
    arena* m_arena{ nullptr };
    //! The random number generator for the resume task
    FastRandom m_random;
    //! The flag is raised when the original owner should return to this task dispatcher.
    std::atomic<bool> m_is_owner_recalled{ false };
    //! Indicates whether the resume task should be placed into the critical task stream.
    bool m_is_critical{ false };
    //! Associated coroutine
    co_context m_co_context;

    struct resume_task final : public d1::task {
        task_dispatcher& m_target;
        explicit resume_task(task_dispatcher& target) : m_target(target) {
            task_accessor::set_resume_trait(*this);
        }
        d1::task* execute(d1::execution_data& ed) override;
        d1::task* cancel(d1::execution_data&) override {
            __TBB_ASSERT(false, "The resume task cannot be canceled");
            return nullptr;
        }
    } m_resume_task;

    suspend_point_type(arena* a, std::size_t stack_size, task_dispatcher& target);
#endif /* __TBB_RESUMABLE_TASKS */
};

#if _MSC_VER && !defined(__INTEL_COMPILER)
// structure was padded due to alignment specifier
#pragma warning( push )
#pragma warning( disable: 4324 )
#endif

class alignas (max_nfs_size) task_dispatcher {
public:
    // TODO: reconsider low level design to better organize dependencies and files.
    friend class thread_data;
    friend class arena_slot;
    friend class nested_arena_context;
    friend class delegated_task;
    friend struct base_waiter;

    //! The data of the current thread attached to this task_dispatcher
    thread_data* m_thread_data{ nullptr };

    //! The current execution data
    execution_data_ext m_execute_data_ext;

    //! Properties
    struct properties {
        bool outermost{ true };
        bool fifo_tasks_allowed{ true };
        bool critical_task_allowed{ true };
    } m_properties;

    //! Position in the call stack when stealing is still allowed.
    std::uintptr_t m_stealing_threshold{};

    //! Suspend point (null if this task dispatcher has never been suspended)
    suspend_point_type* m_suspend_point{ nullptr };

    //! Attempt to get a task from the mailbox.
    /** Gets a task only if it has not been executed by its sender or a thief
        that has stolen it from the sender's task pool. Otherwise returns NULL.
        This method is intended to be used only by the thread extracting the proxy
        from its mailbox. (In contrast to the local task pool, the mailbox can be
        read only by its owner.) **/
    d1::task* get_mailbox_task(mail_inbox& my_inbox, execution_data_ext& ed, isolation_type isolation);

    d1::task* get_critical_task(d1::task*, execution_data_ext&, isolation_type, bool);

    template <bool ITTPossible, typename Waiter>
    d1::task* receive_or_steal_task(thread_data& tls, execution_data_ext& ed, Waiter& waiter,
                                    isolation_type isolation, bool outermost, bool criticality_absence);

    template <bool ITTPossible, typename Waiter>
    d1::task* local_wait_for_all(d1::task* t, Waiter& waiter);

    task_dispatcher(const task_dispatcher&) = delete;

    bool can_steal();
public:
    task_dispatcher(arena* a);

    ~task_dispatcher() {
        if (m_suspend_point) {
            m_suspend_point->~suspend_point_type();
            cache_aligned_deallocate(m_suspend_point);
        }
        poison_pointer(m_thread_data);
        poison_pointer(m_suspend_point);
    }

    template <typename Waiter>
    d1::task* local_wait_for_all(d1::task* t, Waiter& waiter);

    bool allow_fifo_task(bool new_state) {
        bool old_state = m_properties.fifo_tasks_allowed;
        m_properties.fifo_tasks_allowed = new_state;
        return old_state;
    }

    isolation_type set_isolation(isolation_type isolation) {
        isolation_type prev = m_execute_data_ext.isolation;
        m_execute_data_ext.isolation = isolation;
        return prev;
    }
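
    // Illustrative sketch only: both mutators above return the previous value,
    // enabling a scoped save/change/restore pattern around nested work (disp is a
    // hypothetical task_dispatcher reference):
    //
    //     bool saved = disp.allow_fifo_task(false);        // forbid enqueued tasks
    //     isolation_type prev = disp.set_isolation(iso);   // enter an isolation region
    //     /* ... run nested work ... */
    //     disp.set_isolation(prev);
    //     disp.allow_fifo_task(saved);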

    thread_data& get_thread_data() {
        __TBB_ASSERT(m_thread_data, nullptr);
        return *m_thread_data;
    }

    static void execute_and_wait(d1::task* t, d1::wait_context& wait_ctx, d1::task_group_context& w_ctx);

    void set_stealing_threshold(std::uintptr_t stealing_threshold) {
        bool assert_condition = (stealing_threshold == 0 && m_stealing_threshold != 0) ||
                                (stealing_threshold != 0 && m_stealing_threshold == 0);
        __TBB_ASSERT_EX( assert_condition, nullptr );
        m_stealing_threshold = stealing_threshold;
    }

    d1::task* get_inbox_or_critical_task(execution_data_ext&, mail_inbox&, isolation_type, bool);
    d1::task* get_stream_or_critical_task(execution_data_ext&, arena&, task_stream<front_accessor>&,
                                          unsigned& /*hint_for_stream*/, isolation_type,
                                          bool /*critical_allowed*/);
    d1::task* steal_or_get_critical(execution_data_ext&, arena&, unsigned /*arena_index*/, FastRandom&,
                                    isolation_type, bool /*critical_allowed*/);

#if __TBB_RESUMABLE_TASKS
    /* [[noreturn]] */ void co_local_wait_for_all() noexcept;
    void suspend(suspend_callback_type suspend_callback, void* user_callback);
    bool resume(task_dispatcher& target);
    suspend_point_type* get_suspend_point();
    void init_suspend_point(arena* a, std::size_t stack_size);
    friend void internal_resume(suspend_point_type*);
    void recall_point();
#endif /* __TBB_RESUMABLE_TASKS */
};

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif

inline std::uintptr_t calculate_stealing_threshold(std::uintptr_t base, std::size_t stack_size) {
    return base - stack_size / 2;
}
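
// Illustrative sketch only: for a downward-growing stack whose base (highest address)
// is stack_base, stealing remains allowed while the current stack pointer stays above
// the midpoint, keeping half of the stack in reserve for the tasks already on it.
// The names below are hypothetical.
inline bool example_can_steal_here(std::uintptr_t stack_base, std::size_t stack_size,
                                   std::uintptr_t current_sp) {
    return current_sp > calculate_stealing_threshold(stack_base, stack_size);
}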

struct task_group_context_impl {
    static void destroy(d1::task_group_context&);
    static void initialize(d1::task_group_context&);
    static void register_with(d1::task_group_context&, thread_data*);
    static void bind_to_impl(d1::task_group_context&, thread_data*);
    static void bind_to(d1::task_group_context&, thread_data*);
    template <typename T>
    static void propagate_task_group_state(d1::task_group_context&, std::atomic<T> d1::task_group_context::*, d1::task_group_context&, T);
    static bool cancel_group_execution(d1::task_group_context&);
    static bool is_group_execution_cancelled(const d1::task_group_context&);
    static void reset(d1::task_group_context&);
    static void capture_fp_settings(d1::task_group_context&);
    static void copy_fp_settings(d1::task_group_context& ctx, const d1::task_group_context& src);
};


//! Forward declarations for scheduler entities
bool gcc_rethrow_exception_broken();
void fix_broken_rethrow();
//! Forward declaration: throws std::runtime_error with what() returning the error_code description prefixed with aux_info
void handle_perror(int error_code, const char* aux_info);

} // namespace r1
} // namespace detail
} // namespace tbb

#endif /* _TBB_scheduler_common_H */