1 /**
2 * Thread.cpp
3 * This file is part of the YATE Project http://YATE.null.ro
4 *
5 * Yet Another Telephony Engine - a fully featured software PBX and IVR
6 * Copyright (C) 2004-2014 Null Team
7 *
8 * This software is distributed under multiple licenses;
9 * see the COPYING file in the main directory for licensing
10 * information for this specific distribution.
11 *
12 * This use of this software may be subject to additional restrictions.
13 * See the LEGAL file in the main directory for details.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
18 */
19
20 #include "yateclass.h"
21
22 #include <string.h>
23
24 #ifdef _WINDOWS
25 #include <process.h>
26 typedef unsigned long HTHREAD;
27
28 #define EINVAL_ERR ERROR_INVALID_PARAMETER
29 #define ENOTSUP_ERR ERROR_NOT_SUPPORTED
30 #define EINVALTHR_ERR ERROR_INVALID_HANDLE
31
32 #else
33
34 #if defined(THREAD_AFFINITY) || defined(SCHED_AFFINITY)
35 #ifndef _GNU_SOURCE
36 #define _GNU_SOURCE
37 #endif
38 #ifdef SCHED_AFFINITY
39 #include <sys/syscall.h>
40 #endif
41 #endif
42
43 #define EINVAL_ERR EINVAL
44 #define ENOTSUP_ERR ENOTSUP
45 #define EINVALTHR_ERR ESRCH
46
47 #include <pthread.h>
48 typedef pthread_t HTHREAD;
#ifndef PTHREAD_EXPLICIT_SCHED
// The platform headers do not provide explicit scheduling inheritance:
// define the constant and a do-nothing replacement so callers build unchanged
#define PTHREAD_EXPLICIT_SCHED 0
static int pthread_attr_setinheritsched(pthread_attr_t* /* attr */, int /* inherit */)
{
    // nothing can be configured here, always report success
    return 0;
}
#endif
53 #endif
54
55 #ifdef HAVE_PRCTL
56 #include <sys/prctl.h>
57 #endif
58
59 #ifndef PTHREAD_STACK_MIN
60 #define PTHREAD_STACK_MIN 16384
61 #else
62 #ifndef PAGE_SIZE
63 #define PAGE_SIZE 4096
64 #endif
65 #endif
66
67 #ifndef THREAD_IDLE_MSEC
68 #define THREAD_IDLE_MSEC 5
69 #endif
70 #define THREAD_IDLE_MIN 1
71 #define THREAD_IDLE_MAX 20
72
73 namespace TelEngine {
74
75 class ThreadPrivate : public GenObject {
76 friend class Thread;
77 public:
78 ThreadPrivate(Thread* t,const char* name);
79 ~ThreadPrivate();
80 void run();
81 bool cancel(bool hard);
82 void cleanup();
83 void destroy();
84 void pubdestroy();
85 static ThreadPrivate* create(Thread* t,const char* name,Thread::Priority prio);
86 static void killall();
87 static ThreadPrivate* current();
88 static int setAffinity(ThreadPrivate* t, const DataBlock& bits);
89 static int getAffinity(ThreadPrivate* t, DataBlock& outMask);
90 Thread* m_thread;
91 HTHREAD thread;
92 #if !defined(_WINDOWS) && defined(SCHED_AFFINITY)
93 pid_t m_tid;
94 #endif
95 NamedCounter* m_counter;
96 bool m_running;
97 bool m_started;
98 bool m_updest;
99 bool m_cancel;
100 const char* m_name;
101 #ifdef _WINDOWS
102 static void startFunc(void* arg);
103 DWORD_PTR m_affinityMask;
104 #else
105 static void* startFunc(void* arg);
106 #endif
107 static void cleanupFunc(void* arg);
108 static void destroyFunc(void* arg);
109 };
110
111 };
112
113 using namespace TelEngine;
114
115 #define SOFT_WAITS 3
116 #define HARD_KILLS 5
117 #define KILL_WAIT 32
118
119 #ifdef _WINDOWS
120 #define TLS_MAGIC 0xfeeb1e
121 static DWORD tls_index = TLS_MAGIC;
getTls()122 static DWORD getTls()
123 {
124 // this seems unsafe but is not - allocation happens once before
125 // any Thread is actually created
126 if (tls_index == TLS_MAGIC)
127 tls_index = ::TlsAlloc();
128 if (tls_index == (DWORD)-1)
129 // if it happened is REALLY BAD so better die quick and clean!
130 abort();
131 return tls_index;
132 }
133
/**
 * Check if thread CPU affinity is supported in this build
 * @return Always true on Windows
 */
static inline bool haveAffinity()
{
    // TODO handle detection of support on Windows
    const bool supported = true;
    return supported;
}
139 #else /* _WINDOWS */
140 static pthread_key_t current_key = PTHREAD_KEYS_MAX; // this signifies invalid key
141
142 class ThreadPrivateKeyAlloc
143 {
144 public:
ThreadPrivateKeyAlloc()145 ThreadPrivateKeyAlloc()
146 {
147 if (::pthread_key_create(¤t_key,ThreadPrivate::destroyFunc)) {
148 abortOnBug(true);
149 Debug(DebugFail,"Failed to create current thread key!");
150 }
151 }
152 };
153
/**
 * Check if thread CPU affinity is supported in this build
 * @return True if built with THREAD_AFFINITY or SCHED_AFFINITY, false otherwise
 */
static inline bool haveAffinity()
{
#if defined(THREAD_AFFINITY) || defined(SCHED_AFFINITY)
    return true;
#else
    return false;
#endif
}
161
162 static ThreadPrivateKeyAlloc keyAllocator;
163 #endif /* _WINDOWS */
164
165 static TokenDict s_prio[] = {
166 { "lowest", Thread::Lowest },
167 { "low", Thread::Low },
168 { "normal", Thread::Normal },
169 { "high", Thread::High },
170 { "highest", Thread::Highest },
171 { 0, 0 }
172 };
173
174 static unsigned long s_idleMs = 1000 * THREAD_IDLE_MSEC;
175 static ObjList s_threads;
176 static Mutex s_tmutex(true,"Thread");
177 static NamedCounter* s_counter = 0;
178
create(Thread * t,const char * name,Thread::Priority prio)179 ThreadPrivate* ThreadPrivate::create(Thread* t,const char* name,Thread::Priority prio)
180 {
181 ThreadPrivate *p = new ThreadPrivate(t,name);
182 int e = 0;
183 #ifndef _WINDOWS
184 // Set a decent (256K) stack size that won't eat all virtual memory
185 pthread_attr_t attr;
186 ::pthread_attr_init(&attr);
187 ::pthread_attr_setstacksize(&attr, 16*PTHREAD_STACK_MIN);
188 if (prio > Thread::Normal) {
189 struct sched_param param;
190 param.sched_priority = 0;
191 int policy = SCHED_OTHER;
192 switch (prio) {
193 case Thread::High:
194 policy = SCHED_RR;
195 param.sched_priority = 1;
196 break;
197 case Thread::Highest:
198 policy = SCHED_FIFO;
199 param.sched_priority = 99;
200 break;
201 default:
202 break;
203 }
204 int err = ::pthread_attr_setinheritsched(&attr,PTHREAD_EXPLICIT_SCHED);
205 if (!err)
206 err = ::pthread_attr_setschedpolicy(&attr,policy);
207 if (!err)
208 err = ::pthread_attr_setschedparam(&attr,¶m);
209 if (err)
210 Debug(
211 #ifdef DEBUG
212 DebugWarn,
213 #else
214 DebugNote,
215 #endif
216 "Could not set thread scheduling parameters: %s (%d)",
217 strerror(err),err);
218 #ifdef XDEBUG
219 else
220 Debug(DebugInfo,"Successfully set high thread priority %d",prio);
221 #endif
222 }
223 #endif /* _WINDOWS */
224
225 for (int i=0; i<5; i++) {
226 #ifdef _WINDOWS
227 HTHREAD t = ::_beginthread(startFunc,16*PTHREAD_STACK_MIN,p);
228 e = (t == (HTHREAD)-1) ? errno : 0;
229 if (!e) {
230 p->thread = t;
231 int pr = THREAD_PRIORITY_NORMAL;
232 switch (prio) {
233 case Thread::Lowest:
234 pr = THREAD_PRIORITY_LOWEST;
235 break;
236 case Thread::Low:
237 pr = THREAD_PRIORITY_BELOW_NORMAL;
238 break;
239 case Thread::High:
240 pr = THREAD_PRIORITY_ABOVE_NORMAL;
241 break;
242 case Thread::Highest:
243 pr = THREAD_PRIORITY_HIGHEST;
244 break;
245 default:
246 break;
247 }
248 if (pr != THREAD_PRIORITY_NORMAL)
249 ::SetThreadPriority(reinterpret_cast<HANDLE>(t),pr);
250 }
251 #else /* _WINDOWS */
252 e = ::pthread_create(&p->thread,&attr,startFunc,p);
253 #ifdef PTHREAD_INHERIT_SCHED
254 if ((0 == i) && (EPERM == e) && (prio > Thread::Normal)) {
255 Debug(DebugWarn,"Failed to create thread with priority %d, trying with inherited",prio);
256 ::pthread_attr_setinheritsched(&attr,PTHREAD_INHERIT_SCHED);
257 e = EAGAIN;
258 }
259 #endif
260 #endif /* _WINDOWS */
261 if (e != EAGAIN)
262 break;
263 Thread::usleep(20);
264 }
265 #ifndef _WINDOWS
266 ::pthread_attr_destroy(&attr);
267 #endif
268 if (e) {
269 Alarm("engine","system",DebugCrit,"Error %d while creating pthread in '%s' [%p]",e,name,p);
270 p->m_thread = 0;
271 p->destroy();
272 return 0;
273 }
274 p->m_running = true;
275 return p;
276 }
277
ThreadPrivate(Thread * t,const char * name)278 ThreadPrivate::ThreadPrivate(Thread* t,const char* name)
279 : m_thread(t), m_counter(0),
280 m_running(false), m_started(false), m_updest(true), m_cancel(false), m_name(name)
281 {
282 #ifdef DEBUG
283 Debugger debug("ThreadPrivate::ThreadPrivate","(%p,\"%s\") [%p]",t,name,this);
284 #endif
285 #if defined(_WINDOWS)
286 m_affinityMask = 0;
287 DWORD_PTR sysAffin = 0;
288 if (!::GetProcessAffinityMask(::GetCurrentProcess(),&m_affinityMask,&sysAffin)) {
289 int err = ::GetLastError();
290 Debug(DebugNote,"ThreadPrivate::ThreadPrivate(%p,\"%s\") - "
291 "Failed to get process affinity, err=%d [%p]",t,name,err,this);
292 }
293 #ifdef DEBUG
294 else
295 Debug(DebugAll,"ThreadPrivate::ThreadPrivate(%p,\"%s\") - Process affinity = %lx,"
296 " system affinity = %lx [%p]",t,name,m_affinityMask,sysAffin,this);
297 #endif
298 #elif defined(SCHED_AFFINITY)
299 m_tid = -1;
300 #endif
301 // Inherit object counter of creating thread
302 m_counter = Thread::getCurrentObjCounter(true);
303 Lock lock(s_tmutex);
304 s_threads.append(this);
305 }
306
~ThreadPrivate()307 ThreadPrivate::~ThreadPrivate()
308 {
309 #ifdef DEBUG
310 Debugger debug("ThreadPrivate::~ThreadPrivate()"," %p '%s' [%p]",m_thread,m_name,this);
311 #endif
312 m_running = false;
313 Lock lock(s_tmutex);
314 s_threads.remove(this,false);
315 if (m_thread && m_updest) {
316 Thread *t = m_thread;
317 m_thread = 0;
318 // let other threads access the list while we delete our upper layer
319 lock.drop();
320 delete t;
321 }
322 }
323
destroy()324 void ThreadPrivate::destroy()
325 {
326 DDebug(DebugAll,"ThreadPrivate::destroy() '%s' [%p]",m_name,this);
327 cleanup();
328 delete this;
329 }
330
pubdestroy()331 void ThreadPrivate::pubdestroy()
332 {
333 #ifdef DEBUG
334 Debugger debug(DebugAll,"ThreadPrivate::pubdestroy()"," %p '%s' [%p]",m_thread,m_name,this);
335 #endif
336 m_updest = false;
337 cleanup();
338 m_thread = 0;
339
340 if (current() == this) {
341 cancel(true);
342 // should never reach here...
343 Debug(DebugFail,"ThreadPrivate::pubdestroy() past cancel??? [%p]",this);
344 }
345 else {
346 cancel(false);
347 // delay a little so thread has a chance to clean up
348 for (int i=0; i<20; i++) {
349 s_tmutex.lock();
350 bool done = !s_threads.find(this);
351 s_tmutex.unlock();
352 if (done)
353 return;
354 Thread::idle(false);
355 }
356 if (m_cancel && !cancel(true))
357 Debug(DebugWarn,"ThreadPrivate::pubdestroy() %p '%s' failed cancel [%p]",m_thread,m_name,this);
358 }
359 }
360
run()361 void ThreadPrivate::run()
362 {
363 DDebug(DebugAll,"ThreadPrivate::run() '%s' [%p]",m_name,this);
364 #ifdef _WINDOWS
365 ::TlsSetValue(getTls(),this);
366 #else
367 ::pthread_setspecific(current_key,this);
368 pthread_cleanup_push(cleanupFunc,this);
369 #ifdef PTHREAD_CANCEL_ASYNCHRONOUS
370 ::pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,0);
371 #endif
372 ::pthread_detach(::pthread_self());
373 #endif /* _WINDOWS */
374
375 #ifdef HAVE_PRCTL
376 #ifdef PR_SET_NAME
377 if (m_name)
378 prctl(PR_SET_NAME,(unsigned long)m_name,0,0,0);
379 #endif
380 #endif
381 #ifdef _WINDOWS
382 #ifndef NDEBUG
383 if (m_name) {
384 struct {
385 DWORD dwType;
386 LPCSTR szName;
387 DWORD dwThreadID;
388 DWORD dwFlags;
389 } threadInfo;
390 threadInfo.dwType = 0x1000;
391 threadInfo.szName = m_name;
392 threadInfo.dwThreadID = (DWORD)-1;
393 threadInfo.dwFlags = 0;
394 __try { RaiseException(0x406D1388, 0, sizeof(threadInfo)/sizeof(DWORD), (DWORD*)&threadInfo); }
395 __except (EXCEPTION_CONTINUE_EXECUTION) { }
396 }
397 #endif
398 #endif
399
400 // FIXME: possible race if public object is destroyed during thread startup
401 while (!m_started)
402 Thread::usleep(10,true);
403 if (m_thread)
404 m_thread->run();
405
406 #ifndef _WINDOWS
407 pthread_cleanup_pop(1);
408 #endif
409 }
410
cancel(bool hard)411 bool ThreadPrivate::cancel(bool hard)
412 {
413 DDebug(DebugAll,"ThreadPrivate::cancel(%s) '%s' [%p]",String::boolText(hard),m_name,this);
414 bool ret = true;
415 if (m_running) {
416 ret = false;
417 if (hard) {
418 bool critical = m_thread && m_thread->m_locking;
419 if (critical) {
420 // give the thread a chance to cancel without locking a mutex
421 Debug(DebugMild,"Hard canceling '%s' while is taking a lock [%p]",m_name,this);
422 m_cancel = true;
423 for (int i = 0; i < 50; i++) {
424 Thread::msleep(1);
425 if (!m_running)
426 return true;
427 }
428 }
429 m_running = false;
430 #ifdef _WINDOWS
431 Debug(DebugCrit,"ThreadPrivate '%s' terminating win32 thread %lu [%p]",
432 m_name,thread,this);
433 ret = ::TerminateThread(reinterpret_cast<HANDLE>(thread),0) != 0;
434 #else
435 #ifdef PTHREAD_CANCEL_ASYNCHRONOUS
436 Debug(critical ? DebugInfo : DebugWarn,"ThreadPrivate '%s' terminating pthread %p [%p]",
437 m_name,&thread,this);
438 ret = !::pthread_cancel(thread);
439 #else
440 Debug(DebugCrit,"ThreadPrivate '%s' cannot terminate %p on this platform [%p]",
441 m_name,&thread,this);
442 #endif
443 #endif /* _WINDOWS */
444 if (ret) {
445 // hard cancel succeeded - object is unsafe to touch any more
446 Thread::msleep(1);
447 return true;
448 }
449 // hard cancel failed - set back the running flag
450 m_running = true;
451 }
452 m_cancel = true;
453 }
454 return ret;
455 }
456
cleanup()457 void ThreadPrivate::cleanup()
458 {
459 DDebug(DebugAll,"ThreadPrivate::cleanup() %p '%s' [%p]",m_thread,m_name,this);
460 if (m_thread && m_thread->m_private) {
461 if (m_thread->m_private == this) {
462 m_thread->m_private = 0;
463 m_thread->cleanup();
464 if (m_thread->locked())
465 Alarm("engine","bug",DebugFail,"Thread '%s' destroyed with mutex locks (%d held) [%p]",m_name,m_thread->locks(),m_thread);
466 }
467 else {
468 Alarm("engine","bug",DebugFail,"ThreadPrivate::cleanup() %p '%s' mismatching %p [%p]",m_thread,m_name,m_thread->m_private,this);
469 m_thread = 0;
470 }
471 }
472 }
473
current()474 ThreadPrivate* ThreadPrivate::current()
475 {
476 #ifdef _WINDOWS
477 return reinterpret_cast<ThreadPrivate *>(::TlsGetValue(getTls()));
478 #else
479 return reinterpret_cast<ThreadPrivate *>(::pthread_getspecific(current_key));
480 #endif
481 }
482
setAffinity(ThreadPrivate * t,const DataBlock & cpuMask)483 int ThreadPrivate::setAffinity(ThreadPrivate* t, const DataBlock& cpuMask)
484 {
485 #ifdef DEBUG
486 String str;
487 str.hexify(cpuMask.data(),cpuMask.length());
488 Debug(DebugAll,"ThreadPrivate::setAffinity() %s(%p) to affinity mask:'%s'",
489 t ? t->m_name : "self",t,str.c_str());
490 #endif
491 if (!haveAffinity())
492 return ENOTSUP_ERR;
493 if (!cpuMask.length())
494 return EINVAL_ERR;
495
496 #ifdef _WINDOWS
497
498 DWORD_PTR mask = 0;
499 if (sizeof(mask) < cpuMask.length())
500 Debug(DebugNote,"CPU affinity mask is '%u' long, permitted length is '%u', ignoring exceeding bits",
501 cpuMask.length() * 8, sizeof(DWORD_PTR) * 8);
502 for (unsigned int i = 0; i < sizeof(mask) && i < cpuMask.length(); i++)
503 mask |= (cpuMask.at(i) << (i << 3));
504 if (!(mask = ::SetThreadAffinityMask(t ?reinterpret_cast<HANDLE>(t->thread) : ::GetCurrentThread(),mask)))
505 return ::GetLastError();
506 if (t)
507 t->m_affinityMask = mask;
508 return 0;
509
510 #elif defined(THREAD_AFFINITY) || defined(SCHED_AFFINITY)
511
512 cpu_set_t cpuSet;
513 CPU_ZERO(&cpuSet);
514 uint8_t* bytes = (uint8_t*)cpuMask.data();
515 unsigned int len = cpuMask.length();
516 for (unsigned int i = 0; i < len; i++) {
517 uint8_t b = bytes[i];
518 for (uint8_t j = 0; j < 8; j++)
519 if ((b & (1 << j)))
520 CPU_SET(((i << 3) + j),&cpuSet);
521 }
522
523 #ifdef THREAD_AFFINITY
524 return pthread_setaffinity_np(t ? t->thread : pthread_self(),sizeof(cpuSet),&cpuSet);
525 #else
526 pid_t tid = -1;
527 if (!t)
528 tid = (pid_t)syscall(SYS_gettid);
529 else {
530 // there is a race between getting the TID and this call
531 // try to hold off for a while, maybe it will get set
532 // otherwise signal that the user should try again
533 unsigned int i = 0;
534 while (t->m_tid < 0 && i++ < 5)
535 ::usleep(0);
536 tid = t->m_tid;
537 }
538 DDebug(DebugAll,"ThreadPrivate::setAffinity() %s(%p) for TID:'%d'",t ? t->m_name : "self",t,tid);
539 if (-1 == tid)
540 return EAGAIN;
541 return sched_setaffinity(tid,sizeof(cpuSet),&cpuSet) ? errno : 0;
542 #endif
543
544 #endif /* _WINDOWS */
545 return ENOTSUP_ERR;
546 }
547
getAffinity(ThreadPrivate * t,DataBlock & outMask)548 int ThreadPrivate::getAffinity(ThreadPrivate* t, DataBlock& outMask)
549 {
550 if (!haveAffinity())
551 return ENOTSUP_ERR;
552
553 #ifdef _WINDOWS
554
555 HANDLE thr = t ? reinterpret_cast<HANDLE>(t->thread) : ::GetCurrentThread();
556 DWORD_PTR setMask = 0;
557 if (t) {
558 setMask = t->m_affinityMask;
559 }
560 else {
561 DWORD_PTR sysAffin = 0;
562 if (!::GetProcessAffinityMask(::GetCurrentProcess(),&setMask,&sysAffin)) {
563 int err = ::GetLastError();
564 Debug(DebugNote,"ThreadPrivate::getAffinity(t=%p) - "
565 "Failed to get process affinity, err=%d [%p]",t,err);
566 }
567 }
568 DWORD_PTR mask = ::SetThreadAffinityMask(thr,setMask);
569 if (!mask)
570 return ::GetLastError();
571 if (mask != setMask) {// maybe mask was changed by something external, restore it
572 if (!(::SetThreadAffinityMask(thr,mask))) {
573 Debug(DebugNote,"Failed to restore thread CPU affinity to '%lx', "
574 "now set to '%lx' [%p] for thread '%s' [%p]",mask,setMask,t ? t->m_name : "self",t);
575 return ::GetLastError();
576 }
577 }
578 outMask.resize(sizeof(mask));
579 uint8_t* bytes = (uint8_t*)outMask.data();
580 for (unsigned int i = 0; i < sizeof(mask) ; i++) {
581 *bytes++ = (uint8_t)(mask >> (i << 3)) & 0xff;
582 }
583 return 0;
584
585 #elif defined(THREAD_AFFINITY) || defined(SCHED_AFFINITY)
586
587 cpu_set_t cpuSet;
588 CPU_ZERO(&cpuSet);
589 int ret = 0;
590
591 #ifdef THREAD_AFFINITY
592 ret = pthread_getaffinity_np(t ? t->thread : pthread_self(),sizeof(cpuSet),&cpuSet);
593 #else
594 pid_t tid = -1;
595 if (!t)
596 tid = (pid_t)syscall(SYS_gettid);
597 else {
598 // there is a race between getting the TID and this call
599 // try to hold off for a while, maybe it will get set
600 // otherwise signal that the user should try again
601 unsigned int i = 0;
602 while (t->m_tid < 0 && i++ < 5)
603 ::usleep(0);
604 tid = t->m_tid;
605 }
606 DDebug(DebugAll,"ThreadPrivate::getAffinity() %s(%p) for TID:'%d'",t ? t->name : "self",t,tid);
607 if (-1 == tid)
608 return EAGAIN;
609 ret = sched_getaffinity(tid,sizeof(cpuSet),&cpuSet) ? errno : 0;
610 #endif
611
612 if (!ret) {
613 outMask.resize(sizeof(cpuSet));
614 uint8_t* bytes = (uint8_t*) outMask.data();
615 unsigned int lastSet = 0;
616 for (unsigned int i = 0; i < (sizeof(cpuSet) << 3); i++) {
617 if (!CPU_ISSET(i,&cpuSet))
618 continue;
619 bytes[i >> 3] |= (1 << (i & 7));
620 lastSet = i >> 3;
621 }
622 // cpu_set has CPU_SETSIZE(1024 bits)
623 // remove octets from the end that have no bit set
624 outMask.cut(outMask.length() - lastSet - 1);
625 }
626
627 return ret;
628 #endif /* _WINDOWS */
629 return ENOTSUP_ERR;
630 }
631
killall()632 void ThreadPrivate::killall()
633 {
634 Debugger debug("ThreadPrivate::killall()");
635 ThreadPrivate *t;
636 bool sledgehammer = false;
637 s_tmutex.lock();
638 ThreadPrivate* crt = ThreadPrivate::current();
639 int c = s_threads.count();
640 if (crt)
641 Debug(DebugNote,"Thread '%s' is soft cancelling other %d running threads",crt->m_name,c-1);
642 else
643 Debug(DebugNote,"Soft cancelling %d running threads",c);
644 ObjList* l = &s_threads;
645 while (l && (t = static_cast<ThreadPrivate *>(l->get())) != 0)
646 {
647 if (t != crt) {
648 Debug(DebugInfo,"Stopping ThreadPrivate '%s' [%p]",t->m_name,t);
649 t->cancel(false);
650 }
651 l = l->next();
652 }
653 for (int w = 0; w < SOFT_WAITS; w++) {
654 s_tmutex.unlock();
655 Thread::idle();
656 s_tmutex.lock();
657 c = s_threads.count();
658 // ignore the current thread if we have one
659 if (crt && c)
660 c--;
661 if (!c) {
662 s_tmutex.unlock();
663 return;
664 }
665 }
666 Debug(DebugMild,"Hard cancelling %d remaining threads",c);
667 l = &s_threads;
668 c = 1;
669 while (l && (t = static_cast<ThreadPrivate *>(l->get())) != 0)
670 {
671 if (t == crt) {
672 l = l->next();
673 continue;
674 }
675 Debug(DebugInfo,"Trying to kill ThreadPrivate '%s' [%p], attempt %d",t->m_name,t,c);
676 bool ok = t->cancel(true);
677 if (ok) {
678 int d = 0;
679 // delay a little (exponentially) so threads have a chance to clean up
680 for (int i=1; i<=KILL_WAIT; i<<=1) {
681 s_tmutex.unlock();
682 Thread::msleep(i-d);
683 d = i;
684 s_tmutex.lock();
685 if (t != l->get())
686 break;
687 }
688 }
689 if (t != l->get())
690 c = 1;
691 else {
692 if (ok) {
693 #ifdef _WINDOWS
694 Debug(DebugWarn,"Could not kill %p but seems OK to delete it (library bug?)",t);
695 s_tmutex.unlock();
696 t->destroy();
697 s_tmutex.lock();
698 if (t != l->get())
699 c = 1;
700 #else
701 Debug(DebugCrit,"Could not kill cancelled %p so we'll abandon it (library bug?)",t);
702 l->remove(t,false);
703 c = 1;
704 #endif
705 continue;
706 }
707 Thread::msleep(1);
708 if (++c >= HARD_KILLS) {
709 Debug(DebugWarn,"Could not kill %p, will use sledgehammer later.",t);
710 sledgehammer = true;
711 t->m_thread = 0;
712 l = l->next();
713 c = 1;
714 }
715 }
716 }
717 s_tmutex.unlock();
718 // last solution - a REALLY BIG tool!
719 // usually too big since many libraries have threads of their own...
720 if (sledgehammer) {
721 #ifdef THREAD_KILL
722 Debug(DebugCrit,"Brutally killing remaining threads!");
723 ::pthread_kill_other_threads_np();
724 #else
725 Debug(DebugCrit,"Cannot kill remaining threads on this platform!");
726 #endif
727 }
728 }
729
destroyFunc(void * arg)730 void ThreadPrivate::destroyFunc(void* arg)
731 {
732 #ifdef DEBUG
733 Debugger debug("ThreadPrivate::destroyFunc","(%p)",arg);
734 #endif
735 ThreadPrivate *t = reinterpret_cast<ThreadPrivate *>(arg);
736 if (t)
737 t->destroy();
738 }
739
cleanupFunc(void * arg)740 void ThreadPrivate::cleanupFunc(void* arg)
741 {
742 DDebug(DebugAll,"ThreadPrivate::cleanupFunc(%p)",arg);
743 ThreadPrivate *t = reinterpret_cast<ThreadPrivate *>(arg);
744 if (t)
745 t->cleanup();
746 }
747
748 #ifdef _WINDOWS
startFunc(void * arg)749 void ThreadPrivate::startFunc(void* arg)
750 #else
751 void* ThreadPrivate::startFunc(void* arg)
752 #endif
753 {
754 DDebug(DebugAll,"ThreadPrivate::startFunc(%p)",arg);
755 ThreadPrivate *t = reinterpret_cast<ThreadPrivate *>(arg);
756 #if !defined(_WINDOWS) && defined(SCHED_AFFINITY) && defined(SYS_gettid)
757 // get TID as early as possible if needed
758 t->m_tid = (pid_t)syscall(SYS_gettid);
759 DDebug(DebugAll,"Thread '%s' (%p) has TID:'%d'",t->m_name,t,t->m_tid);
760 #endif
761 t->run();
762 #ifdef _WINDOWS
763 t->m_running = false;
764 if (t->m_updest)
765 t->destroy();
766 #else
767 return 0;
768 #endif
769 }
770
~Runnable()771 Runnable::~Runnable()
772 {
773 }
774
Thread(const char * name,Priority prio)775 Thread::Thread(const char* name, Priority prio)
776 : m_private(0), m_locks(0), m_locking(false)
777 {
778 #ifdef DEBUG
779 Debugger debug("Thread::Thread","(\"%s\",%d) [%p]",name,prio,this);
780 #endif
781 m_private = ThreadPrivate::create(this,name,prio);
782 }
783
Thread(const char * name,const char * prio)784 Thread::Thread(const char *name, const char* prio)
785 : m_private(0), m_locks(0), m_locking(false)
786 {
787 #ifdef DEBUG
788 Debugger debug("Thread::Thread","(\"%s\",\"%s\") [%p]",name,prio,this);
789 #endif
790 m_private = ThreadPrivate::create(this,name,priority(prio));
791 }
792
~Thread()793 Thread::~Thread()
794 {
795 DDebug(DebugAll,"Thread::~Thread() [%p]",this);
796 if (m_private)
797 m_private->pubdestroy();
798 }
799
error() const800 bool Thread::error() const
801 {
802 return !m_private;
803 }
804
running() const805 bool Thread::running() const
806 {
807 Lock lock(s_tmutex);
808 return m_private ? m_private->m_started : false;
809 }
810
name() const811 const char* Thread::name() const
812 {
813 return m_private ? m_private->m_name : 0;
814 }
815
startup()816 bool Thread::startup()
817 {
818 if (!m_private)
819 return false;
820 m_private->m_started = true;
821 return true;
822 }
823
current()824 Thread *Thread::current()
825 {
826 ThreadPrivate* t = ThreadPrivate::current();
827 return t ? t->m_thread : 0;
828 }
829
currentName()830 const char* Thread::currentName()
831 {
832 ThreadPrivate* t = ThreadPrivate::current();
833 return t ? t->m_name : 0;
834 }
835
parseCPUMask(const String & cpus,DataBlock & mask)836 bool Thread::parseCPUMask(const String& cpus, DataBlock& mask)
837 {
838 if (!cpus)
839 return false;
840 ObjList* cpuList = cpus.split(',',false);
841 bool err = false;
842 for (ObjList* o = cpuList->skipNull(); o; o = o->skipNext()) {
843 String* str = static_cast<String*>(o->get());
844 int pos = str->find('-');
845 int16_t cStart = 0;
846 int16_t cEnd = 0;
847 switch (pos) {
848 case -1:
849 cStart = cEnd = str->toInteger(-1);
850 if (cStart < 0)
851 err = true;
852 break;
853 case 0:
854 err = true;
855 break;
856 default:
857 cStart = str->substr(0,pos).toInteger(-1);
858 cEnd = str->substr(pos + 1).toInteger(-1);
859 if (cStart < 0 || cEnd < 0 || cEnd < cStart)
860 err = true;
861 break;
862 }
863 if (err)
864 break;
865 unsigned int needLen = (cEnd >> 3) + 1;
866 // adjust bitmask length to be able to set the highest bit
867 while (mask.length() < needLen) {
868 uint8_t b = 0;
869 mask.append(&b,1);
870 }
871 uint8_t* bytes = (uint8_t*)mask.data();
872 for (int16_t i = cStart; i <= cEnd; i++) {
873 uint8_t* byte = bytes + (i >> 3);
874 *byte |= 1 << (i & 7);
875 }
876 }
877 TelEngine::destruct(cpuList);
878 #ifdef DEBUG
879 String str;
880 str.hexify(mask.data(),mask.length());
881 Debug(DebugAll,"Thread::parseCPUMask() Parsed '%s' into bitmask: '%s'",cpus.c_str(),str.c_str());
882 #endif
883 return !err && mask.length();
884 }
885
printCPUMask(const DataBlock & mask,String & str,bool hexa)886 void Thread::printCPUMask(const DataBlock& mask, String& str, bool hexa)
887 {
888 if (hexa) {
889 String c;
890 for (int i = mask.length() - 1; i >= 0; i--) {
891 c.hexify(mask.data(i),1);
892 str << " " << c;
893 }
894 str.trimBlanks();
895 }
896 else {
897 for (unsigned int i = 0; i < mask.length(); i++) {
898 uint8_t b = mask[i];
899 for (uint8_t j = 0; j < 8; j++)
900 if ((b & (1 << j))) {
901 if (str)
902 str << ",";
903 str << (uint32_t)((i << 3) + j);
904 }
905 }
906 }
907 }
908
setAffinity(const String & cpus)909 int Thread::setAffinity(const String& cpus)
910 {
911 DataBlock bits;
912 if (!parseCPUMask(cpus,bits))
913 return EINVAL_ERR;
914 Lock lock(s_tmutex);
915 return ThreadPrivate::setAffinity(m_private,bits);
916 }
917
setAffinity(const DataBlock & cpus)918 int Thread::setAffinity(const DataBlock& cpus)
919 {
920 Lock lock(s_tmutex);
921 return ThreadPrivate::setAffinity(m_private,cpus);
922 }
923
getAffinity(DataBlock & outCpuMask)924 int Thread::getAffinity(DataBlock& outCpuMask)
925 {
926 Lock lock(s_tmutex);
927 return ThreadPrivate::getAffinity(m_private,outCpuMask);
928 }
929
setCurrentAffinity(const String & cpus)930 int Thread::setCurrentAffinity(const String& cpus)
931 {
932 DataBlock bits;
933 if (!parseCPUMask(cpus,bits))
934 return EINVAL_ERR;
935 return ThreadPrivate::setAffinity(ThreadPrivate::current(),bits);
936 }
937
setCurrentAffinity(const DataBlock & cpuMask)938 int Thread::setCurrentAffinity(const DataBlock& cpuMask)
939 {
940 return ThreadPrivate::setAffinity(ThreadPrivate::current(),cpuMask);
941 }
942
getCurrentAffinity(DataBlock & outCpuMask)943 int Thread::getCurrentAffinity(DataBlock& outCpuMask)
944 {
945 return ThreadPrivate::getAffinity(ThreadPrivate::current(),outCpuMask);
946 }
947
getCurrentAffinity(String & outCpus,bool hex)948 int Thread::getCurrentAffinity(String& outCpus, bool hex)
949 {
950 DataBlock d;
951 if (int err = ThreadPrivate::getAffinity(ThreadPrivate::current(),d))
952 return err;
953 Thread::printCPUMask(d,outCpus,hex);
954 return 0;
955 }
956
getObjCounter() const957 NamedCounter* Thread::getObjCounter() const
958 {
959 return m_private ? m_private->m_counter : 0;
960 }
961
setObjCounter(NamedCounter * counter)962 NamedCounter* Thread::setObjCounter(NamedCounter* counter)
963 {
964 if (!m_private)
965 return 0;
966 if (counter == m_private->m_counter)
967 return counter;
968 s_tmutex.lock();
969 NamedCounter* oldCounter = m_private->m_counter;
970 m_private->m_counter = counter;
971 s_tmutex.unlock();
972 return oldCounter;
973 }
974
getCurrentObjCounter(bool always)975 NamedCounter* Thread::getCurrentObjCounter(bool always)
976 {
977 if (!(always || GenObject::getObjCounting()))
978 return 0;
979 ThreadPrivate* t = ThreadPrivate::current();
980 return t ? t->m_counter : s_counter;
981 }
982
setCurrentObjCounter(NamedCounter * counter)983 NamedCounter* Thread::setCurrentObjCounter(NamedCounter* counter)
984 {
985 ThreadPrivate* t = ThreadPrivate::current();
986 NamedCounter*& c = t ? t->m_counter : s_counter;
987 if (counter == c)
988 return counter;
989 if (!t)
990 s_tmutex.lock();
991 NamedCounter* oldCounter = c;
992 c = counter;
993 if (!t)
994 s_tmutex.unlock();
995 return oldCounter;
996 }
997
count()998 int Thread::count()
999 {
1000 Lock lock(s_tmutex);
1001 return s_threads.count();
1002 }
1003
cleanup()1004 void Thread::cleanup()
1005 {
1006 DDebug(DebugAll,"Thread::cleanup() [%p]",this);
1007 }
1008
killall()1009 void Thread::killall()
1010 {
1011 ThreadPrivate::killall();
1012 }
1013
exit()1014 void Thread::exit()
1015 {
1016 DDebug(DebugAll,"Thread::exit()");
1017 ThreadPrivate* t = ThreadPrivate::current();
1018 if (t && t->m_thread && t->m_thread->locked())
1019 Alarm("engine","bug",DebugFail,"Thread::exit() in '%s' with mutex locks (%d held) [%p]",
1020 t->m_name,t->m_thread->locks(),t->m_thread);
1021 #ifdef _WINDOWS
1022 if (t) {
1023 t->m_running = false;
1024 t->destroy();
1025 }
1026 ::_endthread();
1027 #else
1028 ::pthread_exit(0);
1029 #endif
1030 }
1031
check(bool exitNow)1032 bool Thread::check(bool exitNow)
1033 {
1034 ThreadPrivate* t = ThreadPrivate::current();
1035 if (!(t && t->m_cancel))
1036 return false;
1037 if (exitNow)
1038 exit();
1039 return true;
1040 }
1041
cancel(bool hard)1042 void Thread::cancel(bool hard)
1043 {
1044 DDebug(DebugAll,"Thread::cancel() [%p]",this);
1045 if (m_private)
1046 m_private->cancel(hard);
1047 }
1048
yield(bool exitCheck)1049 void Thread::yield(bool exitCheck)
1050 {
1051 #ifdef _WINDOWS
1052 // zero sleep is bad if we have high priority threads, they
1053 // won't relinquish the timeslice for lower priority ones
1054 ::Sleep(1);
1055 #else
1056 ::usleep(0);
1057 #endif
1058 if (exitCheck)
1059 check();
1060 }
1061
idle(bool exitCheck)1062 void Thread::idle(bool exitCheck)
1063 {
1064 #ifdef DEBUG
1065 const Thread* t = Thread::current();
1066 if (t && t->locked())
1067 Debug(DebugMild,"Thread '%s' idling with %d mutex locks held [%p]",
1068 t->name(),t->locks(),t);
1069 #endif
1070 msleep(s_idleMs,exitCheck);
1071 }
1072
sleep(unsigned int sec,bool exitCheck)1073 void Thread::sleep(unsigned int sec, bool exitCheck)
1074 {
1075 #ifdef _WINDOWS
1076 ::Sleep(sec*1000);
1077 #else
1078 ::sleep(sec);
1079 #endif
1080 if (exitCheck)
1081 check();
1082 }
1083
msleep(unsigned long msec,bool exitCheck)1084 void Thread::msleep(unsigned long msec, bool exitCheck)
1085 {
1086 #ifdef _WINDOWS
1087 ::Sleep(msec);
1088 #else
1089 ::usleep(msec*1000L);
1090 #endif
1091 if (exitCheck)
1092 check();
1093 }
1094
usleep(unsigned long usec,bool exitCheck)1095 void Thread::usleep(unsigned long usec, bool exitCheck)
1096 {
1097 #ifdef _WINDOWS
1098 if (usec) {
1099 usec = (usec + 500) / 1000;
1100 if (!usec)
1101 usec = 1;
1102 }
1103 ::Sleep(usec);
1104 #else
1105 ::usleep(usec);
1106 #endif
1107 if (exitCheck)
1108 check();
1109 }
1110
idleUsec()1111 unsigned long Thread::idleUsec()
1112 {
1113 return s_idleMs * 1000;
1114 }
1115
idleMsec()1116 unsigned long Thread::idleMsec()
1117 {
1118 return s_idleMs;
1119 }
1120
idleMsec(unsigned long msec)1121 void Thread::idleMsec(unsigned long msec)
1122 {
1123 if (msec == 0)
1124 msec = THREAD_IDLE_MSEC;
1125 else if (msec < THREAD_IDLE_MIN)
1126 msec = THREAD_IDLE_MIN;
1127 else if (msec > THREAD_IDLE_MAX)
1128 msec = THREAD_IDLE_MAX;
1129 s_idleMs = msec;
1130 }
1131
priority(const char * name,Thread::Priority defvalue)1132 Thread::Priority Thread::priority(const char* name, Thread::Priority defvalue)
1133 {
1134 return (Thread::Priority)lookup(name,s_prio,defvalue);
1135 }
1136
priority(Thread::Priority prio)1137 const char* Thread::priority(Thread::Priority prio)
1138 {
1139 return lookup(prio,s_prio);
1140 }
1141
preExec()1142 void Thread::preExec()
1143 {
1144 #ifdef THREAD_KILL
1145 ::pthread_kill_other_threads_np();
1146 #endif
1147 }
1148
1149 // Get the last thread error
lastError()1150 int Thread::lastError()
1151 {
1152 #ifdef _WINDOWS
1153 return ::GetLastError();
1154 #else
1155 return errno;
1156 #endif
1157 }
1158
1159 // Get an error string from system.
errorString(String & buffer,int code)1160 bool Thread::errorString(String& buffer, int code)
1161 {
1162 #ifdef _WINDOWS
1163 LPTSTR buf = 0;
1164 DWORD res = FormatMessageA(
1165 FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM,
1166 NULL,code,0,(LPTSTR)&buf,0,0);
1167 if (buf) {
1168 if (res > 0)
1169 buffer.assign(buf,res);
1170 ::LocalFree(buf);
1171 }
1172 #else
1173 buffer = ::strerror(code);
1174 #endif
1175 if (buffer)
1176 return true;
1177 buffer << "Unknown error (code=" << code << ")";
1178 return false;
1179 }
1180
1181 /* vi: set ts=8 sw=4 sts=4 noet: */
1182