/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "tbb/tbb_config.h"
#include "tbb/task_scheduler_observer.h"

#include "harness.h"

#if !__TBB_PREVIEW_RESUMABLE_TASKS
int TestMain() {
    return Harness::Skipped;
}
#else // __TBB_PREVIEW_RESUMABLE_TASKS

#include "tbb/task.h"
#include "tbb/concurrent_queue.h"
#include "tbb/atomic.h"
#include "tbb/parallel_for.h"
#include "tbb/task_scheduler_init.h"
#include "tbb/enumerable_thread_specific.h"
#include "tbb/task_arena.h"
#include "tbb/task_group.h"
#include "tbb/tbb_thread.h"

#include <vector>

typedef tbb::enumerable_thread_specific<int, tbb::cache_aligned_allocator<int> > ets_int_t;
const int N = 10;

// External activity, used in all tests, that resumes suspended execution points
class AsyncActivity {
public:
    AsyncActivity(int num_) : m_numAsyncThreads(num_) {
        for (int i = 0; i < m_numAsyncThreads; ++i) {
            m_asyncThreads.push_back( new tbb::tbb_thread(AsyncActivity::asyncLoop, this) );
        }
    }
    ~AsyncActivity() {
        for (int i = 0; i < m_numAsyncThreads; ++i) {
            m_tagQueue.push(NULL);
        }
        for (int i = 0; i < m_numAsyncThreads; ++i) {
            m_asyncThreads[i]->join();
            delete m_asyncThreads[i];
        }
        ASSERT(m_tagQueue.empty(), NULL);
    }
    void submit(void* ctx) {
        m_tagQueue.push(ctx);
    }

private:
    static void asyncLoop(AsyncActivity* async) {
        tbb::task::suspend_point tag;
        async->m_tagQueue.pop(tag);
        while (tag) {
            tbb::task::resume(tag);
            async->m_tagQueue.pop(tag);
        }
    }

    const int m_numAsyncThreads;
    tbb::concurrent_bounded_queue<void*> m_tagQueue;
    std::vector<tbb::tbb_thread*> m_asyncThreads;
};

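// Functor passed to tbb::task::suspend: hands the suspend point over to the asynchronous activity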
struct SuspendBody {
    SuspendBody(AsyncActivity& a_) :
        m_asyncActivity(a_) {}
    void operator()(tbb::task::suspend_point tag) {
        m_asyncActivity.submit(tag);
    }

private:
    AsyncActivity& m_asyncActivity;
};

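// Body executed inside a nested arena: runs a parallel loop whose iterations suspend and then
// spawn another, deeper parallel loop that suspends as well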
class InnermostArenaBody {
public:
    InnermostArenaBody(AsyncActivity& a_) : m_asyncActivity(a_) {}

    void operator()() {
        InnermostOuterParFor inner_outer_body(m_asyncActivity);
        tbb::parallel_for(0, N, inner_outer_body);
    }

private:
    struct InnermostInnerParFor {
        InnermostInnerParFor(AsyncActivity& a_) : m_asyncActivity(a_) {}
        void operator()(int) const {
            tbb::task::suspend(SuspendBody(m_asyncActivity));
        }
        AsyncActivity& m_asyncActivity;
    };
    struct InnermostOuterParFor {
        InnermostOuterParFor(AsyncActivity& a_) : m_asyncActivity(a_) {}
        void operator()(int) const {
            tbb::task::suspend(SuspendBody(m_asyncActivity));
            InnermostInnerParFor inner_inner_body(m_asyncActivity);
            tbb::parallel_for(0, N, inner_inner_body);
        }
        AsyncActivity& m_asyncActivity;
    };
    AsyncActivity& m_asyncActivity;
};

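// Body executed in the outermost arena: each iteration suspends, enters one of the nested arenas,
// and (for "not-same" arenas) checks that the original thread was recalled afterwards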
class OutermostArenaBody {
public:
    OutermostArenaBody(AsyncActivity& a_, tbb::task_arena& o_, tbb::task_arena& i_, tbb::task_arena& id_, ets_int_t& ets_) :
        m_asyncActivity(a_), m_outermostArena(o_), m_innermostArena(i_), m_innermostArenaDefault(id_), m_etsInner(ets_) {}

    void operator()() {
        tbb::parallel_for(0, 32, *this);
    }

    void operator()(int i) const {
        tbb::task::suspend(SuspendBody(m_asyncActivity));

        tbb::task_arena& nested_arena = (i % 3 == 0) ?
            m_outermostArena : (i % 3 == 1 ? m_innermostArena : m_innermostArenaDefault);

        if (i % 3 != 0) {
            // We can only guarantee recall correctness when entering a "not-same" nested arena
            m_etsInner.local() = i;
        }
        InnermostArenaBody innermost_arena_body(m_asyncActivity);
        nested_arena.execute(innermost_arena_body);
        if (i % 3 != 0) {
            ASSERT(i == m_etsInner.local(), "Original thread wasn't recalled for innermost nested arena.");
        }
    }

private:
    AsyncActivity& m_asyncActivity;
    tbb::task_arena& m_outermostArena;
    tbb::task_arena& m_innermostArena;
    tbb::task_arena& m_innermostArenaDefault;
    ets_int_t& m_etsInner;
};

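// Checks that original threads are recalled after suspend/resume across nested arena entries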
void TestNestedArena() {
    AsyncActivity asyncActivity(4);

    ets_int_t ets_outer;
    ets_int_t ets_inner;

    tbb::task_arena outermost_arena;
    tbb::task_arena innermost_arena(2,2);
    tbb::task_arena innermost_arena_default;

    outermost_arena.initialize();
    innermost_arena_default.initialize();
    innermost_arena.initialize();

    ets_outer.local() = 42;
    OutermostArenaBody outer_arena_body(asyncActivity, outermost_arena, innermost_arena, innermost_arena_default, ets_inner);
    outermost_arena.execute(outer_arena_body);
    ASSERT(ets_outer.local() == 42, "Original/main thread wasn't recalled.");
}

#if __TBB_CPP11_LAMBDAS_PRESENT

#include <thread>

// External activity that resumes suspended execution points while tracking a global epoch
class EpochAsyncActivity {
public:
    EpochAsyncActivity(int num_, tbb::atomic<int>& e_) : m_numAsyncThreads(num_), m_globalEpoch(e_) {
        for (int i = 0; i < m_numAsyncThreads; ++i) {
            m_asyncThreads.push_back( new tbb::tbb_thread(EpochAsyncActivity::asyncLoop, this) );
        }
    }
    ~EpochAsyncActivity() {
        for (int i = 0; i < m_numAsyncThreads; ++i) {
            m_ctxQueue.push(NULL);
        }
        for (int i = 0; i < m_numAsyncThreads; ++i) {
            m_asyncThreads[i]->join();
            delete m_asyncThreads[i];
        }
        ASSERT(m_ctxQueue.empty(), NULL);
    }
    void submit(void* ctx) {
        m_ctxQueue.push(ctx);
    }

private:
    static void asyncLoop(EpochAsyncActivity* async) {
        tbb::task::suspend_point ctx;
        async->m_ctxQueue.pop(ctx);
        while (ctx) {
            // Track the global epoch
            async->m_globalEpoch++;
            // Continue execution from suspended ctx
            tbb::task::resume(ctx);
            async->m_ctxQueue.pop(ctx);
        }
    }

    const int m_numAsyncThreads;
    tbb::atomic<int>& m_globalEpoch;
    tbb::concurrent_bounded_queue<void*> m_ctxQueue;
    std::vector<tbb::tbb_thread*> m_asyncThreads;
};

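// Functor passed to tbb::task::suspend: records the current global epoch before handing the
// suspend point over to the asynchronous activity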
struct EpochSuspendBody {
    EpochSuspendBody(EpochAsyncActivity& a_, tbb::atomic<int>& e_, int& le_) :
        m_asyncActivity(a_), m_globalEpoch(e_), m_localEpoch(le_) {}

    void operator()(tbb::task::suspend_point ctx) {
        m_localEpoch = m_globalEpoch;
        m_asyncActivity.submit(ctx);
    }

private:
    EpochAsyncActivity& m_asyncActivity;
    tbb::atomic<int>& m_globalEpoch;
    int& m_localEpoch;
};

// Simple test for basic resumable tasks functionality
void TestSuspendResume() {
    tbb::atomic<int> global_epoch; global_epoch = 0;
    EpochAsyncActivity async(4, global_epoch);

    tbb::enumerable_thread_specific<int, tbb::cache_aligned_allocator<int>, tbb::ets_suspend_aware> ets_fiber;
    tbb::atomic<int> inner_par_iters, outer_par_iters;
    inner_par_iters = outer_par_iters = 0;

    tbb::parallel_for(0, N, [&](int) {
        for (int i = 0; i < 100; ++i) {
            ets_fiber.local() = i;

            int local_epoch;
            tbb::task::suspend(EpochSuspendBody(async, global_epoch, local_epoch));
            ASSERT(local_epoch < global_epoch, NULL);
            ASSERT(ets_fiber.local() == i, NULL);

            tbb::parallel_for(0, N, [&](int) {
                int local_epoch2;
                tbb::task::suspend(EpochSuspendBody(async, global_epoch, local_epoch2));
                ASSERT(local_epoch2 < global_epoch, NULL);
                ++inner_par_iters;
            });

            ets_fiber.local() = i;
            tbb::task::suspend(EpochSuspendBody(async, global_epoch, local_epoch));
            ASSERT(local_epoch < global_epoch, NULL);
            ASSERT(ets_fiber.local() == i, NULL);
        }
        ++outer_par_iters;
    });
    ASSERT(outer_par_iters == N, NULL);
    ASSERT(inner_par_iters == N*N*100, NULL);
}

// During cleanup the master's local task pool may still contain, e.g., proxies
// of affinitized tasks, but the master thread can still be recalled
void TestCleanupMaster() {
    AsyncActivity asyncActivity(4);
    tbb::task_group tg;
    tbb::enumerable_thread_specific<int> ets;
    tbb::atomic<int> iter_spawned;
    tbb::atomic<int> iter_executed;

    for (int i = 0; i < 100; i++) {
        ets.local() = i;
        iter_spawned = 0;
        iter_executed = 0;

        NativeParallelFor(N, [&asyncActivity, &tg, &iter_spawned, &iter_executed](int j) {
            tbb::task_scheduler_init init(tbb::task_scheduler_init::deferred);
            if (tbb::task_scheduler_init::default_num_threads() == 1) {
                init.initialize(2);
            }
            for (int k = 0; k < j*10 + 1; ++k) {
                tg.run([&asyncActivity, j, &iter_executed] {
                    for (volatile int l = 0; l < j*10; ++l) {}
                    tbb::task::suspend(SuspendBody(asyncActivity));
                    iter_executed++;
                });
                iter_spawned++;
            }
        });
        ASSERT(iter_spawned == 460, NULL);
        tg.wait();
        ASSERT(iter_executed == 460, NULL);
        ASSERT(ets.local() == i, NULL);
    }
}

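// parallel_for body that spins for a given number of iterations and then suspends the task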
class ParForSuspendBody {
    AsyncActivity& asyncActivity;
    int m_numIters;
public:
    ParForSuspendBody(AsyncActivity& a_, int iters) : asyncActivity(a_), m_numIters(iters) {}
    void operator()(int) const {
        for (volatile int i = 0; i < m_numIters; ++i) {}
        tbb::task::suspend(SuspendBody(asyncActivity));
    }
};

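// Checks suspend/resume inside nested high-priority parallel loops with affinity;
// the calling thread must be recalled after each outer iteration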
#if __TBB_TASK_PRIORITY
class InnerParFor {
    AsyncActivity& asyncActivity;
public:
    InnerParFor(AsyncActivity& a_) : asyncActivity(a_) {}
    void operator()(int) const {
        tbb::affinity_partitioner ap;
        tbb::task_group_context ctx;
        ctx.set_priority(tbb::priority_high);
        tbb::parallel_for(0, 10, ParForSuspendBody(asyncActivity, 1000), ap, ctx);
    }
};

void TestPriorities() {
    AsyncActivity asyncActivity(4);

    tbb::task_scheduler_init init;
    tbb::affinity_partitioner ap;
    tbb::enumerable_thread_specific<int> ets;
    for (int i = 0; i < 10; ++i) {
        ets.local() = i;
        tbb::parallel_for(0, 10, InnerParFor(asyncActivity), ap);
        ASSERT(ets.local() == i, NULL);
    }
}
#endif // __TBB_TASK_PRIORITY

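// Checks suspending tasks that native (non-TBB) threads submit to an arena via task_group::run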
void TestNativeThread() {
    AsyncActivity asyncActivity(4);

    int num_threads = tbb::task_scheduler_init::default_num_threads();
    tbb::task_arena arena(num_threads);
    tbb::task_group tg;
    tbb::atomic<int> iter = 0;
    NativeParallelFor(num_threads / 2, [&arena, &tg, &asyncActivity, &iter](int){
        for (int i = 0; i < 10; i++) {
            arena.execute([&tg, &asyncActivity, &iter]() {
                tg.run([&asyncActivity]() {
                    tbb::task::suspend(SuspendBody(asyncActivity));
                });
                iter++;
            });
        }
    });

    tbb::enumerable_thread_specific<bool> ets;
    ets.local() = true;
    ASSERT(iter == (num_threads / 2) * 10, NULL);
    arena.execute([&tg](){
        tg.wait();
    });
    ASSERT(ets.local() == true, NULL);
}

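// Observer that verifies entry/exit notifications stay balanced per thread while tasks
// suspend and resume inside the arena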
class ObserverTracker : public tbb::task_scheduler_observer {
    tbb::enumerable_thread_specific<bool> is_in_arena;
public:
    tbb::atomic<int> counter;

    ObserverTracker(tbb::task_arena& a) : tbb::task_scheduler_observer(a) {
        counter = 0;
        observe(true);
    }
    void on_scheduler_entry(bool) __TBB_override {
        bool& l = is_in_arena.local();
        ASSERT(l == false, "The thread must call on_scheduler_entry only one time.");
        l = true;
        ++counter;
    }
    void on_scheduler_exit(bool) __TBB_override {
        bool& l = is_in_arena.local();
        ASSERT(l == true, "The thread must call on_scheduler_entry before calling on_scheduler_exit.");
        l = false;
    }
};

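// Checks that task_scheduler_observer callbacks remain consistent when tasks suspend and
// immediately resume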
void TestObservers() {
    tbb::task_arena arena;
    ObserverTracker tracker(arena);
    do {
        arena.execute([] {
            tbb::parallel_for(0, 10, [](int) {
                tbb::task::suspend([](tbb::task::suspend_point tag) {
                    tbb::task::resume(tag);
                });
            }, tbb::simple_partitioner());
        });
    } while (tracker.counter < 100);
    tracker.observe(false);
}
#endif // __TBB_CPP11_LAMBDAS_PRESENT

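// Harness entry point: runs all resumable tasks tests with a scheduler of at least 16 threads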
int TestMain() {
    tbb::enumerable_thread_specific<bool> ets;
    ets.local() = true;

    tbb::task_scheduler_init init(max(tbb::task_scheduler_init::default_num_threads(), 16));

    TestNestedArena();
#if __TBB_CPP11_LAMBDAS_PRESENT
    // Using functors instead of lambdas would make these tests much bigger and
    // unnecessarily complex; the single C++03 test, TestNestedArena, is enough
    TestSuspendResume();
    TestCleanupMaster();
#if __TBB_TASK_PRIORITY
    TestPriorities();
#endif
    TestNativeThread();
    TestObservers();
#endif
    ASSERT(ets.local() == true, NULL);
    return Harness::Done;
}

#endif // !__TBB_PREVIEW_RESUMABLE_TASKS