1 /**
2  * Thread.cpp
3  * This file is part of the YATE Project http://YATE.null.ro
4  *
5  * Yet Another Telephony Engine - a fully featured software PBX and IVR
6  * Copyright (C) 2004-2014 Null Team
7  *
8  * This software is distributed under multiple licenses;
9  * see the COPYING file in the main directory for licensing
10  * information for this specific distribution.
11  *
12  * This use of this software may be subject to additional restrictions.
13  * See the LEGAL file in the main directory for details.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
18  */
19 
20 #include "yateclass.h"
21 
22 #include <string.h>
23 
24 #ifdef _WINDOWS
25 #include <process.h>
26 typedef unsigned long HTHREAD;
27 
28 #define EINVAL_ERR  ERROR_INVALID_PARAMETER
29 #define ENOTSUP_ERR ERROR_NOT_SUPPORTED
30 #define EINVALTHR_ERR ERROR_INVALID_HANDLE
31 
32 #else
33 
34 #if defined(THREAD_AFFINITY) || defined(SCHED_AFFINITY)
35 #ifndef _GNU_SOURCE
36 #define _GNU_SOURCE
37 #endif
38 #ifdef SCHED_AFFINITY
39 #include <sys/syscall.h>
40 #endif
41 #endif
42 
43 #define EINVAL_ERR    EINVAL
44 #define ENOTSUP_ERR   ENOTSUP
45 #define EINVALTHR_ERR ESRCH
46 
47 #include <pthread.h>
48 typedef pthread_t HTHREAD;
49 #ifndef PTHREAD_EXPLICIT_SCHED
50 #define PTHREAD_EXPLICIT_SCHED 0
pthread_attr_setinheritsched(pthread_attr_t *,int)51 static int pthread_attr_setinheritsched(pthread_attr_t *,int) { return 0; }
52 #endif
53 #endif
54 
55 #ifdef HAVE_PRCTL
56 #include <sys/prctl.h>
57 #endif
58 
59 #ifndef PTHREAD_STACK_MIN
60 #define PTHREAD_STACK_MIN 16384
61 #else
62 #ifndef PAGE_SIZE
63 #define PAGE_SIZE 4096
64 #endif
65 #endif
66 
67 #ifndef THREAD_IDLE_MSEC
68 #define THREAD_IDLE_MSEC 5
69 #endif
70 #define THREAD_IDLE_MIN  1
71 #define THREAD_IDLE_MAX 20
72 
73 namespace TelEngine {
74 
75 class ThreadPrivate : public GenObject {
76     friend class Thread;
77 public:
78     ThreadPrivate(Thread* t,const char* name);
79     ~ThreadPrivate();
80     void run();
81     bool cancel(bool hard);
82     void cleanup();
83     void destroy();
84     void pubdestroy();
85     static ThreadPrivate* create(Thread* t,const char* name,Thread::Priority prio);
86     static void killall();
87     static ThreadPrivate* current();
88     static int setAffinity(ThreadPrivate* t, const DataBlock& bits);
89     static int getAffinity(ThreadPrivate* t, DataBlock& outMask);
90     Thread* m_thread;
91     HTHREAD thread;
92 #if !defined(_WINDOWS) && defined(SCHED_AFFINITY)
93     pid_t m_tid;
94 #endif
95     NamedCounter* m_counter;
96     bool m_running;
97     bool m_started;
98     bool m_updest;
99     bool m_cancel;
100     const char* m_name;
101 #ifdef _WINDOWS
102     static void startFunc(void* arg);
103     DWORD_PTR m_affinityMask;
104 #else
105     static void* startFunc(void* arg);
106 #endif
107     static void cleanupFunc(void* arg);
108     static void destroyFunc(void* arg);
109 };
110 
111 };
112 
113 using namespace TelEngine;
114 
115 #define SOFT_WAITS 3
116 #define HARD_KILLS 5
117 #define KILL_WAIT 32
118 
119 #ifdef _WINDOWS
120 #define TLS_MAGIC 0xfeeb1e
121 static DWORD tls_index = TLS_MAGIC;
getTls()122 static DWORD getTls()
123 {
124     // this seems unsafe but is not - allocation happens once before
125     //  any Thread is actually created
126     if (tls_index == TLS_MAGIC)
127 	tls_index = ::TlsAlloc();
128     if (tls_index == (DWORD)-1)
129 	// if it happened is REALLY BAD so better die quick and clean!
130 	abort();
131     return tls_index;
132 }
133 
haveAffinity()134 static inline bool haveAffinity()
135 {
136     // TODO handle detection of support on Windows
137     return true;
138 }
139 #else /* _WINDOWS */
140 static pthread_key_t current_key = PTHREAD_KEYS_MAX; // this signifies invalid key
141 
142 class ThreadPrivateKeyAlloc
143 {
144 public:
ThreadPrivateKeyAlloc()145     ThreadPrivateKeyAlloc()
146     {
147 	if (::pthread_key_create(&current_key,ThreadPrivate::destroyFunc)) {
148 	    abortOnBug(true);
149 	    Debug(DebugFail,"Failed to create current thread key!");
150 	}
151     }
152 };
153 
haveAffinity()154 static inline bool haveAffinity()
155 {
156 #if defined(THREAD_AFFINITY) || defined(SCHED_AFFINITY)
157     return true;
158 #endif
159     return false;
160 }
161 
162 static ThreadPrivateKeyAlloc keyAllocator;
163 #endif /* _WINDOWS */
164 
165 static TokenDict s_prio[] = {
166     { "lowest", Thread::Lowest },
167     { "low", Thread::Low },
168     { "normal", Thread::Normal },
169     { "high", Thread::High },
170     { "highest", Thread::Highest },
171     { 0, 0 }
172 };
173 
174 static unsigned long s_idleMs = 1000 * THREAD_IDLE_MSEC;
175 static ObjList s_threads;
176 static Mutex s_tmutex(true,"Thread");
177 static NamedCounter* s_counter = 0;
178 
create(Thread * t,const char * name,Thread::Priority prio)179 ThreadPrivate* ThreadPrivate::create(Thread* t,const char* name,Thread::Priority prio)
180 {
181     ThreadPrivate *p = new ThreadPrivate(t,name);
182     int e = 0;
183 #ifndef _WINDOWS
184     // Set a decent (256K) stack size that won't eat all virtual memory
185     pthread_attr_t attr;
186     ::pthread_attr_init(&attr);
187     ::pthread_attr_setstacksize(&attr, 16*PTHREAD_STACK_MIN);
188     if (prio > Thread::Normal) {
189 	struct sched_param param;
190 	param.sched_priority = 0;
191 	int policy = SCHED_OTHER;
192 	switch (prio) {
193 	    case Thread::High:
194 		policy = SCHED_RR;
195 		param.sched_priority = 1;
196 		break;
197 	    case Thread::Highest:
198 		policy = SCHED_FIFO;
199 		param.sched_priority = 99;
200 		break;
201 	    default:
202 		break;
203 	}
204 	int err = ::pthread_attr_setinheritsched(&attr,PTHREAD_EXPLICIT_SCHED);
205 	if (!err)
206 	    err = ::pthread_attr_setschedpolicy(&attr,policy);
207 	if (!err)
208 	    err = ::pthread_attr_setschedparam(&attr,&param);
209 	if (err)
210 	    Debug(
211 #ifdef DEBUG
212 		DebugWarn,
213 #else
214 		DebugNote,
215 #endif
216 		"Could not set thread scheduling parameters: %s (%d)",
217 		strerror(err),err);
218 #ifdef XDEBUG
219 	else
220 	    Debug(DebugInfo,"Successfully set high thread priority %d",prio);
221 #endif
222     }
223 #endif /* _WINDOWS */
224 
225     for (int i=0; i<5; i++) {
226 #ifdef _WINDOWS
227 	HTHREAD t = ::_beginthread(startFunc,16*PTHREAD_STACK_MIN,p);
228 	e = (t == (HTHREAD)-1) ? errno : 0;
229 	if (!e) {
230 	    p->thread = t;
231 	    int pr = THREAD_PRIORITY_NORMAL;
232 	    switch (prio) {
233 		case Thread::Lowest:
234 		    pr = THREAD_PRIORITY_LOWEST;
235 		    break;
236 		case Thread::Low:
237 		    pr = THREAD_PRIORITY_BELOW_NORMAL;
238 		    break;
239 		case Thread::High:
240 		    pr = THREAD_PRIORITY_ABOVE_NORMAL;
241 		    break;
242 		case Thread::Highest:
243 		    pr = THREAD_PRIORITY_HIGHEST;
244 		    break;
245 		default:
246 		    break;
247 	    }
248 	    if (pr != THREAD_PRIORITY_NORMAL)
249 		::SetThreadPriority(reinterpret_cast<HANDLE>(t),pr);
250 	}
251 #else /* _WINDOWS */
252 	e = ::pthread_create(&p->thread,&attr,startFunc,p);
253 #ifdef PTHREAD_INHERIT_SCHED
254 	if ((0 == i) && (EPERM == e) && (prio > Thread::Normal)) {
255 	    Debug(DebugWarn,"Failed to create thread with priority %d, trying with inherited",prio);
256 	    ::pthread_attr_setinheritsched(&attr,PTHREAD_INHERIT_SCHED);
257 	    e = EAGAIN;
258 	}
259 #endif
260 #endif /* _WINDOWS */
261 	if (e != EAGAIN)
262 	    break;
263 	Thread::usleep(20);
264     }
265 #ifndef _WINDOWS
266     ::pthread_attr_destroy(&attr);
267 #endif
268     if (e) {
269 	Alarm("engine","system",DebugCrit,"Error %d while creating pthread in '%s' [%p]",e,name,p);
270 	p->m_thread = 0;
271 	p->destroy();
272 	return 0;
273     }
274     p->m_running = true;
275     return p;
276 }
277 
ThreadPrivate(Thread * t,const char * name)278 ThreadPrivate::ThreadPrivate(Thread* t,const char* name)
279     : m_thread(t), m_counter(0),
280       m_running(false), m_started(false), m_updest(true), m_cancel(false), m_name(name)
281 {
282 #ifdef DEBUG
283     Debugger debug("ThreadPrivate::ThreadPrivate","(%p,\"%s\") [%p]",t,name,this);
284 #endif
285 #if defined(_WINDOWS)
286     m_affinityMask = 0;
287     DWORD_PTR sysAffin = 0;
288     if (!::GetProcessAffinityMask(::GetCurrentProcess(),&m_affinityMask,&sysAffin)) {
289 	int err = ::GetLastError();
290 	Debug(DebugNote,"ThreadPrivate::ThreadPrivate(%p,\"%s\") - "
291 		"Failed to get process affinity, err=%d [%p]",t,name,err,this);
292     }
293 #ifdef DEBUG
294     else
295 	Debug(DebugAll,"ThreadPrivate::ThreadPrivate(%p,\"%s\") - Process affinity = %lx,"
296 		" system affinity = %lx [%p]",t,name,m_affinityMask,sysAffin,this);
297 #endif
298 #elif defined(SCHED_AFFINITY)
299     m_tid = -1;
300 #endif
301     // Inherit object counter of creating thread
302     m_counter = Thread::getCurrentObjCounter(true);
303     Lock lock(s_tmutex);
304     s_threads.append(this);
305 }
306 
~ThreadPrivate()307 ThreadPrivate::~ThreadPrivate()
308 {
309 #ifdef DEBUG
310     Debugger debug("ThreadPrivate::~ThreadPrivate()"," %p '%s' [%p]",m_thread,m_name,this);
311 #endif
312     m_running = false;
313     Lock lock(s_tmutex);
314     s_threads.remove(this,false);
315     if (m_thread && m_updest) {
316 	Thread *t = m_thread;
317 	m_thread = 0;
318 	// let other threads access the list while we delete our upper layer
319 	lock.drop();
320 	delete t;
321     }
322 }
323 
destroy()324 void ThreadPrivate::destroy()
325 {
326     DDebug(DebugAll,"ThreadPrivate::destroy() '%s' [%p]",m_name,this);
327     cleanup();
328     delete this;
329 }
330 
pubdestroy()331 void ThreadPrivate::pubdestroy()
332 {
333 #ifdef DEBUG
334     Debugger debug(DebugAll,"ThreadPrivate::pubdestroy()"," %p '%s' [%p]",m_thread,m_name,this);
335 #endif
336     m_updest = false;
337     cleanup();
338     m_thread = 0;
339 
340     if (current() == this) {
341 	cancel(true);
342 	// should never reach here...
343 	Debug(DebugFail,"ThreadPrivate::pubdestroy() past cancel??? [%p]",this);
344     }
345     else {
346 	cancel(false);
347 	// delay a little so thread has a chance to clean up
348 	for (int i=0; i<20; i++) {
349 	    s_tmutex.lock();
350 	    bool done = !s_threads.find(this);
351 	    s_tmutex.unlock();
352 	    if (done)
353 		return;
354 	    Thread::idle(false);
355 	}
356 	if (m_cancel && !cancel(true))
357 	    Debug(DebugWarn,"ThreadPrivate::pubdestroy() %p '%s' failed cancel [%p]",m_thread,m_name,this);
358     }
359 }
360 
run()361 void ThreadPrivate::run()
362 {
363     DDebug(DebugAll,"ThreadPrivate::run() '%s' [%p]",m_name,this);
364 #ifdef _WINDOWS
365     ::TlsSetValue(getTls(),this);
366 #else
367     ::pthread_setspecific(current_key,this);
368     pthread_cleanup_push(cleanupFunc,this);
369 #ifdef PTHREAD_CANCEL_ASYNCHRONOUS
370     ::pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,0);
371 #endif
372     ::pthread_detach(::pthread_self());
373 #endif /* _WINDOWS */
374 
375 #ifdef HAVE_PRCTL
376 #ifdef PR_SET_NAME
377     if (m_name)
378 	prctl(PR_SET_NAME,(unsigned long)m_name,0,0,0);
379 #endif
380 #endif
381 #ifdef _WINDOWS
382 #ifndef NDEBUG
383     if (m_name) {
384 	struct {
385 	    DWORD dwType;
386 	    LPCSTR szName;
387 	    DWORD dwThreadID;
388 	    DWORD dwFlags;
389 	} threadInfo;
390 	threadInfo.dwType = 0x1000;
391 	threadInfo.szName = m_name;
392 	threadInfo.dwThreadID = (DWORD)-1;
393 	threadInfo.dwFlags = 0;
394 	__try { RaiseException(0x406D1388, 0, sizeof(threadInfo)/sizeof(DWORD), (DWORD*)&threadInfo); }
395 	__except (EXCEPTION_CONTINUE_EXECUTION) { }
396     }
397 #endif
398 #endif
399 
400     // FIXME: possible race if public object is destroyed during thread startup
401     while (!m_started)
402 	Thread::usleep(10,true);
403     if (m_thread)
404 	m_thread->run();
405 
406 #ifndef _WINDOWS
407     pthread_cleanup_pop(1);
408 #endif
409 }
410 
cancel(bool hard)411 bool ThreadPrivate::cancel(bool hard)
412 {
413     DDebug(DebugAll,"ThreadPrivate::cancel(%s) '%s' [%p]",String::boolText(hard),m_name,this);
414     bool ret = true;
415     if (m_running) {
416 	ret = false;
417 	if (hard) {
418 	    bool critical = m_thread && m_thread->m_locking;
419 	    if (critical) {
420 		// give the thread a chance to cancel without locking a mutex
421 		Debug(DebugMild,"Hard canceling '%s' while is taking a lock [%p]",m_name,this);
422 		m_cancel = true;
423 		for (int i = 0; i < 50; i++) {
424 		    Thread::msleep(1);
425 		    if (!m_running)
426 			return true;
427 		}
428 	    }
429 	    m_running = false;
430 #ifdef _WINDOWS
431 	    Debug(DebugCrit,"ThreadPrivate '%s' terminating win32 thread %lu [%p]",
432 		m_name,thread,this);
433 	    ret = ::TerminateThread(reinterpret_cast<HANDLE>(thread),0) != 0;
434 #else
435 #ifdef PTHREAD_CANCEL_ASYNCHRONOUS
436 	    Debug(critical ? DebugInfo : DebugWarn,"ThreadPrivate '%s' terminating pthread %p [%p]",
437 		m_name,&thread,this);
438 	    ret = !::pthread_cancel(thread);
439 #else
440 	    Debug(DebugCrit,"ThreadPrivate '%s' cannot terminate %p on this platform [%p]",
441 		m_name,&thread,this);
442 #endif
443 #endif /* _WINDOWS */
444 	    if (ret) {
445 		// hard cancel succeeded - object is unsafe to touch any more
446 		Thread::msleep(1);
447 		return true;
448 	    }
449 	    // hard cancel failed - set back the running flag
450 	    m_running = true;
451 	}
452 	m_cancel = true;
453     }
454     return ret;
455 }
456 
cleanup()457 void ThreadPrivate::cleanup()
458 {
459     DDebug(DebugAll,"ThreadPrivate::cleanup() %p '%s' [%p]",m_thread,m_name,this);
460     if (m_thread && m_thread->m_private) {
461 	if (m_thread->m_private == this) {
462 	    m_thread->m_private = 0;
463 	    m_thread->cleanup();
464 	    if (m_thread->locked())
465 		Alarm("engine","bug",DebugFail,"Thread '%s' destroyed with mutex locks (%d held) [%p]",m_name,m_thread->locks(),m_thread);
466 	}
467 	else {
468 	    Alarm("engine","bug",DebugFail,"ThreadPrivate::cleanup() %p '%s' mismatching %p [%p]",m_thread,m_name,m_thread->m_private,this);
469 	    m_thread = 0;
470 	}
471     }
472 }
473 
current()474 ThreadPrivate* ThreadPrivate::current()
475 {
476 #ifdef _WINDOWS
477     return reinterpret_cast<ThreadPrivate *>(::TlsGetValue(getTls()));
478 #else
479     return reinterpret_cast<ThreadPrivate *>(::pthread_getspecific(current_key));
480 #endif
481 }
482 
setAffinity(ThreadPrivate * t,const DataBlock & cpuMask)483 int ThreadPrivate::setAffinity(ThreadPrivate* t, const DataBlock& cpuMask)
484 {
485 #ifdef DEBUG
486     String str;
487     str.hexify(cpuMask.data(),cpuMask.length());
488     Debug(DebugAll,"ThreadPrivate::setAffinity() %s(%p) to affinity mask:'%s'",
489 	t ? t->m_name : "self",t,str.c_str());
490 #endif
491     if (!haveAffinity())
492 	return ENOTSUP_ERR;
493     if (!cpuMask.length())
494 	return EINVAL_ERR;
495 
496 #ifdef _WINDOWS
497 
498     DWORD_PTR mask = 0;
499     if (sizeof(mask) < cpuMask.length())
500 	Debug(DebugNote,"CPU affinity mask is '%u' long, permitted length is '%u', ignoring exceeding bits",
501 	      cpuMask.length() * 8, sizeof(DWORD_PTR) * 8);
502     for (unsigned int i = 0; i < sizeof(mask) && i < cpuMask.length(); i++)
503 	mask |= (cpuMask.at(i) << (i << 3));
504     if (!(mask = ::SetThreadAffinityMask(t ?reinterpret_cast<HANDLE>(t->thread) : ::GetCurrentThread(),mask)))
505 	return ::GetLastError();
506     if (t)
507 	t->m_affinityMask = mask;
508     return 0;
509 
510 #elif defined(THREAD_AFFINITY) || defined(SCHED_AFFINITY)
511 
512     cpu_set_t cpuSet;
513     CPU_ZERO(&cpuSet);
514     uint8_t* bytes = (uint8_t*)cpuMask.data();
515     unsigned int len = cpuMask.length();
516     for (unsigned int i = 0; i < len; i++) {
517 	uint8_t b = bytes[i];
518 	for (uint8_t j = 0; j < 8; j++)
519 	    if ((b & (1 << j)))
520 		CPU_SET(((i << 3) + j),&cpuSet);
521     }
522 
523 #ifdef THREAD_AFFINITY
524     return pthread_setaffinity_np(t ? t->thread : pthread_self(),sizeof(cpuSet),&cpuSet);
525 #else
526     pid_t tid  = -1;
527     if (!t)
528 	tid = (pid_t)syscall(SYS_gettid);
529     else {
530 	// there is a race between getting the TID and this call
531 	// try to hold off for a while, maybe it will get set
532 	// otherwise signal that the user should try again
533 	unsigned int i = 0;
534 	while (t->m_tid < 0 && i++ < 5)
535 	    ::usleep(0);
536 	tid = t->m_tid;
537     }
538     DDebug(DebugAll,"ThreadPrivate::setAffinity() %s(%p) for TID:'%d'",t ? t->m_name : "self",t,tid);
539     if (-1 == tid)
540 	return EAGAIN;
541     return sched_setaffinity(tid,sizeof(cpuSet),&cpuSet) ? errno : 0;
542 #endif
543 
544 #endif /* _WINDOWS */
545     return ENOTSUP_ERR;
546 }
547 
getAffinity(ThreadPrivate * t,DataBlock & outMask)548 int ThreadPrivate::getAffinity(ThreadPrivate* t, DataBlock& outMask)
549 {
550     if (!haveAffinity())
551 	return ENOTSUP_ERR;
552 
553 #ifdef _WINDOWS
554 
555     HANDLE thr = t ? reinterpret_cast<HANDLE>(t->thread) : ::GetCurrentThread();
556     DWORD_PTR setMask = 0;
557     if (t) {
558 	setMask = t->m_affinityMask;
559     }
560     else {
561 	DWORD_PTR sysAffin = 0;
562 	if (!::GetProcessAffinityMask(::GetCurrentProcess(),&setMask,&sysAffin)) {
563 	    int err = ::GetLastError();
564 	    Debug(DebugNote,"ThreadPrivate::getAffinity(t=%p) - "
565 		    "Failed to get process affinity, err=%d [%p]",t,err);
566 	}
567     }
568     DWORD_PTR mask = ::SetThreadAffinityMask(thr,setMask);
569     if (!mask)
570 	return ::GetLastError();
571     if (mask != setMask) {// maybe mask was changed by something external, restore it
572 	if (!(::SetThreadAffinityMask(thr,mask))) {
573 	    Debug(DebugNote,"Failed to restore thread CPU affinity to '%lx', "
574 		    "now set to '%lx' [%p] for thread '%s' [%p]",mask,setMask,t ? t->m_name : "self",t);
575 	    return ::GetLastError();
576 	}
577     }
578     outMask.resize(sizeof(mask));
579     uint8_t* bytes = (uint8_t*)outMask.data();
580     for (unsigned int i = 0; i < sizeof(mask) ; i++) {
581 	*bytes++ = (uint8_t)(mask >> (i << 3)) & 0xff;
582     }
583     return 0;
584 
585 #elif defined(THREAD_AFFINITY) || defined(SCHED_AFFINITY)
586 
587     cpu_set_t cpuSet;
588     CPU_ZERO(&cpuSet);
589     int ret = 0;
590 
591 #ifdef THREAD_AFFINITY
592     ret = pthread_getaffinity_np(t ? t->thread : pthread_self(),sizeof(cpuSet),&cpuSet);
593 #else
594     pid_t tid  = -1;
595     if (!t)
596 	tid = (pid_t)syscall(SYS_gettid);
597     else {
598 	// there is a race between getting the TID and this call
599 	// try to hold off for a while, maybe it will get set
600 	// otherwise signal that the user should try again
601 	unsigned int i = 0;
602 	while (t->m_tid < 0 && i++ < 5)
603 	    ::usleep(0);
604 	tid = t->m_tid;
605     }
606     DDebug(DebugAll,"ThreadPrivate::getAffinity() %s(%p) for TID:'%d'",t ? t->name : "self",t,tid);
607     if (-1 == tid)
608 	return EAGAIN;
609     ret = sched_getaffinity(tid,sizeof(cpuSet),&cpuSet) ? errno : 0;
610 #endif
611 
612     if (!ret) {
613 	outMask.resize(sizeof(cpuSet));
614 	uint8_t* bytes = (uint8_t*) outMask.data();
615 	unsigned int lastSet = 0;
616 	for (unsigned int i = 0; i < (sizeof(cpuSet) << 3); i++) {
617 	    if (!CPU_ISSET(i,&cpuSet))
618 		continue;
619 	    bytes[i >> 3] |= (1 << (i & 7));
620 	    lastSet = i >> 3;
621 	}
622 	// cpu_set has CPU_SETSIZE(1024 bits)
623 	// remove octets from the end that have no bit set
624 	outMask.cut(outMask.length() - lastSet - 1);
625     }
626 
627     return ret;
628 #endif /* _WINDOWS */
629     return ENOTSUP_ERR;
630 }
631 
killall()632 void ThreadPrivate::killall()
633 {
634     Debugger debug("ThreadPrivate::killall()");
635     ThreadPrivate *t;
636     bool sledgehammer = false;
637     s_tmutex.lock();
638     ThreadPrivate* crt = ThreadPrivate::current();
639     int c = s_threads.count();
640     if (crt)
641 	Debug(DebugNote,"Thread '%s' is soft cancelling other %d running threads",crt->m_name,c-1);
642     else
643 	Debug(DebugNote,"Soft cancelling %d running threads",c);
644     ObjList* l = &s_threads;
645     while (l && (t = static_cast<ThreadPrivate *>(l->get())) != 0)
646     {
647 	if (t != crt) {
648 	    Debug(DebugInfo,"Stopping ThreadPrivate '%s' [%p]",t->m_name,t);
649 	    t->cancel(false);
650 	}
651 	l = l->next();
652     }
653     for (int w = 0; w < SOFT_WAITS; w++) {
654 	s_tmutex.unlock();
655 	Thread::idle();
656 	s_tmutex.lock();
657 	c = s_threads.count();
658 	// ignore the current thread if we have one
659 	if (crt && c)
660 	    c--;
661 	if (!c) {
662 	    s_tmutex.unlock();
663 	    return;
664 	}
665     }
666     Debug(DebugMild,"Hard cancelling %d remaining threads",c);
667     l = &s_threads;
668     c = 1;
669     while (l && (t = static_cast<ThreadPrivate *>(l->get())) != 0)
670     {
671 	if (t == crt) {
672 	    l = l->next();
673 	    continue;
674 	}
675 	Debug(DebugInfo,"Trying to kill ThreadPrivate '%s' [%p], attempt %d",t->m_name,t,c);
676 	bool ok = t->cancel(true);
677 	if (ok) {
678 	    int d = 0;
679 	    // delay a little (exponentially) so threads have a chance to clean up
680 	    for (int i=1; i<=KILL_WAIT; i<<=1) {
681 		s_tmutex.unlock();
682 		Thread::msleep(i-d);
683 		d = i;
684 		s_tmutex.lock();
685 		if (t != l->get())
686 		    break;
687 	    }
688 	}
689 	if (t != l->get())
690 	    c = 1;
691 	else {
692 	    if (ok) {
693 #ifdef _WINDOWS
694 		Debug(DebugWarn,"Could not kill %p but seems OK to delete it (library bug?)",t);
695 		s_tmutex.unlock();
696 		t->destroy();
697 		s_tmutex.lock();
698 		if (t != l->get())
699 		    c = 1;
700 #else
701 		Debug(DebugCrit,"Could not kill cancelled %p so we'll abandon it (library bug?)",t);
702 		l->remove(t,false);
703 		c = 1;
704 #endif
705 		continue;
706 	    }
707 	    Thread::msleep(1);
708 	    if (++c >= HARD_KILLS) {
709 		Debug(DebugWarn,"Could not kill %p, will use sledgehammer later.",t);
710 		sledgehammer = true;
711 		t->m_thread = 0;
712 		l = l->next();
713 		c = 1;
714 	    }
715 	}
716     }
717     s_tmutex.unlock();
718     // last solution - a REALLY BIG tool!
719     // usually too big since many libraries have threads of their own...
720     if (sledgehammer) {
721 #ifdef THREAD_KILL
722 	Debug(DebugCrit,"Brutally killing remaining threads!");
723 	::pthread_kill_other_threads_np();
724 #else
725 	Debug(DebugCrit,"Cannot kill remaining threads on this platform!");
726 #endif
727     }
728 }
729 
destroyFunc(void * arg)730 void ThreadPrivate::destroyFunc(void* arg)
731 {
732 #ifdef DEBUG
733     Debugger debug("ThreadPrivate::destroyFunc","(%p)",arg);
734 #endif
735     ThreadPrivate *t = reinterpret_cast<ThreadPrivate *>(arg);
736     if (t)
737 	t->destroy();
738 }
739 
cleanupFunc(void * arg)740 void ThreadPrivate::cleanupFunc(void* arg)
741 {
742     DDebug(DebugAll,"ThreadPrivate::cleanupFunc(%p)",arg);
743     ThreadPrivate *t = reinterpret_cast<ThreadPrivate *>(arg);
744     if (t)
745 	t->cleanup();
746 }
747 
748 #ifdef _WINDOWS
startFunc(void * arg)749 void ThreadPrivate::startFunc(void* arg)
750 #else
751 void* ThreadPrivate::startFunc(void* arg)
752 #endif
753 {
754     DDebug(DebugAll,"ThreadPrivate::startFunc(%p)",arg);
755     ThreadPrivate *t = reinterpret_cast<ThreadPrivate *>(arg);
756 #if !defined(_WINDOWS) && defined(SCHED_AFFINITY) && defined(SYS_gettid)
757     // get TID as early as possible if needed
758     t->m_tid = (pid_t)syscall(SYS_gettid);
759     DDebug(DebugAll,"Thread '%s' (%p) has TID:'%d'",t->m_name,t,t->m_tid);
760 #endif
761     t->run();
762 #ifdef _WINDOWS
763     t->m_running = false;
764     if (t->m_updest)
765 	t->destroy();
766 #else
767     return 0;
768 #endif
769 }
770 
~Runnable()771 Runnable::~Runnable()
772 {
773 }
774 
Thread(const char * name,Priority prio)775 Thread::Thread(const char* name, Priority prio)
776     : m_private(0), m_locks(0), m_locking(false)
777 {
778 #ifdef DEBUG
779     Debugger debug("Thread::Thread","(\"%s\",%d) [%p]",name,prio,this);
780 #endif
781     m_private = ThreadPrivate::create(this,name,prio);
782 }
783 
Thread(const char * name,const char * prio)784 Thread::Thread(const char *name, const char* prio)
785     : m_private(0), m_locks(0), m_locking(false)
786 {
787 #ifdef DEBUG
788     Debugger debug("Thread::Thread","(\"%s\",\"%s\") [%p]",name,prio,this);
789 #endif
790     m_private = ThreadPrivate::create(this,name,priority(prio));
791 }
792 
~Thread()793 Thread::~Thread()
794 {
795     DDebug(DebugAll,"Thread::~Thread() [%p]",this);
796     if (m_private)
797 	m_private->pubdestroy();
798 }
799 
error() const800 bool Thread::error() const
801 {
802     return !m_private;
803 }
804 
running() const805 bool Thread::running() const
806 {
807     Lock lock(s_tmutex);
808     return m_private ? m_private->m_started : false;
809 }
810 
name() const811 const char* Thread::name() const
812 {
813     return m_private ? m_private->m_name : 0;
814 }
815 
startup()816 bool Thread::startup()
817 {
818     if (!m_private)
819 	return false;
820     m_private->m_started = true;
821     return true;
822 }
823 
current()824 Thread *Thread::current()
825 {
826     ThreadPrivate* t = ThreadPrivate::current();
827     return t ? t->m_thread : 0;
828 }
829 
currentName()830 const char* Thread::currentName()
831 {
832     ThreadPrivate* t = ThreadPrivate::current();
833     return t ? t->m_name : 0;
834 }
835 
parseCPUMask(const String & cpus,DataBlock & mask)836 bool Thread::parseCPUMask(const String& cpus, DataBlock& mask)
837 {
838     if (!cpus)
839 	return false;
840     ObjList* cpuList = cpus.split(',',false);
841     bool err = false;
842     for (ObjList* o = cpuList->skipNull(); o; o = o->skipNext()) {
843 	String* str = static_cast<String*>(o->get());
844 	int pos = str->find('-');
845 	int16_t cStart = 0;
846 	int16_t cEnd = 0;
847 	switch (pos) {
848 	    case -1:
849 		cStart = cEnd = str->toInteger(-1);
850 		if (cStart < 0)
851 		    err = true;
852 		break;
853 	    case 0:
854 		err = true;
855 		break;
856 	    default:
857 		cStart = str->substr(0,pos).toInteger(-1);
858 		cEnd = str->substr(pos + 1).toInteger(-1);
859 		if (cStart < 0 || cEnd < 0 || cEnd < cStart)
860 		    err = true;
861 		break;
862 	}
863 	if (err)
864 	    break;
865 	unsigned int needLen = (cEnd >> 3) + 1;
866 	// adjust bitmask length to be able to set the highest bit
867 	while (mask.length() < needLen) {
868 	    uint8_t b = 0;
869 	    mask.append(&b,1);
870 	}
871 	uint8_t* bytes = (uint8_t*)mask.data();
872 	for (int16_t i = cStart; i <= cEnd; i++) {
873 	    uint8_t* byte = bytes + (i >> 3);
874 	    *byte |= 1 << (i & 7);
875 	}
876     }
877     TelEngine::destruct(cpuList);
878 #ifdef DEBUG
879     String str;
880     str.hexify(mask.data(),mask.length());
881     Debug(DebugAll,"Thread::parseCPUMask() Parsed '%s' into bitmask: '%s'",cpus.c_str(),str.c_str());
882 #endif
883     return !err && mask.length();
884 }
885 
printCPUMask(const DataBlock & mask,String & str,bool hexa)886 void Thread::printCPUMask(const DataBlock& mask, String& str, bool hexa)
887 {
888     if (hexa) {
889 	String c;
890 	for (int i = mask.length() - 1; i >= 0; i--) {
891 	    c.hexify(mask.data(i),1);
892 	    str << " " << c;
893 	}
894 	str.trimBlanks();
895     }
896     else {
897 	for (unsigned int i = 0; i < mask.length(); i++) {
898 	uint8_t b = mask[i];
899 	for (uint8_t j = 0; j < 8; j++)
900 	    if ((b & (1 << j))) {
901 		if (str)
902 		    str << ",";
903 		str << (uint32_t)((i << 3) + j);
904 	    }
905 	}
906     }
907 }
908 
setAffinity(const String & cpus)909 int Thread::setAffinity(const String& cpus)
910 {
911     DataBlock bits;
912     if (!parseCPUMask(cpus,bits))
913 	return EINVAL_ERR;
914     Lock lock(s_tmutex);
915     return ThreadPrivate::setAffinity(m_private,bits);
916 }
917 
setAffinity(const DataBlock & cpus)918 int Thread::setAffinity(const DataBlock& cpus)
919 {
920     Lock lock(s_tmutex);
921     return ThreadPrivate::setAffinity(m_private,cpus);
922 }
923 
getAffinity(DataBlock & outCpuMask)924 int Thread::getAffinity(DataBlock& outCpuMask)
925 {
926     Lock lock(s_tmutex);
927     return ThreadPrivate::getAffinity(m_private,outCpuMask);
928 }
929 
setCurrentAffinity(const String & cpus)930 int Thread::setCurrentAffinity(const String& cpus)
931 {
932     DataBlock bits;
933     if (!parseCPUMask(cpus,bits))
934 	return EINVAL_ERR;
935     return ThreadPrivate::setAffinity(ThreadPrivate::current(),bits);
936 }
937 
setCurrentAffinity(const DataBlock & cpuMask)938 int Thread::setCurrentAffinity(const DataBlock& cpuMask)
939 {
940     return ThreadPrivate::setAffinity(ThreadPrivate::current(),cpuMask);
941 }
942 
getCurrentAffinity(DataBlock & outCpuMask)943 int Thread::getCurrentAffinity(DataBlock& outCpuMask)
944 {
945     return ThreadPrivate::getAffinity(ThreadPrivate::current(),outCpuMask);
946 }
947 
getCurrentAffinity(String & outCpus,bool hex)948 int Thread::getCurrentAffinity(String& outCpus, bool hex)
949 {
950     DataBlock d;
951     if (int err = ThreadPrivate::getAffinity(ThreadPrivate::current(),d))
952 	return err;
953     Thread::printCPUMask(d,outCpus,hex);
954     return 0;
955 }
956 
getObjCounter() const957 NamedCounter* Thread::getObjCounter() const
958 {
959     return m_private ? m_private->m_counter : 0;
960 }
961 
setObjCounter(NamedCounter * counter)962 NamedCounter* Thread::setObjCounter(NamedCounter* counter)
963 {
964     if (!m_private)
965 	return 0;
966     if (counter == m_private->m_counter)
967 	return counter;
968     s_tmutex.lock();
969     NamedCounter* oldCounter = m_private->m_counter;
970     m_private->m_counter = counter;
971     s_tmutex.unlock();
972     return oldCounter;
973 }
974 
getCurrentObjCounter(bool always)975 NamedCounter* Thread::getCurrentObjCounter(bool always)
976 {
977     if (!(always || GenObject::getObjCounting()))
978 	return 0;
979     ThreadPrivate* t = ThreadPrivate::current();
980     return t ? t->m_counter : s_counter;
981 }
982 
setCurrentObjCounter(NamedCounter * counter)983 NamedCounter* Thread::setCurrentObjCounter(NamedCounter* counter)
984 {
985     ThreadPrivate* t = ThreadPrivate::current();
986     NamedCounter*& c = t ? t->m_counter : s_counter;
987     if (counter == c)
988 	return counter;
989     if (!t)
990 	s_tmutex.lock();
991     NamedCounter* oldCounter = c;
992     c = counter;
993     if (!t)
994 	s_tmutex.unlock();
995     return oldCounter;
996 }
997 
count()998 int Thread::count()
999 {
1000     Lock lock(s_tmutex);
1001     return s_threads.count();
1002 }
1003 
cleanup()1004 void Thread::cleanup()
1005 {
1006     DDebug(DebugAll,"Thread::cleanup() [%p]",this);
1007 }
1008 
killall()1009 void Thread::killall()
1010 {
1011     ThreadPrivate::killall();
1012 }
1013 
exit()1014 void Thread::exit()
1015 {
1016     DDebug(DebugAll,"Thread::exit()");
1017     ThreadPrivate* t = ThreadPrivate::current();
1018     if (t && t->m_thread && t->m_thread->locked())
1019 	Alarm("engine","bug",DebugFail,"Thread::exit() in '%s' with mutex locks (%d held) [%p]",
1020 	    t->m_name,t->m_thread->locks(),t->m_thread);
1021 #ifdef _WINDOWS
1022     if (t) {
1023 	t->m_running = false;
1024 	t->destroy();
1025     }
1026     ::_endthread();
1027 #else
1028     ::pthread_exit(0);
1029 #endif
1030 }
1031 
check(bool exitNow)1032 bool Thread::check(bool exitNow)
1033 {
1034     ThreadPrivate* t = ThreadPrivate::current();
1035     if (!(t && t->m_cancel))
1036 	return false;
1037     if (exitNow)
1038 	exit();
1039     return true;
1040 }
1041 
cancel(bool hard)1042 void Thread::cancel(bool hard)
1043 {
1044     DDebug(DebugAll,"Thread::cancel() [%p]",this);
1045     if (m_private)
1046 	m_private->cancel(hard);
1047 }
1048 
yield(bool exitCheck)1049 void Thread::yield(bool exitCheck)
1050 {
1051 #ifdef _WINDOWS
1052     // zero sleep is bad if we have high priority threads, they
1053     //  won't relinquish the timeslice for lower priority ones
1054     ::Sleep(1);
1055 #else
1056     ::usleep(0);
1057 #endif
1058     if (exitCheck)
1059 	check();
1060 }
1061 
idle(bool exitCheck)1062 void Thread::idle(bool exitCheck)
1063 {
1064 #ifdef DEBUG
1065     const Thread* t = Thread::current();
1066     if (t && t->locked())
1067 	Debug(DebugMild,"Thread '%s' idling with %d mutex locks held [%p]",
1068 	    t->name(),t->locks(),t);
1069 #endif
1070     msleep(s_idleMs,exitCheck);
1071 }
1072 
sleep(unsigned int sec,bool exitCheck)1073 void Thread::sleep(unsigned int sec, bool exitCheck)
1074 {
1075 #ifdef _WINDOWS
1076     ::Sleep(sec*1000);
1077 #else
1078     ::sleep(sec);
1079 #endif
1080     if (exitCheck)
1081 	check();
1082 }
1083 
msleep(unsigned long msec,bool exitCheck)1084 void Thread::msleep(unsigned long msec, bool exitCheck)
1085 {
1086 #ifdef _WINDOWS
1087     ::Sleep(msec);
1088 #else
1089     ::usleep(msec*1000L);
1090 #endif
1091     if (exitCheck)
1092 	check();
1093 }
1094 
usleep(unsigned long usec,bool exitCheck)1095 void Thread::usleep(unsigned long usec, bool exitCheck)
1096 {
1097 #ifdef _WINDOWS
1098     if (usec) {
1099 	usec = (usec + 500) / 1000;
1100 	if (!usec)
1101 	    usec = 1;
1102     }
1103     ::Sleep(usec);
1104 #else
1105     ::usleep(usec);
1106 #endif
1107     if (exitCheck)
1108 	check();
1109 }
1110 
idleUsec()1111 unsigned long Thread::idleUsec()
1112 {
1113     return s_idleMs * 1000;
1114 }
1115 
idleMsec()1116 unsigned long Thread::idleMsec()
1117 {
1118     return s_idleMs;
1119 }
1120 
idleMsec(unsigned long msec)1121 void Thread::idleMsec(unsigned long msec)
1122 {
1123     if (msec == 0)
1124 	msec = THREAD_IDLE_MSEC;
1125     else if (msec < THREAD_IDLE_MIN)
1126 	msec = THREAD_IDLE_MIN;
1127     else if (msec > THREAD_IDLE_MAX)
1128 	msec = THREAD_IDLE_MAX;
1129     s_idleMs = msec;
1130 }
1131 
priority(const char * name,Thread::Priority defvalue)1132 Thread::Priority Thread::priority(const char* name, Thread::Priority defvalue)
1133 {
1134     return (Thread::Priority)lookup(name,s_prio,defvalue);
1135 }
1136 
priority(Thread::Priority prio)1137 const char* Thread::priority(Thread::Priority prio)
1138 {
1139     return lookup(prio,s_prio);
1140 }
1141 
preExec()1142 void Thread::preExec()
1143 {
1144 #ifdef THREAD_KILL
1145     ::pthread_kill_other_threads_np();
1146 #endif
1147 }
1148 
1149 // Get the last thread error
lastError()1150 int Thread::lastError()
1151 {
1152 #ifdef _WINDOWS
1153     return ::GetLastError();
1154 #else
1155     return errno;
1156 #endif
1157 }
1158 
1159 // Get an error string from system.
errorString(String & buffer,int code)1160 bool Thread::errorString(String& buffer, int code)
1161 {
1162 #ifdef _WINDOWS
1163     LPTSTR buf = 0;
1164     DWORD res = FormatMessageA(
1165 	FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM,
1166 	NULL,code,0,(LPTSTR)&buf,0,0);
1167     if (buf) {
1168 	if (res > 0)
1169 	    buffer.assign(buf,res);
1170 	::LocalFree(buf);
1171     }
1172 #else
1173     buffer = ::strerror(code);
1174 #endif
1175     if (buffer)
1176 	return true;
1177     buffer << "Unknown error (code=" << code << ")";
1178     return false;
1179 }
1180 
1181 /* vi: set ts=8 sw=4 sts=4 noet: */
1182