1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __TBB_CPF_BUILD
18 #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
19 #endif
20 
21 #include "harness.h"
22 #include "harness_graph.h"
23 
24 #include "tbb/flow_graph.h"
25 #include "tbb/task_scheduler_init.h"
26 #include "test_follows_and_precedes_api.h"
27 
28 #define N 1000
29 #define MAX_NODES 4
30 #define C 8
31 
32 struct empty_no_assign : private NoAssign {
empty_no_assignempty_no_assign33    empty_no_assign() {}
empty_no_assignempty_no_assign34    empty_no_assign( int ) {}
operator intempty_no_assign35    operator int() { return 0; }
36 };
37 
38 // A class to use as a fake predecessor of continue_node
39 struct fake_continue_sender : public tbb::flow::sender<tbb::flow::continue_msg>
40 {
41     typedef tbb::flow::sender<tbb::flow::continue_msg>::successor_type successor_type;
42     // Define implementations of virtual methods that are abstract in the base class
register_successorfake_continue_sender43     bool register_successor( successor_type& ) __TBB_override { return false; }
remove_successorfake_continue_sender44     bool remove_successor( successor_type& )   __TBB_override { return false; }
45 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
46     typedef tbb::flow::sender<tbb::flow::continue_msg>::built_successors_type built_successors_type;
47     built_successors_type bst;
built_successorsfake_continue_sender48     built_successors_type &built_successors() __TBB_override { return bst; }
internal_add_built_successorfake_continue_sender49     void internal_add_built_successor( successor_type &) __TBB_override { }
internal_delete_built_successorfake_continue_sender50     void internal_delete_built_successor( successor_type &) __TBB_override { }
copy_successorsfake_continue_sender51     void copy_successors(successor_list_type &) __TBB_override {}
successor_countfake_continue_sender52     size_t successor_count() __TBB_override {return 0;}
53 #endif
54 };
55 
56 template< typename InputType >
57 struct parallel_puts : private NoAssign {
58 
59     tbb::flow::receiver< InputType > * const my_exe_node;
60 
parallel_putsparallel_puts61     parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
62 
operator ()parallel_puts63     void operator()( int ) const  {
64         for ( int i = 0; i < N; ++i ) {
65             // the nodes will accept all puts
66             ASSERT( my_exe_node->try_put( InputType() ) == true, NULL );
67         }
68     }
69 
70 };
71 
72 template< typename OutputType >
run_continue_nodes(int p,tbb::flow::graph & g,tbb::flow::continue_node<OutputType> & n)73 void run_continue_nodes( int p, tbb::flow::graph& g, tbb::flow::continue_node< OutputType >& n ) {
74     fake_continue_sender fake_sender;
75     for (size_t i = 0; i < N; ++i) {
76         n.register_predecessor( fake_sender );
77     }
78 
79     for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
80         std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));
81         harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count = 0;
82 
83         for (size_t r = 0; r < num_receivers; ++r ) {
84             tbb::flow::make_edge( n, receivers[r] );
85         }
86 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
87         ASSERT(n.successor_count() == (size_t)num_receivers, NULL);
88         ASSERT(n.predecessor_count() == 0, NULL);
89         typename tbb::flow::continue_node<OutputType>::successor_list_type my_succs;
90         typedef typename tbb::flow::continue_node<OutputType>::successor_list_type::iterator sv_iter_type;
91         n.copy_successors(my_succs);
92         ASSERT(my_succs.size() == num_receivers, NULL);
93 #endif
94 
95         NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(n) );
96         g.wait_for_all();
97 
98         // 2) the nodes will receive puts from multiple predecessors simultaneously,
99         size_t ec = harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count;
100         ASSERT( (int)ec == p, NULL );
101         for (size_t r = 0; r < num_receivers; ++r ) {
102             size_t c = receivers[r].my_count;
103             // 3) the nodes will send to multiple successors.
104             ASSERT( (int)c == p, NULL );
105         }
106 
107 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
108         for(sv_iter_type si=my_succs.begin(); si != my_succs.end(); ++si) {
109             tbb::flow::remove_edge( n, **si );
110         }
111 #else
112         for (size_t r = 0; r < num_receivers; ++r ) {
113             tbb::flow::remove_edge( n, receivers[r] );
114         }
115 #endif
116     }
117 }
118 
119 template< typename OutputType, typename Body >
continue_nodes(Body body)120 void continue_nodes( Body body ) {
121     for (int p = 1; p < 2*MaxThread; ++p) {
122         tbb::flow::graph g;
123         tbb::flow::continue_node< OutputType > exe_node( g, body );
124         run_continue_nodes( p, g, exe_node);
125         exe_node.try_put(tbb::flow::continue_msg());
126         tbb::flow::continue_node< OutputType > exe_node_copy( exe_node );
127         run_continue_nodes( p, g, exe_node_copy);
128     }
129 }
130 
131 const size_t Offset = 123;
132 tbb::atomic<size_t> global_execute_count;
133 
134 template< typename OutputType >
135 struct inc_functor {
136 
137     tbb::atomic<size_t> local_execute_count;
inc_functorinc_functor138     inc_functor( ) { local_execute_count = 0; }
inc_functorinc_functor139     inc_functor( const inc_functor &f ) { local_execute_count = f.local_execute_count; }
operator =inc_functor140     void operator=(const inc_functor &f) { local_execute_count = f.local_execute_count; }
141 
operator ()inc_functor142     OutputType operator()( tbb::flow::continue_msg ) {
143        ++global_execute_count;
144        ++local_execute_count;
145        return OutputType();
146     }
147 
148 };
149 
150 template< typename OutputType >
continue_nodes_with_copy()151 void continue_nodes_with_copy( ) {
152 
153     for (int p = 1; p < 2*MaxThread; ++p) {
154         tbb::flow::graph g;
155         inc_functor<OutputType> cf;
156         cf.local_execute_count = Offset;
157         global_execute_count = Offset;
158 
159         tbb::flow::continue_node< OutputType > exe_node( g, cf );
160         fake_continue_sender fake_sender;
161         for (size_t i = 0; i < N; ++i) {
162            exe_node.register_predecessor( fake_sender );
163         }
164 
165         for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
166             std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));
167 
168             for (size_t r = 0; r < num_receivers; ++r ) {
169                 tbb::flow::make_edge( exe_node, receivers[r] );
170             }
171 
172             NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(exe_node) );
173             g.wait_for_all();
174 
175             // 2) the nodes will receive puts from multiple predecessors simultaneously,
176             for (size_t r = 0; r < num_receivers; ++r ) {
177                 size_t c = receivers[r].my_count;
178                 // 3) the nodes will send to multiple successors.
179                 ASSERT( (int)c == p, NULL );
180             }
181             for (size_t r = 0; r < num_receivers; ++r ) {
182                 tbb::flow::remove_edge( exe_node, receivers[r] );
183             }
184         }
185 
186         // validate that the local body matches the global execute_count and both are correct
187         inc_functor<OutputType> body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
188         const size_t expected_count = p*MAX_NODES + Offset;
189         size_t global_count = global_execute_count;
190         size_t inc_count = body_copy.local_execute_count;
191         ASSERT( global_count == expected_count && global_count == inc_count, NULL );
192         g.reset(tbb::flow::rf_reset_bodies);
193         body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
194         inc_count = body_copy.local_execute_count;
195         ASSERT( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor" );
196 
197     }
198 }
199 
200 template< typename OutputType >
run_continue_nodes()201 void run_continue_nodes() {
202     harness_graph_executor< tbb::flow::continue_msg, OutputType>::max_executors = 0;
203     #if __TBB_CPP11_LAMBDAS_PRESENT
204     continue_nodes<OutputType>( []( tbb::flow::continue_msg i ) -> OutputType { return harness_graph_executor<tbb::flow::continue_msg, OutputType>::func(i); } );
205     #endif
206     continue_nodes<OutputType>( &harness_graph_executor<tbb::flow::continue_msg, OutputType>::func );
207     continue_nodes<OutputType>( typename harness_graph_executor<tbb::flow::continue_msg, OutputType>::functor() );
208     continue_nodes_with_copy<OutputType>();
209 }
210 
211 //! Tests limited concurrency cases for nodes that accept data messages
test_concurrency(int num_threads)212 void test_concurrency(int num_threads) {
213     tbb::task_scheduler_init init(num_threads);
214     run_continue_nodes<tbb::flow::continue_msg>();
215     run_continue_nodes<int>();
216     run_continue_nodes<empty_no_assign>();
217 }
218 /*
219  * Connection of two graphs is not currently supported, but works to some limited extent.
220  * This test is included to check for backward compatibility. It checks that a continue_node
221  * with predecessors in two different graphs receives the required
222  * number of continue messages before it executes.
223  */
224 using namespace tbb::flow;
225 
226 struct add_to_counter {
227     int* counter;
add_to_counteradd_to_counter228     add_to_counter(int& var):counter(&var){}
operator ()add_to_counter229     void operator()(continue_msg){*counter+=1;}
230 };
231 
test_two_graphs()232 void test_two_graphs(){
233     int count=0;
234 
235     //graph g with broadcast_node and continue_node
236     graph g;
237     broadcast_node<continue_msg> start_g(g);
238     continue_node<continue_msg> first_g(g, add_to_counter(count));
239 
240     //graph h with broadcast_node
241     graph h;
242     broadcast_node<continue_msg> start_h(h);
243 
244     //making two edges to first_g from the two graphs
245     make_edge(start_g,first_g);
246     make_edge(start_h, first_g);
247 
248     //two try_puts from the two graphs
249     start_g.try_put(continue_msg());
250     start_h.try_put(continue_msg());
251     g.wait_for_all();
252     ASSERT(count==1, "Not all continue messages received");
253 
254     //two try_puts from the graph that doesn't contain the node
255     count=0;
256     start_h.try_put(continue_msg());
257     start_h.try_put(continue_msg());
258     g.wait_for_all();
259     ASSERT(count==1, "Not all continue messages received -1");
260 
261     //only one try_put
262     count=0;
263     start_g.try_put(continue_msg());
264     g.wait_for_all();
265     ASSERT(count==0, "Node executed without waiting for all predecessors");
266 }
267 
268 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
test_extract()269 void test_extract() {
270     int my_count = 0;
271     tbb::flow::continue_msg cm;
272     tbb::flow::graph g;
273     tbb::flow::broadcast_node<tbb::flow::continue_msg> b0(g);
274     tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g);
275     tbb::flow::continue_node<tbb::flow::continue_msg>  c0(g, add_to_counter(my_count));
276     tbb::flow::queue_node<tbb::flow::continue_msg> q0(g);
277 
278     tbb::flow::make_edge(b0, c0);
279     tbb::flow::make_edge(b1, c0);
280     tbb::flow::make_edge(c0, q0);
281     for( int i = 0; i < 2; ++i ) {
282         ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts");
283         ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts");
284         ASSERT(c0.predecessor_count() == 2 && c0.successor_count() == 1, "c0 has incorrect counts");
285         ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts");
286 
287         /* b0         */
288         /*   \        */
289         /*    c0 - q0 */
290         /*   /        */
291         /* b1         */
292 
293         b0.try_put(tbb::flow::continue_msg());
294         g.wait_for_all();
295         ASSERT(my_count == 0, "continue_node fired too soon");
296         b1.try_put(tbb::flow::continue_msg());
297         g.wait_for_all();
298         ASSERT(my_count == 1, "continue_node didn't fire");
299         ASSERT(q0.try_get(cm), "continue_node didn't forward");
300 
301         b0.extract();
302 
303         /* b0         */
304         /*            */
305         /*    c0 - q0 */
306         /*   /        */
307         /* b1         */
308 
309         ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts");
310         ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts");
311         ASSERT(c0.predecessor_count() == 1 && c0.successor_count() == 1, "c0 has incorrect counts");
312         ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts");
313         b0.try_put(tbb::flow::continue_msg());
314         b0.try_put(tbb::flow::continue_msg());
315         g.wait_for_all();
316         ASSERT(my_count == 1, "b0 messages being forwarded to continue_node even though it is disconnected");
317         b1.try_put(tbb::flow::continue_msg());
318         g.wait_for_all();
319         ASSERT(my_count == 2, "continue_node didn't fire though it has only one predecessor");
320         ASSERT(q0.try_get(cm), "continue_node didn't forward second time");
321 
322         c0.extract();
323 
324         /* b0         */
325         /*            */
326         /*    c0   q0 */
327         /*            */
328         /* b1         */
329 
330         ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts");
331         ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts");
332         ASSERT(c0.predecessor_count() == 0 && c0.successor_count() == 0, "c0 has incorrect counts");
333         ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts");
334         b0.try_put(tbb::flow::continue_msg());
335         b0.try_put(tbb::flow::continue_msg());
336         b1.try_put(tbb::flow::continue_msg());
337         b1.try_put(tbb::flow::continue_msg());
338         g.wait_for_all();
339         ASSERT(my_count == 2, "continue didn't fire though it has only one predecessor");
340         ASSERT(!q0.try_get(cm), "continue_node forwarded though it shouldn't");
341         make_edge(b0, c0);
342 
343         /* b0         */
344         /*   \        */
345         /*    c0   q0 */
346         /*            */
347         /* b1         */
348 
349         ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts");
350         ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts");
351         ASSERT(c0.predecessor_count() == 1 && c0.successor_count() == 0, "c0 has incorrect counts");
352         ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts");
353 
354         b0.try_put(tbb::flow::continue_msg());
355         g.wait_for_all();
356 
357         ASSERT(my_count == 3, "continue didn't fire though it has only one predecessor");
358         ASSERT(!q0.try_get(cm), "continue_node forwarded though it shouldn't");
359 
360         tbb::flow::make_edge(b1, c0);
361         tbb::flow::make_edge(c0, q0);
362         my_count = 0;
363     }
364 }
365 #endif
366 
367 struct lightweight_policy_body : NoAssign {
368     const tbb::tbb_thread::id my_thread_id;
369     tbb::atomic<size_t> my_count;
370 
lightweight_policy_bodylightweight_policy_body371     lightweight_policy_body() : my_thread_id(tbb::this_tbb_thread::get_id()) {
372         my_count = 0;
373     }
operator ()lightweight_policy_body374     void operator()(tbb::flow::continue_msg) {
375         ++my_count;
376         tbb::tbb_thread::id body_thread_id = tbb::this_tbb_thread::get_id();
377         ASSERT(body_thread_id == my_thread_id, "Body executed as not lightweight");
378     }
379 };
380 
test_lightweight_policy()381 void test_lightweight_policy() {
382     tbb::flow::graph g;
383     tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> node1(g, lightweight_policy_body());
384     tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> node2(g, lightweight_policy_body());
385 
386     tbb::flow::make_edge(node1, node2);
387     const size_t n = 10;
388     for(size_t i = 0; i < n; ++i) {
389         node1.try_put(tbb::flow::continue_msg());
390     }
391     g.wait_for_all();
392 
393     lightweight_policy_body body1 = tbb::flow::copy_body<lightweight_policy_body>(node1);
394     lightweight_policy_body body2 = tbb::flow::copy_body<lightweight_policy_body>(node2);
395     ASSERT(body1.my_count == n, "Body of the first node needs to be executed N times");
396     ASSERT(body2.my_count == n, "Body of the second node needs to be executed N times");
397 }
398 
399 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
400 #include <array>
401 #include <vector>
test_follows_and_precedes_api()402 void test_follows_and_precedes_api() {
403     using msg_t = tbb::flow::continue_msg;
404 
405     std::array<msg_t, 3> messages_for_follows = { { msg_t(), msg_t(), msg_t() } };
406     std::vector<msg_t> messages_for_precedes  = { msg_t() };
407 
408     auto pass_through = [](const msg_t& msg) { return msg; };
409 
410     follows_and_precedes_testing::test_follows
411         <msg_t, tbb::flow::continue_node<msg_t>>
412         (messages_for_follows, pass_through);
413 
414     follows_and_precedes_testing::test_precedes
415         <msg_t, tbb::flow::continue_node<msg_t>>
416         (messages_for_precedes, pass_through);
417 }
418 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
419 
420 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
421 
422 template <typename ExpectedType, typename Body>
test_deduction_guides_common(Body body)423 void test_deduction_guides_common(Body body) {
424     using namespace tbb::flow;
425     graph g;
426 
427     continue_node c1(g, body);
428     static_assert(std::is_same_v<decltype(c1), continue_node<ExpectedType>>);
429 
430     continue_node c2(g, body, lightweight());
431     static_assert(std::is_same_v<decltype(c2), continue_node<ExpectedType, lightweight>>);
432 
433     continue_node c3(g, 5, body);
434     static_assert(std::is_same_v<decltype(c3), continue_node<ExpectedType>>);
435 
436     continue_node c4(g, 5, body, lightweight());
437     static_assert(std::is_same_v<decltype(c4), continue_node<ExpectedType, lightweight>>);
438 
439 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
440     continue_node c5(g, body, node_priority_t(5));
441     static_assert(std::is_same_v<decltype(c5), continue_node<ExpectedType>>);
442 
443     continue_node c6(g, body, lightweight(), node_priority_t(5));
444     static_assert(std::is_same_v<decltype(c6), continue_node<ExpectedType, lightweight>>);
445 
446     continue_node c7(g, 5, body, node_priority_t(5));
447     static_assert(std::is_same_v<decltype(c7), continue_node<ExpectedType>>);
448 
449     continue_node c8(g, 5, body, lightweight(), node_priority_t(5));
450     static_assert(std::is_same_v<decltype(c8), continue_node<ExpectedType, lightweight>>);
451 #endif
452 
453 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
454     broadcast_node<continue_msg> b(g);
455 
456     continue_node c9(follows(b), body);
457     static_assert(std::is_same_v<decltype(c9), continue_node<ExpectedType>>);
458 
459     continue_node c10(follows(b), body, lightweight());
460     static_assert(std::is_same_v<decltype(c10), continue_node<ExpectedType, lightweight>>);
461 
462     continue_node c11(follows(b), 5, body);
463     static_assert(std::is_same_v<decltype(c11), continue_node<ExpectedType>>);
464 
465     continue_node c12(follows(b), 5, body, lightweight());
466     static_assert(std::is_same_v<decltype(c12), continue_node<ExpectedType, lightweight>>);
467 
468 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
469     continue_node c13(follows(b), body, node_priority_t(5));
470     static_assert(std::is_same_v<decltype(c13), continue_node<ExpectedType>>);
471 
472     continue_node c14(follows(b), body, lightweight(), node_priority_t(5));
473     static_assert(std::is_same_v<decltype(c14), continue_node<ExpectedType, lightweight>>);
474 
475     continue_node c15(follows(b), 5, body, node_priority_t(5));
476     static_assert(std::is_same_v<decltype(c15), continue_node<ExpectedType>>);
477 
478     continue_node c16(follows(b), 5, body, lightweight(), node_priority_t(5));
479     static_assert(std::is_same_v<decltype(c16), continue_node<ExpectedType, lightweight>>);
480 #endif
481 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
482 
483     continue_node c17(c1);
484     static_assert(std::is_same_v<decltype(c17), continue_node<ExpectedType>>);
485 }
486 
continue_body_f(const tbb::flow::continue_msg &)487 int continue_body_f(const tbb::flow::continue_msg&) { return 1; }
continue_void_body_f(const tbb::flow::continue_msg &)488 void continue_void_body_f(const tbb::flow::continue_msg&) {}
489 
test_deduction_guides()490 void test_deduction_guides() {
491     using tbb::flow::continue_msg;
492     test_deduction_guides_common<int>([](const continue_msg&)->int { return 1; } );
493     test_deduction_guides_common<continue_msg>([](const continue_msg&) {});
494 
495     test_deduction_guides_common<int>([](const continue_msg&) mutable ->int { return 1; });
496     test_deduction_guides_common<continue_msg>([](const continue_msg&) mutable {});
497 
498     test_deduction_guides_common<int>(continue_body_f);
499     test_deduction_guides_common<continue_msg>(continue_void_body_f);
500 }
501 
502 #endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
503 
504 // TODO: use pass_through from test_function_node instead
505 template<typename T>
506 struct passing_body {
operator ()passing_body507     T operator()(const T& val) {
508         return val;
509     }
510 };
511 
512 /*
513     The test covers the case when a node with non-default mutex type is a predecessor for continue_node,
514     because there used to be a bug when make_edge(node, continue_node)
515     did not update continue_node's predecesosor threshold
516     since the specialization of node's successor_cache for a continue_node was not chosen.
517 */
test_successor_cache_specialization()518 void test_successor_cache_specialization() {
519     using namespace tbb::flow;
520 
521     graph g;
522 
523     broadcast_node<continue_msg> node_with_default_mutex_type(g);
524     buffer_node<continue_msg> node_with_non_default_mutex_type(g);
525 
526     continue_node<continue_msg> node(g, passing_body<continue_msg>());
527 
528     make_edge(node_with_default_mutex_type, node);
529     make_edge(node_with_non_default_mutex_type, node);
530 
531     buffer_node<continue_msg> buf(g);
532 
533     make_edge(node, buf);
534 
535     node_with_default_mutex_type.try_put(continue_msg());
536     node_with_non_default_mutex_type.try_put(continue_msg());
537 
538     g.wait_for_all();
539 
540     continue_msg storage;
541     ASSERT((buf.try_get(storage) && !buf.try_get(storage)),
542             "Wrong number of messages is passed via continue_node");
543 }
544 
TestMain()545 int TestMain() {
546     if( MinThread<1 ) {
547         REPORT("number of threads must be positive\n");
548         exit(1);
549     }
550     for( int p=MinThread; p<=MaxThread; ++p ) {
551        test_concurrency(p);
552    }
553    test_two_graphs();
554 #if __TBB_PREVIEW_LIGHTWEIGHT_POLICY
555    test_lightweight_policy();
556 #endif
557 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
558    test_extract();
559 #endif
560 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
561     test_follows_and_precedes_api();
562 #endif
563 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
564     test_deduction_guides();
565 #endif
566    test_successor_cache_specialization();
567    return Harness::Done;
568 }
569