1 /*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #define TBB_DEPRECATED_FLOW_NODE_ALLOCATOR __TBB_CPF_BUILD
18
19 #include "harness.h"
20 #include "harness_graph.h"
21 #include "harness_barrier.h"
22 #include "tbb/concurrent_queue.h"
23 #include "tbb/flow_graph.h"
24 #include "tbb/task.h"
25 #include "tbb/tbb_thread.h"
26 #include "tbb/mutex.h"
27 #include "tbb/compat/condition_variable"
28
29 #include <string>
30
// A bare-bones payload type for the graph tests: holds a single int that is
// readable only by place_wrapper (declared friend below).  Default value is -1.
class minimal_type {
    template<typename T>
    friend struct place_wrapper;

    int value;

public:
    minimal_type() : value(-1) {}
    minimal_type(int v) : value(v) {}
    minimal_type(const minimal_type &other) : value(other.value) {}
    minimal_type &operator=(const minimal_type &other) {
        value = other.value;
        return *this;
    }
};
43
// Wraps a value together with the thread id and TBB task that were current
// when the wrapper was constructed.  The wrapper_helper specialization below
// compares these fields to verify that adjacent graph nodes executed on
// different threads/tasks.
template <typename T>
struct place_wrapper {
    typedef T wrapped_type;
    T value;
    tbb::tbb_thread::id thread_id;  // thread that constructed this wrapper
    tbb::task* task_ptr;            // task that was executing at construction

    place_wrapper( ) : value(0) {
        thread_id = tbb::this_tbb_thread::get_id();
        task_ptr = &tbb::task::self();
    }
    place_wrapper( int v ) : value(v) {
        thread_id = tbb::this_tbb_thread::get_id();
        task_ptr = &tbb::task::self();
    }

    // Converting copies: only <int> and <minimal_type> sources are supported,
    // matching the instantiations used by run_tests() at the bottom of the file.
    place_wrapper( const place_wrapper<int> &v ) : value(v.value), thread_id(v.thread_id), task_ptr(v.task_ptr) { }

    place_wrapper( const place_wrapper<minimal_type> &v ) : value(v.value), thread_id(v.thread_id), task_ptr(v.task_ptr) { }

    place_wrapper<minimal_type>& operator=(const place_wrapper<minimal_type> &v) {
        if( this != &v ) {
            value = v.value;
            thread_id = v.thread_id;
            task_ptr = v.task_ptr;
        }
        return *this;
    }
};
73
// Default helper for plain (non-wrapped) types: there is no thread/task
// bookkeeping to validate, and copying is just assignment.
template<typename T1, typename T2>
struct wrapper_helper {
    // Nothing to verify on plain types.
    static void check(const T1 &, const T2 &) {}

    // Forward the input value to the output slot.
    static void copy_value(const T1 &src, T2 &dst) {
        dst = src;
    }
};
82
// Specialization for place_wrapper pairs: verifies that the producer of `a`
// and the producer of `b` ran on different threads and different tasks, i.e.
// that the async hand-off really crossed an execution boundary.
template<typename T1, typename T2>
struct wrapper_helper< place_wrapper<T1>, place_wrapper<T2> > {
    static void check(const place_wrapper<T1> &a, const place_wrapper<T2> &b) {
        REMARK("a.task_ptr == %p != b.task_ptr == %p\n", a.task_ptr, b.task_ptr);
        ASSERT( (a.thread_id != b.thread_id), "same thread used to execute adjacent nodes");
        ASSERT( (a.task_ptr != b.task_ptr), "same task used to execute adjacent nodes");
        return;
    }
    // Copy only the payload; thread_id/task_ptr of `out` keep describing the
    // context in which `out` itself was constructed.
    static void copy_value(const place_wrapper<T1> &in, place_wrapper<T2> &out) {
        out.value = in.value;
    }
};
95
const int NUMBER_OF_MSGS = 10;
const int UNKNOWN_NUMBER_OF_ITEMS = -1;              // sentinel: item count not known up front
tbb::atomic<int> async_body_exec_count;              // total async_node body invocations (all tests)
tbb::atomic<int> async_activity_processed_msg_count; // messages handled by the service thread
tbb::atomic<int> end_body_exec_count;                // messages reaching the terminal node

// queueing required in test_reset for testing of cancellation
typedef tbb::flow::async_node< int, int, tbb::flow::queueing > counting_async_node_type;
typedef counting_async_node_type::gateway_type counting_gateway_type;
105
106 struct counting_async_body {
107 tbb::atomic<int> my_async_body_exec_count;
108
counting_async_bodycounting_async_body109 counting_async_body() {
110 my_async_body_exec_count = 0;
111 }
112
operator ()counting_async_body113 void operator()( const int &input, counting_gateway_type& gateway) {
114 REMARK( "Body execution with input == %d\n", input);
115 ++my_async_body_exec_count;
116 ++async_body_exec_count;
117 if ( input == -1 ) {
118 bool result = tbb::task::self().group()->cancel_group_execution();
119 REMARK( "Canceling graph execution\n" );
120 ASSERT( result == true, "attempted to cancel graph twice" );
121 Harness::Sleep(50);
122 }
123 gateway.try_put(input);
124 }
125 };
126
// Exercises graph::reset() with each reset_flags value against a serial,
// queueing async_node: verifies that a reset drains the node's internal
// queue while (depending on the flag) retaining or resetting body state
// and keeping or removing edges.
void test_reset() {
    const int N = NUMBER_OF_MSGS;
    async_body_exec_count = 0;

    tbb::flow::graph g;
    counting_async_node_type a(g, tbb::flow::serial, counting_async_body() );

    const int R = 3;
    std::vector< harness_counting_receiver<int> > r(R, harness_counting_receiver<int>(g));

    for (int i = 0; i < R; ++i) {
#if __TBB_FLOW_GRAPH_CPP11_FEATURES
        tbb::flow::make_edge(a, r[i]);
#else
        tbb::flow::make_edge( tbb::flow::output_port<0>(a), r[i] );
#endif
    }

    REMARK( "One body execution\n" );
    // -1 makes counting_async_body cancel the task group, so only this one
    // message is processed; the N messages below stay queued in the node.
    a.try_put(-1);
    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();
    // should be canceled with only 1 item reaching the async_body and the counting receivers
    // and N items left in the node's queue
    ASSERT( g.is_cancelled() == true, "task group not canceled" );

    counting_async_body b1 = tbb::flow::copy_body<counting_async_body>(a);
    ASSERT( int(async_body_exec_count) == int(b1.my_async_body_exec_count), "body and global body counts are different" );
    ASSERT( int(async_body_exec_count) == 1, "global body execution count not 1" );
    for (int i = 0; i < R; ++i) {
        ASSERT( int(r[i].my_count) == 1, "counting receiver count not 1" );
    }

    // should clear the async_node queue, but retain its local count at 1 and keep all edges
    g.reset(tbb::flow::rf_reset_protocol);

    REMARK( "N body executions\n" );
    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();
    ASSERT( g.is_cancelled() == false, "task group not canceled" );

    // a total of N+1 items should have passed through the node body
    // the local body count should also be N+1
    // and the counting receivers should all have a count of N+1
    counting_async_body b2 = tbb::flow::copy_body<counting_async_body>(a);
    ASSERT( int(async_body_exec_count) == int(b2.my_async_body_exec_count), "local and global body execution counts are different" );
    REMARK( "async_body_exec_count==%d\n", int(async_body_exec_count) );
    ASSERT( int(async_body_exec_count) == N+1, "globcal body execution count not N+1" );
    for (int i = 0; i < R; ++i) {
        ASSERT( int(r[i].my_count) == N+1, "counting receiver has not received N+1 items" );
    }

    REMARK( "N body executions with new bodies\n" );
    // should clear the async_node queue and reset its local count to 0, but keep all edges
    g.reset(tbb::flow::rf_reset_bodies);
    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();
    ASSERT( g.is_cancelled() == false, "task group not canceled" );

    // a total of 2N+1 items should have passed through the node body
    // the local body count should be N
    // and the counting receivers should all have a count of 2N+1
    counting_async_body b3 = tbb::flow::copy_body<counting_async_body>(a);
    ASSERT( int(async_body_exec_count) == 2*N+1, "global body execution count not 2N+1" );
    ASSERT( int(b3.my_async_body_exec_count) == N, "local body execution count not N" );
    for (int i = 0; i < R; ++i) {
        ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" );
    }

    // should clear the async_node queue and keep its local count at N and remove all edges
    REMARK( "N body executions with no edges\n" );
    g.reset(tbb::flow::rf_clear_edges);
    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();
    ASSERT( g.is_cancelled() == false, "task group not canceled" );

    // a total of 3N+1 items should have passed through the node body
    // the local body count should now be 2*N
    // and the counting receivers should remain at a count of 2N+1
    counting_async_body b4 = tbb::flow::copy_body<counting_async_body>(a);
    ASSERT( int(async_body_exec_count) == 3*N+1, "global body execution count not 3N+1" );
    ASSERT( int(b4.my_async_body_exec_count) == 2*N, "local body execution count not 2N" );
    for (int i = 0; i < R; ++i) {
        ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" );
    }

    // put back 1 edge to receiver 0
    REMARK( "N body executions with 1 edge\n" );
#if __TBB_FLOW_GRAPH_CPP11_FEATURES
    tbb::flow::make_edge(a, r[0]);
#else
    tbb::flow::make_edge( tbb::flow::output_port<0>(a), r[0] );
#endif
    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();
    ASSERT( g.is_cancelled() == false, "task group not canceled" );

    // a total of 4N+1 items should have passed through the node body
    // the local body count should now be 3*N
    // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
    counting_async_body b5 = tbb::flow::copy_body<counting_async_body>(a);
    ASSERT( int(async_body_exec_count) == 4*N+1, "global body execution count not 4N+1" );
    ASSERT( int(b5.my_async_body_exec_count) == 3*N, "local body execution count not 3N" );
    ASSERT( int(r[0].my_count) == 3*N+1, "counting receiver has not received 3N+1 items" );
    for (int i = 1; i < R; ++i) {
        ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" );
    }

    // combined flags: clear the queue, reset the body AND remove all edges
    REMARK( "N body executions with no edges and new body\n" );
    g.reset(static_cast<tbb::flow::reset_flags>(tbb::flow::rf_reset_bodies|tbb::flow::rf_clear_edges));
    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();
    ASSERT( g.is_cancelled() == false, "task group not canceled" );

    // a total of 5N+1 items should have passed through the node body
    // the local body count was reset, so it should now be N
    // and the receiver counts are unchanged since all edges were removed
    counting_async_body b6 = tbb::flow::copy_body<counting_async_body>(a);
    ASSERT( int(async_body_exec_count) == 5*N+1, "global body execution count not 5N+1" );
    ASSERT( int(b6.my_async_body_exec_count) == N, "local body execution count not N" );
    ASSERT( int(r[0].my_count) == 3*N+1, "counting receiver has not received 3N+1 items" );
    for (int i = 1; i < R; ++i) {
        ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" );
    }
}
265
// Simulates an external asynchronous activity serviced by a dedicated thread:
// work items submitted from async_node bodies are queued here, and the service
// thread later pushes results back into the graph through the stored gateway.
template< typename Input, typename Output >
class async_activity : NoAssign {
public:
    typedef Input input_type;
    typedef Output output_type;
    typedef tbb::flow::async_node< input_type, output_type > async_node_type;
    typedef typename async_node_type::gateway_type gateway_type;

    // One queued unit of work: the input plus the gateway to answer through.
    struct work_type {
        input_type input;
        gateway_type* gateway;
    };

    // Thread functor: runs process() on the owning activity.
    class ServiceThreadBody {
    public:
        ServiceThreadBody( async_activity* activity ) : my_activity( activity ) {}

        void operator()() {
            my_activity->process();
        }
    private:
        async_activity* my_activity;
    };

    // deferred==true starts the activity paused until activate() is called;
    // sleep_time is an artificial per-item latency in milliseconds.
    async_activity(int expected_items, bool deferred = false, int sleep_time = 50)
        : my_expected_items(expected_items), my_sleep_time(sleep_time) {
        is_active = !deferred;
        my_quit = false;
        tbb::tbb_thread( ServiceThreadBody( this ) ).swap( my_service_thread );
    }

private:

    // Private copy ctor (class is NoAssign); it starts no service thread.
    // NOTE(review): my_quit is left uninitialized here and ~async_activity
    // would join an unstarted thread — presumably copies are never actually
    // made; confirm before relying on copyability.
    async_activity( const async_activity& )
        : my_expected_items(UNKNOWN_NUMBER_OF_ITEMS), my_sleep_time(0) {
        is_active = true;
    }

public:
    ~async_activity() {
        stop();
        my_service_thread.join();
    }

    // Called from async_node bodies: enqueue work for the service thread.
    void submit( const input_type &input, gateway_type& gateway ) {
        work_type work = { input, &gateway};
        my_work_queue.push( work );
    }

    // Service-thread loop: spins on the queue until stop() has been called
    // and the queue has drained.
    void process() {
        do {
            work_type work;
            if( is_active && my_work_queue.try_pop( work ) ) {
                Harness::Sleep(my_sleep_time);
                ++async_activity_processed_msg_count;
                output_type output;
                wrapper_helper<output_type, output_type>::copy_value(work.input, output);
                wrapper_helper<output_type, output_type>::check(work.input, output);
                work.gateway->try_put(output);
                // Release the graph's wait either per item (count unknown) or
                // once, after the expected number of items has been processed.
                if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS ||
                     int(async_activity_processed_msg_count) == my_expected_items ) {
                    work.gateway->release_wait();
                }
            }
        } while( my_quit == false || !my_work_queue.empty());
    }

    // Request shutdown; process() exits once the queue is empty.
    void stop() {
        my_quit = true;
    }

    // Un-pause a deferred activity.
    void activate() {
        is_active = true;
    }

    // With an unknown item count the node body must reserve the gateway for
    // every submission; otherwise one up-front reserve_wait() suffices.
    bool should_reserve_each_time() {
        if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS )
            return true;
        else
            return false;
    }

private:

    const int my_expected_items;    // expected total, or UNKNOWN_NUMBER_OF_ITEMS
    const int my_sleep_time;        // per-item latency in ms
    tbb::atomic< bool > is_active;  // false while deferred; set by activate()

    tbb::concurrent_queue< work_type > my_work_queue;

    tbb::atomic< bool > my_quit;    // set by stop()

    tbb::tbb_thread my_service_thread;
};
360
// Basic end-to-end check: start_node -> async_node -> end_node, with the
// async work performed by an async_activity service thread.  Verifies that
// all NUMBER_OF_MSGS messages pass through every stage exactly once.
template<typename Input, typename Output>
struct basic_test {
    typedef Input input_type;
    typedef Output output_type;
    typedef tbb::flow::async_node< input_type, output_type > async_node_type;
    typedef typename async_node_type::gateway_type gateway_type;

    // Converts the int test index into the node's input type.
    class start_body_type {
        typedef Input input_type;
    public:
        input_type operator()( int input ) {
            return input_type(input);
        }
    };

#if !__TBB_CPP11_LAMBDAS_PRESENT
    // Functor equivalent of the lambda body used below when lambdas exist:
    // offloads the input to the async activity and reserves the gateway when
    // the activity cannot release it just once at the end.
    class async_body_type {
        typedef Input input_type;
        typedef Output output_type;
        typedef tbb::flow::async_node< input_type, output_type > async_node_type;
        typedef typename async_node_type::gateway_type gateway_type;
    public:
        typedef async_activity<input_type, output_type> async_activity_type;

        async_body_type( async_activity_type* aa ) : my_async_activity( aa ) { }

        async_body_type( const async_body_type& other ) : my_async_activity( other.my_async_activity ) { }

        void operator()( const input_type &input, gateway_type& gateway ) {
            ++async_body_exec_count;
            my_async_activity->submit( input, gateway);
            if ( my_async_activity->should_reserve_each_time() )
                gateway.reserve_wait();
        }

    private:
        async_activity_type* my_async_activity;
    };
#endif

    // Terminal node body: counts arrivals and (for place_wrapper types)
    // verifies the producer ran on a different thread/task.
    class end_body_type {
        typedef Output output_type;
    public:
        void operator()( const output_type &input ) {
            ++end_body_exec_count;
            output_type output;
            wrapper_helper<output_type, output_type>::check(input, output);
        }
    };

    basic_test() {}

public:

    static int run(int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
        async_activity<input_type, output_type> my_async_activity(async_expected_items);
        tbb::flow::graph g;
        tbb::flow::function_node< int, input_type > start_node( g, tbb::flow::unlimited, start_body_type() );
#if __TBB_CPP11_LAMBDAS_PRESENT
        async_node_type offload_node(g, tbb::flow::unlimited, [&] (const input_type &input, gateway_type& gateway) {
            ++async_body_exec_count;
            my_async_activity.submit(input, gateway);
            if(my_async_activity.should_reserve_each_time())
                gateway.reserve_wait();
        } );
#else
        async_node_type offload_node( g, tbb::flow::unlimited, async_body_type( &my_async_activity ) );
#endif

        tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, end_body_type() );

        tbb::flow::make_edge( start_node, offload_node );
#if __TBB_FLOW_GRAPH_CPP11_FEATURES
        tbb::flow::make_edge( offload_node, end_node );
#else
        tbb::flow::make_edge( tbb::flow::output_port<0>(offload_node), end_node );
#endif
        async_body_exec_count = 0;
        async_activity_processed_msg_count = 0;
        end_body_exec_count = 0;

        // When the item count is known, a single up-front reservation keeps
        // wait_for_all() blocked until the activity releases it at the end.
        if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) {
            offload_node.gateway().reserve_wait();
        }
        for (int i = 0; i < NUMBER_OF_MSGS; ++i) {
            start_node.try_put(i);
        }
        g.wait_for_all();
        ASSERT( async_body_exec_count == NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" );
        ASSERT( async_activity_processed_msg_count == NUMBER_OF_MSGS, "AsyncActivity processed wrong number of signals" );
        ASSERT( end_body_exec_count == NUMBER_OF_MSGS, "EndBody processed wrong number of signals");
        REMARK("async_body_exec_count == %d == async_activity_processed_msg_count == %d == end_body_exec_count == %d\n",
                int(async_body_exec_count), int(async_activity_processed_msg_count), int(end_body_exec_count));
        return Harness::Done;
    }

};
458
// Verifies that copy-constructing an async_node copies the body state but
// not the edges: the copy starts with a fresh (zeroed) body counter feeding
// into the shared global, and only its own edge to r2.
int test_copy_ctor() {
    const int N = NUMBER_OF_MSGS;
    async_body_exec_count = 0;

    tbb::flow::graph g;

    harness_counting_receiver<int> r1(g);
    harness_counting_receiver<int> r2(g);

    counting_async_node_type a(g, tbb::flow::unlimited, counting_async_body() );
    counting_async_node_type b(a);  // copy: same graph/body type, no edges
#if __TBB_FLOW_GRAPH_CPP11_FEATURES
    tbb::flow::make_edge(a, r1);
    tbb::flow::make_edge(b, r2);
#else
    tbb::flow::make_edge(tbb::flow::output_port<0>(a), r1);
    tbb::flow::make_edge(tbb::flow::output_port<0>(b), r2);
#endif

    // Feed only `a`: r1 should receive everything, r2 nothing.
    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();

    REMARK("async_body_exec_count = %d\n", int(async_body_exec_count));
    REMARK("r1.my_count == %d and r2.my_count = %d\n", int(r1.my_count), int(r2.my_count));
    ASSERT( int(async_body_exec_count) == NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" );
    ASSERT( int(r1.my_count) == N, "counting receiver r1 has not received N items" );
    ASSERT( int(r2.my_count) == 0, "counting receiver r2 has not received 0 items" );

    // Feed only `b`: now r2 catches up while r1 is unchanged.
    for (int i = 0; i < N; ++i) {
        b.try_put(i);
    }
    g.wait_for_all();

    REMARK("async_body_exec_count = %d\n", int(async_body_exec_count));
    REMARK("r1.my_count == %d and r2.my_count = %d\n", int(r1.my_count), int(r2.my_count));
    ASSERT( int(async_body_exec_count) == 2*NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" );
    ASSERT( int(r1.my_count) == N, "counting receiver r1 has not received N items" );
    ASSERT( int(r2.my_count) == N, "counting receiver r2 has not received N items" );
    return Harness::Done;
}
501
tbb::atomic<int> main_tid_count;  // end_body executions observed on the main thread

// Checks that wait_for_all() does not spin the main thread uselessly while
// async work is outstanding: all end-node bodies rendezvous on a barrier, so
// the test only completes if the main thread participates in executing them.
template<typename Input, typename Output>
struct spin_test {
    typedef Input input_type;
    typedef Output output_type;
    typedef tbb::flow::async_node< input_type, output_type > async_node_type;
    typedef typename async_node_type::gateway_type gateway_type;

    // Converts the int test index into the node's input type.
    class start_body_type {
        typedef Input input_type;
    public:
        input_type operator()( int input ) {
            return input_type(input);
        }
    };

#if !__TBB_CPP11_LAMBDAS_PRESENT
    // Functor fallback for the lambda used in run() when lambdas exist.
    class async_body_type {
        typedef Input input_type;
        typedef Output output_type;
        typedef tbb::flow::async_node< input_type, output_type > async_node_type;
        typedef typename async_node_type::gateway_type gateway_type;
    public:
        typedef async_activity<input_type, output_type> async_activity_type;

        async_body_type( async_activity_type* aa ) : my_async_activity( aa ) { }

        async_body_type( const async_body_type& other ) : my_async_activity( other.my_async_activity ) { }

        void operator()(const input_type &input, gateway_type& gateway) {
            ++async_body_exec_count;
            my_async_activity->submit(input, gateway);
            if(my_async_activity->should_reserve_each_time())
                gateway.reserve_wait();
        }

    private:
        async_activity_type* my_async_activity;
    };
#endif

    // Terminal body: records whether it ran on the main thread, then blocks
    // on the barrier so that all nthreads must participate to make progress.
    class end_body_type {
        typedef Output output_type;
        tbb::tbb_thread::id my_main_tid;
        Harness::SpinBarrier *my_barrier;
    public:
        end_body_type(tbb::tbb_thread::id t, Harness::SpinBarrier &b) : my_main_tid(t), my_barrier(&b) { }

        void operator()( const output_type & ) {
            ++end_body_exec_count;
            if (tbb::this_tbb_thread::get_id() == my_main_tid) {
                ++main_tid_count;
            }
            my_barrier->timed_wait_noerror(10);
        }
    };

    spin_test() {}

    static int run(int nthreads, int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
        // sleep_time==0: the service thread answers immediately.
        async_activity<input_type, output_type> my_async_activity(async_expected_items, false, 0);
        Harness::SpinBarrier spin_barrier(nthreads);
        tbb::flow::graph g;
        tbb::flow::function_node< int, input_type > start_node( g, tbb::flow::unlimited, start_body_type() );
#if __TBB_CPP11_LAMBDAS_PRESENT
        async_node_type offload_node(g, tbb::flow::unlimited, [&](const input_type &input, gateway_type& gateway) {
            ++async_body_exec_count;
            my_async_activity.submit(input, gateway);
            if(my_async_activity.should_reserve_each_time())
                gateway.reserve_wait();
        });
#else
        async_node_type offload_node( g, tbb::flow::unlimited, async_body_type( &my_async_activity ) );
#endif
        tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, end_body_type(tbb::this_tbb_thread::get_id(), spin_barrier) );
        tbb::flow::make_edge( start_node, offload_node );
#if __TBB_FLOW_GRAPH_CPP11_FEATURES
        tbb::flow::make_edge( offload_node, end_node );
#else
        tbb::flow::make_edge( tbb::flow::output_port<0>(offload_node), end_node );
#endif
        async_body_exec_count = 0;
        async_activity_processed_msg_count = 0;
        end_body_exec_count = 0;
        main_tid_count = 0;

        if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) {
            offload_node.gateway().reserve_wait();
        }
        for (int i = 0; i < nthreads*NUMBER_OF_MSGS; ++i) {
            start_node.try_put(i);
        }
        g.wait_for_all();
        ASSERT( async_body_exec_count == nthreads*NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" );
        ASSERT( async_activity_processed_msg_count == nthreads*NUMBER_OF_MSGS, "AsyncActivity processed wrong number of signals" );
        ASSERT( end_body_exec_count == nthreads*NUMBER_OF_MSGS, "EndBody processed wrong number of signals");
        // Warning only: scheduling does not strictly guarantee participation.
        ASSERT_WARNING( main_tid_count != 0, "Main thread did not participate in end_body tasks");
        REMARK("async_body_exec_count == %d == async_activity_processed_msg_count == %d == end_body_exec_count == %d\n",
                int(async_body_exec_count), int(async_activity_processed_msg_count), int(end_body_exec_count));
        return Harness::Done;
    }

};
606
// Runs the spin-avoidance scenario with 4 participating threads.
void test_for_spin_avoidance() {
    spin_test<int, int>::run(4);
}
610
// Runs the basic test for a type pair, both with an unknown item count
// (per-item gateway reservation) and with a known count (single reservation),
// and repeats both with the place_wrapper instrumentation applied.
template< typename Input, typename Output >
int run_tests() {
    basic_test<Input, Output>::run();
    basic_test<Input, Output>::run(NUMBER_OF_MSGS);
    basic_test<place_wrapper<Input>, place_wrapper<Output> >::run();
    basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(NUMBER_OF_MSGS);
    return Harness::Done;
}
619
620 #include "tbb/parallel_for.h"
// Builds one graph per worker thread (via parallel_for), each offloading to
// a single shared, initially-deferred async_activity.  All threads put a
// message and reach the barrier before the activity is activated, so every
// inner wait_for_all() must correctly block on its gateway reservation.
template<typename Input, typename Output>
class equeueing_on_inner_level {
    typedef Input input_type;
    typedef Output output_type;
    typedef async_activity<input_type, output_type> async_activity_type;
    typedef tbb::flow::async_node<Input, Output> async_node_type;
    typedef typename async_node_type::gateway_type gateway_type;

    // Converts the int message into the node's input type.
    class start_body_type {
    public:
        input_type operator() ( int input ) {
            return input_type( input);
        }
    };

    // Reserves the gateway and hands the work to the shared activity.
    class async_body_type {
    public:
        async_body_type( async_activity_type& activity ) : my_async_activity(&activity) {}

        void operator() ( const input_type &input, gateway_type& gateway ) {
            gateway.reserve_wait();
            my_async_activity->submit( input, gateway );
        }
    private:
        async_activity_type* my_async_activity;
    };

    // Sink: results are discarded; only completion matters here.
    class end_body_type {
    public:
        void operator()( output_type ) {}
    };

    // Per-thread body: build a small graph, feed it, rendezvous, then wait.
    class body_graph_with_async {
    public:
        body_graph_with_async( Harness::SpinBarrier& barrier, async_activity_type& activity )
            : spin_barrier(&barrier), my_async_activity(&activity) {}

        void operator()(int) const {
            tbb::flow::graph g;
            tbb::flow::function_node< int, input_type > start_node( g, tbb::flow::unlimited, start_body_type() );

            async_node_type offload_node( g, tbb::flow::unlimited, async_body_type( *my_async_activity ) );

            tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, end_body_type() );

            tbb::flow::make_edge( start_node, offload_node );
            tbb::flow::make_edge( offload_node, end_node );

            start_node.try_put(1);

            // All threads submit before the activity is allowed to run.
            spin_barrier->wait();

            my_async_activity->activate();

            g.wait_for_all();
        }

    private:
        Harness::SpinBarrier* spin_barrier;
        async_activity_type* my_async_activity;
    };


public:
    static int run ()
    {
        const int nthreads = tbb::this_task_arena::max_concurrency();
        Harness::SpinBarrier spin_barrier( nthreads );

        // Deferred: stays paused until every thread's graph has submitted.
        async_activity_type my_async_activity( UNKNOWN_NUMBER_OF_ITEMS, true );

        tbb::parallel_for( 0, nthreads, body_graph_with_async( spin_barrier, my_async_activity ) );
        return Harness::Done;
    }
};
696
// Driver for the inner-level enqueueing scenario with int payloads.
int run_test_equeueing_on_inner_level() {
    equeueing_on_inner_level<int, int>::run();
    return Harness::Done;
}
701
702 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
703 #include <array>
704 #include <thread>
705
706 template<typename NodeType>
707 class AsyncActivity {
708 public:
709 using gateway_t = typename NodeType::gateway_type;
710
711 struct work_type {
712 int input;
713 gateway_t* gateway;
714 };
715
__anon12ee7c900302() 716 AsyncActivity(size_t limit) : thr([this]() {
717 while(!end_of_work()) {
718 work_type w;
719 while( my_q.try_pop(w) ) {
720 int res = do_work(w.input);
721 w.gateway->try_put(res);
722 w.gateway->release_wait();
723 ++c;
724 }
725 }
726 }), stop_limit(limit), c(0) {}
727
submit(int i,gateway_t * gateway)728 void submit(int i, gateway_t* gateway) {
729 work_type w = {i, gateway};
730 gateway->reserve_wait();
731 my_q.push(w);
732 }
733
wait_for_all()734 void wait_for_all() { thr.join(); }
735
736 private:
end_of_work()737 bool end_of_work() { return c >= stop_limit; }
738
do_work(int & i)739 int do_work(int& i) { return i + i; }
740
741 tbb::concurrent_queue<work_type> my_q;
742 tbb::tbb_thread thr;
743 size_t stop_limit;
744 size_t c;
745 };
746
test_follows()747 void test_follows() {
748 using namespace tbb::flow;
749
750 using input_t = int;
751 using output_t = int;
752 using node_t = async_node<input_t, output_t>;
753
754 graph g;
755
756 AsyncActivity<node_t> async_activity(3);
757
758 std::array<broadcast_node<input_t>, 3> preds = {
759 {
760 broadcast_node<input_t>(g),
761 broadcast_node<input_t>(g),
762 broadcast_node<input_t>(g)
763 }
764 };
765
766 node_t node(follows(preds[0], preds[1], preds[2]), unlimited, [&](int input, node_t::gateway_type& gtw) {
767 async_activity.submit(input, >w);
768 });
769
770 buffer_node<output_t> buf(g);
771 make_edge(node, buf);
772
773 for(auto& pred: preds) {
774 pred.try_put(1);
775 }
776
777 g.wait_for_all();
778 async_activity.wait_for_all();
779
780 output_t storage;
781 ASSERT((buf.try_get(storage) && buf.try_get(storage) && buf.try_get(storage) && !buf.try_get(storage)),
782 "Not exact edge quantity was made");
783 }
784
test_precedes()785 void test_precedes() {
786 using namespace tbb::flow;
787
788 using input_t = int;
789 using output_t = int;
790 using node_t = async_node<input_t, output_t>;
791
792 graph g;
793
794 AsyncActivity<node_t> async_activity(1);
795
796 std::array<buffer_node<input_t>, 1> successors = { {buffer_node<input_t>(g)} };
797
798 broadcast_node<input_t> start(g);
799
800 node_t node(precedes(successors[0]), unlimited, [&](int input, node_t::gateway_type& gtw) {
801 async_activity.submit(input, >w);
802 });
803
804 make_edge(start, node);
805
806 start.try_put(1);
807
808 g.wait_for_all();
809 async_activity.wait_for_all();
810
811 for(auto& successor : successors) {
812 output_t storage;
813 ASSERT(successor.try_get(storage) && !successor.try_get(storage),
814 "Not exact edge quantity was made");
815 }
816 }
817
// Driver for the preview follows()/precedes() construction API tests.
void test_follows_and_precedes_api() {
    test_follows();
    test_precedes();
}
822 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
823
#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
// Compile-time check: the deprecated four-argument async_node form (with an
// explicit allocator parameter) must still be constructible.
typedef tbb::flow::async_node< int, int, tbb::flow::queueing, std::allocator<int> > async_node_type;

struct async_body {
    void operator()( const int&, async_node_type::gateway_type& ) {}
};

void test_node_allocator() {
    tbb::flow::graph g;
    async_node_type tmp(g, tbb::flow::unlimited, async_body());
}
#endif
836
// Harness entry point: runs every async_node test under a 4-thread scheduler.
int TestMain() {
    tbb::task_scheduler_init init(4);
    // Type-combination sweeps (int/minimal_type, plain and place-wrapped).
    run_tests<int, int>();
    run_tests<minimal_type, minimal_type>();
    run_tests<int, minimal_type>();

    lightweight_testing::test<tbb::flow::async_node>(NUMBER_OF_MSGS);

    test_reset();
    test_copy_ctor();
    test_for_spin_avoidance();
    run_test_equeueing_on_inner_level();
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    test_follows_and_precedes_api();
#endif
#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
    test_node_allocator();
#endif
    return Harness::Done;
}
857
858