1 /*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #define TBB_DEPRECATED_FLOW_NODE_ALLOCATOR __TBB_CPF_BUILD
18 #define TBB_DEPRECATED_INPUT_NODE_BODY __TBB_CPF_BUILD
19
20 #include "harness.h"
21 #include "harness_graph.h"
22 #include "tbb/flow_graph.h"
23 #include "tbb/task_scheduler_init.h"
24
25 #if defined(_MSC_VER) && _MSC_VER < 1600
26 #pragma warning (disable : 4503) //disabling the "decorated name length exceeded" warning for VS2008 and earlier
27 #endif
28
29 //
30 // Tests
31 //
32
// Number of tuples pushed through a split_node per test run; the checks below expect
// exactly the indices 0 .. Count-1 to show up on every output port.
const int Count = 300;
// Capacity of the all_sink_nodes table (one slot per split_node output port).
const int MaxPorts = 10;
const int MaxNSources = 5; // max # of source_nodes to register for each split_node input in parallel test

// flags[i] is set once index i has been received on the port being checked, then reset
// to false before the next port is examined.
std::vector<bool> flags; // for checking output
38
// Maps a tuple element type to the text printed in the test banner.
// Types without a dedicated specialization are reported as "Unknown".
template<typename T>
struct name_of {
    static const char* name() {
        return "Unknown";
    }
};

template<>
struct name_of<int> {
    static const char* name() {
        return "int";
    }
};

template<>
struct name_of<float> {
    static const char* name() {
        return "float";
    }
};

template<>
struct name_of<double> {
    static const char* name() {
        return "double";
    }
};

template<>
struct name_of<long> {
    static const char* name() {
        return "long";
    }
};

template<>
struct name_of<short> {
    static const char* name() {
        return "short";
    }
};
69
// Every tuple element type must be arithmetic, and shouldn't wrap around for reasonable sizes of Count
// (which is now 300, and MaxPorts is 10, so the max number generated right now is 3300 or so.)
// A source will generate a series of tuples whose k-th element (k = 1 .. N) has the value
// (init_val + (i-1)*addend) * (k+1), where i is the i-th invocation of the body. We are attaching addend
// source nodes to the split_node input, and each will generate part of the numerical series the node is
// expecting to receive. If there is only one source node, the series order will be maintained; if more
// than one, this is not guaranteed.
76
77 template<int N>
78 struct tuple_helper {
79 template<typename TupleType>
set_elementtuple_helper80 static void set_element( TupleType &t, int i) {
81 tbb::flow::get<N-1>(t) = (typename tbb::flow::tuple_element<N-1,TupleType>::type)(i * (N+1));
82 tuple_helper<N-1>::set_element(t, i);
83 }
84 };
85
86 template<>
87 struct tuple_helper<1> {
88 template<typename TupleType>
set_elementtuple_helper89 static void set_element(TupleType &t, int i) {
90 tbb::flow::get<0>(t) = (typename tbb::flow::tuple_element<0,TupleType>::type)(i * 2);
91 }
92 };
93
94 // if we start N source_bodys they will all have the addend N, and my_count should be initialized to 0 .. N-1.
95 // the output tuples should have all the sequence, but the order will in general vary.
96 template<typename TupleType>
97 class source_body {
98 typedef TupleType TT;
99 static const int N = tbb::flow::tuple_size<TT>::value;
100 int my_count;
101 int addend;
102 public:
source_body(int init_val,int addto)103 source_body(int init_val, int addto) : my_count(init_val), addend(addto) { }
104 #if TBB_DEPRECATED_INPUT_NODE_BODY
operator ()(TT & v)105 bool operator()( TT &v) {
106 if(my_count >= Count) return false;
107 tuple_helper<N>::set_element(v, my_count);
108 my_count += addend;
109 return true;
110 }
111 #else
operator ()(tbb::flow_control & fc)112 TT operator()( tbb::flow_control &fc) {
113 if(my_count >= Count){
114 fc.stop();
115 return TT();
116 }
117 TT v;
118 tuple_helper<N>::set_element(v, my_count);
119 my_count += addend;
120 return v;
121 }
122 #endif
123 };
124
125 // allocator for split_node.
126
127 template<int N, typename SType>
128 class makeSplit {
129 public:
create(tbb::flow::graph & g)130 static SType *create(tbb::flow::graph& g) {
131 SType *temp = new SType(g);
132 return temp;
133 }
destroy(SType * p)134 static void destroy(SType *p) { delete p; }
135 };
136
// holder for sink_node pointers for eventual deletion
// Slot ELEM-1 stores the queue_node attached to output port ELEM-1. The pointers are kept
// as void* because the queue_node type differs per port; sink_node_helper casts them back.

static void* all_sink_nodes[MaxPorts];
140
141
142 template<int ELEM, typename SType>
143 class sink_node_helper {
144 public:
145 typedef typename SType::input_type TT;
146 typedef typename tbb::flow::tuple_element<ELEM-1,TT>::type IT;
147 typedef typename tbb::flow::queue_node<IT> my_sink_node_type;
print_parallel_remark()148 static void print_parallel_remark() {
149 sink_node_helper<ELEM-1,SType>::print_parallel_remark();
150 REMARK(", %s", name_of<IT>::name());
151 }
print_serial_remark()152 static void print_serial_remark() {
153 sink_node_helper<ELEM-1,SType>::print_serial_remark();
154 REMARK(", %s", name_of<IT>::name());
155 }
add_sink_nodes(SType & my_split,tbb::flow::graph & g)156 static void add_sink_nodes(SType &my_split, tbb::flow::graph &g) {
157 my_sink_node_type *new_node = new my_sink_node_type(g);
158 tbb::flow::make_edge( tbb::flow::output_port<ELEM-1>(my_split) , *new_node);
159 all_sink_nodes[ELEM-1] = (void *)new_node;
160 sink_node_helper<ELEM-1, SType>::add_sink_nodes(my_split, g);
161 }
162
check_sink_values()163 static void check_sink_values() {
164 my_sink_node_type *dp = reinterpret_cast<my_sink_node_type *>(all_sink_nodes[ELEM-1]);
165 for(int i = 0; i < Count; ++i) {
166 IT v;
167 ASSERT(dp->try_get(v), NULL);
168 flags[((int)v) / (ELEM+1)] = true;
169 }
170 for(int i = 0; i < Count; ++i) {
171 ASSERT(flags[i], NULL);
172 flags[i] = false; // reset for next test
173 }
174 sink_node_helper<ELEM-1,SType>::check_sink_values();
175 }
remove_sink_nodes(SType & my_split)176 static void remove_sink_nodes(SType& my_split) {
177 my_sink_node_type *dp = reinterpret_cast<my_sink_node_type *>(all_sink_nodes[ELEM-1]);
178 tbb::flow::remove_edge( tbb::flow::output_port<ELEM-1>(my_split) , *dp);
179 delete dp;
180 sink_node_helper<ELEM-1, SType>::remove_sink_nodes(my_split);
181 }
182 };
183
184 template<typename SType>
185 class sink_node_helper<1, SType> {
186 typedef typename SType::input_type TT;
187 typedef typename tbb::flow::tuple_element<0,TT>::type IT;
188 typedef typename tbb::flow::queue_node<IT> my_sink_node_type;
189 public:
print_parallel_remark()190 static void print_parallel_remark() {
191 REMARK("Parallel test of split_node< %s", name_of<IT>::name());
192 }
print_serial_remark()193 static void print_serial_remark() {
194 REMARK("Serial test of split_node< %s", name_of<IT>::name());
195 }
add_sink_nodes(SType & my_split,tbb::flow::graph & g)196 static void add_sink_nodes(SType &my_split, tbb::flow::graph &g) {
197 my_sink_node_type *new_node = new my_sink_node_type(g);
198 tbb::flow::make_edge( tbb::flow::output_port<0>(my_split) , *new_node);
199 all_sink_nodes[0] = (void *)new_node;
200 }
check_sink_values()201 static void check_sink_values() {
202 my_sink_node_type *dp = reinterpret_cast<my_sink_node_type *>(all_sink_nodes[0]);
203 for(int i = 0; i < Count; ++i) {
204 IT v;
205 ASSERT(dp->try_get(v), NULL);
206 flags[((int)v) / 2] = true;
207 }
208 for(int i = 0; i < Count; ++i) {
209 ASSERT(flags[i], NULL);
210 flags[i] = false; // reset for next test
211 }
212 }
remove_sink_nodes(SType & my_split)213 static void remove_sink_nodes(SType& my_split) {
214 my_sink_node_type *dp = reinterpret_cast<my_sink_node_type *>(all_sink_nodes[0]);
215 tbb::flow::remove_edge( tbb::flow::output_port<0>(my_split) , *dp);
216 delete dp;
217 }
218 };
219
220 // parallel_test: create source_nodes that feed tuples into the split node
221 // and queue_nodes that receive the output.
222 template<typename SType>
223 class parallel_test {
224 public:
225 typedef typename SType::input_type TType;
226 typedef tbb::flow::input_node<TType> source_type;
227 static const int N = tbb::flow::tuple_size<TType>::value;
test()228 static void test() {
229 source_type* all_source_nodes[MaxNSources];
230 sink_node_helper<N,SType>::print_parallel_remark();
231 REMARK(" >\n");
232 for(int i=0; i < MaxPorts; ++i) {
233 all_sink_nodes[i] = NULL;
234 }
235 // try test for # sources 1 .. MaxNSources
236 for(int nInputs = 1; nInputs <= MaxNSources; ++nInputs) {
237 tbb::flow::graph g;
238 SType* my_split = makeSplit<N,SType>::create(g);
239
240 // add sinks first so when sources start spitting out values they are there to catch them
241 sink_node_helper<N, SType>::add_sink_nodes((*my_split), g);
242
243 // now create nInputs source_nodes, each spitting out i, i+nInputs, i+2*nInputs ...
244 // each element of the tuple is i*(n+1), where n is the tuple element index (1-N)
245 for(int i = 0; i < nInputs; ++i) {
246 // create source node
247 source_type *s = new source_type(g, source_body<TType>(i, nInputs) );
248 tbb::flow::make_edge(*s, *my_split);
249 all_source_nodes[i] = s;
250 s->activate();
251 }
252
253 g.wait_for_all();
254
255 // check that we got Count values in each output queue, and all the index values
256 // are there.
257 sink_node_helper<N, SType>::check_sink_values();
258
259 sink_node_helper<N, SType>::remove_sink_nodes(*my_split);
260 for(int i = 0; i < nInputs; ++i) {
261 delete all_source_nodes[i];
262 }
263 makeSplit<N,SType>::destroy(my_split);
264 }
265 }
266 };
267
268 //
269 // Single predecessor, single accepting successor at each port
270
271 template<typename SType>
test_one_serial(SType & my_split,tbb::flow::graph & g)272 void test_one_serial( SType &my_split, tbb::flow::graph &g) {
273 typedef typename SType::input_type TType;
274 static const int TUPLE_SIZE = tbb::flow::tuple_size<TType>::value;
275 sink_node_helper<TUPLE_SIZE, SType>::add_sink_nodes(my_split,g);
276 typedef TType q3_input_type;
277 tbb::flow::queue_node< q3_input_type > q3(g);
278
279 tbb::flow::make_edge( q3, my_split );
280
281 // fill the queue with its value one-at-a-time
282 flags.clear();
283 for (int i = 0; i < Count; ++i ) {
284 TType v;
285 tuple_helper<TUPLE_SIZE>::set_element(v, i);
286 ASSERT(my_split.try_put(v), NULL);
287 flags.push_back(false);
288 }
289
290 g.wait_for_all();
291
292 sink_node_helper<TUPLE_SIZE,SType>::check_sink_values();
293
294 sink_node_helper<TUPLE_SIZE, SType>::remove_sink_nodes(my_split);
295
296 }
297
298 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
test_follow_and_precedes_api()299 void test_follow_and_precedes_api() {
300 using namespace tbb::flow;
301 using msg_t = tuple<int, float, double>;
302
303 graph g;
304
305 function_node<msg_t, msg_t> f1(g, unlimited, [](msg_t msg) { return msg; } );
306 auto f2(f1);
307 auto f3(f1);
308
309 tbb::atomic<int> body_calls = 0;
310
311 function_node<int, int> f4(g, unlimited, [&](int val) { ++body_calls; return val; } );
312 function_node<float, float> f5(g, unlimited, [&](float val) { ++body_calls; return val; } );
313 function_node<double, double> f6(g, unlimited, [&](double val) { ++body_calls; return val; } );
314
315 split_node<msg_t> following_node(follows(f1, f2, f3));
316 make_edge(output_port<0>(following_node), f4);
317 make_edge(output_port<1>(following_node), f5);
318 make_edge(output_port<2>(following_node), f6);
319
320 split_node<msg_t> preceding_node(precedes(f4, f5, f6));
321 make_edge(f1, preceding_node);
322 make_edge(f2, preceding_node);
323 make_edge(f3, preceding_node);
324
325 msg_t msg(1, 2.2f, 3.3);
326 f1.try_put(msg);
327 f2.try_put(msg);
328 f3.try_put(msg);
329
330 g.wait_for_all();
331
332 // <number of try puts> * <number of splits by a source node> * <number of source nodes>
333 ASSERT((body_calls == 3*3*2), "Not exact edge quantity was made");
334 }
335 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
336
337 template<typename SType>
338 class serial_test {
339 typedef typename SType::input_type TType;
340 static const int TUPLE_SIZE = tbb::flow::tuple_size<TType>::value;
341 static const int ELEMS = 3;
342 public:
test()343 static void test() {
344 tbb::flow::graph g;
345 flags.reserve(Count);
346 SType* my_split = makeSplit<TUPLE_SIZE,SType>::create(g);
347 sink_node_helper<TUPLE_SIZE, SType>::print_serial_remark(); REMARK(" >\n");
348
349 test_output_ports_return_ref(*my_split);
350
351 test_one_serial<SType>(*my_split, g);
352 // build the vector with copy construction from the used split node.
353 std::vector<SType>split_vector(ELEMS, *my_split);
354 // destroy the tired old split_node in case we're accidentally reusing pieces of it.
355 makeSplit<TUPLE_SIZE,SType>::destroy(my_split);
356
357
358 for(int e = 0; e < ELEMS; ++e) { // exercise each of the vector elements
359 test_one_serial<SType>(split_vector[e], g);
360 }
361 }
362
363 }; // serial_test
364
365 template<
366 template<typename> class TestType, // serial_test or parallel_test
367 typename TupleType > // type of the input of the split
368 struct generate_test {
369 typedef tbb::flow::split_node<TupleType> split_node_type;
do_testgenerate_test370 static void do_test() {
371 TestType<split_node_type>::test();
372 }
373 }; // generate_test
374
375 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
376
test_deduction_guides()377 void test_deduction_guides() {
378 using namespace tbb::flow;
379 using tuple_type = std::tuple<int, int>;
380
381 graph g;
382 split_node<tuple_type> s0(g);
383
384 split_node s1(s0);
385 static_assert(std::is_same_v<decltype(s1), split_node<tuple_type>>);
386
387 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
388 broadcast_node<tuple_type> b1(g), b2(g);
389 broadcast_node<int> b3(g), b4(g);
390
391 split_node s2(follows(b1, b2));
392 static_assert(std::is_same_v<decltype(s2), split_node<tuple_type>>);
393
394 split_node s3(precedes(b3, b4));
395 static_assert(std::is_same_v<decltype(s3), split_node<tuple_type>>);
396 #endif
397 }
398
399 #endif
400
401 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
test_node_allocator()402 void test_node_allocator() {
403 tbb::flow::graph g;
404 tbb::flow::split_node< tbb::flow::tuple<int,int>, std::allocator<int> > tmp(g);
405 }
406 #endif
407
TestMain()408 int TestMain() {
409 #if __TBB_USE_TBB_TUPLE
410 REMARK(" Using TBB tuple\n");
411 #else
412 REMARK(" Using platform tuple\n");
413 #endif
414 for (int p = 0; p < 2; ++p) {
415 generate_test<serial_test, tbb::flow::tuple<float, double> >::do_test();
416 #if MAX_TUPLE_TEST_SIZE >= 4
417 generate_test<serial_test, tbb::flow::tuple<float, double, int, long> >::do_test();
418 #endif
419 #if MAX_TUPLE_TEST_SIZE >= 6
420 generate_test<serial_test, tbb::flow::tuple<double, double, int, long, int, short> >::do_test();
421 #endif
422 #if MAX_TUPLE_TEST_SIZE >= 8
423 generate_test<serial_test, tbb::flow::tuple<float, double, double, double, float, int, float, long> >::do_test();
424 #endif
425 #if MAX_TUPLE_TEST_SIZE >= 10
426 generate_test<serial_test, tbb::flow::tuple<float, double, int, double, double, float, long, int, float, long> >::do_test();
427 #endif
428 generate_test<parallel_test, tbb::flow::tuple<float, double> >::do_test();
429 #if MAX_TUPLE_TEST_SIZE >= 3
430 generate_test<parallel_test, tbb::flow::tuple<float, int, long> >::do_test();
431 #endif
432 #if MAX_TUPLE_TEST_SIZE >= 5
433 generate_test<parallel_test, tbb::flow::tuple<double, double, int, int, short> >::do_test();
434 #endif
435 #if MAX_TUPLE_TEST_SIZE >= 7
436 generate_test<parallel_test, tbb::flow::tuple<float, int, double, float, long, float, long> >::do_test();
437 #endif
438 #if MAX_TUPLE_TEST_SIZE >= 9
439 generate_test<parallel_test, tbb::flow::tuple<float, double, int, double, double, long, int, float, long> >::do_test();
440 #endif
441 }
442 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
443 test_follow_and_precedes_api();
444 #endif
445 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
446 test_deduction_guides();
447 #endif
448 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
449 test_node_allocator();
450 #endif
451 return Harness::Done;
452 }
453