1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #define TBB_DEPRECATED_FLOW_NODE_ALLOCATOR __TBB_CPF_BUILD
18 #define TBB_DEPRECATED_INPUT_NODE_BODY __TBB_CPF_BUILD
19 
20 #include "harness.h"
21 #include "harness_graph.h"
22 #include "tbb/flow_graph.h"
23 #include "tbb/task_scheduler_init.h"
24 
25 #if defined(_MSC_VER) && _MSC_VER < 1600
26     #pragma warning (disable : 4503) //disabling the "decorated name length exceeded" warning for VS2008 and earlier
27 #endif
28 
29 //
30 // Tests
31 //
32 
// Number of tuples pushed through the split_node in each test run; every output queue
// is expected to hold exactly this many values afterwards.
const int Count = 300;
// Capacity of the all_sink_nodes array, i.e. the largest number of output ports tracked.
const int MaxPorts = 10;
const int MaxNSources = 5; // max # of source_nodes to register for each split_node input in parallel test

// One flag per generated index; check_sink_values() sets a flag when it sees the
// corresponding value on an output port and clears it again after verification.
std::vector<bool> flags;   // for checking output
38 
// Maps a tuple element type to a printable name used in the REMARK output of the tests.
// Any type without an explicit specialization below is reported as "Unknown".
template<typename T>
struct name_of {
    static const char* name() {
        return "Unknown";
    }
};

// --- specializations for the element types used by the tests ---

template<>
struct name_of<int> {
    static const char* name() {
        return "int";
    }
};

template<>
struct name_of<float> {
    static const char* name() {
        return "float";
    }
};

template<>
struct name_of<double> {
    static const char* name() {
        return "double";
    }
};

template<>
struct name_of<long> {
    static const char* name() {
        return "long";
    }
};

template<>
struct name_of<short> {
    static const char* name() {
        return "short";
    }
};
69 
// T must be arithmetic, and shouldn't wrap around for reasonable sizes of Count (which is now 300, and MaxPorts is 10,
// so the max number generated right now is 3300 or so.)  Each source generates a series of tuples in which the
// k-th element (k = 1..N) holds the value my_count * (k+1), where my_count starts at init_val and grows by addend
// on every invocation of the body.  We are attaching addend source nodes to the split_node, and each will generate
// part of the numerical series the output ports are expecting to receive.  If there is only one source node, the
// series order will be maintained; if more than one, this is not guaranteed.
76 
77 template<int N>
78 struct tuple_helper {
79     template<typename TupleType>
set_elementtuple_helper80     static void set_element( TupleType &t, int i) {
81         tbb::flow::get<N-1>(t) = (typename tbb::flow::tuple_element<N-1,TupleType>::type)(i * (N+1));
82         tuple_helper<N-1>::set_element(t, i);
83     }
84 };
85 
86 template<>
87 struct tuple_helper<1> {
88     template<typename TupleType>
set_elementtuple_helper89     static void set_element(TupleType &t, int i) {
90         tbb::flow::get<0>(t) = (typename tbb::flow::tuple_element<0,TupleType>::type)(i * 2);
91     }
92 };
93 
94 // if we start N source_bodys they will all have the addend N, and my_count should be initialized to 0 .. N-1.
95 // the output tuples should have all the sequence, but the order will in general vary.
96 template<typename TupleType>
97 class source_body {
98     typedef TupleType TT;
99     static const int N = tbb::flow::tuple_size<TT>::value;
100     int my_count;
101     int addend;
102 public:
source_body(int init_val,int addto)103     source_body(int init_val, int addto) : my_count(init_val), addend(addto) { }
104 #if TBB_DEPRECATED_INPUT_NODE_BODY
operator ()(TT & v)105     bool operator()( TT &v) {
106         if(my_count >= Count) return false;
107         tuple_helper<N>::set_element(v, my_count);
108         my_count += addend;
109         return true;
110     }
111 #else
operator ()(tbb::flow_control & fc)112     TT operator()( tbb::flow_control &fc) {
113         if(my_count >= Count){
114             fc.stop();
115             return TT();
116         }
117         TT v;
118         tuple_helper<N>::set_element(v, my_count);
119         my_count += addend;
120         return v;
121     }
122 #endif
123 };
124 
125 // allocator for split_node.
126 
127 template<int N, typename SType>
128 class makeSplit {
129 public:
create(tbb::flow::graph & g)130     static SType *create(tbb::flow::graph& g) {
131         SType *temp = new SType(g);
132         return temp;
133     }
destroy(SType * p)134     static void destroy(SType *p) { delete p; }
135 };
136 
// Holder for the sink node pointers, one slot per split_node output port (slot k holds
// the queue_node attached to port k).  The queue_node type differs from port to port, so
// the pointers are stored as void* and cast back by sink_node_helper when the received
// values are checked and when the nodes are eventually deleted.

static void* all_sink_nodes[MaxPorts];
140 
141 
142 template<int ELEM, typename SType>
143 class sink_node_helper {
144 public:
145     typedef typename SType::input_type TT;
146     typedef typename tbb::flow::tuple_element<ELEM-1,TT>::type IT;
147     typedef typename tbb::flow::queue_node<IT> my_sink_node_type;
print_parallel_remark()148     static void print_parallel_remark() {
149         sink_node_helper<ELEM-1,SType>::print_parallel_remark();
150         REMARK(", %s", name_of<IT>::name());
151     }
print_serial_remark()152     static void print_serial_remark() {
153         sink_node_helper<ELEM-1,SType>::print_serial_remark();
154         REMARK(", %s", name_of<IT>::name());
155     }
add_sink_nodes(SType & my_split,tbb::flow::graph & g)156     static void add_sink_nodes(SType &my_split, tbb::flow::graph &g) {
157         my_sink_node_type *new_node = new my_sink_node_type(g);
158         tbb::flow::make_edge( tbb::flow::output_port<ELEM-1>(my_split) , *new_node);
159         all_sink_nodes[ELEM-1] = (void *)new_node;
160         sink_node_helper<ELEM-1, SType>::add_sink_nodes(my_split, g);
161     }
162 
check_sink_values()163     static void check_sink_values() {
164         my_sink_node_type *dp = reinterpret_cast<my_sink_node_type *>(all_sink_nodes[ELEM-1]);
165         for(int i = 0; i < Count; ++i) {
166             IT v;
167             ASSERT(dp->try_get(v), NULL);
168             flags[((int)v) / (ELEM+1)] = true;
169         }
170         for(int i = 0; i < Count; ++i) {
171             ASSERT(flags[i], NULL);
172             flags[i] = false;  // reset for next test
173         }
174         sink_node_helper<ELEM-1,SType>::check_sink_values();
175     }
remove_sink_nodes(SType & my_split)176     static void remove_sink_nodes(SType& my_split) {
177         my_sink_node_type *dp = reinterpret_cast<my_sink_node_type *>(all_sink_nodes[ELEM-1]);
178         tbb::flow::remove_edge( tbb::flow::output_port<ELEM-1>(my_split) , *dp);
179         delete dp;
180         sink_node_helper<ELEM-1, SType>::remove_sink_nodes(my_split);
181     }
182 };
183 
184 template<typename SType>
185 class sink_node_helper<1, SType> {
186     typedef typename SType::input_type TT;
187     typedef typename tbb::flow::tuple_element<0,TT>::type IT;
188     typedef typename tbb::flow::queue_node<IT> my_sink_node_type;
189 public:
print_parallel_remark()190     static void print_parallel_remark() {
191         REMARK("Parallel test of split_node< %s", name_of<IT>::name());
192     }
print_serial_remark()193     static void print_serial_remark() {
194         REMARK("Serial test of split_node< %s", name_of<IT>::name());
195     }
add_sink_nodes(SType & my_split,tbb::flow::graph & g)196     static void add_sink_nodes(SType &my_split, tbb::flow::graph &g) {
197         my_sink_node_type *new_node = new my_sink_node_type(g);
198         tbb::flow::make_edge( tbb::flow::output_port<0>(my_split) , *new_node);
199         all_sink_nodes[0] = (void *)new_node;
200     }
check_sink_values()201     static void check_sink_values() {
202         my_sink_node_type *dp = reinterpret_cast<my_sink_node_type *>(all_sink_nodes[0]);
203         for(int i = 0; i < Count; ++i) {
204             IT v;
205             ASSERT(dp->try_get(v), NULL);
206             flags[((int)v) / 2] = true;
207         }
208         for(int i = 0; i < Count; ++i) {
209             ASSERT(flags[i], NULL);
210             flags[i] = false;  // reset for next test
211         }
212     }
remove_sink_nodes(SType & my_split)213     static void remove_sink_nodes(SType& my_split) {
214         my_sink_node_type *dp = reinterpret_cast<my_sink_node_type *>(all_sink_nodes[0]);
215         tbb::flow::remove_edge( tbb::flow::output_port<0>(my_split) , *dp);
216         delete dp;
217     }
218 };
219 
220 // parallel_test: create source_nodes that feed tuples into the split node
221 //    and queue_nodes that receive the output.
222 template<typename SType>
223 class parallel_test {
224 public:
225     typedef typename SType::input_type TType;
226     typedef tbb::flow::input_node<TType> source_type;
227     static const int N = tbb::flow::tuple_size<TType>::value;
test()228     static void test() {
229         source_type* all_source_nodes[MaxNSources];
230         sink_node_helper<N,SType>::print_parallel_remark();
231         REMARK(" >\n");
232         for(int i=0; i < MaxPorts; ++i) {
233             all_sink_nodes[i] = NULL;
234         }
235         // try test for # sources 1 .. MaxNSources
236         for(int nInputs = 1; nInputs <= MaxNSources; ++nInputs) {
237             tbb::flow::graph g;
238             SType* my_split = makeSplit<N,SType>::create(g);
239 
240             // add sinks first so when sources start spitting out values they are there to catch them
241             sink_node_helper<N, SType>::add_sink_nodes((*my_split), g);
242 
243             // now create nInputs source_nodes, each spitting out i, i+nInputs, i+2*nInputs ...
244             // each element of the tuple is i*(n+1), where n is the tuple element index (1-N)
245             for(int i = 0; i < nInputs; ++i) {
246                 // create source node
247                 source_type *s = new source_type(g, source_body<TType>(i, nInputs) );
248                 tbb::flow::make_edge(*s, *my_split);
249                 all_source_nodes[i] = s;
250                 s->activate();
251             }
252 
253             g.wait_for_all();
254 
255             // check that we got Count values in each output queue, and all the index values
256             // are there.
257             sink_node_helper<N, SType>::check_sink_values();
258 
259             sink_node_helper<N, SType>::remove_sink_nodes(*my_split);
260             for(int i = 0; i < nInputs; ++i) {
261                 delete all_source_nodes[i];
262             }
263             makeSplit<N,SType>::destroy(my_split);
264         }
265     }
266 };
267 
268 //
269 // Single predecessor, single accepting successor at each port
270 
271 template<typename SType>
test_one_serial(SType & my_split,tbb::flow::graph & g)272 void test_one_serial( SType &my_split, tbb::flow::graph &g) {
273     typedef typename SType::input_type TType;
274     static const int TUPLE_SIZE = tbb::flow::tuple_size<TType>::value;
275     sink_node_helper<TUPLE_SIZE, SType>::add_sink_nodes(my_split,g);
276     typedef TType q3_input_type;
277     tbb::flow::queue_node< q3_input_type >  q3(g);
278 
279     tbb::flow::make_edge( q3, my_split );
280 
281     // fill the  queue with its value one-at-a-time
282     flags.clear();
283     for (int i = 0; i < Count; ++i ) {
284         TType v;
285         tuple_helper<TUPLE_SIZE>::set_element(v, i);
286         ASSERT(my_split.try_put(v), NULL);
287         flags.push_back(false);
288     }
289 
290     g.wait_for_all();
291 
292     sink_node_helper<TUPLE_SIZE,SType>::check_sink_values();
293 
294     sink_node_helper<TUPLE_SIZE, SType>::remove_sink_nodes(my_split);
295 
296 }
297 
298 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
test_follow_and_precedes_api()299 void test_follow_and_precedes_api() {
300     using namespace tbb::flow;
301     using msg_t = tuple<int, float, double>;
302 
303     graph g;
304 
305     function_node<msg_t, msg_t> f1(g, unlimited, [](msg_t msg) { return msg; } );
306     auto f2(f1);
307     auto f3(f1);
308 
309     tbb::atomic<int> body_calls = 0;
310 
311     function_node<int, int> f4(g, unlimited, [&](int val) { ++body_calls; return val; } );
312     function_node<float, float> f5(g, unlimited, [&](float val) { ++body_calls; return val; } );
313     function_node<double, double> f6(g, unlimited, [&](double val) { ++body_calls; return val; } );
314 
315     split_node<msg_t> following_node(follows(f1, f2, f3));
316     make_edge(output_port<0>(following_node), f4);
317     make_edge(output_port<1>(following_node), f5);
318     make_edge(output_port<2>(following_node), f6);
319 
320     split_node<msg_t> preceding_node(precedes(f4, f5, f6));
321     make_edge(f1, preceding_node);
322     make_edge(f2, preceding_node);
323     make_edge(f3, preceding_node);
324 
325     msg_t msg(1, 2.2f, 3.3);
326     f1.try_put(msg);
327     f2.try_put(msg);
328     f3.try_put(msg);
329 
330     g.wait_for_all();
331 
332     // <number of try puts> * <number of splits by a source node> * <number of source nodes>
333     ASSERT((body_calls == 3*3*2), "Not exact edge quantity was made");
334 }
335 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
336 
337 template<typename SType>
338 class serial_test {
339     typedef typename SType::input_type TType;
340     static const int TUPLE_SIZE = tbb::flow::tuple_size<TType>::value;
341     static const int ELEMS = 3;
342 public:
test()343 static void test() {
344     tbb::flow::graph g;
345     flags.reserve(Count);
346     SType* my_split = makeSplit<TUPLE_SIZE,SType>::create(g);
347     sink_node_helper<TUPLE_SIZE, SType>::print_serial_remark(); REMARK(" >\n");
348 
349     test_output_ports_return_ref(*my_split);
350 
351     test_one_serial<SType>(*my_split, g);
352     // build the vector with copy construction from the used split node.
353     std::vector<SType>split_vector(ELEMS, *my_split);
354     // destroy the tired old split_node in case we're accidentally reusing pieces of it.
355     makeSplit<TUPLE_SIZE,SType>::destroy(my_split);
356 
357 
358     for(int e = 0; e < ELEMS; ++e) {  // exercise each of the vector elements
359         test_one_serial<SType>(split_vector[e], g);
360     }
361 }
362 
363 }; // serial_test
364 
365 template<
366       template<typename> class TestType,  // serial_test or parallel_test
367       typename TupleType >                               // type of the input of the split
368 struct generate_test {
369     typedef tbb::flow::split_node<TupleType> split_node_type;
do_testgenerate_test370     static void do_test() {
371         TestType<split_node_type>::test();
372     }
373 }; // generate_test
374 
375 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
376 
test_deduction_guides()377 void test_deduction_guides() {
378     using namespace tbb::flow;
379     using tuple_type = std::tuple<int, int>;
380 
381     graph g;
382     split_node<tuple_type> s0(g);
383 
384     split_node s1(s0);
385     static_assert(std::is_same_v<decltype(s1), split_node<tuple_type>>);
386 
387 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
388     broadcast_node<tuple_type> b1(g), b2(g);
389     broadcast_node<int> b3(g), b4(g);
390 
391     split_node s2(follows(b1, b2));
392     static_assert(std::is_same_v<decltype(s2), split_node<tuple_type>>);
393 
394     split_node s3(precedes(b3, b4));
395     static_assert(std::is_same_v<decltype(s3), split_node<tuple_type>>);
396 #endif
397 }
398 
399 #endif
400 
401 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
test_node_allocator()402 void test_node_allocator() {
403     tbb::flow::graph g;
404     tbb::flow::split_node< tbb::flow::tuple<int,int>, std::allocator<int> > tmp(g);
405 }
406 #endif
407 
TestMain()408 int TestMain() {
409 #if __TBB_USE_TBB_TUPLE
410     REMARK("  Using TBB tuple\n");
411 #else
412     REMARK("  Using platform tuple\n");
413 #endif
414     for (int p = 0; p < 2; ++p) {
415         generate_test<serial_test, tbb::flow::tuple<float, double> >::do_test();
416 #if MAX_TUPLE_TEST_SIZE >= 4
417         generate_test<serial_test, tbb::flow::tuple<float, double, int, long> >::do_test();
418 #endif
419 #if MAX_TUPLE_TEST_SIZE >= 6
420         generate_test<serial_test, tbb::flow::tuple<double, double, int, long, int, short> >::do_test();
421 #endif
422 #if MAX_TUPLE_TEST_SIZE >= 8
423         generate_test<serial_test, tbb::flow::tuple<float, double, double, double, float, int, float, long> >::do_test();
424 #endif
425 #if MAX_TUPLE_TEST_SIZE >= 10
426         generate_test<serial_test, tbb::flow::tuple<float, double, int, double, double, float, long, int, float, long> >::do_test();
427 #endif
428         generate_test<parallel_test, tbb::flow::tuple<float, double> >::do_test();
429 #if MAX_TUPLE_TEST_SIZE >= 3
430         generate_test<parallel_test, tbb::flow::tuple<float, int, long> >::do_test();
431 #endif
432 #if MAX_TUPLE_TEST_SIZE >= 5
433         generate_test<parallel_test, tbb::flow::tuple<double, double, int, int, short> >::do_test();
434 #endif
435 #if MAX_TUPLE_TEST_SIZE >= 7
436         generate_test<parallel_test, tbb::flow::tuple<float, int, double, float, long, float, long> >::do_test();
437 #endif
438 #if MAX_TUPLE_TEST_SIZE >= 9
439         generate_test<parallel_test, tbb::flow::tuple<float, double, int, double, double, long, int, float, long> >::do_test();
440 #endif
441     }
442 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
443     test_follow_and_precedes_api();
444 #endif
445 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
446     test_deduction_guides();
447 #endif
448 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
449     test_node_allocator();
450 #endif
451     return Harness::Done;
452 }
453