1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
// All platform-specific threading support is encapsulated here.
18 
19 #ifndef __RML_thread_monitor_H
20 #define __RML_thread_monitor_H
21 
22 #if __TBB_USE_WINAPI
23 #include <windows.h>
24 #include <process.h>
25 #include <malloc.h> //_alloca
26 #include "misc.h" // support for processor groups
27 #if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
28 #include <thread>
29 #endif
30 #elif __TBB_USE_POSIX
31 #include <pthread.h>
32 #include <cstring>
33 #include <cstdlib>
34 #else
35 #error Unsupported platform
36 #endif
37 #include <cstdio>
38 
39 #include "oneapi/tbb/detail/_template_helpers.h"
40 
41 #include "itt_notify.h"
42 #include "semaphore.h"
43 
44 // All platform-specific threading support is in this header.
45 
#if (_WIN32||_WIN64)&&!__TBB_ipf
// Deal with 64K aliasing.  The formula for "offset" is a Fibonacci hash function,
// which has the desirable feature of spreading out the offsets fairly evenly
// without knowing the total number of offsets, and furthermore unlikely to
// accidentally cancel out other 64K aliasing schemes that Microsoft might implement later.
// See Knuth Vol 3. "Theorem S" for details on Fibonacci hashing.
// The second statement really does need "volatile"; otherwise the compiler might remove the _alloca.
//
// Usage notes: the macro expands to several statements and declares the locals
// "offset" and "sink_for_alloca" in the invoking scope, so it can be used at most
// once per scope. The argument is parenthesized so that any expression can be
// passed as idx without operator-precedence surprises.
#define AVOID_64K_ALIASING(idx)                       \
    std::size_t offset = ((idx)+1) * 40503U % (1U<<16);    \
    void* volatile sink_for_alloca = _alloca(offset); \
    __TBB_ASSERT_EX(sink_for_alloca, "_alloca failed");
#else
// Linux thread allocators avoid 64K aliasing.
#define AVOID_64K_ALIASING(idx) tbb::detail::suppress_unused_warning(idx)
#endif /* _WIN32||_WIN64 */
61 
62 namespace tbb {
63 namespace detail {
64 namespace r1 {
65 
66 // Forward declaration: throws std::runtime_error with what() returning error_code description prefixed with aux_info
67 void handle_perror(int error_code, const char* aux_info);
68 
69 namespace rml {
70 namespace internal {
71 
#if __TBB_USE_ITT_NOTIFY
// Strings handed to ITT_SYNC_CREATE by the thread_monitor constructor:
// the first is passed as the object type, the second as the object name.
static const ::tbb::detail::r1::tchar *SyncType_RML = _T("%Constant");
static const ::tbb::detail::r1::tchar *SyncObj_ThreadMonitor = _T("RML Thr Monitor");
#endif /* __TBB_USE_ITT_NOTIFY */
76 
//! Monitor with limited two-phase commit form of wait.
/** At most one thread should wait on an instance at a time.

    The waiting thread calls prepare_wait() and then either commit_wait() or
    cancel_wait(); notify() may be called by any thread. The class also bundles
    static helpers for launching, joining and detaching threads on the
    platform selected by __TBB_USE_WINAPI / __TBB_USE_POSIX. */
class thread_monitor {
public:
    //! Epoch snapshot: filled in by prepare_wait() and compared with the monitor's own epoch in commit_wait().
    class cookie {
        friend class thread_monitor;
        std::atomic<std::size_t> my_epoch{0};
    };
    //! Create a monitor with no skipped wakeup pending; the semaphore is passed to ITT_SYNC_CREATE.
    thread_monitor() : skipped_wakeup(false), my_sema() {
        ITT_SYNC_CREATE(&my_sema, SyncType_RML, SyncObj_ThreadMonitor);
    }
    ~thread_monitor() {}

    //! If a thread is waiting or started a two-phase wait, notify it.
    /** Can be called by any thread. */
    void notify();

    //! Begin two-phase wait.
    /** Should only be called by thread that owns the monitor.
        The caller must either complete the wait or cancel it.
        The current epoch is stored into c for the later commit_wait(c). */
    void prepare_wait( cookie& c );

    //! Complete a two-phase wait and wait until notification occurs after the earlier prepare_wait.
    /** If the epoch recorded in c no longer matches the monitor's epoch, the wait is cancelled instead of blocking. */
    void commit_wait( cookie& c );

    //! Cancel a two-phase wait.
    void cancel_wait();

#if __TBB_USE_WINAPI
    //! Native thread handle (Windows HANDLE).
    typedef HANDLE handle_type;

    // Return-type/calling-convention prefix for declaring a thread entry function.
    #define __RML_DECL_THREAD_ROUTINE unsigned WINAPI
    //! Signature of a thread entry function.
    typedef unsigned (WINAPI *thread_routine_type)(void*);

    //! Launch a thread
    /** thread_routine is invoked with arg in the new thread. stack_size and worker_index
        are used by the _beginthreadex-based definition (worker_index, when non-null, selects
        a processor group if more than one exists) and ignored by the std::thread-based one. */
    static handle_type launch( thread_routine_type thread_routine, void* arg, std::size_t stack_size, const size_t* worker_index = NULL );

#elif __TBB_USE_POSIX
    //! Native thread handle (pthread_t).
    typedef pthread_t handle_type;

    // Return-type prefix for declaring a thread entry function.
    #define __RML_DECL_THREAD_ROUTINE void*
    //! Signature of a thread entry function.
    typedef void*(*thread_routine_type)(void*);

    //! Launch a thread
    /** thread_routine is invoked with arg in the new thread; a stack_size of 0
        leaves the stack size attribute untouched. */
    static handle_type launch( thread_routine_type thread_routine, void* arg, std::size_t stack_size );
#endif /* __TBB_USE_POSIX */

    //! Join thread
    static void join(handle_type handle);

    //! Detach thread
    static void detach_thread(handle_type handle);
private:
    cookie my_cookie; // epoch counter
    // True between prepare_wait() and the notify()/cancel_wait() that clears it.
    std::atomic<bool> in_wait{false};
    // Set by cancel_wait() when a notifier had already cleared in_wait; the pending
    // semaphore signal is then consumed by the next prepare_wait().
    bool skipped_wakeup;
    // Semaphore the waiting thread blocks on in commit_wait() and notify() signals.
    binary_semaphore my_sema;
#if __TBB_USE_POSIX
    //! Forward a non-zero error_code to handle_perror together with the routine description.
    static void check( int error_code, const char* routine );
#endif
};
138 
139 #if __TBB_USE_WINAPI
140 
// Fallback definition for headers that do not provide this thread-creation flag;
// it is OR-ed into the init flags passed to _beginthreadex in thread_monitor::launch.
#ifndef STACK_SIZE_PARAM_IS_A_RESERVATION
#define STACK_SIZE_PARAM_IS_A_RESERVATION 0x00010000
#endif
144 
145 // _beginthreadex API is not available in Windows 8 Store* applications, so use std::thread instead
146 #if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
launch(thread_routine_type thread_function,void * arg,std::size_t,const std::size_t *)147 inline thread_monitor::handle_type thread_monitor::launch( thread_routine_type thread_function, void* arg, std::size_t, const std::size_t*) {
148 //TODO: check that exception thrown from std::thread is not swallowed silently
149     std::thread* thread_tmp=new std::thread(thread_function, arg);
150     return thread_tmp->native_handle();
151 }
152 #else
launch(thread_routine_type thread_routine,void * arg,std::size_t stack_size,const std::size_t * worker_index)153 inline thread_monitor::handle_type thread_monitor::launch( thread_routine_type thread_routine, void* arg, std::size_t stack_size, const std::size_t* worker_index ) {
154     unsigned thread_id;
155     int number_of_processor_groups = ( worker_index ) ? NumberOfProcessorGroups() : 0;
156     unsigned create_flags = ( number_of_processor_groups > 1 ) ? CREATE_SUSPENDED : 0;
157     HANDLE h = (HANDLE)_beginthreadex( NULL, unsigned(stack_size), thread_routine, arg, STACK_SIZE_PARAM_IS_A_RESERVATION | create_flags, &thread_id );
158     if( !h ) {
159         handle_perror(0, "thread_monitor::launch: _beginthreadex failed\n");
160     }
161     if ( number_of_processor_groups > 1 ) {
162         MoveThreadIntoProcessorGroup( h, FindProcessorGroupIndex( static_cast<int>(*worker_index) ) );
163         ResumeThread( h );
164     }
165     return h;
166 }
167 #endif //__TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
168 
join(handle_type handle)169 void thread_monitor::join(handle_type handle) {
170 #if TBB_USE_ASSERT
171     DWORD res =
172 #endif
173         WaitForSingleObjectEx(handle, INFINITE, FALSE);
174     __TBB_ASSERT( res==WAIT_OBJECT_0, NULL );
175 #if TBB_USE_ASSERT
176     BOOL val =
177 #endif
178         CloseHandle(handle);
179     __TBB_ASSERT( val, NULL );
180 }
181 
detach_thread(handle_type handle)182 void thread_monitor::detach_thread(handle_type handle) {
183 #if TBB_USE_ASSERT
184     BOOL val =
185 #endif
186         CloseHandle(handle);
187     __TBB_ASSERT( val, NULL );
188 }
189 
190 #endif /* __TBB_USE_WINAPI */
191 
192 #if __TBB_USE_POSIX
check(int error_code,const char * routine)193 inline void thread_monitor::check( int error_code, const char* routine ) {
194     if( error_code ) {
195         handle_perror(error_code, routine);
196     }
197 }
198 
launch(void * (* thread_routine)(void *),void * arg,std::size_t stack_size)199 inline thread_monitor::handle_type thread_monitor::launch( void* (*thread_routine)(void*), void* arg, std::size_t stack_size ) {
200     // FIXME - consider more graceful recovery than just exiting if a thread cannot be launched.
201     // Note that there are some tricky situations to deal with, such that the thread is already
202     // grabbed as part of an OpenMP team.
203     pthread_attr_t s;
204     check(pthread_attr_init( &s ), "pthread_attr_init has failed");
205     if( stack_size>0 )
206         check(pthread_attr_setstacksize( &s, stack_size ), "pthread_attr_setstack_size has failed" );
207     pthread_t handle;
208     check( pthread_create( &handle, &s, thread_routine, arg ), "pthread_create has failed" );
209     check( pthread_attr_destroy( &s ), "pthread_attr_destroy has failed" );
210     return handle;
211 }
212 
join(handle_type handle)213 void thread_monitor::join(handle_type handle) {
214     check(pthread_join(handle, NULL), "pthread_join has failed");
215 }
216 
detach_thread(handle_type handle)217 void thread_monitor::detach_thread(handle_type handle) {
218     check(pthread_detach(handle), "pthread_detach has failed");
219 }
220 #endif /* __TBB_USE_POSIX */
221 
notify()222 inline void thread_monitor::notify() {
223     my_cookie.my_epoch.store(my_cookie.my_epoch.load(std::memory_order_acquire) + 1, std::memory_order_release);
224     bool do_signal = in_wait.exchange( false );
225     if( do_signal )
226         my_sema.V();
227 }
228 
prepare_wait(cookie & c)229 inline void thread_monitor::prepare_wait( cookie& c ) {
230     if( skipped_wakeup ) {
231         // Lazily consume a signal that was skipped due to cancel_wait
232         skipped_wakeup = false;
233         my_sema.P(); // does not really wait on the semaphore
234     }
235     // Former c = my_cookie
236     c.my_epoch.store(my_cookie.my_epoch.load(std::memory_order_acquire), std::memory_order_release);
237     in_wait.store( true, std::memory_order_seq_cst );
238 }
239 
commit_wait(cookie & c)240 inline void thread_monitor::commit_wait( cookie& c ) {
241     bool do_it = ( c.my_epoch.load(std::memory_order_relaxed) == my_cookie.my_epoch.load(std::memory_order_relaxed) );
242     if( do_it ) my_sema.P();
243     else        cancel_wait();
244 }
245 
cancel_wait()246 inline void thread_monitor::cancel_wait() {
247     // if not in_wait, then some thread has sent us a signal;
248     // it will be consumed by the next prepare_wait call
249     skipped_wakeup = ! in_wait.exchange( false );
250 }
251 
252 } // namespace internal
253 } // namespace rml
254 } // namespace r1
255 } // namespace detail
256 } // namespace tbb
257 
258 #endif /* __RML_thread_monitor_H */
259