1 /*
2 Copyright (c) 2018-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "harness_defs.h"
18
19 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
20 #define TBB_DEPRECATED_INPUT_NODE_BODY __TBB_CPF_BUILD
21
22 #include "harness_graph.h"
23 #include "harness_barrier.h"
24
25 #include "tbb/flow_graph.h"
26 #include "tbb/tbb_thread.h"
27 #include "tbb/parallel_for.h"
28 #include "tbb/concurrent_queue.h"
29
30 #include <vector>
31 #include <cstdlib>
32
33 using namespace tbb::flow;
34
35 tbb::atomic<unsigned> g_task_num;
36
spin_for(double delta)37 void spin_for( double delta ) {
38 tbb::tick_count start = tbb::tick_count::now();
39 while( (tbb::tick_count::now() - start).seconds() < delta ) ;
40 }
41
42 namespace PriorityNodesTakePrecedence {
43
44 struct TaskInfo {
TaskInfoPriorityNodesTakePrecedence::TaskInfo45 TaskInfo() : my_priority(-1), my_task_index(-1) {}
TaskInfoPriorityNodesTakePrecedence::TaskInfo46 TaskInfo( int priority, int task_index )
47 : my_priority(priority), my_task_index(task_index) {}
48 int my_priority;
49 int my_task_index;
50 };
51 std::vector<TaskInfo> g_task_info;
52 tbb::atomic<bool> g_work_submitted;
53
54 const unsigned node_num = 100;
55 const unsigned start_index = node_num / 3;
56 const unsigned end_index = node_num * 2 / 3;
57 tbb::atomic<unsigned> g_priority_task_index;
58
body_func(int priority)59 void body_func( int priority ) {
60 while( !g_work_submitted ) __TBB_Yield();
61 int current_task_index = g_task_num++;
62 if( priority )
63 g_task_info[g_priority_task_index++] = TaskInfo( priority, current_task_index );
64 }
65
66 struct FunctionBody {
FunctionBodyPriorityNodesTakePrecedence::FunctionBody67 FunctionBody( int priority ) : my_priority( priority ) {}
operator ()PriorityNodesTakePrecedence::FunctionBody68 int operator()( int msg ) const {
69 body_func( my_priority );
70 return msg;
71 }
72 private:
73 int my_priority;
74 };
75
76 typedef multifunction_node< int,tuple<int> > multi_node;
77
78 struct MultifunctionBody {
MultifunctionBodyPriorityNodesTakePrecedence::MultifunctionBody79 MultifunctionBody( int priority ) : my_priority( priority ) {}
operator ()PriorityNodesTakePrecedence::MultifunctionBody80 void operator()( int msg, multi_node::output_ports_type& op ) const {
81 body_func( my_priority );
82 get<0>(op).try_put( msg );
83 }
84 private:
85 int my_priority;
86 };
87
88 template<typename NodeType, typename BodyType>
89 NodeType* node_creator( graph& g, unsigned index ) {
90 if( start_index <= index && index < end_index )
91 return new NodeType( g, unlimited, BodyType(index), node_priority_t(index) );
92 else
93 return new NodeType( g, unlimited, BodyType(0) );
94 }
95
96 struct passthru_body {
operator ()PriorityNodesTakePrecedence::passthru_body97 continue_msg operator()( int ) const {
98 return continue_msg();
99 }
100 };
101
get_sender(NodeType & node)102 template<typename NodeType> sender<int>& get_sender( NodeType& node ) { return node; }
get_sender(multi_node & node)103 template<> sender<int>& get_sender<multi_node>( multi_node& node ) { return output_port<0>(node); }
104
105 template<typename NodeType, typename NodeTypeCreator, typename NodePortRetriever>
test_node(NodeTypeCreator node_creator_func,NodePortRetriever get_sender)106 void test_node( NodeTypeCreator node_creator_func, NodePortRetriever get_sender ) {
107 graph g;
108 broadcast_node<int> bn(g);
109 function_node<int> tn(g, unlimited, passthru_body());
110 // Using pointers to nodes to avoid errors on compilers, which try to generate assignment
111 // operator for the nodes
112 std::vector<NodeType*> nodes;
113 for( unsigned i = 0; i < node_num; ++i ) {
114 nodes.push_back( node_creator_func(g, i) );
115 make_edge( bn, *nodes.back() );
116 make_edge( get_sender(*nodes.back()), tn );
117 }
118
119 const size_t repeats = 50;
120 const size_t priority_nodes_num = end_index - start_index;
121 size_t internal_order_failures = 0;
122 size_t global_order_failures = 0;
123 for( size_t repeat = 0; repeat < repeats; ++repeat ) {
124 g_work_submitted = false;
125 g_task_num = g_priority_task_index = 0;
126 g_task_info.clear(); g_task_info.resize( priority_nodes_num );
127
128 bn.try_put( 0 );
129 // Setting of the flag is based on the knowledge that the calling thread broadcasts the message
130 // to successor nodes, that is spawns tasks. Thus, this makes this test to be a whitebox test to
131 // some extent.
132 g_work_submitted = true;
133
134 g.wait_for_all();
135
136 ASSERT( g_priority_task_index == g_task_info.size(), "Incorrect number of tasks with priority" );
137 bool found_max = false;
138 bool found_min = false;
139 for( unsigned i = 0; i < g_priority_task_index/2; ++i ) {
140 if( g_task_info[i].my_priority == int(end_index-1) )
141 found_max = true;
142 if( g_task_info[g_priority_task_index-1-i].my_priority == int(start_index) )
143 found_min = true;
144 }
145 if( !found_min || !found_max )
146 ++internal_order_failures;
147 for( unsigned i = 0; i < g_priority_task_index; ++i ) {
148 // This check might fail because priorities do not guarantee ordering, i.e. assumption
149 // that all priority nodes should increment the task counter before any subsequent
150 // no-priority node is not correct. In the worst case, a thread that took a priority
151 // node might be preempted and become the last to increment the counter. That's why the
152 // test passing is based on statistics, which could be affected by machine overload
153 // unfortunately.
154 // TODO: make the test deterministic.
155 if( g_task_info[i].my_task_index > int(priority_nodes_num) + MaxThread )
156 ++global_order_failures;
157 }
158 }
159 float failure_ratio = float(internal_order_failures) / float(repeats);
160 ASSERT(
161 failure_ratio <= 0.3f,
162 "Nodes with priorities executed in wrong order among each other too frequently."
163 );
164 failure_ratio = float(global_order_failures) / float(repeats*priority_nodes_num);
165 ASSERT(
166 failure_ratio <= 0.1f,
167 "Nodes with priorities executed in wrong order too frequently over non-prioritized nodes."
168 );
169 for( size_t i = 0; i < nodes.size(); ++i )
170 delete nodes[i];
171 }
172
test(int num_threads)173 void test( int num_threads ) {
174 REMARK( "Testing execution of nodes with priority takes precedence (num_threads=%d) - ", num_threads );
175 tbb::task_scheduler_init init(num_threads);
176 test_node< function_node<int,int> >( &node_creator<function_node<int,int>, FunctionBody>,
177 &get_sender< function_node<int,int> > );
178 test_node<multi_node>( &node_creator<multi_node, MultifunctionBody>, &get_sender< multi_node > );
179 REMARK( "done\n" );
180 }
181 } /* namespace PriorityNodesTakePrecedence */
182
183 namespace ThreadsEagerReaction {
184
185 using Harness::SpinBarrier;
186
187 enum task_type_t { no_task, regular_task, async_task };
188
189 struct profile_t {
190 task_type_t task_type;
191 unsigned global_task_id;
192 double elapsed;
193 };
194
195 std::vector<unsigned> g_async_task_ids;
196
197 typedef unsigned data_type;
198 typedef async_node<data_type, data_type> async_node_type;
199 typedef multifunction_node<
200 data_type, tuple<data_type, data_type> > decider_node_type;
201 struct AsyncActivity {
202 typedef async_node_type::gateway_type gateway_type;
203
204 struct work_type { data_type input; gateway_type* gateway; };
205 bool done;
206 tbb::concurrent_queue<work_type> my_queue;
207 tbb::tbb_thread my_service_thread;
208
209 struct ServiceThreadFunc {
210 SpinBarrier& my_barrier;
ServiceThreadFuncThreadsEagerReaction::AsyncActivity::ServiceThreadFunc211 ServiceThreadFunc(SpinBarrier& barrier) : my_barrier(barrier) {}
operator ()ThreadsEagerReaction::AsyncActivity::ServiceThreadFunc212 void operator()(AsyncActivity* activity) {
213 while (!activity->done) {
214 work_type work;
215 while (activity->my_queue.try_pop(work)) {
216 g_async_task_ids.push_back( ++g_task_num );
217 work.gateway->try_put(work.input);
218 work.gateway->release_wait();
219 my_barrier.wait();
220 }
221 }
222 }
223 };
stop_and_waitThreadsEagerReaction::AsyncActivity224 void stop_and_wait() { done = true; my_service_thread.join(); }
225
submitThreadsEagerReaction::AsyncActivity226 void submit(data_type input, gateway_type* gateway) {
227 work_type work = { input, gateway };
228 gateway->reserve_wait();
229 my_queue.push(work);
230 }
AsyncActivityThreadsEagerReaction::AsyncActivity231 AsyncActivity(SpinBarrier& barrier)
232 : done(false), my_service_thread(ServiceThreadFunc(barrier), this) {}
233 };
234
235 struct StartBody {
236 bool has_run;
237 #if TBB_DEPRECATED_INPUT_NODE_BODY
operator ()ThreadsEagerReaction::StartBody238 bool operator()(data_type& input) {
239 if (has_run) return false;
240 else {
241 input = 1;
242 has_run = true;
243 return true;
244 }
245 }
246 #else
operator ()ThreadsEagerReaction::StartBody247 data_type operator()(tbb::flow_control& fc) {
248 if (has_run){
249 fc.stop();
250 return data_type();
251 }
252 has_run = true;
253 return 1;
254 }
255 #endif
StartBodyThreadsEagerReaction::StartBody256 StartBody() : has_run(false) {}
257 };
258
259 struct ParallelForBody {
260 SpinBarrier& my_barrier;
261 const data_type& my_input;
ParallelForBodyThreadsEagerReaction::ParallelForBody262 ParallelForBody(SpinBarrier& barrier, const data_type& input)
263 : my_barrier(barrier), my_input(input) {}
operator ()ThreadsEagerReaction::ParallelForBody264 void operator()(const data_type&) const {
265 my_barrier.wait();
266 ++g_task_num;
267 }
268 };
269
270 struct CpuWorkBody {
271 SpinBarrier& my_barrier;
272 const int my_tasks_count;
operator ()ThreadsEagerReaction::CpuWorkBody273 data_type operator()(const data_type& input) {
274 tbb::parallel_for(0, my_tasks_count, ParallelForBody(my_barrier, input), tbb::simple_partitioner());
275 return input;
276 }
CpuWorkBodyThreadsEagerReaction::CpuWorkBody277 CpuWorkBody(SpinBarrier& barrier, int tasks_count)
278 : my_barrier(barrier), my_tasks_count(tasks_count) {}
279 };
280
281 struct DeciderBody {
282 const data_type& my_limit;
DeciderBodyThreadsEagerReaction::DeciderBody283 DeciderBody( const data_type& limit ) : my_limit( limit ) {}
operator ()ThreadsEagerReaction::DeciderBody284 void operator()(data_type input, decider_node_type::output_ports_type& ports) {
285 if (input < my_limit)
286 get<0>(ports).try_put(input + 1);
287 }
288 };
289
290 struct AsyncSubmissionBody {
291 AsyncActivity* my_activity;
operator ()ThreadsEagerReaction::AsyncSubmissionBody292 void operator()(data_type input, async_node_type::gateway_type& gateway) {
293 my_activity->submit(input, &gateway);
294 }
AsyncSubmissionBodyThreadsEagerReaction::AsyncSubmissionBody295 AsyncSubmissionBody(AsyncActivity* activity) : my_activity(activity) {}
296 };
297
test(int num_threads)298 void test( int num_threads ) {
299 REMARK( "Testing threads react eagerly on asynchronous tasks (num_threads=%d) - ", num_threads );
300 if( num_threads == tbb::task_scheduler_init::default_num_threads() ) {
301 // one thread is required for asynchronous compute resource
302 REMARK("skipping test since it is designed to work on less number of threads than "
303 "hardware concurrency allows\n");
304 return;
305 }
306 const unsigned cpu_threads = unsigned(num_threads);
307 const unsigned cpu_tasks_per_thread = 4;
308 const unsigned nested_cpu_tasks = cpu_tasks_per_thread * cpu_threads;
309 const unsigned async_subgraph_reruns = 8;
310 const unsigned cpu_subgraph_reruns = 2;
311
312 SpinBarrier barrier(cpu_threads + /*async thread=*/1);
313 g_task_num = 0;
314 g_async_task_ids.clear();
315 g_async_task_ids.reserve(async_subgraph_reruns);
316
317 tbb::task_scheduler_init init(cpu_threads);
318 AsyncActivity activity(barrier);
319 graph g;
320
321 input_node<data_type> starter_node(g, StartBody());
322 function_node<data_type, data_type> cpu_work_node(
323 g, unlimited, CpuWorkBody(barrier, nested_cpu_tasks));
324 decider_node_type cpu_restarter_node(g, unlimited, DeciderBody(cpu_subgraph_reruns));
325 async_node_type async_node(g, unlimited, AsyncSubmissionBody(&activity));
326 decider_node_type async_restarter_node(
327 g, unlimited, DeciderBody(async_subgraph_reruns), node_priority_t(1)
328 );
329
330 make_edge(starter_node, cpu_work_node);
331 make_edge(cpu_work_node, cpu_restarter_node);
332 make_edge(output_port<0>(cpu_restarter_node), cpu_work_node);
333
334 make_edge(starter_node, async_node);
335 make_edge(async_node, async_restarter_node);
336 make_edge(output_port<0>(async_restarter_node), async_node);
337
338 starter_node.activate();
339 g.wait_for_all();
340 activity.stop_and_wait();
341
342 const size_t async_task_num = size_t(async_subgraph_reruns);
343 ASSERT( g_async_task_ids.size() == async_task_num, "Incorrect number of async tasks." );
344 unsigned max_span = unsigned(2 * cpu_threads + 1);
345 for( size_t idx = 1; idx < async_task_num; ++idx ) {
346 ASSERT( g_async_task_ids[idx] - g_async_task_ids[idx-1] <= max_span,
347 "Async tasks were not able to interfere with CPU tasks." );
348 }
349 REMARK("done\n");
350 }
351 } /* ThreadsEagerReaction */
352
353 namespace LimitingExecutionToPriorityTask {
354
355 enum work_type_t { NONPRIORITIZED_WORK, PRIORITIZED_WORK };
356
357 struct execution_tracker_t {
execution_tracker_tLimitingExecutionToPriorityTask::execution_tracker_t358 execution_tracker_t() { reset(); }
resetLimitingExecutionToPriorityTask::execution_tracker_t359 void reset() {
360 prioritized_work_submitter = tbb::tbb_thread::id();
361 prioritized_work_started = false;
362 prioritized_work_finished = false;
363 prioritized_work_interrupted = false;
364 }
365 tbb::tbb_thread::id prioritized_work_submitter;
366 bool prioritized_work_started;
367 bool prioritized_work_finished;
368 bool prioritized_work_interrupted;
369 } exec_tracker;
370
371 template<work_type_t work_type>
372 void do_node_work( int work_size );
373
374 template<work_type_t>
375 void do_nested_work( const tbb::tbb_thread::id& tid, const tbb::blocked_range<int>& subrange );
376
377 template<work_type_t work_type>
378 struct CommonBody {
CommonBodyLimitingExecutionToPriorityTask::CommonBody379 CommonBody() : my_body_size( 0 ) { }
CommonBodyLimitingExecutionToPriorityTask::CommonBody380 CommonBody( int body_size ) : my_body_size( body_size ) { }
operator ()LimitingExecutionToPriorityTask::CommonBody381 continue_msg operator()( const continue_msg& msg ) const {
382 do_node_work<work_type>(my_body_size);
383 return msg;
384 }
operator ()LimitingExecutionToPriorityTask::CommonBody385 void operator()( const tbb::blocked_range<int>& subrange ) const {
386 do_nested_work<work_type>( /*tid=*/tbb::this_tbb_thread::get_id(), subrange );
387 }
388 int my_body_size;
389 };
390
391 template<work_type_t work_type>
do_node_work(int work_size)392 void do_node_work(int work_size) {
393 tbb::parallel_for( tbb::blocked_range<int>(0, work_size), CommonBody<work_type>(),
394 tbb::simple_partitioner() );
395 }
396
397 template<work_type_t>
do_nested_work(const tbb::tbb_thread::id & tid,const tbb::blocked_range<int> &)398 void do_nested_work( const tbb::tbb_thread::id& tid, const tbb::blocked_range<int>& /*subrange*/ ) {
399 // This is non-prioritized work...
400 if( exec_tracker.prioritized_work_submitter != tid )
401 return;
402 // ...being executed by the thread that initially started prioritized one...
403 ASSERT( exec_tracker.prioritized_work_started,
404 "Prioritized work should have been started by that time." );
405 // ...prioritized work has been started already...
406 if( exec_tracker.prioritized_work_finished )
407 return;
408 // ...but has not been finished yet
409 exec_tracker.prioritized_work_interrupted = true;
410 }
411
412 struct IsolationFunctor {
413 int work_size;
IsolationFunctorLimitingExecutionToPriorityTask::IsolationFunctor414 IsolationFunctor(int ws) : work_size(ws) {}
operator ()LimitingExecutionToPriorityTask::IsolationFunctor415 void operator()() const {
416 tbb::parallel_for( tbb::blocked_range<int>(0, work_size), CommonBody<PRIORITIZED_WORK>(),
417 tbb::simple_partitioner() );
418 }
419 };
420
421 template<>
do_node_work(int work_size)422 void do_node_work<PRIORITIZED_WORK>(int work_size) {
423 exec_tracker.prioritized_work_submitter = tbb::this_tbb_thread::get_id();
424 exec_tracker.prioritized_work_started = true;
425 tbb::this_task_arena::isolate( IsolationFunctor(work_size) );
426 exec_tracker.prioritized_work_finished = true;
427 }
428
429 template<>
do_nested_work(const tbb::tbb_thread::id & tid,const tbb::blocked_range<int> &)430 void do_nested_work<PRIORITIZED_WORK>( const tbb::tbb_thread::id& tid,
431 const tbb::blocked_range<int>& /*subrange*/ ) {
432 if( exec_tracker.prioritized_work_submitter == tid ) {
433 ASSERT( !exec_tracker.prioritized_work_interrupted,
434 "Thread was not fully devoted to processing of prioritized task." );
435 } else {
436 // prolong processing of prioritized work so that the thread that started
437 // prioritized work has higher probability to help with non-prioritized one.
438 spin_for(0.1);
439 }
440 }
441
442 // Using pointers to nodes to avoid errors on compilers, which try to generate assignment operator
443 // for the nodes
444 typedef std::vector< continue_node<continue_msg>* > nodes_container_t;
445
create_nodes(nodes_container_t & nodes,graph & g,int num,int body_size)446 void create_nodes( nodes_container_t& nodes, graph& g, int num, int body_size ) {
447 for( int i = 0; i < num; ++i )
448 nodes.push_back(
449 new continue_node<continue_msg>( g, CommonBody<NONPRIORITIZED_WORK>( body_size ) )
450 );
451 }
452
test(int num_threads)453 void test( int num_threads ) {
454 REMARK( "Testing limit execution to priority tasks (num_threads=%d) - ", num_threads );
455
456 tbb::task_scheduler_init init( num_threads );
457
458 const int nodes_num = 100;
459 const int priority_node_position_part = 10;
460 const int pivot = nodes_num / priority_node_position_part;
461 const int nodes_in_lane = 3 * num_threads;
462 const int small_problem_size = 100;
463 const int large_problem_size = 1000;
464
465 graph g;
466 nodes_container_t nodes;
467 create_nodes( nodes, g, pivot, large_problem_size );
468 nodes.push_back(
469 new continue_node<continue_msg>(
470 g, CommonBody<PRIORITIZED_WORK>(small_problem_size), node_priority_t(1)
471 )
472 );
473 create_nodes( nodes, g, nodes_num - pivot - 1, large_problem_size );
474
475 broadcast_node<continue_msg> bn(g);
476 for( int i = 0; i < nodes_num; ++i )
477 if( i % nodes_in_lane == 0 )
478 make_edge( bn, *nodes[i] );
479 else
480 make_edge( *nodes[i-1], *nodes[i] );
481 exec_tracker.reset();
482 bn.try_put( continue_msg() );
483 g.wait_for_all();
484
485 for( size_t i = 0; i < nodes.size(); ++i )
486 delete nodes[i];
487 REMARK( "done\n" );
488 }
489
490 } /* namespace LimitingExecutionToPriorityTask */
491
492 #include "tbb/task_arena.h"
493 namespace NestedCase {
494
495 using tbb::task_arena;
496
497 struct ResetGraphFunctor {
498 graph& my_graph;
ResetGraphFunctorNestedCase::ResetGraphFunctor499 ResetGraphFunctor(graph& g) : my_graph(g) {}
500 // copy constructor to please some old compilers
ResetGraphFunctorNestedCase::ResetGraphFunctor501 ResetGraphFunctor(const ResetGraphFunctor& rgf) : my_graph(rgf.my_graph) {}
operator ()NestedCase::ResetGraphFunctor502 void operator()() const { my_graph.reset(); }
503 };
504
505 struct InnerBody {
operator ()NestedCase::InnerBody506 continue_msg operator()( const continue_msg& ) const {
507 return continue_msg();
508 }
509 };
510
511 struct OuterBody {
512 int my_max_threads;
513 task_arena& my_inner_arena;
OuterBodyNestedCase::OuterBody514 OuterBody( int max_threads, task_arena& inner_arena )
515 : my_max_threads(max_threads), my_inner_arena(inner_arena) {}
516 // copy constructor to please some old compilers
OuterBodyNestedCase::OuterBody517 OuterBody( const OuterBody& rhs )
518 : my_max_threads(rhs.my_max_threads), my_inner_arena(rhs.my_inner_arena) {}
operator ()NestedCase::OuterBody519 int operator()( const int& ) {
520 graph inner_graph;
521 continue_node<continue_msg> start_node(inner_graph, InnerBody());
522 continue_node<continue_msg> mid_node1(inner_graph, InnerBody(), node_priority_t(5));
523 continue_node<continue_msg> mid_node2(inner_graph, InnerBody());
524 continue_node<continue_msg> end_node(inner_graph, InnerBody(), node_priority_t(15));
525 make_edge( start_node, mid_node1 );
526 make_edge( mid_node1, end_node );
527 make_edge( start_node, mid_node2 );
528 make_edge( mid_node2, end_node );
529 my_inner_arena.execute( ResetGraphFunctor(inner_graph) );
530 start_node.try_put( continue_msg() );
531 inner_graph.wait_for_all();
532 return 13;
533 }
534 };
535
execute_outer_graph(bool same_arena,task_arena & inner_arena,int max_threads,graph & outer_graph,function_node<int,int> & start_node)536 void execute_outer_graph( bool same_arena, task_arena& inner_arena, int max_threads,
537 graph& outer_graph, function_node<int,int>& start_node ) {
538 if( same_arena ) {
539 start_node.try_put( 42 );
540 outer_graph.wait_for_all();
541 return;
542 }
543 for( int num_threads = 1; num_threads <= max_threads; ++num_threads ) {
544 inner_arena.initialize( num_threads );
545 start_node.try_put( 42 );
546 outer_graph.wait_for_all();
547 inner_arena.terminate();
548 }
549 }
550
test_in_arena(int max_threads,task_arena & outer_arena,task_arena & inner_arena)551 void test_in_arena( int max_threads, task_arena& outer_arena, task_arena& inner_arena ) {
552 graph outer_graph;
553 const unsigned num_outer_nodes = 10;
554 const size_t concurrency = unlimited;
555 std::vector< function_node<int,int>* > outer_nodes;
556 for( unsigned node_index = 0; node_index < num_outer_nodes; ++node_index ) {
557 internal::node_priority_t priority = internal::no_priority;
558 if( node_index == num_outer_nodes / 2 )
559 priority = 10;
560
561 outer_nodes.push_back(
562 new function_node<int,int>(
563 outer_graph, concurrency, OuterBody(max_threads, inner_arena), priority
564 )
565 );
566 }
567
568 for( unsigned node_index1 = 0; node_index1 < num_outer_nodes; ++node_index1 )
569 for( unsigned node_index2 = node_index1+1; node_index2 < num_outer_nodes; ++node_index2 )
570 make_edge( *outer_nodes[node_index1], *outer_nodes[node_index2] );
571
572 bool same_arena = &outer_arena == &inner_arena;
573 for( int num_threads = 1; num_threads <= max_threads; ++num_threads ) {
574 REMARK( "Testing nested nodes with specified priority in %s arenas, num_threads=%d) - ",
575 same_arena? "same" : "different", num_threads );
576 outer_arena.initialize( num_threads );
577 outer_arena.execute( ResetGraphFunctor(outer_graph) );
578 execute_outer_graph( same_arena, inner_arena, max_threads, outer_graph, *outer_nodes[0] );
579 outer_arena.terminate();
580 REMARK( "done\n" );
581 }
582
583 for( size_t i = 0; i < outer_nodes.size(); ++i )
584 delete outer_nodes[i];
585 }
586
test(int max_threads)587 void test( int max_threads ) {
588 tbb::task_scheduler_init init( max_threads );
589 task_arena outer_arena; task_arena inner_arena;
590 test_in_arena( max_threads, outer_arena, outer_arena );
591 test_in_arena( max_threads, outer_arena, inner_arena );
592 }
593 }
594
TestMain()595 int TestMain() {
596 if( MinThread < 1 ) {
597 REPORT( "Number of threads must be positive\n" );
598 return Harness::Skipped;
599 }
600 for( int p = MinThread; p <= MaxThread; ++p ) {
601 PriorityNodesTakePrecedence::test( p );
602 ThreadsEagerReaction::test( p );
603 LimitingExecutionToPriorityTask::test( p );
604 }
605 NestedCase::test( MaxThread );
606 return Harness::Done;
607 }
608 #else /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
609 #define HARNESS_SKIP_TEST 1
610 #include "harness.h"
611 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
612