1 /*
2     Copyright (c) 2018-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "harness_defs.h"
18 
19 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
20 #define TBB_DEPRECATED_INPUT_NODE_BODY __TBB_CPF_BUILD
21 
22 #include "harness_graph.h"
23 #include "harness_barrier.h"
24 
25 #include "tbb/flow_graph.h"
26 #include "tbb/tbb_thread.h"
27 #include "tbb/parallel_for.h"
28 #include "tbb/concurrent_queue.h"
29 
30 #include <vector>
31 #include <cstdlib>
32 
33 using namespace tbb::flow;
34 
35 tbb::atomic<unsigned> g_task_num;
36 
spin_for(double delta)37 void spin_for( double delta ) {
38     tbb::tick_count start = tbb::tick_count::now();
39     while( (tbb::tick_count::now() - start).seconds() < delta ) ;
40 }
41 
42 namespace PriorityNodesTakePrecedence {
43 
44 struct TaskInfo {
TaskInfoPriorityNodesTakePrecedence::TaskInfo45     TaskInfo() : my_priority(-1), my_task_index(-1) {}
TaskInfoPriorityNodesTakePrecedence::TaskInfo46     TaskInfo( int priority, int task_index )
47         : my_priority(priority), my_task_index(task_index) {}
48     int my_priority;
49     int my_task_index;
50 };
51 std::vector<TaskInfo> g_task_info;
52 tbb::atomic<bool> g_work_submitted;
53 
54 const unsigned node_num = 100;
55 const unsigned start_index = node_num / 3;
56 const unsigned end_index = node_num * 2 / 3;
57 tbb::atomic<unsigned> g_priority_task_index;
58 
body_func(int priority)59 void body_func( int priority ) {
60     while( !g_work_submitted ) __TBB_Yield();
61     int current_task_index = g_task_num++;
62     if( priority )
63         g_task_info[g_priority_task_index++] = TaskInfo( priority, current_task_index );
64 }
65 
66 struct FunctionBody {
FunctionBodyPriorityNodesTakePrecedence::FunctionBody67     FunctionBody( int priority ) : my_priority( priority ) {}
operator ()PriorityNodesTakePrecedence::FunctionBody68     int operator()( int msg ) const {
69         body_func( my_priority );
70         return msg;
71     }
72 private:
73     int my_priority;
74 };
75 
76 typedef multifunction_node< int,tuple<int> > multi_node;
77 
78 struct MultifunctionBody {
MultifunctionBodyPriorityNodesTakePrecedence::MultifunctionBody79     MultifunctionBody( int priority ) : my_priority( priority ) {}
operator ()PriorityNodesTakePrecedence::MultifunctionBody80     void operator()( int msg, multi_node::output_ports_type& op ) const {
81         body_func( my_priority );
82         get<0>(op).try_put( msg );
83     }
84 private:
85     int my_priority;
86 };
87 
88 template<typename NodeType, typename BodyType>
89 NodeType* node_creator( graph& g, unsigned index ) {
90     if( start_index <= index && index < end_index )
91         return new NodeType( g, unlimited, BodyType(index), node_priority_t(index) );
92     else
93         return new NodeType( g, unlimited, BodyType(0) );
94 }
95 
96 struct passthru_body {
operator ()PriorityNodesTakePrecedence::passthru_body97     continue_msg operator()( int ) const {
98         return continue_msg();
99     }
100 };
101 
get_sender(NodeType & node)102 template<typename NodeType> sender<int>& get_sender( NodeType& node ) { return node; }
get_sender(multi_node & node)103 template<> sender<int>& get_sender<multi_node>( multi_node& node ) { return output_port<0>(node); }
104 
105 template<typename NodeType, typename NodeTypeCreator, typename NodePortRetriever>
test_node(NodeTypeCreator node_creator_func,NodePortRetriever get_sender)106 void test_node( NodeTypeCreator node_creator_func, NodePortRetriever get_sender ) {
107     graph g;
108     broadcast_node<int> bn(g);
109     function_node<int> tn(g, unlimited, passthru_body());
110     // Using pointers to nodes to avoid errors on compilers, which try to generate assignment
111     // operator for the nodes
112     std::vector<NodeType*> nodes;
113     for( unsigned i = 0; i < node_num; ++i ) {
114         nodes.push_back( node_creator_func(g, i) );
115         make_edge( bn, *nodes.back() );
116         make_edge( get_sender(*nodes.back()), tn );
117     }
118 
119     const size_t repeats = 50;
120     const size_t priority_nodes_num = end_index - start_index;
121     size_t internal_order_failures = 0;
122     size_t global_order_failures = 0;
123     for( size_t repeat = 0; repeat < repeats; ++repeat ) {
124         g_work_submitted = false;
125         g_task_num = g_priority_task_index = 0;
126         g_task_info.clear(); g_task_info.resize( priority_nodes_num );
127 
128         bn.try_put( 0 );
129         // Setting of the flag is based on the knowledge that the calling thread broadcasts the message
130         // to successor nodes, that is spawns tasks. Thus, this makes this test to be a whitebox test to
131         // some extent.
132         g_work_submitted = true;
133 
134         g.wait_for_all();
135 
136         ASSERT( g_priority_task_index == g_task_info.size(), "Incorrect number of tasks with priority" );
137         bool found_max = false;
138         bool found_min = false;
139         for( unsigned i = 0; i < g_priority_task_index/2; ++i ) {
140             if( g_task_info[i].my_priority == int(end_index-1) )
141                 found_max = true;
142             if( g_task_info[g_priority_task_index-1-i].my_priority == int(start_index) )
143                 found_min = true;
144         }
145         if( !found_min || !found_max )
146             ++internal_order_failures;
147         for( unsigned i = 0; i < g_priority_task_index; ++i ) {
148             // This check might fail because priorities do not guarantee ordering, i.e. assumption
149             // that all priority nodes should increment the task counter before any subsequent
150             // no-priority node is not correct. In the worst case, a thread that took a priority
151             // node might be preempted and become the last to increment the counter. That's why the
152             // test passing is based on statistics, which could be affected by machine overload
153             // unfortunately.
154             // TODO: make the test deterministic.
155             if( g_task_info[i].my_task_index > int(priority_nodes_num) + MaxThread )
156                 ++global_order_failures;
157         }
158     }
159     float failure_ratio = float(internal_order_failures) / float(repeats);
160     ASSERT(
161         failure_ratio <= 0.3f,
162         "Nodes with priorities executed in wrong order among each other too frequently."
163     );
164     failure_ratio = float(global_order_failures) / float(repeats*priority_nodes_num);
165     ASSERT(
166         failure_ratio <= 0.1f,
167         "Nodes with priorities executed in wrong order too frequently over non-prioritized nodes."
168     );
169     for( size_t i = 0; i < nodes.size(); ++i )
170         delete nodes[i];
171 }
172 
test(int num_threads)173 void test( int num_threads ) {
174     REMARK( "Testing execution of nodes with priority takes precedence (num_threads=%d) - ", num_threads );
175     tbb::task_scheduler_init init(num_threads);
176     test_node< function_node<int,int> >( &node_creator<function_node<int,int>, FunctionBody>,
177                                          &get_sender< function_node<int,int> > );
178     test_node<multi_node>( &node_creator<multi_node, MultifunctionBody>, &get_sender< multi_node > );
179     REMARK( "done\n" );
180 }
181 } /* namespace PriorityNodesTakePrecedence */
182 
183 namespace ThreadsEagerReaction {
184 
185 using Harness::SpinBarrier;
186 
187 enum task_type_t { no_task, regular_task, async_task };
188 
189 struct profile_t {
190     task_type_t task_type;
191     unsigned global_task_id;
192     double elapsed;
193 };
194 
195 std::vector<unsigned> g_async_task_ids;
196 
197 typedef unsigned data_type;
198 typedef async_node<data_type, data_type> async_node_type;
199 typedef multifunction_node<
200     data_type, tuple<data_type, data_type> > decider_node_type;
201 struct AsyncActivity {
202     typedef async_node_type::gateway_type gateway_type;
203 
204     struct work_type { data_type input; gateway_type* gateway; };
205     bool done;
206     tbb::concurrent_queue<work_type> my_queue;
207     tbb::tbb_thread my_service_thread;
208 
209     struct ServiceThreadFunc {
210         SpinBarrier& my_barrier;
ServiceThreadFuncThreadsEagerReaction::AsyncActivity::ServiceThreadFunc211         ServiceThreadFunc(SpinBarrier& barrier) : my_barrier(barrier) {}
operator ()ThreadsEagerReaction::AsyncActivity::ServiceThreadFunc212         void operator()(AsyncActivity* activity) {
213             while (!activity->done) {
214                 work_type work;
215                 while (activity->my_queue.try_pop(work)) {
216                     g_async_task_ids.push_back( ++g_task_num );
217                     work.gateway->try_put(work.input);
218                     work.gateway->release_wait();
219                     my_barrier.wait();
220                 }
221             }
222         }
223     };
stop_and_waitThreadsEagerReaction::AsyncActivity224     void stop_and_wait() { done = true; my_service_thread.join(); }
225 
submitThreadsEagerReaction::AsyncActivity226     void submit(data_type input, gateway_type* gateway) {
227         work_type work = { input, gateway };
228         gateway->reserve_wait();
229         my_queue.push(work);
230     }
AsyncActivityThreadsEagerReaction::AsyncActivity231     AsyncActivity(SpinBarrier& barrier)
232         : done(false), my_service_thread(ServiceThreadFunc(barrier), this) {}
233 };
234 
235 struct StartBody {
236     bool has_run;
237 #if TBB_DEPRECATED_INPUT_NODE_BODY
operator ()ThreadsEagerReaction::StartBody238     bool operator()(data_type& input) {
239         if (has_run) return false;
240         else {
241             input = 1;
242             has_run = true;
243             return true;
244         }
245     }
246 #else
operator ()ThreadsEagerReaction::StartBody247     data_type operator()(tbb::flow_control& fc) {
248         if (has_run){
249             fc.stop();
250             return data_type();
251         }
252         has_run = true;
253         return 1;
254     }
255 #endif
StartBodyThreadsEagerReaction::StartBody256     StartBody() : has_run(false) {}
257 };
258 
259 struct ParallelForBody {
260     SpinBarrier& my_barrier;
261     const data_type& my_input;
ParallelForBodyThreadsEagerReaction::ParallelForBody262     ParallelForBody(SpinBarrier& barrier, const data_type& input)
263         : my_barrier(barrier), my_input(input) {}
operator ()ThreadsEagerReaction::ParallelForBody264     void operator()(const data_type&) const {
265         my_barrier.wait();
266         ++g_task_num;
267     }
268 };
269 
270 struct CpuWorkBody {
271     SpinBarrier& my_barrier;
272     const int my_tasks_count;
operator ()ThreadsEagerReaction::CpuWorkBody273     data_type operator()(const data_type& input) {
274         tbb::parallel_for(0, my_tasks_count, ParallelForBody(my_barrier, input), tbb::simple_partitioner());
275         return input;
276     }
CpuWorkBodyThreadsEagerReaction::CpuWorkBody277     CpuWorkBody(SpinBarrier& barrier, int tasks_count)
278         : my_barrier(barrier), my_tasks_count(tasks_count) {}
279 };
280 
281 struct DeciderBody {
282     const data_type& my_limit;
DeciderBodyThreadsEagerReaction::DeciderBody283     DeciderBody( const data_type& limit ) : my_limit( limit ) {}
operator ()ThreadsEagerReaction::DeciderBody284     void operator()(data_type input, decider_node_type::output_ports_type& ports) {
285         if (input < my_limit)
286             get<0>(ports).try_put(input + 1);
287     }
288 };
289 
290 struct AsyncSubmissionBody {
291     AsyncActivity* my_activity;
operator ()ThreadsEagerReaction::AsyncSubmissionBody292     void operator()(data_type input, async_node_type::gateway_type& gateway) {
293         my_activity->submit(input, &gateway);
294     }
AsyncSubmissionBodyThreadsEagerReaction::AsyncSubmissionBody295     AsyncSubmissionBody(AsyncActivity* activity) : my_activity(activity) {}
296 };
297 
test(int num_threads)298 void test( int num_threads ) {
299     REMARK( "Testing threads react eagerly on asynchronous tasks (num_threads=%d) - ", num_threads );
300     if( num_threads == tbb::task_scheduler_init::default_num_threads() ) {
301         // one thread is required for asynchronous compute resource
302         REMARK("skipping test since it is designed to work on less number of threads than "
303                "hardware concurrency allows\n");
304         return;
305     }
306     const unsigned cpu_threads = unsigned(num_threads);
307     const unsigned cpu_tasks_per_thread = 4;
308     const unsigned nested_cpu_tasks = cpu_tasks_per_thread * cpu_threads;
309     const unsigned async_subgraph_reruns = 8;
310     const unsigned cpu_subgraph_reruns = 2;
311 
312     SpinBarrier barrier(cpu_threads + /*async thread=*/1);
313     g_task_num = 0;
314     g_async_task_ids.clear();
315     g_async_task_ids.reserve(async_subgraph_reruns);
316 
317     tbb::task_scheduler_init init(cpu_threads);
318     AsyncActivity activity(barrier);
319     graph g;
320 
321     input_node<data_type> starter_node(g, StartBody());
322     function_node<data_type, data_type> cpu_work_node(
323         g, unlimited, CpuWorkBody(barrier, nested_cpu_tasks));
324     decider_node_type cpu_restarter_node(g, unlimited, DeciderBody(cpu_subgraph_reruns));
325     async_node_type async_node(g, unlimited, AsyncSubmissionBody(&activity));
326     decider_node_type async_restarter_node(
327         g, unlimited, DeciderBody(async_subgraph_reruns), node_priority_t(1)
328     );
329 
330     make_edge(starter_node, cpu_work_node);
331     make_edge(cpu_work_node, cpu_restarter_node);
332     make_edge(output_port<0>(cpu_restarter_node), cpu_work_node);
333 
334     make_edge(starter_node, async_node);
335     make_edge(async_node, async_restarter_node);
336     make_edge(output_port<0>(async_restarter_node), async_node);
337 
338     starter_node.activate();
339     g.wait_for_all();
340     activity.stop_and_wait();
341 
342     const size_t async_task_num = size_t(async_subgraph_reruns);
343     ASSERT( g_async_task_ids.size() == async_task_num, "Incorrect number of async tasks." );
344     unsigned max_span = unsigned(2 * cpu_threads + 1);
345     for( size_t idx = 1; idx < async_task_num; ++idx ) {
346         ASSERT( g_async_task_ids[idx] - g_async_task_ids[idx-1] <= max_span,
347                 "Async tasks were not able to interfere with CPU tasks." );
348     }
349     REMARK("done\n");
350 }
351 } /* ThreadsEagerReaction */
352 
353 namespace LimitingExecutionToPriorityTask {
354 
355 enum work_type_t { NONPRIORITIZED_WORK, PRIORITIZED_WORK };
356 
357 struct execution_tracker_t {
execution_tracker_tLimitingExecutionToPriorityTask::execution_tracker_t358     execution_tracker_t() { reset(); }
resetLimitingExecutionToPriorityTask::execution_tracker_t359     void reset() {
360         prioritized_work_submitter = tbb::tbb_thread::id();
361         prioritized_work_started = false;
362         prioritized_work_finished = false;
363         prioritized_work_interrupted = false;
364     }
365     tbb::tbb_thread::id prioritized_work_submitter;
366     bool prioritized_work_started;
367     bool prioritized_work_finished;
368     bool prioritized_work_interrupted;
369 } exec_tracker;
370 
371 template<work_type_t work_type>
372 void do_node_work( int work_size );
373 
374 template<work_type_t>
375 void do_nested_work( const tbb::tbb_thread::id& tid, const tbb::blocked_range<int>& subrange );
376 
377 template<work_type_t work_type>
378 struct CommonBody {
CommonBodyLimitingExecutionToPriorityTask::CommonBody379     CommonBody() : my_body_size( 0 ) { }
CommonBodyLimitingExecutionToPriorityTask::CommonBody380     CommonBody( int body_size ) : my_body_size( body_size ) { }
operator ()LimitingExecutionToPriorityTask::CommonBody381     continue_msg operator()( const continue_msg& msg ) const {
382         do_node_work<work_type>(my_body_size);
383         return msg;
384     }
operator ()LimitingExecutionToPriorityTask::CommonBody385     void operator()( const tbb::blocked_range<int>& subrange ) const {
386         do_nested_work<work_type>( /*tid=*/tbb::this_tbb_thread::get_id(), subrange );
387     }
388     int my_body_size;
389 };
390 
391 template<work_type_t work_type>
do_node_work(int work_size)392 void do_node_work(int work_size) {
393     tbb::parallel_for( tbb::blocked_range<int>(0, work_size), CommonBody<work_type>(),
394                        tbb::simple_partitioner() );
395 }
396 
397 template<work_type_t>
do_nested_work(const tbb::tbb_thread::id & tid,const tbb::blocked_range<int> &)398 void do_nested_work( const tbb::tbb_thread::id& tid, const tbb::blocked_range<int>& /*subrange*/ ) {
399     // This is non-prioritized work...
400     if( exec_tracker.prioritized_work_submitter != tid )
401         return;
402     // ...being executed by the thread that initially started prioritized one...
403     ASSERT( exec_tracker.prioritized_work_started,
404             "Prioritized work should have been started by that time." );
405     // ...prioritized work has been started already...
406     if( exec_tracker.prioritized_work_finished )
407         return;
408     // ...but has not been finished yet
409     exec_tracker.prioritized_work_interrupted = true;
410 }
411 
412 struct IsolationFunctor {
413     int work_size;
IsolationFunctorLimitingExecutionToPriorityTask::IsolationFunctor414     IsolationFunctor(int ws) : work_size(ws) {}
operator ()LimitingExecutionToPriorityTask::IsolationFunctor415     void operator()() const {
416         tbb::parallel_for( tbb::blocked_range<int>(0, work_size), CommonBody<PRIORITIZED_WORK>(),
417                            tbb::simple_partitioner() );
418     }
419 };
420 
421 template<>
do_node_work(int work_size)422 void do_node_work<PRIORITIZED_WORK>(int work_size) {
423     exec_tracker.prioritized_work_submitter = tbb::this_tbb_thread::get_id();
424     exec_tracker.prioritized_work_started = true;
425     tbb::this_task_arena::isolate( IsolationFunctor(work_size) );
426     exec_tracker.prioritized_work_finished = true;
427 }
428 
429 template<>
do_nested_work(const tbb::tbb_thread::id & tid,const tbb::blocked_range<int> &)430 void do_nested_work<PRIORITIZED_WORK>( const tbb::tbb_thread::id& tid,
431                                        const tbb::blocked_range<int>& /*subrange*/ ) {
432     if( exec_tracker.prioritized_work_submitter == tid ) {
433         ASSERT( !exec_tracker.prioritized_work_interrupted,
434                 "Thread was not fully devoted to processing of prioritized task." );
435     } else {
436         // prolong processing of prioritized work so that the thread that started
437         // prioritized work has higher probability to help with non-prioritized one.
438         spin_for(0.1);
439     }
440 }
441 
442 // Using pointers to nodes to avoid errors on compilers, which try to generate assignment operator
443 // for the nodes
444 typedef std::vector< continue_node<continue_msg>* > nodes_container_t;
445 
create_nodes(nodes_container_t & nodes,graph & g,int num,int body_size)446 void create_nodes( nodes_container_t& nodes, graph& g, int num, int body_size ) {
447     for( int i = 0; i < num; ++i )
448         nodes.push_back(
449             new continue_node<continue_msg>( g, CommonBody<NONPRIORITIZED_WORK>( body_size ) )
450         );
451 }
452 
test(int num_threads)453 void test( int num_threads ) {
454     REMARK( "Testing limit execution to priority tasks (num_threads=%d) - ", num_threads );
455 
456     tbb::task_scheduler_init init( num_threads );
457 
458     const int nodes_num = 100;
459     const int priority_node_position_part = 10;
460     const int pivot = nodes_num / priority_node_position_part;
461     const int nodes_in_lane = 3 * num_threads;
462     const int small_problem_size = 100;
463     const int large_problem_size = 1000;
464 
465     graph g;
466     nodes_container_t nodes;
467     create_nodes( nodes, g, pivot, large_problem_size );
468     nodes.push_back(
469         new continue_node<continue_msg>(
470             g, CommonBody<PRIORITIZED_WORK>(small_problem_size), node_priority_t(1)
471         )
472     );
473     create_nodes( nodes, g, nodes_num - pivot - 1, large_problem_size );
474 
475     broadcast_node<continue_msg> bn(g);
476     for( int i = 0; i < nodes_num; ++i )
477         if( i % nodes_in_lane == 0 )
478             make_edge( bn, *nodes[i] );
479         else
480             make_edge( *nodes[i-1], *nodes[i] );
481     exec_tracker.reset();
482     bn.try_put( continue_msg() );
483     g.wait_for_all();
484 
485     for( size_t i = 0; i < nodes.size(); ++i )
486         delete nodes[i];
487     REMARK( "done\n" );
488 }
489 
490 } /* namespace LimitingExecutionToPriorityTask */
491 
492 #include "tbb/task_arena.h"
493 namespace NestedCase {
494 
495 using tbb::task_arena;
496 
497 struct ResetGraphFunctor {
498     graph& my_graph;
ResetGraphFunctorNestedCase::ResetGraphFunctor499     ResetGraphFunctor(graph& g) : my_graph(g) {}
500     // copy constructor to please some old compilers
ResetGraphFunctorNestedCase::ResetGraphFunctor501     ResetGraphFunctor(const ResetGraphFunctor& rgf) : my_graph(rgf.my_graph) {}
operator ()NestedCase::ResetGraphFunctor502     void operator()() const { my_graph.reset(); }
503 };
504 
505 struct InnerBody {
operator ()NestedCase::InnerBody506     continue_msg operator()( const continue_msg& ) const {
507         return continue_msg();
508     }
509 };
510 
511 struct OuterBody {
512     int my_max_threads;
513     task_arena& my_inner_arena;
OuterBodyNestedCase::OuterBody514     OuterBody( int max_threads, task_arena& inner_arena )
515         : my_max_threads(max_threads), my_inner_arena(inner_arena) {}
516     // copy constructor to please some old compilers
OuterBodyNestedCase::OuterBody517     OuterBody( const OuterBody& rhs )
518         : my_max_threads(rhs.my_max_threads), my_inner_arena(rhs.my_inner_arena) {}
operator ()NestedCase::OuterBody519     int operator()( const int& ) {
520         graph inner_graph;
521         continue_node<continue_msg> start_node(inner_graph, InnerBody());
522         continue_node<continue_msg> mid_node1(inner_graph, InnerBody(), node_priority_t(5));
523         continue_node<continue_msg> mid_node2(inner_graph, InnerBody());
524         continue_node<continue_msg> end_node(inner_graph, InnerBody(), node_priority_t(15));
525         make_edge( start_node, mid_node1 );
526         make_edge( mid_node1, end_node );
527         make_edge( start_node, mid_node2 );
528         make_edge( mid_node2, end_node );
529         my_inner_arena.execute( ResetGraphFunctor(inner_graph) );
530         start_node.try_put( continue_msg() );
531         inner_graph.wait_for_all();
532         return 13;
533     }
534 };
535 
execute_outer_graph(bool same_arena,task_arena & inner_arena,int max_threads,graph & outer_graph,function_node<int,int> & start_node)536 void execute_outer_graph( bool same_arena, task_arena& inner_arena, int max_threads,
537                           graph& outer_graph, function_node<int,int>& start_node ) {
538     if( same_arena ) {
539         start_node.try_put( 42 );
540         outer_graph.wait_for_all();
541         return;
542     }
543     for( int num_threads = 1; num_threads <= max_threads; ++num_threads ) {
544         inner_arena.initialize( num_threads );
545         start_node.try_put( 42 );
546         outer_graph.wait_for_all();
547         inner_arena.terminate();
548     }
549 }
550 
test_in_arena(int max_threads,task_arena & outer_arena,task_arena & inner_arena)551 void test_in_arena( int max_threads, task_arena& outer_arena, task_arena& inner_arena ) {
552     graph outer_graph;
553     const unsigned num_outer_nodes = 10;
554     const size_t concurrency = unlimited;
555     std::vector< function_node<int,int>* > outer_nodes;
556     for( unsigned node_index = 0; node_index < num_outer_nodes; ++node_index ) {
557         internal::node_priority_t priority = internal::no_priority;
558         if( node_index == num_outer_nodes / 2 )
559             priority = 10;
560 
561         outer_nodes.push_back(
562             new function_node<int,int>(
563                 outer_graph, concurrency, OuterBody(max_threads, inner_arena), priority
564             )
565         );
566     }
567 
568     for( unsigned node_index1 = 0; node_index1 < num_outer_nodes; ++node_index1 )
569         for( unsigned node_index2 = node_index1+1; node_index2 < num_outer_nodes; ++node_index2 )
570             make_edge( *outer_nodes[node_index1], *outer_nodes[node_index2] );
571 
572     bool same_arena = &outer_arena == &inner_arena;
573     for( int num_threads = 1; num_threads <= max_threads; ++num_threads ) {
574         REMARK( "Testing nested nodes with specified priority in %s arenas, num_threads=%d) - ",
575                 same_arena? "same" : "different", num_threads );
576         outer_arena.initialize( num_threads );
577         outer_arena.execute( ResetGraphFunctor(outer_graph) );
578         execute_outer_graph( same_arena, inner_arena, max_threads, outer_graph, *outer_nodes[0] );
579         outer_arena.terminate();
580         REMARK( "done\n" );
581     }
582 
583     for( size_t i = 0; i < outer_nodes.size(); ++i )
584         delete outer_nodes[i];
585 }
586 
test(int max_threads)587 void test( int max_threads ) {
588     tbb::task_scheduler_init init( max_threads );
589     task_arena outer_arena; task_arena inner_arena;
590     test_in_arena( max_threads, outer_arena, outer_arena );
591     test_in_arena( max_threads, outer_arena, inner_arena );
592 }
593 }
594 
TestMain()595 int TestMain() {
596     if( MinThread < 1 ) {
597         REPORT( "Number of threads must be positive\n" );
598         return Harness::Skipped;
599     }
600     for( int p = MinThread; p <= MaxThread; ++p ) {
601         PriorityNodesTakePrecedence::test( p );
602         ThreadsEagerReaction::test( p );
603         LimitingExecutionToPriorityTask::test( p );
604     }
605     NestedCase::test( MaxThread );
606     return Harness::Done;
607 }
608 #else /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
609 #define HARNESS_SKIP_TEST 1
610 #include "harness.h"
611 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
612