1 #include "Core.h"
2 
3 #ifdef PLATFORM_OSX
4 #include <sys/time.h>
5 #endif
6 
7 namespace Upp {
8 
9 #define LLOG(x)  // DLOG(x)
10 
sMutexLock()11 static Mutex& sMutexLock()
12 { // this is Mutex intended to synchronize initialization of other primitives
13 	static Mutex m;
14 	return m;
15 }
16 
17 INITBLOCK {
18 	sMutexLock();
19 }
20 
21 Thread::Thread()
22 {
23 	sMutexLock();
24 #ifdef PLATFORM_WIN32
25 	handle = 0;
26 	thread_id = 0;
27 #endif
28 #ifdef PLATFORM_POSIX
29 	handle = 0;
30 #endif
31 }
32 
Detach()33 void Thread::Detach()
34 {
35 #if defined(PLATFORM_WIN32)
36 	if(handle) {
37 		CloseHandle(handle);
38 		handle = 0;
39 		thread_id = 0;
40 	}
41 #elif defined(PLATFORM_POSIX)
42 	if(handle) {
43 		CHECK(!pthread_detach(handle));
44 		handle = 0;
45 	}
46 #endif
47 }
48 
49 static Atomic sThreadCount;
50 
51 static thread_local void (*sExit)(void);
52 
AtExit(void (* exitfn)())53 void (*Thread::AtExit(void (*exitfn)()))()
54 {
55 	void (*prev)() = sExit;
56 	sExit = exitfn;
57 	return prev;
58 }
59 
60 struct sThreadExitExc__ {};
61 
Exit()62 void Thread::Exit()
63 {
64 	throw sThreadExitExc__();
65 }
66 
67 struct sThreadParam {
68 	Function<void ()> cb;
69 	bool              noshutdown;
70 };
71 
72 static
73 #ifdef PLATFORM_WIN32
74 	#ifdef CPU_64
75 		unsigned int
76 	#else
77 		uintptr_t __stdcall
78 	#endif
79 #else
80 	void *
81 #endif
sThreadRoutine(void * arg)82 sThreadRoutine(void *arg)
83 {
84 	LLOG("sThreadRoutine");
85 	auto p = (sThreadParam *)arg;
86 	try {
87 		p->cb();
88 	}
89 	catch(Exc e) {
90 		Panic(e);
91 	}
92 	catch(sThreadExitExc__) {}
93 	catch(Upp::ExitExc) {}
94 	if(!p->noshutdown)
95 		AtomicDec(sThreadCount);
96 	delete p;
97 	if(sExit)
98 		(*sExit)();
99 #ifdef UPP_HEAP
100 	MemoryFreeThread();
101 #endif
102 	return 0;
103 }
104 
105 static bool threadr; //indicates if *any* Thread instance is running (having called its Run()), upon first call of Run
106 #ifndef CPU_BLACKFIN
107 static thread_local  bool sMain;
108 #else
109 #ifdef PLATFORM_POSIX
110 static Index<pthread_t> threadsv; //a threads id vector, sMain=true ==>> 'pthread_self() pthread_t beeing present in vector, problem, wont be cleared when thread exits
111 Mutex vm; //a common access synchronizer
112 #endif
113 #endif
114 
115 //to sMain: an Application can start more than one thread, without having *any* one of them called Run() of any Thread instace
116 //when Run() is called *anytime*, it means, the term of *MainThread* has to be running anyway,
117 //otherwise no child threads could run. they are created by main.
118 //now each thread, having any Thread instance can start a first Run()
119 
Run(Function<void ()> _cb,bool noshutdown)120 bool Thread::Run(Function<void ()> _cb, bool noshutdown)
121 {
122 	LLOG("Thread::Run");
123 	if(!noshutdown)
124 		AtomicInc(sThreadCount);
125 	if(!threadr)
126 #ifndef CPU_BLACKFIN
127 		threadr = sMain = true;
128 #else
129 	{
130 		threadr = true;
131 		//the sMain replacement
132 #ifdef PLATFORM_POSIX
133 		pthread_t thid = pthread_self();
134 		vm.Enter();
135 		if(threadsv.Find(thid) < 0){
136 			//thread not yet present, mark present
137 			threadsv.Add(thid);
138 		}
139 		else
140 			RLOG("BUG: Multiple Add in Mt.cpp");
141 		vm.Leave();
142 #endif
143 	}
144 #endif
145 	Detach();
146 	auto p = new sThreadParam;
147 	p->cb = _cb;
148 	p->noshutdown = noshutdown;
149 #ifdef PLATFORM_WIN32
150 #ifdef CPU_32 // in 32-bit, reduce stack size to 1MB to fit more threads into address space
151 	handle = (HANDLE)_beginthreadex(0, 1024*1024, sThreadRoutine, p, STACK_SIZE_PARAM_IS_A_RESERVATION, ((unsigned int *)(&thread_id)));
152 #else
153 	handle = (HANDLE)_beginthreadex(0, 0, sThreadRoutine, p, 0, ((unsigned int *)(&thread_id)));
154 #endif
155 #endif
156 #ifdef PLATFORM_POSIX
157 	if(pthread_create(&handle, 0, sThreadRoutine, p))
158 		handle = 0;
159 #endif
160 	return handle;
161 }
162 
RunNice(Function<void ()> cb,bool noshutdown)163 bool Thread::RunNice(Function<void ()> cb, bool noshutdown)
164 {
165 	if(Run(cb, noshutdown)) {
166 		Nice();
167 		return true;
168 	}
169 	return false;
170 }
171 
RunCritical(Function<void ()> cb,bool noshutdown)172 bool Thread::RunCritical(Function<void ()> cb, bool noshutdown)
173 {
174 	if(Run(cb, noshutdown)) {
175 		Critical();
176 		return true;
177 	}
178 	return false;
179 }
180 
~Thread()181 Thread::~Thread()
182 {
183 	Detach();
184 #ifdef CPU_BLACKFIN
185 #ifdef PLATFORM_POSIX
186 	//the static destruction replacement
187 	pthread_t thid = pthread_self();
188 	vm.Enter();
189 	int id = threadsv.Find(thid);
190 	if(id >= 0)
191 		threadsv.Remove(id);
192 	vm.Leave();
193 #endif
194 #endif
195 }
196 
IsST()197 bool Thread::IsST() //the containing thread (of wich there may be multiple) has not run its Run() yet
198 {
199 	return !threadr;
200 }
201 
IsMain()202 bool Thread::IsMain() //the calling thread is the Main Thread or the only one in App
203 {
204 #ifndef CPU_BLACKFIN
205 	return !threadr || sMain;
206 #else
207 	if(!threadr)
208 		return true;
209 #ifdef PLATFORM_POSIX
210 	//the sMain replacement
211 	pthread_t thid = pthread_self();
212 	vm.Enter();
213 	if(threadsv.Find(thid) >= 0)
214 	{
215 		vm.Leave();
216 		return true;
217 	}
218 	vm.Leave();
219 #endif
220 	return false;
221 #endif
222 }
223 
GetCount()224 int Thread::GetCount()
225 {
226 	return sThreadCount;
227 }
228 
229 static int sShutdown;
230 
BeginShutdownThreads()231 void Thread::BeginShutdownThreads()
232 {
233 	sShutdown++;
234 }
235 
EndShutdownThreads()236 void Thread::EndShutdownThreads()
237 {
238 	sShutdown--;
239 }
240 
ShutdownThreads()241 void Thread::ShutdownThreads()
242 {
243 	BeginShutdownThreads();
244 	while(GetCount())
245 		Sleep(100);
246 	EndShutdownThreads();
247 }
248 
IsShutdownThreads()249 bool Thread::IsShutdownThreads()
250 {
251 	return sShutdown;
252 }
253 
Wait()254 int Thread::Wait()
255 {
256 	if(!IsOpen())
257 		return -1;
258 	int out;
259 #ifdef PLATFORM_WIN32
260 	dword exit;
261 	if(!GetExitCodeThread(handle, &exit))
262 		return -1;
263 	if(exit != STILL_ACTIVE)
264 		out = (int)exit;
265 	else
266 	{
267 		if(WaitForSingleObject(handle, INFINITE) != WAIT_OBJECT_0)
268 			return Null;
269 		out = GetExitCodeThread(handle, &exit) ? int(exit) : int(Null);
270 	}
271 	Detach();
272 #endif
273 #ifdef PLATFORM_POSIX
274 	void *thread_return;
275 	if(pthread_join(handle, &thread_return))
276 		out = Null;
277 	else
278 		out = (int)(intptr_t)thread_return;
279 	handle = 0;
280 #endif
281 	return out;
282 }
283 
Priority(int percent)284 bool Thread::Priority(int percent)
285 {
286 	ASSERT(IsOpen());
287 #ifdef PLATFORM_WIN32
288 	int prior;
289 	if(percent <= 25)
290 		prior = THREAD_PRIORITY_LOWEST;
291 	else if(percent <= 75)
292 		prior = THREAD_PRIORITY_BELOW_NORMAL;
293 	else if(percent <= 125)
294 		prior = THREAD_PRIORITY_NORMAL;
295 	else if(percent <= 175)
296 		prior = THREAD_PRIORITY_ABOVE_NORMAL;
297 	else
298 		prior = THREAD_PRIORITY_HIGHEST;
299 	return SetThreadPriority(handle, prior);
300 #endif
301 #ifdef PLATFORM_POSIX
302 	int policy;
303 	struct sched_param param;
304 
305 	if(pthread_getschedparam(handle, &policy, &param))
306 		return false;
307 	int percent_min = 0, percent_max = 200;
308 	if(percent <= 25) {
309 		#if defined(SCHED_IDLE)
310 			policy = SCHED_IDLE;
311 			percent_min = 0;
312 			percent_max = 25;
313 		#elif defined(SCHED_BATCH)
314 			policy = SCHED_BATCH;
315 			percent_min = 0;
316 			percent_max = 75;
317 		#else
318 			policy = SCHED_OTHER;
319 			percent_min = 0;
320 			percent_max = 125;
321 		#endif
322 	}
323 	else
324 	if(percent <= 75){
325 		#if defined(SCHED_IDLE)
326 			policy = SCHED_BATCH;
327 			percent_min = 25;
328 			percent_max = 75;
329 		#elif defined(SCHED_BATCH)
330 			policy = SCHED_BATCH;
331 			percent_min = 0;
332 			percent_max = 75;
333 		#else
334 			policy = SCHED_OTHER;
335 			percent_min = 0;
336 			percent_max = 125;
337 		#endif
338 	}
339 	else
340 	if(percent <= 125){
341 		policy = SCHED_OTHER;
342 		#if defined(SCHED_IDLE)
343 			percent_min = 75;
344 			percent_max = 125;
345 		#elif defined(SCHED_BATCH)
346 			percent_min = 25;
347 			percent_max = 125;
348 		#else
349 			percent_min = 0;
350 			percent_max = 125;
351 		#endif
352 	}
353 	else
354 	if(percent <= 175){ // should be the root
355 		policy = SCHED_FIFO;
356 		percent_min = 125;
357 		percent_max = 175;
358 	}
359 	else
360 		policy = SCHED_RR;
361 
362 	param.sched_priority = (sched_get_priority_max(policy) - sched_get_priority_min(policy))*(minmax(percent, percent_min, percent_max)-percent_min)/(percent_max - percent_min);
363 
364 	if (pthread_setschedparam(handle, policy, &param)) {
365 		// No privileges? Try maximum possible! Do not use EPERM as not all os support this one
366 		policy = SCHED_OTHER;
367 		percent_max = 125;
368 		percent_min = minmax(percent_min, 0, percent_max);
369 		param.sched_priority = (sched_get_priority_max(policy) - sched_get_priority_min(policy))
370 		                       * (minmax(percent, percent_min, percent_max) - percent_min)
371 		                       / max(percent_max - percent_min, 1);
372 		if(pthread_setschedparam(handle, policy, &param))
373 			return false;
374 	}
375 	return true;
376 #endif
377 }
378 
Start(Function<void ()> cb,bool noshutdown)379 void Thread::Start(Function<void ()> cb, bool noshutdown)
380 {
381 	Thread t;
382 	t.Run(cb);
383 	t.Detach();
384 }
385 
StartNice(Function<void ()> cb,bool noshutdown)386 void Thread::StartNice(Function<void ()> cb, bool noshutdown)
387 {
388 	Thread t;
389 	t.Run(cb);
390 	t.Nice();
391 	t.Detach();
392 }
393 
StartCritical(Function<void ()> cb,bool noshutdown)394 void Thread::StartCritical(Function<void ()> cb, bool noshutdown)
395 {
396 	Thread t;
397 	t.Run(cb);
398 	t.Critical();
399 	t.Detach();
400 }
401 
Sleep(int msec)402 void Thread::Sleep(int msec)
403 {
404 #ifdef PLATFORM_WIN32
405 	::Sleep(msec);
406 #endif
407 #ifdef PLATFORM_POSIX
408 	::timespec tval;
409 	tval.tv_sec = msec / 1000;
410 	tval.tv_nsec = (msec % 1000) * 1000000;
411 	nanosleep(&tval, NULL);
412 #endif
413 }
414 
415 #ifdef flagPROFILEMT
Dumi()416 MtInspector *MtInspector::Dumi()
417 {
418 	static MtInspector h(NULL);
419 	return &h;
420 }
421 
~MtInspector()422 MtInspector::~MtInspector()
423 {
424 	if(name)
425 		RLOG("Mutex " << name << '(' << number << ") " << blocked << "/" << locked <<
426 		     " = " << Sprintf("%.4f", locked ? (double)blocked / locked : 0) << " blocked/locked times");
427 }
428 #endif
429 
430 #ifdef PLATFORM_WIN32
431 
Release()432 void Semaphore::Release()
433 {
434 	ReleaseSemaphore(handle, 1, NULL);
435 }
436 
Release(int n)437 void Semaphore::Release(int n)
438 {
439 	ReleaseSemaphore(handle, n, NULL);
440 }
441 
Wait(int timeout_ms)442 bool Semaphore::Wait(int timeout_ms)
443 {
444 	return WaitForSingleObject(handle, timeout_ms < 0 ? INFINITE : timeout_ms) == WAIT_OBJECT_0;
445 }
446 
Semaphore()447 Semaphore::Semaphore()
448 {
449 	handle = CreateSemaphore(NULL, 0, INT_MAX, NULL);
450 }
451 
~Semaphore()452 Semaphore::~Semaphore()
453 {
454 	CloseHandle(handle);
455 }
456 
457 Mutex& sMutexLock();
458 
TryEnter()459 bool Mutex::TryEnter()
460 {
461 	return TryEnterCriticalSection(&section);
462 }
463 
464 /* Win32 RWMutex implementation by Chris Thomasson, cristom@comcast.net */
465 
EnterWrite()466 void RWMutex::EnterWrite()
467 {
468 	EnterCriticalSection ( &m_wrlock );
469 	LONG count = InterlockedExchangeAdd(&m_count, -LONG_MAX);
470 	if(count < LONG_MAX)
471 		if(InterlockedExchangeAdd ( &m_rdwake, LONG_MAX - count ) + LONG_MAX - count )
472 			WaitForSingleObject ( m_wrwset, INFINITE );
473 }
474 
LeaveWrite()475 void RWMutex::LeaveWrite()
476 {
477 	LONG count = InterlockedExchangeAdd ( &m_count, LONG_MAX );
478 	if (count < 0)
479 	    ReleaseSemaphore ( m_rdwset, count * -1, 0 );
480 	LeaveCriticalSection ( &m_wrlock );
481 }
482 
EnterRead()483 void RWMutex::EnterRead()
484 {
485 	LONG count = InterlockedDecrement ( &m_count );
486 	if(count < 0)
487 		WaitForSingleObject ( m_rdwset, INFINITE );
488 }
489 
LeaveRead()490 void RWMutex::LeaveRead()
491 {
492 	LONG count = InterlockedIncrement ( &m_count );
493 	if ( count < 1 )
494 		if ( ! InterlockedDecrement ( &m_rdwake ) )
495 			SetEvent ( m_wrwset );
496 }
497 
RWMutex()498 RWMutex::RWMutex()
499 :	m_count ( LONG_MAX ),
500 	m_rdwake ( 0 ),
501 	m_wrwset ( CreateEvent ( 0, FALSE, FALSE, 0 ) ),
502 	m_rdwset ( CreateSemaphore ( 0, 0, LONG_MAX, 0 ) )
503 {
504 	InitializeCriticalSection ( &m_wrlock );
505 }
506 
~RWMutex()507 RWMutex::~RWMutex()
508 {
509 	DeleteCriticalSection ( &m_wrlock );
510 	CloseHandle ( m_rdwset );
511 	CloseHandle ( m_wrwset );
512 }
513 
514 VOID (WINAPI *ConditionVariable::InitializeConditionVariable)(PCONDITION_VARIABLE ConditionVariable);
515 VOID (WINAPI *ConditionVariable::WakeConditionVariable)(PCONDITION_VARIABLE ConditionVariable);
516 VOID (WINAPI *ConditionVariable::WakeAllConditionVariable)(PCONDITION_VARIABLE ConditionVariable);
517 BOOL (WINAPI *ConditionVariable::SleepConditionVariableCS)(PCONDITION_VARIABLE ConditionVariable, PCRITICAL_SECTION CriticalSection, DWORD dwMilliseconds);
518 
Wait(Mutex & m,int timeout_ms)519 bool ConditionVariable::Wait(Mutex& m, int timeout_ms)
520 {
521 	if(InitializeConditionVariable)
522 		return SleepConditionVariableCS(cv, &m.section, timeout_ms < 0 ? INFINITE : timeout_ms);
523 	else { // WindowsXP implementation
524 		static thread_local byte buffer[sizeof(WaitingThread)]; // only one Wait per thread is possible
525 		WaitingThread *w = new(buffer) WaitingThread;
526 		{
527 			Mutex::Lock __(mutex);
528 			w->next = NULL;
529 			if(head)
530 				tail->next = w;
531 			else
532 				head = w;
533 			tail = w;
534 		}
535 		m.Leave();
536 		bool r = w->sem.Wait(timeout_ms);
537 		m.Enter();
538 		w->WaitingThread::~WaitingThread();
539 		return r;
540 	}
541 }
542 
Signal()543 void ConditionVariable::Signal()
544 {
545 	if(InitializeConditionVariable)
546 		WakeConditionVariable(cv);
547 	else { // WindowsXP implementation
548 		Mutex::Lock __(mutex);
549 		if(head) {
550 			head->sem.Release();
551 			head = head->next;
552 		}
553 	}
554 }
555 
Broadcast()556 void ConditionVariable::Broadcast()
557 {
558 	if(InitializeConditionVariable)
559 		WakeAllConditionVariable(cv);
560 	else { // WindowsXP implementation
561 		Mutex::Lock __(mutex);
562 		while(head) {
563 			head->sem.Release();
564 			head = head->next;
565 		}
566 	}
567 }
568 
ConditionVariable()569 ConditionVariable::ConditionVariable()
570 {
571 #ifndef flagTESTXPCV
572 	ONCELOCK {
573 		if(IsWinVista()) {
574 			DllFn(InitializeConditionVariable, "kernel32", "InitializeConditionVariable");
575 			DllFn(WakeConditionVariable, "kernel32", "WakeConditionVariable");
576 			DllFn(WakeAllConditionVariable, "kernel32", "WakeAllConditionVariable");
577 			DllFn(SleepConditionVariableCS, "kernel32", "SleepConditionVariableCS");
578 		}
579 	}
580 #endif
581 	if(InitializeConditionVariable)
582 		InitializeConditionVariable(cv);
583 	else
584 		head = tail = NULL;
585 }
586 
~ConditionVariable()587 ConditionVariable::~ConditionVariable()
588 {
589 	Broadcast();
590 }
591 
592 #endif
593 
594 #ifdef PLATFORM_POSIX
595 
Mutex()596 Mutex::Mutex()
597 {
598 	pthread_mutexattr_t mutex_attr[1];
599 	pthread_mutexattr_init(mutex_attr);
600 	pthread_mutexattr_settype(mutex_attr, PTHREAD_MUTEX_RECURSIVE);
601 	pthread_mutex_init(mutex, mutex_attr);
602 #ifdef flagPROFILEMT
603 	mti = MtInspector::Dumi();
604 #endif
605 }
606 
RWMutex()607 RWMutex::RWMutex()
608 {
609 	pthread_rwlock_init(rwlock, NULL);
610 }
611 
~RWMutex()612 RWMutex::~RWMutex()
613 {
614 	pthread_rwlock_destroy(rwlock);
615 }
616 
Wait(Mutex & m,int timeout_ms)617 bool ConditionVariable::Wait(Mutex& m, int timeout_ms)
618 {
619 	if(timeout_ms < 0) {
620 		pthread_cond_wait(cv, m.mutex);
621 		return true;
622 	}
623 	::timespec until;
624 	clock_gettime(CLOCK_REALTIME, &until);
625 
626 	until.tv_sec += timeout_ms / 1000;
627 	timeout_ms %= 1000;
628 	until.tv_nsec += timeout_ms * 1000000;
629 	until.tv_sec += until.tv_nsec / 1000000000;
630 	until.tv_nsec %= 1000000000;
631 
632 	return pthread_cond_timedwait(cv, m.mutex, &until) == 0;
633 }
634 
635 #ifdef PLATFORM_OSX
636 
Release()637 void Semaphore::Release()
638 {
639 	dispatch_semaphore_signal(sem);
640 }
641 
Wait(int timeout_ms)642 bool Semaphore::Wait(int timeout_ms)
643 {
644 	return dispatch_semaphore_wait(sem, timeout_ms < 0 ? DISPATCH_TIME_FOREVER
645 	                                    : dispatch_time(DISPATCH_TIME_NOW, 1000000 * timeout_ms)) == 0;
646 }
647 
Semaphore()648 Semaphore::Semaphore()
649 {
650 	sem = dispatch_semaphore_create(0);
651 }
652 
~Semaphore()653 Semaphore::~Semaphore()
654 {
655 	dispatch_release(sem);
656 }
657 
658 #else
659 
Release()660 void Semaphore::Release()
661 {
662 	sem_post(&sem);
663 }
664 
Wait(int timeout_ms)665 bool Semaphore::Wait(int timeout_ms)
666 {
667 	if(timeout_ms < 0) {
668 		sem_wait(&sem);
669 		return true;
670 	}
671 	struct timespec until;
672 	clock_gettime(CLOCK_REALTIME, &until);
673 
674 	until.tv_sec += timeout_ms / 1000;
675 	timeout_ms %= 1000;
676 	until.tv_nsec += timeout_ms * 1000000;
677 	until.tv_sec += until.tv_nsec / 1000000000;
678 	until.tv_nsec %= 1000000000;
679 
680 	return sem_timedwait(&sem,&until) != -1;
681 }
682 
Semaphore()683 Semaphore::Semaphore()
684 {
685 	sem_init(&sem, 0, 0);
686 }
687 
~Semaphore()688 Semaphore::~Semaphore()
689 {
690 	sem_destroy(&sem);
691 }
692 
693 #endif
694 
695 #endif
696 
Invalidate()697 void LazyUpdate::Invalidate()
698 {
699 	dirty.store(true, std::memory_order_release);
700 	dirty = true;
701 }
702 
BeginUpdate() const703 bool LazyUpdate::BeginUpdate() const
704 {
705 	bool b = dirty.load(std::memory_order_acquire);
706 	if(b) {
707 		mutex.Enter();
708 		if(dirty) return true;
709 		mutex.Leave();
710 	}
711 	return false;
712 }
713 
EndUpdate() const714 void LazyUpdate::EndUpdate() const
715 {
716 	dirty.store(false, std::memory_order_release);
717 	mutex.Leave();
718 }
719 
LazyUpdate()720 LazyUpdate::LazyUpdate()
721 {
722 	dirty = true;
723 }
724 
Wait()725 void SpinLock::Wait()
726 {
727 	volatile int n = 0;
728 	while(locked) {
729 	#ifdef CPU_X86
730 		_mm_pause();
731 	#endif
732 		if(n++ > 500)
733 			Sleep(0);
734 	}
735 }
736 
StartAuxThread(auxthread_t (auxthread__ * fn)(void * ptr),void * ptr)737 bool StartAuxThread(auxthread_t (auxthread__ *fn)(void *ptr), void *ptr)
738 {
739 #ifdef PLATFORM_WIN32
740 	HANDLE handle;
741 	handle = CreateThread(NULL, 512*1024, fn, ptr, STACK_SIZE_PARAM_IS_A_RESERVATION, NULL);
742 	if(handle) {
743 		CloseHandle(handle);
744 		return true;
745 	}
746 #endif
747 #ifdef PLATFORM_POSIX
748 	pthread_t handle;
749 	if(pthread_create(&handle, 0, fn, ptr) == 0) {
750 		pthread_detach(handle);
751 		return true;
752 	}
753 #endif
754 	return false;
755 }
756 
757 }
758