1 #ifndef __PLINK2_THREAD_H__
2 #define __PLINK2_THREAD_H__
3
4 // This library is part of PLINK 2.00, copyright (C) 2005-2020 Shaun Purcell,
5 // Christopher Chang.
6 //
7 // This program is free software: you can redistribute it and/or modify it
8 // under the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // This library is distributed in the hope that it will be useful, but WITHOUT
13 // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15 // for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20
21 // Basic multithreading code. Uses native Win32 API instead of pthreads
22 // emulation on Windows.
23 #include "plink2_base.h"
24
25 #ifdef _WIN32
26 # include <process.h>
27 #else
28 # include <pthread.h>
29 #endif
30
31 // Most thread functions should be of the form
32 // THREAD_FUNC_DECL function_name(void* raw_arg) {
33 // ThreadGroupFuncArg* arg = S_CAST(ThreadGroupFuncArg*, raw_arg);
34 // uint32_t tidx = arg->tidx;
35 // ...
36 // do {
37 // ... // process current block
38 // } while (!THREAD_BLOCK_FINISH(arg));
39 // THREAD_RETURN;
40 // }
41 // or
42 // THREAD_FUNC_DECL function_name(void* raw_arg) {
43 // ThreadGroupFuncArg* arg = S_CAST(ThreadGroupFuncArg*, raw_arg);
44 // uint32_t tidx = arg->tidx;
45 // ...
46 // // parallelized initialization code, must wait for other threads to also
47 // // finish this before proceeding to main loop
48 // while (!THREAD_BLOCK_FINISH(arg)) {
49 // ... // process current block
50 // }
51 // THREAD_RETURN;
52 // }
#ifdef _WIN32
// Windows: the pthread_t name is reused for a Win32 HANDLE, and thread
// functions return unsigned with the __stdcall calling convention.
#  define pthread_t HANDLE
// Return-type prefix to put in front of a thread-function definition.
#  define THREAD_FUNC_DECL unsigned __stdcall
// Declares func_ptr as a pointer to a thread function.
#  define THREAD_FUNCPTR_T(func_ptr) unsigned (__stdcall *func_ptr)(void*)
// #define THREAD_FUNCPP_T(func_pp) unsigned (__stdcall **func_pp)(void*)
// Statement that ends a thread function normally.
#  define THREAD_RETURN return 0
#else
// Everything else: thread functions take and return void*.
// Return-type prefix to put in front of a thread-function definition.
#  define THREAD_FUNC_DECL void*
// Declares func_ptr as a pointer to a thread function.
#  define THREAD_FUNCPTR_T(func_ptr) void* (*func_ptr)(void*)
// #define THREAD_FUNCPP_T(func_pp) void* (**func_pp)(void*)
// Statement that ends a thread function normally.
#  define THREAD_RETURN return nullptr
#endif
65
66 #if (__GNUC__ == 4) && (__GNUC_MINOR__ < 7) && !defined(__clang__)
67 // todo: check if this is also needed for any clang versions we care about.
68 // (support was added in clang 3.1, I think?)
// Polyfill for compilers matched by the enclosing #if: supply the
// memory-order constants and map the __atomic_* operations used by this
// library onto the legacy __sync_* builtins.
#  define __ATOMIC_RELAXED 0
#  define __ATOMIC_CONSUME 1
#  define __ATOMIC_ACQUIRE 2
#  define __ATOMIC_RELEASE 3
#  define __ATOMIC_ACQ_REL 4
#  define __ATOMIC_SEQ_CST 5
// The memorder argument is accepted for source compatibility and then
// dropped; the __sync_* builtins take no memory-order parameter.
#  define __atomic_fetch_add(ptr, val, memorder) __sync_fetch_and_add((ptr), (val))
#  define __atomic_fetch_sub(ptr, val, memorder) __sync_fetch_and_sub((ptr), (val))
#  define __atomic_sub_fetch(ptr, val, memorder) __sync_sub_and_fetch((ptr), (val))
78
ATOMIC_COMPARE_EXCHANGE_N_U32(uint32_t * ptr,uint32_t * expected,uint32_t desired,__maybe_unused int weak,__maybe_unused int success_memorder,__maybe_unused int failure_memorder)79 HEADER_INLINE uint32_t ATOMIC_COMPARE_EXCHANGE_N_U32(uint32_t* ptr, uint32_t* expected, uint32_t desired, __maybe_unused int weak, __maybe_unused int success_memorder, __maybe_unused int failure_memorder) {
80 const uint32_t new_expected = __sync_val_compare_and_swap(ptr, *expected, desired);
81 if (new_expected == (*expected)) {
82 return 1;
83 }
84 *expected = new_expected;
85 return 0;
86 }
87
ATOMIC_COMPARE_EXCHANGE_N_U64(uint64_t * ptr,uint64_t * expected,uint64_t desired,__maybe_unused int weak,__maybe_unused int success_memorder,__maybe_unused int failure_memorder)88 HEADER_INLINE uint32_t ATOMIC_COMPARE_EXCHANGE_N_U64(uint64_t* ptr, uint64_t* expected, uint64_t desired, __maybe_unused int weak, __maybe_unused int success_memorder, __maybe_unused int failure_memorder) {
89 const uint64_t new_expected = __sync_val_compare_and_swap(ptr, *expected, desired);
90 if (new_expected == (*expected)) {
91 return 1;
92 }
93 *expected = new_expected;
94 return 0;
95 }
96 #else
// The native builtin is available here, so both width-specific names forward
// to the same __atomic_compare_exchange_n.
#  define ATOMIC_COMPARE_EXCHANGE_N_U32 __atomic_compare_exchange_n
#  define ATOMIC_COMPARE_EXCHANGE_N_U64 __atomic_compare_exchange_n
99 #endif
100
101 #ifdef __cplusplus
102 namespace plink2 {
103 #endif
104
#ifdef _WIN32
// Windows-only helper, defined outside this header.  Takes an array of ct
// HANDLEs; presumably blocks until every one of them is signaled -- TODO
// confirm against the implementation.
void WaitForAllObjects(uint32_t ct, HANDLE* objs);
#endif
108
// Upper bound on the number of threads; NumCpu()'s return value is clipped to
// 1..kMaxThreads.
#ifdef _WIN32
// This should be increased once the old-style threads code has been purged.
// NOTE(review): 64 presumably mirrors the Win32 limit on handles per
// multiple-object wait -- confirm before raising.
CONSTI32(kMaxThreads, 64);
#else
// currently assumed to be less than 2^16 (otherwise some multiply overflows
// are theoretically possible, at least in the 32-bit build)
CONSTI32(kMaxThreads, 512);
#endif
117
// Stack size, in bytes, requested for worker threads (passed to
// pthread_attr_setstacksize() by Plink2ThreadStartup on non-Windows C++
// builds).
#ifdef __APPLE__
// cblas_dgemm may fail with 128k, so use 512 KiB here.
CONSTI32(kDefaultThreadStack, 524288);
#else
// asserts didn't seem to work properly with a setting much smaller than this
// (128 KiB).
CONSTI32(kDefaultThreadStack, 131072);
#endif
125
// Synchronization state shared between a thread group's owner and its worker
// threads.
typedef struct ThreadGroupControlBlockStruct {
  // Neither thread-functions nor the thread-group owner should touch these
  // variables directly.

  // Its low bit selects which start_next_events[] entry a worker waits on in
  // the Windows THREAD_BLOCK_FINISH().
  uintptr_t spawn_ct;
#ifdef _WIN32
  HANDLE start_next_events[2];  // uses parity of spawn_ct
  // Signaled by the worker whose decrement brings active_ct to zero.
  HANDLE cur_block_done_event;
#else
  pthread_mutex_t sync_mutex;
  pthread_cond_t cur_block_done_condvar;
  pthread_cond_t start_next_condvar;
#endif
  // Decremented by each worker in the Windows THREAD_BLOCK_FINISH() when it
  // finishes the current block.
  uint32_t active_ct;

  // Thread-functions can safely read from these.
  uint32_t thread_ct;

  // 1 = process last block and exit; 2 = immediate termination requested
  // (0 is written by SetThreadFuncAndData()/ReinitThreads(), 1 by
  // DeclareLastThreadBlock().)
  uint32_t is_last_block;
} ThreadGroupControlBlock;
146
// The part of a thread group that worker threads can see: each
// ThreadGroupFuncArg points at one of these.
typedef struct ThreadGroupSharedStruct {
  // Caller-supplied pointer, stored by SetThreadFuncAndData().
  void* context;
#ifdef __cplusplus
  // Accessors for the private control block.  Code in this header reaches cb
  // through the GET_PRIVATE() macro (defined elsewhere); presumably that
  // macro resolves to these in C++ builds.
  ThreadGroupControlBlock& GET_PRIVATE_cb() { return cb; }
  ThreadGroupControlBlock const& GET_PRIVATE_cb() const { return cb; }
private:
#endif
  // Not meant to be touched directly; see the comments in
  // ThreadGroupControlBlock.
  ThreadGroupControlBlock cb;
} ThreadGroupShared;
156
// Argument handed to each thread function: raw_arg in the usage pattern at the
// top of this header is cast to a pointer to this type.
typedef struct ThreadGroupFuncArgStruct {
  // State shared with the other threads in the group.
  ThreadGroupShared* sharedp;
  // This thread's index within the group (presumably 0-based -- TODO confirm
  // against the spawning code).
  uint32_t tidx;
} ThreadGroupFuncArg;
161
// Owner-side state of a thread group.
typedef struct ThreadGroupMainStruct {
  ThreadGroupShared shared;
  // Thread function; set by SetThreadFuncAndData().
  THREAD_FUNCPTR_T(thread_func_ptr);
  // Thread handle array.  JoinThreads0() treats a null pointer here as "no
  // threads were requested".
  pthread_t* threads;
  // Presumably one entry per thread, passed as that thread's raw_arg -- TODO
  // confirm against the spawning code.
  ThreadGroupFuncArg* thread_args;
  // Generally favor uint16_t/uint32_t over unsigned char/uint8_t for isolated
  // bools, since in the latter case the compiler is fairly likely to generate
  // worse code due to aliasing paranoia; see e.g.
  // https://travisdowns.github.io/blog/2019/08/26/vector-inc.html

  // DeclareLastThreadBlock() asserts that this is zero.
  uint16_t is_unjoined;
  // SetThreadFuncAndData() and ReinitThreads() assert that this is zero;
  // ThreadsAreActive() returns it.
  uint16_t is_active;

#ifndef _WIN32
  // NOTE(review): presumably records how far initialization of the pthread
  // mutex/condvars got, so cleanup can undo exactly that much -- confirm.
  uint32_t sync_init_state;
#endif
} ThreadGroupMain;
178
// Handle type passed to the thread-group functions declared in this header.
// It wraps a ThreadGroupMain, which C++ builds make private.
typedef struct ThreadGroupStruct {
#ifdef __cplusplus
  // Accessors for the private member.  Code in this header reaches m through
  // the GET_PRIVATE() macro (defined elsewhere); presumably that macro
  // resolves to these in C++ builds.
  ThreadGroupMain& GET_PRIVATE_m() { return m; }
  ThreadGroupMain const& GET_PRIVATE_m() const { return m; }
private:
#endif
  ThreadGroupMain m;
} ThreadGroup;
187
// Defined outside this header.  Presumably puts *tg_ptr into a known-empty
// state so that the other functions (including CleanupThreads()) are safe to
// call on it -- TODO confirm against the implementation.
void PreinitThreads(ThreadGroup* tg_ptr);

// Return value is clipped to 1..kMaxThreads.
// If known_procs_ptr is non-null, it's set to the raw unclipped value (which
// can theoretically be -1 if the sysconf call fails)
uint32_t NumCpu(int32_t* known_procs_ptr);

// Sets the thread count of *tg_ptr.
// Also allocates, returning 1 on failure.
BoolErr SetThreadCt(uint32_t thread_ct, ThreadGroup* tg_ptr);
197
GetThreadCt(const ThreadGroupShared * sharedp)198 HEADER_INLINE uint32_t GetThreadCt(const ThreadGroupShared* sharedp) {
199 return GET_PRIVATE(*sharedp, cb).thread_ct;
200 }
201
GetThreadCtTg(const ThreadGroup * tg_ptr)202 HEADER_INLINE uint32_t GetThreadCtTg(const ThreadGroup* tg_ptr) {
203 const ThreadGroupMain* tgp = &GET_PRIVATE(*tg_ptr, m);
204 return GET_PRIVATE(tgp->shared, cb).thread_ct;
205 }
206
SetThreadFuncAndData(THREAD_FUNCPTR_T (start_routine),void * shared_context,ThreadGroup * tg_ptr)207 HEADER_INLINE void SetThreadFuncAndData(THREAD_FUNCPTR_T(start_routine), void* shared_context, ThreadGroup* tg_ptr) {
208 ThreadGroupMain* tgp = &GET_PRIVATE(*tg_ptr, m);
209 assert(!tgp->is_active);
210 tgp->shared.context = shared_context;
211 GET_PRIVATE(tgp->shared, cb).is_last_block = 0;
212 tgp->thread_func_ptr = start_routine;
213 }
214
215 // Equivalent to SetThreadFuncAndData() with unchanged
216 // start_routine/shared_context. Ok to call this "unnecessarily".
ReinitThreads(ThreadGroup * tg_ptr)217 HEADER_INLINE void ReinitThreads(ThreadGroup* tg_ptr) {
218 ThreadGroupMain* tgp = &GET_PRIVATE(*tg_ptr, m);
219 assert(!tgp->is_active);
220 GET_PRIVATE(tgp->shared, cb).is_last_block = 0;
221 }
222
ThreadsAreActive(ThreadGroup * tg_ptr)223 HEADER_INLINE uint32_t ThreadsAreActive(ThreadGroup* tg_ptr) {
224 ThreadGroupMain* tgp = &GET_PRIVATE(*tg_ptr, m);
225 return tgp->is_active;
226 }
227
228 // Technically unnecessary to call this, but it does save one sync cycle.
229 //
230 // Note that, if there's only one block of work-shards, this should be called
231 // before the first SpawnThreads() call.
DeclareLastThreadBlock(ThreadGroup * tg_ptr)232 HEADER_INLINE void DeclareLastThreadBlock(ThreadGroup* tg_ptr) {
233 ThreadGroupMain* tgp = &GET_PRIVATE(*tg_ptr, m);
234 assert(!tgp->is_unjoined);
235 GET_PRIVATE(tgp->shared, cb).is_last_block = 1;
236 }
237
IsLastBlock(const ThreadGroup * tg_ptr)238 HEADER_INLINE uint32_t IsLastBlock(const ThreadGroup* tg_ptr) {
239 const ThreadGroupMain* tgp = &GET_PRIVATE(*tg_ptr, m);
240 return GET_PRIVATE(tgp->shared, cb).is_last_block;
241 }
242
243 #if defined(__cplusplus) && !defined(_WIN32)
244 class Plink2ThreadStartup {
245 public:
246 pthread_attr_t smallstack_thread_attr;
Plink2ThreadStartup()247 Plink2ThreadStartup() {
248 # ifdef NDEBUG
249 // we'll error out for another reason soon enough if there's insufficient
250 // memory...
251 pthread_attr_init(&smallstack_thread_attr);
252 # else
253 assert(!pthread_attr_init(&smallstack_thread_attr));
254 # endif
255 // if this fails due to kDefaultThreadStack being smaller than the system
256 // page size, no need to error out
257 pthread_attr_setstacksize(&smallstack_thread_attr, kDefaultThreadStack);
258 }
259
~Plink2ThreadStartup()260 ~Plink2ThreadStartup() {
261 pthread_attr_destroy(&smallstack_thread_attr);
262 }
263 };
264
265 extern Plink2ThreadStartup g_thread_startup;
266 #endif
267
// The four functions below are defined outside this header; the notes on them
// are inferred from how this header refers to them and should be confirmed
// against the implementation.

// Starts the worker threads on a block of work; DeclareLastThreadBlock() must
// be called first if this is the only block.  Presumably returns nonzero on
// failure, like the other BoolErr functions here.
BoolErr SpawnThreads(ThreadGroup* tg_ptr);

// Presumably waits until every worker has finished the current block.
void JoinThreads(ThreadGroup* tg_ptr);

// Assumes threads are joined.
// Presumably requests immediate termination (is_last_block == 2).
void StopThreads(ThreadGroup* tg_ptr);

// Presumably releases whatever the thread group allocated.
void CleanupThreads(ThreadGroup* tg_ptr);
276
277 #ifdef _WIN32
THREAD_BLOCK_FINISH(ThreadGroupFuncArg * tgfap)278 HEADER_INLINE BoolErr THREAD_BLOCK_FINISH(ThreadGroupFuncArg* tgfap) {
279 ThreadGroupControlBlock* cbp = &(GET_PRIVATE(*tgfap->sharedp, cb));
280 if (cbp->is_last_block) {
281 return 1;
282 }
283 const uint32_t start_next_parity = cbp->spawn_ct & 1;
284 if (!__atomic_sub_fetch(&cbp->active_ct, 1, __ATOMIC_ACQ_REL)) {
285 SetEvent(cbp->cur_block_done_event);
286 }
287 WaitForSingleObject(cbp->start_next_events[start_next_parity], INFINITE);
288 return (cbp->is_last_block == 2);
289 }
290 #else
291 BoolErr THREAD_BLOCK_FINISH(ThreadGroupFuncArg* tgfap);
292 #endif
293
294 // Convenience functions for potentially-small-and-frequent jobs where
295 // thread_ct == 0 corresponds to not wanting to launch threads at all; see
296 // MakeDupflagHtable in plink2_cmdline for a typical use case.
SetThreadCt0(uint32_t thread_ct,ThreadGroup * tg_ptr)297 HEADER_INLINE BoolErr SetThreadCt0(uint32_t thread_ct, ThreadGroup* tg_ptr) {
298 if (!thread_ct) {
299 return 0;
300 }
301 return SetThreadCt(thread_ct, tg_ptr);
302 }
303
JoinThreads0(ThreadGroup * tg_ptr)304 HEADER_INLINE void JoinThreads0(ThreadGroup* tg_ptr) {
305 if (GET_PRIVATE(*tg_ptr, m).threads) {
306 JoinThreads(tg_ptr);
307 }
308 }
309
310 // This comes in handy a lot in multithreaded error-reporting code when
311 // deterministic behavior is desired.
// Defined outside this header.  Going by the name and the note above,
// presumably lowers *oldval_ptr to newval when newval is smaller, in a way
// that is safe against concurrent callers -- TODO confirm against the
// implementation.
void UpdateU64IfSmaller(uint64_t newval, uint64_t* oldval_ptr);
313
314 #ifdef __cplusplus
315 } // namespace plink2
316 #endif
317
318 #endif // __PLINK2_THREAD_H__
319