1 /*
2 Copyright (c) 2020-2021 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #ifndef __TBB_thread_data_H
18 #define __TBB_thread_data_H
19
20 #include "oneapi/tbb/detail/_task.h"
21 #include "oneapi/tbb/task.h"
22
23 #include "rml_base.h" // rml::job
24
25 #include "scheduler_common.h"
26 #include "arena.h"
27 #include "concurrent_monitor.h"
28 #include "mailbox.h"
29 #include "misc.h" // FastRandom
30 #include "small_object_pool_impl.h"
31
32 #include <atomic>
33
34 namespace tbb {
35 namespace detail {
36 namespace r1 {
37
38 class task;
39 class arena_slot;
40 class task_group_context;
41 class task_dispatcher;
42
43 class context_list : public intrusive_list<intrusive_list_node> {
44 public:
45 bool orphaned{false};
46
47 //! Last state propagation epoch known to this thread
48 /** Together with the_context_state_propagation_epoch constitute synchronization protocol
49 that keeps hot path of task group context construction destruction mostly
50 lock-free.
51 When local epoch equals the global one, the state of task group contexts
52 registered with this thread is consistent with that of the task group trees
53 they belong to. **/
54 std::atomic<std::uintptr_t> epoch{};
55
56 //! Mutex protecting access to the list of task group contexts.
57 d1::mutex m_mutex{};
58
destroy()59 void destroy() {
60 this->~context_list();
61 cache_aligned_deallocate(this);
62 }
63
remove(intrusive_list_node & val)64 void remove(intrusive_list_node& val) {
65 mutex::scoped_lock lock(m_mutex);
66
67 intrusive_list<intrusive_list_node>::remove(val);
68
69 if (orphaned && empty()) {
70 lock.release();
71 destroy();
72 }
73 }
74
push_front(intrusive_list_node & val)75 void push_front(intrusive_list_node& val) {
76 mutex::scoped_lock lock(m_mutex);
77
78 intrusive_list<intrusive_list_node>::push_front(val);
79 }
80
orphan()81 void orphan() {
82 mutex::scoped_lock lock(m_mutex);
83
84 orphaned = true;
85 if (empty()) {
86 lock.release();
87 destroy();
88 }
89 }
90 };
91
92 //------------------------------------------------------------------------
93 // Thread Data
94 //------------------------------------------------------------------------
95 class thread_data : public ::rml::job
96 , public intrusive_list_node
97 , no_copy {
98 public:
thread_data(unsigned short index,bool is_worker)99 thread_data(unsigned short index, bool is_worker)
100 : my_arena_index{ index }
101 , my_is_worker{ is_worker }
102 , my_task_dispatcher{ nullptr }
103 , my_arena{}
104 , my_arena_slot{}
105 , my_inbox{}
106 , my_random{ this }
107 , my_last_observer{ nullptr }
new(cache_aligned_allocate (sizeof (small_object_pool_impl)))108 , my_small_object_pool{new (cache_aligned_allocate(sizeof(small_object_pool_impl))) small_object_pool_impl{}}
109 , my_context_list(new (cache_aligned_allocate(sizeof(context_list))) context_list{})
110 #if __TBB_RESUMABLE_TASKS
111 , my_post_resume_action{ post_resume_action::none }
112 , my_post_resume_arg{nullptr}
113 #endif /* __TBB_RESUMABLE_TASKS */
114 {
115 ITT_SYNC_CREATE(&my_context_list->m_mutex, SyncType_Scheduler, SyncObj_ContextsList);
116 }
117
~thread_data()118 ~thread_data() {
119 my_context_list->orphan();
120 my_small_object_pool->destroy();
121 poison_pointer(my_task_dispatcher);
122 poison_pointer(my_arena);
123 poison_pointer(my_arena_slot);
124 poison_pointer(my_last_observer);
125 poison_pointer(my_small_object_pool);
126 poison_pointer(my_context_list);
127 #if __TBB_RESUMABLE_TASKS
128 poison_pointer(my_post_resume_arg);
129 #endif /* __TBB_RESUMABLE_TASKS */
130 }
131
132 void attach_arena(arena& a, std::size_t index);
133 bool is_attached_to(arena*);
134 void attach_task_dispatcher(task_dispatcher&);
135 void detach_task_dispatcher();
136 void context_list_cleanup();
137 template <typename T>
138 void propagate_task_group_state(std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state);
139
140 //! Index of the arena slot the scheduler occupies now, or occupied last time
141 unsigned short my_arena_index;
142
143 //! Indicates if the thread is created by RML
144 const bool my_is_worker;
145
146 //! The current task dipsatcher
147 task_dispatcher* my_task_dispatcher;
148
149 //! The arena that I own (if external thread) or am servicing at the moment (if worker)
150 arena* my_arena;
151
152 //! Pointer to the slot in the arena we own at the moment
153 arena_slot* my_arena_slot;
154
155 //! The mailbox (affinity mechanism) the current thread attached to
156 mail_inbox my_inbox;
157
158 //! The random generator
159 FastRandom my_random;
160
161 //! Last observer in the observers list processed on this slot
162 observer_proxy* my_last_observer;
163
164 //! Pool of small object for fast task allocation
165 small_object_pool_impl* my_small_object_pool;
166
167 context_list* my_context_list;
168 #if __TBB_RESUMABLE_TASKS
169 //! The list of possible post resume actions.
170 enum class post_resume_action {
171 invalid,
172 register_waiter,
173 resume,
174 callback,
175 cleanup,
176 notify,
177 none
178 };
179
180 //! The callback to call the user callback passed to tbb::suspend.
181 struct suspend_callback_wrapper {
182 suspend_callback_type suspend_callback;
183 void* user_callback;
184 suspend_point_type* tag;
185
operatorsuspend_callback_wrapper186 void operator()() {
187 __TBB_ASSERT(suspend_callback && user_callback && tag, nullptr);
188 suspend_callback(user_callback, tag);
189 }
190 };
191
192 //! Suspends the current coroutine (task_dispatcher).
193 void suspend(void* suspend_callback, void* user_callback);
194
195 //! Resumes the target task_dispatcher.
196 void resume(task_dispatcher& target);
197
198 //! Set post resume action to perform after resume.
set_post_resume_action(post_resume_action pra,void * arg)199 void set_post_resume_action(post_resume_action pra, void* arg) {
200 __TBB_ASSERT(my_post_resume_action == post_resume_action::none, "The Post resume action must not be set");
201 __TBB_ASSERT(!my_post_resume_arg, "The post resume action must not have an argument");
202 my_post_resume_action = pra;
203 my_post_resume_arg = arg;
204 }
205
clear_post_resume_action()206 void clear_post_resume_action() {
207 my_post_resume_action = thread_data::post_resume_action::none;
208 my_post_resume_arg = nullptr;
209 }
210
211 //! Performs post resume action.
212 void do_post_resume_action();
213
214 //! The post resume action requested after the swap contexts.
215 post_resume_action my_post_resume_action;
216
217 //! The post resume action argument.
218 void* my_post_resume_arg;
219 #endif /* __TBB_RESUMABLE_TASKS */
220
221 //! The default context
222 // TODO: consider using common default context because it is used only to simplify
223 // cancellation check.
224 d1::task_group_context my_default_context;
225 };
226
attach_arena(arena & a,std::size_t index)227 inline void thread_data::attach_arena(arena& a, std::size_t index) {
228 my_arena = &a;
229 my_arena_index = static_cast<unsigned short>(index);
230 my_arena_slot = a.my_slots + index;
231 // Read the current slot mail_outbox and attach it to the mail_inbox (remove inbox later maybe)
232 my_inbox.attach(my_arena->mailbox(index));
233 }
234
is_attached_to(arena * a)235 inline bool thread_data::is_attached_to(arena* a) { return my_arena == a; }
236
attach_task_dispatcher(task_dispatcher & task_disp)237 inline void thread_data::attach_task_dispatcher(task_dispatcher& task_disp) {
238 __TBB_ASSERT(my_task_dispatcher == nullptr, nullptr);
239 __TBB_ASSERT(task_disp.m_thread_data == nullptr, nullptr);
240 task_disp.m_thread_data = this;
241 my_task_dispatcher = &task_disp;
242 }
243
detach_task_dispatcher()244 inline void thread_data::detach_task_dispatcher() {
245 __TBB_ASSERT(my_task_dispatcher != nullptr, nullptr);
246 __TBB_ASSERT(my_task_dispatcher->m_thread_data == this, nullptr);
247 my_task_dispatcher->m_thread_data = nullptr;
248 my_task_dispatcher = nullptr;
249 }
250
251 } // namespace r1
252 } // namespace detail
253 } // namespace tbb
254
255 #endif // __TBB_thread_data_H
256
257