1 /*
2 Copyright (c) 2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 // have to expose the reset_node method to be able to reset a function_body
18
19 #include "harness.h"
20 #define TBB_DEPRECATED_INPUT_NODE_BODY __TBB_CPF_BUILD
21
22 #include "harness_graph.h"
23 #include "tbb/flow_graph.h"
24 #include "tbb/task.h"
25 #include "tbb/task_scheduler_init.h"
26
27 const int N = 1000;
28
29 template< typename T >
30 class test_push_receiver : public tbb::flow::receiver<T>, NoAssign {
31
32 tbb::atomic<int> my_counters[N];
33 tbb::flow::graph& my_graph;
34
35 public:
36
test_push_receiver(tbb::flow::graph & g)37 test_push_receiver(tbb::flow::graph& g) : my_graph(g) {
38 for (int i = 0; i < N; ++i )
39 my_counters[i] = 0;
40 }
41
get_count(int i)42 int get_count( int i ) {
43 int v = my_counters[i];
44 return v;
45 }
46
47 typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type;
48
49 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
50 typedef typename tbb::flow::receiver<T>::built_predecessors_type built_predecessors_type;
51 typedef typename tbb::flow::receiver<T>::predecessor_list_type predecessor_list_type;
52 built_predecessors_type bpt;
built_predecessors()53 built_predecessors_type &built_predecessors() __TBB_override { return bpt; }
internal_add_built_predecessor(predecessor_type &)54 void internal_add_built_predecessor( predecessor_type & ) __TBB_override { }
internal_delete_built_predecessor(predecessor_type &)55 void internal_delete_built_predecessor( predecessor_type & ) __TBB_override { }
copy_predecessors(predecessor_list_type &)56 void copy_predecessors( predecessor_list_type & ) __TBB_override { }
predecessor_count()57 size_t predecessor_count() __TBB_override { return 0; }
58 #endif
59
try_put_task(const T & v)60 tbb::task *try_put_task( const T &v ) __TBB_override {
61 int i = (int)v;
62 ++my_counters[i];
63 return const_cast<tbb::task *>(SUCCESSFULLY_ENQUEUED);
64 }
65
graph_reference() const66 tbb::flow::graph& graph_reference() const __TBB_override {
67 return my_graph;
68 }
69
reset_receiver(tbb::flow::reset_flags)70 void reset_receiver(tbb::flow::reset_flags /*f*/) __TBB_override {}
71 };
72
73 template< typename T >
74 class source_body {
75
76 unsigned my_count;
77 int *ninvocations;
78
79 public:
80
source_body()81 source_body() : ninvocations(NULL) { my_count = 0; }
source_body(int & _inv)82 source_body(int &_inv) : ninvocations(&_inv) { my_count = 0; }
83
84 #if TBB_DEPRECATED_INPUT_NODE_BODY
operator ()(T & v)85 bool operator()( T &v ) {
86 v = (T)my_count++;
87 if(ninvocations) ++(*ninvocations);
88 if ( (int)v < N )
89 return true;
90 else
91 return false;
92 }
93 #else
operator ()(tbb::flow_control & fc)94 T operator()( tbb::flow_control& fc ) {
95 T v = (T)my_count++;
96 if(ninvocations) ++(*ninvocations);
97 if ( (int)v < N ){
98 return v;
99 }else{
100 fc.stop();
101 return T();
102 }
103 }
104 #endif
105 };
106
107 template< typename T >
108 class function_body {
109
110 tbb::atomic<int> *my_counters;
111
112 public:
113
function_body(tbb::atomic<int> * counters)114 function_body( tbb::atomic<int> *counters ) : my_counters(counters) {
115 for (int i = 0; i < N; ++i )
116 my_counters[i] = 0;
117 }
118
operator ()(T v)119 bool operator()( T v ) {
120 ++my_counters[(int)v];
121 return true;
122 }
123
124 };
125
126 template< typename T >
test_single_dest()127 void test_single_dest() {
128
129 // push only
130 tbb::flow::graph g;
131 tbb::flow::input_node<T> src(g, source_body<T>() );
132 test_push_receiver<T> dest(g);
133 tbb::flow::make_edge( src, dest );
134 src.activate();
135 g.wait_for_all();
136 for (int i = 0; i < N; ++i ) {
137 ASSERT( dest.get_count(i) == 1, NULL );
138 }
139
140 // push only
141 tbb::atomic<int> counters3[N];
142 tbb::flow::input_node<T> src3(g, source_body<T>() );
143
144 function_body<T> b3( counters3 );
145 tbb::flow::function_node<T,bool> dest3(g, tbb::flow::unlimited, b3 );
146 tbb::flow::make_edge( src3, dest3 );
147 src3.activate();
148 g.wait_for_all();
149 for (int i = 0; i < N; ++i ) {
150 int v = counters3[i];
151 ASSERT( v == 1, NULL );
152 }
153
154 // push & pull
155 tbb::flow::input_node<T> src2(g, source_body<T>() );
156 tbb::atomic<int> counters2[N];
157 function_body<T> b2( counters2 );
158 tbb::flow::function_node<T,bool,tbb::flow::rejecting> dest2(g, tbb::flow::serial, b2 );
159 tbb::flow::make_edge( src2, dest2 );
160 src2.activate();
161 g.wait_for_all();
162 for (int i = 0; i < N; ++i ) {
163 int v = counters2[i];
164 ASSERT( v == 1, NULL );
165 }
166
167 // test copy constructor
168 tbb::flow::input_node<T> src_copy(src);
169 test_push_receiver<T> dest_c(g);
170 ASSERT( src_copy.register_successor(dest_c), NULL );
171 src_copy.activate();
172 g.wait_for_all();
173 for (int i = 0; i < N; ++i ) {
174 ASSERT( dest_c.get_count(i) == 1, NULL );
175 }
176 }
177
test_reset()178 void test_reset() {
179 // source_node -> function_node
180 tbb::flow::graph g;
181 tbb::atomic<int> counters3[N];
182 tbb::flow::input_node<int> src3(g, source_body<int>() );
183 tbb::flow::input_node<int> src_inactive(g, source_body<int>());
184 function_body<int> b3( counters3 );
185 tbb::flow::function_node<int,bool> dest3(g, tbb::flow::unlimited, b3 );
186 tbb::flow::make_edge( src3, dest3 );
187 src3.activate();
188 // source_node is now in active state. Let the graph run,
189 g.wait_for_all();
190 // check the array for each value.
191 for (int i = 0; i < N; ++i ) {
192 int v = counters3[i];
193 ASSERT( v == 1, NULL );
194 counters3[i] = 0;
195 }
196
197 g.reset(tbb::flow::rf_reset_bodies); // <-- re-initializes the counts.
198 // and spawns task to run source
199 src3.activate();
200
201 g.wait_for_all();
202 // check output queue again. Should be the same contents.
203 for (int i = 0; i < N; ++i ) {
204 int v = counters3[i];
205 ASSERT( v == 1, NULL );
206 counters3[i] = 0;
207 }
208 g.reset(); // doesn't reset the source_node_body to initial state, but does spawn a task
209 // to run the source_node.
210
211 g.wait_for_all();
212 // array should be all zero
213 for (int i = 0; i < N; ++i ) {
214 int v = counters3[i];
215 ASSERT( v == 0, NULL );
216 }
217
218 remove_edge(src3, dest3);
219 make_edge(src_inactive, dest3);
220
221 // src_inactive doesn't run
222 g.wait_for_all();
223 for (int i = 0; i < N; ++i ) {
224 int v = counters3[i];
225 ASSERT( v == 0, NULL );
226 }
227
228 // run graph
229 src_inactive.activate();
230 g.wait_for_all();
231 // check output
232 for (int i = 0; i < N; ++i ) {
233 int v = counters3[i];
234 ASSERT( v == 1, NULL );
235 counters3[i] = 0;
236 }
237 g.reset(tbb::flow::rf_reset_bodies); // <-- reinitializes the counts
238 // src_inactive doesn't run
239 g.wait_for_all();
240 for (int i = 0; i < N; ++i ) {
241 int v = counters3[i];
242 ASSERT( v == 0, NULL );
243 }
244
245 // start it up
246 src_inactive.activate();
247 g.wait_for_all();
248 for (int i = 0; i < N; ++i ) {
249 int v = counters3[i];
250 ASSERT( v == 1, NULL );
251 counters3[i] = 0;
252 }
253 g.reset(); // doesn't reset the source_node_body to initial state, and doesn't
254 // spawn a task to run the source_node.
255
256 g.wait_for_all();
257 // array should be all zero
258 for (int i = 0; i < N; ++i ) {
259 int v = counters3[i];
260 ASSERT( v == 0, NULL );
261 }
262 src_inactive.activate();
263 // source_node_body is already in final state, so source_node will not forward a message.
264 g.wait_for_all();
265 for (int i = 0; i < N; ++i ) {
266 int v = counters3[i];
267 ASSERT( v == 0, NULL );
268 }
269 }
270
271 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
272 #if TBB_DEPRECATED_INPUT_NODE_BODY
source_body_f(int & i)273 bool source_body_f(int& i) { return i > 5; }
274 #else
source_body_f(tbb::flow_control &)275 int source_body_f(tbb::flow_control&) { return 42; }
276 #endif
test_deduction_guides()277 void test_deduction_guides() {
278 using namespace tbb::flow;
279 graph g;
280
281 #if TBB_DEPRECATED_INPUT_NODE_BODY
282 auto lambda = [](int& i) { return i > 5; };
283 auto non_const_lambda = [](int& i) mutable { return i > 5; };
284 #else
285 auto lambda = [](tbb::flow_control&) { return 42; };
286 auto non_const_lambda = [](tbb::flow_control&) mutable { return 42; };
287 #endif
288 // Tests for source_node(graph&, Body)
289 input_node s1(g, lambda);
290 static_assert(std::is_same_v<decltype(s1), input_node<int>>);
291
292 input_node s2(g, non_const_lambda);
293 static_assert(std::is_same_v<decltype(s2), input_node<int>>);
294
295 input_node s3(g, source_body_f);
296 static_assert(std::is_same_v<decltype(s3), input_node<int>>);
297
298 input_node s4(s3);
299 static_assert(std::is_same_v<decltype(s4), input_node<int>>);
300
301 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
302 broadcast_node<int> bc(g);
303
304 // Tests for source_node(const node_set<Args...>&, Body)
305 input_node s5(precedes(bc), lambda);
306 static_assert(std::is_same_v<decltype(s5), input_node<int>>);
307
308 input_node s6(precedes(bc), non_const_lambda);
309 static_assert(std::is_same_v<decltype(s6), input_node<int>>);
310
311 input_node s7(precedes(bc), source_body_f);
312 static_assert(std::is_same_v<decltype(s7), input_node<int>>);
313 #endif
314 g.wait_for_all();
315 }
316
317 #endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
318
319 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
320 #include <array>
test_follows_and_precedes_api()321 void test_follows_and_precedes_api() {
322 using namespace tbb::flow;
323
324 graph g;
325
326 std::array<buffer_node<bool>, 3> successors {{
327 buffer_node<bool>(g),
328 buffer_node<bool>(g),
329 buffer_node<bool>(g)
330 }};
331
332 bool do_try_put = true;
333 input_node<bool> src(precedes(successors[0], successors[1], successors[2]),
334 #if TBB_DEPRECATED_INPUT_NODE_BODY
335 [&](bool& v) -> bool {
336 if(do_try_put) {
337 v = do_try_put;
338 do_try_put = false;
339 return true;
340 }
341 else {
342 return false;
343 }
344 }
345 #else
346 [&](tbb::flow_control& fc) -> bool {
347 if(!do_try_put)
348 fc.stop();
349 do_try_put = !do_try_put;
350 return true;
351 }
352 #endif
353 );
354
355 src.activate();
356 g.wait_for_all();
357
358 bool storage;
359 for(auto& successor: successors) {
360 ASSERT((successor.try_get(storage) && !successor.try_get(storage)),
361 "Not exact edge quantity was made");
362 }
363 }
364 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
365
TestMain()366 int TestMain() {
367 if( MinThread<1 ) {
368 REPORT("number of threads must be positive\n");
369 exit(1);
370 }
371 for ( int p = MinThread; p < MaxThread; ++p ) {
372 tbb::task_scheduler_init init(p);
373 test_single_dest<int>();
374 test_single_dest<float>();
375 }
376 test_reset();
377 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
378 test_extract();
379 #endif
380 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
381 test_follows_and_precedes_api();
382 #endif
383 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
384 test_deduction_guides();
385 #endif
386 return Harness::Done;
387 }
388
389