1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 // We want to test waiting for workers feature with non-preview binaries. However,
18 // we want to have some testing of task_scheduler_init without this macro.
19 #if !__TBB_CPF_BUILD
20 #define TBB_PREVIEW_WAITING_FOR_WORKERS 1
21 #endif
22 
23 #include "tbb/task_scheduler_init.h"
24 #include <cstdlib>
25 #include <cstdio>
26 #if TBB_USE_EXCEPTIONS
27 #include <stdexcept>
28 #endif
29 
30 #include "harness_assert.h"
31 #if _MSC_VER
32 #pragma warning (push)
33     // MSVC discovers that ASSERT(false) inside TestBlockingTerminateNS::ExceptionTest2::Body makes the code
34     // in parallel_for after the body call unreachable. So suppress the warning.
35 #pragma warning (disable: 4702)
36 #endif
37 #include "tbb/parallel_for.h"
38 #if _MSC_VER
39 #pragma warning (pop)
40 #endif
41 
42 #include "harness_concurrency_tracker.h"
43 #include "harness_task.h"
44 #include "harness.h"
45 
46 const int DefaultThreads = tbb::task_scheduler_init::default_num_threads();
47 
48 namespace tbb { namespace internal {
49 size_t __TBB_EXPORTED_FUNC get_initial_auto_partitioner_divisor();
50 }}
51 
ArenaConcurrency()52 int ArenaConcurrency() {
53     return int(tbb::internal::get_initial_auto_partitioner_divisor()/4); // TODO: expose through task_arena interface?
54 }
55 
// TBB gives no general guarantee of mandatory parallelism. The exact-concurrency checks
// guarded by this flag rely on whitebox knowledge about when all the threads can be available.
bool test_mandatory_parallelism( true );
58 
59 //! Test that task::initialize and task::terminate work when doing nothing else.
60 /** maxthread is treated as the "maximum" number of worker threads. */
InitializeAndTerminate(int maxthread)61 void InitializeAndTerminate( int maxthread ) {
62     __TBB_TRY {
63         for( int i=0; i<256; ++i ) {
64             int threads = (std::rand() % maxthread) + 1;
65             switch( i&3 ) {
66                 default: {
67                     tbb::task_scheduler_init init( threads );
68                     ASSERT(init.is_active(), NULL);
69                     ASSERT(ArenaConcurrency()==(threads==1?2:threads), NULL);
70                     if (test_mandatory_parallelism)
71                         Harness::ExactConcurrencyLevel::check(threads, Harness::ExactConcurrencyLevel::Serialize);
72                     if(i&0x20) tbb::task::enqueue( (*new( tbb::task::allocate_root() ) TaskGenerator(2,6)) ); // a work deferred to workers
73                     break;
74                 }
75                 case 0: {
76                     tbb::task_scheduler_init init;
77                     ASSERT(init.is_active(), NULL);
78                     ASSERT(ArenaConcurrency()==(DefaultThreads==1?2:init.default_num_threads()), NULL);
79                     if (test_mandatory_parallelism)
80                         Harness::ExactConcurrencyLevel::check(init.default_num_threads(), Harness::ExactConcurrencyLevel::Serialize);
81                     if(i&0x40) tbb::task::enqueue( (*new( tbb::task::allocate_root() ) TaskGenerator(3,5)) ); // a work deferred to workers
82                     break;
83                 }
84                 case 1: {
85                     tbb::task_scheduler_init init( tbb::task_scheduler_init::deferred );
86                     ASSERT(!init.is_active(), "init should not be active; initialization was deferred");
87                     init.initialize( threads );
88                     ASSERT(init.is_active(), NULL);
89                     ASSERT(ArenaConcurrency()==(threads==1?2:threads), NULL);
90                     if (test_mandatory_parallelism)
91                         Harness::ExactConcurrencyLevel::check(threads, Harness::ExactConcurrencyLevel::Serialize);
92                     init.terminate();
93                     ASSERT(!init.is_active(), "init should not be active; it was terminated");
94                     break;
95                 }
96                 case 2: {
97                     tbb::task_scheduler_init init( tbb::task_scheduler_init::automatic );
98                     ASSERT(init.is_active(), NULL);
99                     ASSERT(ArenaConcurrency()==(DefaultThreads==1?2:init.default_num_threads()), NULL);
100                     if (test_mandatory_parallelism)
101                         Harness::ExactConcurrencyLevel::check(init.default_num_threads(), Harness::ExactConcurrencyLevel::Serialize);
102                     break;
103                 }
104             }
105         }
106     } __TBB_CATCH( std::runtime_error& error ) {
107 #if TBB_USE_EXCEPTIONS
108         REPORT("ERROR: %s\n", error.what() );
109 #endif /* TBB_USE_EXCEPTIONS */
110     }
111 }
112 
#if _WIN64
// Makes ::srand reachable as std::srand. Per the original note, 64-bit Windows compilers
// have not caught up with the 1998 ISO C++ standard.
namespace std { using ::srand; }
#endif /* _WIN64 */
118 
119 struct ThreadedInit {
operator ()ThreadedInit120     void operator()( int ) const {
121         InitializeAndTerminate(MaxThread);
122     }
123 };
124 
125 #if _MSC_VER
126 #include "tbb/machine/windows_api.h"
127 #include <tchar.h>
128 #endif /* _MSC_VER */
129 
130 /** The test will fail in particular if task_scheduler_init mistakenly hooks up
131     auto-initialization mechanism. **/
AssertExplicitInitIsNotSupplanted()132 void AssertExplicitInitIsNotSupplanted () {
133     tbb::task_scheduler_init init(1);
134 
135     Harness::ExactConcurrencyLevel::check(1);
136 }
137 
138 struct TestNoWorkerSurplusRun {
operator ()TestNoWorkerSurplusRun139     void operator() (int) const {
140         const unsigned THREADS = tbb::tbb_thread::hardware_concurrency()*2/3;
141         for (int j=0; j<10; j++) {
142             tbb::task_scheduler_init t(THREADS);
143             Harness::ExactConcurrencyLevel::Combinable unique;
144 
145             for (int i=0; i<50; i++)
146                 Harness::ExactConcurrencyLevel::checkLessOrEqual(THREADS, &unique);
147         }
148     }
149 };
150 
TestNoWorkerSurplus()151 void TestNoWorkerSurplus () {
152     // Run the test in a special thread because otherwise the surplus issue
153     // is not observed for some hardware configurations
154     NativeParallelFor( 1, TestNoWorkerSurplusRun() );
155 }
156 
157 #if TBB_PREVIEW_WAITING_FOR_WORKERS
158 #include "tbb/task_group.h"
159 #include "tbb/task_arena.h"
160 
161 namespace TestBlockingTerminateNS {
162     struct EmptyBody {
operator ()TestBlockingTerminateNS::EmptyBody163         void operator()() const {}
operator ()TestBlockingTerminateNS::EmptyBody164         void operator()( int ) const {}
165     };
166 
167     struct TestAutoInitBody {
operator ()TestBlockingTerminateNS::TestAutoInitBody168         void operator()( int ) const {
169             tbb::parallel_for( 0, 100, EmptyBody() );
170         }
171     };
172 
173     static tbb::atomic<int> gSeed;
174     static tbb::atomic<int> gNumSuccesses;
175 
176     class TestMultpleWaitBody {
177         bool myAutoInit;
178     public:
TestMultpleWaitBody(bool autoInit=false)179         TestMultpleWaitBody( bool autoInit = false ) : myAutoInit( autoInit ) {}
operator ()(int) const180         void operator()( int ) const {
181             tbb::task_scheduler_init init( tbb::task_scheduler_init::deferred );
182             if ( !myAutoInit )
183                 init.initialize( tbb::task_scheduler_init::automatic );
184             Harness::FastRandom rnd( ++gSeed );
185             // In case of auto init sub-tests we skip
186             //  - case #4 to avoid recursion
187             //  - case #5 because it is explicit initialization
188             const int numCases = myAutoInit ? 4 : 6;
189             switch ( rnd.get() % numCases ) {
190             case 0: {
191                 tbb::task_arena a;
192                 a.enqueue( EmptyBody() );
193                 break;
194             }
195             case 1: {
196                 tbb::task_group tg;
197                 tg.run( EmptyBody() );
198                 tg.wait();
199                 break;
200             }
201             case 2:
202                 tbb::parallel_for( 0, 100, EmptyBody() );
203                 break;
204             case 3:
205                 /* do nothing */
206                 break;
207             case 4:
208                 // Create and join several threads with auto initialized scheduler.
209                 NativeParallelFor( rnd.get() % 5 + 1, TestMultpleWaitBody( true ) );
210                 break;
211             case 5:
212                 {
213                     tbb::task_scheduler_init init2;
214                     bool res = init2.blocking_terminate( std::nothrow );
215                     ASSERT( !res, NULL );
216                 }
217                 break;
218             }
219             if ( !myAutoInit && init.blocking_terminate( std::nothrow ) )
220                 ++gNumSuccesses;
221         }
222     };
223 
TestMultpleWait()224     void TestMultpleWait() {
225         const int minThreads = 1;
226         const int maxThreads = 16;
227         const int numRepeats = 5;
228         // Initialize seed with different values on different machines.
229         gSeed = tbb::task_scheduler_init::default_num_threads();
230         for ( int repeats = 0; repeats<numRepeats; ++repeats ) {
231             for ( int threads = minThreads; threads<maxThreads; ++threads ) {
232                 gNumSuccesses = 0;
233                 NativeParallelFor( threads, TestMultpleWaitBody() );
234                 ASSERT( gNumSuccesses > 0, "At least one blocking terminate must return 'true'" );
235             }
236         }
237     }
238 
239 #if TBB_USE_EXCEPTIONS
240     template <typename F>
TestException(F & f)241     void TestException( F &f ) {
242         Harness::suppress_unused_warning( f );
243         bool caught = false;
244         try {
245             f();
246             ASSERT( false, NULL );
247         }
248         catch ( const std::runtime_error& ) {
249             caught = true;
250         }
251 #if TBB_USE_CAPTURED_EXCEPTION
252         catch ( const tbb::captured_exception& ) {
253             caught = true;
254         }
255 #endif
256         catch ( ... ) {
257             ASSERT( false, NULL );
258         }
259         ASSERT( caught, NULL );
260     }
261 
262     class ExceptionTest1 {
263         tbb::task_scheduler_init tsi1;
264         int myIndex;
265     public:
ExceptionTest1(int index)266         ExceptionTest1( int index ) : myIndex( index ) {}
267 
operator ()()268         void operator()() {
269             tbb::task_scheduler_init tsi2;
270             (myIndex == 0 ? tsi1 : tsi2).blocking_terminate();
271             ASSERT( false, "Blocking terminate did not throw the exception" );
272         }
273     };
274 
275     struct ExceptionTest2 {
276         class Body {
277             Harness::SpinBarrier& myBarrier;
278         public:
Body(Harness::SpinBarrier & barrier)279             Body( Harness::SpinBarrier& barrier ) : myBarrier( barrier ) {}
operator ()(int) const280             void operator()( int ) const {
281                 myBarrier.wait();
282                 tbb::task_scheduler_init init;
283                 init.blocking_terminate();
284                 ASSERT( false, "Blocking terminate did not throw the exception inside the parallel region" );
285             }
286         };
operator ()TestBlockingTerminateNS::ExceptionTest2287         void operator()() {
288             const int numThreads = 4;
289             tbb::task_scheduler_init init( numThreads );
290             Harness::SpinBarrier barrier( numThreads );
291             tbb::parallel_for( 0, numThreads, Body( barrier ) );
292             ASSERT( false, "Parallel loop did not throw the exception" );
293         }
294     };
295 #endif /* TBB_USE_EXCEPTIONS */
296 
TestExceptions()297     void TestExceptions() {
298         for ( int i = 0; i<2; ++i ) {
299             tbb::task_scheduler_init tsi[2];
300             bool res1 = tsi[i].blocking_terminate( std::nothrow );
301             ASSERT( !res1, NULL );
302             bool res2 = tsi[1-i].blocking_terminate( std::nothrow );
303             ASSERT( res2, NULL );
304         }
305 #if TBB_USE_EXCEPTIONS
306         ExceptionTest1 Test1(0), Test2(1);
307         TestException( Test1 );
308         TestException( Test2 );
309         ExceptionTest2 Test3;
310         TestException( Test3 );
311 #endif
312     }
313 }
314 
TestBlockingTerminate()315 void TestBlockingTerminate() {
316     TestBlockingTerminateNS::TestExceptions();
317     TestBlockingTerminateNS::TestMultpleWait();
318 }
319 #endif /* TBB_PREVIEW_WAITING_FOR_WORKERS */
320 
TestMain()321 int TestMain () {
322     // Do not use tbb::task_scheduler_init directly in the scope of main's body,
323     // as a static variable, or as a member of a static variable.
324 #if _MSC_VER && !__TBB_NO_IMPLICIT_LINKAGE && !defined(__TBB_LIB_NAME)
325     #ifdef _DEBUG
326         ASSERT(!GetModuleHandle(_T("tbb.dll")) && GetModuleHandle(_T("tbb_debug.dll")),
327             "test linked with wrong (non-debug) TBB library");
328     #else
329         ASSERT(!GetModuleHandle(_T("tbb_debug.dll")) && GetModuleHandle(_T("tbb.dll")),
330             "test linked with wrong (debug) TBB library");
331     #endif
332 #endif /* _MSC_VER && !__TBB_NO_IMPLICIT_LINKAGE && !__TBB_LIB_NAME */
333     std::srand(2);
334     REMARK("testing master thread\n");
335     int threads = DefaultThreads*2;
336     {   // work-around shared RML
337         tbb::task_scheduler_init init( threads );
338         if( !Harness::ExactConcurrencyLevel::isEqual( threads ) ) {
339             threads = DefaultThreads;
340             if( MaxThread > DefaultThreads )
341                 MaxThread = DefaultThreads;
342 #if RML_USE_WCRM
343             REPORT("Known issue: shared RML for ConcRT does not support oversubscription\n");
344             test_mandatory_parallelism = false; // we cannot rely on ConcRT to provide all the requested threads
345 #else
346             REPORT("Known issue: machine is heavy loaded or shared RML which does not support oversubscription is loaded\n");
347 #endif
348         }
349     }
350     InitializeAndTerminate( threads ); // test initialization of more than default number of threads
351     for( int p=MinThread; p<=MaxThread; ++p ) {
352         REMARK("testing with %d threads\n", p );
353         // protect market with excess threads from default initializations
354         // TODO IDEA: enhance task_scheduler_init to serve as global_control setting so that
355         // number of threads > default concurrency will be requested from market.
356         // Such settings must be aggregated via 'max' function and 'max_allowed_parallelism' control
357         // (which has 'min' aggregation) will have precedence over it.
358         tbb::task_scheduler_init init( tbb::task_scheduler_init::deferred );
359         if( MaxThread > DefaultThreads ) init.initialize( MaxThread );
360         NativeParallelFor( p, ThreadedInit() );
361     }
362     AssertExplicitInitIsNotSupplanted();
363 #if TBB_PREVIEW_WAITING_FOR_WORKERS
364     TestBlockingTerminate();
365 #endif
366     return Harness::Done;
367 }
368