1 #include "Core.h"
2
3 #ifdef PLATFORM_OSX
4 #include <sys/time.h>
5 #endif
6
7 namespace Upp {
8
9 #define LLOG(x) // DLOG(x)
10
sMutexLock()11 static Mutex& sMutexLock()
12 { // this is Mutex intended to synchronize initialization of other primitives
13 static Mutex m;
14 return m;
15 }
16
17 INITBLOCK {
18 sMutexLock();
19 }
20
21 Thread::Thread()
22 {
23 sMutexLock();
24 #ifdef PLATFORM_WIN32
25 handle = 0;
26 thread_id = 0;
27 #endif
28 #ifdef PLATFORM_POSIX
29 handle = 0;
30 #endif
31 }
32
Detach()33 void Thread::Detach()
34 {
35 #if defined(PLATFORM_WIN32)
36 if(handle) {
37 CloseHandle(handle);
38 handle = 0;
39 thread_id = 0;
40 }
41 #elif defined(PLATFORM_POSIX)
42 if(handle) {
43 CHECK(!pthread_detach(handle));
44 handle = 0;
45 }
46 #endif
47 }
48
49 static Atomic sThreadCount;
50
51 static thread_local void (*sExit)(void);
52
AtExit(void (* exitfn)())53 void (*Thread::AtExit(void (*exitfn)()))()
54 {
55 void (*prev)() = sExit;
56 sExit = exitfn;
57 return prev;
58 }
59
60 struct sThreadExitExc__ {};
61
Exit()62 void Thread::Exit()
63 {
64 throw sThreadExitExc__();
65 }
66
67 struct sThreadParam {
68 Function<void ()> cb;
69 bool noshutdown;
70 };
71
72 static
73 #ifdef PLATFORM_WIN32
74 #ifdef CPU_64
75 unsigned int
76 #else
77 uintptr_t __stdcall
78 #endif
79 #else
80 void *
81 #endif
sThreadRoutine(void * arg)82 sThreadRoutine(void *arg)
83 {
84 LLOG("sThreadRoutine");
85 auto p = (sThreadParam *)arg;
86 try {
87 p->cb();
88 }
89 catch(Exc e) {
90 Panic(e);
91 }
92 catch(sThreadExitExc__) {}
93 catch(Upp::ExitExc) {}
94 if(!p->noshutdown)
95 AtomicDec(sThreadCount);
96 delete p;
97 if(sExit)
98 (*sExit)();
99 #ifdef UPP_HEAP
100 MemoryFreeThread();
101 #endif
102 return 0;
103 }
104
105 static bool threadr; //indicates if *any* Thread instance is running (having called its Run()), upon first call of Run
106 #ifndef CPU_BLACKFIN
107 static thread_local bool sMain;
108 #else
109 #ifdef PLATFORM_POSIX
110 static Index<pthread_t> threadsv; //a threads id vector, sMain=true ==>> 'pthread_self() pthread_t beeing present in vector, problem, wont be cleared when thread exits
111 Mutex vm; //a common access synchronizer
112 #endif
113 #endif
114
115 //to sMain: an Application can start more than one thread, without having *any* one of them called Run() of any Thread instace
116 //when Run() is called *anytime*, it means, the term of *MainThread* has to be running anyway,
117 //otherwise no child threads could run. they are created by main.
118 //now each thread, having any Thread instance can start a first Run()
119
Run(Function<void ()> _cb,bool noshutdown)120 bool Thread::Run(Function<void ()> _cb, bool noshutdown)
121 {
122 LLOG("Thread::Run");
123 if(!noshutdown)
124 AtomicInc(sThreadCount);
125 if(!threadr)
126 #ifndef CPU_BLACKFIN
127 threadr = sMain = true;
128 #else
129 {
130 threadr = true;
131 //the sMain replacement
132 #ifdef PLATFORM_POSIX
133 pthread_t thid = pthread_self();
134 vm.Enter();
135 if(threadsv.Find(thid) < 0){
136 //thread not yet present, mark present
137 threadsv.Add(thid);
138 }
139 else
140 RLOG("BUG: Multiple Add in Mt.cpp");
141 vm.Leave();
142 #endif
143 }
144 #endif
145 Detach();
146 auto p = new sThreadParam;
147 p->cb = _cb;
148 p->noshutdown = noshutdown;
149 #ifdef PLATFORM_WIN32
150 #ifdef CPU_32 // in 32-bit, reduce stack size to 1MB to fit more threads into address space
151 handle = (HANDLE)_beginthreadex(0, 1024*1024, sThreadRoutine, p, STACK_SIZE_PARAM_IS_A_RESERVATION, ((unsigned int *)(&thread_id)));
152 #else
153 handle = (HANDLE)_beginthreadex(0, 0, sThreadRoutine, p, 0, ((unsigned int *)(&thread_id)));
154 #endif
155 #endif
156 #ifdef PLATFORM_POSIX
157 if(pthread_create(&handle, 0, sThreadRoutine, p))
158 handle = 0;
159 #endif
160 return handle;
161 }
162
RunNice(Function<void ()> cb,bool noshutdown)163 bool Thread::RunNice(Function<void ()> cb, bool noshutdown)
164 {
165 if(Run(cb, noshutdown)) {
166 Nice();
167 return true;
168 }
169 return false;
170 }
171
RunCritical(Function<void ()> cb,bool noshutdown)172 bool Thread::RunCritical(Function<void ()> cb, bool noshutdown)
173 {
174 if(Run(cb, noshutdown)) {
175 Critical();
176 return true;
177 }
178 return false;
179 }
180
~Thread()181 Thread::~Thread()
182 {
183 Detach();
184 #ifdef CPU_BLACKFIN
185 #ifdef PLATFORM_POSIX
186 //the static destruction replacement
187 pthread_t thid = pthread_self();
188 vm.Enter();
189 int id = threadsv.Find(thid);
190 if(id >= 0)
191 threadsv.Remove(id);
192 vm.Leave();
193 #endif
194 #endif
195 }
196
IsST()197 bool Thread::IsST() //the containing thread (of wich there may be multiple) has not run its Run() yet
198 {
199 return !threadr;
200 }
201
IsMain()202 bool Thread::IsMain() //the calling thread is the Main Thread or the only one in App
203 {
204 #ifndef CPU_BLACKFIN
205 return !threadr || sMain;
206 #else
207 if(!threadr)
208 return true;
209 #ifdef PLATFORM_POSIX
210 //the sMain replacement
211 pthread_t thid = pthread_self();
212 vm.Enter();
213 if(threadsv.Find(thid) >= 0)
214 {
215 vm.Leave();
216 return true;
217 }
218 vm.Leave();
219 #endif
220 return false;
221 #endif
222 }
223
GetCount()224 int Thread::GetCount()
225 {
226 return sThreadCount;
227 }
228
229 static int sShutdown;
230
BeginShutdownThreads()231 void Thread::BeginShutdownThreads()
232 {
233 sShutdown++;
234 }
235
EndShutdownThreads()236 void Thread::EndShutdownThreads()
237 {
238 sShutdown--;
239 }
240
ShutdownThreads()241 void Thread::ShutdownThreads()
242 {
243 BeginShutdownThreads();
244 while(GetCount())
245 Sleep(100);
246 EndShutdownThreads();
247 }
248
IsShutdownThreads()249 bool Thread::IsShutdownThreads()
250 {
251 return sShutdown;
252 }
253
Wait()254 int Thread::Wait()
255 {
256 if(!IsOpen())
257 return -1;
258 int out;
259 #ifdef PLATFORM_WIN32
260 dword exit;
261 if(!GetExitCodeThread(handle, &exit))
262 return -1;
263 if(exit != STILL_ACTIVE)
264 out = (int)exit;
265 else
266 {
267 if(WaitForSingleObject(handle, INFINITE) != WAIT_OBJECT_0)
268 return Null;
269 out = GetExitCodeThread(handle, &exit) ? int(exit) : int(Null);
270 }
271 Detach();
272 #endif
273 #ifdef PLATFORM_POSIX
274 void *thread_return;
275 if(pthread_join(handle, &thread_return))
276 out = Null;
277 else
278 out = (int)(intptr_t)thread_return;
279 handle = 0;
280 #endif
281 return out;
282 }
283
Priority(int percent)284 bool Thread::Priority(int percent)
285 {
286 ASSERT(IsOpen());
287 #ifdef PLATFORM_WIN32
288 int prior;
289 if(percent <= 25)
290 prior = THREAD_PRIORITY_LOWEST;
291 else if(percent <= 75)
292 prior = THREAD_PRIORITY_BELOW_NORMAL;
293 else if(percent <= 125)
294 prior = THREAD_PRIORITY_NORMAL;
295 else if(percent <= 175)
296 prior = THREAD_PRIORITY_ABOVE_NORMAL;
297 else
298 prior = THREAD_PRIORITY_HIGHEST;
299 return SetThreadPriority(handle, prior);
300 #endif
301 #ifdef PLATFORM_POSIX
302 int policy;
303 struct sched_param param;
304
305 if(pthread_getschedparam(handle, &policy, ¶m))
306 return false;
307 int percent_min = 0, percent_max = 200;
308 if(percent <= 25) {
309 #if defined(SCHED_IDLE)
310 policy = SCHED_IDLE;
311 percent_min = 0;
312 percent_max = 25;
313 #elif defined(SCHED_BATCH)
314 policy = SCHED_BATCH;
315 percent_min = 0;
316 percent_max = 75;
317 #else
318 policy = SCHED_OTHER;
319 percent_min = 0;
320 percent_max = 125;
321 #endif
322 }
323 else
324 if(percent <= 75){
325 #if defined(SCHED_IDLE)
326 policy = SCHED_BATCH;
327 percent_min = 25;
328 percent_max = 75;
329 #elif defined(SCHED_BATCH)
330 policy = SCHED_BATCH;
331 percent_min = 0;
332 percent_max = 75;
333 #else
334 policy = SCHED_OTHER;
335 percent_min = 0;
336 percent_max = 125;
337 #endif
338 }
339 else
340 if(percent <= 125){
341 policy = SCHED_OTHER;
342 #if defined(SCHED_IDLE)
343 percent_min = 75;
344 percent_max = 125;
345 #elif defined(SCHED_BATCH)
346 percent_min = 25;
347 percent_max = 125;
348 #else
349 percent_min = 0;
350 percent_max = 125;
351 #endif
352 }
353 else
354 if(percent <= 175){ // should be the root
355 policy = SCHED_FIFO;
356 percent_min = 125;
357 percent_max = 175;
358 }
359 else
360 policy = SCHED_RR;
361
362 param.sched_priority = (sched_get_priority_max(policy) - sched_get_priority_min(policy))*(minmax(percent, percent_min, percent_max)-percent_min)/(percent_max - percent_min);
363
364 if (pthread_setschedparam(handle, policy, ¶m)) {
365 // No privileges? Try maximum possible! Do not use EPERM as not all os support this one
366 policy = SCHED_OTHER;
367 percent_max = 125;
368 percent_min = minmax(percent_min, 0, percent_max);
369 param.sched_priority = (sched_get_priority_max(policy) - sched_get_priority_min(policy))
370 * (minmax(percent, percent_min, percent_max) - percent_min)
371 / max(percent_max - percent_min, 1);
372 if(pthread_setschedparam(handle, policy, ¶m))
373 return false;
374 }
375 return true;
376 #endif
377 }
378
Start(Function<void ()> cb,bool noshutdown)379 void Thread::Start(Function<void ()> cb, bool noshutdown)
380 {
381 Thread t;
382 t.Run(cb);
383 t.Detach();
384 }
385
StartNice(Function<void ()> cb,bool noshutdown)386 void Thread::StartNice(Function<void ()> cb, bool noshutdown)
387 {
388 Thread t;
389 t.Run(cb);
390 t.Nice();
391 t.Detach();
392 }
393
StartCritical(Function<void ()> cb,bool noshutdown)394 void Thread::StartCritical(Function<void ()> cb, bool noshutdown)
395 {
396 Thread t;
397 t.Run(cb);
398 t.Critical();
399 t.Detach();
400 }
401
Sleep(int msec)402 void Thread::Sleep(int msec)
403 {
404 #ifdef PLATFORM_WIN32
405 ::Sleep(msec);
406 #endif
407 #ifdef PLATFORM_POSIX
408 ::timespec tval;
409 tval.tv_sec = msec / 1000;
410 tval.tv_nsec = (msec % 1000) * 1000000;
411 nanosleep(&tval, NULL);
412 #endif
413 }
414
415 #ifdef flagPROFILEMT
Dumi()416 MtInspector *MtInspector::Dumi()
417 {
418 static MtInspector h(NULL);
419 return &h;
420 }
421
~MtInspector()422 MtInspector::~MtInspector()
423 {
424 if(name)
425 RLOG("Mutex " << name << '(' << number << ") " << blocked << "/" << locked <<
426 " = " << Sprintf("%.4f", locked ? (double)blocked / locked : 0) << " blocked/locked times");
427 }
428 #endif
429
430 #ifdef PLATFORM_WIN32
431
Release()432 void Semaphore::Release()
433 {
434 ReleaseSemaphore(handle, 1, NULL);
435 }
436
Release(int n)437 void Semaphore::Release(int n)
438 {
439 ReleaseSemaphore(handle, n, NULL);
440 }
441
Wait(int timeout_ms)442 bool Semaphore::Wait(int timeout_ms)
443 {
444 return WaitForSingleObject(handle, timeout_ms < 0 ? INFINITE : timeout_ms) == WAIT_OBJECT_0;
445 }
446
Semaphore()447 Semaphore::Semaphore()
448 {
449 handle = CreateSemaphore(NULL, 0, INT_MAX, NULL);
450 }
451
~Semaphore()452 Semaphore::~Semaphore()
453 {
454 CloseHandle(handle);
455 }
456
457 Mutex& sMutexLock();
458
TryEnter()459 bool Mutex::TryEnter()
460 {
461 return TryEnterCriticalSection(§ion);
462 }
463
464 /* Win32 RWMutex implementation by Chris Thomasson, cristom@comcast.net */
465
EnterWrite()466 void RWMutex::EnterWrite()
467 {
468 EnterCriticalSection ( &m_wrlock );
469 LONG count = InterlockedExchangeAdd(&m_count, -LONG_MAX);
470 if(count < LONG_MAX)
471 if(InterlockedExchangeAdd ( &m_rdwake, LONG_MAX - count ) + LONG_MAX - count )
472 WaitForSingleObject ( m_wrwset, INFINITE );
473 }
474
LeaveWrite()475 void RWMutex::LeaveWrite()
476 {
477 LONG count = InterlockedExchangeAdd ( &m_count, LONG_MAX );
478 if (count < 0)
479 ReleaseSemaphore ( m_rdwset, count * -1, 0 );
480 LeaveCriticalSection ( &m_wrlock );
481 }
482
EnterRead()483 void RWMutex::EnterRead()
484 {
485 LONG count = InterlockedDecrement ( &m_count );
486 if(count < 0)
487 WaitForSingleObject ( m_rdwset, INFINITE );
488 }
489
LeaveRead()490 void RWMutex::LeaveRead()
491 {
492 LONG count = InterlockedIncrement ( &m_count );
493 if ( count < 1 )
494 if ( ! InterlockedDecrement ( &m_rdwake ) )
495 SetEvent ( m_wrwset );
496 }
497
RWMutex()498 RWMutex::RWMutex()
499 : m_count ( LONG_MAX ),
500 m_rdwake ( 0 ),
501 m_wrwset ( CreateEvent ( 0, FALSE, FALSE, 0 ) ),
502 m_rdwset ( CreateSemaphore ( 0, 0, LONG_MAX, 0 ) )
503 {
504 InitializeCriticalSection ( &m_wrlock );
505 }
506
~RWMutex()507 RWMutex::~RWMutex()
508 {
509 DeleteCriticalSection ( &m_wrlock );
510 CloseHandle ( m_rdwset );
511 CloseHandle ( m_wrwset );
512 }
513
514 VOID (WINAPI *ConditionVariable::InitializeConditionVariable)(PCONDITION_VARIABLE ConditionVariable);
515 VOID (WINAPI *ConditionVariable::WakeConditionVariable)(PCONDITION_VARIABLE ConditionVariable);
516 VOID (WINAPI *ConditionVariable::WakeAllConditionVariable)(PCONDITION_VARIABLE ConditionVariable);
517 BOOL (WINAPI *ConditionVariable::SleepConditionVariableCS)(PCONDITION_VARIABLE ConditionVariable, PCRITICAL_SECTION CriticalSection, DWORD dwMilliseconds);
518
Wait(Mutex & m,int timeout_ms)519 bool ConditionVariable::Wait(Mutex& m, int timeout_ms)
520 {
521 if(InitializeConditionVariable)
522 return SleepConditionVariableCS(cv, &m.section, timeout_ms < 0 ? INFINITE : timeout_ms);
523 else { // WindowsXP implementation
524 static thread_local byte buffer[sizeof(WaitingThread)]; // only one Wait per thread is possible
525 WaitingThread *w = new(buffer) WaitingThread;
526 {
527 Mutex::Lock __(mutex);
528 w->next = NULL;
529 if(head)
530 tail->next = w;
531 else
532 head = w;
533 tail = w;
534 }
535 m.Leave();
536 bool r = w->sem.Wait(timeout_ms);
537 m.Enter();
538 w->WaitingThread::~WaitingThread();
539 return r;
540 }
541 }
542
Signal()543 void ConditionVariable::Signal()
544 {
545 if(InitializeConditionVariable)
546 WakeConditionVariable(cv);
547 else { // WindowsXP implementation
548 Mutex::Lock __(mutex);
549 if(head) {
550 head->sem.Release();
551 head = head->next;
552 }
553 }
554 }
555
Broadcast()556 void ConditionVariable::Broadcast()
557 {
558 if(InitializeConditionVariable)
559 WakeAllConditionVariable(cv);
560 else { // WindowsXP implementation
561 Mutex::Lock __(mutex);
562 while(head) {
563 head->sem.Release();
564 head = head->next;
565 }
566 }
567 }
568
ConditionVariable()569 ConditionVariable::ConditionVariable()
570 {
571 #ifndef flagTESTXPCV
572 ONCELOCK {
573 if(IsWinVista()) {
574 DllFn(InitializeConditionVariable, "kernel32", "InitializeConditionVariable");
575 DllFn(WakeConditionVariable, "kernel32", "WakeConditionVariable");
576 DllFn(WakeAllConditionVariable, "kernel32", "WakeAllConditionVariable");
577 DllFn(SleepConditionVariableCS, "kernel32", "SleepConditionVariableCS");
578 }
579 }
580 #endif
581 if(InitializeConditionVariable)
582 InitializeConditionVariable(cv);
583 else
584 head = tail = NULL;
585 }
586
~ConditionVariable()587 ConditionVariable::~ConditionVariable()
588 {
589 Broadcast();
590 }
591
592 #endif
593
594 #ifdef PLATFORM_POSIX
595
Mutex()596 Mutex::Mutex()
597 {
598 pthread_mutexattr_t mutex_attr[1];
599 pthread_mutexattr_init(mutex_attr);
600 pthread_mutexattr_settype(mutex_attr, PTHREAD_MUTEX_RECURSIVE);
601 pthread_mutex_init(mutex, mutex_attr);
602 #ifdef flagPROFILEMT
603 mti = MtInspector::Dumi();
604 #endif
605 }
606
RWMutex()607 RWMutex::RWMutex()
608 {
609 pthread_rwlock_init(rwlock, NULL);
610 }
611
~RWMutex()612 RWMutex::~RWMutex()
613 {
614 pthread_rwlock_destroy(rwlock);
615 }
616
Wait(Mutex & m,int timeout_ms)617 bool ConditionVariable::Wait(Mutex& m, int timeout_ms)
618 {
619 if(timeout_ms < 0) {
620 pthread_cond_wait(cv, m.mutex);
621 return true;
622 }
623 ::timespec until;
624 clock_gettime(CLOCK_REALTIME, &until);
625
626 until.tv_sec += timeout_ms / 1000;
627 timeout_ms %= 1000;
628 until.tv_nsec += timeout_ms * 1000000;
629 until.tv_sec += until.tv_nsec / 1000000000;
630 until.tv_nsec %= 1000000000;
631
632 return pthread_cond_timedwait(cv, m.mutex, &until) == 0;
633 }
634
635 #ifdef PLATFORM_OSX
636
Release()637 void Semaphore::Release()
638 {
639 dispatch_semaphore_signal(sem);
640 }
641
Wait(int timeout_ms)642 bool Semaphore::Wait(int timeout_ms)
643 {
644 return dispatch_semaphore_wait(sem, timeout_ms < 0 ? DISPATCH_TIME_FOREVER
645 : dispatch_time(DISPATCH_TIME_NOW, 1000000 * timeout_ms)) == 0;
646 }
647
Semaphore()648 Semaphore::Semaphore()
649 {
650 sem = dispatch_semaphore_create(0);
651 }
652
~Semaphore()653 Semaphore::~Semaphore()
654 {
655 dispatch_release(sem);
656 }
657
658 #else
659
Release()660 void Semaphore::Release()
661 {
662 sem_post(&sem);
663 }
664
Wait(int timeout_ms)665 bool Semaphore::Wait(int timeout_ms)
666 {
667 if(timeout_ms < 0) {
668 sem_wait(&sem);
669 return true;
670 }
671 struct timespec until;
672 clock_gettime(CLOCK_REALTIME, &until);
673
674 until.tv_sec += timeout_ms / 1000;
675 timeout_ms %= 1000;
676 until.tv_nsec += timeout_ms * 1000000;
677 until.tv_sec += until.tv_nsec / 1000000000;
678 until.tv_nsec %= 1000000000;
679
680 return sem_timedwait(&sem,&until) != -1;
681 }
682
Semaphore()683 Semaphore::Semaphore()
684 {
685 sem_init(&sem, 0, 0);
686 }
687
~Semaphore()688 Semaphore::~Semaphore()
689 {
690 sem_destroy(&sem);
691 }
692
693 #endif
694
695 #endif
696
Invalidate()697 void LazyUpdate::Invalidate()
698 {
699 dirty.store(true, std::memory_order_release);
700 dirty = true;
701 }
702
BeginUpdate() const703 bool LazyUpdate::BeginUpdate() const
704 {
705 bool b = dirty.load(std::memory_order_acquire);
706 if(b) {
707 mutex.Enter();
708 if(dirty) return true;
709 mutex.Leave();
710 }
711 return false;
712 }
713
EndUpdate() const714 void LazyUpdate::EndUpdate() const
715 {
716 dirty.store(false, std::memory_order_release);
717 mutex.Leave();
718 }
719
LazyUpdate()720 LazyUpdate::LazyUpdate()
721 {
722 dirty = true;
723 }
724
Wait()725 void SpinLock::Wait()
726 {
727 volatile int n = 0;
728 while(locked) {
729 #ifdef CPU_X86
730 _mm_pause();
731 #endif
732 if(n++ > 500)
733 Sleep(0);
734 }
735 }
736
StartAuxThread(auxthread_t (auxthread__ * fn)(void * ptr),void * ptr)737 bool StartAuxThread(auxthread_t (auxthread__ *fn)(void *ptr), void *ptr)
738 {
739 #ifdef PLATFORM_WIN32
740 HANDLE handle;
741 handle = CreateThread(NULL, 512*1024, fn, ptr, STACK_SIZE_PARAM_IS_A_RESERVATION, NULL);
742 if(handle) {
743 CloseHandle(handle);
744 return true;
745 }
746 #endif
747 #ifdef PLATFORM_POSIX
748 pthread_t handle;
749 if(pthread_create(&handle, 0, fn, ptr) == 0) {
750 pthread_detach(handle);
751 return true;
752 }
753 #endif
754 return false;
755 }
756
757 }
758