1 /* $Id: ncbithr.cpp 604321 2020-03-26 13:03:44Z grichenk $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Denis Vakatov, Aleksey Grichenko
27 *
28 * File Description:
29 * Multi-threading -- classes, functions, and features.
30 *
31 * TLS:
32 * CTlsBase -- TLS implementation (base class for CTls<>)
33 *
34 * THREAD:
35 * CThread -- thread wrapper class
36 *
37 * RW-LOCK:
38 * CInternalRWLock -- platform-dependent RW-lock structure (fwd-decl)
39 * CRWLock -- Read/Write lock related data and methods
40 *
41 */
42
43
44 #include <ncbi_pch.hpp>
45 #include <corelib/ncbi_param.hpp>
46 #include <corelib/request_ctx.hpp>
47 #include <corelib/ncbi_system.hpp>
48 #include <corelib/error_codes.hpp>
49 #ifdef NCBI_POSIX_THREADS
50 # include <sys/time.h> // for gettimeofday()
51 #endif
52 #ifdef NCBI_OS_LINUX
53 # include <sys/prctl.h>
54 #endif
55
56 #include "ncbidbg_p.hpp"
57
58
59 #define NCBI_USE_ERRCODE_X Corelib_Threads
60
61 BEGIN_NCBI_SCOPE
62
63
64 /////////////////////////////////////////////////////////////////////////////
65 // CTlsBase::
66 //
67
68
69 DEFINE_STATIC_MUTEX(s_TlsCleanupMutex);
70
71
CUsedTlsBases(void)72 CUsedTlsBases::CUsedTlsBases(void)
73 {
74 }
75
76
~CUsedTlsBases(void)77 CUsedTlsBases::~CUsedTlsBases(void)
78 {
79 }
80
81
ClearAll(CTlsBase::ECleanupMode mode)82 void CUsedTlsBases::ClearAll(CTlsBase::ECleanupMode mode)
83 {
84 CMutexGuard tls_cleanup_guard(s_TlsCleanupMutex);
85 // Prevent double-destruction
86 CTlsBase* used_tls = NULL;
87 NON_CONST_ITERATE(TTlsSet, it, m_UsedTls) {
88 CTlsBase* tls = *it;
89 // Do not cleanup it now - this will cause infinite recursion
90 if (tls == &sm_UsedTlsBases.Get()) {
91 used_tls = tls;
92 continue;
93 }
94 // Prevent double-destruction
95 tls->x_DeleteTlsData(mode);
96 if (tls->m_AutoDestroy && tls->Referenced()) {
97 tls->RemoveReference();
98 }
99 }
100 m_UsedTls.clear();
101
102 if (used_tls) {
103 used_tls->x_DeleteTlsData(mode);
104 if (used_tls->m_AutoDestroy && used_tls->Referenced()) {
105 used_tls->RemoveReference();
106 }
107 }
108 }
109
110
Register(CTlsBase * tls)111 void CUsedTlsBases::Register(CTlsBase* tls)
112 {
113 CMutexGuard tls_cleanup_guard(s_TlsCleanupMutex);
114 if ( m_UsedTls.insert(tls).second ) {
115 if (tls->m_AutoDestroy) {
116 tls->AddReference();
117 }
118 }
119 }
120
121
Deregister(CTlsBase * tls)122 void CUsedTlsBases::Deregister(CTlsBase* tls)
123 {
124 CMutexGuard tls_cleanup_guard(s_TlsCleanupMutex);
125 xncbi_VerifyAndErrorReport(m_UsedTls.erase(tls));
126 if (tls->m_AutoDestroy) {
127 tls->RemoveReference();
128 }
129 }
130
131
s_CleanupUsedTlsBases(CUsedTlsBases * tls,void *)132 static void s_CleanupUsedTlsBases(CUsedTlsBases* tls, void*)
133 {
134 delete tls;
135 }
136
s_CleanupMainUsedTlsBases(CUsedTlsBases & tls)137 static void s_CleanupMainUsedTlsBases(CUsedTlsBases& tls)
138 {
139 tls.ClearAll();
140 }
141
142 // Storage for used TLS sets
143 CStaticTls<CUsedTlsBases>
144 CUsedTlsBases::sm_UsedTlsBases(0, CSafeStaticLifeSpan::eLifeSpan_Long);
145 // Main thread needs a usual safe-static-ref for proper cleanup --
146 // there's no thread which can do it on destruction.
147 static CSafeStatic<CUsedTlsBases>
148 s_MainUsedTlsBases(0, s_CleanupMainUsedTlsBases,
149 CSafeStaticLifeSpan::eLifeSpan_Long);
150
GetUsedTlsBases(void)151 CUsedTlsBases& CUsedTlsBases::GetUsedTlsBases(void)
152 {
153 if ( CThread::IsMain() )
154 {
155 return *s_MainUsedTlsBases;
156 }
157
158 CUsedTlsBases* tls = sm_UsedTlsBases.GetValue();
159 if ( !tls )
160 {
161 tls = new CUsedTlsBases();
162 sm_UsedTlsBases.SetValue(tls, s_CleanupUsedTlsBases);
163 }
164 return *tls;
165 }
166
167
Init(void)168 void CUsedTlsBases::Init(void)
169 {
170 sm_UsedTlsBases.Get();
171 }
172
173
ClearAllCurrentThread(CTlsBase::ECleanupMode mode)174 void CUsedTlsBases::ClearAllCurrentThread(CTlsBase::ECleanupMode mode)
175 {
176 if ( CUsedTlsBases* tls = sm_UsedTlsBases.GetValue() ) {
177 tls->ClearAll(mode);
178 }
179 }
180
181
182 struct SNativeThreadTlsCleanup
183 {
SNativeThreadTlsCleanupSNativeThreadTlsCleanup184 SNativeThreadTlsCleanup(void) {
185 }
~SNativeThreadTlsCleanupSNativeThreadTlsCleanup186 ~SNativeThreadTlsCleanup(void) {
187 CUsedTlsBases::ClearAllCurrentThread(CTlsBase::eCleanup_Native);
188 }
189 };
190
191
192 static thread_local SNativeThreadTlsCleanup s_NativeThreadTlsCleanup;
193
194
CleanupTlsData(void * data,ECleanupMode mode)195 void CTlsBase::CleanupTlsData(void* data, ECleanupMode mode)
196 {
197 if (!data) return;
198 STlsData* tls_data = static_cast<STlsData*>(data);
199 if (!tls_data->m_Value || !tls_data->m_CleanupFunc) return;
200 if (mode == eCleanup_Native && tls_data->m_Native == eSkipCleanup) return;
201 tls_data->m_CleanupFunc(tls_data->m_Value, tls_data->m_CleanupData);
202 }
203
204
205 #if defined(NCBI_POSIX_THREADS)
206 // pthread can handle automatic cleanup
s_PosixTlsCleanup(void * ptr)207 extern "C" void s_PosixTlsCleanup(void* ptr)
208 {
209 CTlsBase::CleanupTlsData(ptr);
210 }
211 #endif
212
x_Init(void)213 void CTlsBase::x_Init(void)
214 {
215 // Create platform-dependent TLS key (index)
216 #if defined(NCBI_WIN32_THREADS)
217 xncbi_VerifyAndErrorReport((m_Key = TlsAlloc()) != DWORD(-1));
218 #elif defined(NCBI_POSIX_THREADS)
219 xncbi_VerifyAndErrorReport(pthread_key_create(&m_Key, s_PosixTlsCleanup) == 0);
220 // pthread_key_create does not reset the value to 0 if the key has been
221 // used and deleted.
222 xncbi_VerifyAndErrorReport(pthread_setspecific(m_Key, 0) == 0);
223 #else
224 m_Key = 0;
225 #endif
226
227 m_Initialized = true;
228 }
229
230
x_Destroy(void)231 void CTlsBase::x_Destroy(void)
232 {
233 x_Reset();
234 m_Initialized = false;
235
236 // Destroy system TLS key
237 #if defined(NCBI_WIN32_THREADS)
238 if ( TlsFree(m_Key) ) {
239 m_Key = 0;
240 return;
241 }
242 assert(0);
243 #elif defined(NCBI_POSIX_THREADS)
244 if (pthread_key_delete(m_Key) == 0) {
245 m_Key = 0;
246 return;
247 }
248 assert(0);
249 #else
250 m_Key = 0;
251 return;
252 #endif
253 }
254
255
256 // Platform-specific TLS data storing
257 inline
s_TlsSetValue(TTlsKey & key,void * data,const char * err_message)258 void s_TlsSetValue(TTlsKey& key, void* data, const char* err_message)
259 {
260 #if defined(NCBI_WIN32_THREADS)
261 xncbi_Validate(TlsSetValue(key, data), err_message);
262 #elif defined(NCBI_POSIX_THREADS)
263 xncbi_ValidatePthread(pthread_setspecific(key, data), 0, err_message);
264 #else
265 key = data;
266 assert(err_message); // to get rid of the "unused variable" warning
267 #endif
268 }
269
270
x_SetValue(void * value,FCleanupBase cleanup,void * cleanup_data,ENativeThreadCleanup native)271 void CTlsBase::x_SetValue(void* value,
272 FCleanupBase cleanup,
273 void* cleanup_data,
274 ENativeThreadCleanup native)
275 {
276 if ( !m_Initialized ) {
277 return;
278 }
279
280 // Get previously stored data
281 STlsData* tls_data = static_cast<STlsData*> (x_GetTlsData());
282
283 // Create and initialize TLS structure, if it was not present
284 if ( !tls_data ) {
285 tls_data = new STlsData;
286 xncbi_Validate(tls_data != 0,
287 "CTlsBase::x_SetValue() -- cannot allocate "
288 "memory for TLS data");
289 tls_data->m_Value = 0;
290 tls_data->m_CleanupFunc = 0;
291 tls_data->m_CleanupData = 0;
292 tls_data->m_Native = eSkipCleanup;
293 }
294
295 // Cleanup
296 if (tls_data->m_Value != value) {
297 CleanupTlsData(tls_data);
298 }
299
300 // Store the values
301 tls_data->m_Value = value;
302 tls_data->m_CleanupFunc = cleanup;
303 tls_data->m_CleanupData = cleanup_data;
304 tls_data->m_Native = native;
305
306 // Store the structure in the TLS
307 s_TlsSetValue(m_Key, tls_data,
308 "CTlsBase::x_SetValue() -- error setting value");
309
310 // Add to the used TLS list to cleanup data in the thread Exit()
311 CUsedTlsBases::GetUsedTlsBases().Register(this);
312 }
313
314
x_DeleteTlsData(ECleanupMode mode)315 bool CTlsBase::x_DeleteTlsData(ECleanupMode mode)
316 {
317 if ( !m_Initialized ) {
318 return false;
319 }
320
321 // Get previously stored data
322 STlsData* tls_data = static_cast<STlsData*> (x_GetTlsData());
323 if ( !tls_data ) {
324 return false;
325 }
326
327 // Cleanup & destroy
328 CleanupTlsData(tls_data, mode);
329 delete tls_data;
330
331 // Store NULL in the TLS
332 s_TlsSetValue(m_Key, 0,
333 "CTlsBase::x_Reset() -- error cleaning-up TLS");
334
335 return true;
336 }
337
338
x_Reset(void)339 void CTlsBase::x_Reset(void)
340 {
341 if ( x_DeleteTlsData() ) {
342 // Deregister this TLS from the current thread
343 CUsedTlsBases::GetUsedTlsBases().Deregister(this);
344 }
345 }
346
347
348 /////////////////////////////////////////////////////////////////////////////
349 // CExitThreadException::
350 //
351 // Exception used to terminate threads safely, cleaning up
352 // all the resources allocated.
353 //
354
355
356 class CExitThreadException
357 {
358 public:
359 // Create new exception object, initialize counter.
360 CExitThreadException(void);
361
362 // Create a copy of exception object, increase counter.
363 CExitThreadException(const CExitThreadException& prev);
364
365 // Destroy the object, decrease counter. If the counter is
366 // zero outside of CThread::Wrapper(), rethrow exception.
367 ~CExitThreadException(void);
368
369 // Inform the object it has reached CThread::Wrapper().
EnterWrapper(void)370 void EnterWrapper(void)
371 {
372 *m_InWrapper = true;
373 }
374 private:
375 int* m_RefCount;
376 bool* m_InWrapper;
377 };
378
379
CExitThreadException(void)380 CExitThreadException::CExitThreadException(void)
381 : m_RefCount(new int),
382 m_InWrapper(new bool)
383 {
384 *m_RefCount = 1;
385 *m_InWrapper = false;
386 }
387
388
CExitThreadException(const CExitThreadException & prev)389 CExitThreadException::CExitThreadException(const CExitThreadException& prev)
390 : m_RefCount(prev.m_RefCount),
391 m_InWrapper(prev.m_InWrapper)
392 {
393 (*m_RefCount)++;
394 }
395
396
~CExitThreadException(void)397 CExitThreadException::~CExitThreadException(void)
398 {
399 if (--(*m_RefCount) > 0) {
400 // Not the last object - continue to handle exceptions
401 return;
402 }
403
404 bool tmp_in_wrapper = *m_InWrapper; // save the flag
405 delete m_RefCount;
406 delete m_InWrapper;
407
408 if ( !tmp_in_wrapper ) {
409 // Something is wrong - terminate the thread
410 assert(((void)("CThread::Exit() -- cannot exit thread"), 0));
411 #if defined(NCBI_WIN32_THREADS)
412 ExitThread(0);
413 #elif defined(NCBI_POSIX_THREADS)
414 pthread_exit(0);
415 #endif
416 }
417
418 }
419
420
421
422 /////////////////////////////////////////////////////////////////////////////
423 // CThread::
424 //
425
426 // Mutex to protect CThread members and to make sure that Wrapper() function
427 // will not proceed until after the appropriate Run() is finished.
428 DEFINE_STATIC_FAST_MUTEX(s_ThreadMutex);
429
430 atomic<unsigned int> CThread::sm_ThreadsCount(0);
431
432
433 // Internal storage for thread objects and related variables/functions
434 static DECLARE_TLS_VAR(CThread*, sx_ThreadPtr);
435 static DECLARE_TLS_VAR(CThread::TID, sx_ThreadId);
436 static bool sm_MainThreadIdInitialized = false;
437 static const CThread::TID kMainThreadId = ~CThread::TID(0);
438 static CThread::TID sx_MainThreadId = kMainThreadId;
439
440
441 DEFINE_STATIC_FAST_MUTEX(s_MainThreadIdMutex);
442
sx_GetMainThreadId()443 CThread::TID sx_GetMainThreadId()
444 {
445 CFastMutexGuard guard(s_MainThreadIdMutex);
446 return sx_MainThreadId;
447 }
448
449
sx_SetMainThreadId(CThread::TID id)450 void sx_SetMainThreadId(CThread::TID id)
451 {
452 CFastMutexGuard guard(s_MainThreadIdMutex);
453 sx_MainThreadId = id;
454 }
455
456
sx_GetNextThreadId(void)457 static int sx_GetNextThreadId(void)
458 {
459 CFastMutexGuard guard(s_ThreadMutex);
460 static int s_ThreadCount = 0;
461 return ++s_ThreadCount;
462 }
463
464
x_InitializeThreadId(void)465 void CThread::x_InitializeThreadId(void)
466 {
467 #if defined(NCBI_THREADS)
468 _ASSERT(!sx_ThreadPtr);
469 _ASSERT(!sx_ThreadId);
470 #endif
471 sx_ThreadPtr = this;
472 sx_ThreadId = sx_GetNextThreadId();
473 }
474
475
InitializeMainThreadId(void)476 void CThread::InitializeMainThreadId(void)
477 {
478 // mark main thread
479 #if defined(NCBI_THREADS)
480 CFastMutexGuard guard(s_MainThreadIdMutex);
481 #endif
482 if ( sm_MainThreadIdInitialized ) {
483 if (sx_ThreadId != sx_MainThreadId) {
484 ERR_POST("Can not change main thread ID");
485 }
486 return;
487 }
488 #if defined(NCBI_THREADS)
489 _ASSERT(!sx_ThreadPtr);
490 _ASSERT(sx_MainThreadId == kMainThreadId);
491 #endif
492 if ( !sx_ThreadId ) {
493 // Not yet assigned - use the default value.
494 sx_ThreadId = kMainThreadId;
495 }
496 sx_MainThreadId = sx_ThreadId;
497 sx_ThreadPtr = 0;
498 sm_MainThreadIdInitialized = true;
499 }
500
501
GetCurrentThread(void)502 CThread* CThread::GetCurrentThread(void)
503 {
504 // Get pointer to the current thread object
505 return sx_ThreadPtr;
506 }
507
508
GetSelf(void)509 CThread::TID CThread::GetSelf(void)
510 {
511 TID id = sx_ThreadId;
512 if ( !id ) {
513 // If main thread has not been set, consider current thread is the main one.
514 // Since sx_ThreadId is still zero, InitializeMainThreadId() will set it to
515 // kMainThreadId, so that the value returned by GetSelf() will be zero.
516 if (!sm_MainThreadIdInitialized) {
517 InitializeMainThreadId();
518 id = sx_ThreadId;
519 }
520 else {
521 sx_ThreadId = id = sx_GetNextThreadId();
522 }
523 }
524 // kMainThreadId is usually marker for main thread, but when using native threads
525 // and InitializeMainThreadId() to set main thread, the main thread id may be
526 // different and it's more reliable to use IsMain() rather than GetSelf() == 0.
527 return id == kMainThreadId ? 0 : id;
528 }
529
530
IsMain(void)531 bool CThread::IsMain(void)
532 {
533 if (!sm_MainThreadIdInitialized) {
534 InitializeMainThreadId();
535 }
536 return sx_ThreadId == sx_GetMainThreadId();
537 }
538
539
540 NCBI_PARAM_DECL(bool, Thread, Catch_Unhandled_Exceptions);
541 NCBI_PARAM_DEF_EX(bool, Thread, Catch_Unhandled_Exceptions, true, 0,
542 THREAD_CATCH_UNHANDLED_EXCEPTIONS);
543 typedef NCBI_PARAM_TYPE(Thread, Catch_Unhandled_Exceptions) TParamThreadCatchExceptions;
544
545
Wrapper(TWrapperArg arg)546 TWrapperRes CThread::Wrapper(TWrapperArg arg)
547 {
548 // Get thread object and self ID
549 CThread* thread_obj = static_cast<CThread*>(arg);
550
551 // Set Toolkit thread ID.
552 thread_obj->x_InitializeThreadId();
553 xncbi_Validate(!IsMain(),
554 "CThread::Wrapper() -- error assigning thread ID");
555
556 #if defined NCBI_THREAD_PID_WORKAROUND
557 // Store this thread's PID. Changed PID means forking of the thread.
558 thread_obj->m_ThreadPID =
559 CProcess::sx_GetPid(CProcess::ePID_GetThread);
560 #endif
561
562 bool catch_all = TParamThreadCatchExceptions::GetDefault();
563
564 // Check if parent request context should be used.
565 if ( thread_obj->m_ParentRequestContext ) {
566 CDiagContext::SetRequestContext(thread_obj->m_ParentRequestContext);
567 }
568
569 // Run user-provided thread main function here
570 if ( catch_all ) {
571 try {
572 thread_obj->m_ExitData = thread_obj->Main();
573 }
574 catch (CExitThreadException& e) {
575 e.EnterWrapper();
576 }
577 #if defined(NCBI_COMPILER_MSVC) && defined(_DEBUG)
578 // Microsoft promotes many common application errors to exceptions.
579 // This includes occurrences such as dereference of a NULL pointer and
580 // walking off of a dangling pointer. The catch-all is lifted only in
581 // debug mode to permit easy inspection of such error conditions, while
582 // maintaining safety of production, release-mode applications.
583 NCBI_CATCH_X(1, "CThread::Wrapper: CThread::Main() failed");
584 #else
585 NCBI_CATCH_ALL_X(2, "CThread::Wrapper: CThread::Main() failed");
586 #endif
587 }
588 else {
589 try {
590 thread_obj->m_ExitData = thread_obj->Main();
591 }
592 catch (CExitThreadException& e) {
593 e.EnterWrapper();
594 }
595 }
596
597 // Call user-provided OnExit()
598 if ( catch_all ) {
599 try {
600 thread_obj->OnExit();
601 }
602 #if defined(NCBI_COMPILER_MSVC) && defined(_DEBUG)
603 // Microsoft promotes many common application errors to exceptions.
604 // This includes occurrences such as dereference of a NULL pointer and
605 // walking off of a dangling pointer. The catch-all is lifted only in
606 // debug mode to permit easy inspection of such error conditions, while
607 // maintaining safety of production, release-mode applications.
608 NCBI_CATCH_X(3, "CThread::Wrapper: CThread::OnExit() failed");
609 #else
610 NCBI_CATCH_ALL_X(4, "CThread::Wrapper: CThread::OnExit() failed");
611 #endif
612 }
613 else {
614 thread_obj->OnExit();
615 }
616
617 // Cleanup local storages used by this thread
618 CUsedTlsBases::ClearAllCurrentThread();
619
620 {{
621 CFastMutexGuard state_guard(s_ThreadMutex);
622
623 // Thread is terminated - decrement counter under mutex
624 --sm_ThreadsCount;
625
626 // Indicate the thread is terminated
627 thread_obj->m_IsTerminated = true;
628
629 // Schedule the thread object for destruction, if detached
630 if ( thread_obj->m_IsDetached ) {
631 thread_obj->m_SelfRef.Reset();
632 }
633 }}
634
635 return 0;
636 }
637
638
CThread(void)639 CThread::CThread(void)
640 : m_Handle(0), m_IsRun(false),
641 m_IsDetached(false),
642 m_IsJoined(false),
643 m_IsTerminated(false),
644 m_ExitData(0)
645 #if defined NCBI_THREAD_PID_WORKAROUND
646 , m_ThreadPID(0)
647 #endif
648 {
649 DoDeleteThisObject();
650 }
651
652
~CThread(void)653 CThread::~CThread(void)
654 {
655 #if defined(NCBI_WIN32_THREADS)
656 // close handle if it's not yet closed
657 // CFastMutexGuard state_guard(s_ThreadMutex);
658 if ( m_IsRun && m_Handle != NULL ) {
659 CloseHandle(m_Handle);
660 m_Handle = NULL;
661 }
662 #endif
663 }
664
665
666
ThreadWrapperCaller(TWrapperArg arg)667 inline TWrapperRes ThreadWrapperCaller(TWrapperArg arg) {
668 return CThread::Wrapper(arg);
669 }
670
671 #if defined(NCBI_POSIX_THREADS)
672 extern "C" {
673 typedef TWrapperRes (*FSystemWrapper)(TWrapperArg);
674
ThreadWrapperCallerImpl(TWrapperArg arg)675 static TWrapperRes ThreadWrapperCallerImpl(TWrapperArg arg) {
676 return ThreadWrapperCaller(arg);
677 }
678 }
679 #elif defined(NCBI_WIN32_THREADS)
680 extern "C" {
681 typedef TWrapperRes (WINAPI *FSystemWrapper)(TWrapperArg);
682
ThreadWrapperCallerImpl(TWrapperArg arg)683 static TWrapperRes WINAPI ThreadWrapperCallerImpl(TWrapperArg arg) {
684 return ThreadWrapperCaller(arg);
685 }
686 }
687 #endif
688
689
690 #if defined NCBI_THREAD_PID_WORKAROUND
sx_GetThreadPid(void)691 TPid CThread::sx_GetThreadPid(void)
692 {
693 CThread* thread_ptr = GetCurrentThread();
694 return thread_ptr ? thread_ptr->m_ThreadPID : 0;
695 }
696
697
sx_SetThreadPid(TPid pid)698 void CThread::sx_SetThreadPid(TPid pid)
699 {
700 CThread* thread_ptr = GetCurrentThread();
701 if ( thread_ptr ) {
702 thread_ptr->m_ThreadPID = pid;
703 }
704 }
705 #endif
706
707
708 #define NCBI_THREAD_VALIDATE(cond, error_code, message) \
709 if ( !(cond) ) NCBI_THROW(CThreadException, error_code, message)
710
711
712 // Stack size parameter, 2M by default.
713 NCBI_PARAM_DECL(size_t, Thread, StackSize);
714 NCBI_PARAM_DEF_EX(size_t, Thread, StackSize, 2048*1024, eParam_NoThread, THREAD_STACK_SIZE);
715 typedef NCBI_PARAM_TYPE(Thread, StackSize) TParamThreadStackSize;
716
717
Run(TRunMode flags)718 bool CThread::Run(TRunMode flags)
719 {
720 CUsedTlsBases::Init();
721
722 // Do not allow the new thread to run until m_Handle is set
723 CFastMutexGuard state_guard(s_ThreadMutex);
724
725 // Check
726 NCBI_THREAD_VALIDATE(!m_IsRun, eRunError,
727 "CThread::Run() -- called for already started thread");
728
729 m_IsDetached = (flags & fRunDetached) != 0;
730
731 #if defined NCBI_THREAD_PID_WORKAROUND
732 CProcess::sx_GetPid(CProcess::ePID_GetCurrent);
733 #endif
734
735 // Thread will run - increment counter under mutex
736 ++sm_ThreadsCount;
737 try {
738
739 if (flags & fRunCloneRequestContext) {
740 m_ParentRequestContext = CDiagContext::GetRequestContext().Clone();
741 }
742
743 #if defined(NCBI_WIN32_THREADS)
744 // We need this parameter in WinNT - can not use NULL instead!
745 DWORD thread_id;
746 // Suspend thread to adjust its priority
747 DWORD creation_flags = (flags & fRunNice) == 0 ? 0 : CREATE_SUSPENDED;
748 m_Handle = CreateThread(NULL, 0, ThreadWrapperCallerImpl,
749 this, creation_flags, &thread_id);
750 NCBI_THREAD_VALIDATE(m_Handle != NULL, eRunError,
751 "CThread::Run() -- error creating thread");
752 if (flags & fRunNice) {
753 // Adjust priority and resume the thread
754 SetThreadPriority(m_Handle, THREAD_PRIORITY_BELOW_NORMAL);
755 ResumeThread(m_Handle);
756 }
757 if ( m_IsDetached ) {
758 CloseHandle(m_Handle);
759 m_Handle = NULL;
760 }
761 else {
762 // duplicate handle to adjust security attributes
763 HANDLE oldHandle = m_Handle;
764 NCBI_THREAD_VALIDATE(DuplicateHandle(GetCurrentProcess(), oldHandle,
765 GetCurrentProcess(), &m_Handle,
766 0, FALSE, DUPLICATE_SAME_ACCESS),
767 eRunError, "CThread::Run() -- error getting thread handle");
768 NCBI_THREAD_VALIDATE(CloseHandle(oldHandle),
769 eRunError, "CThread::Run() -- error closing thread handle");
770 }
771 #elif defined(NCBI_POSIX_THREADS)
772 pthread_attr_t attr;
773 NCBI_THREAD_VALIDATE(pthread_attr_init(&attr) == 0, eRunError,
774 "CThread::Run() - error initializing thread attributes");
775 if ( ! (flags & fRunUnbound) ) {
776 #if defined(NCBI_OS_BSD) || defined(NCBI_OS_CYGWIN) || defined(NCBI_OS_IRIX)
777 NCBI_THREAD_VALIDATE(pthread_attr_setscope(&attr,
778 PTHREAD_SCOPE_PROCESS) == 0, eRunError,
779 "CThread::Run() - error setting thread scope");
780 #else
781 NCBI_THREAD_VALIDATE(pthread_attr_setscope(&attr,
782 PTHREAD_SCOPE_SYSTEM) == 0, eRunError,
783 "CThread::Run() - error setting thread scope");
784 #endif
785 }
786 if ( m_IsDetached ) {
787 NCBI_THREAD_VALIDATE(pthread_attr_setdetachstate(&attr,
788 PTHREAD_CREATE_DETACHED) == 0, eRunError,
789 "CThread::Run() - error setting thread detach state");
790 }
791 NCBI_THREAD_VALIDATE(pthread_attr_setstacksize(&attr,
792 TParamThreadStackSize::GetDefault()) == 0, eRunError,
793 "Thread::Run() -- error setting stack size");
794 NCBI_THREAD_VALIDATE(pthread_create(&m_Handle, &attr,
795 ThreadWrapperCallerImpl, this) == 0, eRunError,
796 "CThread::Run() -- error creating thread");
797
798 NCBI_THREAD_VALIDATE(pthread_attr_destroy(&attr) == 0, eRunError,
799 "CThread::Run() - error destroying thread attributes");
800
801 #else
802 if (flags & fRunAllowST) {
803 Wrapper(this);
804 }
805 else {
806 NCBI_THREAD_VALIDATE(0, eRunError,
807 "CThread::Run() -- system does not support threads");
808 }
809 #endif
810
811 // prevent deletion of CThread until thread is finished
812 m_SelfRef.Reset(this);
813
814 }
815 catch (...) {
816 // In case of any error we need to decrement threads count
817 --sm_ThreadsCount;
818 throw;
819 }
820
821 // Indicate that the thread is run
822 m_IsRun = true;
823 return true;
824 }
825
826
Detach(void)827 void CThread::Detach(void)
828 {
829 CFastMutexGuard state_guard(s_ThreadMutex);
830
831 // Check the thread state: it must be run, but not detached yet
832 NCBI_THREAD_VALIDATE(m_IsRun, eControlError,
833 "CThread::Detach() -- called for not yet started thread");
834 NCBI_THREAD_VALIDATE(!m_IsDetached, eControlError,
835 "CThread::Detach() -- called for already detached thread");
836
837 // Detach the thread
838 #if defined(NCBI_WIN32_THREADS)
839 NCBI_THREAD_VALIDATE(CloseHandle(m_Handle), eControlError,
840 "CThread::Detach() -- error closing thread handle");
841 m_Handle = NULL;
842 #elif defined(NCBI_POSIX_THREADS)
843 NCBI_THREAD_VALIDATE(pthread_detach(m_Handle) == 0, eControlError,
844 "CThread::Detach() -- error detaching thread");
845 #endif
846
847 // Indicate the thread is detached
848 m_IsDetached = true;
849
850 // Schedule the thread object for destruction, if already terminated
851 if ( m_IsTerminated ) {
852 m_SelfRef.Reset();
853 }
854 }
855
856
Join(void ** exit_data)857 void CThread::Join(void** exit_data)
858 {
859 // Check the thread state: it must be run, but not detached yet
860 {{
861 CFastMutexGuard state_guard(s_ThreadMutex);
862 NCBI_THREAD_VALIDATE(m_IsRun, eControlError,
863 "CThread::Join() -- called for not yet started thread");
864 NCBI_THREAD_VALIDATE(!m_IsDetached, eControlError,
865 "CThread::Join() -- called for detached thread");
866 NCBI_THREAD_VALIDATE(!m_IsJoined, eControlError,
867 "CThread::Join() -- called for already joined thread");
868 m_IsJoined = true;
869 }}
870
871 // Join (wait for) and destroy
872 #if defined(NCBI_WIN32_THREADS)
873 NCBI_THREAD_VALIDATE(WaitForSingleObject(m_Handle, INFINITE) == WAIT_OBJECT_0,
874 eControlError, "CThread::Join() -- can not join thread");
875 DWORD status;
876 NCBI_THREAD_VALIDATE(GetExitCodeThread(m_Handle, &status) &&
877 status != DWORD(STILL_ACTIVE), eControlError,
878 "CThread::Join() -- thread is still running after join");
879 NCBI_THREAD_VALIDATE(CloseHandle(m_Handle), eControlError,
880 "CThread::Join() -- can not close thread handle");
881 m_Handle = NULL;
882 #elif defined(NCBI_POSIX_THREADS)
883 NCBI_THREAD_VALIDATE(pthread_join(m_Handle, 0) == 0, eControlError,
884 "CThread::Join() -- can not join thread");
885 #endif
886
887 // Set exit_data value
888 if ( exit_data ) {
889 *exit_data = m_ExitData;
890 }
891
892 // Schedule the thread object for destruction
893 {{
894 CFastMutexGuard state_guard(s_ThreadMutex);
895 m_SelfRef.Reset();
896 }}
897 }
898
899
Exit(void * exit_data)900 void CThread::Exit(void* exit_data)
901 {
902 // Don't exit from the main thread
903 CThread* x_this = GetCurrentThread();
904 NCBI_THREAD_VALIDATE(x_this != 0, eControlError,
905 "CThread::Exit() -- attempt to call for the main thread");
906
907 {{
908 CFastMutexGuard state_guard(s_ThreadMutex);
909 x_this->m_ExitData = exit_data;
910 }}
911
912 // Throw the exception to be caught by Wrapper()
913 throw CExitThreadException();
914 }
915
916
Discard(void)917 bool CThread::Discard(void)
918 {
919 CFastMutexGuard state_guard(s_ThreadMutex);
920
921 // Do not discard after Run()
922 if ( m_IsRun ) {
923 return false;
924 }
925
926 // Schedule for destruction (or, destroy it right now if there is no
927 // other CRef<>-based references to this object left).
928 m_SelfRef.Reset(this);
929 m_SelfRef.Reset();
930 return true;
931 }
932
933
OnExit(void)934 void CThread::OnExit(void)
935 {
936 return;
937 }
938
939
GetSystemID(TThreadSystemID * id)940 void CThread::GetSystemID(TThreadSystemID* id)
941 {
942 *id = GetCurrentThreadSystemID();
943 }
944
945
946 #if defined(NCBI_OS_LINUX) && defined(PR_SET_NAME)
SetCurrentThreadName(const CTempString & name)947 void CThread::SetCurrentThreadName(const CTempString& name)
948 {
949 prctl(PR_SET_NAME, (unsigned long)name.data(), 0, 0, 0);
950 }
951 #else
SetCurrentThreadName(const CTempString &)952 void CThread::SetCurrentThreadName(const CTempString&)
953 {
954 }
955 #endif
956
957
958 bool CThread::sm_IsExiting = false;
959 CTimeout CThread::sm_WaitForThreadsTimeout = CTimeout(0.1);
960
961
SetWaitForAllThreadsTimeout(const CTimeout & timeout)962 void CThread::SetWaitForAllThreadsTimeout(const CTimeout& timeout)
963 {
964 sm_WaitForThreadsTimeout = timeout;
965 }
966
967
WaitForAllThreads(void)968 bool CThread::WaitForAllThreads(void)
969 {
970 if (sm_ThreadsCount == 0) return true;
971 if ( !IsMain() ) return false;
972
973 CStopWatch sw(CStopWatch::eStart);
974 bool infinite = sm_WaitForThreadsTimeout.IsInfinite();
975 unsigned long to = 0;
976 unsigned long q = 10;
977 if ( !infinite ) {
978 to = sm_WaitForThreadsTimeout.GetAsMilliSeconds();
979 if (to < q) q = to;
980 }
981 while (sm_ThreadsCount > 0 && (infinite || sw.Elapsed()*1000 < to)) {
982 SleepMilliSec(q);
983 }
984 return sm_ThreadsCount == 0;
985 }
986
987
GetErrCodeString(void) const988 const char* CThreadException::GetErrCodeString(void) const
989 {
990 switch (GetErrCode()) {
991 case eRunError: return "eRunError";
992 case eControlError: return "eControlError";
993 case eOther: return "eOther";
994 default: return CException::GetErrCodeString();
995 }
996 }
997
998
999 END_NCBI_SCOPE
1000