1 /*
2     Copyright (c) 2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 // have to expose the reset_node method to be able to reset a function_body
18 
19 #include "harness.h"
20 #define TBB_DEPRECATED_INPUT_NODE_BODY __TBB_CPF_BUILD
21 
22 #include "harness_graph.h"
23 #include "tbb/flow_graph.h"
24 #include "tbb/task.h"
25 #include "tbb/task_scheduler_init.h"
26 
27 const int N = 1000;
28 
29 template< typename T >
30 class test_push_receiver : public tbb::flow::receiver<T>, NoAssign {
31 
32     tbb::atomic<int> my_counters[N];
33     tbb::flow::graph& my_graph;
34 
35 public:
36 
test_push_receiver(tbb::flow::graph & g)37     test_push_receiver(tbb::flow::graph& g) : my_graph(g) {
38         for (int i = 0; i < N; ++i )
39             my_counters[i] = 0;
40     }
41 
get_count(int i)42     int get_count( int i ) {
43        int v = my_counters[i];
44        return v;
45     }
46 
47     typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type;
48 
49 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
50     typedef typename tbb::flow::receiver<T>::built_predecessors_type built_predecessors_type;
51     typedef typename tbb::flow::receiver<T>::predecessor_list_type predecessor_list_type;
52     built_predecessors_type bpt;
built_predecessors()53     built_predecessors_type &built_predecessors() __TBB_override { return bpt; }
internal_add_built_predecessor(predecessor_type &)54     void internal_add_built_predecessor( predecessor_type & ) __TBB_override { }
internal_delete_built_predecessor(predecessor_type &)55     void internal_delete_built_predecessor( predecessor_type & ) __TBB_override { }
copy_predecessors(predecessor_list_type &)56     void copy_predecessors( predecessor_list_type & ) __TBB_override { }
predecessor_count()57     size_t predecessor_count() __TBB_override { return 0; }
58 #endif
59 
try_put_task(const T & v)60     tbb::task *try_put_task( const T &v ) __TBB_override {
61        int i = (int)v;
62        ++my_counters[i];
63        return const_cast<tbb::task *>(SUCCESSFULLY_ENQUEUED);
64     }
65 
graph_reference() const66     tbb::flow::graph& graph_reference() const __TBB_override {
67         return my_graph;
68     }
69 
reset_receiver(tbb::flow::reset_flags)70     void reset_receiver(tbb::flow::reset_flags /*f*/) __TBB_override {}
71 };
72 
73 template< typename T >
74 class source_body {
75 
76    unsigned my_count;
77    int *ninvocations;
78 
79 public:
80 
source_body()81    source_body() : ninvocations(NULL) { my_count = 0; }
source_body(int & _inv)82    source_body(int &_inv) : ninvocations(&_inv)  { my_count = 0; }
83 
84 #if TBB_DEPRECATED_INPUT_NODE_BODY
operator ()(T & v)85     bool operator()( T &v ) {
86         v = (T)my_count++;
87         if(ninvocations) ++(*ninvocations);
88         if ( (int)v < N )
89             return true;
90         else
91             return false;
92     }
93 #else
operator ()(tbb::flow_control & fc)94     T operator()( tbb::flow_control& fc ) {
95         T v = (T)my_count++;
96         if(ninvocations) ++(*ninvocations);
97         if ( (int)v < N ){
98             return v;
99         }else{
100             fc.stop();
101             return T();
102         }
103     }
104 #endif
105 };
106 
107 template< typename T >
108 class function_body {
109 
110     tbb::atomic<int> *my_counters;
111 
112 public:
113 
function_body(tbb::atomic<int> * counters)114     function_body( tbb::atomic<int> *counters ) : my_counters(counters) {
115         for (int i = 0; i < N; ++i )
116             my_counters[i] = 0;
117     }
118 
operator ()(T v)119     bool operator()( T v ) {
120         ++my_counters[(int)v];
121         return true;
122     }
123 
124 };
125 
126 template< typename T >
test_single_dest()127 void test_single_dest() {
128 
129    // push only
130    tbb::flow::graph g;
131    tbb::flow::input_node<T> src(g, source_body<T>() );
132    test_push_receiver<T> dest(g);
133    tbb::flow::make_edge( src, dest );
134    src.activate();
135    g.wait_for_all();
136    for (int i = 0; i < N; ++i ) {
137        ASSERT( dest.get_count(i) == 1, NULL );
138    }
139 
140    // push only
141    tbb::atomic<int> counters3[N];
142    tbb::flow::input_node<T> src3(g, source_body<T>() );
143 
144    function_body<T> b3( counters3 );
145    tbb::flow::function_node<T,bool> dest3(g, tbb::flow::unlimited, b3 );
146    tbb::flow::make_edge( src3, dest3 );
147    src3.activate();
148    g.wait_for_all();
149    for (int i = 0; i < N; ++i ) {
150        int v = counters3[i];
151        ASSERT( v == 1, NULL );
152    }
153 
154    // push & pull
155    tbb::flow::input_node<T> src2(g, source_body<T>() );
156    tbb::atomic<int> counters2[N];
157    function_body<T> b2( counters2 );
158    tbb::flow::function_node<T,bool,tbb::flow::rejecting> dest2(g, tbb::flow::serial, b2 );
159    tbb::flow::make_edge( src2, dest2 );
160    src2.activate();
161    g.wait_for_all();
162    for (int i = 0; i < N; ++i ) {
163        int v = counters2[i];
164        ASSERT( v == 1, NULL );
165    }
166 
167    // test copy constructor
168    tbb::flow::input_node<T> src_copy(src);
169    test_push_receiver<T> dest_c(g);
170    ASSERT( src_copy.register_successor(dest_c), NULL );
171    src_copy.activate();
172    g.wait_for_all();
173    for (int i = 0; i < N; ++i ) {
174        ASSERT( dest_c.get_count(i) == 1, NULL );
175    }
176 }
177 
test_reset()178 void test_reset() {
179     //    source_node -> function_node
180     tbb::flow::graph g;
181     tbb::atomic<int> counters3[N];
182     tbb::flow::input_node<int> src3(g, source_body<int>() );
183     tbb::flow::input_node<int> src_inactive(g, source_body<int>());
184     function_body<int> b3( counters3 );
185     tbb::flow::function_node<int,bool> dest3(g, tbb::flow::unlimited, b3 );
186     tbb::flow::make_edge( src3, dest3 );
187     src3.activate();
188     //    source_node is now in active state.  Let the graph run,
189     g.wait_for_all();
190     //    check the array for each value.
191     for (int i = 0; i < N; ++i ) {
192         int v = counters3[i];
193         ASSERT( v == 1, NULL );
194         counters3[i] = 0;
195     }
196 
197     g.reset(tbb::flow::rf_reset_bodies);  // <-- re-initializes the counts.
198     // and spawns task to run source
199     src3.activate();
200 
201     g.wait_for_all();
202     //    check output queue again.  Should be the same contents.
203     for (int i = 0; i < N; ++i ) {
204         int v = counters3[i];
205         ASSERT( v == 1, NULL );
206         counters3[i] = 0;
207     }
208     g.reset();  // doesn't reset the source_node_body to initial state, but does spawn a task
209                 // to run the source_node.
210 
211     g.wait_for_all();
212     // array should be all zero
213     for (int i = 0; i < N; ++i ) {
214         int v = counters3[i];
215         ASSERT( v == 0, NULL );
216     }
217 
218     remove_edge(src3, dest3);
219     make_edge(src_inactive, dest3);
220 
221     // src_inactive doesn't run
222     g.wait_for_all();
223     for (int i = 0; i < N; ++i ) {
224         int v = counters3[i];
225         ASSERT( v == 0, NULL );
226     }
227 
228     // run graph
229     src_inactive.activate();
230     g.wait_for_all();
231     // check output
232     for (int i = 0; i < N; ++i ) {
233         int v = counters3[i];
234         ASSERT( v == 1, NULL );
235         counters3[i] = 0;
236     }
237     g.reset(tbb::flow::rf_reset_bodies);  // <-- reinitializes the counts
238     // src_inactive doesn't run
239     g.wait_for_all();
240     for (int i = 0; i < N; ++i ) {
241         int v = counters3[i];
242         ASSERT( v == 0, NULL );
243     }
244 
245     // start it up
246     src_inactive.activate();
247     g.wait_for_all();
248     for (int i = 0; i < N; ++i ) {
249         int v = counters3[i];
250         ASSERT( v == 1, NULL );
251         counters3[i] = 0;
252     }
253     g.reset();  // doesn't reset the source_node_body to initial state, and doesn't
254                 // spawn a task to run the source_node.
255 
256     g.wait_for_all();
257     // array should be all zero
258     for (int i = 0; i < N; ++i ) {
259         int v = counters3[i];
260         ASSERT( v == 0, NULL );
261     }
262     src_inactive.activate();
263     // source_node_body is already in final state, so source_node will not forward a message.
264     g.wait_for_all();
265     for (int i = 0; i < N; ++i ) {
266         int v = counters3[i];
267         ASSERT( v == 0, NULL );
268     }
269 }
270 
271 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
272 #if TBB_DEPRECATED_INPUT_NODE_BODY
source_body_f(int & i)273     bool source_body_f(int& i) { return i > 5; }
274 #else
source_body_f(tbb::flow_control &)275     int source_body_f(tbb::flow_control&) { return 42; }
276 #endif
test_deduction_guides()277 void test_deduction_guides() {
278     using namespace tbb::flow;
279     graph g;
280 
281 #if TBB_DEPRECATED_INPUT_NODE_BODY
282     auto lambda = [](int& i) { return i > 5; };
283     auto non_const_lambda = [](int& i) mutable { return i > 5; };
284 #else
285     auto lambda = [](tbb::flow_control&) { return 42; };
286     auto non_const_lambda = [](tbb::flow_control&) mutable { return 42; };
287 #endif
288     // Tests for source_node(graph&, Body)
289     input_node s1(g, lambda);
290     static_assert(std::is_same_v<decltype(s1), input_node<int>>);
291 
292     input_node s2(g, non_const_lambda);
293     static_assert(std::is_same_v<decltype(s2), input_node<int>>);
294 
295     input_node s3(g, source_body_f);
296     static_assert(std::is_same_v<decltype(s3), input_node<int>>);
297 
298     input_node s4(s3);
299     static_assert(std::is_same_v<decltype(s4), input_node<int>>);
300 
301 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
302     broadcast_node<int> bc(g);
303 
304     // Tests for source_node(const node_set<Args...>&, Body)
305     input_node s5(precedes(bc), lambda);
306     static_assert(std::is_same_v<decltype(s5), input_node<int>>);
307 
308     input_node s6(precedes(bc), non_const_lambda);
309     static_assert(std::is_same_v<decltype(s6), input_node<int>>);
310 
311     input_node s7(precedes(bc), source_body_f);
312     static_assert(std::is_same_v<decltype(s7), input_node<int>>);
313 #endif
314     g.wait_for_all();
315 }
316 
317 #endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
318 
319 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
320 #include <array>
test_follows_and_precedes_api()321 void test_follows_and_precedes_api() {
322     using namespace tbb::flow;
323 
324     graph g;
325 
326     std::array<buffer_node<bool>, 3> successors {{
327         buffer_node<bool>(g),
328         buffer_node<bool>(g),
329         buffer_node<bool>(g)
330     }};
331 
332     bool do_try_put = true;
333     input_node<bool> src(precedes(successors[0], successors[1], successors[2]),
334     #if TBB_DEPRECATED_INPUT_NODE_BODY
335     [&](bool& v) -> bool {
336         if(do_try_put) {
337             v = do_try_put;
338             do_try_put = false;
339             return true;
340         }
341         else {
342             return false;
343         }
344     }
345     #else
346     [&](tbb::flow_control& fc) -> bool {
347         if(!do_try_put)
348             fc.stop();
349         do_try_put = !do_try_put;
350         return true;
351     }
352     #endif
353     );
354 
355     src.activate();
356     g.wait_for_all();
357 
358     bool storage;
359     for(auto& successor: successors) {
360         ASSERT((successor.try_get(storage) && !successor.try_get(storage)),
361             "Not exact edge quantity was made");
362     }
363 }
364 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
365 
TestMain()366 int TestMain() {
367     if( MinThread<1 ) {
368         REPORT("number of threads must be positive\n");
369         exit(1);
370     }
371     for ( int p = MinThread; p < MaxThread; ++p ) {
372         tbb::task_scheduler_init init(p);
373         test_single_dest<int>();
374         test_single_dest<float>();
375     }
376     test_reset();
377 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
378     test_extract();
379 #endif
380 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
381     test_follows_and_precedes_api();
382 #endif
383 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
384     test_deduction_guides();
385 #endif
386     return Harness::Done;
387 }
388 
389