1 /*
2 Copyright (c) 2005-2021 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
// All platform-specific threading support is encapsulated here.
18
19 #ifndef __RML_thread_monitor_H
20 #define __RML_thread_monitor_H
21
22 #if __TBB_USE_WINAPI
23 #include <windows.h>
24 #include <process.h>
25 #include <malloc.h> //_alloca
26 #include "misc.h" // support for processor groups
27 #if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
28 #include <thread>
29 #endif
30 #elif __TBB_USE_POSIX
31 #include <pthread.h>
32 #include <cstring>
33 #include <cstdlib>
34 #else
35 #error Unsupported platform
36 #endif
#include <atomic>
#include <cerrno>
#include <cstddef>
#include <cstdio>

#include "oneapi/tbb/detail/_template_helpers.h"

#include "itt_notify.h"
#include "semaphore.h"
43
44 // All platform-specific threading support is in this header.
45
#if (_WIN32||_WIN64)&&!__TBB_ipf
// Deal with 64K aliasing. The formula for "offset" is a Fibonacci hash function,
// which has the desirable feature of spreading out the offsets fairly evenly
// without knowing the total number of offsets, and furthermore unlikely to
// accidentally cancel out other 64K aliasing schemes that Microsoft might implement later.
// See Knuth Vol 3. "Theorem S" for details on Fibonacci hashing.
// The second statement really does need "volatile"; otherwise the compiler might remove the _alloca.
// The argument is parenthesized so that a compound expression (e.g. `i<<1`) is hashed as a whole.
// NOTE: the expansion declares `offset` and `sink_for_alloca` in the caller's scope, so the macro
// can be used at most once per scope.
#define AVOID_64K_ALIASING(idx) \
    std::size_t offset = ((idx)+1) * 40503U % (1U<<16); \
    void* volatile sink_for_alloca = _alloca(offset); \
    __TBB_ASSERT_EX(sink_for_alloca, "_alloca failed");
#else
// Non-Windows builds: the thread allocators on these platforms (e.g. Linux) are assumed to
// avoid 64K aliasing already, so only silence the unused-argument warning.
#define AVOID_64K_ALIASING(idx) tbb::detail::suppress_unused_warning(idx)
#endif /* _WIN32||_WIN64 */
61
62 namespace tbb {
63 namespace detail {
64 namespace r1 {
65
// Forward declaration; the definition lives elsewhere in the r1 library.
// Throws std::runtime_error whose what() is the description of error_code prefixed with aux_info.
// Used below to report failures of thread creation/joining; callers do not expect it to return
// when invoked for an error.
void handle_perror(int error_code, const char* aux_info);
68
69 namespace rml {
70 namespace internal {
71
#if __TBB_USE_ITT_NOTIFY
// ITT names used when the monitor's semaphore is registered via ITT_SYNC_CREATE in the
// thread_monitor constructor. Both the characters and the pointers are const, so these
// header-scope objects cannot be reseated by any translation unit that includes this file.
static const ::tbb::detail::r1::tchar* const SyncType_RML = _T("%Constant");
static const ::tbb::detail::r1::tchar* const SyncObj_ThreadMonitor = _T("RML Thr Monitor");
#endif /* __TBB_USE_ITT_NOTIFY */
76
77 //! Monitor with limited two-phase commit form of wait.
78 /** At most one thread should wait on an instance at a time. */
79 class thread_monitor {
80 public:
81 class cookie {
82 friend class thread_monitor;
83 std::atomic<std::size_t> my_epoch{0};
84 };
thread_monitor()85 thread_monitor() : skipped_wakeup(false), my_sema() {
86 ITT_SYNC_CREATE(&my_sema, SyncType_RML, SyncObj_ThreadMonitor);
87 }
~thread_monitor()88 ~thread_monitor() {}
89
90 //! If a thread is waiting or started a two-phase wait, notify it.
91 /** Can be called by any thread. */
92 void notify();
93
94 //! Begin two-phase wait.
95 /** Should only be called by thread that owns the monitor.
96 The caller must either complete the wait or cancel it. */
97 void prepare_wait( cookie& c );
98
99 //! Complete a two-phase wait and wait until notification occurs after the earlier prepare_wait.
100 void commit_wait( cookie& c );
101
102 //! Cancel a two-phase wait.
103 void cancel_wait();
104
105 #if __TBB_USE_WINAPI
106 typedef HANDLE handle_type;
107
108 #define __RML_DECL_THREAD_ROUTINE unsigned WINAPI
109 typedef unsigned (WINAPI *thread_routine_type)(void*);
110
111 //! Launch a thread
112 static handle_type launch( thread_routine_type thread_routine, void* arg, std::size_t stack_size, const size_t* worker_index = NULL );
113
114 #elif __TBB_USE_POSIX
115 typedef pthread_t handle_type;
116
117 #define __RML_DECL_THREAD_ROUTINE void*
118 typedef void*(*thread_routine_type)(void*);
119
120 //! Launch a thread
121 static handle_type launch( thread_routine_type thread_routine, void* arg, std::size_t stack_size );
122 #endif /* __TBB_USE_POSIX */
123
124 //! Join thread
125 static void join(handle_type handle);
126
127 //! Detach thread
128 static void detach_thread(handle_type handle);
129 private:
130 cookie my_cookie; // epoch counter
131 std::atomic<bool> in_wait{false};
132 bool skipped_wakeup;
133 binary_semaphore my_sema;
134 #if __TBB_USE_POSIX
135 static void check( int error_code, const char* routine );
136 #endif
137 };
138
139 #if __TBB_USE_WINAPI
140
// Creation flag OR-ed into the _beginthreadex flags by launch(). Defined here only when the
// included Windows headers do not already provide it (hence the #ifndef guard).
// NOTE(review): the literal is expected to match the Windows SDK constant — confirm if changed.
#ifndef STACK_SIZE_PARAM_IS_A_RESERVATION
#define STACK_SIZE_PARAM_IS_A_RESERVATION 0x00010000
#endif
144
145 // _beginthreadex API is not available in Windows 8 Store* applications, so use std::thread instead
146 #if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
//! Launch a thread through std::thread (used when _beginthreadex is unavailable).
/** The stack size and worker index parameters are unnamed and therefore ignored in this variant.
    Returns the native handle of the new thread; join()/detach_thread() later wait on and close
    that handle directly.
    NOTE(review): the heap-allocated std::thread is never deleted. This is presumably intentional,
    because destroying a joinable std::thread calls std::terminate. Confirm that closing the native
    handle in join()/detach_thread() is valid for the std::thread implementation in use. */
inline thread_monitor::handle_type thread_monitor::launch( thread_routine_type thread_function, void* arg, std::size_t, const std::size_t*) {
    //TODO: check that exception thrown from std::thread is not swallowed silently
    std::thread* thread_tmp=new std::thread(thread_function, arg);
    return thread_tmp->native_handle();
}
152 #else
launch(thread_routine_type thread_routine,void * arg,std::size_t stack_size,const std::size_t * worker_index)153 inline thread_monitor::handle_type thread_monitor::launch( thread_routine_type thread_routine, void* arg, std::size_t stack_size, const std::size_t* worker_index ) {
154 unsigned thread_id;
155 int number_of_processor_groups = ( worker_index ) ? NumberOfProcessorGroups() : 0;
156 unsigned create_flags = ( number_of_processor_groups > 1 ) ? CREATE_SUSPENDED : 0;
157 HANDLE h = (HANDLE)_beginthreadex( NULL, unsigned(stack_size), thread_routine, arg, STACK_SIZE_PARAM_IS_A_RESERVATION | create_flags, &thread_id );
158 if( !h ) {
159 handle_perror(0, "thread_monitor::launch: _beginthreadex failed\n");
160 }
161 if ( number_of_processor_groups > 1 ) {
162 MoveThreadIntoProcessorGroup( h, FindProcessorGroupIndex( static_cast<int>(*worker_index) ) );
163 ResumeThread( h );
164 }
165 return h;
166 }
167 #endif //__TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
168
join(handle_type handle)169 void thread_monitor::join(handle_type handle) {
170 #if TBB_USE_ASSERT
171 DWORD res =
172 #endif
173 WaitForSingleObjectEx(handle, INFINITE, FALSE);
174 __TBB_ASSERT( res==WAIT_OBJECT_0, NULL );
175 #if TBB_USE_ASSERT
176 BOOL val =
177 #endif
178 CloseHandle(handle);
179 __TBB_ASSERT( val, NULL );
180 }
181
detach_thread(handle_type handle)182 void thread_monitor::detach_thread(handle_type handle) {
183 #if TBB_USE_ASSERT
184 BOOL val =
185 #endif
186 CloseHandle(handle);
187 __TBB_ASSERT( val, NULL );
188 }
189
190 #endif /* __TBB_USE_WINAPI */
191
192 #if __TBB_USE_POSIX
check(int error_code,const char * routine)193 inline void thread_monitor::check( int error_code, const char* routine ) {
194 if( error_code ) {
195 handle_perror(error_code, routine);
196 }
197 }
198
launch(void * (* thread_routine)(void *),void * arg,std::size_t stack_size)199 inline thread_monitor::handle_type thread_monitor::launch( void* (*thread_routine)(void*), void* arg, std::size_t stack_size ) {
200 // FIXME - consider more graceful recovery than just exiting if a thread cannot be launched.
201 // Note that there are some tricky situations to deal with, such that the thread is already
202 // grabbed as part of an OpenMP team.
203 pthread_attr_t s;
204 check(pthread_attr_init( &s ), "pthread_attr_init has failed");
205 if( stack_size>0 )
206 check(pthread_attr_setstacksize( &s, stack_size ), "pthread_attr_setstack_size has failed" );
207 pthread_t handle;
208 check( pthread_create( &handle, &s, thread_routine, arg ), "pthread_create has failed" );
209 check( pthread_attr_destroy( &s ), "pthread_attr_destroy has failed" );
210 return handle;
211 }
212
join(handle_type handle)213 void thread_monitor::join(handle_type handle) {
214 check(pthread_join(handle, NULL), "pthread_join has failed");
215 }
216
detach_thread(handle_type handle)217 void thread_monitor::detach_thread(handle_type handle) {
218 check(pthread_detach(handle), "pthread_detach has failed");
219 }
220 #endif /* __TBB_USE_POSIX */
221
//! Wake the owning thread if it is waiting or has begun a two-phase wait.
/** Can be called by any thread. First advances the epoch, so that a pending commit_wait() sees a
    cookie that no longer matches. Then, if in_wait was set, clears it and posts my_sema exactly once.
    NOTE(review): the epoch increment is a separate load and store, not a single read-modify-write,
    so concurrent notify() calls can collapse into one increment. The in_wait exchange appears to
    still deliver the wakeup in that case; confirm before treating the epoch as an exact count. */
inline void thread_monitor::notify() {
    my_cookie.my_epoch.store(my_cookie.my_epoch.load(std::memory_order_acquire) + 1, std::memory_order_release);
    // Only the notifier that observes in_wait == true posts the semaphore.
    bool do_signal = in_wait.exchange( false );
    if( do_signal )
        my_sema.V();
}
228
//! Begin a two-phase wait: snapshot the current epoch into c and publish in_wait = true.
/** Must be followed by commit_wait(c) or cancel_wait().
    If a previous wait was cancelled after a notify() had already claimed it (skipped_wakeup),
    that notify() has posted, or is about to post, my_sema. That signal is consumed here first,
    so it cannot satisfy the upcoming commit_wait() spuriously. */
inline void thread_monitor::prepare_wait( cookie& c ) {
    if( skipped_wakeup ) {
        // Lazily consume a signal that was skipped due to cancel_wait
        skipped_wakeup = false;
        my_sema.P(); // does not really wait on the semaphore
    }
    // Former c = my_cookie
    c.my_epoch.store(my_cookie.my_epoch.load(std::memory_order_acquire), std::memory_order_release);
    // seq_cst store; notify() pairs with it via its in_wait.exchange.
    in_wait.store( true, std::memory_order_seq_cst );
}
239
commit_wait(cookie & c)240 inline void thread_monitor::commit_wait( cookie& c ) {
241 bool do_it = ( c.my_epoch.load(std::memory_order_relaxed) == my_cookie.my_epoch.load(std::memory_order_relaxed) );
242 if( do_it ) my_sema.P();
243 else cancel_wait();
244 }
245
cancel_wait()246 inline void thread_monitor::cancel_wait() {
247 // if not in_wait, then some thread has sent us a signal;
248 // it will be consumed by the next prepare_wait call
249 skipped_wakeup = ! in_wait.exchange( false );
250 }
251
252 } // namespace internal
253 } // namespace rml
254 } // namespace r1
255 } // namespace detail
256 } // namespace tbb
257
258 #endif /* __RML_thread_monitor_H */
259