1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #define TBB_DEPRECATED_FLOW_NODE_ALLOCATOR __TBB_CPF_BUILD
18 
19 #include "harness.h"
20 #include "harness_graph.h"
21 #include "harness_barrier.h"
22 #include "tbb/concurrent_queue.h"
23 #include "tbb/flow_graph.h"
24 #include "tbb/task.h"
25 #include "tbb/tbb_thread.h"
26 #include "tbb/mutex.h"
27 #include "tbb/compat/condition_variable"
28 
29 #include <string>
30 
31 class minimal_type {
32     template<typename T>
33     friend struct place_wrapper;
34 
35     int value;
36 
37 public:
minimal_type()38     minimal_type() : value(-1) {}
minimal_type(int v)39     minimal_type(int v) : value(v) {}
minimal_type(const minimal_type & m)40     minimal_type(const minimal_type &m) : value(m.value) { }
operator =(const minimal_type & m)41     minimal_type &operator=(const minimal_type &m) { value = m.value; return *this; }
42 };
43 
44 template <typename T>
45 struct place_wrapper {
46     typedef T wrapped_type;
47     T value;
48     tbb::tbb_thread::id thread_id;
49     tbb::task* task_ptr;
50 
place_wrapperplace_wrapper51     place_wrapper( ) : value(0) {
52         thread_id = tbb::this_tbb_thread::get_id();
53         task_ptr = &tbb::task::self();
54     }
place_wrapperplace_wrapper55     place_wrapper( int v ) : value(v) {
56         thread_id = tbb::this_tbb_thread::get_id();
57         task_ptr = &tbb::task::self();
58     }
59 
place_wrapperplace_wrapper60     place_wrapper( const place_wrapper<int> &v ) : value(v.value), thread_id(v.thread_id), task_ptr(v.task_ptr) { }
61 
place_wrapperplace_wrapper62     place_wrapper( const place_wrapper<minimal_type> &v ) : value(v.value), thread_id(v.thread_id), task_ptr(v.task_ptr) { }
63 
operator =place_wrapper64     place_wrapper<minimal_type>& operator=(const place_wrapper<minimal_type> &v) {
65         if( this != &v ) {
66             value = v.value;
67             thread_id = v.thread_id;
68             task_ptr = v.task_ptr;
69         }
70         return *this;
71     }
72 };
73 
74 template<typename T1, typename T2>
75 struct wrapper_helper {
checkwrapper_helper76     static void check(const T1 &, const T2 &) { }
77 
copy_valuewrapper_helper78     static void copy_value(const T1 &in, T2 &out) {
79         out = in;
80     }
81 };
82 
83 template<typename T1, typename T2>
84 struct wrapper_helper< place_wrapper<T1>, place_wrapper<T2> > {
checkwrapper_helper85     static void check(const place_wrapper<T1> &a, const place_wrapper<T2> &b) {
86        REMARK("a.task_ptr == %p != b.task_ptr == %p\n", a.task_ptr, b.task_ptr);
87        ASSERT( (a.thread_id != b.thread_id), "same thread used to execute adjacent nodes");
88        ASSERT( (a.task_ptr != b.task_ptr), "same task used to execute adjacent nodes");
89        return;
90     }
copy_valuewrapper_helper91     static void copy_value(const place_wrapper<T1> &in, place_wrapper<T2> &out) {
92         out.value = in.value;
93     }
94 };
95 
96 const int NUMBER_OF_MSGS = 10;
97 const int UNKNOWN_NUMBER_OF_ITEMS = -1;
98 tbb::atomic<int> async_body_exec_count;
99 tbb::atomic<int> async_activity_processed_msg_count;
100 tbb::atomic<int> end_body_exec_count;
101 
102 // queueing required in test_reset for testing of cancellation
103 typedef tbb::flow::async_node< int, int, tbb::flow::queueing > counting_async_node_type;
104 typedef counting_async_node_type::gateway_type counting_gateway_type;
105 
106 struct counting_async_body {
107     tbb::atomic<int> my_async_body_exec_count;
108 
counting_async_bodycounting_async_body109     counting_async_body() {
110         my_async_body_exec_count = 0;
111     }
112 
operator ()counting_async_body113     void operator()( const int &input, counting_gateway_type& gateway) {
114         REMARK( "Body execution with input == %d\n", input);
115         ++my_async_body_exec_count;
116         ++async_body_exec_count;
117         if ( input == -1 ) {
118             bool result = tbb::task::self().group()->cancel_group_execution();
119             REMARK( "Canceling graph execution\n" );
120             ASSERT( result == true, "attempted to cancel graph twice" );
121             Harness::Sleep(50);
122         }
123         gateway.try_put(input);
124     }
125 };
126 
test_reset()127 void test_reset() {
128     const int N = NUMBER_OF_MSGS;
129     async_body_exec_count = 0;
130 
131     tbb::flow::graph g;
132     counting_async_node_type a(g, tbb::flow::serial, counting_async_body() );
133 
134     const int R = 3;
135     std::vector< harness_counting_receiver<int> > r(R, harness_counting_receiver<int>(g));
136 
137     for (int i = 0; i < R; ++i) {
138 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
139         tbb::flow::make_edge(a, r[i]);
140 #else
141         tbb::flow::make_edge( tbb::flow::output_port<0>(a), r[i] );
142 #endif
143     }
144 
145     REMARK( "One body execution\n" );
146     a.try_put(-1);
147     for (int i = 0; i < N; ++i) {
148        a.try_put(i);
149     }
150     g.wait_for_all();
151     // should be canceled with only 1 item reaching the async_body and the counting receivers
152     // and N items left in the node's queue
153     ASSERT( g.is_cancelled() == true, "task group not canceled" );
154 
155     counting_async_body b1 = tbb::flow::copy_body<counting_async_body>(a);
156     ASSERT( int(async_body_exec_count) == int(b1.my_async_body_exec_count), "body and global body counts are different" );
157     ASSERT( int(async_body_exec_count) == 1, "global body execution count not 1"  );
158     for (int i = 0; i < R; ++i) {
159         ASSERT( int(r[i].my_count) == 1, "counting receiver count not 1" );
160     }
161 
162     // should clear the async_node queue, but retain its local count at 1 and keep all edges
163     g.reset(tbb::flow::rf_reset_protocol);
164 
165     REMARK( "N body executions\n" );
166     for (int i = 0; i < N; ++i) {
167        a.try_put(i);
168     }
169     g.wait_for_all();
170     ASSERT( g.is_cancelled() == false, "task group not canceled" );
171 
172     // a total of N+1 items should have passed through the node body
173     // the local body count should also be N+1
174     // and the counting receivers should all have a count of N+1
175     counting_async_body b2 = tbb::flow::copy_body<counting_async_body>(a);
176     ASSERT( int(async_body_exec_count) == int(b2.my_async_body_exec_count), "local and global body execution counts are different" );
177     REMARK( "async_body_exec_count==%d\n", int(async_body_exec_count) );
178     ASSERT( int(async_body_exec_count) == N+1, "globcal body execution count not N+1"  );
179     for (int i = 0; i < R; ++i) {
180         ASSERT( int(r[i].my_count) == N+1, "counting receiver has not received N+1 items" );
181     }
182 
183     REMARK( "N body executions with new bodies\n" );
184     // should clear the async_node queue and reset its local count to 0, but keep all edges
185     g.reset(tbb::flow::rf_reset_bodies);
186     for (int i = 0; i < N; ++i) {
187        a.try_put(i);
188     }
189     g.wait_for_all();
190     ASSERT( g.is_cancelled() == false, "task group not canceled" );
191 
192     // a total of 2N+1 items should have passed through the node body
193     // the local body count should be N
194     // and the counting receivers should all have a count of 2N+1
195     counting_async_body b3 = tbb::flow::copy_body<counting_async_body>(a);
196     ASSERT( int(async_body_exec_count) == 2*N+1, "global body execution count not 2N+1"  );
197     ASSERT( int(b3.my_async_body_exec_count) == N, "local body execution count not N"  );
198     for (int i = 0; i < R; ++i) {
199         ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" );
200     }
201 
202     // should clear the async_node queue and keep its local count at N and remove all edges
203     REMARK( "N body executions with no edges\n" );
204     g.reset(tbb::flow::rf_clear_edges);
205     for (int i = 0; i < N; ++i) {
206        a.try_put(i);
207     }
208     g.wait_for_all();
209     ASSERT( g.is_cancelled() == false, "task group not canceled" );
210 
211     // a total of 3N+1 items should have passed through the node body
212     // the local body count should now be 2*N
213     // and the counting receivers should remain at a count of 2N+1
214     counting_async_body b4 = tbb::flow::copy_body<counting_async_body>(a);
215     ASSERT( int(async_body_exec_count) == 3*N+1, "global body execution count not 3N+1"  );
216     ASSERT( int(b4.my_async_body_exec_count) == 2*N, "local body execution count not 2N"  );
217     for (int i = 0; i < R; ++i) {
218         ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" );
219     }
220 
221     // put back 1 edge to receiver 0
222     REMARK( "N body executions with 1 edge\n" );
223 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
224     tbb::flow::make_edge(a, r[0]);
225 #else
226     tbb::flow::make_edge( tbb::flow::output_port<0>(a), r[0] );
227 #endif
228     for (int i = 0; i < N; ++i) {
229        a.try_put(i);
230     }
231     g.wait_for_all();
232     ASSERT( g.is_cancelled() == false, "task group not canceled" );
233 
234     // a total of 4N+1 items should have passed through the node body
235     // the local body count should now be 3*N
236     // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
237     counting_async_body b5 = tbb::flow::copy_body<counting_async_body>(a);
238     ASSERT( int(async_body_exec_count) == 4*N+1, "global body execution count not 4N+1"  );
239     ASSERT( int(b5.my_async_body_exec_count) == 3*N, "local body execution count not 3N"  );
240     ASSERT( int(r[0].my_count) == 3*N+1, "counting receiver has not received 3N+1 items" );
241     for (int i = 1; i < R; ++i) {
242         ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" );
243     }
244 
245     // should clear the async_node queue and keep its local count at N and remove all edges
246     REMARK( "N body executions with no edges and new body\n" );
247     g.reset(static_cast<tbb::flow::reset_flags>(tbb::flow::rf_reset_bodies|tbb::flow::rf_clear_edges));
248     for (int i = 0; i < N; ++i) {
249        a.try_put(i);
250     }
251     g.wait_for_all();
252     ASSERT( g.is_cancelled() == false, "task group not canceled" );
253 
254     // a total of 4N+1 items should have passed through the node body
255     // the local body count should now be 3*N
256     // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
257     counting_async_body b6 = tbb::flow::copy_body<counting_async_body>(a);
258     ASSERT( int(async_body_exec_count) == 5*N+1, "global body execution count not 5N+1"  );
259     ASSERT( int(b6.my_async_body_exec_count) == N, "local body execution count not N"  );
260     ASSERT( int(r[0].my_count) == 3*N+1, "counting receiver has not received 3N+1 items" );
261     for (int i = 1; i < R; ++i) {
262         ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" );
263     }
264 }
265 
266 template< typename Input, typename Output >
267 class async_activity : NoAssign {
268 public:
269     typedef Input input_type;
270     typedef Output output_type;
271     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
272     typedef typename async_node_type::gateway_type gateway_type;
273 
274     struct work_type {
275         input_type input;
276         gateway_type* gateway;
277     };
278 
279     class ServiceThreadBody {
280     public:
ServiceThreadBody(async_activity * activity)281         ServiceThreadBody( async_activity* activity ) : my_activity( activity ) {}
282 
operator ()()283         void operator()() {
284             my_activity->process();
285         }
286     private:
287         async_activity* my_activity;
288     };
289 
async_activity(int expected_items,bool deferred=false,int sleep_time=50)290     async_activity(int expected_items, bool deferred = false, int sleep_time = 50)
291         : my_expected_items(expected_items), my_sleep_time(sleep_time) {
292         is_active = !deferred;
293         my_quit = false;
294         tbb::tbb_thread( ServiceThreadBody( this ) ).swap( my_service_thread );
295     }
296 
297 private:
298 
async_activity(const async_activity &)299     async_activity( const async_activity& )
300         : my_expected_items(UNKNOWN_NUMBER_OF_ITEMS), my_sleep_time(0) {
301         is_active = true;
302     }
303 
304 public:
~async_activity()305     ~async_activity() {
306         stop();
307         my_service_thread.join();
308     }
309 
submit(const input_type & input,gateway_type & gateway)310     void submit( const input_type &input, gateway_type& gateway ) {
311         work_type work = { input, &gateway};
312         my_work_queue.push( work );
313     }
314 
process()315     void process() {
316         do {
317             work_type work;
318             if( is_active && my_work_queue.try_pop( work ) ) {
319                 Harness::Sleep(my_sleep_time);
320                 ++async_activity_processed_msg_count;
321                 output_type output;
322                 wrapper_helper<output_type, output_type>::copy_value(work.input, output);
323                 wrapper_helper<output_type, output_type>::check(work.input, output);
324                 work.gateway->try_put(output);
325                 if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS ||
326                      int(async_activity_processed_msg_count) == my_expected_items ) {
327                     work.gateway->release_wait();
328                 }
329             }
330         } while( my_quit == false || !my_work_queue.empty());
331     }
332 
stop()333     void stop() {
334         my_quit = true;
335     }
336 
activate()337     void activate() {
338         is_active = true;
339     }
340 
should_reserve_each_time()341     bool should_reserve_each_time() {
342         if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS )
343             return true;
344         else
345             return false;
346     }
347 
348 private:
349 
350     const int my_expected_items;
351     const int my_sleep_time;
352     tbb::atomic< bool > is_active;
353 
354     tbb::concurrent_queue< work_type > my_work_queue;
355 
356     tbb::atomic< bool > my_quit;
357 
358     tbb::tbb_thread my_service_thread;
359 };
360 
361 template<typename Input, typename Output>
362 struct basic_test {
363     typedef Input input_type;
364     typedef Output output_type;
365     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
366     typedef typename async_node_type::gateway_type gateway_type;
367 
368     class start_body_type {
369         typedef Input input_type;
370     public:
operator ()(int input)371         input_type operator()( int input ) {
372             return input_type(input);
373         }
374     };
375 
376 #if !__TBB_CPP11_LAMBDAS_PRESENT
377     class async_body_type {
378         typedef Input input_type;
379         typedef Output output_type;
380         typedef tbb::flow::async_node< input_type, output_type > async_node_type;
381         typedef typename async_node_type::gateway_type gateway_type;
382     public:
383         typedef async_activity<input_type, output_type> async_activity_type;
384 
async_body_type(async_activity_type * aa)385         async_body_type( async_activity_type* aa ) : my_async_activity( aa ) { }
386 
async_body_type(const async_body_type & other)387         async_body_type( const async_body_type& other ) : my_async_activity( other.my_async_activity ) { }
388 
operator ()(const input_type & input,gateway_type & gateway)389         void operator()( const input_type &input, gateway_type& gateway ) {
390             ++async_body_exec_count;
391             my_async_activity->submit( input, gateway);
392             if ( my_async_activity->should_reserve_each_time() )
393                 gateway.reserve_wait();
394         }
395 
396     private:
397         async_activity_type* my_async_activity;
398     };
399 #endif
400 
401     class end_body_type {
402         typedef Output output_type;
403     public:
operator ()(const output_type & input)404         void operator()( const output_type &input ) {
405             ++end_body_exec_count;
406             output_type output;
407             wrapper_helper<output_type, output_type>::check(input, output);
408         }
409     };
410 
basic_testbasic_test411     basic_test() {}
412 
413 public:
414 
runbasic_test415     static int run(int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
416         async_activity<input_type, output_type> my_async_activity(async_expected_items);
417         tbb::flow::graph g;
418         tbb::flow::function_node< int, input_type > start_node( g, tbb::flow::unlimited, start_body_type() );
419 #if __TBB_CPP11_LAMBDAS_PRESENT
420         async_node_type offload_node(g, tbb::flow::unlimited, [&] (const input_type &input, gateway_type& gateway) {
421             ++async_body_exec_count;
422             my_async_activity.submit(input, gateway);
423             if(my_async_activity.should_reserve_each_time())
424                 gateway.reserve_wait();
425         } );
426 #else
427         async_node_type offload_node( g, tbb::flow::unlimited, async_body_type( &my_async_activity ) );
428 #endif
429 
430         tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, end_body_type() );
431 
432         tbb::flow::make_edge( start_node, offload_node );
433 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
434         tbb::flow::make_edge( offload_node, end_node );
435 #else
436         tbb::flow::make_edge( tbb::flow::output_port<0>(offload_node), end_node );
437 #endif
438         async_body_exec_count = 0;
439         async_activity_processed_msg_count = 0;
440         end_body_exec_count = 0;
441 
442         if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) {
443             offload_node.gateway().reserve_wait();
444         }
445         for (int i = 0; i < NUMBER_OF_MSGS; ++i) {
446             start_node.try_put(i);
447         }
448         g.wait_for_all();
449         ASSERT( async_body_exec_count == NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" );
450         ASSERT( async_activity_processed_msg_count == NUMBER_OF_MSGS, "AsyncActivity processed wrong number of signals" );
451         ASSERT( end_body_exec_count == NUMBER_OF_MSGS, "EndBody processed wrong number of signals");
452         REMARK("async_body_exec_count == %d == async_activity_processed_msg_count == %d == end_body_exec_count == %d\n",
453                 int(async_body_exec_count), int(async_activity_processed_msg_count), int(end_body_exec_count));
454         return Harness::Done;
455     }
456 
457 };
458 
test_copy_ctor()459 int test_copy_ctor() {
460     const int N = NUMBER_OF_MSGS;
461     async_body_exec_count = 0;
462 
463     tbb::flow::graph g;
464 
465     harness_counting_receiver<int> r1(g);
466     harness_counting_receiver<int> r2(g);
467 
468     counting_async_node_type a(g, tbb::flow::unlimited, counting_async_body() );
469     counting_async_node_type b(a);
470 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
471     tbb::flow::make_edge(a, r1);
472     tbb::flow::make_edge(b, r2);
473 #else
474     tbb::flow::make_edge(tbb::flow::output_port<0>(a), r1);
475     tbb::flow::make_edge(tbb::flow::output_port<0>(b), r2);
476 #endif
477 
478     for (int i = 0; i < N; ++i) {
479        a.try_put(i);
480     }
481     g.wait_for_all();
482 
483     REMARK("async_body_exec_count = %d\n", int(async_body_exec_count));
484     REMARK("r1.my_count == %d and r2.my_count = %d\n", int(r1.my_count), int(r2.my_count));
485     ASSERT( int(async_body_exec_count) == NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" );
486     ASSERT( int(r1.my_count) == N, "counting receiver r1 has not received N items" );
487     ASSERT( int(r2.my_count) == 0, "counting receiver r2 has not received 0 items" );
488 
489     for (int i = 0; i < N; ++i) {
490        b.try_put(i);
491     }
492     g.wait_for_all();
493 
494     REMARK("async_body_exec_count = %d\n", int(async_body_exec_count));
495     REMARK("r1.my_count == %d and r2.my_count = %d\n", int(r1.my_count), int(r2.my_count));
496     ASSERT( int(async_body_exec_count) == 2*NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" );
497     ASSERT( int(r1.my_count) == N, "counting receiver r1 has not received N items" );
498     ASSERT( int(r2.my_count) == N, "counting receiver r2 has not received N items" );
499     return Harness::Done;
500 }
501 
502 tbb::atomic<int> main_tid_count;
503 
504 template<typename Input, typename Output>
505 struct spin_test {
506     typedef Input input_type;
507     typedef Output output_type;
508     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
509     typedef typename async_node_type::gateway_type gateway_type;
510 
511     class start_body_type {
512         typedef Input input_type;
513     public:
operator ()(int input)514         input_type operator()( int input ) {
515             return input_type(input);
516         }
517     };
518 
519 #if !__TBB_CPP11_LAMBDAS_PRESENT
520     class async_body_type {
521         typedef Input input_type;
522         typedef Output output_type;
523         typedef tbb::flow::async_node< input_type, output_type > async_node_type;
524         typedef typename async_node_type::gateway_type gateway_type;
525     public:
526         typedef async_activity<input_type, output_type> async_activity_type;
527 
async_body_type(async_activity_type * aa)528         async_body_type( async_activity_type* aa ) : my_async_activity( aa ) { }
529 
async_body_type(const async_body_type & other)530         async_body_type( const async_body_type& other ) : my_async_activity( other.my_async_activity ) { }
531 
operator ()(const input_type & input,gateway_type & gateway)532         void operator()(const input_type &input, gateway_type& gateway) {
533             ++async_body_exec_count;
534             my_async_activity->submit(input, gateway);
535             if(my_async_activity->should_reserve_each_time())
536                 gateway.reserve_wait();
537         }
538 
539     private:
540         async_activity_type* my_async_activity;
541     };
542 #endif
543 
544     class end_body_type {
545         typedef Output output_type;
546         tbb::tbb_thread::id my_main_tid;
547         Harness::SpinBarrier *my_barrier;
548     public:
end_body_type(tbb::tbb_thread::id t,Harness::SpinBarrier & b)549         end_body_type(tbb::tbb_thread::id t, Harness::SpinBarrier &b) : my_main_tid(t), my_barrier(&b) { }
550 
operator ()(const output_type &)551         void operator()( const output_type & ) {
552             ++end_body_exec_count;
553             if (tbb::this_tbb_thread::get_id() == my_main_tid) {
554                ++main_tid_count;
555             }
556             my_barrier->timed_wait_noerror(10);
557         }
558     };
559 
spin_testspin_test560     spin_test() {}
561 
runspin_test562     static int run(int nthreads, int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
563         async_activity<input_type, output_type> my_async_activity(async_expected_items, false, 0);
564         Harness::SpinBarrier spin_barrier(nthreads);
565         tbb::flow::graph g;
566         tbb::flow::function_node< int, input_type > start_node( g, tbb::flow::unlimited, start_body_type() );
567 #if __TBB_CPP11_LAMBDAS_PRESENT
568         async_node_type offload_node(g, tbb::flow::unlimited, [&](const input_type &input, gateway_type& gateway) {
569             ++async_body_exec_count;
570             my_async_activity.submit(input, gateway);
571             if(my_async_activity.should_reserve_each_time())
572                 gateway.reserve_wait();
573         });
574 #else
575         async_node_type offload_node( g, tbb::flow::unlimited, async_body_type( &my_async_activity ) );
576 #endif
577         tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, end_body_type(tbb::this_tbb_thread::get_id(), spin_barrier) );
578         tbb::flow::make_edge( start_node, offload_node );
579 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
580         tbb::flow::make_edge( offload_node, end_node );
581 #else
582         tbb::flow::make_edge( tbb::flow::output_port<0>(offload_node), end_node );
583 #endif
584         async_body_exec_count = 0;
585         async_activity_processed_msg_count = 0;
586         end_body_exec_count = 0;
587         main_tid_count = 0;
588 
589         if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) {
590             offload_node.gateway().reserve_wait();
591         }
592         for (int i = 0; i < nthreads*NUMBER_OF_MSGS; ++i) {
593             start_node.try_put(i);
594         }
595         g.wait_for_all();
596         ASSERT( async_body_exec_count == nthreads*NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" );
597         ASSERT( async_activity_processed_msg_count == nthreads*NUMBER_OF_MSGS, "AsyncActivity processed wrong number of signals" );
598         ASSERT( end_body_exec_count == nthreads*NUMBER_OF_MSGS, "EndBody processed wrong number of signals");
599         ASSERT_WARNING( main_tid_count != 0, "Main thread did not participate in end_body tasks");
600         REMARK("async_body_exec_count == %d == async_activity_processed_msg_count == %d == end_body_exec_count == %d\n",
601                 int(async_body_exec_count), int(async_activity_processed_msg_count), int(end_body_exec_count));
602         return Harness::Done;
603     }
604 
605 };
606 
test_for_spin_avoidance()607 void test_for_spin_avoidance() {
608     spin_test<int, int>::run(4);
609 }
610 
611 template< typename Input, typename Output >
run_tests()612 int run_tests() {
613     basic_test<Input, Output>::run();
614     basic_test<Input, Output>::run(NUMBER_OF_MSGS);
615     basic_test<place_wrapper<Input>, place_wrapper<Output> >::run();
616     basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(NUMBER_OF_MSGS);
617     return Harness::Done;
618 }
619 
620 #include "tbb/parallel_for.h"
621 template<typename Input, typename Output>
622 class equeueing_on_inner_level {
623     typedef Input input_type;
624     typedef Output output_type;
625     typedef async_activity<input_type, output_type> async_activity_type;
626     typedef tbb::flow::async_node<Input, Output> async_node_type;
627     typedef typename async_node_type::gateway_type gateway_type;
628 
629     class start_body_type {
630     public:
operator ()(int input)631         input_type operator() ( int input ) {
632             return input_type( input);
633         }
634     };
635 
636     class async_body_type {
637     public:
async_body_type(async_activity_type & activity)638         async_body_type( async_activity_type& activity ) : my_async_activity(&activity) {}
639 
operator ()(const input_type & input,gateway_type & gateway)640         void operator() ( const input_type &input, gateway_type& gateway ) {
641             gateway.reserve_wait();
642             my_async_activity->submit( input, gateway );
643         }
644     private:
645         async_activity_type* my_async_activity;
646     };
647 
648     class end_body_type {
649     public:
operator ()(output_type)650         void operator()( output_type ) {}
651     };
652 
653     class body_graph_with_async {
654     public:
body_graph_with_async(Harness::SpinBarrier & barrier,async_activity_type & activity)655         body_graph_with_async( Harness::SpinBarrier& barrier, async_activity_type& activity )
656             : spin_barrier(&barrier), my_async_activity(&activity) {}
657 
operator ()(int) const658         void operator()(int) const {
659             tbb::flow::graph g;
660             tbb::flow::function_node< int, input_type > start_node( g, tbb::flow::unlimited, start_body_type() );
661 
662             async_node_type offload_node( g, tbb::flow::unlimited, async_body_type( *my_async_activity ) );
663 
664             tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, end_body_type() );
665 
666             tbb::flow::make_edge( start_node, offload_node );
667             tbb::flow::make_edge( offload_node, end_node );
668 
669             start_node.try_put(1);
670 
671             spin_barrier->wait();
672 
673             my_async_activity->activate();
674 
675             g.wait_for_all();
676         }
677 
678     private:
679         Harness::SpinBarrier* spin_barrier;
680         async_activity_type* my_async_activity;
681     };
682 
683 
684 public:
run()685     static int run ()
686     {
687         const int nthreads = tbb::this_task_arena::max_concurrency();
688         Harness::SpinBarrier spin_barrier( nthreads );
689 
690         async_activity_type my_async_activity( UNKNOWN_NUMBER_OF_ITEMS, true );
691 
692         tbb::parallel_for( 0, nthreads, body_graph_with_async( spin_barrier, my_async_activity ) );
693         return Harness::Done;
694     }
695 };
696 
run_test_equeueing_on_inner_level()697 int run_test_equeueing_on_inner_level() {
698     equeueing_on_inner_level<int, int>::run();
699     return Harness::Done;
700 }
701 
702 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
703 #include <array>
704 #include <thread>
705 
706 template<typename NodeType>
707 class AsyncActivity {
708 public:
709     using gateway_t = typename NodeType::gateway_type;
710 
711     struct work_type {
712         int input;
713         gateway_t* gateway;
714     };
715 
__anon12ee7c900302() 716     AsyncActivity(size_t limit) : thr([this]() {
717         while(!end_of_work()) {
718             work_type w;
719             while( my_q.try_pop(w) ) {
720                 int res = do_work(w.input);
721                 w.gateway->try_put(res);
722                 w.gateway->release_wait();
723                 ++c;
724             }
725         }
726     }), stop_limit(limit), c(0) {}
727 
submit(int i,gateway_t * gateway)728     void submit(int i, gateway_t* gateway) {
729         work_type w = {i, gateway};
730         gateway->reserve_wait();
731         my_q.push(w);
732     }
733 
wait_for_all()734     void wait_for_all() { thr.join(); }
735 
736 private:
end_of_work()737     bool end_of_work() { return c >= stop_limit; }
738 
do_work(int & i)739     int do_work(int& i) { return i + i; }
740 
741     tbb::concurrent_queue<work_type> my_q;
742     tbb::tbb_thread thr;
743     size_t stop_limit;
744     size_t c;
745 };
746 
test_follows()747 void test_follows() {
748     using namespace tbb::flow;
749 
750     using input_t = int;
751     using output_t = int;
752     using node_t = async_node<input_t, output_t>;
753 
754     graph g;
755 
756     AsyncActivity<node_t> async_activity(3);
757 
758     std::array<broadcast_node<input_t>, 3> preds = {
759       {
760         broadcast_node<input_t>(g),
761         broadcast_node<input_t>(g),
762         broadcast_node<input_t>(g)
763       }
764     };
765 
766     node_t node(follows(preds[0], preds[1], preds[2]), unlimited, [&](int input, node_t::gateway_type& gtw) {
767         async_activity.submit(input, &gtw);
768     });
769 
770     buffer_node<output_t> buf(g);
771     make_edge(node, buf);
772 
773     for(auto& pred: preds) {
774         pred.try_put(1);
775     }
776 
777     g.wait_for_all();
778     async_activity.wait_for_all();
779 
780     output_t storage;
781     ASSERT((buf.try_get(storage) && buf.try_get(storage) && buf.try_get(storage) && !buf.try_get(storage)),
782             "Not exact edge quantity was made");
783 }
784 
test_precedes()785 void test_precedes() {
786     using namespace tbb::flow;
787 
788     using input_t = int;
789     using output_t = int;
790     using node_t = async_node<input_t, output_t>;
791 
792     graph g;
793 
794     AsyncActivity<node_t> async_activity(1);
795 
796     std::array<buffer_node<input_t>, 1> successors = { {buffer_node<input_t>(g)} };
797 
798     broadcast_node<input_t> start(g);
799 
800     node_t node(precedes(successors[0]), unlimited, [&](int input, node_t::gateway_type& gtw) {
801         async_activity.submit(input, &gtw);
802     });
803 
804     make_edge(start, node);
805 
806     start.try_put(1);
807 
808     g.wait_for_all();
809     async_activity.wait_for_all();
810 
811     for(auto& successor : successors) {
812         output_t storage;
813         ASSERT(successor.try_get(storage) && !successor.try_get(storage),
814                "Not exact edge quantity was made");
815     }
816 }
817 
test_follows_and_precedes_api()818 void test_follows_and_precedes_api() {
819     test_follows();
820     test_precedes();
821 }
822 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
823 
824 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
825 typedef tbb::flow::async_node< int, int, tbb::flow::queueing, std::allocator<int> > async_node_type;
826 
827 struct async_body {
operator ()async_body828     void operator()( const int&, async_node_type::gateway_type& ) {}
829 };
830 
test_node_allocator()831 void test_node_allocator() {
832     tbb::flow::graph g;
833     async_node_type tmp(g, tbb::flow::unlimited, async_body());
834 }
835 #endif
836 
TestMain()837 int TestMain() {
838     tbb::task_scheduler_init init(4);
839     run_tests<int, int>();
840     run_tests<minimal_type, minimal_type>();
841     run_tests<int, minimal_type>();
842 
843     lightweight_testing::test<tbb::flow::async_node>(NUMBER_OF_MSGS);
844 
845     test_reset();
846     test_copy_ctor();
847     test_for_spin_avoidance();
848     run_test_equeueing_on_inner_level();
849 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
850     test_follows_and_precedes_api();
851 #endif
852 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
853     test_node_allocator();
854 #endif
855     return Harness::Done;
856 }
857 
858