1 /*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "tbb/tbb_config.h"
18 #include "tbb/task_scheduler_observer.h"
19
20 #include "harness.h"
21
22 #if !__TBB_PREVIEW_RESUMABLE_TASKS
TestMain()23 int TestMain() {
24 return Harness::Skipped;
25 }
26 #else // __TBB_PREVIEW_RESUMABLE_TASKS
27
28 #include "tbb/task.h"
29 #include "tbb/concurrent_queue.h"
30 #include "tbb/atomic.h"
31 #include "tbb/parallel_for.h"
32 #include "tbb/task_scheduler_init.h"
33 #include "tbb/enumerable_thread_specific.h"
34 #include "tbb/task_arena.h"
35 #include "tbb/task_group.h"
36 #include "tbb/tbb_thread.h"
37
38 #include <vector>
39
40 typedef tbb::enumerable_thread_specific<int, tbb::cache_aligned_allocator<int> > ets_int_t;
41 const int N = 10;
42
43 // External activity used in all tests, which resumes suspended execution point
44 class AsyncActivity {
45 public:
AsyncActivity(int num_)46 AsyncActivity(int num_) : m_numAsyncThreads(num_) {
47 for (int i = 0; i < m_numAsyncThreads ; ++i) {
48 m_asyncThreads.push_back( new tbb::tbb_thread(AsyncActivity::asyncLoop, this) );
49 }
50 }
~AsyncActivity()51 ~AsyncActivity() {
52 for (int i = 0; i < m_numAsyncThreads; ++i) {
53 m_tagQueue.push(NULL);
54 }
55 for (int i = 0; i < m_numAsyncThreads; ++i) {
56 m_asyncThreads[i]->join();
57 delete m_asyncThreads[i];
58 }
59 ASSERT(m_tagQueue.empty(), NULL);
60 }
submit(void * ctx)61 void submit(void* ctx) {
62 m_tagQueue.push(ctx);
63 }
64
65 private:
asyncLoop(AsyncActivity * async)66 static void asyncLoop(AsyncActivity* async) {
67 tbb::task::suspend_point tag;
68 async->m_tagQueue.pop(tag);
69 while (tag) {
70 tbb::task::resume(tag);
71 async->m_tagQueue.pop(tag);
72 }
73 }
74
75 const int m_numAsyncThreads;
76 tbb::concurrent_bounded_queue<void*> m_tagQueue;
77 std::vector<tbb::tbb_thread*> m_asyncThreads;
78 };
79
80 struct SuspendBody {
SuspendBodySuspendBody81 SuspendBody(AsyncActivity& a_) :
82 m_asyncActivity(a_) {}
operator ()SuspendBody83 void operator()(tbb::task::suspend_point tag) {
84 m_asyncActivity.submit(tag);
85 }
86
87 private:
88 AsyncActivity& m_asyncActivity;
89 };
90
91 class InnermostArenaBody {
92 public:
InnermostArenaBody(AsyncActivity & a_)93 InnermostArenaBody(AsyncActivity& a_) : m_asyncActivity(a_) {}
94
operator ()()95 void operator()() {
96 InnermostOuterParFor inner_outer_body(m_asyncActivity);
97 tbb::parallel_for(0, N, inner_outer_body );
98 }
99
100 private:
101 struct InnermostInnerParFor {
InnermostInnerParForInnermostArenaBody::InnermostInnerParFor102 InnermostInnerParFor(AsyncActivity& a_) : m_asyncActivity(a_) {}
operator ()InnermostArenaBody::InnermostInnerParFor103 void operator()(int) const {
104 tbb::task::suspend(SuspendBody(m_asyncActivity));
105 }
106 AsyncActivity& m_asyncActivity;
107 };
108 struct InnermostOuterParFor {
InnermostOuterParForInnermostArenaBody::InnermostOuterParFor109 InnermostOuterParFor(AsyncActivity& a_) : m_asyncActivity(a_) {}
operator ()InnermostArenaBody::InnermostOuterParFor110 void operator()(int) const {
111 tbb::task::suspend(SuspendBody(m_asyncActivity));
112 InnermostInnerParFor inner_inner_body(m_asyncActivity);
113 tbb::parallel_for(0, N, inner_inner_body);
114 }
115 AsyncActivity& m_asyncActivity;
116 };
117 AsyncActivity& m_asyncActivity;
118 };
119
120 class OutermostArenaBody {
121 public:
OutermostArenaBody(AsyncActivity & a_,tbb::task_arena & o_,tbb::task_arena & i_,tbb::task_arena & id_,ets_int_t & ets_)122 OutermostArenaBody(AsyncActivity& a_, tbb::task_arena& o_, tbb::task_arena& i_, tbb::task_arena& id_, ets_int_t& ets_) :
123 m_asyncActivity(a_), m_outermostArena(o_), m_innermostArena(i_), m_innermostArenaDefault(id_), m_etsInner(ets_) {}
124
operator ()()125 void operator()() {
126 tbb::parallel_for(0, 32, *this);
127 }
128
operator ()(int i) const129 void operator()(int i) const {
130 tbb::task::suspend(SuspendBody(m_asyncActivity));
131
132 tbb::task_arena& nested_arena = (i % 3 == 0) ?
133 m_outermostArena : (i % 3 == 1 ? m_innermostArena : m_innermostArenaDefault);
134
135 if (i % 3 != 0) {
136 // We can only guarantee recall coorectness for "not-same" nested arenas entry
137 m_etsInner.local() = i;
138 }
139 InnermostArenaBody innermost_arena_body(m_asyncActivity);
140 nested_arena.execute(innermost_arena_body);
141 if (i % 3 != 0) {
142 ASSERT(i == m_etsInner.local(), "Original thread wasn't recalled for innermost nested arena.");
143 }
144 }
145
146 private:
147 AsyncActivity& m_asyncActivity;
148 tbb::task_arena& m_outermostArena;
149 tbb::task_arena& m_innermostArena;
150 tbb::task_arena& m_innermostArenaDefault;
151 ets_int_t& m_etsInner;
152 };
153
TestNestedArena()154 void TestNestedArena() {
155 AsyncActivity asyncActivity(4);
156
157 ets_int_t ets_outer;
158 ets_int_t ets_inner;
159
160 tbb::task_arena outermost_arena;
161 tbb::task_arena innermost_arena(2,2);
162 tbb::task_arena innermost_arena_default;
163
164 outermost_arena.initialize();
165 innermost_arena_default.initialize();
166 innermost_arena.initialize();
167
168 ets_outer.local() = 42;
169 OutermostArenaBody outer_arena_body(asyncActivity, outermost_arena, innermost_arena, innermost_arena_default, ets_inner);
170 outermost_arena.execute(outer_arena_body);
171 ASSERT(ets_outer.local() == 42, "Original/main thread wasn't recalled.");
172 }
173
174 #if __TBB_CPP11_LAMBDAS_PRESENT
175
176 #include <thread>
177
178 // External activity used in all tests, which resumes suspended execution point
179 class EpochAsyncActivity {
180 public:
EpochAsyncActivity(int num_,tbb::atomic<int> & e_)181 EpochAsyncActivity(int num_, tbb::atomic<int>& e_) : m_numAsyncThreads(num_), m_globalEpoch(e_) {
182 for (int i = 0; i < m_numAsyncThreads ; ++i) {
183 m_asyncThreads.push_back( new tbb::tbb_thread(EpochAsyncActivity::asyncLoop, this) );
184 }
185 }
~EpochAsyncActivity()186 ~EpochAsyncActivity() {
187 for (int i = 0; i < m_numAsyncThreads; ++i) {
188 m_ctxQueue.push(NULL);
189 }
190 for (int i = 0; i < m_numAsyncThreads; ++i) {
191 m_asyncThreads[i]->join();
192 delete m_asyncThreads[i];
193 }
194 ASSERT(m_ctxQueue.empty(), NULL);
195 }
submit(void * ctx)196 void submit(void* ctx) {
197 m_ctxQueue.push(ctx);
198 }
199
200 private:
asyncLoop(EpochAsyncActivity * async)201 static void asyncLoop(EpochAsyncActivity* async) {
202 tbb::task::suspend_point ctx;
203 async->m_ctxQueue.pop(ctx);
204 while (ctx) {
205 // Track the global epoch
206 async->m_globalEpoch++;
207 // Continue execution from suspended ctx
208 tbb::task::resume(ctx);
209 async->m_ctxQueue.pop(ctx);
210 }
211 }
212
213 const int m_numAsyncThreads;
214 tbb::atomic<int>& m_globalEpoch;
215 tbb::concurrent_bounded_queue<void*> m_ctxQueue;
216 std::vector<tbb::tbb_thread*> m_asyncThreads;
217 };
218
219 struct EpochSuspendBody {
EpochSuspendBodyEpochSuspendBody220 EpochSuspendBody(EpochAsyncActivity& a_, tbb::atomic<int>& e_, int& le_) :
221 m_asyncActivity(a_), m_globalEpoch(e_), m_localEpoch(le_) {}
222
operator ()EpochSuspendBody223 void operator()(tbb::task::suspend_point ctx) {
224 m_localEpoch = m_globalEpoch;
225 m_asyncActivity.submit(ctx);
226 }
227
228 private:
229 EpochAsyncActivity& m_asyncActivity;
230 tbb::atomic<int>& m_globalEpoch;
231 int& m_localEpoch;
232 };
233
234 // Simple test for basic resumable tasks functionality
TestSuspendResume()235 void TestSuspendResume() {
236 tbb::atomic<int> global_epoch; global_epoch = 0;
237 EpochAsyncActivity async(4, global_epoch);
238
239 tbb::enumerable_thread_specific<int, tbb::cache_aligned_allocator<int>, tbb::ets_suspend_aware> ets_fiber;
240 tbb::atomic<int> inner_par_iters, outer_par_iters;
241 inner_par_iters = outer_par_iters = 0;
242
243 tbb::parallel_for(0, N, [&](int) {
244 for (int i = 0; i < 100; ++i) {
245 ets_fiber.local() = i;
246
247 int local_epoch;
248 tbb::task::suspend(EpochSuspendBody(async, global_epoch, local_epoch));
249 ASSERT(local_epoch < global_epoch, NULL);
250 ASSERT(ets_fiber.local() == i, NULL);
251
252 tbb::parallel_for(0, N, [&](int) {
253 int local_epoch2;
254 tbb::task::suspend(EpochSuspendBody(async, global_epoch, local_epoch2));
255 ASSERT(local_epoch2 < global_epoch, NULL);
256 ++inner_par_iters;
257 });
258
259 ets_fiber.local() = i;
260 tbb::task::suspend(EpochSuspendBody(async, global_epoch, local_epoch));
261 ASSERT(local_epoch < global_epoch, NULL);
262 ASSERT(ets_fiber.local() == i, NULL);
263 }
264 ++outer_par_iters;
265 });
266 ASSERT(outer_par_iters == N, NULL);
267 ASSERT(inner_par_iters == N*N*100, NULL);
268 }
269
270 // During cleanup master's local task pool may
271 // e.g. contain proxies of affinitized tasks, but can be recalled
TestCleanupMaster()272 void TestCleanupMaster() {
273 AsyncActivity asyncActivity(4);
274 tbb::task_group tg;
275 tbb::enumerable_thread_specific<int> ets;
276 tbb::atomic<int> iter_spawned;
277 tbb::atomic<int> iter_executed;
278
279 for (int i = 0; i < 100; i++) {
280 ets.local() = i;
281 iter_spawned = 0;
282 iter_executed = 0;
283
284 NativeParallelFor(N, [&asyncActivity, &tg, &iter_spawned, &iter_executed](int j) {
285 tbb::task_scheduler_init init(tbb::task_scheduler_init::deferred);
286 if (tbb::task_scheduler_init::default_num_threads() == 1) {
287 init.initialize(2);
288 }
289 for (int k = 0; k < j*10 + 1; ++k) {
290 tg.run([&asyncActivity, j, &iter_executed] {
291 for (volatile int l = 0; l < j*10; ++l) {}
292 tbb::task::suspend(SuspendBody(asyncActivity));
293 iter_executed++;
294 });
295 iter_spawned++;
296 }
297 });
298 ASSERT(iter_spawned == 460, NULL);
299 tg.wait();
300 ASSERT(iter_executed == 460, NULL);
301 ASSERT(ets.local() == i, NULL);
302 }
303 }
304
305 class ParForSuspendBody {
306 AsyncActivity& asyncActivity;
307 int m_numIters;
308 public:
ParForSuspendBody(AsyncActivity & a_,int iters)309 ParForSuspendBody(AsyncActivity& a_, int iters) : asyncActivity(a_), m_numIters(iters) {}
operator ()(int) const310 void operator()(int) const {
311 for (volatile int i = 0; i < m_numIters; ++i) {}
312 tbb::task::suspend(SuspendBody(asyncActivity));
313 }
314 };
315
316 #if __TBB_TASK_PRIORITY
317 class InnerParFor {
318 AsyncActivity& asyncActivity;
319 public:
InnerParFor(AsyncActivity & a_)320 InnerParFor(AsyncActivity& a_) : asyncActivity(a_) {}
operator ()(int) const321 void operator()(int) const {
322 tbb::affinity_partitioner ap;
323 tbb::task_group_context ctx;
324 ctx.set_priority(tbb::priority_high);
325 tbb::parallel_for(0, 10, ParForSuspendBody(asyncActivity, 1000), ap, ctx);
326 }
327 };
328
TestPriorities()329 void TestPriorities() {
330 AsyncActivity asyncActivity(4);
331
332 tbb::task_scheduler_init init;
333 tbb::affinity_partitioner ap;
334 tbb::enumerable_thread_specific<int> ets;
335 for (int i = 0; i < 10; ++i) {
336 ets.local() = i;
337 tbb::parallel_for(0, 10, InnerParFor(asyncActivity), ap);
338 ASSERT(ets.local() == i, NULL);
339 }
340 }
341 #endif
342
TestNativeThread()343 void TestNativeThread() {
344 AsyncActivity asyncActivity(4);
345
346 int num_threads = tbb::task_scheduler_init::default_num_threads();
347 tbb::task_arena arena(num_threads);
348 tbb::task_group tg;
349 tbb::atomic<int> iter = 0;
350 NativeParallelFor(num_threads / 2, [&arena, &tg, &asyncActivity, &iter](int){
351 for (int i = 0; i < 10; i++) {
352 arena.execute([&tg, &asyncActivity, &iter]() {
353 tg.run([&asyncActivity]() {
354 tbb::task::suspend(SuspendBody(asyncActivity));
355 });
356 iter++;
357 });
358 }
359 });
360
361 tbb::enumerable_thread_specific<bool> ets;
362 ets.local() = true;
363 ASSERT(iter == (num_threads / 2) * 10, NULL);
364 arena.execute([&tg](){
365 tg.wait();
366 });
367 ASSERT(ets.local() == true, NULL);
368 }
369
370 class ObserverTracker : public tbb::task_scheduler_observer {
371 tbb::enumerable_thread_specific<bool> is_in_arena;
372 public:
373 tbb::atomic<int> counter;
374
ObserverTracker(tbb::task_arena & a)375 ObserverTracker(tbb::task_arena& a) : tbb::task_scheduler_observer(a) {
376 counter = 0;
377 observe(true);
378 }
on_scheduler_entry(bool)379 void on_scheduler_entry(bool) __TBB_override {
380 bool& l = is_in_arena.local();
381 ASSERT(l == false, "The thread must call on_scheduler_entry only one time.");
382 l = true;
383 ++counter;
384 }
on_scheduler_exit(bool)385 void on_scheduler_exit(bool) __TBB_override {
386 bool& l = is_in_arena.local();
387 ASSERT(l == true, "The thread must call on_scheduler_entry before calling on_scheduler_exit.");
388 l = false;
389 }
390 };
391
TestObservers()392 void TestObservers() {
393 tbb::task_arena arena;
394 ObserverTracker tracker(arena);
395 do {
396 arena.execute([] {
397 tbb::parallel_for(0, 10, [](int) {
398 tbb::task::suspend([](tbb::task::suspend_point tag) {
399 tbb::task::resume(tag);
400 });
401 }, tbb::simple_partitioner());
402 });
403 } while (tracker.counter < 100);
404 tracker.observe(false);
405 }
406 #endif
407
TestMain()408 int TestMain() {
409 tbb::enumerable_thread_specific<bool> ets;
410 ets.local() = true;
411
412 tbb::task_scheduler_init init(max(tbb::task_scheduler_init::default_num_threads(), 16));
413
414 TestNestedArena();
415 #if __TBB_CPP11_LAMBDAS_PRESENT
416 // Using functors would make this test much bigger and with
417 // unnecessary complexity, one C++03 TestNestedArena is enough
418 TestSuspendResume();
419 TestCleanupMaster();
420 #if __TBB_TASK_PRIORITY
421 TestPriorities();
422 #endif
423 TestNativeThread();
424 TestObservers();
425 #endif
426 ASSERT(ets.local() == true, NULL);
427 return Harness::Done;
428 }
429
430 #endif // !__TBB_PREVIEW_RESUMABLE_TASKS
431
432