1 #ifndef __PLINK2_THREAD_H__
2 #define __PLINK2_THREAD_H__
3 
4 // This library is part of PLINK 2.00, copyright (C) 2005-2020 Shaun Purcell,
5 // Christopher Chang.
6 //
7 // This program is free software: you can redistribute it and/or modify it
8 // under the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // This library is distributed in the hope that it will be useful, but WITHOUT
13 // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
15 // for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 
20 
21 // Basic multithreading code.  Uses native Win32 API instead of pthreads
22 // emulation on Windows.
23 #include "plink2_base.h"
24 
25 #ifdef _WIN32
26 #  include <process.h>
27 #else
28 #  include <pthread.h>
29 #endif
30 
31 // Most thread functions should be of the form
32 //   THREAD_FUNC_DECL function_name(void* raw_arg) {
33 //     ThreadGroupFuncArg* arg = S_CAST(ThreadGroupFuncArg*, raw_arg);
34 //     uint32_t tidx = arg->tidx;
35 //     ...
36 //     do {
37 //       ... // process current block
38 //     } while (!THREAD_BLOCK_FINISH(arg));
39 //     THREAD_RETURN;
40 //   }
41 // or
42 //   THREAD_FUNC_DECL function_name(void* raw_arg) {
43 //     ThreadGroupFuncArg* arg = S_CAST(ThreadGroupFuncArg*, raw_arg);
44 //     uint32_t tidx = arg->tidx;
45 //     ...
46 //     // parallelized initialization code, must wait for other threads to also
47 //     // finish this before proceeding to main loop
48 //     while (!THREAD_BLOCK_FINISH(arg)) {
49 //       ... // process current block
50 //     }
51 //     THREAD_RETURN;
52 //   }
53 #ifdef _WIN32
54 #  define pthread_t HANDLE
55 #  define THREAD_FUNC_DECL unsigned __stdcall
56 #  define THREAD_FUNCPTR_T(func_ptr) unsigned (__stdcall *func_ptr)(void*)
57   // #define THREAD_FUNCPP_T(func_pp) unsigned (__stdcall **func_pp)(void*)
58 #  define THREAD_RETURN return 0
59 #else
60 #  define THREAD_FUNC_DECL void*
61 #  define THREAD_FUNCPTR_T(func_ptr) void* (*func_ptr)(void*)
62   // #define THREAD_FUNCPP_T(func_pp) void* (**func_pp)(void*)
63 #  define THREAD_RETURN return nullptr
64 #endif
65 
66 #if (__GNUC__ == 4) && (__GNUC_MINOR__ < 7) && !defined(__clang__)
67 // todo: check if this is also needed for any clang versions we care about.
68 // (support was added in clang 3.1, I think?)
69 #  define __ATOMIC_RELAXED 0
70 #  define __ATOMIC_CONSUME 1
71 #  define __ATOMIC_ACQUIRE 2
72 #  define __ATOMIC_RELEASE 3
73 #  define __ATOMIC_ACQ_REL 4
74 #  define __ATOMIC_SEQ_CST 5
75 #  define __atomic_fetch_add(ptr, val, memorder) __sync_fetch_and_add((ptr), (val))
76 #  define __atomic_fetch_sub(ptr, val, memorder) __sync_fetch_and_sub((ptr), (val))
77 #  define __atomic_sub_fetch(ptr, val, memorder) __sync_sub_and_fetch((ptr), (val))
78 
ATOMIC_COMPARE_EXCHANGE_N_U32(uint32_t * ptr,uint32_t * expected,uint32_t desired,__maybe_unused int weak,__maybe_unused int success_memorder,__maybe_unused int failure_memorder)79 HEADER_INLINE uint32_t ATOMIC_COMPARE_EXCHANGE_N_U32(uint32_t* ptr, uint32_t* expected, uint32_t desired, __maybe_unused int weak, __maybe_unused int success_memorder, __maybe_unused int failure_memorder) {
80   const uint32_t new_expected = __sync_val_compare_and_swap(ptr, *expected, desired);
81   if (new_expected == (*expected)) {
82     return 1;
83   }
84   *expected = new_expected;
85   return 0;
86 }
87 
ATOMIC_COMPARE_EXCHANGE_N_U64(uint64_t * ptr,uint64_t * expected,uint64_t desired,__maybe_unused int weak,__maybe_unused int success_memorder,__maybe_unused int failure_memorder)88 HEADER_INLINE uint32_t ATOMIC_COMPARE_EXCHANGE_N_U64(uint64_t* ptr, uint64_t* expected, uint64_t desired, __maybe_unused int weak, __maybe_unused int success_memorder, __maybe_unused int failure_memorder) {
89   const uint64_t new_expected = __sync_val_compare_and_swap(ptr, *expected, desired);
90   if (new_expected == (*expected)) {
91     return 1;
92   }
93   *expected = new_expected;
94   return 0;
95 }
96 #else
97 #  define ATOMIC_COMPARE_EXCHANGE_N_U32 __atomic_compare_exchange_n
98 #  define ATOMIC_COMPARE_EXCHANGE_N_U64 __atomic_compare_exchange_n
99 #endif
100 
101 #ifdef __cplusplus
102 namespace plink2 {
103 #endif
104 
105 #ifdef _WIN32
106 void WaitForAllObjects(uint32_t ct, HANDLE* objs);
107 #endif
108 
109 #ifdef _WIN32
110 // This should be increased once the old-style threads code has been purged.
111 CONSTI32(kMaxThreads, 64);
112 #else
113 // currently assumed to be less than 2^16 (otherwise some multiply overflows
114 // are theoretically possible, at least in the 32-bit build)
115 CONSTI32(kMaxThreads, 512);
116 #endif
117 
118 #ifdef __APPLE__
119 // cblas_dgemm may fail with 128k
120 CONSTI32(kDefaultThreadStack, 524288);
121 #else
122 // asserts didn't seem to work properly with a setting much smaller than this
123 CONSTI32(kDefaultThreadStack, 131072);
124 #endif
125 
// Internal synchronization state for one thread group.
typedef struct ThreadGroupControlBlockStruct {
  // Neither thread-functions nor the thread-group owner should touch these
  // variables directly.
  // spawn_ct's low bit selects which start_next_events slot a worker waits on
  // in THREAD_BLOCK_FINISH (Win32 build); presumably advanced once per
  // spawn/block cycle by SpawnThreads -- confirm in plink2_thread.cc.
  uintptr_t spawn_ct;
#ifdef _WIN32
  HANDLE start_next_events[2];  // uses parity of spawn_ct
  HANDLE cur_block_done_event;
#else
  pthread_mutex_t sync_mutex;
  pthread_cond_t cur_block_done_condvar;
  pthread_cond_t start_next_condvar;
#endif
  // Number of workers still processing the current block; atomically
  // decremented by THREAD_BLOCK_FINISH, and the thread that drops it to zero
  // signals block completion.
  uint32_t active_ct;

  // Thread-functions can safely read from these.
  uint32_t thread_ct;

  // 1 = process last block and exit; 2 = immediate termination requested
  uint32_t is_last_block;
} ThreadGroupControlBlock;
146 
// State shared between the thread-group owner and all worker threads:
// a user-supplied context pointer plus the private control block.
typedef struct ThreadGroupSharedStruct {
  void* context;  // opaque payload installed by SetThreadFuncAndData()
#ifdef __cplusplus
  // In C++ builds, cb is private; these accessors let the GET_PRIVATE()
  // macro reach it from both const and non-const contexts.
  ThreadGroupControlBlock& GET_PRIVATE_cb() { return cb; }
  ThreadGroupControlBlock const& GET_PRIVATE_cb() const { return cb; }
 private:
#endif
  ThreadGroupControlBlock cb;
} ThreadGroupShared;
156 
// Per-worker argument passed (as void*) to each thread-function: the shared
// group state plus this worker's 0-based thread index.
typedef struct ThreadGroupFuncArgStruct {
  ThreadGroupShared* sharedp;
  uint32_t tidx;
} ThreadGroupFuncArg;
161 
// Owner-side bookkeeping for a thread group: shared state, the entry point,
// the thread handles/args arrays, and lifecycle flags.
typedef struct ThreadGroupMainStruct {
  ThreadGroupShared shared;
  THREAD_FUNCPTR_T(thread_func_ptr);  // entry point run by every worker
  pthread_t* threads;                 // also doubles as the "allocated" flag; see JoinThreads0
  ThreadGroupFuncArg* thread_args;    // one ThreadGroupFuncArg per worker
  // Generally favor uint16_t/uint32_t over unsigned char/uint8_t for isolated
  // bools, since in the latter case the compiler is fairly likely to generate
  // worse code due to aliasing paranoia; see e.g.
  //   https://travisdowns.github.io/blog/2019/08/26/vector-inc.html
  uint16_t is_unjoined;  // nonzero between SpawnThreads and JoinThreads
  uint16_t is_active;    // nonzero while worker threads exist

#ifndef _WIN32
  // Tracks how far mutex/condvar initialization got, so cleanup can
  // destroy exactly what was created -- confirm exact encoding in
  // plink2_thread.cc.
  uint32_t sync_init_state;
#endif
} ThreadGroupMain;
178 
// Public thread-group handle.  The payload m is private in C++ builds;
// all access goes through the GET_PRIVATE() macro.
typedef struct ThreadGroupStruct {
#ifdef __cplusplus
  ThreadGroupMain& GET_PRIVATE_m() { return m; }
  ThreadGroupMain const& GET_PRIVATE_m() const { return m; }
 private:
#endif
  ThreadGroupMain m;
} ThreadGroup;
187 
188 void PreinitThreads(ThreadGroup* tg_ptr);
189 
190 // Return value is clipped to 1..kMaxThreads.
191 // If known_procs_ptr is non-null, it's set to the raw unclipped value (which
192 // can theoretically be -1 if the sysconf call fails)
193 uint32_t NumCpu(int32_t* known_procs_ptr);
194 
195 // Also allocates, returning 1 on failure.
196 BoolErr SetThreadCt(uint32_t thread_ct, ThreadGroup* tg_ptr);
197 
GetThreadCt(const ThreadGroupShared * sharedp)198 HEADER_INLINE uint32_t GetThreadCt(const ThreadGroupShared* sharedp) {
199   return GET_PRIVATE(*sharedp, cb).thread_ct;
200 }
201 
GetThreadCtTg(const ThreadGroup * tg_ptr)202 HEADER_INLINE uint32_t GetThreadCtTg(const ThreadGroup* tg_ptr) {
203   const ThreadGroupMain* tgp = &GET_PRIVATE(*tg_ptr, m);
204   return GET_PRIVATE(tgp->shared, cb).thread_ct;
205 }
206 
SetThreadFuncAndData(THREAD_FUNCPTR_T (start_routine),void * shared_context,ThreadGroup * tg_ptr)207 HEADER_INLINE void SetThreadFuncAndData(THREAD_FUNCPTR_T(start_routine), void* shared_context, ThreadGroup* tg_ptr) {
208   ThreadGroupMain* tgp = &GET_PRIVATE(*tg_ptr, m);
209   assert(!tgp->is_active);
210   tgp->shared.context = shared_context;
211   GET_PRIVATE(tgp->shared, cb).is_last_block = 0;
212   tgp->thread_func_ptr = start_routine;
213 }
214 
215 // Equivalent to SetThreadFuncAndData() with unchanged
216 // start_routine/shared_context.  Ok to call this "unnecessarily".
ReinitThreads(ThreadGroup * tg_ptr)217 HEADER_INLINE void ReinitThreads(ThreadGroup* tg_ptr) {
218   ThreadGroupMain* tgp = &GET_PRIVATE(*tg_ptr, m);
219   assert(!tgp->is_active);
220   GET_PRIVATE(tgp->shared, cb).is_last_block = 0;
221 }
222 
ThreadsAreActive(ThreadGroup * tg_ptr)223 HEADER_INLINE uint32_t ThreadsAreActive(ThreadGroup* tg_ptr) {
224   ThreadGroupMain* tgp = &GET_PRIVATE(*tg_ptr, m);
225   return tgp->is_active;
226 }
227 
228 // Technically unnecessary to call this, but it does save one sync cycle.
229 //
230 // Note that, if there's only one block of work-shards, this should be called
231 // before the first SpawnThreads() call.
DeclareLastThreadBlock(ThreadGroup * tg_ptr)232 HEADER_INLINE void DeclareLastThreadBlock(ThreadGroup* tg_ptr) {
233   ThreadGroupMain* tgp = &GET_PRIVATE(*tg_ptr, m);
234   assert(!tgp->is_unjoined);
235   GET_PRIVATE(tgp->shared, cb).is_last_block = 1;
236 }
237 
IsLastBlock(const ThreadGroup * tg_ptr)238 HEADER_INLINE uint32_t IsLastBlock(const ThreadGroup* tg_ptr) {
239   const ThreadGroupMain* tgp = &GET_PRIVATE(*tg_ptr, m);
240   return GET_PRIVATE(tgp->shared, cb).is_last_block;
241 }
242 
243 #if defined(__cplusplus) && !defined(_WIN32)
// RAII-style holder for a pthread attribute object configured with the
// project's reduced stack size; constructed once at startup (see the
// g_thread_startup extern below) so thread spawns can reuse it.
class Plink2ThreadStartup {
public:
  pthread_attr_t smallstack_thread_attr;
  Plink2ThreadStartup() {
#  ifdef NDEBUG
    // we'll error out for another reason soon enough if there's insufficient
    // memory...
    pthread_attr_init(&smallstack_thread_attr);
#  else
    // NOTE: the init call lives inside assert() here, but the NDEBUG branch
    // above guarantees it still executes when asserts are compiled out.
    assert(!pthread_attr_init(&smallstack_thread_attr));
#  endif
    // if this fails due to kDefaultThreadStack being smaller than the system
    // page size, no need to error out
    pthread_attr_setstacksize(&smallstack_thread_attr, kDefaultThreadStack);
  }

  ~Plink2ThreadStartup() {
    pthread_attr_destroy(&smallstack_thread_attr);
  }
};
264 
265 extern Plink2ThreadStartup g_thread_startup;
266 #endif
267 
268 BoolErr SpawnThreads(ThreadGroup* tg_ptr);
269 
270 void JoinThreads(ThreadGroup* tg_ptr);
271 
272 // Assumes threads are joined.
273 void StopThreads(ThreadGroup* tg_ptr);
274 
275 void CleanupThreads(ThreadGroup* tg_ptr);
276 
277 #ifdef _WIN32
// Win32 implementation of the per-block sync point called by thread-functions
// after finishing each block of work.  Returns nonzero when the thread should
// exit its processing loop (see the usage patterns at the top of this file).
HEADER_INLINE BoolErr THREAD_BLOCK_FINISH(ThreadGroupFuncArg* tgfap) {
  ThreadGroupControlBlock* cbp = &(GET_PRIVATE(*tgfap->sharedp, cb));
  if (cbp->is_last_block) {
    // Final block already declared; no further synchronization needed.
    return 1;
  }
  // Capture the event parity before the decrement below: once active_ct hits
  // zero the owner is free to advance spawn_ct for the next block, so reading
  // it afterward could select the wrong event slot.
  const uint32_t start_next_parity = cbp->spawn_ct & 1;
  if (!__atomic_sub_fetch(&cbp->active_ct, 1, __ATOMIC_ACQ_REL)) {
    // This thread was the last one working on the block; notify the owner.
    SetEvent(cbp->cur_block_done_event);
  }
  // Block until the owner releases the group for the next work block.
  WaitForSingleObject(cbp->start_next_events[start_next_parity], INFINITE);
  // is_last_block == 2 means immediate termination was requested.
  return (cbp->is_last_block == 2);
}
290 #else
291 BoolErr THREAD_BLOCK_FINISH(ThreadGroupFuncArg* tgfap);
292 #endif
293 
294 // Convenience functions for potentially-small-and-frequent jobs where
295 // thread_ct == 0 corresponds to not wanting to launch threads at all; see
296 // MakeDupflagHtable in plink2_cmdline for a typical use case.
SetThreadCt0(uint32_t thread_ct,ThreadGroup * tg_ptr)297 HEADER_INLINE BoolErr SetThreadCt0(uint32_t thread_ct, ThreadGroup* tg_ptr) {
298   if (!thread_ct) {
299     return 0;
300   }
301   return SetThreadCt(thread_ct, tg_ptr);
302 }
303 
JoinThreads0(ThreadGroup * tg_ptr)304 HEADER_INLINE void JoinThreads0(ThreadGroup* tg_ptr) {
305   if (GET_PRIVATE(*tg_ptr, m).threads) {
306     JoinThreads(tg_ptr);
307   }
308 }
309 
310 // This comes in handy a lot in multithreaded error-reporting code when
311 // deterministic behavior is desired.
312 void UpdateU64IfSmaller(uint64_t newval, uint64_t* oldval_ptr);
313 
314 #ifdef __cplusplus
315 }  // namespace plink2
316 #endif
317 
318 #endif  // __PLINK2_THREAD_H__
319