1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef _TBB_co_context_H
18 #define _TBB_co_context_H
19 
20 #include "oneapi/tbb/detail/_config.h"
21 
22 #if __TBB_RESUMABLE_TASKS
23 
24 #include <cstddef>
25 #include <cstdint>
26 
27 #if __TBB_RESUMABLE_TASKS_USE_THREADS
28 
29 #if _WIN32 || _WIN64
30 #include <windows.h>
31 #else
32 #include <pthread.h>
33 #endif
34 
35 #include <condition_variable>
36 #include "governor.h"
37 
38 #elif _WIN32 || _WIN64
39 #include <windows.h>
40 #else
41 // ucontext.h API is deprecated since macOS 10.6
42 #if __APPLE__
43     #if __INTEL_COMPILER
44         #pragma warning(push)
45         #pragma warning(disable:1478)
46     #elif __clang__
47         #pragma clang diagnostic push
48         #pragma clang diagnostic ignored "-Wdeprecated-declarations"
49     #endif
50 #endif // __APPLE__
51 
52 #include <ucontext.h>
53 #include <sys/mman.h> // mprotect
54 
55 #include "governor.h" // default_page_size()
56 
57 #ifndef MAP_STACK
58 // macOS* does not define MAP_STACK
59 #define MAP_STACK 0
60 #endif
61 #ifndef MAP_ANONYMOUS
62 // macOS* defines MAP_ANON, which is deprecated in Linux*.
63 #define MAP_ANONYMOUS MAP_ANON
64 #endif
65 #endif // _WIN32 || _WIN64
66 
67 namespace tbb {
68 namespace detail {
69 namespace r1 {
70 
71 #if __TBB_RESUMABLE_TASKS_USE_THREADS
72     struct coroutine_type {
73 #if _WIN32 || _WIN64
74         using handle_type = HANDLE;
75 #else
76         using handle_type = pthread_t;
77 #endif
78 
79         handle_type my_thread;
80         std::condition_variable my_condvar;
81         std::mutex my_mutex;
82         thread_data* my_thread_data{ nullptr };
83         bool my_is_active{ true };
84     };
85 #elif _WIN32 || _WIN64
86     typedef LPVOID coroutine_type;
87 #else
88     struct coroutine_type {
89         coroutine_type() : my_context(), my_stack(), my_stack_size() {}
90         ucontext_t my_context;
91         void* my_stack;
92         std::size_t my_stack_size;
93     };
94 #endif
95 
    // Forward declarations of the coroutine API. Each build variant below
    // (threads, Windows fibers, ucontext) supplies its own inline definitions.

    // Sets up a new coroutine in c with the requested stack size; arg is forwarded to
    // co_local_wait_for_all, which is the coroutine's entry point.
    void create_coroutine(coroutine_type& c, std::size_t stack_size, void* arg);
    // Makes c describe the context that is currently executing.
    void current_coroutine(coroutine_type& c);
    // Suspends prev_coroutine (the caller) and transfers execution to new_coroutine.
    void swap_coroutine(coroutine_type& prev_coroutine, coroutine_type& new_coroutine);
    // Releases the resources acquired by create_coroutine for c.
    void destroy_coroutine(coroutine_type& c);
101 
102 class co_context {
103     enum co_state {
104         co_invalid,
105         co_suspended,
106         co_executing,
107         co_destroyed
108     };
109     coroutine_type      my_coroutine;
110     co_state            my_state;
111 
112 public:
co_context(std::size_t stack_size,void * arg)113     co_context(std::size_t stack_size, void* arg)
114         : my_state(stack_size ? co_suspended : co_executing)
115     {
116         if (stack_size) {
117             __TBB_ASSERT(arg != 0, nullptr);
118             create_coroutine(my_coroutine, stack_size, arg);
119         } else {
120             current_coroutine(my_coroutine);
121         }
122     }
123 
~co_context()124     ~co_context() {
125         __TBB_ASSERT(1 << my_state & (1 << co_suspended | 1 << co_executing), NULL);
126         if (my_state == co_suspended) {
127 #if __TBB_RESUMABLE_TASKS_USE_THREADS
128             my_state = co_executing;
129 #endif
130             destroy_coroutine(my_coroutine);
131         }
132         my_state = co_destroyed;
133     }
134 
resume(co_context & target)135     void resume(co_context& target) {
136         // Do not create non-trivial objects on the stack of this function. They might never be destroyed.
137         __TBB_ASSERT(my_state == co_executing, NULL);
138         __TBB_ASSERT(target.my_state == co_suspended, NULL);
139 
140         my_state = co_suspended;
141         target.my_state = co_executing;
142 
143         // 'target' can reference an invalid object after swap_coroutine. Do not access it.
144         swap_coroutine(my_coroutine, target.my_coroutine);
145 
146         __TBB_ASSERT(my_state == co_executing, NULL);
147     }
148 };
149 
// Entry point executed by every coroutine; defined elsewhere.
// On Windows it receives the pointer that was given to create_coroutine unchanged.
// Elsewhere that pointer arrives split into its upper (hi) and lower (lo) 32-bit halves,
// which is how create_coroutine / coroutine_thread_func pass it.
#if _WIN32 || _WIN64
/* [[noreturn]] */ void __stdcall co_local_wait_for_all(void* arg) noexcept;
#else
/* [[noreturn]] */ void co_local_wait_for_all(unsigned hi, unsigned lo) noexcept;
#endif
155 
156 #if __TBB_RESUMABLE_TASKS_USE_THREADS
// Reports a failed system call identified by error_code and the description in what.
// Defined elsewhere; NOTE(review): whether it returns normally is not visible from this header.
void handle_perror(int error_code, const char* what);
158 
check(int error_code,const char * routine)159 inline void check(int error_code, const char* routine) {
160     if (error_code) {
161         handle_perror(error_code, routine);
162     }
163 }
164 
// Start-up parameters handed to coroutine_thread_func: the coroutine being created and
// a reference to the creator's argument pointer, which the new thread clears once it
// has taken its own copy.
using thread_data_t = std::pair<coroutine_type&, void*&>;
166 
167 #if _WIN32 || _WIN64
coroutine_thread_func(void * d)168 inline unsigned WINAPI coroutine_thread_func(void* d)
169 #else
170 inline void* coroutine_thread_func(void* d)
171 #endif
172 {
173     thread_data_t& data = *static_cast<thread_data_t*>(d);
174     coroutine_type& c = data.first;
175     void* arg = data.second;
176     {
177         std::unique_lock<std::mutex> lock(c.my_mutex);
178         __TBB_ASSERT(c.my_thread_data == nullptr, nullptr);
179         c.my_is_active = false;
180 
181         // We read the data notify the waiting thread
182         data.second = nullptr;
183         c.my_condvar.notify_one();
184 
185         c.my_condvar.wait(lock, [&c] { return c.my_is_active == true; });
186     }
187     __TBB_ASSERT(c.my_thread_data != nullptr, nullptr);
188     governor::set_thread_data(*c.my_thread_data);
189 
190 #if _WIN32 || _WIN64
191     co_local_wait_for_all(arg);
192 
193     return 0;
194 #else
195     std::uintptr_t addr = std::uintptr_t(arg);
196     unsigned lo = unsigned(addr);
197     unsigned hi = unsigned(std::uint64_t(addr) >> 32);
198     __TBB_ASSERT(sizeof(addr) == 8 || hi == 0, nullptr);
199 
200     co_local_wait_for_all(hi, lo);
201 
202     return nullptr;
203 #endif
204 };
205 
create_coroutine(coroutine_type & c,std::size_t stack_size,void * arg)206 inline void create_coroutine(coroutine_type& c, std::size_t stack_size, void* arg) {
207     thread_data_t data{ c, arg };
208 
209 #if _WIN32 || _WIN64
210     c.my_thread = (HANDLE)_beginthreadex(nullptr, unsigned(stack_size), coroutine_thread_func, &data, STACK_SIZE_PARAM_IS_A_RESERVATION, nullptr);
211     if (!c.my_thread) {
212         handle_perror(0, "create_coroutine: _beginthreadex failed\n");
213     }
214 #else
215     pthread_attr_t s;
216     check(pthread_attr_init(&s), "pthread_attr_init has failed");
217     if (stack_size > 0) {
218         check(pthread_attr_setstacksize(&s, stack_size), "pthread_attr_setstack_size has failed");
219     }
220     check(pthread_create(&c.my_thread, &s, coroutine_thread_func, &data), "pthread_create has failed");
221     check(pthread_attr_destroy(&s), "pthread_attr_destroy has failed");
222 #endif
223 
224     // Wait for the just created thread to read the data
225     std::unique_lock<std::mutex> lock(c.my_mutex);
226     c.my_condvar.wait(lock, [&arg] { return arg == nullptr; });
227 }
228 
current_coroutine(coroutine_type & c)229 inline void current_coroutine(coroutine_type& c) {
230 #if _WIN32 || _WIN64
231     c.my_thread = GetCurrentThread();
232 #else
233     c.my_thread = pthread_self();
234 #endif
235 }
236 
// Thread-emulation variant: hands the current thread data over to the thread backing
// new_coroutine, wakes it, and puts the calling thread (backing prev_coroutine) to
// sleep until prev_coroutine is marked active again.
inline void swap_coroutine(coroutine_type& prev_coroutine, coroutine_type& new_coroutine) {
    thread_data* td = governor::get_thread_data();
    __TBB_ASSERT(prev_coroutine.my_is_active == true, "The current thread should be active");

    // Detach our own state before notifying the other thread: once it runs, it may
    // reactivate this coroutine immediately, and that update must not be overwritten.
    prev_coroutine.my_thread_data = nullptr;
    prev_coroutine.my_is_active = false;
    governor::clear_thread_data();

    {
        // Publish the thread data and the activation flag under the target's mutex,
        // then wake the thread sleeping on the target's condition variable.
        std::unique_lock<std::mutex> lock(new_coroutine.my_mutex);
        __TBB_ASSERT(new_coroutine.my_is_active == false, "The sleeping thread should not be active");
        __TBB_ASSERT(new_coroutine.my_thread_data == nullptr, "The sleeping thread should not be active");

        new_coroutine.my_thread_data = td;
        new_coroutine.my_is_active = true;
        new_coroutine.my_condvar.notify_one();
    }

    // Sleep until some other thread activates this coroutine again.
    std::unique_lock<std::mutex> lock(prev_coroutine.my_mutex);
    prev_coroutine.my_condvar.wait(lock, [&prev_coroutine] { return prev_coroutine.my_is_active == true; });
    // NOTE(review): this assertion queries the governor, not prev_coroutine.my_thread_data,
    // which is the pointer dereferenced on the next line. destroy_coroutine() activates a
    // sleeping coroutine without assigning my_thread_data, so that pointer may still be
    // null here on the teardown path - confirm the intended behavior.
    __TBB_ASSERT(governor::get_thread_data() != nullptr, nullptr);
    governor::set_thread_data(*prev_coroutine.my_thread_data);
}
262 
destroy_coroutine(coroutine_type & c)263 inline void destroy_coroutine(coroutine_type& c) {
264     {
265         std::unique_lock<std::mutex> lock(c.my_mutex);
266         __TBB_ASSERT(c.my_thread_data == nullptr, "The sleeping thread should not be active");
267         __TBB_ASSERT(c.my_is_active == false, "The sleeping thread should not be active");
268         c.my_is_active = true;
269         c.my_condvar.notify_one();
270     }
271 #if _WIN32 || _WIN64
272     WaitForSingleObject(c.my_thread, INFINITE);
273     CloseHandle(c.my_thread);
274 #else
275     check(pthread_join(c.my_thread, nullptr), "pthread_join has failed");
276 #endif
277 }
278 #elif _WIN32 || _WIN64
create_coroutine(coroutine_type & c,std::size_t stack_size,void * arg)279 inline void create_coroutine(coroutine_type& c, std::size_t stack_size, void* arg) {
280     __TBB_ASSERT(arg, NULL);
281     c = CreateFiber(stack_size, co_local_wait_for_all, arg);
282     __TBB_ASSERT(c, NULL);
283 }
284 
current_coroutine(coroutine_type & c)285 inline void current_coroutine(coroutine_type& c) {
286     c = IsThreadAFiber() ? GetCurrentFiber() :
287         ConvertThreadToFiberEx(nullptr, FIBER_FLAG_FLOAT_SWITCH);
288     __TBB_ASSERT(c, NULL);
289 }
290 
swap_coroutine(coroutine_type & prev_coroutine,coroutine_type & new_coroutine)291 inline void swap_coroutine(coroutine_type& prev_coroutine, coroutine_type& new_coroutine) {
292     if (!IsThreadAFiber()) {
293         ConvertThreadToFiberEx(nullptr, FIBER_FLAG_FLOAT_SWITCH);
294     }
295     __TBB_ASSERT(new_coroutine, NULL);
296     prev_coroutine = GetCurrentFiber();
297     __TBB_ASSERT(prev_coroutine, NULL);
298     SwitchToFiber(new_coroutine);
299 }
300 
destroy_coroutine(coroutine_type & c)301 inline void destroy_coroutine(coroutine_type& c) {
302     __TBB_ASSERT(c, NULL);
303     DeleteFiber(c);
304 }
305 #else // !(_WIN32 || _WIN64)
306 
create_coroutine(coroutine_type & c,std::size_t stack_size,void * arg)307 inline void create_coroutine(coroutine_type& c, std::size_t stack_size, void* arg) {
308     const std::size_t REG_PAGE_SIZE = governor::default_page_size();
309     const std::size_t page_aligned_stack_size = (stack_size + (REG_PAGE_SIZE - 1)) & ~(REG_PAGE_SIZE - 1);
310     const std::size_t protected_stack_size = page_aligned_stack_size + 2 * REG_PAGE_SIZE;
311 
312     // Allocate the stack with protection property
313     std::uintptr_t stack_ptr = (std::uintptr_t)mmap(NULL, protected_stack_size, PROT_NONE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_STACK, -1, 0);
314     __TBB_ASSERT((void*)stack_ptr != MAP_FAILED, NULL);
315 
316     // Allow read write on our stack (guarded pages are still protected)
317     int err = mprotect((void*)(stack_ptr + REG_PAGE_SIZE), page_aligned_stack_size, PROT_READ | PROT_WRITE);
318     __TBB_ASSERT_EX(!err, NULL);
319 
320     // Remember the stack state
321     c.my_stack = (void*)(stack_ptr + REG_PAGE_SIZE);
322     c.my_stack_size = page_aligned_stack_size;
323 
324     err = getcontext(&c.my_context);
325     __TBB_ASSERT_EX(!err, NULL);
326 
327     c.my_context.uc_link = 0;
328     // cast to char* to disable FreeBSD clang-3.4.1 'incompatible type' error
329     c.my_context.uc_stack.ss_sp = (char*)c.my_stack;
330     c.my_context.uc_stack.ss_size = c.my_stack_size;
331     c.my_context.uc_stack.ss_flags = 0;
332 
333     typedef void(*coroutine_func_t)();
334 
335     std::uintptr_t addr = std::uintptr_t(arg);
336     unsigned lo = unsigned(addr);
337     unsigned hi = unsigned(std::uint64_t(addr) >> 32);
338     __TBB_ASSERT(sizeof(addr) == 8 || hi == 0, nullptr);
339 
340     makecontext(&c.my_context, (coroutine_func_t)co_local_wait_for_all, 2, hi, lo);
341 }
342 
current_coroutine(coroutine_type & c)343 inline void current_coroutine(coroutine_type& c) {
344     int err = getcontext(&c.my_context);
345     __TBB_ASSERT_EX(!err, NULL);
346 }
347 
swap_coroutine(coroutine_type & prev_coroutine,coroutine_type & new_coroutine)348 inline void swap_coroutine(coroutine_type& prev_coroutine, coroutine_type& new_coroutine) {
349     int err = swapcontext(&prev_coroutine.my_context, &new_coroutine.my_context);
350     __TBB_ASSERT_EX(!err, NULL);
351 }
352 
destroy_coroutine(coroutine_type & c)353 inline void destroy_coroutine(coroutine_type& c) {
354     const std::size_t REG_PAGE_SIZE = governor::default_page_size();
355     // Free stack memory with guarded pages
356     munmap((void*)((std::uintptr_t)c.my_stack - REG_PAGE_SIZE), c.my_stack_size + 2 * REG_PAGE_SIZE);
357     // Clear the stack state afterwards
358     c.my_stack = NULL;
359     c.my_stack_size = 0;
360 }
361 
362 #if __APPLE__
363     #if __INTEL_COMPILER
364         #pragma warning(pop) // 1478 warning
365     #elif __clang__
366         #pragma clang diagnostic pop // "-Wdeprecated-declarations"
367     #endif
368 #endif
369 
370 #endif // _WIN32 || _WIN64
371 
372 } // namespace r1
373 } // namespace detail
374 } // namespace tbb
375 
376 #endif /* __TBB_RESUMABLE_TASKS */
377 
378 #endif /* _TBB_co_context_H */
379