1 /* $Id: test_mt.cpp 604988 2020-04-06 14:05:26Z gouriano $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author:  Aleksey Grichenko
27  *
28  * File Description:
29  *   Wrapper for testing modules in MT environment
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 #include <corelib/test_mt.hpp>
35 #include <corelib/ncbimtx.hpp>
36 #include <corelib/ncbi_system.hpp>
37 #include <corelib/ncbi_param.hpp>
38 #include <math.h>
39 #include <common/test_assert.h>  /* This header must go last */
40 
41 
42 BEGIN_NCBI_SCOPE
43 
44 // Uncomment the definition to use platform native threads rather
45 // than CThread.
46 //#define USE_NATIVE_THREADS
47 
48 DEFINE_STATIC_FAST_MUTEX(s_GlobalLock);
49 static CThreadedApp* s_Application;
50 
51 // Default values
52 unsigned int  s_NumThreads = 34;
53 int           s_SpawnBy    = 6;
54 
55 // Next test thread index
56 static volatile unsigned int s_NextIndex = 0;
57 
58 #define TESTAPP_LOG_POST(x)  do { ++m_LogMsgCount; LOG_POST(x); } while (0)
59 
60 /////////////////////////////////////////////////////////////////////////////
61 // Randomization parameters
62 
63 // if (rand() % 100 < threshold) then use cascading threads
64 NCBI_PARAM_DECL(unsigned int, TEST_MT, Cascading);
65 NCBI_PARAM_DEF(unsigned int, TEST_MT, Cascading, 25);
66 
67 // calculate # of thread groups
s_GroupsCount(void)68 static string s_GroupsCount(void)
69 {
70     return NStr::UIntToString( (unsigned int)sqrt((double)s_NumThreads));
71 }
72 NCBI_PARAM_DECL(string, TEST_MT, GroupsCount);
73 NCBI_PARAM_DEF_WITH_INIT(string, TEST_MT, GroupsCount, "", s_GroupsCount);
74 
75 // group.has_sync_point = (rand() % 100) < threshold;
76 NCBI_PARAM_DECL(unsigned int, TEST_MT, IntragroupSyncPoint);
77 NCBI_PARAM_DEF(unsigned int, TEST_MT, IntragroupSyncPoint, 75);
78 
79 /////////////////////////////////////////////////////////////////////////////
80 // Test thread
81 //
82 
83 class CTestThread : public CThread
84 {
85 public:
86     static void StartCascadingThreads(void);
87 
88     CTestThread(int id);
SyncPoint(void)89     virtual void SyncPoint(void) {};
90     virtual void GlobalSyncPoint(void);
91 
92 protected:
93     ~CTestThread(void);
94     virtual void* Main(void);
95     virtual void  OnExit(void);
96 
97     int m_Idx;
98 
99 #ifdef USE_NATIVE_THREADS
100     TThreadHandle m_Handle;
101 
102 public:
103     void RunNative(void);
104     void JoinNative(void** result);
105 
106     friend TWrapperRes NativeWrapper(TWrapperArg arg);
107 #endif
108 };
109 
110 
// Shared state of CTestThread::GlobalSyncPoint()
static CSemaphore           s_Semaphore(0, INT_MAX); /* Early arrivals wait on it       */
static CAtomicCounter       s_SyncCounter;           /* Threads arrived at the point    */
static CAtomicCounter       s_NumberOfThreads;       /* Threads expected at the point   */
114 
115 
CTestThread(int idx)116 CTestThread::CTestThread(int idx)
117     : m_Idx(idx)
118 {
119     /* We want to know total number of threads, and the easiest way is to make
120        them register themselves */
121     s_NumberOfThreads.Add(1);
122     if ( s_Application != 0 )
123         assert(s_Application->Thread_Init(m_Idx));
124 }
125 
126 
~CTestThread(void)127 CTestThread::~CTestThread(void)
128 {
129     s_NumberOfThreads.Add(-1);
130     assert(s_NumberOfThreads.Get() >= 0);
131     if ( s_Application != 0 )
132         assert(s_Application->Thread_Destroy(m_Idx));
133 }
134 
135 
OnExit(void)136 void CTestThread::OnExit(void)
137 {
138     if ( s_Application != 0 )
139         assert(s_Application->Thread_Exit(m_Idx));
140 }
141 
142 
GlobalSyncPoint(void)143 void CTestThread::GlobalSyncPoint(void)
144 {
145     /* Semaphore is supposed to have zero value when threads come here,
146        so Wait() causes stop */
147     if (s_SyncCounter.Add(1) != s_NumberOfThreads.Get()) {
148         s_Semaphore.Wait();
149         return;
150     }
151     /* If we are the last thread to come to sync point, we yield
152        so that threads that were waiting for us go first      */
153     if (s_NumberOfThreads.Get() > 1) {
154         s_Semaphore.Post((unsigned int)s_NumberOfThreads.Get() - 1);
155         s_SyncCounter.Set(0);
156         SleepMilliSec(0);
157     }
158 }
159 
160 
161 #ifdef USE_NATIVE_THREADS
162 
NativeWrapper(TWrapperArg arg)163 TWrapperRes NativeWrapper(TWrapperArg arg)
164 {
165     CTestThread* thread_obj = static_cast<CTestThread*>(arg);
166     thread_obj->Main();
167     return 0;
168 }
169 
170 
// Trampolines with C linkage (and, on Windows, the WINAPI calling
// convention) that are handed to the system thread-creation call and
// simply forward to NativeWrapper().
#if defined(NCBI_POSIX_THREADS)
extern "C" {
    typedef TWrapperRes (*FSystemWrapper)(TWrapperArg);

    static TWrapperRes NativeWrapperCaller(TWrapperArg arg)
    {
        return NativeWrapper(arg);
    }
}
#elif defined(NCBI_WIN32_THREADS)
extern "C" {
    typedef TWrapperRes (WINAPI *FSystemWrapper)(TWrapperArg);

    static TWrapperRes WINAPI NativeWrapperCaller(TWrapperArg arg)
    {
        return NativeWrapper(arg);
    }
}
#endif
188 
189 
RunNative(void)190 void CTestThread::RunNative(void)
191 {
192     // Run as the platform native thread rather than CThread
193     // Not all functionality will work in this mode. E.g. TLS
194     // cleanup can not be done automatically.
195 #if defined(NCBI_WIN32_THREADS)
196     // We need this parameter on WinNT - cannot use NULL instead!
197     DWORD thread_id;
198     // Suspend thread to adjust its priority
199     DWORD creation_flags = 0;
200     m_Handle = CreateThread(NULL, 0, NativeWrapperCaller,
201                             this, creation_flags, &thread_id);
202     _ASSERT(m_Handle != NULL);
203     // duplicate handle to adjust security attributes
204     HANDLE oldHandle = m_Handle;
205     _ASSERT(DuplicateHandle(GetCurrentProcess(), oldHandle,
206                             GetCurrentProcess(), &m_Handle,
207                             0, FALSE, DUPLICATE_SAME_ACCESS));
208     _ASSERT(CloseHandle(oldHandle));
209 #elif defined(NCBI_POSIX_THREADS)
210     pthread_attr_t attr;
211     _ASSERT(pthread_attr_init(&attr) == 0);
212     _ASSERT(pthread_create(&m_Handle, &attr,
213                            NativeWrapperCaller, this) == 0);
214     _ASSERT(pthread_attr_destroy(&attr) == 0);
215 #else
216     if (flags & fRunAllowST) {
217         Wrapper(this);
218     }
219     else {
220         _ASSERT(0);
221     }
222 #endif
223 }
224 
225 
JoinNative(void ** result)226 void CTestThread::JoinNative(void** result)
227 {
228     // Join (wait for) and destroy
229 #if defined(NCBI_WIN32_THREADS)
230     _ASSERT(WaitForSingleObject(m_Handle, INFINITE) == WAIT_OBJECT_0);
231     DWORD status;
232     _ASSERT(GetExitCodeThread(m_Handle, &status)
233             &&  status != DWORD(STILL_ACTIVE));
234     _ASSERT(CloseHandle(m_Handle));
235     m_Handle = NULL;
236 #elif defined(NCBI_POSIX_THREADS)
237     _ASSERT(pthread_join(m_Handle, 0) == 0);
238 #endif
239     *result = this;
240 }
241 
242 #endif // USE_NATIVE_THREADS
243 
244 
245 CRef<CTestThread> thr[k_NumThreadsMax];
246 
StartCascadingThreads(void)247 void CTestThread::StartCascadingThreads(void)
248 {
249     int spawn_max;
250     int first_idx;
251     {{
252         CFastMutexGuard spawn_guard(s_GlobalLock);
253         spawn_max = s_NumThreads - s_NextIndex;
254         if (spawn_max > s_SpawnBy) {
255             spawn_max = s_SpawnBy;
256         }
257         first_idx = s_NextIndex;
258         s_NextIndex += s_SpawnBy;
259     }}
260     // Spawn more threads
261     for (int i = first_idx;  i < first_idx + spawn_max;  i++) {
262         thr[i] = new CTestThread(i);
263         // Allow threads to run even in single thread environment
264 #ifdef USE_NATIVE_THREADS
265         thr[i]->RunNative();
266 #else
267         thr[i]->Run(CThread::fRunAllowST);
268 #endif
269     }
270 }
271 
Main(void)272 void* CTestThread::Main(void)
273 {
274     StartCascadingThreads();
275     // Run the test
276     if ( s_Application != 0  &&  s_Application->Thread_Run(m_Idx) ) {
277         return this;
278     }
279 
280     return 0;
281 }
282 
283 /////////////////////////////////////////////////////////////////////////////
284 //  Thread group
285 
286 class CThreadGroup;
287 class CInGroupThread : public CTestThread
288 {
289 public:
290     CInGroupThread(CThreadGroup& group, int id);
291     virtual void SyncPoint(void);
292 
293 protected:
294     ~CInGroupThread(void);
295     virtual void* Main(void);
296     CThreadGroup& m_Group;
297 };
298 
299 class CThreadGroup : public CObject
300 {
301 public:
302     CThreadGroup(
303         unsigned int number_of_threads,
304         bool has_sync_point);
305     ~CThreadGroup(void);
306 
307     void Go(void);
308     void SyncPoint(void);
309 
310     void ThreadWait(void);
311     void ThreadComplete(void);
312 
313 private:
314     unsigned int m_Number_of_threads;
315     bool m_Has_sync_point;
316     CSemaphore m_Semaphore;
317     CFastMutex m_Mutex;
318     unsigned int m_SyncCounter;
319 };
320 
321 
// All thread groups, in the order in which they are created and started
static CRef<CThreadGroup>   thr_group[k_NumThreadsMax];
// Per-thread slot holding the thread index, stored as a pointer-sized
// integer by CInGroupThread::Main()
static CStaticTls<int>      s_ThreadIdxTLS;
324 
CInGroupThread(CThreadGroup & group,int id)325 CInGroupThread::CInGroupThread(CThreadGroup& group, int id)
326     : CTestThread(id), m_Group(group)
327 {
328 }
329 
~CInGroupThread(void)330 CInGroupThread::~CInGroupThread(void)
331 {
332 }
333 
SyncPoint(void)334 void CInGroupThread::SyncPoint(void)
335 {
336     m_Group.SyncPoint();
337 }
338 
339 
Main(void)340 void* CInGroupThread::Main(void)
341 {
342     m_Group.ThreadWait();
343     s_ThreadIdxTLS.SetValue(reinterpret_cast<int*>((intptr_t)m_Idx));
344     // Run the test
345     if ( s_Application != 0  &&  s_Application->Thread_Run(m_Idx) ) {
346         m_Group.ThreadComplete();
347         return this;
348     }
349     return 0;
350 }
351 
CThreadGroup(unsigned int number_of_threads,bool has_sync_point)352 CThreadGroup::CThreadGroup(
353         unsigned int number_of_threads,
354         bool has_sync_point)
355     : m_Number_of_threads(number_of_threads), m_Has_sync_point(has_sync_point),
356       m_Semaphore(0,number_of_threads), m_SyncCounter(0)
357 {
358     for (unsigned int t = 0;  t < m_Number_of_threads;  ++t) {
359         thr[s_NextIndex] = new CInGroupThread(*this, s_NextIndex);
360 #ifdef USE_NATIVE_THREADS
361         thr[s_NextIndex]->RunNative();
362 #else
363         thr[s_NextIndex]->Run();
364 #endif
365         ++s_NextIndex;
366     }
367 }
368 
~CThreadGroup(void)369 CThreadGroup::~CThreadGroup(void)
370 {
371 }
372 
373 inline
Go(void)374 void CThreadGroup::Go(void)
375 {
376     s_NumberOfThreads.Add(m_Number_of_threads);
377     m_Semaphore.Post(m_Number_of_threads);
378 }
379 
SyncPoint(void)380 void CThreadGroup::SyncPoint(void)
381 {
382     if (m_Has_sync_point) {
383         bool reached = false;
384         m_Mutex.Lock();
385         ++m_SyncCounter;
386         if (m_SyncCounter == m_Number_of_threads) {
387             m_SyncCounter = 0;
388             reached = true;
389         }
390         m_Mutex.Unlock();
391         if (reached) {
392             if (m_Number_of_threads > 1) {
393                 m_Semaphore.Post(m_Number_of_threads-1);
394                 SleepMilliSec(0);
395             }
396         } else {
397             m_Semaphore.Wait();
398         }
399     }
400 }
401 
402 
403 inline
ThreadWait(void)404 void CThreadGroup::ThreadWait(void)
405 {
406     s_NumberOfThreads.Add(-1);
407     assert(s_NumberOfThreads.Get() >= 0);
408     m_Semaphore.Wait();
409 }
410 
411 inline
ThreadComplete(void)412 void CThreadGroup::ThreadComplete(void)
413 {
414     if (m_Has_sync_point) {
415         m_Semaphore.Post();
416     }
417 }
418 
419 
420 /////////////////////////////////////////////////////////////////////////////
421 //  Test application
422 
423 
CThreadedApp(void)424 CThreadedApp::CThreadedApp(void)
425 {
426     m_Min = m_Max = 0;
427     m_NextGroup   = 0;
428     m_LogMsgCount = 0;
429     s_Application = this;
430     CThread::InitializeMainThreadId();
431 }
432 
433 
~CThreadedApp(void)434 CThreadedApp::~CThreadedApp(void)
435 {
436     s_Application = 0;
437 }
438 
439 
Init(void)440 void CThreadedApp::Init(void)
441 {
442     // Prepare command line descriptions
443     unique_ptr<CArgDescriptions> arg_desc(new CArgDescriptions);
444 
445     // s_NumThreads
446     arg_desc->AddDefaultKey
447         ("threads", "NumThreads",
448          "Total number of threads to create and run",
449          CArgDescriptions::eInteger, NStr::IntToString(s_NumThreads));
450     arg_desc->SetConstraint
451         ("threads", new CArgAllow_Integers(k_NumThreadsMin, k_NumThreadsMax));
452 
453     // s_NumThreads (emulation in ST)
454     arg_desc->AddDefaultKey
455         ("repeats", "NumRepeats",
456          "In non-MT mode only(!) -- how many times to repeat the test. "
457          "If passed 0, then the value of argument `-threads' will be used.",
458          CArgDescriptions::eInteger, "0");
459     arg_desc->SetConstraint
460         ("repeats", new CArgAllow_Integers(0, k_NumThreadsMax));
461 
462     // s_SpawnBy
463     arg_desc->AddDefaultKey
464         ("spawnby", "SpawnBy",
465          "Threads spawning factor",
466          CArgDescriptions::eInteger, NStr::IntToString(s_SpawnBy));
467     arg_desc->SetConstraint
468         ("spawnby", new CArgAllow_Integers(k_SpawnByMin, k_SpawnByMax));
469 
470     arg_desc->AddOptionalKey("seed", "Randomization",
471                              "Randomization seed value",
472                              CArgDescriptions::eInteger);
473 
474     arg_desc->SetUsageContext(GetArguments().GetProgramBasename(),
475                               "MT-environment test");
476 
477     // Let test application add its own arguments
478     TestApp_Args(*arg_desc);
479 
480     SetupArgDescriptions(arg_desc.release());
481 }
482 
483 
Run(void)484 int CThreadedApp::Run(void)
485 {
486     // Process command line
487     const CArgs& args = GetArgs();
488 
489 #if !defined(NCBI_THREADS)
490     s_NumThreads = args["repeats"].AsInteger();
491     if ( !s_NumThreads )
492 #endif
493         s_NumThreads = args["threads"].AsInteger();
494 
495 #if !defined(NCBI_THREADS)
496     // Set reasonable repeats if not set through the argument
497     if (!args["repeats"].AsInteger()) {
498         unsigned int repeats = s_NumThreads / 6;
499         if (repeats < 4)
500             repeats = 4;
501         if (repeats < s_NumThreads)
502             s_NumThreads = repeats;
503     }
504 #endif
505 
506     s_SpawnBy = args["spawnby"].AsInteger();
507 
508     assert(TestApp_Init());
509 
510     unsigned int seed = GetArgs()["seed"]
511         ? static_cast<unsigned int>(GetArgs()["seed"].AsInteger())
512         : (static_cast<unsigned int>(CCurrentProcess::GetPid()) ^
513            static_cast<unsigned int>(time(NULL)) % 1000000);
514     TESTAPP_LOG_POST("Randomization seed value: " << seed);
515     srand(seed);
516 
517     unsigned int threshold = NCBI_PARAM_TYPE(TEST_MT, Cascading)::GetDefault();
518     if (threshold > 100) {
519         ERR_FATAL("Cascading threshold must be less than 100");
520     }
521     bool cascading = (static_cast<unsigned int>(rand() % 100)) < threshold;
522 #if !defined(NCBI_THREADS)
523     cascading = true;
524 #endif
525     if ( !cascading ) {
526         x_InitializeThreadGroups();
527         x_PrintThreadGroups();
528     }
529     cascading = cascading || (m_ThreadGroups.size() == 0);
530 
531 #if defined(NCBI_THREADS)
532     TESTAPP_LOG_POST("Running " << s_NumThreads << " threads");
533 #else
534     TESTAPP_LOG_POST("Simulating " << s_NumThreads << " threads in ST mode");
535 #endif
536 
537     if (cascading) {
538         CTestThread::StartCascadingThreads();
539     } else {
540         unsigned int start_now = x_InitializeDelayedStart();
541 
542         for (unsigned int g = 0;  g < m_ThreadGroups.size();  ++g) {
543             thr_group[g] = new CThreadGroup
544                 (m_ThreadGroups[g].number_of_threads,
545                  m_ThreadGroups[g].has_sync_point);
546         }
547         x_StartThreadGroup(start_now);
548     }
549 
550     // Wait for all threads
551     if ( cascading ) {
552         for (unsigned int i = 0;  i < s_NumThreads;  i++) {
553             void* ok;
554             // make sure all threads have started
555             assert(thr[i].NotEmpty());
556 #ifdef USE_NATIVE_THREADS
557             if (thr[i]) {
558                 thr[i]->JoinNative(&ok);
559                 assert(ok);
560             }
561 #else
562             thr[i]->Join(&ok);
563             assert(ok);
564 #endif
565         }
566     } else {
567         // join only those that started
568         unsigned int i = 0;
569         for (unsigned int g = 0;  g < m_NextGroup;  ++g) {
570             for (unsigned int t = 0;
571                  t < m_ThreadGroups[g].number_of_threads; ++t, ++i) {
572                 void* ok;
573                 thr[i]->Join(&ok);
574                 assert(ok);
575             }
576         }
577         assert(m_Reached.size() >= m_Min);
578     }
579 
580     assert(TestApp_Exit());
581 
582     // Destroy all threads
583     for (unsigned int i=0; i<s_NumThreads; i++) {
584         thr[i].Reset();
585     }
586     // Destroy all groups
587     for (unsigned int i=0; i<m_ThreadGroups.size(); i++) {
588         thr_group[i].Reset();
589     }
590 
591     return 0;
592 }
593 
594 
x_InitializeThreadGroups(void)595 void CThreadedApp::x_InitializeThreadGroups(void)
596 {
597     unsigned int count = NStr::StringToUInt
598         (NCBI_PARAM_TYPE(TEST_MT, GroupsCount)::GetDefault());
599     if (count == 0) {
600         return;
601     }
602 
603     if(count > s_NumThreads) {
604         ERR_FATAL("Thread groups with no threads are not allowed");
605     }
606 
607     unsigned int threshold =
608         NCBI_PARAM_TYPE(TEST_MT, IntragroupSyncPoint)::GetDefault();
609     if (threshold > 100) {
610         ERR_FATAL("IntragroupSyncPoint threshold must be less than 100");
611     }
612 
613     for (unsigned int g = 0;  g < count;  ++g) {
614         SThreadGroup group;
615         // randomize intra-group sync points
616         group.has_sync_point = ((unsigned int)(rand() % 100)) < threshold;
617         group.number_of_threads = 1;
618         m_ThreadGroups.push_back(group);
619     }
620 
621     if (s_NumThreads > count) {
622         unsigned int threads_left = s_NumThreads - count;
623         for (unsigned int t = 0;  t < threads_left;  ++t) {
624             // randomize # of threads
625             m_ThreadGroups[ rand() % count ].number_of_threads += 1;
626         }
627     }
628 }
629 
630 
x_PrintThreadGroups(void)631 void CThreadedApp::x_PrintThreadGroups( void)
632 {
633     size_t count = m_ThreadGroups.size();
634     if (count != 0) {
635         TESTAPP_LOG_POST("Thread groups: " << count);
636         TESTAPP_LOG_POST("Number of delayed thread groups: from "
637                          << m_Min << " to " << m_Max);
638         TESTAPP_LOG_POST("------------------------");
639         TESTAPP_LOG_POST("group threads sync_point");
640         for (unsigned int g = 0;  g < count;  ++g) {
641             CNcbiOstrstream os;
642             os.width(6);
643             os << left << g;
644             os.width(8);
645             os << left << m_ThreadGroups[g].number_of_threads;
646             if (m_ThreadGroups[g].has_sync_point) {
647                 os << "yes";
648             } else {
649                 os << "no ";
650             }
651             TESTAPP_LOG_POST(string(CNcbiOstrstreamToString(os)));
652         }
653         TESTAPP_LOG_POST("------------------------");
654     }
655 }
656 
657 
x_InitializeDelayedStart(void)658 unsigned int CThreadedApp::x_InitializeDelayedStart(void)
659 {
660     const unsigned int count = static_cast<unsigned int>(m_ThreadGroups.size());
661     unsigned int start_now = count;
662     unsigned int g;
663     if (m_Max == 0)
664         return start_now;
665 
666     for (g = 0;  g < m_Max;  ++g) {
667         m_Delayed.push_back(0);
668     }
669 
670     for (g = 1;  g < count;  ++g) {
671         unsigned int dest = rand() % (m_Max+1);
672         if (dest != 0) {
673             m_Delayed[dest - 1] += 1;
674             --start_now;
675         }
676     }
677 
678     CNcbiOstrstream os;
679     os << "Delayed thread groups: " << (count - start_now)
680        << ", starting order: " << start_now;
681     for (g = 0; g < m_Max; ++g) {
682         os << '+' << m_Delayed[g];
683     }
684     TESTAPP_LOG_POST(string(CNcbiOstrstreamToString(os)));
685 
686     return start_now;
687 }
688 
689 
x_StartThreadGroup(unsigned int count)690 void CThreadedApp::x_StartThreadGroup(unsigned int count)
691 {
692     CFastMutexGuard LOCK(m_AppMutex);
693     while (count--) {
694         thr_group[m_NextGroup++]->Go();
695     }
696 }
697 
698 
699 /////////////////////////////////////////////////////////////////////////////
700 
Thread_Init(int)701 bool CThreadedApp::Thread_Init(int /*idx*/)
702 {
703     return true;
704 }
705 
706 
Thread_Run(int)707 bool CThreadedApp::Thread_Run(int /*idx*/)
708 {
709     return true;
710 }
711 
712 
Thread_Exit(int)713 bool CThreadedApp::Thread_Exit(int /*idx*/)
714 {
715     return true;
716 }
717 
718 
Thread_Destroy(int)719 bool CThreadedApp::Thread_Destroy(int /*idx*/)
720 {
721     return true;
722 }
723 
TestApp_Args(CArgDescriptions &)724 bool CThreadedApp::TestApp_Args(CArgDescriptions& /*args*/)
725 {
726     return true;
727 }
728 
TestApp_Init(void)729 bool CThreadedApp::TestApp_Init(void)
730 {
731     return true;
732 }
733 
734 
TestApp_IntraGroupSyncPoint(void)735 void CThreadedApp::TestApp_IntraGroupSyncPoint(void)
736 {
737     int idx = (int)(intptr_t(s_ThreadIdxTLS.GetValue()));
738     thr[idx]->SyncPoint();
739 }
740 
741 
TestApp_GlobalSyncPoint(void)742 void CThreadedApp::TestApp_GlobalSyncPoint(void)
743 {
744     {{
745         CFastMutexGuard LOCK(m_AppMutex);
746         if (!m_Delayed.empty()) {
747             TESTAPP_LOG_POST("There were delayed threads, running them now, "
748                 "because TestApp_GlobalSyncPoint() was called");
749             for (size_t i = m_Reached.size(); i < m_Delayed.size(); i++) {
750                 m_Reached.insert(NStr::SizetToString(i));
751                 x_StartThreadGroup(m_Delayed[i]);
752             }
753         }
754     }}
755     int idx = static_cast<int>(intptr_t(s_ThreadIdxTLS.GetValue()));
756     thr[idx]->GlobalSyncPoint();
757 }
758 
759 
SetNumberOfDelayedStartSyncPoints(unsigned int num_min,unsigned int num_max)760 void CThreadedApp::SetNumberOfDelayedStartSyncPoints(
761     unsigned int num_min, unsigned int num_max)
762 {
763     m_Min = num_min;
764     m_Max = num_max;
765 }
766 
767 
TestApp_DelayedStartSyncPoint(const string & name)768 void CThreadedApp::TestApp_DelayedStartSyncPoint(const string& name)
769 {
770     CFastMutexGuard LOCK(m_AppMutex);
771     if (!m_Delayed.empty()  &&  m_Reached.find(name) == m_Reached.end()) {
772         m_Reached.insert(name);
773         if (m_Reached.size() <= m_Delayed.size()) {
774             x_StartThreadGroup(m_Delayed[m_Reached.size() - 1]);
775         }
776     }
777 }
778 
779 
TestApp_Exit(void)780 bool CThreadedApp::TestApp_Exit(void)
781 {
782     return true;
783 }
784 
785 
786 END_NCBI_SCOPE
787