1 /* Threads.c -- multithreading library
2 2021-12-21 : Igor Pavlov : Public domain */
3 
4 #include "Precomp.h"
5 
6 #ifdef _WIN32
7 
8 #ifndef USE_THREADS_CreateThread
9 #include <process.h>
10 #endif
11 
12 #include "Threads.h"
13 
GetError()14 static WRes GetError()
15 {
16   DWORD res = GetLastError();
17   return res ? (WRes)res : 1;
18 }
19 
HandleToWRes(HANDLE h)20 static WRes HandleToWRes(HANDLE h) { return (h != NULL) ? 0 : GetError(); }
BOOLToWRes(BOOL v)21 static WRes BOOLToWRes(BOOL v) { return v ? 0 : GetError(); }
22 
HandlePtr_Close(HANDLE * p)23 WRes HandlePtr_Close(HANDLE *p)
24 {
25   if (*p != NULL)
26   {
27     if (!CloseHandle(*p))
28       return GetError();
29     *p = NULL;
30   }
31   return 0;
32 }
33 
Handle_WaitObject(HANDLE h)34 WRes Handle_WaitObject(HANDLE h)
35 {
36   DWORD dw = WaitForSingleObject(h, INFINITE);
37   /*
38     (dw) result:
39     WAIT_OBJECT_0  // 0
40     WAIT_ABANDONED // 0x00000080 : is not compatible with Win32 Error space
41     WAIT_TIMEOUT   // 0x00000102 : is     compatible with Win32 Error space
42     WAIT_FAILED    // 0xFFFFFFFF
43   */
44   if (dw == WAIT_FAILED)
45   {
46     dw = GetLastError();
47     if (dw == 0)
48       return WAIT_FAILED;
49   }
50   return (WRes)dw;
51 }
52 
53 #define Thread_Wait(p) Handle_WaitObject(*(p))
54 
Thread_Wait_Close(CThread * p)55 WRes Thread_Wait_Close(CThread *p)
56 {
57   WRes res = Thread_Wait(p);
58   WRes res2 = Thread_Close(p);
59   return (res != 0 ? res : res2);
60 }
61 
Thread_Create(CThread * p,THREAD_FUNC_TYPE func,LPVOID param)62 WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param)
63 {
64   /* Windows Me/98/95: threadId parameter may not be NULL in _beginthreadex/CreateThread functions */
65 
66   #ifdef USE_THREADS_CreateThread
67 
68   DWORD threadId;
69   *p = CreateThread(NULL, 0, func, param, 0, &threadId);
70 
71   #else
72 
73   unsigned threadId;
74   *p = (HANDLE)(_beginthreadex(NULL, 0, func, param, 0, &threadId));
75 
76   #endif
77 
78   /* maybe we must use errno here, but probably GetLastError() is also OK. */
79   return HandleToWRes(*p);
80 }
81 
82 
Thread_Create_With_Affinity(CThread * p,THREAD_FUNC_TYPE func,LPVOID param,CAffinityMask affinity)83 WRes Thread_Create_With_Affinity(CThread *p, THREAD_FUNC_TYPE func, LPVOID param, CAffinityMask affinity)
84 {
85   #ifdef USE_THREADS_CreateThread
86 
87   UNUSED_VAR(affinity)
88   return Thread_Create(p, func, param);
89 
90   #else
91 
92   /* Windows Me/98/95: threadId parameter may not be NULL in _beginthreadex/CreateThread functions */
93   HANDLE h;
94   WRes wres;
95   unsigned threadId;
96   h = (HANDLE)(_beginthreadex(NULL, 0, func, param, CREATE_SUSPENDED, &threadId));
97   *p = h;
98   wres = HandleToWRes(h);
99   if (h)
100   {
101     {
102       // DWORD_PTR prevMask =
103       SetThreadAffinityMask(h, (DWORD_PTR)affinity);
104       /*
105       if (prevMask == 0)
106       {
107         // affinity change is non-critical error, so we can ignore it
108         // wres = GetError();
109       }
110       */
111     }
112     {
113       DWORD prevSuspendCount = ResumeThread(h);
114       /* ResumeThread() returns:
115          0 : was_not_suspended
116          1 : was_resumed
117         -1 : error
118       */
119       if (prevSuspendCount == (DWORD)-1)
120         wres = GetError();
121     }
122   }
123 
124   /* maybe we must use errno here, but probably GetLastError() is also OK. */
125   return wres;
126 
127   #endif
128 }
129 
130 
Event_Create(CEvent * p,BOOL manualReset,int signaled)131 static WRes Event_Create(CEvent *p, BOOL manualReset, int signaled)
132 {
133   *p = CreateEvent(NULL, manualReset, (signaled ? TRUE : FALSE), NULL);
134   return HandleToWRes(*p);
135 }
136 
Event_Set(CEvent * p)137 WRes Event_Set(CEvent *p) { return BOOLToWRes(SetEvent(*p)); }
Event_Reset(CEvent * p)138 WRes Event_Reset(CEvent *p) { return BOOLToWRes(ResetEvent(*p)); }
139 
ManualResetEvent_Create(CManualResetEvent * p,int signaled)140 WRes ManualResetEvent_Create(CManualResetEvent *p, int signaled) { return Event_Create(p, TRUE, signaled); }
AutoResetEvent_Create(CAutoResetEvent * p,int signaled)141 WRes AutoResetEvent_Create(CAutoResetEvent *p, int signaled) { return Event_Create(p, FALSE, signaled); }
ManualResetEvent_CreateNotSignaled(CManualResetEvent * p)142 WRes ManualResetEvent_CreateNotSignaled(CManualResetEvent *p) { return ManualResetEvent_Create(p, 0); }
AutoResetEvent_CreateNotSignaled(CAutoResetEvent * p)143 WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p) { return AutoResetEvent_Create(p, 0); }
144 
145 
Semaphore_Create(CSemaphore * p,UInt32 initCount,UInt32 maxCount)146 WRes Semaphore_Create(CSemaphore *p, UInt32 initCount, UInt32 maxCount)
147 {
148   // negative ((LONG)maxCount) is not supported in WIN32::CreateSemaphore()
149   *p = CreateSemaphore(NULL, (LONG)initCount, (LONG)maxCount, NULL);
150   return HandleToWRes(*p);
151 }
152 
Semaphore_OptCreateInit(CSemaphore * p,UInt32 initCount,UInt32 maxCount)153 WRes Semaphore_OptCreateInit(CSemaphore *p, UInt32 initCount, UInt32 maxCount)
154 {
155   // if (Semaphore_IsCreated(p))
156   {
157     WRes wres = Semaphore_Close(p);
158     if (wres != 0)
159       return wres;
160   }
161   return Semaphore_Create(p, initCount, maxCount);
162 }
163 
Semaphore_Release(CSemaphore * p,LONG releaseCount,LONG * previousCount)164 static WRes Semaphore_Release(CSemaphore *p, LONG releaseCount, LONG *previousCount)
165   { return BOOLToWRes(ReleaseSemaphore(*p, releaseCount, previousCount)); }
Semaphore_ReleaseN(CSemaphore * p,UInt32 num)166 WRes Semaphore_ReleaseN(CSemaphore *p, UInt32 num)
167   { return Semaphore_Release(p, (LONG)num, NULL); }
Semaphore_Release1(CSemaphore * p)168 WRes Semaphore_Release1(CSemaphore *p) { return Semaphore_ReleaseN(p, 1); }
169 
CriticalSection_Init(CCriticalSection * p)170 WRes CriticalSection_Init(CCriticalSection *p)
171 {
172   /* InitializeCriticalSection() can raise exception:
173      Windows XP, 2003 : can raise a STATUS_NO_MEMORY exception
174      Windows Vista+   : no exceptions */
175   #ifdef _MSC_VER
176   __try
177   #endif
178   {
179     InitializeCriticalSection(p);
180     /* InitializeCriticalSectionAndSpinCount(p, 0); */
181   }
182   #ifdef _MSC_VER
183   __except (EXCEPTION_EXECUTE_HANDLER) { return ERROR_NOT_ENOUGH_MEMORY; }
184   #endif
185   return 0;
186 }
187 
188 
189 
190 
191 #else // _WIN32
192 
193 // ---------- POSIX ----------
194 
195 #ifndef __APPLE__
196 #ifndef _7ZIP_AFFINITY_DISABLE
197 // _GNU_SOURCE can be required for pthread_setaffinity_np() / CPU_ZERO / CPU_SET
198 #define _GNU_SOURCE
199 #endif
200 #endif
201 
202 #include "Threads.h"
203 
204 #include <errno.h>
205 #include <stdlib.h>
206 #include <string.h>
207 #ifdef _7ZIP_AFFINITY_SUPPORTED
208 // #include <sched.h>
209 #endif
210 
211 
212 // #include <stdio.h>
213 // #define PRF(p) p
214 #define PRF(p)
215 
216 #define Print(s) PRF(printf("\n%s\n", s))
217 
218 // #include <stdio.h>
219 
Thread_Create_With_CpuSet(CThread * p,THREAD_FUNC_TYPE func,LPVOID param,const CCpuSet * cpuSet)220 WRes Thread_Create_With_CpuSet(CThread *p, THREAD_FUNC_TYPE func, LPVOID param, const CCpuSet *cpuSet)
221 {
222   // new thread in Posix probably inherits affinity from parrent thread
223   Print("Thread_Create_With_CpuSet");
224 
225   pthread_attr_t attr;
226   int ret;
227   // int ret2;
228 
229   p->_created = 0;
230 
231   RINOK(pthread_attr_init(&attr));
232 
233   ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
234 
235   if (!ret)
236   {
237     if (cpuSet)
238     {
239       #ifdef _7ZIP_AFFINITY_SUPPORTED
240 
241       /*
242       printf("\n affinity :");
243       unsigned i;
244       for (i = 0; i < sizeof(*cpuSet) && i < 8; i++)
245       {
246         Byte b = *((const Byte *)cpuSet + i);
247         char temp[32];
248         #define GET_HEX_CHAR(t) ((char)(((t < 10) ? ('0' + t) : ('A' + (t - 10)))))
249         temp[0] = GET_HEX_CHAR((b & 0xF));
250         temp[1] = GET_HEX_CHAR((b >> 4));
251         // temp[0] = GET_HEX_CHAR((b >> 4));  // big-endian
252         // temp[1] = GET_HEX_CHAR((b & 0xF));  // big-endian
253         temp[2] = 0;
254         printf("%s", temp);
255       }
256       printf("\n");
257       */
258 
259       // ret2 =
260       pthread_attr_setaffinity_np(&attr, sizeof(*cpuSet), cpuSet);
261       // if (ret2) ret = ret2;
262       #endif
263     }
264 
265     ret = pthread_create(&p->_tid, &attr, func, param);
266 
267     if (!ret)
268     {
269       p->_created = 1;
270       /*
271       if (cpuSet)
272       {
273         // ret2 =
274         pthread_setaffinity_np(p->_tid, sizeof(*cpuSet), cpuSet);
275         // if (ret2) ret = ret2;
276       }
277       */
278     }
279   }
280   // ret2 =
281   pthread_attr_destroy(&attr);
282   // if (ret2 != 0) ret = ret2;
283   return ret;
284 }
285 
286 
Thread_Create(CThread * p,THREAD_FUNC_TYPE func,LPVOID param)287 WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param)
288 {
289   return Thread_Create_With_CpuSet(p, func, param, NULL);
290 }
291 
292 
Thread_Create_With_Affinity(CThread * p,THREAD_FUNC_TYPE func,LPVOID param,CAffinityMask affinity)293 WRes Thread_Create_With_Affinity(CThread *p, THREAD_FUNC_TYPE func, LPVOID param, CAffinityMask affinity)
294 {
295   Print("Thread_Create_WithAffinity");
296   CCpuSet cs;
297   unsigned i;
298   CpuSet_Zero(&cs);
299   for (i = 0; i < sizeof(affinity) * 8; i++)
300   {
301     if (affinity == 0)
302       break;
303     if (affinity & 1)
304     {
305       CpuSet_Set(&cs, i);
306     }
307     affinity >>= 1;
308   }
309   return Thread_Create_With_CpuSet(p, func, param, &cs);
310 }
311 
312 
Thread_Close(CThread * p)313 WRes Thread_Close(CThread *p)
314 {
315   // Print("Thread_Close");
316   int ret;
317   if (!p->_created)
318     return 0;
319 
320   ret = pthread_detach(p->_tid);
321   p->_tid = 0;
322   p->_created = 0;
323   return ret;
324 }
325 
326 
Thread_Wait_Close(CThread * p)327 WRes Thread_Wait_Close(CThread *p)
328 {
329   // Print("Thread_Wait_Close");
330   void *thread_return;
331   int ret;
332   if (!p->_created)
333     return EINVAL;
334 
335   ret = pthread_join(p->_tid, &thread_return);
336   // probably we can't use that (_tid) after pthread_join(), so we close thread here
337   p->_created = 0;
338   p->_tid = 0;
339   return ret;
340 }
341 
342 
343 
Event_Create(CEvent * p,int manualReset,int signaled)344 static WRes Event_Create(CEvent *p, int manualReset, int signaled)
345 {
346   RINOK(pthread_mutex_init(&p->_mutex, NULL));
347   RINOK(pthread_cond_init(&p->_cond, NULL));
348   p->_manual_reset = manualReset;
349   p->_state = (signaled ? True : False);
350   p->_created = 1;
351   return 0;
352 }
353 
ManualResetEvent_Create(CManualResetEvent * p,int signaled)354 WRes ManualResetEvent_Create(CManualResetEvent *p, int signaled)
355   { return Event_Create(p, True, signaled); }
ManualResetEvent_CreateNotSignaled(CManualResetEvent * p)356 WRes ManualResetEvent_CreateNotSignaled(CManualResetEvent *p)
357   { return ManualResetEvent_Create(p, 0); }
AutoResetEvent_Create(CAutoResetEvent * p,int signaled)358 WRes AutoResetEvent_Create(CAutoResetEvent *p, int signaled)
359   { return Event_Create(p, False, signaled); }
AutoResetEvent_CreateNotSignaled(CAutoResetEvent * p)360 WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p)
361   { return AutoResetEvent_Create(p, 0); }
362 
363 
Event_Set(CEvent * p)364 WRes Event_Set(CEvent *p)
365 {
366   RINOK(pthread_mutex_lock(&p->_mutex));
367   p->_state = True;
368   int res1 = pthread_cond_broadcast(&p->_cond);
369   int res2 = pthread_mutex_unlock(&p->_mutex);
370   return (res2 ? res2 : res1);
371 }
372 
Event_Reset(CEvent * p)373 WRes Event_Reset(CEvent *p)
374 {
375   RINOK(pthread_mutex_lock(&p->_mutex));
376   p->_state = False;
377   return pthread_mutex_unlock(&p->_mutex);
378 }
379 
Event_Wait(CEvent * p)380 WRes Event_Wait(CEvent *p)
381 {
382   RINOK(pthread_mutex_lock(&p->_mutex));
383   while (p->_state == False)
384   {
385     // ETIMEDOUT
386     // ret =
387     pthread_cond_wait(&p->_cond, &p->_mutex);
388     // if (ret != 0) break;
389   }
390   if (p->_manual_reset == False)
391   {
392     p->_state = False;
393   }
394   return pthread_mutex_unlock(&p->_mutex);
395 }
396 
Event_Close(CEvent * p)397 WRes Event_Close(CEvent *p)
398 {
399   if (!p->_created)
400     return 0;
401   p->_created = 0;
402   {
403     int res1 = pthread_mutex_destroy(&p->_mutex);
404     int res2 = pthread_cond_destroy(&p->_cond);
405     return (res1 ? res1 : res2);
406   }
407 }
408 
409 
Semaphore_Create(CSemaphore * p,UInt32 initCount,UInt32 maxCount)410 WRes Semaphore_Create(CSemaphore *p, UInt32 initCount, UInt32 maxCount)
411 {
412   if (initCount > maxCount || maxCount < 1)
413     return EINVAL;
414   RINOK(pthread_mutex_init(&p->_mutex, NULL));
415   RINOK(pthread_cond_init(&p->_cond, NULL));
416   p->_count = initCount;
417   p->_maxCount = maxCount;
418   p->_created = 1;
419   return 0;
420 }
421 
422 
Semaphore_OptCreateInit(CSemaphore * p,UInt32 initCount,UInt32 maxCount)423 WRes Semaphore_OptCreateInit(CSemaphore *p, UInt32 initCount, UInt32 maxCount)
424 {
425   if (Semaphore_IsCreated(p))
426   {
427     /*
428     WRes wres = Semaphore_Close(p);
429     if (wres != 0)
430       return wres;
431     */
432     if (initCount > maxCount || maxCount < 1)
433       return EINVAL;
434     // return EINVAL; // for debug
435     p->_count = initCount;
436     p->_maxCount = maxCount;
437     return 0;
438   }
439   return Semaphore_Create(p, initCount, maxCount);
440 }
441 
442 
Semaphore_ReleaseN(CSemaphore * p,UInt32 releaseCount)443 WRes Semaphore_ReleaseN(CSemaphore *p, UInt32 releaseCount)
444 {
445   UInt32 newCount;
446   int ret;
447 
448   if (releaseCount < 1)
449     return EINVAL;
450 
451   RINOK(pthread_mutex_lock(&p->_mutex));
452 
453   newCount = p->_count + releaseCount;
454   if (newCount > p->_maxCount)
455     ret = ERROR_TOO_MANY_POSTS; // EINVAL;
456   else
457   {
458     p->_count = newCount;
459     ret = pthread_cond_broadcast(&p->_cond);
460   }
461   RINOK(pthread_mutex_unlock(&p->_mutex));
462   return ret;
463 }
464 
Semaphore_Wait(CSemaphore * p)465 WRes Semaphore_Wait(CSemaphore *p)
466 {
467   RINOK(pthread_mutex_lock(&p->_mutex));
468   while (p->_count < 1)
469   {
470     pthread_cond_wait(&p->_cond, &p->_mutex);
471   }
472   p->_count--;
473   return pthread_mutex_unlock(&p->_mutex);
474 }
475 
Semaphore_Close(CSemaphore * p)476 WRes Semaphore_Close(CSemaphore *p)
477 {
478   if (!p->_created)
479     return 0;
480   p->_created = 0;
481   {
482     int res1 = pthread_mutex_destroy(&p->_mutex);
483     int res2 = pthread_cond_destroy(&p->_cond);
484     return (res1 ? res1 : res2);
485   }
486 }
487 
488 
489 
CriticalSection_Init(CCriticalSection * p)490 WRes CriticalSection_Init(CCriticalSection *p)
491 {
492   // Print("CriticalSection_Init");
493   if (!p)
494     return EINTR;
495   return pthread_mutex_init(&p->_mutex, NULL);
496 }
497 
CriticalSection_Enter(CCriticalSection * p)498 void CriticalSection_Enter(CCriticalSection *p)
499 {
500   // Print("CriticalSection_Enter");
501   if (p)
502   {
503     // int ret =
504     pthread_mutex_lock(&p->_mutex);
505   }
506 }
507 
CriticalSection_Leave(CCriticalSection * p)508 void CriticalSection_Leave(CCriticalSection *p)
509 {
510   // Print("CriticalSection_Leave");
511   if (p)
512   {
513     // int ret =
514     pthread_mutex_unlock(&p->_mutex);
515   }
516 }
517 
CriticalSection_Delete(CCriticalSection * p)518 void CriticalSection_Delete(CCriticalSection *p)
519 {
520   // Print("CriticalSection_Delete");
521   if (p)
522   {
523     // int ret =
524     pthread_mutex_destroy(&p->_mutex);
525   }
526 }
527 
InterlockedIncrement(LONG volatile * addend)528 LONG InterlockedIncrement(LONG volatile *addend)
529 {
530   // Print("InterlockedIncrement");
531   #ifdef USE_HACK_UNSAFE_ATOMIC
532     LONG val = *addend + 1;
533     *addend = val;
534     return val;
535   #else
536     return __sync_add_and_fetch(addend, 1);
537   #endif
538 }
539 
540 #endif // _WIN32
541