1 /*
2 Copyright (c) 2005-2021 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "common/config.h"
18
19 #if _MSC_VER
20 #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
21 #if _MSC_VER==1700 && !defined(__INTEL_COMPILER)
22 // Suppress "unreachable code" warning by VC++ 17.0 (VS 2012)
23 #pragma warning (disable: 4702)
24 #endif
25 #endif
26
// These headers are needed (ahead of the private/protected redefinition below) to get
// proper external names for private methods in the library.
28 #include "tbb/spin_mutex.h"
29 #include "tbb/spin_rw_mutex.h"
30 #include "tbb/task_arena.h"
31 #include "tbb/task_group.h"
32
33 #define private public
34 #define protected public
35 #include "tbb/flow_graph.h"
36 #undef protected
37 #undef private
38
39 #include "common/test.h"
40 #include "common/utils.h"
41 #include "common/spin_barrier.h"
42 #include "common/graph_utils.h"
43
44 #include <string> // merely prevents LNK2001 error to happen (on ICL+VC9 configurations)
45
46 //! \file test_flow_graph_whitebox.cpp
47 //! \brief Test for [flow_graph.broadcast_node flow_graph.priority_queue_node flow_graph.indexer_node flow_graph.sequencer_node flow_graph.remove_edge flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification
48
49 template<typename T>
50 struct receiverBody {
operator ()receiverBody51 tbb::flow::continue_msg operator()(const T &/*in*/) {
52 return tbb::flow::continue_msg();
53 }
54 };
55
56 // split_nodes cannot have predecessors
57 // they do not reject messages and always forward.
58 // they reject edge reversals from successors.
TestSplitNode()59 void TestSplitNode() {
60 typedef tbb::flow::split_node<std::tuple<int> > snode_type;
61 tbb::flow::graph g;
62 snode_type snode(g);
63 tbb::flow::function_node<int> rcvr(g,tbb::flow::unlimited, receiverBody<int>());
64 INFO("Testing split_node\n");
65 CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "Constructed split_node has successors");
66 // tbb::flow::output_port<0>(snode)
67 tbb::flow::make_edge(tbb::flow::output_port<0>(snode), rcvr);
68 CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after make_edge, split_node has no successor.");
69 snode.try_put(std::tuple<int>(1));
70 g.wait_for_all();
71 g.reset();
72 CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after reset(), split_node has no successor.");
73 g.reset(tbb::flow::rf_clear_edges);
74 CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "after reset(rf_clear_edges), split_node has a successor.");
75 }
76
77 // buffering nodes cannot have predecessors
78 // they do not reject messages and always save or forward
79 // they allow edge reversals from successors
80 template< typename B >
TestBufferingNode(const char * name)81 void TestBufferingNode(const char * name) {
82 tbb::flow::graph g;
83 B bnode(g);
84 tbb::flow::function_node<int,int,tbb::flow::rejecting> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
85 INFO("Testing " << name << ":");
86 for(int icnt = 0; icnt < 2; icnt++) {
87 bool reverse_edge = (icnt & 0x2) != 0;
88 serial_fn_state0 = 0; // reset to waiting state.
89 INFO(" make_edge");
90 tbb::flow::make_edge(bnode, fnode);
91 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge");
92 std::thread t([&] {
93 INFO(" try_put");
94 bnode.try_put(1); // will forward to the fnode
95 g.wait_for_all();
96 });
97 utils::SpinWaitWhileEq(serial_fn_state0, 0);
98 if(reverse_edge) {
99 INFO(" try_put2");
100 bnode.try_put(2); // should reverse the edge
101 // waiting for the edge to reverse
102 utils::SpinWaitWhile([&] { return !bnode.my_successors.empty(); });
103 }
104 else {
105 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message");
106 }
107 serial_fn_state0 = 0; // release the function_node.
108 if(reverse_edge) {
109 // have to do a second release because the function_node will get the 2nd item
110 utils::SpinWaitWhileEq(serial_fn_state0, 0);
111 serial_fn_state0 = 0; // release the function_node.
112 }
113 t.join();
114 INFO(" remove_edge");
115 tbb::flow::remove_edge(bnode, fnode);
116 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge");
117 }
118 tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g);
119 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // will spawn a task
120 g.wait_for_all();
121 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join");
122 INFO(" reverse");
123 bnode.try_put(1); // the edge should reverse
124 g.wait_for_all();
125 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
126 INFO(" reset()");
127 g.wait_for_all();
128 g.reset(); // should be in forward direction again
129 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()");
130 INFO(" remove_edge");
131 g.reset(tbb::flow::rf_clear_edges);
132 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)");
133 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // add edge again
134 // reverse edge by adding to buffer.
135 bnode.try_put(1); // the edge should reverse
136 g.wait_for_all();
137 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
138 INFO(" remove_edge(reversed)");
139 g.reset(tbb::flow::rf_clear_edges);
140 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has no successor after reset()");
141 CHECK_MESSAGE( (tbb::flow::input_port<0>(jnode).my_predecessors.empty()), "predecessor not reset");
142 INFO(" done\n");
143 g.wait_for_all();
144 }
145
146 // continue_node has only predecessor count
147 // they do not have predecessors, only the counts
148 // successor edges cannot be reversed
TestContinueNode()149 void TestContinueNode() {
150 tbb::flow::graph g;
151 tbb::flow::function_node<int> fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
152 tbb::flow::continue_node<int> cnode(g, /*number_of_predecessors*/ 1,
153 serial_continue_body<int>(serial_continue_state0));
154 tbb::flow::function_node<int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state1));
155 tbb::flow::make_edge(fnode0, cnode);
156 tbb::flow::make_edge(cnode, fnode1);
157 INFO("Testing continue_node:");
158 for( int icnt = 0; icnt < 2; ++icnt ) {
159 INFO( " initial" << icnt);
160 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor addition didn't increment count");
161 CHECK_MESSAGE( (!cnode.successors().empty()), "successors empty though we added one");
162 CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect");
163 serial_continue_state0 = 0;
164 serial_fn_state0 = 0;
165 serial_fn_state1 = 0;
166
167 std::thread t([&] {
168 fnode0.try_put(1); // start the first function node.
169 if(icnt == 0) { // first time through, let the continue_node fire
170 INFO(" firing");
171 fnode0.try_put(1); // second message
172 g.wait_for_all();
173
174 // try a try_get()
175 {
176 int i;
177 CHECK_MESSAGE( (!cnode.try_get(i)), "try_get not rejected");
178 }
179
180 INFO(" reset");
181 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)");
182 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)");
183 g.reset(); // should still be the same
184 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (after reset)" );
185 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (after reset)");
186 }
187 else { // we're going to see if the rf_clear_edges resets things.
188 g.wait_for_all();
189 INFO(" reset(rf_clear_edges)");
190 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)" );
191 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)" );
192 g.reset(tbb::flow::rf_clear_edges); // should be in forward direction again
193 CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect after reset(rf_clear_edges)" );
194 CHECK_MESSAGE( (cnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)" );
195 CHECK_MESSAGE( (cnode.my_predecessor_count == cnode.my_initial_predecessor_count), "predecessor count not reset" );
196 }
197 });
198
199 utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for the first message to arrive in function_node
200 // Now the body of function_node 0 is executing.
201 serial_fn_state0 = 0; // release the node
202 if (icnt == 0) {
203 // wait for node to count the message (or for the node body to execute, which would be wrong)
204 utils::SpinWaitWhile([&] {
205 tbb::spin_mutex::scoped_lock l(cnode.my_mutex);
206 return serial_continue_state0 == 0 && cnode.my_current_count == 0;
207 });
208 CHECK_MESSAGE( (serial_continue_state0 == 0), "Improperly released continue_node");
209 CHECK_MESSAGE( (cnode.my_current_count == 1), "state of continue_receiver incorrect");
210
211 utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for the second message to arrive in function_node
212 // Now the body of function_node 0 is executing.
213 serial_fn_state0 = 0; // release the node
214
215 utils::SpinWaitWhileEq(serial_continue_state0, 0); // waiting for continue_node to start
216 CHECK_MESSAGE( (cnode.my_current_count == 0), " my_current_count not reset before body of continue_node started");
217 serial_continue_state0 = 0; // release the continue_node
218
219 utils::SpinWaitWhileEq(serial_fn_state1, 0); // wait for the successor function_node to enter body
220 serial_fn_state1 = 0; // release successor function_node.
221 }
222
223 t.join();
224 }
225
226 INFO(" done\n");
227
228 }
229
230 // function_node has predecessors and successors
231 // try_get() rejects
232 // successor edges cannot be reversed
233 // predecessors will reverse (only rejecting will reverse)
TestFunctionNode()234 void TestFunctionNode() {
235 tbb::flow::graph g;
236 tbb::flow::queue_node<int> qnode0(g);
237 tbb::flow::function_node<int,int, tbb::flow::rejecting > fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
238 tbb::flow::function_node<int,int/*, tbb::flow::queueing*/> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
239
240 tbb::flow::queue_node<int> qnode1(g);
241
242 tbb::flow::make_edge(fnode0, qnode1);
243 tbb::flow::make_edge(qnode0, fnode0);
244
245 serial_fn_state0 = 2; // just let it go
246 qnode0.try_put(1);
247 g.wait_for_all();
248 int ii;
249 CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed" );
250 tbb::flow::remove_edge(qnode0, fnode0);
251 tbb::flow::remove_edge(fnode0, qnode1);
252
253 tbb::flow::make_edge(fnode1, qnode1);
254 tbb::flow::make_edge(qnode0, fnode1);
255
256 serial_fn_state0 = 2; // just let it go
257 qnode0.try_put(1);
258 g.wait_for_all();
259 CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed" );
260 tbb::flow::remove_edge(qnode0, fnode1);
261 tbb::flow::remove_edge(fnode1, qnode1);
262
263 // rejecting
264 serial_fn_state0 = 0;
265 std::atomic<bool> rejected{ false };
266 std::thread t([&] {
267 g.reset(); // attach to the current arena
268 tbb::flow::make_edge(fnode0, qnode1);
269 tbb::flow::make_edge(qnode0, fnode0); // TODO: invesigate why it always creates a forwarding task
270 INFO("Testing rejecting function_node:");
271 CHECK_MESSAGE( (!fnode0.my_queue), "node should have no queue");
272 CHECK_MESSAGE( (!fnode0.my_successors.empty()), "successor edge not added");
273 qnode0.try_put(1);
274 qnode0.try_put(2); // rejecting node should reject, reverse.
275 rejected = true;
276 g.wait_for_all();
277 });
278 utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start
279 utils::SpinWaitWhileEq(rejected, false);
280 // TODO: the assest below is not stable due to the logical race between try_put(1)
281 // try_put(2) and wait_for_all.
282 // Additionally, empty() cannot be called concurrently due to null_mutex used in implementation
283 // CHECK(fnode0.my_predecessors.empty() == false);
284 serial_fn_state0 = 2; // release function_node body.
285 t.join();
286 INFO(" reset");
287 g.reset(); // should reverse the edge from the input to the function node.
288 CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()");
289 CHECK_MESSAGE( (fnode0.my_predecessors.empty()), "predecessor not reversed");
290 tbb::flow::remove_edge(qnode0, fnode0);
291 tbb::flow::remove_edge(fnode0, qnode1);
292 INFO("\n");
293
294 // queueing
295 tbb::flow::make_edge(fnode1, qnode1);
296 INFO("Testing queueing function_node:");
297 CHECK_MESSAGE( (fnode1.my_queue), "node should have no queue");
298 CHECK_MESSAGE( (!fnode1.my_successors.empty()), "successor edge not added");
299 INFO(" add_pred");
300 CHECK_MESSAGE( (fnode1.register_predecessor(qnode0)), "Cannot register as predecessor");
301 CHECK_MESSAGE( (!fnode1.my_predecessors.empty()), "Missing predecessor");
302 INFO(" reset");
303 g.wait_for_all();
304 g.reset(); // should reverse the edge from the input to the function node.
305 CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()");
306 CHECK_MESSAGE( (fnode1.my_predecessors.empty()), "predecessor not reversed");
307 tbb::flow::remove_edge(qnode0, fnode1);
308 tbb::flow::remove_edge(fnode1, qnode1);
309 INFO("\n");
310
311 serial_fn_state0 = 0; // make the function_node wait
312 rejected = false;
313 std::thread t2([&] {
314 g.reset(); // attach to the current arena
315
316 tbb::flow::make_edge(qnode0, fnode0); // TODO: invesigate why it always creates a forwarding task
317
318 INFO(" start_func");
319 qnode0.try_put(1);
320 // now if we put an item to the queues the edges to the function_node will reverse.
321 INFO(" put_node(2)");
322 qnode0.try_put(2); // start queue node.
323 rejected = true;
324 g.wait_for_all();
325 });
326 utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start
327 // wait for the edges to reverse
328 utils::SpinWaitWhileEq(rejected, false);
329 // TODO: the assest below is not stable due to the logical race between try_put(1)
330 // try_put(2) and wait_for_all.
331 // Additionally, empty() cannot be called concurrently due to null_mutex used in implementation
332 // CHECK(fnode0.my_predecessors.empty() == false);
333 g.my_context->cancel_group_execution();
334 // release the function_node
335 serial_fn_state0 = 2;
336 t2.join();
337 g.reset(tbb::flow::rf_clear_edges);
338 CHECK_MESSAGE( (fnode0.my_predecessors.empty() && qnode0.my_successors.empty()), "function_node edge not removed");
339 CHECK_MESSAGE( (fnode0.my_successors.empty()), "successor to fnode not removed");
340 INFO(" done\n");
341 }
342
343 template<typename TT>
344 class tag_func {
345 TT my_mult;
346 public:
tag_func(TT multiplier)347 tag_func(TT multiplier) : my_mult(multiplier) { }
348 // operator() will return [0 .. Count)
operator ()(TT v)349 tbb::flow::tag_value operator()( TT v) {
350 tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult);
351 return t;
352 }
353 };
354
355 template<typename JNODE_TYPE>
356 void
TestSimpleSuccessorArc(const char * name)357 TestSimpleSuccessorArc(const char *name) {
358 tbb::flow::graph g;
359 {
360 INFO("Join<" << name << "> successor test ");
361 tbb::flow::join_node<std::tuple<int>, JNODE_TYPE> qj(g);
362 tbb::flow::broadcast_node<std::tuple<int> > bnode(g);
363 tbb::flow::make_edge(qj, bnode);
364 CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking");
365 g.reset();
366 CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()");
367 g.reset(tbb::flow::rf_clear_edges);
368 CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)");
369 }
370 }
371
372 template<>
373 void
TestSimpleSuccessorArc(const char * name)374 TestSimpleSuccessorArc<tbb::flow::tag_matching>(const char *name) {
375 tbb::flow::graph g;
376 {
377 INFO("Join<" << name << "> successor test ");
378 typedef std::tuple<int,int> my_tuple;
379 tbb::flow::join_node<my_tuple, tbb::flow::tag_matching> qj(g,
380 tag_func<int>(1),
381 tag_func<int>(1)
382 );
383 tbb::flow::broadcast_node<my_tuple > bnode(g);
384 tbb::flow::make_edge(qj, bnode);
385 CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking");
386 g.reset();
387 CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()");
388 g.reset(tbb::flow::rf_clear_edges);
389 CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)");
390 }
391 }
392
393 void
TestJoinNode()394 TestJoinNode() {
395 tbb::flow::graph g;
396
397 TestSimpleSuccessorArc<tbb::flow::queueing>("queueing");
398 TestSimpleSuccessorArc<tbb::flow::reserving>("reserving");
399 TestSimpleSuccessorArc<tbb::flow::tag_matching>("tag_matching");
400
401 // queueing and tagging join nodes have input queues, so the input ports do not reverse.
402 INFO(" reserving preds");
403 {
404 tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> rj(g);
405 tbb::flow::queue_node<int> q0(g);
406 tbb::flow::queue_node<int> q1(g);
407 tbb::flow::make_edge(q0,tbb::flow::input_port<0>(rj));
408 tbb::flow::make_edge(q1,tbb::flow::input_port<1>(rj));
409 q0.try_put(1);
410 g.wait_for_all(); // quiesce
411 CHECK_MESSAGE( (!(tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port missing predecessor");
412 CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred");
413 g.reset();
414 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
415 CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
416 q1.try_put(2);
417 g.wait_for_all(); // quiesce
418 CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor");
419 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred");
420 g.reset();
421 CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
422 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
423 // should reset predecessors just as regular reset.
424 q1.try_put(3);
425 g.wait_for_all(); // quiesce
426 CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor");
427 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred");
428 g.reset(tbb::flow::rf_clear_edges);
429 CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
430 CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
431 CHECK_MESSAGE( (q0.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
432 CHECK_MESSAGE( (q1.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
433 }
434 INFO(" done\n");
435 }
436
437 template <typename DecrementerType>
438 struct limiter_node_type {
439 using type = tbb::flow::limiter_node<int, DecrementerType>;
440 using dtype = DecrementerType;
441 };
442
443 template <>
444 struct limiter_node_type<void> {
445 using type = tbb::flow::limiter_node<int>;
446 using dtype = tbb::flow::continue_msg;
447 };
448
// Helper for decrementer message types other than continue_msg.
template <typename DType>
struct DecrementerHelper {
    // Nothing to verify for a generic decrementer; the argument is ignored.
    template <typename Decrementer>
    static void check(Decrementer& /*decrementer*/) {
    }

    // Builds the message sent to the decrementer port: a DType constructed from 1.
    static DType makeDType() {
        return DType(1);
    }
};
457
458 template <>
459 struct DecrementerHelper<tbb::flow::continue_msg> {
460 template <typename Decrementer>
checkDecrementerHelper461 static void check(Decrementer& decrementer) {
462 auto& d = static_cast<tbb::detail::d1::continue_receiver&>(decrementer);
463 CHECK_MESSAGE(d.my_predecessor_count == 0, "error in pred count");
464 CHECK_MESSAGE(d.my_initial_predecessor_count == 0, "error in initial pred count");
465 CHECK_MESSAGE(d.my_current_count == 0, "error in current count");
466 }
makeDTypeDecrementerHelper467 static tbb::flow::continue_msg makeDType() {
468 return tbb::flow::continue_msg();
469 }
470 };
471
472 template <typename DecrementerType>
TestLimiterNode()473 void TestLimiterNode() {
474 int out_int{};
475 tbb::flow::graph g;
476 using dtype = typename limiter_node_type<DecrementerType>::dtype;
477 typename limiter_node_type<DecrementerType>::type ln(g,1);
478 INFO("Testing limiter_node: preds and succs");
479 DecrementerHelper<dtype>::check(ln.decrementer());
480 CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold");
481 tbb::flow::queue_node<int> inq(g);
482 tbb::flow::queue_node<int> outq(g);
483 tbb::flow::broadcast_node<dtype> bn(g);
484
485 tbb::flow::make_edge(inq,ln);
486 tbb::flow::make_edge(ln,outq);
487 tbb::flow::make_edge(bn,ln.decrementer());
488
489 g.wait_for_all();
490 CHECK_MESSAGE( (!(ln.my_successors.empty())),"successors empty after make_edge");
491 CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed");
492 inq.try_put(1);
493 g.wait_for_all();
494 CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 1), "limiter_node didn't pass first value");
495 CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed");
496 inq.try_put(2);
497 g.wait_for_all();
498 CHECK_MESSAGE( (!outq.try_get(out_int)), "limiter_node incorrectly passed second input");
499 CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge to limiter_node not reversed");
500 bn.try_put(DecrementerHelper<dtype>::makeDType());
501 g.wait_for_all();
502 CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 2), "limiter_node didn't pass second value");
503 g.wait_for_all();
504 CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge was reversed(after try_get())");
505 g.reset();
506 CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge not reset");
507 inq.try_put(3);
508 g.wait_for_all();
509 CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 3), "limiter_node didn't pass third value");
510
511 INFO(" rf_clear_edges");
512 // currently the limiter_node will not pass another message
513 g.reset(tbb::flow::rf_clear_edges);
514 DecrementerHelper<dtype>::check(ln.decrementer());
515 CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold");
516 CHECK_MESSAGE( (ln.my_predecessors.empty()), "preds not reset(rf_clear_edges)");
517 CHECK_MESSAGE( (ln.my_successors.empty()), "preds not reset(rf_clear_edges)");
518 CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)");
519 CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)");
520 CHECK_MESSAGE( (bn.my_successors.empty()), "control edge not removed on reset(rf_clear_edges)");
521 tbb::flow::make_edge(inq,ln);
522 tbb::flow::make_edge(ln,outq);
523 inq.try_put(4);
524 inq.try_put(5);
525 g.wait_for_all();
526 CHECK_MESSAGE( (outq.try_get(out_int)),"missing output after reset(rf_clear_edges)");
527 CHECK_MESSAGE( (out_int == 4), "input incorrect (4)");
528 bn.try_put(DecrementerHelper<dtype>::makeDType());
529 g.wait_for_all();
530 CHECK_MESSAGE( (!outq.try_get(out_int)),"second output incorrectly passed (rf_clear_edges)");
531 INFO(" done\n");
532 }
533
534 template<typename MF_TYPE>
535 struct mf_body {
536 std::atomic<int>& my_flag;
mf_bodymf_body537 mf_body(std::atomic<int>& flag) : my_flag(flag) { }
operator ()mf_body538 void operator()(const int& in, typename MF_TYPE::output_ports_type& outports) {
539 if(my_flag == 0) {
540 my_flag = 1;
541
542 utils::SpinWaitWhileEq(my_flag, 1);
543 }
544
545 if (in & 0x1)
546 std::get<1>(outports).try_put(in);
547 else
548 std::get<0>(outports).try_put(in);
549 }
550 };
551
552 template<typename P, typename T>
553 struct test_reversal;
554 template<typename T>
555 struct test_reversal<tbb::flow::queueing, T> {
test_reversaltest_reversal556 test_reversal() { INFO("<queueing>"); }
557 // queueing node will not reverse.
operator ()test_reversal558 bool operator()(T& node) const { return node.my_predecessors.empty(); }
559 };
560
561 template<typename T>
562 struct test_reversal<tbb::flow::rejecting, T> {
test_reversaltest_reversal563 test_reversal() { INFO("<rejecting>"); }
operator ()test_reversal564 bool operator()(T& node) const { return !node.my_predecessors.empty(); }
565 };
566
567 template<typename P>
TestMultifunctionNode()568 void TestMultifunctionNode() {
569 typedef tbb::flow::multifunction_node<int, std::tuple<int, int>, P> multinode_type;
570 INFO("Testing multifunction_node");
571 test_reversal<P,multinode_type> my_test;
572 INFO(":");
573 tbb::flow::graph g;
574 multinode_type mf(g, tbb::flow::serial, mf_body<multinode_type>(serial_fn_state0));
575 tbb::flow::queue_node<int> qin(g);
576 tbb::flow::queue_node<int> qodd_out(g);
577 tbb::flow::queue_node<int> qeven_out(g);
578 tbb::flow::make_edge(qin,mf);
579 tbb::flow::make_edge(tbb::flow::output_port<0>(mf), qeven_out);
580 tbb::flow::make_edge(tbb::flow::output_port<1>(mf), qodd_out);
581 g.wait_for_all();
582 for (int ii = 0; ii < 2 ; ++ii) {
583 std::atomic<bool> submitted{ false };
584 serial_fn_state0 = 0;
585 /* if(ii == 0) REMARK(" reset preds"); else REMARK(" 2nd");*/
586 std::thread t([&] {
587 g.reset(); // attach to the current arena
588 qin.try_put(0);
589 qin.try_put(1);
590 submitted = true;
591 g.wait_for_all();
592 });
593 // wait for node to be active
594 utils::SpinWaitWhileEq(serial_fn_state0, 0);
595 utils::SpinWaitWhileEq(submitted, false);
596 g.my_context->cancel_group_execution();
597 // release node
598 serial_fn_state0 = 2;
599 t.join();
600 // The rejection test cannot guarantee the state of predecessors cache.
601 if (!std::is_same<P, tbb::flow::rejecting>::value) {
602 CHECK_MESSAGE((my_test(mf)), "fail cancel group test");
603 }
604 if( ii == 1) {
605 INFO(" rf_clear_edges");
606 g.reset(tbb::flow::rf_clear_edges);
607 CHECK_MESSAGE( (tbb::flow::output_port<0>(mf).my_successors.empty()), "output_port<0> not reset (rf_clear_edges)");
608 CHECK_MESSAGE( (tbb::flow::output_port<1>(mf).my_successors.empty()), "output_port<1> not reset (rf_clear_edges)");
609 }
610 else
611 {
612 g.reset();
613 }
614 CHECK_MESSAGE( (mf.my_predecessors.empty()), "edge didn't reset");
615 CHECK_MESSAGE( ((ii == 0 && !qin.my_successors.empty()) || (ii == 1 && qin.my_successors.empty())), "edge didn't reset");
616 }
617 INFO(" done\n");
618 }
619
620 // indexer_node is like a broadcast_node, in that none of its inputs reverse, and it
621 // never allows a successor to reverse its edge, so we only need test the successors.
622 void
TestIndexerNode()623 TestIndexerNode() {
624 tbb::flow::graph g;
625 typedef tbb::flow::indexer_node< int, int > indexernode_type;
626 indexernode_type inode(g);
627 INFO("Testing indexer_node:");
628 tbb::flow::queue_node<indexernode_type::output_type> qout(g);
629 tbb::flow::make_edge(inode,qout);
630 g.wait_for_all();
631 CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing");
632 g.reset();
633 CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing after reset");
634 g.reset(tbb::flow::rf_clear_edges);
635 CHECK_MESSAGE( (inode.my_successors.empty()), "successor of indexer_node not removed by reset(rf_clear_edges)");
636 INFO(" done\n");
637 }
638
639 template<typename Node>
640 void
TestScalarNode(const char * name)641 TestScalarNode(const char *name) {
642 tbb::flow::graph g;
643 Node on(g);
644 tbb::flow::queue_node<int> qout(g);
645 INFO("Testing " << name << ":");
646 tbb::flow::make_edge(on,qout);
647 g.wait_for_all();
648 CHECK_MESSAGE( (!on.my_successors.empty()), "edge not added");
649 g.reset();
650 CHECK_MESSAGE( (!on.my_successors.empty()), "edge improperly removed");
651 g.reset(tbb::flow::rf_clear_edges);
652 CHECK_MESSAGE( (on.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
653 INFO(" done\n");
654 }
655
// Sequencer functor: maps an item to its sequence number, in / 3
// (so 0 -> 0 and 3 -> 1, the two values used by TestSequencerNode).
struct seq_body {
    // Stateless, hence const: it can also be invoked through a const object.
    size_t operator()(const int &in) const {
        return size_t(in / 3);
    }
};
661
662 // sequencer_node behaves like a queueing node, but requires a different constructor.
TestSequencerNode()663 void TestSequencerNode() {
664 tbb::flow::graph g;
665 tbb::flow::sequencer_node<int> bnode(g, seq_body());
666 INFO("Testing sequencer_node:");
667 tbb::flow::function_node<int> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
668 INFO("Testing sequencer_node:");
669 serial_fn_state0 = 0; // reset to waiting state.
670 INFO(" make_edge");
671 tbb::flow::make_edge(bnode, fnode);
672 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge" );
673 INFO(" try_put");
674 std::thread t([&]{
675 bnode.try_put(0); // will forward to the fnode
676 g.wait_for_all();
677 });
678 // wait for the function_node to fire up
679 utils::SpinWaitWhileEq(serial_fn_state0, 0);
680 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message" );
681 serial_fn_state0 = 0; // release the function node
682 t.join();
683
684 INFO(" remove_edge");
685 tbb::flow::remove_edge(bnode, fnode);
686 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge");
687 tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g);
688 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // will spawn a task
689 g.wait_for_all();
690 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join");
691 INFO(" reverse");
692 bnode.try_put(3); // the edge should reverse
693 g.wait_for_all();
694 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
695 INFO(" reset()");
696 g.wait_for_all();
697 g.reset(); // should be in forward direction again
698 CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()");
699 INFO(" remove_edge");
700 g.reset(tbb::flow::rf_clear_edges); // should be in forward direction again
701 CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)");
702 CHECK_MESSAGE( (fnode.my_predecessors.empty()), "buffering node reversed after reset(rf_clear_edges)");
703 INFO(" done\n");
704 g.wait_for_all();
705 }
706
707 struct snode_body {
708 int max_cnt;
709 int my_cnt;
snode_bodysnode_body710 snode_body(const int& in) : max_cnt(in) { my_cnt = 0; }
operator ()snode_body711 int operator()(tbb::flow_control& fc) {
712 if (max_cnt <= my_cnt++) {
713 fc.stop();
714 return int();
715 }
716 return my_cnt;
717 }
718 };
719
TestInputNode()720 void TestInputNode() {
721 tbb::flow::graph g;
722 tbb::flow::input_node<int> in(g, snode_body(4));
723 INFO("Testing input_node:");
724 tbb::flow::queue_node<int> qin(g);
725 tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> jn(g);
726 tbb::flow::queue_node<std::tuple<int,int> > qout(g);
727
728 INFO(" make_edges");
729 tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn));
730 tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
731 tbb::flow::make_edge(jn,qout);
732 CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after make_edge");
733 g.wait_for_all();
734 g.reset();
735 CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after reset");
736 g.wait_for_all();
737 g.reset(tbb::flow::rf_clear_edges);
738 CHECK_MESSAGE( (in.my_successors.empty()), "input node has successor after reset(rf_clear_edges)");
739 tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn));
740 tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
741 tbb::flow::make_edge(jn,qout);
742 g.wait_for_all();
743 INFO(" activate");
744 in.activate(); // will forward to the fnode
745 INFO(" wait1");
746 g.wait_for_all();
747 CHECK_MESSAGE( (in.my_successors.empty()), "input node has no successor after forwarding message");
748 g.reset();
749 CHECK_MESSAGE( (!in.my_successors.empty()), "input_node has no successors after reset");
750 CHECK_MESSAGE( (tbb::flow::input_port<0>(jn).my_predecessors.empty()), "successor of input_node has pred after reset.");
751 INFO(" done\n");
752 }
753
754 //! Test buffering nodes
755 //! \brief \ref error_guessing
756 TEST_CASE("Test buffering nodes"){
757 unsigned int MinThread = utils::MinThread;
758 if(MinThread < 3) MinThread = 3;
759 tbb::task_arena arena(MinThread);
760 arena.execute(
__anon7a6130c50902() 761 [&]() {
762 // tests presume at least three threads
763 TestBufferingNode< tbb::flow::buffer_node<int> >("buffer_node");
764 TestBufferingNode< tbb::flow::priority_queue_node<int> >("priority_queue_node");
765 TestBufferingNode< tbb::flow::queue_node<int> >("queue_node");
766 }
767 );
768 }
769
770 //! Test sequencer_node
771 //! \brief \ref error_guessing
772 TEST_CASE("Test sequencer node"){
773 TestSequencerNode();
774 }
775
776 TEST_SUITE("Test multifunction node") {
777 //! Test multifunction_node with rejecting policy
778 //! \brief \ref error_guessing
779 TEST_CASE("with rejecting policy"){
780 TestMultifunctionNode<tbb::flow::rejecting>();
781 }
782
783 //! Test multifunction_node with queueing policy
784 //! \brief \ref error_guessing
785 TEST_CASE("with queueing policy") {
786 TestMultifunctionNode<tbb::flow::queueing>();
787 }
788 }
789
790 //! Test input_node
791 //! \brief \ref error_guessing
792 TEST_CASE("Test input node"){
793 TestInputNode();
794 }
795
796 //! Test continue_node
797 //! \brief \ref error_guessing
798 TEST_CASE("Test continue node"){
799 TestContinueNode();
800 }
801
802 //! Test function_node
803 //! \brief \ref error_guessing
may_fail()804 TEST_CASE("Test function node" * doctest::may_fail()){
805 TestFunctionNode();
806 }
807
808 //! Test join_node
809 //! \brief \ref error_guessing
810 TEST_CASE("Test join node"){
811 TestJoinNode();
812 }
813
814 //! Test limiter_node
815 //! \brief \ref error_guessing
816 TEST_CASE("Test limiter node"){
817 TestLimiterNode<void>();
818 TestLimiterNode<int>();
819 TestLimiterNode<tbb::flow::continue_msg>();
820 }
821
822 //! Test indexer_node
823 //! \brief \ref error_guessing
824 TEST_CASE("Test indexer node"){
825 TestIndexerNode();
826 }
827
828 //! Test split_node
829 //! \brief \ref error_guessing
830 TEST_CASE("Test split node"){
831 TestSplitNode();
832 }
833
834 //! Test broadcast, overwrite, write_once nodes
835 //! \brief \ref error_guessing
836 TEST_CASE("Test scalar node"){
837 TestScalarNode<tbb::flow::broadcast_node<int> >("broadcast_node");
838 TestScalarNode<tbb::flow::overwrite_node<int> >("overwrite_node");
839 TestScalarNode<tbb::flow::write_once_node<int> >("write_once_node");
840 }
841
842 //! try_get in inactive graph
843 //! \brief \ref error_guessing
844 TEST_CASE("try_get in inactive graph"){
845 tbb::flow::graph g;
846
__anon7a6130c50a02(tbb::flow_control& fc) 847 tbb::flow::input_node<int> src(g, [&](tbb::flow_control& fc) { fc.stop(); return 0;});
848 deactivate_graph(g);
849
850 int tmp = -1;
851 CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed");
852
853 src.activate();
854 tmp = -1;
855 CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed");
856 }
857
858 //! Test make_edge in inactive graph
859 //! \brief \ref error_guessing
860 TEST_CASE("Test make_edge in inactive graph"){
861 tbb::flow::graph g;
862
__anon7a6130c50b02(const tbb::flow::continue_msg&)863 tbb::flow::continue_node<int> c(g, [](const tbb::flow::continue_msg&){ return 1; });
864
865 tbb::flow::function_node<int, int> f(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
866
867 c.try_put(tbb::flow::continue_msg());
868 g.wait_for_all();
869
870 deactivate_graph(g);
871
872 make_edge(c, f);
873 }
874
875 //! Test make_edge from overwrite_node in inactive graph
876 //! \brief \ref error_guessing
877 TEST_CASE("Test make_edge from overwrite_node in inactive graph"){
878 tbb::flow::graph g;
879
880 tbb::flow::queue_node<int> q(g);
881
882 tbb::flow::overwrite_node<int> on(g);
883
884 on.try_put(1);
885 g.wait_for_all();
886
887 deactivate_graph(g);
888
889 make_edge(on, q);
890
891 int tmp = -1;
892 CHECK_MESSAGE((q.try_get(tmp) == false), "Message should not be passed on");
893 }
894
895 //! Test iterators directly
896 //! \brief \ref error_guessing
897 TEST_CASE("graph_iterator details"){
898 tbb::flow::graph g;
899 const tbb::flow::graph cg;
900
901 tbb::flow::graph::iterator b = g.begin();
902 tbb::flow::graph::iterator b2 = g.begin();
903 ++b2;
904 // Cast to a volatile pointer to workaround self assignment warnings from some compilers.
905 tbb::flow::graph::iterator* volatile b2_ptr = &b2;
906 b2 = *b2_ptr;
907 b = b2;
908 CHECK_MESSAGE((b == b2), "Assignment should make iterators equal");
909 }
910
911 //! const graph
912 //! \brief \ref error_guessing
913 TEST_CASE("const graph"){
914 using namespace tbb::flow;
915
916 const graph g;
917 CHECK_MESSAGE((g.cbegin() == g.cend()), "Starting graph is empty");
918 CHECK_MESSAGE((g.begin() == g.end()), "Starting graph is empty");
919
920 graph g2;
921 CHECK_MESSAGE((g2.begin() == g2.end()), "Starting graph is empty");
922 }
923
924 //! Send message to continue_node while graph is inactive
925 //! \brief \ref error_guessing
926 TEST_CASE("Send message to continue_node while graph is inactive") {
927 using namespace tbb::flow;
928
929 graph g;
930
__anon7a6130c50c02(const continue_msg&)931 continue_node<int> c(g, [](const continue_msg&){ return 1; });
932 buffer_node<int> b(g);
933
934 make_edge(c, b);
935
936 deactivate_graph(g);
937
938 c.try_put(continue_msg());
939 g.wait_for_all();
940
941 int tmp = -1;
942 CHECK_MESSAGE((b.try_get(tmp) == false), "Message should not arrive");
943 CHECK_MESSAGE((tmp == -1), "Value should not be altered");
944 }
945
946
947 //! Bypass of a successor's message in a node with lightweight policy
948 //! \brief \ref error_guessing
949 TEST_CASE("Bypass of a successor's message in a node with lightweight policy") {
950 using namespace tbb::flow;
951
952 graph g;
953
__anon7a6130c50d02(const int&v)954 auto body = [](const int&v)->int { return v * 2; };
955 function_node<int, int, lightweight> f1(g, unlimited, body);
956
__anon7a6130c50e02(const int&v)957 auto body2 = [](const int&v)->int {return v / 2;};
958 function_node<int, int> f2(g, unlimited, body2);
959
960 buffer_node<int> b(g);
961
962 make_edge(f1, f2);
963 make_edge(f2, b);
964
965 f1.try_put(1);
966 g.wait_for_all();
967
968 int tmp = -1;
969 CHECK_MESSAGE((b.try_get(tmp) == true), "Functional nodes can work in succession");
970 CHECK_MESSAGE((tmp == 1), "Value should not be altered");
971 }
972