1 /* $Id: test_ncbithr.cpp 590797 2019-08-05 17:23:21Z grichenk $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Aleksey Grichenko
27 *
28 * File Description:
29 * Test for multithreading classes
30 *
31 */
32
33 #include <ncbi_pch.hpp>
34 #include <corelib/ncbistd.hpp>
35 #include <corelib/ncbiapp.hpp>
36 #include <corelib/ncbithr.hpp>
37 #include <corelib/ncbimtx.hpp>
38 #include <corelib/ncbienv.hpp>
39 #include <corelib/ncbiargs.hpp>
40 #include <corelib/ncbidiag.hpp>
41 #include <corelib/ncbi_system.hpp>
42 #include <map>
43
44 #include <common/test_assert.h> /* This header must go last */
45
46 USING_NCBI_SCOPE;
47
48
49 //#define USE_NATIVE_THREADS
50
51
52 /////////////////////////////////////////////////////////////////////////////
53 // Globals
54
55
// Inclusive limits for the command-line tunables; CThreadedApp::Init()
// installs them as argument constraints.  cNumThreadsMax also sizes the
// per-thread global arrays (exit_locks, thr, states).
const int cNumThreadsMin = 1;
const int cNumThreadsMax = 500;
const int cSpawnByMin = 1;
const int cSpawnByMax = 100;
const int cRCyclesMin = 10;
const int cRCyclesMax = 5000;
const int cWCyclesMin = 5;
const int cWCyclesMax = 1000;

// Current test parameters.  The initializers are also used as the
// command-line defaults; CThreadedApp::Run() overwrites them with the
// parsed argument values.
static int sNumThreads = 35;  // total number of test threads
static int sSpawnBy = 6;      // max threads spawned by main and by each thread
static int sRCycles = 100;    // reader threads run sRCycles*2 iterations
static int sWCycles = 50;     // writer threads run sWCycles iterations
static int s_NextIndex = 0;   // next unused thread index (read and
                              // incremented under s_GlobalLock)


// Global lock: serializes NcbiCout output, the thread state updates made
// by CTestThread and s_NextIndex allocation; delay() also cycles it.
DEFINE_STATIC_FAST_MUTEX(s_GlobalLock);
73
74
delay(int value)75 void delay(int value)
76 {
77 for (int i=0; i<value; i++) {
78 CFastMutexGuard out_guard(s_GlobalLock);
79 }
80 }
81
82
// TLS cleanup callback used by the test threads.
//   p_value - heap-allocated int owned by the TLS slot; deleted here.
//   p_ref   - points to an int that receives a copy of *p_value
//             before the value is destroyed.
void TestTlsCleanup(int* p_value, void* p_ref)
{
    int* const check_slot = static_cast<int*>(p_ref);
    // Publish the value first, release the storage afterwards
    *check_slot = *p_value;
    delete p_value;
}
89
90
// TLS cleanup callback used by the main thread: counts its own
// invocations by incrementing the stored int in place.  The value is
// not owned by the TLS and is therefore not deleted.
void Main_Thread_Tls_Cleanup(int* p_value, void* /* p_data */)
{
    *p_value += 1;
}
95
96
97
98 /////////////////////////////////////////////////////////////////////////////
99 // Shared resource imitation
100 //
101 // Imitates reading & writing of a shared resource.
102 // Checks if there are violations of read-write locks.
103 //
104
105 class CSharedResource
106 {
107 public:
CSharedResource(void)108 CSharedResource(void) : m_Readers(0),
109 m_Writer(-1) {}
~CSharedResource(void)110 ~CSharedResource(void) { assert(!m_Readers); }
111 void BeginRead(int ID);
112 void BeginWrite(int ID);
113 void EndRead(int ID);
114 void EndWrite(int ID);
115 private:
116 int m_Readers;
117 int m_Writer;
118 CFastMutex m_Mutex;
119 };
120
121
BeginRead(int ID)122 void CSharedResource::BeginRead(int ID)
123 {
124 CFastMutexGuard guard(m_Mutex);
125 // Must be unlocked, R-locked, or W-locked by the same thread
126 assert(m_Readers >= 0 || (m_Readers < 0 && ID == m_Writer));
127 (m_Readers >= 0) ? m_Readers++ : m_Readers--;
128 }
129
130
BeginWrite(int ID)131 void CSharedResource::BeginWrite(int ID)
132 {
133 CFastMutexGuard guard(m_Mutex);
134 // Must be unlocked or W-locked by the same thread
135 assert(m_Readers == 0 || (m_Readers < 0 && ID == m_Writer));
136 m_Readers--;
137 m_Writer = ID;
138 }
139
140
EndRead(int ID)141 void CSharedResource::EndRead(int ID)
142 {
143 CFastMutexGuard guard(m_Mutex);
144 // Must be R-locked or W-locked by the same thread
145 assert(m_Readers > 0 || (m_Readers < 0 && m_Writer == ID));
146 (m_Readers > 0) ? m_Readers-- : m_Readers++;
147 if (m_Readers == 0) {
148 m_Writer = -1;
149 }
150 }
151
152
EndWrite(int ID)153 void CSharedResource::EndWrite(int ID)
154 {
155 CFastMutexGuard guard(m_Mutex);
156 // Must be W-locked by the same thread;
157 assert(m_Readers < 0 && m_Writer == ID);
158 m_Readers++;
159 if (m_Readers == 0) {
160 m_Writer = -1;
161 }
162 }
163
164
// Prevent threads from termination before Detach() or Join().
// One mutex per thread index, allocated in CThreadedApp::Run(); the main
// thread pre-locks every even-indexed mutex, and even-indexed threads
// acquire their own mutex at the end of CTestThread::Main().
CMutex* exit_locks[cNumThreadsMax];
167
// Distinct value type per N, so that every instantiation gets its own
// TLS object from s_GetTls<>().
template <int N>
struct SValue
{
    int value;  // payload checked by test_static_tls<N>()
};
173
// Select the TLS class template exercised by test_static_tls<>():
// CStaticTls when USE_STATIC_TLS is defined (the default here),
// CTls otherwise.
#define USE_STATIC_TLS
#ifdef USE_STATIC_TLS
# define TTls CStaticTls
#else
# define TTls CTls
#endif
180
181 template<class Value>
s_GetTls()182 static TTls<Value>& s_GetTls()
183 {
184 #ifdef USE_STATIC_TLS
185 static CStaticTls<Value> s_tls;
186 return s_tls;
187 #else
188 static CSafeStaticRef< TTls<Value> > s_tls;
189 return s_tls.Get();
190 #endif
191 }
192
193 #if !defined(NCBI_COMPILER_WORKSHOP) || NCBI_COMPILER_VERSION >= 590
194 static
195 #endif
wait_threads(CAtomicCounter & counter)196 void wait_threads(CAtomicCounter& counter)
197 {
198 int add = 1;
199 int wait_count = 0;
200 while ( counter.Add(add) < CAtomicCounter::TValue(sNumThreads) ) {
201 add = 0;
202 if ( (++wait_count & 0xff) == 0 ) { NCBI_SCHED_YIELD(); }
203 }
204 }
205
206 template<int N>
test_static_tls(int idx,bool init=true)207 void test_static_tls(int idx, bool init = true)
208 {
209 static CAtomicCounter thread_counter;
210 if ( idx >= 0 && init ) {
211 wait_threads(thread_counter);
212 }
213 idx += N*1000;
214 typedef SValue<N> Value;
215 TTls<Value>& tls = s_GetTls<Value>();
216 if ( init ) {
217 Value* v = new Value;
218 v->value = idx;
219 tls.SetValue(v, CTlsBase::DefaultCleanup<Value>);
220 }
221 assert(tls.GetValue()->value == idx);
222 }
223
// Runs test_static_tls<K>() for K in [N, N+CNT) by recursively splitting
// the range into two halves (specializations handle CNT == 1 and 0).
template<int N, int CNT>
struct STestStaticTlss
{
    void do_test(int idx, bool init) const
    {
        typedef STestStaticTlss<N, CNT/2>                 TLowerHalf;
        typedef STestStaticTlss<N + CNT/2, CNT - CNT/2>   TUpperHalf;

        TLowerHalf().do_test(idx, init);
        TUpperHalf().do_test(idx, init);
    }
};
234
235 template<int N>
236 struct STestStaticTlss<N, 1>
237 {
do_testSTestStaticTlss238 void do_test(int idx, bool init) const {
239 test_static_tls<N>(idx, init);
240 }
241 };
242
243 template<int N>
244 struct STestStaticTlss<N, 0>
245 {
do_testSTestStaticTlss246 void do_test(int, bool) const {
247 }
248 };
249
250 template<int N, int CNT>
test_static_tlss(int idx,bool init=true)251 void test_static_tlss(int idx, bool init = true)
252 {
253 STestStaticTlss<N, CNT> t;
254 t.do_test(idx, init);
255 }
256
257
258
259 /////////////////////////////////////////////////////////////////////////////
260 // Test thread
261
262
263 #ifndef USE_NATIVE_THREADS
264 class CTestThread : public CThread
265 #else
266 class CTestThread : public CObject
267 #endif
268 {
269 public:
270 CTestThread(int index, CTls<int>* tls, CRWLock* rw, CSharedResource* res);
271 ~CTestThread(void);
272
273 protected:
274 virtual void* Main(void);
275 virtual void OnExit(void);
276
277 private:
278 int m_Index; // Thread sequential index
279 CTls<int>* m_Tls; // Test TLS object
280 int m_CheckValue; // Value to compare with the TLS data
281 CRWLock* m_RW; // Test RWLock object
282 CSharedResource* m_Res; // Shared resource imitation
283
284 #ifdef USE_NATIVE_THREADS
285 friend void NativeWrapperCallerImpl(TWrapperArg arg);
286
287 CAtomicCounter_WithAutoInit m_IsValid;
288 TThreadHandle m_Handle;
289
290 CRef<CTestThread> m_SelfRef;
291 public:
292 bool Run(CThread::TRunMode flags = CThread::fRunDefault);
293 void Join(void** result = 0);
294 void Detach(void);
295 void Discard(void);
296 #endif
297 };
298
299
// Life-cycle states of a test thread, as observed by the main thread.
// The numeric order matters: the main thread compares states with
// relational operators (e.g. ">= eRunning", "< eTerminated").
enum TTestThreadState {
    eNull       = 0,  // Initial value
    eCreated    = 1,  // Set by CTestThread::CTestThread()
    eRunning    = 2,  // Set by CTestThread::Main()
    eTerminated = 3,  // Set by CTestThread::OnExit()
    eDestroyed  = 4   // Set by CTestThread::~CTestThread()
};
308
309
// Pointers to all threads and corresponding states.
// Both arrays are indexed by the thread's sequential index (m_Index);
// only the first sNumThreads entries are used.
CTestThread* thr[cNumThreadsMax];
atomic<TTestThreadState> states[cNumThreadsMax];
313
314
CTestThread(int index,CTls<int> * tls,CRWLock * rw,CSharedResource * res)315 CTestThread::CTestThread(int index,
316 CTls<int>* tls,
317 CRWLock* rw,
318 CSharedResource* res)
319 : m_Index(index),
320 m_Tls(tls),
321 m_CheckValue(15),
322 m_RW(rw),
323 m_Res(res)
324 #ifdef USE_NATIVE_THREADS
325 ,
326 m_IsValid(0)
327 #endif
328 {
329 #ifdef USE_NATIVE_THREADS
330 m_SelfRef.Reset(this);
331 #endif
332 CFastMutexGuard guard(s_GlobalLock);
333 states[m_Index] = eCreated;
334 }
335
336
~CTestThread(void)337 CTestThread::~CTestThread(void)
338 {
339 CFastMutexGuard guard(s_GlobalLock);
340 assert(m_CheckValue == 15);
341 states[m_Index] = eDestroyed;
342 }
343
OnExit(void)344 void CTestThread::OnExit(void)
345 {
346 CFastMutexGuard guard(s_GlobalLock);
347 states[m_Index] = eTerminated;
348 }
349
350
Test_CThreadExit(void)351 bool Test_CThreadExit(void)
352 {
353 DEFINE_STATIC_FAST_MUTEX(s_Exit_Mutex);
354 CFastMutexGuard guard(s_Exit_Mutex);
355 // The mutex must be unlocked after call to CThread::Exit()
356 try {
357 CThread::Exit(reinterpret_cast<void*>(-1));
358 }
359 catch (...) {
360 throw;
361 }
362 return false; // this line should never be executed
363 }
364
365
// Thread body.  In order it exercises: the per-thread CTls slot, spawning
// of further test threads, the static TLS objects, the CRWLock (as a
// reader when m_Index % 5 != 0, as a writer otherwise), and finally
// thread termination through either a normal return or CThread::Exit().
// Returns -1 cast to void*.
void* CTestThread::Main(void)
{
    {{
        CFastMutexGuard guard(s_GlobalLock);
        states[m_Index] = eRunning;
    }}

    // ======= CTls test =======
    // Verify TLS - initial value must be 0
    m_CheckValue = 0;
    assert(m_Tls->GetValue() == 0);
    int* stored_value = new int;
    assert(stored_value != 0);
    *stored_value = 0;
    m_Tls->SetValue(stored_value, TestTlsCleanup, &m_CheckValue);
    // Replace the value five times, each time with old value + 1.
    // The assert expects TestTlsCleanup to have copied the replaced
    // value into m_CheckValue during SetValue().
    for (int i=0; i<5; i++) {
        stored_value = new int;
        assert(stored_value != 0);
        *stored_value = *m_Tls->GetValue()+1;
        m_Tls->SetValue(stored_value,
                        TestTlsCleanup, &m_CheckValue);
        assert(*stored_value == m_CheckValue+1);
    }

    // ======= CThread test =======
    // Spawn up to sSpawnBy more threads, until sNumThreads indexes
    // have been handed out.
    for (int i = 0; i < sSpawnBy; i++) {
        int idx; /* NCBI_FAKE_WARNING: GCC */
        {{
            // Allocate the next thread index under the global lock
            CFastMutexGuard spawn_guard(s_GlobalLock);
            if (s_NextIndex >= sNumThreads) {
                break;
            }
            idx = s_NextIndex;
            s_NextIndex++;
        }}
        thr[idx] = new CTestThread(idx, m_Tls, m_RW, m_Res);
        assert(states[idx] == eCreated);
        thr[idx]->Run();
        {{
            CFastMutexGuard guard(s_GlobalLock);
            NcbiCout << idx << " ";
        }}
    }

    // ======= CTls test =======
    // Verify TLS - current value must be 5
    assert(*m_Tls->GetValue() == 5);
    for (int i=0; i<5; i++) {
        stored_value = new int;
        assert(stored_value != 0);
        *stored_value = *m_Tls->GetValue()+1;
        m_Tls->SetValue(stored_value,
                        TestTlsCleanup, &m_CheckValue);
        assert(*stored_value == m_CheckValue+1);
    }

    // ======= CStaticTls test =======
    {
        // The main thread holds the W-lock until all threads are running,
        // so this R-lock is where the threads wait to be released.
        m_RW->ReadLock();
        test_static_tlss<1, 10>(m_Index);         // set values
        test_static_tlss<1, 10>(m_Index, false);  // re-check values
        m_RW->Unlock();
    }

    // ======= CRWLock test =======
    static int s_var = 0; // global value
    // Outcome counters for the timed try-lock calls (only incremented,
    // never read in this function).
    int lock_success = 0;
    int lock_fail = 0;
    if (m_Index % 5) {         // <------ Reader
        for (int r_cycle=0; r_cycle<sRCycles*2; r_cycle++) {
            // Allow writers to obtain lock.
            delay(100);
            // Lock immediately or wait for locking
            bool locked = false;
            if ( r_cycle % 3 ) {
                locked = m_RW->TryReadLock();
            }
            else {
                locked = m_RW->TryReadLock(CTimeout(0, 3000));
                if ( locked ) {
                    lock_success++;
                }
                else {
                    lock_fail++;
                }
            }
            if ( !locked ) {
                // Try-lock failed - fall back to a blocking lock
                m_RW->ReadLock();
            }

            int l_var = s_var; // must remain the same while R-locked

            m_Res->BeginRead(m_Index);
            assert(l_var == s_var);

            // Cascaded R-lock
            // (released by the matching "r_cycle % 6" branch below)
            if (r_cycle % 6 == 0) {
                m_RW->ReadLock();
                m_Res->BeginRead(m_Index);
            }
            assert(l_var == s_var);

            // Cascaded R-lock must be allowed
            if (r_cycle % 12 == 0) {
                assert(m_RW->TryReadLock());
                m_Res->BeginRead(m_Index);
                m_Res->EndRead(m_Index);
                m_RW->Unlock();
            }

            // W-after-R must be prohibited
            assert( !m_RW->TryWriteLock() );

            delay(10);

            // ======= CTls test =======
            // Verify TLS - current value must be 10
            assert(*m_Tls->GetValue() == 10);

            assert(l_var == s_var);

            // Cascaded R-lock must be allowed
            if (r_cycle % 7 == 0) {
                assert(m_RW->TryReadLock());
                m_Res->BeginRead(m_Index);
                m_Res->EndRead(m_Index);
                m_RW->Unlock();
            }
            assert(l_var == s_var);

            // Release the cascaded R-lock taken above
            if (r_cycle % 6 == 0) {
                m_Res->EndRead(m_Index);
                m_RW->Unlock();
            }
            assert(l_var == s_var);

            // Release the outermost R-lock of this cycle
            m_Res->EndRead(m_Index);
            m_RW->Unlock();
        }
    }
    else {                     // <------ Writer
        for (int w_cycle=0; w_cycle<sWCycles; w_cycle++) {
            // Lock immediately or wait for locking
            bool locked = false;
            if ( w_cycle % 3 ) {
                locked = m_RW->TryWriteLock();
            }
            else {
                locked = m_RW->TryWriteLock(CTimeout(0, 3000));
                if ( locked ) {
                    lock_success++;
                }
                else {
                    lock_fail++;
                }
            }
            if ( !locked ) {
                // Try-lock failed - fall back to a blocking lock
                m_RW->WriteLock();
            }
            m_Res->BeginWrite(m_Index);
            // Allow other threads to try-lock with timeout.
            SleepMilliSec(10);

            // The four optional nested locks below are released further
            // down by branches with the same "w_cycle % N" conditions.

            // Cascaded W-lock
            if (w_cycle % 4 == 0) {
                m_RW->WriteLock();
                m_Res->BeginWrite(m_Index);
            }

            // R-after-W (treated as cascaded W-lock)
            if (w_cycle % 6 == 0) {
                m_RW->ReadLock();
                m_Res->BeginRead(m_Index);
            }

            // Cascaded W-lock must be allowed
            if (w_cycle % 8 == 0) {
                assert(m_RW->TryWriteLock());
                m_Res->BeginWrite(m_Index);
            }

            // Cascaded R-lock must be allowed
            if (w_cycle % 10 == 0) {
                assert(m_RW->TryReadLock());
                m_Res->BeginRead(m_Index);
            }

            // ======= CTls test =======
            // Verify TLS - current value must be 10
            assert(*m_Tls->GetValue() == 10);

            // Continue CRWLock test
            // Modify the shared value while W-locked; readers assert
            // that it does not change while they hold an R-lock.
            for (int i=0; i<7; i++) {
                delay(1);
                s_var++;
            }

            if (w_cycle % 4 == 0) {
                m_Res->EndWrite(m_Index);
                m_RW->Unlock();
            }
            if (w_cycle % 6 == 0) {
                m_Res->EndRead(m_Index);
                m_RW->Unlock();
            }
            if (w_cycle % 8 == 0) {
                m_Res->EndWrite(m_Index);
                m_RW->Unlock();
            }
            if (w_cycle % 10 == 0) {
                m_Res->EndRead(m_Index);
                m_RW->Unlock();
            }
            // Release the outermost W-lock of this cycle
            m_Res->EndWrite(m_Index);
            m_RW->Unlock();
            delay(1);
        }
    }

    // ======= CTls test =======
    // Verify TLS - current value must be 10
    assert(*m_Tls->GetValue() == 10);
    for (int i=0; i<5; i++) {
        stored_value = new int;
        assert(stored_value != 0);
        *stored_value = *m_Tls->GetValue()+1;
        m_Tls->SetValue(stored_value,
                        TestTlsCleanup, &m_CheckValue);
        assert(*stored_value == m_CheckValue+1);
    }
    // The slot now holds 15 while m_CheckValue holds the last replaced
    // value (14); the destructor asserts m_CheckValue == 15.
    assert(*m_Tls->GetValue() == 15);
    assert(m_CheckValue == 14);

    // ======= CThread::Detach() and CThread::Join() test =======
    // Even-indexed threads acquire their exit lock here; the main thread
    // pre-locks these mutexes and unlocks them around Join()/Detach().
    if (m_Index % 2 == 0) {
        CMutexGuard exit_guard(*exit_locks[m_Index]);
        delay(10); // Provide delay for join-before-exit
    }

#ifndef USE_NATIVE_THREADS
    if (m_Index % 3 == 0)
    {
        // Never verified, since CThread::Exit() terminates the thread
        // inside Test_CThreadExit().
        assert(Test_CThreadExit());
    }
#endif

    return reinterpret_cast<void*>(-1);
}
616
617
618 /////////////////////////////////////////////////////////////////////////////
619 // Use native threads rather than CThread to run the tests.
620
621 #ifdef USE_NATIVE_THREADS
622
Join(void ** result)623 void CTestThread::Join(void** result)
624 {
625 // Join (wait for) and destroy
626 bool valid = m_IsValid.Add(-1) == 0;
627 if (valid) {
628 #if defined(NCBI_WIN32_THREADS)
629 _ASSERT(WaitForSingleObject(m_Handle, INFINITE) == WAIT_OBJECT_0);
630 DWORD status;
631 _ASSERT(GetExitCodeThread(m_Handle, &status));
632 _ASSERT(status != DWORD(STILL_ACTIVE));
633 _ASSERT(CloseHandle(m_Handle));
634 #elif defined(NCBI_POSIX_THREADS)
635 _ASSERT(pthread_join(m_Handle, 0) == 0);
636 #endif
637 }
638 if (result) {
639 *result = this;
640 }
641 m_SelfRef.Reset();
642 }
643
644
Detach(void)645 void CTestThread::Detach(void)
646 {
647 // The second who increments this can delete the object
648 bool valid = m_IsValid.Add(-1) == 0;
649 if (valid) {
650 #if defined(NCBI_WIN32_THREADS)
651 _ASSERT(CloseHandle(m_Handle));
652 #elif defined(NCBI_POSIX_THREADS)
653 _ASSERT(pthread_detach(m_Handle) == 0);
654 #endif
655 }
656 m_SelfRef.Reset();
657 }
658
659
Discard(void)660 void CTestThread::Discard(void)
661 {
662 m_SelfRef.Reset();
663 return;
664 }
665
666
NativeWrapperCallerImpl(TWrapperArg arg)667 void NativeWrapperCallerImpl(TWrapperArg arg)
668 {
669 CRef<CTestThread> thread_obj(static_cast<CTestThread*>(arg));
670 thread_obj->Main();
671 thread_obj->OnExit();
672 CUsedTlsBases::GetUsedTlsBases().ClearAll();
673 }
674
675
// Platform-specific thread entry point with C linkage; it only forwards
// to NativeWrapperCallerImpl() and returns 0.
#if defined(NCBI_WIN32_THREADS)
extern "C" {
    typedef TWrapperRes (WINAPI *FSystemWrapper)(TWrapperArg);

    static TWrapperRes WINAPI NativeWrapperCaller(TWrapperArg arg)
    {
        NativeWrapperCallerImpl(arg);
        return 0;
    }
}
#elif defined(NCBI_POSIX_THREADS)
extern "C" {
    typedef TWrapperRes (*FSystemWrapper)(TWrapperArg);

    static TWrapperRes NativeWrapperCaller(TWrapperArg arg)
    {
        NativeWrapperCallerImpl(arg);
        return 0;
    }
}
#endif
695
696
Run(CThread::TRunMode flags)697 bool CTestThread::Run(CThread::TRunMode flags)
698 {
699 // Run as the platform native thread rather than CThread
700 // Not all functionality will work in this mode. E.g. TLS
701 // cleanup can not be done automatically.
702 #if defined(NCBI_WIN32_THREADS)
703 // We need this parameter in WinNT - can not use NULL instead!
704 DWORD thread_id;
705 // Suspend thread to adjust its priority
706 DWORD creation_flags = (flags & CThread::fRunNice) == 0 ? 0 : CREATE_SUSPENDED;
707 m_Handle = CreateThread(NULL, 0, NativeWrapperCaller,
708 this, creation_flags, &thread_id);
709 _ASSERT(m_Handle != NULL);
710 m_IsValid.Set(1);
711 if (flags & CThread::fRunNice) {
712 // Adjust priority and resume the thread
713 SetThreadPriority(m_Handle, THREAD_PRIORITY_BELOW_NORMAL);
714 ResumeThread(m_Handle);
715 }
716 if ((flags & CThread::fRunDetached) != 0) {
717 CloseHandle(m_Handle);
718 m_IsValid.Set(0);
719 }
720 else {
721 // duplicate handle to adjust security attributes
722 HANDLE oldHandle = m_Handle;
723 _ASSERT(DuplicateHandle(GetCurrentProcess(), oldHandle,
724 GetCurrentProcess(), &m_Handle,
725 0, FALSE, DUPLICATE_SAME_ACCESS));
726 _ASSERT(CloseHandle(oldHandle));
727 }
728 #elif defined(NCBI_POSIX_THREADS)
729 pthread_attr_t attr;
730 _ASSERT(pthread_attr_init (&attr) == 0);
731 if ( ! (flags & CThread::fRunUnbound) ) {
732 #if defined(NCBI_OS_BSD) || defined(NCBI_OS_CYGWIN) || defined(NCBI_OS_IRIX)
733 _ASSERT(pthread_attr_setscope(&attr,
734 PTHREAD_SCOPE_PROCESS) == 0);
735 #else
736 _ASSERT(pthread_attr_setscope(&attr,
737 PTHREAD_SCOPE_SYSTEM) == 0);
738 #endif
739 }
740 if ( flags & CThread::fRunDetached ) {
741 _ASSERT(pthread_attr_setdetachstate(&attr,
742 PTHREAD_CREATE_DETACHED) == 0);
743 }
744 _ASSERT(pthread_create(&m_Handle, &attr,
745 NativeWrapperCaller, this) == 0);
746
747 _ASSERT(pthread_attr_destroy(&attr) == 0);
748 m_IsValid.Set(1);
749 #else
750 if (flags & fRunAllowST) {
751 Wrapper(this);
752 }
753 else {
754 _ASSERT(0);
755 }
756 #endif
757 return true;
758 }
759
760
761 /////////////////////////////////////////////////////////////////////////////
762 // Test application
763
764 #endif // USE_NATIVE_THREADS
765
766
767 class CThreadedApp : public CNcbiApplication
768 {
769 void Init(void);
770 int Run(void);
771 };
772
773
Init(void)774 void CThreadedApp::Init(void)
775 {
776 // Prepare command line descriptions
777 unique_ptr<CArgDescriptions> arg_desc(new CArgDescriptions);
778
779 // sNumThreads
780 arg_desc->AddDefaultKey
781 ("threads", "NumThreads",
782 "Total number of threads to create and run",
783 CArgDescriptions::eInteger, NStr::IntToString(sNumThreads));
784 arg_desc->SetConstraint
785 ("threads", new CArgAllow_Integers(cNumThreadsMin, cNumThreadsMax));
786
787 // sSpawnBy
788 arg_desc->AddDefaultKey
789 ("spawnby", "SpawnBy",
790 "Threads spawning factor",
791 CArgDescriptions::eInteger, NStr::IntToString(sSpawnBy));
792 arg_desc->SetConstraint
793 ("spawnby", new CArgAllow_Integers(cSpawnByMin, cSpawnByMax));
794
795 // sNumRCycles
796 arg_desc->AddDefaultKey
797 ("rcycles", "RCycles",
798 "Number of read cycles by each reader thread",
799 CArgDescriptions::eInteger, NStr::IntToString(sRCycles));
800 arg_desc->SetConstraint
801 ("rcycles", new CArgAllow_Integers(cRCyclesMin, cRCyclesMax));
802
803 // sNumWCycles
804 arg_desc->AddDefaultKey
805 ("wcycles", "WCycles",
806 "Number of write cycles by each writer thread",
807 CArgDescriptions::eInteger, NStr::IntToString(sWCycles));
808 arg_desc->SetConstraint
809 ("wcycles", new CArgAllow_Integers(cWCyclesMin, cWCyclesMax));
810
811 arg_desc->AddFlag("favorwriters",
812 "Operate the RW-lock in fFavorWriters mode.");
813
814 string prog_description =
815 "This is a program testing thread, TLS, mutex and RW-lock classes.";
816 arg_desc->SetUsageContext(GetArguments().GetProgramBasename(),
817 prog_description, false);
818
819 SetupArgDescriptions(arg_desc.release());
820 }
821
822
// Main test scenario.  Reads the test parameters from the command line,
// checks CTls::Discard(), starts the first batch of threads while holding
// the RW-lock for writing, waits until all threads are running, releases
// them, then joins/detaches the threads (before and after their exit)
// and finally waits until every thread object is destroyed.
// Returns 0; failures are reported through assert().
int CThreadedApp::Run(void)
{
#  if defined(NCBI_POSIX_THREADS)
    NcbiCout << "OS: Unix" << NcbiEndl;
#  elif defined(NCBI_WIN32_THREADS)
    NcbiCout << "OS: MS-Windows" << NcbiEndl;
#  else
    NcbiCout << "OS: unknown" << NcbiEndl;
#  endif

    // Process command line
    const CArgs& args = GetArgs();

    sNumThreads = args["threads"].AsInteger();
    sSpawnBy = args["spawnby"].AsInteger();
    sRCycles = args["rcycles"].AsInteger();
    sWCycles = args["wcycles"].AsInteger();

    CRWLock::TFlags rwflags = 0;
    if (args["favorwriters"]) {
        rwflags |= CRWLock::fFavorWriters;
    }

    NcbiCout << "Test parameters:" << NcbiEndl;
    NcbiCout << "\tTotal threads " << sNumThreads << NcbiEndl;
    NcbiCout << "\tSpawn threads by " << sSpawnBy << NcbiEndl;
    NcbiCout << "\tR-cycles " << sRCycles << NcbiEndl;
    NcbiCout << "\tW-cycles " << sWCycles << NcbiEndl;
    NcbiCout << "\tRW-lock flags 0x" << NStr::IntToString(rwflags, 0, 16)
             << NcbiEndl;
#ifdef USE_NATIVE_THREADS
    NcbiCout << "\tUsing native threads" << NcbiEndl;
#endif
    NcbiCout << NcbiEndl;

    // Redirect error log to hide messages sent by delay()
    //SetDiagStream(0);

    // Test CBaseTls::Discard()
    // The cleanup callback increments main_cleanup_flag; it must run
    // exactly once, and only when the TLS is discarded.
    NcbiCout << "Creating/discarding TLS test...";
    int main_cleanup_flag = 0;
    CTls<int>* dummy_tls = new CTls<int>;
    assert(main_cleanup_flag == 0);
    dummy_tls->SetValue(&main_cleanup_flag, Main_Thread_Tls_Cleanup);
    assert(main_cleanup_flag == 0);
    dummy_tls->Discard();
    assert(main_cleanup_flag == 1);
    NcbiCout << " Passed" << NcbiEndl << NcbiEndl;

    // Create test objects
    main_cleanup_flag = 0;
    CRef< CTls<int> > tls(new CTls<int>);
    tls->SetValue(0);
    CRWLock rw(rwflags);
    CSharedResource res;
    {{
        CFastMutexGuard guard(s_GlobalLock);
        NcbiCout << "===== Starting threads =====" << NcbiEndl;
    }}

    // Prepare exit-locks, lock each 2nd mutex
    // They will be used by join/detach before exit
    for (int i=0; i<sNumThreads; i++) {
        states[i] = eNull;
        exit_locks[i] = new CMutex;
        if (i % 2 == 0) {
            exit_locks[i]->Lock();
        }
    }

    // Prepare stop with RWLock
    // (test threads block on their first ReadLock() until rw.Unlock()
    // is called below)
    rw.WriteLock();
    if ( 0 ) { // pre-initialize static tlss
        test_static_tlss<1, 10>(-1);
        test_static_tlss<1, 10>(-1, false);
    }

    // Create and run threads
    // Only the first sSpawnBy threads are started here; the remaining
    // ones are spawned by the threads themselves in CTestThread::Main().
    for (int i = 0; i < sSpawnBy; i++) {
        int idx; /* NCBI_FAKE_WARNING: GCC */
        {{
            // Allocate the next thread index under the global lock
            CFastMutexGuard spawn_guard(s_GlobalLock);
            if (s_NextIndex >= sNumThreads) {
                break;
            }
            idx = s_NextIndex;
            s_NextIndex++;
        }}

        // Check Discard() for some threads
        // (the discarded object must be destroyed without ever running)
        if (i % 2 == 0) {
            thr[idx] = new CTestThread(idx, tls, &rw, &res);
            assert(states[idx] == eCreated);
            thr[idx]->Discard();
            assert(states[idx] == eDestroyed);
        }

        thr[idx] = new CTestThread(idx, tls, &rw, &res);
        assert(states[idx] == eCreated);
        thr[idx]->Run();
        {{
            CFastMutexGuard guard(s_GlobalLock);
            NcbiCout << idx << " ";
        }}
    }

    // Wait for all threads running
    {{
        CFastMutexGuard guard(s_GlobalLock);
        NcbiCout << NcbiEndl << NcbiEndl <<
                    "===== Waiting for all threads to run =====" << NcbiEndl;
    }}
    // thread_map[i] records whether index i has already been printed
    map<int, bool> thread_map;
    for (int i=0; i<sNumThreads; i++) {
        thread_map[i] = false;
    }
    // Poll the state array until every thread has reached eRunning
    int ready = 0;
    while (ready < sNumThreads) {
        ready = 0;
        for (int i=0; i<sNumThreads; i++) {
            if (states[i] >= eRunning) {
                ready++;
                if ( !thread_map[i] ) {
                    CFastMutexGuard guard(s_GlobalLock);
                    NcbiCout << i << " ";
                    thread_map[i] = true;
                }
            }
        }
    }

    // pause and release RWLock
    {{
        CFastMutexGuard guard(s_GlobalLock);
        NcbiCout << NcbiEndl << NcbiEndl <<
                    "===== Releasing threads run after a pause ====="
                 << NcbiEndl;
    }}
    SleepSec(1);
    rw.Unlock();

    {{
        CFastMutexGuard guard(s_GlobalLock);
        NcbiCout << NcbiEndl << NcbiEndl <<
                    "===== Joining threads before exit =====" << NcbiEndl;
    }}
    for (int i=0; i<sNumThreads; i++) {
        // Try to join before exit
        // (these threads are even-indexed, so they cannot leave Main()
        // until their exit lock is released here)
        if (i % 4 == 2) {
            assert(states[i] < eTerminated);
            exit_locks[i]->Unlock();
            void* exit_data = 0;
            thr[i]->Join(&exit_data);
            // Must be non-null: CTestThread::Main() returns, and
            // Test_CThreadExit() exits with, -1 cast to void*
            assert(exit_data != 0);
            {{
                CFastMutexGuard guard(s_GlobalLock);
                NcbiCout << i << " ";
            }}
        }
    }

    // Reset CRef to the test tls. One more CRef to the object
    // must have been stored in the CThread's m_UsedTlsSet.
    assert(main_cleanup_flag == 0);
    tls.Reset();
    assert(main_cleanup_flag == 0);

    {{
        CFastMutexGuard guard(s_GlobalLock);
        NcbiCout << NcbiEndl << NcbiEndl <<
                    "===== Detaching threads before exit =====" << NcbiEndl;
    }}
    for (int i=0; i<sNumThreads; i++) {
        // Detach before exit
        // (the exit lock is released only after Detach() has been called)
        if (i % 4 == 0) {
            assert(states[i] < eTerminated);
            thr[i]->Detach();
            exit_locks[i]->Unlock();
            {{
                CFastMutexGuard guard(s_GlobalLock);
                NcbiCout << i << " ";
            }}
        }
    }

    // Wait for all threads to exit
    delay(10);

    {{
        CFastMutexGuard guard(s_GlobalLock);
        NcbiCout << NcbiEndl << NcbiEndl <<
                    "===== Detaching threads after exit =====" << NcbiEndl;
    }}
    for (int i=0; i<sNumThreads; i++) {
        // Detach after exit
        if (i % 4 == 1) {
            assert(states[i] != eDestroyed);
            thr[i]->Detach();
            {{
                CFastMutexGuard guard(s_GlobalLock);
                NcbiCout << i << " ";
            }}
        }
    }

    {{
        CFastMutexGuard guard(s_GlobalLock);
        NcbiCout << NcbiEndl << NcbiEndl <<
                    "===== Joining threads after exit =====" << NcbiEndl;
    }}
    for (int i=0; i<sNumThreads; i++) {
        // Join after exit
        if (i % 4 == 3) {
            assert(states[i] != eDestroyed);
            thr[i]->Join();
            {{
                CFastMutexGuard guard(s_GlobalLock);
                NcbiCout << i << " ";
            }}
        }
    }

    // Wait for all threads to be destroyed
    {{
        CFastMutexGuard guard(s_GlobalLock);
        NcbiCout << NcbiEndl << NcbiEndl <<
                    "===== Waiting for all threads to be destroyed =====" << NcbiEndl;
    }}
    for (int i=0; i<sNumThreads; i++) {
        thread_map[i] = false;
    }
    // Poll until every thread object reports eDestroyed; the exit lock of
    // a thread is deleted the first time its destruction is observed.
    ready = 0;
    while (ready < sNumThreads) {
        ready = 0;
        for (int i=0; i<sNumThreads; i++) {
            if (states[i] == eDestroyed) {
                ready++;
                if ( !thread_map[i] ) {
                    CFastMutexGuard guard(s_GlobalLock);
                    NcbiCout << i << " ";
                    thread_map[i] = true;
                    delete exit_locks[i];
                }
            }
            else {
                // Provide a possibility for the child thread to run
                CMutexGuard exit_guard(*exit_locks[i]);
            }
        }
    }

    {{
        CFastMutexGuard guard(s_GlobalLock);
        NcbiCout << NcbiEndl << NcbiEndl << "Test passed" << NcbiEndl;
    }}

    return 0;
}
1082
1083
1084 /////////////////////////////////////////////////////////////////////////////
1085 // MAIN
1086
1087 #if 1
main(int argc,const char * argv[])1088 int main(int argc, const char* argv[])
1089 {
1090 CThread::InitializeMainThreadId();
1091 return CThreadedApp().AppMain(argc, argv);
1092 }
1093 #else
1094
1095 // the following is a simple speed test
1096
// Number of lock/unlock (or equivalent) operations timed per primitive
const int max_count = 100000;
//const int max_count = 1;

#if 1 //-------------------------------------

// CStopWatch-based timing.  Both macros expect a CStopWatch named `sw`
// in the enclosing scope and store the stopwatch reading in their
// argument.
typedef double PERF_COUNTER;
#define START_PERF_COUNTER(b) b = sw.Restart()
#define STOP_PERF_COUNTER(e) e = sw.Elapsed()
1105
PrintTime(PERF_COUNTER t,const string & str)1106 void PrintTime(PERF_COUNTER t, const string& str)
1107 {
1108 cout << str << t * 1000000 / max_count << " usec" << endl;
1109 }
1110
1111 #else //-------------------------------------
1112
1113 #ifdef NCBI_COMPILER_MSVC
QUERY_PERF_COUNTER(void)1114 inline Uint8 QUERY_PERF_COUNTER(void) {
1115 LARGE_INTEGER p;
1116 return QueryPerformanceCounter(&p) ? p.QuadPart : 0;
1117 }
1118 #elif defined(NCBI_OS_DARWIN)
1119 #include <mach/mach_time.h>
QUERY_PERF_COUNTER(void)1120 inline Uint8 QUERY_PERF_COUNTER(void) {
1121 return mach_absolute_time();
1122 }
1123 #else
QUERY_PERF_COUNTER(void)1124 inline Uint8 QUERY_PERF_COUNTER(void) {
1125 timespec p;
1126 clock_gettime(CLOCK_REALTIME, &p);
1127 return p.tv_sec * 1000 * 1000 + p.tv_nsec / 1000;
1128 }
1129 #endif
1130
// Raw-counter timing.  START stores the current counter in its argument;
// STOP stores the difference between the current counter and a variable
// named `b`, which must exist in the enclosing scope.
typedef Uint8 PERF_COUNTER;
#define START_PERF_COUNTER(b) b = QUERY_PERF_COUNTER()
#define STOP_PERF_COUNTER(e) e = (QUERY_PERF_COUNTER() - b)
1134
PrintTime(PERF_COUNTER t,const string & str)1135 void PrintTime(PERF_COUNTER t, const string& str)
1136 {
1137 cout << str << t << endl;
1138 }
1139
1140 #endif //-------------------------------------
1141
1142
// Simple single-threaded speed test: times max_count lock/unlock (or
// equivalent) pairs for each synchronization primitive and prints the
// result through PrintTime().  argc and argv are not used.
int main(int argc, const char* argv[])
{
    {
        // One timed try-lock on a scratch RW-lock before any timing starts
        CRWLock rw;
        bool locked = rw.TryReadLock(CTimeout(0, 3000));
        if (locked) {
            rw.Unlock();
        }
        // return 0;
    }

    // `sw` and `b` are referenced by name from the START/STOP macros
    CStopWatch sw(CStopWatch::eStart);
    PERF_COUNTER t, b;

    cout << "NCBI C++ Toolkit:" << endl;
    CFastMutex fm;
    START_PERF_COUNTER(b);
    for (int i = 0; i < max_count; ++i) {
        fm.Lock(); fm.Unlock();
    }
    STOP_PERF_COUNTER(t);
    PrintTime(t, "CFastMutex: ");

    CMutex m;
    START_PERF_COUNTER(b);
    for (int i = 0; i < max_count; ++i) {
        m.Lock(); m.Unlock();
    }
    STOP_PERF_COUNTER(t);
    PrintTime(t, "CMutex: ");

    CAtomicCounter ac; ac.Set(0);
    START_PERF_COUNTER(b);
    for (int i = 0; i < max_count; ++i) {
        ac.Add(1); ac.Add(-1);
    }
    STOP_PERF_COUNTER(t);
    PrintTime(t, "CAtomicCounter: ");

    CSemaphore sema(0, 1);
    START_PERF_COUNTER(b);
    for (int i = 0; i < max_count; ++i) {
        sema.Post(); sema.Wait();
    }
    STOP_PERF_COUNTER(t);
    PrintTime(t, "CSemaphore: ");

    CRWLock rw;
    START_PERF_COUNTER(b);
    for (int i = 0; i < max_count; ++i) {
        rw.ReadLock(); rw.Unlock();
    }
    STOP_PERF_COUNTER(t);
    PrintTime(t, "CRWLock::ReadLock(): ");

    START_PERF_COUNTER(b);
    for (int i = 0; i < max_count; ++i) {
        rw.WriteLock(); rw.Unlock();
    }
    STOP_PERF_COUNTER(t);
    PrintTime(t, "CRWLock::WriteLock(): ");

    CRWLock rwf(CRWLock::fFavorWriters);
    START_PERF_COUNTER(b);
    for (int i = 0; i < max_count; ++i) {
        rwf.ReadLock(); rwf.Unlock();
    }
    STOP_PERF_COUNTER(t);
    PrintTime(t, "CRWLock(fFavorWriters)::ReadLock(): ");

    START_PERF_COUNTER(b);
    for (int i = 0; i < max_count; ++i) {
        rwf.WriteLock(); rwf.Unlock();
    }
    STOP_PERF_COUNTER(t);
    PrintTime(t, "CRWLock(fFavorWriters)::WriteLock(): ");

    CFastRWLock frw;
    START_PERF_COUNTER(b);
    for (int i = 0; i < max_count; ++i) {
        frw.ReadLock(); frw.ReadUnlock();
    }
    STOP_PERF_COUNTER(t);
    PrintTime(t, "CFastRWLock::ReadLock(): ");

    START_PERF_COUNTER(b);
    for (int i = 0; i < max_count; ++i) {
        frw.WriteLock(); frw.WriteUnlock();
    }
    STOP_PERF_COUNTER(t);
    PrintTime(t, "CFastRWLock::WriteLock(): ");

#ifdef NCBI_COMPILER_MSVC
    // Native Windows primitives, for comparison
    cout << endl << "System objects:" << endl;
    CRITICAL_SECTION cs;
    InitializeCriticalSection(&cs);
    START_PERF_COUNTER(b);
    for (int i = 0; i < max_count; ++i) {
        EnterCriticalSection(&cs); LeaveCriticalSection(&cs);
    }
    STOP_PERF_COUNTER(t);
    DeleteCriticalSection(&cs);
    PrintTime(t, "CRITICAL_SECTION: ");

    HANDLE mh = CreateMutex(NULL, FALSE, NULL);
    START_PERF_COUNTER(b);
    for (int i = 0; i < max_count; ++i) {
        WaitForSingleObject(mh, INFINITE); ReleaseMutex(mh);
    }
    STOP_PERF_COUNTER(t);
    CloseHandle(mh);
    PrintTime(t, "Mutex: ");

    SRWLOCK swr;
    InitializeSRWLock(&swr);
    START_PERF_COUNTER(b);
    for (int i = 0; i < max_count; ++i) {
        AcquireSRWLockShared(&swr); ReleaseSRWLockShared(&swr);
    }
    STOP_PERF_COUNTER(t);
    PrintTime(t, "SRWLOCK Shared: ");

    START_PERF_COUNTER(b);
    for (int i = 0; i < max_count; ++i) {
        AcquireSRWLockExclusive(&swr); ReleaseSRWLockExclusive(&swr);
    }
    STOP_PERF_COUNTER(t);
    PrintTime(t, "SRWLOCK Exclusive: ");

    // Acquire 2, 3 and 4 mutexes at once
    cout << "WaitForMultipleObjects():" << endl;
    for (int mmh_count = 2; mmh_count < 5; ++mmh_count) {
        HANDLE* mmh = new HANDLE[mmh_count];
        for (int mi = 0; mi < mmh_count; ++mi) {
            mmh[mi] = CreateMutex(NULL, FALSE, NULL);
        }
        START_PERF_COUNTER(b);
        for (int i = 0; i < max_count; ++i) {
            WaitForMultipleObjects(mmh_count, mmh, TRUE, INFINITE);
            for (int mi = 0; mi < mmh_count; ++mi) {
                ReleaseMutex(mmh[mi]);
            }
        }
        STOP_PERF_COUNTER(t);
        for (int mi = 0; mi < mmh_count; ++mi) {
            CloseHandle(mmh[mi]);
        }
        delete[] mmh;
        PrintTime(t, "Mutex x " + NStr::NumericToString(mmh_count) + ": ");
    }
#endif

#if 0
    // Disabled: standard library shared_mutex timings
    cout << endl << "STL:" << endl;
    shared_mutex shm;
    sw.Restart();
    int c = 0;
    for (int i = 0; i < max_count; ++i) {
        shared_lock<shared_mutex> lock(shm);
        c++;
    }
    t = sw.Elapsed();
    PrintTime(t, "shared_lock: ");

    sw.Restart();
    for (int i = 0; i < max_count; ++i) {
        unique_lock<shared_mutex> lock(shm);
        c--;
    }
    t = sw.Elapsed();
    PrintTime(t, "unique_lock: ");
#endif

    return 0;
}
1317 #endif
1318