1 // This library is part of PLINK 2.00, copyright (C) 2005-2020 Shaun Purcell,
2 // Christopher Chang.
3 //
4 // This library is free software: you can redistribute it and/or modify it
5 // under the terms of the GNU Lesser General Public License as published by the
6 // Free Software Foundation; either version 3 of the License, or (at your
7 // option) any later version.
8 //
9 // This library is distributed in the hope that it will be useful, but WITHOUT
10 // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
12 // for more details.
13 //
14 // You should have received a copy of the GNU Lesser General Public License
15 // along with this library. If not, see <http://www.gnu.org/licenses/>.
16
17
18 #include "plink2_thread.h"
19
20 #ifndef _WIN32
21 # include <unistd.h> // sysconf()
22 #endif
23
24 #ifdef __cplusplus
25 namespace plink2 {
26 #endif
27
GetTgp(ThreadGroup * tg_ptr)28 static inline ThreadGroupMain* GetTgp(ThreadGroup* tg_ptr) {
29 return &GET_PRIVATE(*tg_ptr, m);
30 }
31
GetCbp(ThreadGroupShared * sharedp)32 static inline ThreadGroupControlBlock* GetCbp(ThreadGroupShared* sharedp) {
33 return &GET_PRIVATE(*sharedp, cb);
34 }
35
36 #ifdef _WIN32
// Blocks until all ct handles in objs[] are signaled.
//
// The handles are passed to WaitForMultipleObjects() in consecutive batches
// of at most 64, each batch waiting (with no timeout) for all of its handles.
void WaitForAllObjects(uint32_t ct, HANDLE* objs) {
  const uint32_t batch_size = 64;
  HANDLE* batch_start = objs;
  uint32_t remaining_ct = ct;
  for (; remaining_ct > batch_size; remaining_ct -= batch_size) {
    WaitForMultipleObjects(batch_size, batch_start, 1, INFINITE);
    batch_start = &(batch_start[batch_size]);
  }
  // Final (possibly partial) batch.
  WaitForMultipleObjects(remaining_ct, batch_start, 1, INFINITE);
}
45 #endif
46
// Puts *tg_ptr into its "nothing allocated, nothing running" state: no thread
// array, no thread function, not active, not unjoined, and is_last_block
// cleared.  Nothing is freed or read here, so this is suitable for a
// ThreadGroup whose fields have not been initialized yet.
void PreinitThreads(ThreadGroup* tg_ptr) {
  ThreadGroupMain* main_state = GetTgp(tg_ptr);
  main_state->is_active = 0;
  main_state->is_unjoined = 0;
  main_state->threads = nullptr;
  main_state->thread_func_ptr = nullptr;
  ThreadGroupControlBlock* ctrl_block = GetCbp(&main_state->shared);
  ctrl_block->is_last_block = 0;
}
55
NumCpu(int32_t * known_procs_ptr)56 uint32_t NumCpu(int32_t* known_procs_ptr) {
57 #ifdef _WIN32
58 SYSTEM_INFO sysinfo;
59 GetSystemInfo(&sysinfo);
60 const int32_t known_procs = sysinfo.dwNumberOfProcessors;
61 uint32_t max_thread_ct = known_procs;
62 #else
63 const int32_t known_procs = sysconf(_SC_NPROCESSORS_ONLN);
64 uint32_t max_thread_ct = (known_procs == -1)? 1 : known_procs;
65 #endif
66 if (known_procs_ptr) {
67 *known_procs_ptr = known_procs;
68 }
69 if (max_thread_ct > kMaxThreads) {
70 max_thread_ct = kMaxThreads;
71 }
72 return max_thread_ct;
73 }
74
SetThreadCt(uint32_t thread_ct,ThreadGroup * tg_ptr)75 BoolErr SetThreadCt(uint32_t thread_ct, ThreadGroup* tg_ptr) {
76 ThreadGroupMain* tgp = GetTgp(tg_ptr);
77 assert(!tgp->is_active);
78 if (tgp->threads) {
79 free(tgp->threads);
80 tgp->threads = nullptr;
81 }
82 assert(thread_ct && (thread_ct <= kMaxThreads));
83 #ifdef _WIN32
84 unsigned char* memptr = S_CAST(unsigned char*, malloc(thread_ct * (sizeof(ThreadGroupFuncArg) + sizeof(HANDLE))));
85 if (unlikely(!memptr)) {
86 return 1;
87 }
88 tgp->threads = R_CAST(HANDLE*, memptr);
89 memset(tgp->threads, 0, thread_ct * sizeof(HANDLE));
90 memptr = &(memptr[thread_ct * sizeof(HANDLE)]);
91 #else
92 unsigned char* memptr = S_CAST(unsigned char*, malloc(thread_ct * (sizeof(pthread_t) + sizeof(ThreadGroupFuncArg))));
93 if (unlikely(!memptr)) {
94 return 1;
95 }
96 tgp->threads = R_CAST(pthread_t*, memptr);
97 // !is_active currently guarantees that the sync events/mutex/condvars are
98 // not initialized. Could change this later.
99 tgp->sync_init_state = 0;
100 memptr = &(memptr[thread_ct * sizeof(pthread_t)]);
101 #endif
102 ThreadGroupControlBlock* cbp = GetCbp(&tgp->shared);
103 cbp->active_ct = 0;
104 tgp->thread_args = R_CAST(ThreadGroupFuncArg*, memptr);
105
106 cbp->thread_ct = thread_ct;
107 return 0;
108 }
109
110 // Note that thread_ct is permitted to be less than tgp->shared.cb.thread_ct,
111 // to support the SpawnThreads() error cases.
JoinThreadsInternal(uint32_t thread_ct,ThreadGroupMain * tgp)112 void JoinThreadsInternal(uint32_t thread_ct, ThreadGroupMain* tgp) {
113 assert(tgp->is_active);
114 ThreadGroupControlBlock* cbp = GetCbp(&tgp->shared);
115 #ifdef _WIN32
116 if (!cbp->is_last_block) {
117 WaitForSingleObject(cbp->cur_block_done_event, INFINITE);
118 } else {
119 WaitForAllObjects(thread_ct, tgp->threads);
120 for (uint32_t tidx = 0; tidx != thread_ct; ++tidx) {
121 CloseHandle(tgp->threads[tidx]);
122 }
123 CloseHandle(cbp->start_next_events[0]);
124 CloseHandle(cbp->start_next_events[1]);
125 CloseHandle(cbp->cur_block_done_event);
126 memset(tgp->threads, 0, thread_ct * sizeof(HANDLE));
127 tgp->is_active = 0;
128 }
129 #else
130 if (!cbp->is_last_block) {
131 pthread_mutex_lock(&cbp->sync_mutex);
132 while (cbp->active_ct) {
133 pthread_cond_wait(&cbp->cur_block_done_condvar, &cbp->sync_mutex);
134 }
135 // keep mutex until next block loaded
136 } else {
137 for (uint32_t tidx = 0; tidx != thread_ct; ++tidx) {
138 pthread_join(tgp->threads[tidx], nullptr);
139 }
140 pthread_mutex_destroy(&cbp->sync_mutex);
141 pthread_cond_destroy(&cbp->cur_block_done_condvar);
142 pthread_cond_destroy(&cbp->start_next_condvar);
143 tgp->is_active = 0;
144 tgp->sync_init_state = 0;
145 }
146 #endif
147 tgp->is_unjoined = 0;
148 }
149
// File-level Plink2ThreadStartup instance, defined only for C++ builds on
// non-Windows platforms.  SpawnThreads() passes its smallstack_thread_attr
// member to pthread_create() in that configuration.
150 #if defined(__cplusplus) && !defined(_WIN32)
151 Plink2ThreadStartup g_thread_startup;
152 #endif
153
SpawnThreads(ThreadGroup * tg_ptr)154 BoolErr SpawnThreads(ThreadGroup* tg_ptr) {
155 ThreadGroupMain* tgp = GetTgp(tg_ptr);
156 ThreadGroupControlBlock* cbp = GetCbp(&tgp->shared);
157 const uint32_t thread_ct = cbp->thread_ct;
158 const uint32_t was_active = tgp->is_active;
159 const uint32_t is_last_block = cbp->is_last_block;
160 pthread_t* threads = tgp->threads;
161 assert(threads != nullptr);
162 #ifdef _WIN32
163 if (!was_active) {
164 cbp->spawn_ct = 0;
165 // manual-reset broadcast event
166 HANDLE cur_event = CreateEvent(nullptr, TRUE, FALSE, nullptr);
167 if (unlikely(!cur_event)) {
168 return 1;
169 }
170 cbp->start_next_events[0] = cur_event;
171 cur_event = CreateEvent(nullptr, TRUE, FALSE, nullptr);
172 if (unlikely(!cur_event)) {
173 CloseHandle(cbp->start_next_events[0]);
174 return 1;
175 }
176 cbp->start_next_events[1] = cur_event;
177 cur_event = CreateEvent(nullptr, FALSE, FALSE, nullptr);
178 if (unlikely(!cur_event)) {
179 CloseHandle(cbp->start_next_events[0]);
180 CloseHandle(cbp->start_next_events[1]);
181 return 1;
182 }
183 cbp->cur_block_done_event = cur_event;
184 cbp->active_ct = thread_ct;
185 for (uint32_t tidx = 0; tidx != thread_ct; ++tidx) {
186 ThreadGroupFuncArg* arg_slot = &(tgp->thread_args[tidx]);
187 arg_slot->sharedp = &(tgp->shared);
188 arg_slot->tidx = tidx;
189 threads[tidx] = R_CAST(HANDLE, _beginthreadex(nullptr, kDefaultThreadStack, tgp->thread_func_ptr, arg_slot, 0, nullptr));
190 if (unlikely(!threads[tidx])) {
191 if (tidx) {
192 if (!is_last_block) {
193 // not sure the old TerminateThread() code ever worked properly in
194 // the first place...
195 // anyway, new contract makes clean error-shutdown easy
196 if (!__atomic_sub_fetch(&cbp->active_ct, thread_ct - tidx, __ATOMIC_ACQ_REL)) {
197 SetEvent(cbp->cur_block_done_event);
198 }
199 JoinThreadsInternal(tidx, tgp);
200 cbp->is_last_block = 2;
201 const uint32_t start_next_parity = cbp->spawn_ct & 1;
202 // no need to reset start_prev_parity
203 cbp->spawn_ct += 1;
204 SetEvent(cbp->start_next_events[start_next_parity]);
205 }
206 JoinThreadsInternal(tidx, tgp);
207 }
208 return 1;
209 }
210 }
211 tgp->is_active = 1;
212 } else {
213 cbp->spawn_ct += 1;
214 cbp->active_ct = thread_ct;
215 const uint32_t start_prev_parity = cbp->spawn_ct & 1;
216 ResetEvent(cbp->start_next_events[start_prev_parity]);
217 SetEvent(cbp->start_next_events[start_prev_parity ^ 1]);
218 }
219 #else
220 if (!is_last_block) {
221 cbp->active_ct = thread_ct;
222 }
223 if (!was_active) {
224 cbp->spawn_ct = 0;
225 assert(!tgp->sync_init_state);
226 if (unlikely(pthread_mutex_init(&cbp->sync_mutex, nullptr))) {
227 return 1;
228 }
229 if (unlikely(pthread_cond_init(&cbp->cur_block_done_condvar, nullptr))) {
230 tgp->sync_init_state = 1;
231 return 1;
232 }
233 if (unlikely(pthread_cond_init(&cbp->start_next_condvar, nullptr))) {
234 tgp->sync_init_state = 2;
235 return 1;
236 }
237 tgp->sync_init_state = 3;
238 # ifndef __cplusplus
239 pthread_attr_t smallstack_thread_attr;
240 if (unlikely(pthread_attr_init(&smallstack_thread_attr))) {
241 return 1;
242 }
243 pthread_attr_setstacksize(&smallstack_thread_attr, kDefaultThreadStack);
244 # endif
245 for (uint32_t tidx = 0; tidx != thread_ct; ++tidx) {
246 ThreadGroupFuncArg* arg_slot = &(tgp->thread_args[tidx]);
247 arg_slot->sharedp = &(tgp->shared);
248 arg_slot->tidx = tidx;
249 if (unlikely(pthread_create(&(threads[tidx]),
250 # ifdef __cplusplus
251 &g_thread_startup.smallstack_thread_attr,
252 # else
253 &smallstack_thread_attr,
254 # endif
255 tgp->thread_func_ptr, arg_slot))) {
256 if (tidx) {
257 if (!is_last_block) {
258 JoinThreadsInternal(tidx, tgp);
259 const uint32_t unstarted_thread_ct = thread_ct - tidx;
260 cbp->active_ct -= unstarted_thread_ct;
261 while (cbp->active_ct) {
262 pthread_cond_wait(&cbp->cur_block_done_condvar, &cbp->sync_mutex);
263 }
264 cbp->is_last_block = 2;
265 cbp->spawn_ct += 1;
266 pthread_cond_broadcast(&cbp->start_next_condvar);
267 pthread_mutex_unlock(&cbp->sync_mutex);
268 }
269 JoinThreadsInternal(tidx, tgp);
270 } else {
271 cbp->active_ct = 0;
272 }
273 # ifndef __cplusplus
274 pthread_attr_destroy(&smallstack_thread_attr);
275 # endif
276 return 1;
277 }
278 }
279 # ifndef __cplusplus
280 pthread_attr_destroy(&smallstack_thread_attr);
281 # endif
282 tgp->is_active = 1;
283 } else {
284 cbp->spawn_ct += 1;
285 // still holding mutex
286 pthread_cond_broadcast(&cbp->start_next_condvar);
287 pthread_mutex_unlock(&cbp->sync_mutex);
288 }
289 #endif
290 tgp->is_unjoined = 1;
291 return 0;
292 }
293
// Public join: waits on all of the group's threads, using the thread count
// stored in the control block.  See JoinThreadsInternal() for the exact wait
// semantics.
void JoinThreads(ThreadGroup* tg_ptr) {
  ThreadGroupMain* main_state = GetTgp(tg_ptr);
  const uint32_t full_thread_ct = GetCbp(&main_state->shared)->thread_ct;
  JoinThreadsInternal(full_thread_ct, main_state);
}
298
// Requests termination of the group's workers and waits for them: sets
// is_last_block to 2, wakes the workers via SpawnThreads(), then joins all
// thread_ct threads.  The SpawnThreads() return value is not inspected.
void StopThreads(ThreadGroup* tg_ptr) {
  ThreadGroupMain* main_state = GetTgp(tg_ptr);
  ThreadGroupControlBlock* ctrl_block = GetCbp(&main_state->shared);
  ctrl_block->is_last_block = 2;
  SpawnThreads(tg_ptr);
  const uint32_t full_thread_ct = ctrl_block->thread_ct;
  JoinThreadsInternal(full_thread_ct, main_state);
}
306
// Shuts down and deallocates everything owned by *tg_ptr, leaving it in the
// same state PreinitThreads() establishes for threads, thread_func_ptr and
// is_last_block.
//
// If a thread array exists:
// - active group: join an outstanding block if there is one, then stop the
//   workers unless the last block has already been flagged;
// - inactive group (non-Windows only): destroy whichever of the mutex and
//   condition variables a failed SpawnThreads() call managed to initialize,
//   as recorded in sync_init_state;
// and finally the thread array is freed and thread_ct zeroed.
void CleanupThreads(ThreadGroup* tg_ptr) {
  ThreadGroupMain* tgp = GetTgp(tg_ptr);
  ThreadGroupControlBlock* cbp = GetCbp(&tgp->shared);
  if (tgp->threads) {
    if (tgp->is_active) {
      if (tgp->is_unjoined) {
        JoinThreadsInternal(cbp->thread_ct, tgp);
      }
      if (!cbp->is_last_block) {
        StopThreads(tg_ptr);
      }
    }
#ifndef _WIN32
    else {
      // sync_init_state: 0 = nothing initialized, 1 = mutex only,
      // 2 = mutex + cur_block_done_condvar, anything else = all three.
      const uint32_t init_state = tgp->sync_init_state;
      if (init_state) {
        pthread_mutex_destroy(&cbp->sync_mutex);
        if (init_state != 1) {
          pthread_cond_destroy(&cbp->cur_block_done_condvar);
          if (init_state != 2) {
            pthread_cond_destroy(&cbp->start_next_condvar);
          }
        }
      }
      tgp->sync_init_state = 0;
    }
    assert(!cbp->active_ct);
#endif
    cbp->thread_ct = 0;
    free(tgp->threads);
    tgp->threads = nullptr;
  }
  cbp->is_last_block = 0;
  tgp->thread_func_ptr = nullptr;
}
349
350 #ifndef _WIN32
THREAD_BLOCK_FINISH(ThreadGroupFuncArg * tgfap)351 BoolErr THREAD_BLOCK_FINISH(ThreadGroupFuncArg* tgfap) {
352 ThreadGroupControlBlock* cbp = GetCbp(tgfap->sharedp);
353 if (cbp->is_last_block) {
354 return 1;
355 }
356 const uintptr_t initial_spawn_ct = cbp->spawn_ct;
357 pthread_mutex_lock(&cbp->sync_mutex);
358 if (!(--cbp->active_ct)) {
359 pthread_cond_signal(&cbp->cur_block_done_condvar);
360 }
361 while (cbp->spawn_ct == initial_spawn_ct) {
362 // spurious wakeup guard
363 pthread_cond_wait(&cbp->start_next_condvar, &cbp->sync_mutex);
364 }
365 pthread_mutex_unlock(&cbp->sync_mutex);
366 return (cbp->is_last_block == 2);
367 }
368 #endif
369
// Lowers *oldval_ptr to newval if newval is smaller, using a compare-exchange
// retry loop; does nothing once the stored value is observed to be <= newval.
//
// A failed ATOMIC_COMPARE_EXCHANGE_N_U64() refreshes 'observed' through its
// second argument (as the retry loop relies on), so each iteration compares
// against the most recently seen value.
void UpdateU64IfSmaller(uint64_t newval, uint64_t* oldval_ptr) {
  uint64_t observed = *oldval_ptr;
  for (;;) {
    if (observed <= newval) {
      return;
    }
    if (ATOMIC_COMPARE_EXCHANGE_N_U64(oldval_ptr, &observed, newval, 1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
      return;
    }
  }
}
378
379 #ifdef __cplusplus
380 } // namespace plink2
381 #endif
382