1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 // undefine __TBB_CPF_BUILD to simulate user's setup
18 #undef __TBB_CPF_BUILD
19 
20 #include "tbb/tbb_config.h"
21 #include "harness.h"
22 
23 #if __TBB_SCHEDULER_OBSERVER
24 #include "tbb/task_scheduler_observer.h"
25 #include "tbb/task_scheduler_init.h"
26 #include "tbb/atomic.h"
27 #include "tbb/task.h"
28 #include "tbb/enumerable_thread_specific.h"
29 #include "../tbb/tls.h"
30 #include "tbb/tick_count.h"
31 #include "harness_barrier.h"
32 
33 #if _MSC_VER && __TBB_NO_IMPLICIT_LINKAGE
34 // plays around __TBB_NO_IMPLICIT_LINKAGE. __TBB_LIB_NAME should be defined (in makefiles)
35     #pragma comment(lib, __TBB_STRING(__TBB_LIB_NAME))
36 #endif
37 
// Highest bit index usable as a per-master observer flag in ThreadState::m_flags.
const int MaxFlagIndex = sizeof(uintptr_t)*8-1;
39 
40 struct ObserverStats {
41     tbb::atomic<int> m_entries;
42     tbb::atomic<int> m_exits;
43     tbb::atomic<int> m_workerEntries;
44     tbb::atomic<int> m_workerExits;
45 
ResetObserverStats46     void Reset () {
47         m_entries = m_exits = m_workerEntries = m_workerExits = 0;
48     }
49 
operator +=ObserverStats50     void operator += ( const ObserverStats& s ) {
51         m_entries += s.m_entries;
52         m_exits += s.m_exits;
53         m_workerEntries += s.m_workerEntries;
54         m_workerExits += s.m_workerExits;
55     }
56 };
57 
58 struct ThreadState {
59     uintptr_t m_flags;
60     tbb::task_scheduler_observer *m_dyingObserver;
61     bool m_isMaster;
ThreadStateThreadState62     ThreadState() { reset(); }
resetThreadState63     void reset() {
64         m_flags = 0;
65         m_dyingObserver = NULL;
66         m_isMaster = false;
67     }
68     static ThreadState &get();
69 };
70 
71 tbb::enumerable_thread_specific<ThreadState> theLocalState;
72 tbb::internal::tls<intptr_t> theThreadPrivate;
73 
get()74 ThreadState &ThreadState::get() {
75     bool exists;
76     ThreadState& state = theLocalState.local(exists);
77     // ETS will not detect that a thread was allocated with the same id as a destroyed thread
78     if( exists && theThreadPrivate.get() == 0 ) state.reset();
79     theThreadPrivate = 1; // mark thread constructed
80     return state;
81 }
82 
83 static ObserverStats theStats;
84 static tbb::atomic<int> theNumObservers;
85 
86 const int P = min( tbb::task_scheduler_init::default_num_threads(), (int)sizeof(int) * CHAR_BIT );
87 
//! Bit flags selecting the testing scenario.
enum TestMode {
    //! Ensure timely workers destruction in order to guarantee all exit notification are fired.
    tmSynchronized = 1,
    //! Use local observer.
    tmLocalObservation = 2,
    //! Observer causes autoinitialization of the scheduler
    tmAutoinitialization = 4
};

uintptr_t theTestMode,      // mode of the test iteration in progress
          thePrevMode = 0;  // mode of the previous test iteration
99 
100 class MyObserver : public tbb::task_scheduler_observer, public ObserverStats {
101     uintptr_t m_flag;
102     tbb::atomic<bool> m_dying;
103 
on_scheduler_entry(bool is_worker)104     void on_scheduler_entry( bool is_worker ) __TBB_override {
105         ThreadState& state = ThreadState::get();
106         ASSERT( is_worker==!state.m_isMaster, NULL );
107         if ( thePrevMode & tmSynchronized ) {
108             ASSERT( !(state.m_flags & m_flag), "Observer repeatedly invoked for the same thread" );
109             if ( theTestMode & tmLocalObservation )
110                 ASSERT( !state.m_flags, "Observer locality breached" );
111         }
112         if ( m_dying && theTestMode & tmLocalObservation ) {
113             // In case of local observation a worker may enter the arena after
114             // the wait for lagging on_entry calls in the MyObserver destructor
115             // succeeds but before its base class tbb::task_scheduler_observer
116             // destructor removes it from the internal list maintained by the
117             // task scheduler. This will result in on_entry notification without,
118             // subsequent on_exit as the observer is likely to be destroyed before
119             // the worker discovers that the arena is empty and leaves it.
120             //
121             // To prevent statistics distortion, ignore the notifications for
122             // observers about to be destroyed.
123             ASSERT( !state.m_dyingObserver || state.m_dyingObserver != this || thePrevMode & tmSynchronized, NULL );
124             state.m_dyingObserver = this;
125             return;
126         }
127         state.m_dyingObserver = NULL;
128         ++m_entries;
129         state.m_flags |= m_flag;
130         if ( is_worker )
131             ++m_workerEntries;
132     }
on_scheduler_exit(bool is_worker)133     void on_scheduler_exit( bool is_worker ) __TBB_override {
134         ThreadState& state = ThreadState::get();
135         ASSERT( is_worker==!state.m_isMaster, NULL );
136         if ( m_dying && state.m_dyingObserver ) {
137             ASSERT( state.m_dyingObserver == this, "Exit without entry (for a dying observer)" );
138             state.m_dyingObserver = NULL;
139             return;
140         }
141         ASSERT( state.m_flags & m_flag, "Exit without entry" );
142         state.m_flags &= ~m_flag;
143         ++m_exits;
144         if ( is_worker )
145             ++m_workerExits;
146     }
147 public:
MyObserver(uintptr_t flag)148     MyObserver( uintptr_t flag )
149         : tbb::task_scheduler_observer(theTestMode & tmLocalObservation ? true : false)
150         , m_flag(flag)
151     {
152         ++theNumObservers;
153         Reset();
154         m_dying = false;
155         // Local observer causes automatic scheduler initialization
156         // in the current thread, so here, we must postpone the activation.
157         if ( !(theTestMode & tmLocalObservation))
158             observe(true);
159     }
160 
~MyObserver()161     ~MyObserver () {
162         m_dying = true;
163         ASSERT( m_exits <= m_entries, NULL );
164         if ( theTestMode & tmSynchronized ) {
165             tbb::tick_count t0 = tbb::tick_count::now();
166             while ( m_exits < m_entries && (tbb::tick_count::now() - t0).seconds() < 5 )
167                 Harness::Sleep(10);
168             if ( m_exits < m_entries )
169                 REPORT( "Warning: Entry/exit count mismatch (%d, %d). Observer is broken or machine is overloaded.\n", (int)m_entries, (int)m_exits );
170         }
171         theStats += *this;
172         --theNumObservers;
173         // it is recommended to disable observation before destructor of the base class starts,
174         // otherwise it can lead to concurrent notification callback on partly destroyed object,
175         // which in turn can harm (in addition) if derived class has new virtual methods.
176         // This class has no, and for test purposes we rely on implementation failsafe mechanism.
177         //observe(false);
178     }
179 }; // class MyObserver
180 
181 Harness::SpinBarrier theGlobalBarrier;
182 bool theGlobalBarrierActive = true;
183 
184 class FibTask : public tbb::task {
185     const int N;
186     uintptr_t m_flag;
187     MyObserver &m_observer;
188 public:
FibTask(int n,uintptr_t flags,MyObserver & obs)189     FibTask( int n, uintptr_t flags, MyObserver &obs ) : N(n), m_flag(flags), m_observer(obs) {}
190 
execute()191     tbb::task* execute() __TBB_override {
192         ThreadState& s = ThreadState::get();
193         ASSERT( !(~s.m_flags & m_flag), NULL );
194         if( N < 2 )
195             return NULL;
196         bool globalBarrierActive = false;
197         if ( s.m_isMaster ) {
198             if ( theGlobalBarrierActive ) {
199                 // This is the root task. Its N is equal to the number of threads.
200                 // Spawn a task for each worker.
201                 set_ref_count(N);
202                 for ( int i = 1; i < N; ++i )
203                     spawn( *new( allocate_child() ) FibTask(20, m_flag, m_observer) );
204                 if ( theTestMode & tmSynchronized ) {
205                     theGlobalBarrier.wait();
206                     ASSERT( m_observer.m_entries >= N, "Wrong number of on_entry calls after the first barrier" );
207                     // All the spawned tasks have been stolen by workers.
208                     // Now wait for workers to spawn some more tasks for this thread to steal back.
209                     theGlobalBarrier.wait();
210                     ASSERT( !theGlobalBarrierActive, "Workers are expected to have reset this flag" );
211                 }
212                 else
213                     theGlobalBarrierActive = false;
214                 wait_for_all();
215                 return NULL;
216             }
217         }
218         else {
219             if ( theGlobalBarrierActive ) {
220                 if ( theTestMode & tmSynchronized ) {
221                     theGlobalBarrier.wait();
222                     globalBarrierActive = true;
223                 }
224                 theGlobalBarrierActive = false;
225             }
226         }
227         set_ref_count(3);
228         spawn( *new( allocate_child() ) FibTask(N-1, m_flag, m_observer) );
229         spawn( *new( allocate_child() ) FibTask(N-2, m_flag, m_observer) );
230         if ( globalBarrierActive ) {
231             // It's the first task executed by a worker. Release the master thread.
232             theGlobalBarrier.wait();
233         }
234         wait_for_all();
235         return NULL;
236     }
237 }; // class FibTask
238 
239 Harness::SpinBarrier theMasterBarrier;
240 
241 class TestBody {
242     int m_numThreads;
243 public:
TestBody(int numThreads)244     TestBody( int numThreads ) : m_numThreads(numThreads) {}
245 
operator ()(int i) const246     void operator()( int i ) const {
247         ThreadState &state = ThreadState::get();
248         ASSERT( !state.m_isMaster, "should be newly initialized thread");
249         state.m_isMaster = true;
250         uintptr_t f = i <= MaxFlagIndex ? 1<<i : 0;
251         MyObserver o(f);
252         if ( theTestMode & tmSynchronized )
253             theMasterBarrier.wait();
254         // when mode is local observation but not synchronized and when num threads == default
255         if ( theTestMode & tmAutoinitialization )
256             o.observe(true); // test autoinitialization can be done by observer
257         // Observer in enabled state must outlive the scheduler to ensure that
258         // all exit notifications are called.
259         tbb::task_scheduler_init init(m_numThreads);
260         // when local & non-autoinitialized observation mode
261         if ( theTestMode & tmLocalObservation )
262             o.observe(true);
263         for ( int j = 0; j < 2; ++j ) {
264             tbb::task &t = *new( tbb::task::allocate_root() ) FibTask(m_numThreads, f, o);
265             tbb::task::spawn_root_and_wait(t);
266             thePrevMode = theTestMode;
267         }
268     }
269 }; // class TestBody
270 
TestObserver(int M,int T,uintptr_t testMode)271 void TestObserver( int M, int T, uintptr_t testMode ) {
272     theLocalState.clear();
273     theStats.Reset();
274     theGlobalBarrierActive = true;
275     theTestMode = testMode;
276     NativeParallelFor( M, TestBody(T) );
277     // When T (number of threads in arena, i.e. master + workers) is less than P
278     // (hardware concurrency), more than T-1 workers can visit the same arena. This
279     // is possible in case of imbalance or when other arenas are activated/deactivated
280     // concurrently).
281     ASSERT( !theNumObservers, "Unexpected alive observer(s)" );
282     REMARK( "Entries %d / %d, exits %d\n", (int)theStats.m_entries, (int)theStats.m_workerEntries, (int)theStats.m_exits );
283     if ( testMode & tmSynchronized ) {
284         if ( testMode & tmLocalObservation ) {
285             ASSERT( theStats.m_entries >= M * T, "Too few on_entry calls" );
286             ASSERT( theStats.m_workerEntries >= M * (T - 1), "Too few worker entries" );
287         }
288         else {
289             ASSERT( theStats.m_entries >= M * M * T, "Too few on_entry calls" );
290             ASSERT( theStats.m_entries <= M * (P + 1), "Too many on_entry calls" );
291             ASSERT( theStats.m_workerEntries >= M * M * (T - 1), "Too few worker entries" );
292             ASSERT( theStats.m_workerEntries <= M * (P - 1), "Too many worker entries" );
293         }
294         ASSERT( theStats.m_entries == theStats.m_exits, "Entries/exits mismatch" );
295     }
296     else {
297         ASSERT( theStats.m_entries >= M, "Too few on_entry calls" );
298         ASSERT( theStats.m_exits >= M || (testMode & tmAutoinitialization), "Too few on_exit calls" );
299         if ( !(testMode & tmLocalObservation) ) {
300             ASSERT( theStats.m_entries <= M * M * P, "Too many on_entry calls" );
301             ASSERT( theStats.m_exits <= M * M * T, "Too many on_exit calls" );
302         }
303         ASSERT( theStats.m_entries >= theStats.m_exits, "More exits than entries" );
304     }
305 }
306 
TestMain()307 int TestMain () {
308     if ( P < 2 )
309         return Harness::Skipped;
310     theNumObservers = 0;
311     // Fully- and under-utilized mode
312     for ( int M = 1; M < P; M <<= 1 ) {
313         if ( M > P/2 ) {
314             ASSERT( P & (P-1), "Can get here only in case of non power of two cores" );
315             M = P/2;
316             if ( M==1 || (M & (M-1)) )
317                 break; // Already tested this configuration
318         }
319         int T = P / M;
320         ASSERT( T > 1, NULL );
321         REMARK( "Masters: %d; Arena size: %d\n", M, T );
322         theMasterBarrier.initialize(M);
323         theGlobalBarrier.initialize(M * T);
324         TestObserver(M, T, 0);
325         TestObserver(M, T, tmSynchronized | tmLocalObservation );
326         // keep tmAutoInitialization the last, as it does not release worker threads
327         TestObserver(M, T, tmLocalObservation | ( T==P? tmAutoinitialization : 0) );
328     }
329     // Oversubscribed mode
330     for ( int i = 0; i < 4; ++i ) {
331         REMARK( "Masters: %d; Arena size: %d\n", P-1, P );
332         TestObserver(P-1, P, 0);
333         TestObserver(P-1, P, tmLocalObservation);
334     }
335     Harness::Sleep(20);
336     return Harness::Done;
337 }
338 
339 #else /* !__TBB_SCHEDULER_OBSERVER */
340 
TestMain()341 int TestMain () {
342     return Harness::Skipped;
343 }
344 #endif /* !__TBB_SCHEDULER_OBSERVER */
345