1 /* $Id: test_mt.cpp 604988 2020-04-06 14:05:26Z gouriano $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Aleksey Grichenko
27 *
28 * File Description:
29 * Wrapper for testing modules in MT environment
30 *
31 */
32
33 #include <ncbi_pch.hpp>
34 #include <corelib/test_mt.hpp>
35 #include <corelib/ncbimtx.hpp>
36 #include <corelib/ncbi_system.hpp>
37 #include <corelib/ncbi_param.hpp>
38 #include <math.h>
39 #include <common/test_assert.h> /* This header must go last */
40
41
42 BEGIN_NCBI_SCOPE
43
// Uncomment the definition to use platform native threads rather
// than CThread.
//#define USE_NATIVE_THREADS

// Protects the thread index bookkeeping done in
// CTestThread::StartCascadingThreads().
DEFINE_STATIC_FAST_MUTEX(s_GlobalLock);
// The application object; set by the CThreadedApp constructor and
// reset to 0 by its destructor.
static CThreadedApp* s_Application;

// Default values (replaced by the command line values in CThreadedApp::Run())
unsigned int s_NumThreads = 34;
int s_SpawnBy = 6;

// Next test thread index
static volatile unsigned int s_NextIndex = 0;

// Post a log message and count it. The macro refers to m_LogMsgCount, so it
// can only be used where that CThreadedApp member is in scope.
#define TESTAPP_LOG_POST(x) do { ++m_LogMsgCount; LOG_POST(x); } while (0)
59
60 /////////////////////////////////////////////////////////////////////////////
61 // Randomization parameters
62
// if (rand() % 100 < threshold) then use cascading threads
// The threshold is read in CThreadedApp::Run(), which rejects values
// above 100; the default is 25.
NCBI_PARAM_DECL(unsigned int, TEST_MT, Cascading);
NCBI_PARAM_DEF(unsigned int, TEST_MT, Cascading, 25);
66
67 // calculate # of thread groups
s_GroupsCount(void)68 static string s_GroupsCount(void)
69 {
70 return NStr::UIntToString( (unsigned int)sqrt((double)s_NumThreads));
71 }
// Number of thread groups, kept as a string; s_GroupsCount() is supplied
// as the initializer function. The value is parsed in
// CThreadedApp::x_InitializeThreadGroups().
NCBI_PARAM_DECL(string, TEST_MT, GroupsCount);
NCBI_PARAM_DEF_WITH_INIT(string, TEST_MT, GroupsCount, "", s_GroupsCount);

// group.has_sync_point = (rand() % 100) < threshold;
// Values above 100 are rejected in x_InitializeThreadGroups(); default is 75.
NCBI_PARAM_DECL(unsigned int, TEST_MT, IntragroupSyncPoint);
NCBI_PARAM_DEF(unsigned int, TEST_MT, IntragroupSyncPoint, 75);
78
79 /////////////////////////////////////////////////////////////////////////////
80 // Test thread
81 //
82
// A single test thread.
//
// The constructor, destructor and OnExit() forward to the Thread_Init(),
// Thread_Destroy() and Thread_Exit() hooks of the application object (when
// one exists); Main() calls Thread_Run() with this thread's index.
class CTestThread : public CThread
{
public:
    // Start the next batch of test threads (at most s_SpawnBy of them).
    static void StartCascadingThreads(void);

    CTestThread(int id);
    // Intra-group sync point. Does nothing here; CInGroupThread overrides it.
    virtual void SyncPoint(void) {};
    // Sync point shared by all registered test threads.
    virtual void GlobalSyncPoint(void);

protected:
    ~CTestThread(void);
    virtual void* Main(void);
    virtual void OnExit(void);

    // Index of this thread; it is the argument passed to the Thread_*() hooks.
    int m_Idx;

#ifdef USE_NATIVE_THREADS
    // Handle of the native thread created by RunNative().
    TThreadHandle m_Handle;

public:
    // Start / join this object as a platform native thread.
    void RunNative(void);
    void JoinNative(void** result);

    friend TWrapperRes NativeWrapper(TWrapperArg arg);
#endif
};
109
110
// State of the global sync point (CTestThread::GlobalSyncPoint()):
//  - s_Semaphore       is what the early arrivals block on;
//  - s_SyncCounter     counts the threads that have arrived so far;
//  - s_NumberOfThreads is the number of threads taking part; it is adjusted
//                      by CTestThread's ctor/dtor and by CThreadGroup.
static CSemaphore s_Semaphore(0, INT_MAX); /* For GlobalSyncPoint()*/
static CAtomicCounter s_SyncCounter; /* For GlobalSyncPoint()*/
static CAtomicCounter s_NumberOfThreads; /* For GlobalSyncPoint()*/
114
115
CTestThread(int idx)116 CTestThread::CTestThread(int idx)
117 : m_Idx(idx)
118 {
119 /* We want to know total number of threads, and the easiest way is to make
120 them register themselves */
121 s_NumberOfThreads.Add(1);
122 if ( s_Application != 0 )
123 assert(s_Application->Thread_Init(m_Idx));
124 }
125
126
~CTestThread(void)127 CTestThread::~CTestThread(void)
128 {
129 s_NumberOfThreads.Add(-1);
130 assert(s_NumberOfThreads.Get() >= 0);
131 if ( s_Application != 0 )
132 assert(s_Application->Thread_Destroy(m_Idx));
133 }
134
135
OnExit(void)136 void CTestThread::OnExit(void)
137 {
138 if ( s_Application != 0 )
139 assert(s_Application->Thread_Exit(m_Idx));
140 }
141
142
GlobalSyncPoint(void)143 void CTestThread::GlobalSyncPoint(void)
144 {
145 /* Semaphore is supposed to have zero value when threads come here,
146 so Wait() causes stop */
147 if (s_SyncCounter.Add(1) != s_NumberOfThreads.Get()) {
148 s_Semaphore.Wait();
149 return;
150 }
151 /* If we are the last thread to come to sync point, we yield
152 so that threads that were waiting for us go first */
153 if (s_NumberOfThreads.Get() > 1) {
154 s_Semaphore.Post((unsigned int)s_NumberOfThreads.Get() - 1);
155 s_SyncCounter.Set(0);
156 SleepMilliSec(0);
157 }
158 }
159
160
161 #ifdef USE_NATIVE_THREADS
162
NativeWrapper(TWrapperArg arg)163 TWrapperRes NativeWrapper(TWrapperArg arg)
164 {
165 CTestThread* thread_obj = static_cast<CTestThread*>(arg);
166 thread_obj->Main();
167 return 0;
168 }
169
170
// Thread start routine with C linkage, in the form expected by the
// platform's thread creation call; it simply forwards to NativeWrapper().
#if defined(NCBI_POSIX_THREADS)
extern "C" {
    typedef TWrapperRes (*FSystemWrapper)(TWrapperArg);

    static TWrapperRes NativeWrapperCaller(TWrapperArg arg) {
        return NativeWrapper(arg);
    }
}
#elif defined(NCBI_WIN32_THREADS)
extern "C" {
    typedef TWrapperRes (WINAPI *FSystemWrapper)(TWrapperArg);

    static TWrapperRes WINAPI NativeWrapperCaller(TWrapperArg arg) {
        return NativeWrapper(arg);
    }
}
#endif
188
189
// Start this object as a platform native thread; the new thread executes
// NativeWrapperCaller(this) and its handle is stored in m_Handle.
void CTestThread::RunNative(void)
{
    // Run as the platform native thread rather than CThread
    // Not all functionality will work in this mode. E.g. TLS
    // cleanup can not be done automatically.
#if defined(NCBI_WIN32_THREADS)
    // We need this parameter on WinNT - cannot use NULL instead!
    DWORD thread_id;
    // No creation flags are requested (the value is 0)
    DWORD creation_flags = 0;
    m_Handle = CreateThread(NULL, 0, NativeWrapperCaller,
                            this, creation_flags, &thread_id);
    _ASSERT(m_Handle != NULL);
    // duplicate handle to adjust security attributes
    HANDLE oldHandle = m_Handle;
    _ASSERT(DuplicateHandle(GetCurrentProcess(), oldHandle,
                            GetCurrentProcess(), &m_Handle,
                            0, FALSE, DUPLICATE_SAME_ACCESS));
    _ASSERT(CloseHandle(oldHandle));
#elif defined(NCBI_POSIX_THREADS)
    // Create the thread with default attributes
    pthread_attr_t attr;
    _ASSERT(pthread_attr_init(&attr) == 0);
    _ASSERT(pthread_create(&m_Handle, &attr,
                           NativeWrapperCaller, this) == 0);
    _ASSERT(pthread_attr_destroy(&attr) == 0);
#else
    // NOTE(review): neither `flags' nor `Wrapper' is declared in the visible
    // part of this file -- confirm that this fallback branch compiles.
    if (flags & fRunAllowST) {
        Wrapper(this);
    }
    else {
        _ASSERT(0);
    }
#endif
}
224
225
// Wait for the native thread started by RunNative() to finish.
// `*result' is always set to `this'; the value returned by Main()
// is not propagated.
void CTestThread::JoinNative(void** result)
{
    // Join (wait for) and destroy
#if defined(NCBI_WIN32_THREADS)
    _ASSERT(WaitForSingleObject(m_Handle, INFINITE) == WAIT_OBJECT_0);
    DWORD status;
    _ASSERT(GetExitCodeThread(m_Handle, &status)
            && status != DWORD(STILL_ACTIVE));
    _ASSERT(CloseHandle(m_Handle));
    m_Handle = NULL;
#elif defined(NCBI_POSIX_THREADS)
    _ASSERT(pthread_join(m_Handle, 0) == 0);
#endif
    *result = this;
}
241
242 #endif // USE_NATIVE_THREADS
243
244
// All test threads, indexed by thread index (CTestThread::m_Idx).
CRef<CTestThread> thr[k_NumThreadsMax];
246
StartCascadingThreads(void)247 void CTestThread::StartCascadingThreads(void)
248 {
249 int spawn_max;
250 int first_idx;
251 {{
252 CFastMutexGuard spawn_guard(s_GlobalLock);
253 spawn_max = s_NumThreads - s_NextIndex;
254 if (spawn_max > s_SpawnBy) {
255 spawn_max = s_SpawnBy;
256 }
257 first_idx = s_NextIndex;
258 s_NextIndex += s_SpawnBy;
259 }}
260 // Spawn more threads
261 for (int i = first_idx; i < first_idx + spawn_max; i++) {
262 thr[i] = new CTestThread(i);
263 // Allow threads to run even in single thread environment
264 #ifdef USE_NATIVE_THREADS
265 thr[i]->RunNative();
266 #else
267 thr[i]->Run(CThread::fRunAllowST);
268 #endif
269 }
270 }
271
Main(void)272 void* CTestThread::Main(void)
273 {
274 StartCascadingThreads();
275 // Run the test
276 if ( s_Application != 0 && s_Application->Thread_Run(m_Idx) ) {
277 return this;
278 }
279
280 return 0;
281 }
282
283 /////////////////////////////////////////////////////////////////////////////
284 // Thread group
285
class CThreadGroup;
// A test thread that belongs to a thread group: it waits for the group to be
// started before running the test, and its SyncPoint() is the group's one.
class CInGroupThread : public CTestThread
{
public:
    CInGroupThread(CThreadGroup& group, int id);
    virtual void SyncPoint(void);

protected:
    ~CInGroupThread(void);
    virtual void* Main(void);
    // The group this thread belongs to (held by reference, not owned).
    CThreadGroup& m_Group;
};
298
// A group of test threads that are created together, released together by
// Go(), and may share an intra-group sync point.
class CThreadGroup : public CObject
{
public:
    // Creates and launches `number_of_threads' CInGroupThread threads; each of
    // them blocks in ThreadWait() until Go() is called.
    CThreadGroup(
        unsigned int number_of_threads,
        bool has_sync_point);
    ~CThreadGroup(void);

    // Release the threads of the group.
    void Go(void);
    // Intra-group sync point; does nothing unless the group has one.
    void SyncPoint(void);

    // Called by a group thread: block until the group is released.
    void ThreadWait(void);
    // Called by a group thread after its test has run successfully.
    void ThreadComplete(void);

private:
    unsigned int m_Number_of_threads;
    bool m_Has_sync_point;
    // Used both as the start gate (ThreadWait()/Go()) and by SyncPoint().
    CSemaphore m_Semaphore;
    // Protects m_SyncCounter.
    CFastMutex m_Mutex;
    // Number of threads that have arrived at the current sync point.
    unsigned int m_SyncCounter;
};
320
321
// Thread groups, in the order they are created in CThreadedApp::Run().
static CRef<CThreadGroup> thr_group[k_NumThreadsMax];
// Per-thread storage for the thread index. The index itself is stored in the
// pointer value (see CInGroupThread::Main()); the pointer is never dereferenced.
static CStaticTls<int> s_ThreadIdxTLS;
324
CInGroupThread(CThreadGroup & group,int id)325 CInGroupThread::CInGroupThread(CThreadGroup& group, int id)
326 : CTestThread(id), m_Group(group)
327 {
328 }
329
~CInGroupThread(void)330 CInGroupThread::~CInGroupThread(void)
331 {
332 }
333
SyncPoint(void)334 void CInGroupThread::SyncPoint(void)
335 {
336 m_Group.SyncPoint();
337 }
338
339
Main(void)340 void* CInGroupThread::Main(void)
341 {
342 m_Group.ThreadWait();
343 s_ThreadIdxTLS.SetValue(reinterpret_cast<int*>((intptr_t)m_Idx));
344 // Run the test
345 if ( s_Application != 0 && s_Application->Thread_Run(m_Idx) ) {
346 m_Group.ThreadComplete();
347 return this;
348 }
349 return 0;
350 }
351
CThreadGroup(unsigned int number_of_threads,bool has_sync_point)352 CThreadGroup::CThreadGroup(
353 unsigned int number_of_threads,
354 bool has_sync_point)
355 : m_Number_of_threads(number_of_threads), m_Has_sync_point(has_sync_point),
356 m_Semaphore(0,number_of_threads), m_SyncCounter(0)
357 {
358 for (unsigned int t = 0; t < m_Number_of_threads; ++t) {
359 thr[s_NextIndex] = new CInGroupThread(*this, s_NextIndex);
360 #ifdef USE_NATIVE_THREADS
361 thr[s_NextIndex]->RunNative();
362 #else
363 thr[s_NextIndex]->Run();
364 #endif
365 ++s_NextIndex;
366 }
367 }
368
~CThreadGroup(void)369 CThreadGroup::~CThreadGroup(void)
370 {
371 }
372
373 inline
Go(void)374 void CThreadGroup::Go(void)
375 {
376 s_NumberOfThreads.Add(m_Number_of_threads);
377 m_Semaphore.Post(m_Number_of_threads);
378 }
379
SyncPoint(void)380 void CThreadGroup::SyncPoint(void)
381 {
382 if (m_Has_sync_point) {
383 bool reached = false;
384 m_Mutex.Lock();
385 ++m_SyncCounter;
386 if (m_SyncCounter == m_Number_of_threads) {
387 m_SyncCounter = 0;
388 reached = true;
389 }
390 m_Mutex.Unlock();
391 if (reached) {
392 if (m_Number_of_threads > 1) {
393 m_Semaphore.Post(m_Number_of_threads-1);
394 SleepMilliSec(0);
395 }
396 } else {
397 m_Semaphore.Wait();
398 }
399 }
400 }
401
402
403 inline
ThreadWait(void)404 void CThreadGroup::ThreadWait(void)
405 {
406 s_NumberOfThreads.Add(-1);
407 assert(s_NumberOfThreads.Get() >= 0);
408 m_Semaphore.Wait();
409 }
410
411 inline
ThreadComplete(void)412 void CThreadGroup::ThreadComplete(void)
413 {
414 if (m_Has_sync_point) {
415 m_Semaphore.Post();
416 }
417 }
418
419
420 /////////////////////////////////////////////////////////////////////////////
421 // Test application
422
423
CThreadedApp(void)424 CThreadedApp::CThreadedApp(void)
425 {
426 m_Min = m_Max = 0;
427 m_NextGroup = 0;
428 m_LogMsgCount = 0;
429 s_Application = this;
430 CThread::InitializeMainThreadId();
431 }
432
433
~CThreadedApp(void)434 CThreadedApp::~CThreadedApp(void)
435 {
436 s_Application = 0;
437 }
438
439
Init(void)440 void CThreadedApp::Init(void)
441 {
442 // Prepare command line descriptions
443 unique_ptr<CArgDescriptions> arg_desc(new CArgDescriptions);
444
445 // s_NumThreads
446 arg_desc->AddDefaultKey
447 ("threads", "NumThreads",
448 "Total number of threads to create and run",
449 CArgDescriptions::eInteger, NStr::IntToString(s_NumThreads));
450 arg_desc->SetConstraint
451 ("threads", new CArgAllow_Integers(k_NumThreadsMin, k_NumThreadsMax));
452
453 // s_NumThreads (emulation in ST)
454 arg_desc->AddDefaultKey
455 ("repeats", "NumRepeats",
456 "In non-MT mode only(!) -- how many times to repeat the test. "
457 "If passed 0, then the value of argument `-threads' will be used.",
458 CArgDescriptions::eInteger, "0");
459 arg_desc->SetConstraint
460 ("repeats", new CArgAllow_Integers(0, k_NumThreadsMax));
461
462 // s_SpawnBy
463 arg_desc->AddDefaultKey
464 ("spawnby", "SpawnBy",
465 "Threads spawning factor",
466 CArgDescriptions::eInteger, NStr::IntToString(s_SpawnBy));
467 arg_desc->SetConstraint
468 ("spawnby", new CArgAllow_Integers(k_SpawnByMin, k_SpawnByMax));
469
470 arg_desc->AddOptionalKey("seed", "Randomization",
471 "Randomization seed value",
472 CArgDescriptions::eInteger);
473
474 arg_desc->SetUsageContext(GetArguments().GetProgramBasename(),
475 "MT-environment test");
476
477 // Let test application add its own arguments
478 TestApp_Args(*arg_desc);
479
480 SetupArgDescriptions(arg_desc.release());
481 }
482
483
Run(void)484 int CThreadedApp::Run(void)
485 {
486 // Process command line
487 const CArgs& args = GetArgs();
488
489 #if !defined(NCBI_THREADS)
490 s_NumThreads = args["repeats"].AsInteger();
491 if ( !s_NumThreads )
492 #endif
493 s_NumThreads = args["threads"].AsInteger();
494
495 #if !defined(NCBI_THREADS)
496 // Set reasonable repeats if not set through the argument
497 if (!args["repeats"].AsInteger()) {
498 unsigned int repeats = s_NumThreads / 6;
499 if (repeats < 4)
500 repeats = 4;
501 if (repeats < s_NumThreads)
502 s_NumThreads = repeats;
503 }
504 #endif
505
506 s_SpawnBy = args["spawnby"].AsInteger();
507
508 assert(TestApp_Init());
509
510 unsigned int seed = GetArgs()["seed"]
511 ? static_cast<unsigned int>(GetArgs()["seed"].AsInteger())
512 : (static_cast<unsigned int>(CCurrentProcess::GetPid()) ^
513 static_cast<unsigned int>(time(NULL)) % 1000000);
514 TESTAPP_LOG_POST("Randomization seed value: " << seed);
515 srand(seed);
516
517 unsigned int threshold = NCBI_PARAM_TYPE(TEST_MT, Cascading)::GetDefault();
518 if (threshold > 100) {
519 ERR_FATAL("Cascading threshold must be less than 100");
520 }
521 bool cascading = (static_cast<unsigned int>(rand() % 100)) < threshold;
522 #if !defined(NCBI_THREADS)
523 cascading = true;
524 #endif
525 if ( !cascading ) {
526 x_InitializeThreadGroups();
527 x_PrintThreadGroups();
528 }
529 cascading = cascading || (m_ThreadGroups.size() == 0);
530
531 #if defined(NCBI_THREADS)
532 TESTAPP_LOG_POST("Running " << s_NumThreads << " threads");
533 #else
534 TESTAPP_LOG_POST("Simulating " << s_NumThreads << " threads in ST mode");
535 #endif
536
537 if (cascading) {
538 CTestThread::StartCascadingThreads();
539 } else {
540 unsigned int start_now = x_InitializeDelayedStart();
541
542 for (unsigned int g = 0; g < m_ThreadGroups.size(); ++g) {
543 thr_group[g] = new CThreadGroup
544 (m_ThreadGroups[g].number_of_threads,
545 m_ThreadGroups[g].has_sync_point);
546 }
547 x_StartThreadGroup(start_now);
548 }
549
550 // Wait for all threads
551 if ( cascading ) {
552 for (unsigned int i = 0; i < s_NumThreads; i++) {
553 void* ok;
554 // make sure all threads have started
555 assert(thr[i].NotEmpty());
556 #ifdef USE_NATIVE_THREADS
557 if (thr[i]) {
558 thr[i]->JoinNative(&ok);
559 assert(ok);
560 }
561 #else
562 thr[i]->Join(&ok);
563 assert(ok);
564 #endif
565 }
566 } else {
567 // join only those that started
568 unsigned int i = 0;
569 for (unsigned int g = 0; g < m_NextGroup; ++g) {
570 for (unsigned int t = 0;
571 t < m_ThreadGroups[g].number_of_threads; ++t, ++i) {
572 void* ok;
573 thr[i]->Join(&ok);
574 assert(ok);
575 }
576 }
577 assert(m_Reached.size() >= m_Min);
578 }
579
580 assert(TestApp_Exit());
581
582 // Destroy all threads
583 for (unsigned int i=0; i<s_NumThreads; i++) {
584 thr[i].Reset();
585 }
586 // Destroy all groups
587 for (unsigned int i=0; i<m_ThreadGroups.size(); i++) {
588 thr_group[i].Reset();
589 }
590
591 return 0;
592 }
593
594
x_InitializeThreadGroups(void)595 void CThreadedApp::x_InitializeThreadGroups(void)
596 {
597 unsigned int count = NStr::StringToUInt
598 (NCBI_PARAM_TYPE(TEST_MT, GroupsCount)::GetDefault());
599 if (count == 0) {
600 return;
601 }
602
603 if(count > s_NumThreads) {
604 ERR_FATAL("Thread groups with no threads are not allowed");
605 }
606
607 unsigned int threshold =
608 NCBI_PARAM_TYPE(TEST_MT, IntragroupSyncPoint)::GetDefault();
609 if (threshold > 100) {
610 ERR_FATAL("IntragroupSyncPoint threshold must be less than 100");
611 }
612
613 for (unsigned int g = 0; g < count; ++g) {
614 SThreadGroup group;
615 // randomize intra-group sync points
616 group.has_sync_point = ((unsigned int)(rand() % 100)) < threshold;
617 group.number_of_threads = 1;
618 m_ThreadGroups.push_back(group);
619 }
620
621 if (s_NumThreads > count) {
622 unsigned int threads_left = s_NumThreads - count;
623 for (unsigned int t = 0; t < threads_left; ++t) {
624 // randomize # of threads
625 m_ThreadGroups[ rand() % count ].number_of_threads += 1;
626 }
627 }
628 }
629
630
x_PrintThreadGroups(void)631 void CThreadedApp::x_PrintThreadGroups( void)
632 {
633 size_t count = m_ThreadGroups.size();
634 if (count != 0) {
635 TESTAPP_LOG_POST("Thread groups: " << count);
636 TESTAPP_LOG_POST("Number of delayed thread groups: from "
637 << m_Min << " to " << m_Max);
638 TESTAPP_LOG_POST("------------------------");
639 TESTAPP_LOG_POST("group threads sync_point");
640 for (unsigned int g = 0; g < count; ++g) {
641 CNcbiOstrstream os;
642 os.width(6);
643 os << left << g;
644 os.width(8);
645 os << left << m_ThreadGroups[g].number_of_threads;
646 if (m_ThreadGroups[g].has_sync_point) {
647 os << "yes";
648 } else {
649 os << "no ";
650 }
651 TESTAPP_LOG_POST(string(CNcbiOstrstreamToString(os)));
652 }
653 TESTAPP_LOG_POST("------------------------");
654 }
655 }
656
657
x_InitializeDelayedStart(void)658 unsigned int CThreadedApp::x_InitializeDelayedStart(void)
659 {
660 const unsigned int count = static_cast<unsigned int>(m_ThreadGroups.size());
661 unsigned int start_now = count;
662 unsigned int g;
663 if (m_Max == 0)
664 return start_now;
665
666 for (g = 0; g < m_Max; ++g) {
667 m_Delayed.push_back(0);
668 }
669
670 for (g = 1; g < count; ++g) {
671 unsigned int dest = rand() % (m_Max+1);
672 if (dest != 0) {
673 m_Delayed[dest - 1] += 1;
674 --start_now;
675 }
676 }
677
678 CNcbiOstrstream os;
679 os << "Delayed thread groups: " << (count - start_now)
680 << ", starting order: " << start_now;
681 for (g = 0; g < m_Max; ++g) {
682 os << '+' << m_Delayed[g];
683 }
684 TESTAPP_LOG_POST(string(CNcbiOstrstreamToString(os)));
685
686 return start_now;
687 }
688
689
x_StartThreadGroup(unsigned int count)690 void CThreadedApp::x_StartThreadGroup(unsigned int count)
691 {
692 CFastMutexGuard LOCK(m_AppMutex);
693 while (count--) {
694 thr_group[m_NextGroup++]->Go();
695 }
696 }
697
698
699 /////////////////////////////////////////////////////////////////////////////
700
Thread_Init(int)701 bool CThreadedApp::Thread_Init(int /*idx*/)
702 {
703 return true;
704 }
705
706
Thread_Run(int)707 bool CThreadedApp::Thread_Run(int /*idx*/)
708 {
709 return true;
710 }
711
712
Thread_Exit(int)713 bool CThreadedApp::Thread_Exit(int /*idx*/)
714 {
715 return true;
716 }
717
718
Thread_Destroy(int)719 bool CThreadedApp::Thread_Destroy(int /*idx*/)
720 {
721 return true;
722 }
723
TestApp_Args(CArgDescriptions &)724 bool CThreadedApp::TestApp_Args(CArgDescriptions& /*args*/)
725 {
726 return true;
727 }
728
TestApp_Init(void)729 bool CThreadedApp::TestApp_Init(void)
730 {
731 return true;
732 }
733
734
TestApp_IntraGroupSyncPoint(void)735 void CThreadedApp::TestApp_IntraGroupSyncPoint(void)
736 {
737 int idx = (int)(intptr_t(s_ThreadIdxTLS.GetValue()));
738 thr[idx]->SyncPoint();
739 }
740
741
TestApp_GlobalSyncPoint(void)742 void CThreadedApp::TestApp_GlobalSyncPoint(void)
743 {
744 {{
745 CFastMutexGuard LOCK(m_AppMutex);
746 if (!m_Delayed.empty()) {
747 TESTAPP_LOG_POST("There were delayed threads, running them now, "
748 "because TestApp_GlobalSyncPoint() was called");
749 for (size_t i = m_Reached.size(); i < m_Delayed.size(); i++) {
750 m_Reached.insert(NStr::SizetToString(i));
751 x_StartThreadGroup(m_Delayed[i]);
752 }
753 }
754 }}
755 int idx = static_cast<int>(intptr_t(s_ThreadIdxTLS.GetValue()));
756 thr[idx]->GlobalSyncPoint();
757 }
758
759
SetNumberOfDelayedStartSyncPoints(unsigned int num_min,unsigned int num_max)760 void CThreadedApp::SetNumberOfDelayedStartSyncPoints(
761 unsigned int num_min, unsigned int num_max)
762 {
763 m_Min = num_min;
764 m_Max = num_max;
765 }
766
767
TestApp_DelayedStartSyncPoint(const string & name)768 void CThreadedApp::TestApp_DelayedStartSyncPoint(const string& name)
769 {
770 CFastMutexGuard LOCK(m_AppMutex);
771 if (!m_Delayed.empty() && m_Reached.find(name) == m_Reached.end()) {
772 m_Reached.insert(name);
773 if (m_Reached.size() <= m_Delayed.size()) {
774 x_StartThreadGroup(m_Delayed[m_Reached.size() - 1]);
775 }
776 }
777 }
778
779
TestApp_Exit(void)780 bool CThreadedApp::TestApp_Exit(void)
781 {
782 return true;
783 }
784
785
786 END_NCBI_SCOPE
787