// This library is part of PLINK 2.00, copyright (C) 2005-2020 Shaun Purcell,
// Christopher Chang.
//
// This library is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by the
// Free Software Foundation; either version 3 of the License, or (at your
// option) any later version.
//
// This library is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
// for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with this library.  If not, see <http://www.gnu.org/licenses/>.


#include "plink2_thread.h"

#ifndef _WIN32
#  include <unistd.h>  // sysconf()
#endif

#ifdef __cplusplus
namespace plink2 {
#endif
27 
GetTgp(ThreadGroup * tg_ptr)28 static inline ThreadGroupMain* GetTgp(ThreadGroup* tg_ptr) {
29   return &GET_PRIVATE(*tg_ptr, m);
30 }
31 
GetCbp(ThreadGroupShared * sharedp)32 static inline ThreadGroupControlBlock* GetCbp(ThreadGroupShared* sharedp) {
33   return &GET_PRIVATE(*sharedp, cb);
34 }
35 
#ifdef _WIN32
// Blocks until all ct HANDLEs in objs[] are signaled.
// WaitForMultipleObjects() accepts at most 64 (MAXIMUM_WAIT_OBJECTS) handles
// per call, so the array is consumed in chunks of 64.
void WaitForAllObjects(uint32_t ct, HANDLE* objs) {
  uint32_t remaining_ct = ct;
  HANDLE* cur_objs = objs;
  while (remaining_ct > 64) {
    WaitForMultipleObjects(64, cur_objs, 1, INFINITE);
    cur_objs = &(cur_objs[64]);
    remaining_ct -= 64;
  }
  WaitForMultipleObjects(remaining_ct, cur_objs, 1, INFINITE);
}
#endif

PreinitThreads(ThreadGroup * tg_ptr)47 void PreinitThreads(ThreadGroup* tg_ptr) {
48   ThreadGroupMain* tgp = GetTgp(tg_ptr);
49   GetCbp(&tgp->shared)->is_last_block = 0;
50   tgp->thread_func_ptr = nullptr;
51   tgp->threads = nullptr;
52   tgp->is_unjoined = 0;
53   tgp->is_active = 0;
54 }
55 
NumCpu(int32_t * known_procs_ptr)56 uint32_t NumCpu(int32_t* known_procs_ptr) {
57 #ifdef _WIN32
58   SYSTEM_INFO sysinfo;
59   GetSystemInfo(&sysinfo);
60   const int32_t known_procs = sysinfo.dwNumberOfProcessors;
61   uint32_t max_thread_ct = known_procs;
62 #else
63   const int32_t known_procs = sysconf(_SC_NPROCESSORS_ONLN);
64   uint32_t max_thread_ct = (known_procs == -1)? 1 : known_procs;
65 #endif
66   if (known_procs_ptr) {
67     *known_procs_ptr = known_procs;
68   }
69   if (max_thread_ct > kMaxThreads) {
70     max_thread_ct = kMaxThreads;
71   }
72   return max_thread_ct;
73 }
74 
// Allocates storage for thread_ct worker threads (thread handles plus one
// ThreadGroupFuncArg slot per thread) in a single malloc.  Frees any
// previous allocation first.  Must not be called while the group is active.
// Returns 1 on allocation failure, 0 on success.
BoolErr SetThreadCt(uint32_t thread_ct, ThreadGroup* tg_ptr) {
  ThreadGroupMain* tgp = GetTgp(tg_ptr);
  assert(!tgp->is_active);
  if (tgp->threads) {
    free(tgp->threads);
    tgp->threads = nullptr;
  }
  assert(thread_ct && (thread_ct <= kMaxThreads));
#ifdef _WIN32
  // Layout: thread_ct HANDLEs, then thread_ct ThreadGroupFuncArgs.
  unsigned char* memptr = S_CAST(unsigned char*, malloc(thread_ct * (sizeof(ThreadGroupFuncArg) + sizeof(HANDLE))));
  if (unlikely(!memptr)) {
    return 1;
  }
  tgp->threads = R_CAST(HANDLE*, memptr);
  // Zeroed handles mark threads as not-yet-created.
  memset(tgp->threads, 0, thread_ct * sizeof(HANDLE));
  memptr = &(memptr[thread_ct * sizeof(HANDLE)]);
#else
  // Layout: thread_ct pthread_ts, then thread_ct ThreadGroupFuncArgs.
  unsigned char* memptr = S_CAST(unsigned char*, malloc(thread_ct * (sizeof(pthread_t) + sizeof(ThreadGroupFuncArg))));
  if (unlikely(!memptr)) {
    return 1;
  }
  tgp->threads = R_CAST(pthread_t*, memptr);
  // !is_active currently guarantees that the sync events/mutex/condvars are
  // not initialized.  Could change this later.
  tgp->sync_init_state = 0;
  memptr = &(memptr[thread_ct * sizeof(pthread_t)]);
#endif
  ThreadGroupControlBlock* cbp = GetCbp(&tgp->shared);
  cbp->active_ct = 0;
  tgp->thread_args = R_CAST(ThreadGroupFuncArg*, memptr);

  cbp->thread_ct = thread_ct;
  return 0;
}

// Note that thread_ct is permitted to be less than tgp->shared.cb.thread_ct,
// to support the SpawnThreads() error cases.
//
// Two modes, selected by cbp->is_last_block:
// * not-last-block: wait for all workers to finish the current block; the
//   workers stay parked in THREAD_BLOCK_FINISH() waiting for the next
//   spawn.  On POSIX, the sync mutex is deliberately still held on return;
//   the next SpawnThreads() call releases it.
// * last-block: the workers are exiting; join/close them and tear down the
//   synchronization objects.
void JoinThreadsInternal(uint32_t thread_ct, ThreadGroupMain* tgp) {
  assert(tgp->is_active);
  ThreadGroupControlBlock* cbp = GetCbp(&tgp->shared);
#ifdef _WIN32
  if (!cbp->is_last_block) {
    // Signaled by the last worker to decrement active_ct.
    WaitForSingleObject(cbp->cur_block_done_event, INFINITE);
  } else {
    WaitForAllObjects(thread_ct, tgp->threads);
    for (uint32_t tidx = 0; tidx != thread_ct; ++tidx) {
      CloseHandle(tgp->threads[tidx]);
    }
    CloseHandle(cbp->start_next_events[0]);
    CloseHandle(cbp->start_next_events[1]);
    CloseHandle(cbp->cur_block_done_event);
    // Zeroed handles mark threads as not-created for the next spawn cycle.
    memset(tgp->threads, 0, thread_ct * sizeof(HANDLE));
    tgp->is_active = 0;
  }
#else
  if (!cbp->is_last_block) {
    pthread_mutex_lock(&cbp->sync_mutex);
    while (cbp->active_ct) {
      pthread_cond_wait(&cbp->cur_block_done_condvar, &cbp->sync_mutex);
    }
    // keep mutex until next block loaded
  } else {
    for (uint32_t tidx = 0; tidx != thread_ct; ++tidx) {
      pthread_join(tgp->threads[tidx], nullptr);
    }
    pthread_mutex_destroy(&cbp->sync_mutex);
    pthread_cond_destroy(&cbp->cur_block_done_condvar);
    pthread_cond_destroy(&cbp->start_next_condvar);
    tgp->is_active = 0;
    tgp->sync_init_state = 0;
  }
#endif
  tgp->is_unjoined = 0;
}

#if defined(__cplusplus) && !defined(_WIN32)
Plink2ThreadStartup g_thread_startup;
#endif

// Launches (first call) or releases (subsequent calls) the worker threads
// for the next block of work.
// * When the group is not yet active, this creates the synchronization
//   objects and spawns cbp->thread_ct threads running tgp->thread_func_ptr.
// * When it is already active, the workers are parked in
//   THREAD_BLOCK_FINISH() waiting for spawn_ct to advance; this just bumps
//   spawn_ct and wakes them.
// Returns 1 on failure (sync-object creation or thread creation), 0 on
// success.  On a partial-spawn failure, the already-started threads are
// cleanly told to exit (is_last_block = 2) and joined before returning.
BoolErr SpawnThreads(ThreadGroup* tg_ptr) {
  ThreadGroupMain* tgp = GetTgp(tg_ptr);
  ThreadGroupControlBlock* cbp = GetCbp(&tgp->shared);
  const uint32_t thread_ct = cbp->thread_ct;
  const uint32_t was_active = tgp->is_active;
  const uint32_t is_last_block = cbp->is_last_block;
  pthread_t* threads = tgp->threads;
  assert(threads != nullptr);
#ifdef _WIN32
  if (!was_active) {
    cbp->spawn_ct = 0;
    // manual-reset broadcast event
    HANDLE cur_event = CreateEvent(nullptr, TRUE, FALSE, nullptr);
    if (unlikely(!cur_event)) {
      return 1;
    }
    cbp->start_next_events[0] = cur_event;
    cur_event = CreateEvent(nullptr, TRUE, FALSE, nullptr);
    if (unlikely(!cur_event)) {
      CloseHandle(cbp->start_next_events[0]);
      return 1;
    }
    cbp->start_next_events[1] = cur_event;
    // auto-reset event: consumed by exactly one waiter (the joiner).
    cur_event = CreateEvent(nullptr, FALSE, FALSE, nullptr);
    if (unlikely(!cur_event)) {
      CloseHandle(cbp->start_next_events[0]);
      CloseHandle(cbp->start_next_events[1]);
      return 1;
    }
    cbp->cur_block_done_event = cur_event;
    cbp->active_ct = thread_ct;
    for (uint32_t tidx = 0; tidx != thread_ct; ++tidx) {
      ThreadGroupFuncArg* arg_slot = &(tgp->thread_args[tidx]);
      arg_slot->sharedp = &(tgp->shared);
      arg_slot->tidx = tidx;
      threads[tidx] = R_CAST(HANDLE, _beginthreadex(nullptr, kDefaultThreadStack, tgp->thread_func_ptr, arg_slot, 0, nullptr));
      if (unlikely(!threads[tidx])) {
        // Partial-spawn failure: shut down the tidx threads already running.
        if (tidx) {
          if (!is_last_block) {
            // not sure the old TerminateThread() code ever worked properly in
            // the first place...
            // anyway, new contract makes clean error-shutdown easy
            // Account for the threads that will never start; if that makes
            // active_ct hit zero, signal block-done ourselves.
            if (!__atomic_sub_fetch(&cbp->active_ct, thread_ct - tidx, __ATOMIC_ACQ_REL)) {
              SetEvent(cbp->cur_block_done_event);
            }
            JoinThreadsInternal(tidx, tgp);
            // 2 = error shutdown; THREAD_BLOCK_FINISH() reports it.
            cbp->is_last_block = 2;
            const uint32_t start_next_parity = cbp->spawn_ct & 1;
            // no need to reset start_prev_parity
            cbp->spawn_ct += 1;
            SetEvent(cbp->start_next_events[start_next_parity]);
          }
          // Second call runs in last-block mode and joins the exiting threads.
          JoinThreadsInternal(tidx, tgp);
        }
        return 1;
      }
    }
    tgp->is_active = 1;
  } else {
    cbp->spawn_ct += 1;
    cbp->active_ct = thread_ct;
    // Workers wait on alternating parities of start_next_events, so resetting
    // one and setting the other cannot lose a wakeup.
    const uint32_t start_prev_parity = cbp->spawn_ct & 1;
    ResetEvent(cbp->start_next_events[start_prev_parity]);
    SetEvent(cbp->start_next_events[start_prev_parity ^ 1]);
  }
#else
  if (!is_last_block) {
    cbp->active_ct = thread_ct;
  }
  if (!was_active) {
    cbp->spawn_ct = 0;
    assert(!tgp->sync_init_state);
    // sync_init_state tracks how many sync objects were successfully
    // initialized, so CleanupThreads() can destroy exactly those.
    if (unlikely(pthread_mutex_init(&cbp->sync_mutex, nullptr))) {
      return 1;
    }
    if (unlikely(pthread_cond_init(&cbp->cur_block_done_condvar, nullptr))) {
      tgp->sync_init_state = 1;
      return 1;
    }
    if (unlikely(pthread_cond_init(&cbp->start_next_condvar, nullptr))) {
      tgp->sync_init_state = 2;
      return 1;
    }
    tgp->sync_init_state = 3;
#  ifndef __cplusplus
    // C++ builds share a process-wide attr in g_thread_startup instead.
    pthread_attr_t smallstack_thread_attr;
    if (unlikely(pthread_attr_init(&smallstack_thread_attr))) {
      return 1;
    }
    pthread_attr_setstacksize(&smallstack_thread_attr, kDefaultThreadStack);
#  endif
    for (uint32_t tidx = 0; tidx != thread_ct; ++tidx) {
      ThreadGroupFuncArg* arg_slot = &(tgp->thread_args[tidx]);
      arg_slot->sharedp = &(tgp->shared);
      arg_slot->tidx = tidx;
      if (unlikely(pthread_create(&(threads[tidx]),
#  ifdef __cplusplus
                                  &g_thread_startup.smallstack_thread_attr,
#  else
                                  &smallstack_thread_attr,
#  endif
                                  tgp->thread_func_ptr, arg_slot))) {
        // Partial-spawn failure: shut down the tidx threads already running.
        if (tidx) {
          if (!is_last_block) {
            // Waits for the running threads to finish the block; returns with
            // sync_mutex held.
            JoinThreadsInternal(tidx, tgp);
            const uint32_t unstarted_thread_ct = thread_ct - tidx;
            cbp->active_ct -= unstarted_thread_ct;
            while (cbp->active_ct) {
              pthread_cond_wait(&cbp->cur_block_done_condvar, &cbp->sync_mutex);
            }
            // 2 = error shutdown; THREAD_BLOCK_FINISH() reports it.
            cbp->is_last_block = 2;
            cbp->spawn_ct += 1;
            pthread_cond_broadcast(&cbp->start_next_condvar);
            pthread_mutex_unlock(&cbp->sync_mutex);
          }
          // Second call runs in last-block mode and joins the exiting threads.
          JoinThreadsInternal(tidx, tgp);
        } else {
          cbp->active_ct = 0;
        }
#  ifndef __cplusplus
        pthread_attr_destroy(&smallstack_thread_attr);
#  endif
        return 1;
      }
    }
#  ifndef __cplusplus
    pthread_attr_destroy(&smallstack_thread_attr);
#  endif
    tgp->is_active = 1;
  } else {
    cbp->spawn_ct += 1;
    // still holding mutex
    pthread_cond_broadcast(&cbp->start_next_condvar);
    pthread_mutex_unlock(&cbp->sync_mutex);
  }
#endif
  tgp->is_unjoined = 1;
  return 0;
}

JoinThreads(ThreadGroup * tg_ptr)294 void JoinThreads(ThreadGroup* tg_ptr) {
295   ThreadGroupMain* tgp = GetTgp(tg_ptr);
296   JoinThreadsInternal(GetCbp(&tgp->shared)->thread_ct, tgp);
297 }
298 
// Tells active workers to exit early (is_last_block = 2 makes
// THREAD_BLOCK_FINISH() return 1), wakes them, and joins them.
// NOTE(review): SpawnThreads()'s return value is ignored here.  In the
// intended usage the group is already active, so SpawnThreads() only takes
// the wake-existing-workers branch, which always returns 0; verify callers
// never invoke this on an inactive group.
void StopThreads(ThreadGroup* tg_ptr) {
  ThreadGroupMain* tgp = GetTgp(tg_ptr);
  ThreadGroupControlBlock* cbp = GetCbp(&tgp->shared);
  cbp->is_last_block = 2;
  SpawnThreads(tg_ptr);
  JoinThreadsInternal(cbp->thread_ct, tgp);
}

// Releases all resources owned by the thread group: stops and joins any
// still-active workers, destroys partially-initialized sync objects (POSIX),
// and frees the thread/arg allocation.  Leaves *tg_ptr in the same state
// PreinitThreads() establishes, so the group can be reused.
void CleanupThreads(ThreadGroup* tg_ptr) {
  ThreadGroupMain* tgp = GetTgp(tg_ptr);
  ThreadGroupControlBlock* cbp = GetCbp(&tgp->shared);
  if (tgp->threads) {
    const uint32_t thread_ct = cbp->thread_ct;
    if (tgp->is_active) {
      if (tgp->is_unjoined) {
        JoinThreadsInternal(thread_ct, tgp);
      }
      if (!cbp->is_last_block) {
        // Workers are parked waiting for another block; tell them to exit.
        StopThreads(tg_ptr);
      }
#ifndef _WIN32
    } else {
      // Inactive, but a failed SpawnThreads() may have left some sync
      // objects initialized; destroy exactly the ones sync_init_state says
      // were created (1 = mutex only, 2 = +cur_block_done, 3 = all).
      do {
        const uint32_t sync_init_state = tgp->sync_init_state;
        if (!sync_init_state) {
          break;
        }
        pthread_mutex_destroy(&cbp->sync_mutex);
        if (sync_init_state == 1) {
          break;
        }
        pthread_cond_destroy(&cbp->cur_block_done_condvar);
        if (sync_init_state == 2) {
          break;
        }
        pthread_cond_destroy(&cbp->start_next_condvar);
      } while (0);
      tgp->sync_init_state = 0;
#endif
    }
#ifndef _WIN32
    assert(!cbp->active_ct);
#endif
    cbp->thread_ct = 0;
    free(tgp->threads);
    tgp->threads = nullptr;
  }
  cbp->is_last_block = 0;
  tgp->thread_func_ptr = nullptr;
}

350 #ifndef _WIN32
THREAD_BLOCK_FINISH(ThreadGroupFuncArg * tgfap)351 BoolErr THREAD_BLOCK_FINISH(ThreadGroupFuncArg* tgfap) {
352   ThreadGroupControlBlock* cbp = GetCbp(tgfap->sharedp);
353   if (cbp->is_last_block) {
354     return 1;
355   }
356   const uintptr_t initial_spawn_ct = cbp->spawn_ct;
357   pthread_mutex_lock(&cbp->sync_mutex);
358   if (!(--cbp->active_ct)) {
359     pthread_cond_signal(&cbp->cur_block_done_condvar);
360   }
361   while (cbp->spawn_ct == initial_spawn_ct) {
362     // spurious wakeup guard
363     pthread_cond_wait(&cbp->start_next_condvar, &cbp->sync_mutex);
364   }
365   pthread_mutex_unlock(&cbp->sync_mutex);
366   return (cbp->is_last_block == 2);
367 }
368 #endif
369 
UpdateU64IfSmaller(uint64_t newval,uint64_t * oldval_ptr)370 void UpdateU64IfSmaller(uint64_t newval, uint64_t* oldval_ptr) {
371   uint64_t oldval = *oldval_ptr;
372   while (oldval > newval) {
373     if (ATOMIC_COMPARE_EXCHANGE_N_U64(oldval_ptr, &oldval, newval, 1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
374       break;
375     }
376   }
377 }
378 
#ifdef __cplusplus
}  // namespace plink2
#endif