1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/config.h"
18 
19 #if _MSC_VER
20     #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
21     #if _MSC_VER==1700 && !defined(__INTEL_COMPILER)
22         // Suppress "unreachable code" warning by VC++ 17.0 (VS 2012)
23         #pragma warning (disable: 4702)
24     #endif
25 #endif
26 
27 // need these to get proper external names for private methods in library.
28 #include "tbb/spin_mutex.h"
29 #include "tbb/spin_rw_mutex.h"
30 #include "tbb/task_arena.h"
31 #include "tbb/task_group.h"
32 
33 #define private public
34 #define protected public
35 #include "tbb/flow_graph.h"
36 #undef protected
37 #undef private
38 
39 #include "common/test.h"
40 #include "common/utils.h"
41 #include "common/spin_barrier.h"
42 #include "common/graph_utils.h"
43 
44 #include <string> // merely prevents LNK2001 error to happen (on ICL+VC9 configurations)
45 
46 //! \file test_flow_graph_whitebox.cpp
47 //! \brief Test for [flow_graph.broadcast_node flow_graph.priority_queue_node flow_graph.indexer_node flow_graph.sequencer_node flow_graph.remove_edge flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification
48 
49 template<typename T>
50 struct receiverBody {
operator ()receiverBody51     tbb::flow::continue_msg operator()(const T &/*in*/) {
52         return tbb::flow::continue_msg();
53     }
54 };
55 
56 // split_nodes cannot have predecessors
57 // they do not reject messages and always forward.
58 // they reject edge reversals from successors.
TestSplitNode()59 void TestSplitNode() {
60     typedef tbb::flow::split_node<std::tuple<int> > snode_type;
61     tbb::flow::graph g;
62     snode_type snode(g);
63     tbb::flow::function_node<int> rcvr(g,tbb::flow::unlimited, receiverBody<int>());
64     INFO("Testing split_node\n");
65     CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "Constructed split_node has successors");
66     // tbb::flow::output_port<0>(snode)
67     tbb::flow::make_edge(tbb::flow::output_port<0>(snode), rcvr);
68     CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after make_edge, split_node has no successor.");
69     snode.try_put(std::tuple<int>(1));
70     g.wait_for_all();
71     g.reset();
72     CHECK_MESSAGE( (!(tbb::flow::output_port<0>(snode).my_successors.empty())), "after reset(), split_node has no successor.");
73     g.reset(tbb::flow::rf_clear_edges);
74     CHECK_MESSAGE( (tbb::flow::output_port<0>(snode).my_successors.empty()), "after reset(rf_clear_edges), split_node has a successor.");
75 }
76 
77 // buffering nodes cannot have predecessors
78 // they do not reject messages and always save or forward
79 // they allow edge reversals from successors
80 template< typename B >
TestBufferingNode(const char * name)81 void TestBufferingNode(const char * name) {
82     tbb::flow::graph g;
83     B bnode(g);
84     tbb::flow::function_node<int,int,tbb::flow::rejecting> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
85     INFO("Testing " << name << ":");
86     for(int icnt = 0; icnt < 2; icnt++) {
87         bool reverse_edge = (icnt & 0x2) != 0;
88         serial_fn_state0 = 0;  // reset to waiting state.
89         INFO(" make_edge");
90         tbb::flow::make_edge(bnode, fnode);
91         CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge");
92         std::thread t([&] {
93             INFO(" try_put");
94             bnode.try_put(1);  // will forward to the fnode
95             g.wait_for_all();
96         });
97         utils::SpinWaitWhileEq(serial_fn_state0, 0);
98         if(reverse_edge) {
99             INFO(" try_put2");
100             bnode.try_put(2);  // should reverse the edge
101             // waiting for the edge to reverse
102             utils::SpinWaitWhile([&] { return !bnode.my_successors.empty(); });
103         }
104         else {
105             CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message");
106         }
107         serial_fn_state0 = 0;  // release the function_node.
108         if(reverse_edge) {
109             // have to do a second release because the function_node will get the 2nd item
110             utils::SpinWaitWhileEq(serial_fn_state0, 0);
111             serial_fn_state0 = 0;  // release the function_node.
112         }
113         t.join();
114         INFO(" remove_edge");
115         tbb::flow::remove_edge(bnode, fnode);
116         CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge");
117     }
118     tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g);
119     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // will spawn a task
120     g.wait_for_all();
121     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join");
122     INFO(" reverse");
123     bnode.try_put(1);  // the edge should reverse
124     g.wait_for_all();
125     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
126     INFO(" reset()");
127     g.wait_for_all();
128     g.reset();  // should be in forward direction again
129     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()");
130     INFO(" remove_edge");
131     g.reset(tbb::flow::rf_clear_edges);
132     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)");
133     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // add edge again
134     // reverse edge by adding to buffer.
135     bnode.try_put(1);  // the edge should reverse
136     g.wait_for_all();
137     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
138     INFO(" remove_edge(reversed)");
139     g.reset(tbb::flow::rf_clear_edges);
140     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has no successor after reset()");
141     CHECK_MESSAGE( (tbb::flow::input_port<0>(jnode).my_predecessors.empty()), "predecessor not reset");
142     INFO("  done\n");
143     g.wait_for_all();
144 }
145 
146 // continue_node has only predecessor count
147 // they do not have predecessors, only the counts
148 // successor edges cannot be reversed
TestContinueNode()149 void TestContinueNode() {
150     tbb::flow::graph g;
151     tbb::flow::function_node<int> fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
152     tbb::flow::continue_node<int> cnode(g, /*number_of_predecessors*/ 1,
153                                         serial_continue_body<int>(serial_continue_state0));
154     tbb::flow::function_node<int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state1));
155     tbb::flow::make_edge(fnode0, cnode);
156     tbb::flow::make_edge(cnode, fnode1);
157     INFO("Testing continue_node:");
158     for( int icnt = 0; icnt < 2; ++icnt ) {
159         INFO( " initial" << icnt);
160         CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor addition didn't increment count");
161         CHECK_MESSAGE( (!cnode.successors().empty()), "successors empty though we added one");
162         CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect");
163         serial_continue_state0 = 0;
164         serial_fn_state0 = 0;
165         serial_fn_state1 = 0;
166 
167         std::thread t([&] {
168             fnode0.try_put(1);  // start the first function node.
169             if(icnt == 0) {  // first time through, let the continue_node fire
170                 INFO(" firing");
171                 fnode0.try_put(1);  // second message
172                 g.wait_for_all();
173 
174                 // try a try_get()
175                 {
176                     int i;
177                     CHECK_MESSAGE( (!cnode.try_get(i)), "try_get not rejected");
178                 }
179 
180                 INFO(" reset");
181                 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)");
182                 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)");
183                 g.reset();  // should still be the same
184                 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (after reset)" );
185                 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (after reset)");
186             }
187             else {  // we're going to see if the rf_clear_edges resets things.
188                 g.wait_for_all();
189                 INFO(" reset(rf_clear_edges)");
190                 CHECK_MESSAGE( (!cnode.my_successors.empty()), "Empty successors in built graph (before reset)" );
191                 CHECK_MESSAGE( (cnode.my_predecessor_count == 2), "predecessor_count reset (before reset)" );
192                 g.reset(tbb::flow::rf_clear_edges);  // should be in forward direction again
193                 CHECK_MESSAGE( (cnode.my_current_count == 0), "state of continue_receiver incorrect after reset(rf_clear_edges)" );
194                 CHECK_MESSAGE( (cnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)" );
195                 CHECK_MESSAGE( (cnode.my_predecessor_count == cnode.my_initial_predecessor_count), "predecessor count not reset" );
196             }
197         });
198 
199         utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for the first message to arrive in function_node
200         // Now the body of function_node 0 is executing.
201         serial_fn_state0 = 0;  // release the node
202         if (icnt == 0) {
203             // wait for node to count the message (or for the node body to execute, which would be wrong)
204             utils::SpinWaitWhile([&] {
205                 tbb::spin_mutex::scoped_lock l(cnode.my_mutex);
206                 return serial_continue_state0 == 0 && cnode.my_current_count == 0;
207             });
208             CHECK_MESSAGE( (serial_continue_state0 == 0), "Improperly released continue_node");
209             CHECK_MESSAGE( (cnode.my_current_count == 1), "state of continue_receiver incorrect");
210 
211             utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for the second message to arrive in function_node
212             // Now the body of function_node 0 is executing.
213             serial_fn_state0 = 0;  // release the node
214 
215             utils::SpinWaitWhileEq(serial_continue_state0, 0); // waiting for continue_node to start
216             CHECK_MESSAGE( (cnode.my_current_count == 0), " my_current_count not reset before body of continue_node started");
217             serial_continue_state0 = 0;  // release the continue_node
218 
219             utils::SpinWaitWhileEq(serial_fn_state1, 0); // wait for the successor function_node to enter body
220             serial_fn_state1 = 0;  // release successor function_node.
221         }
222 
223         t.join();
224     }
225 
226     INFO(" done\n");
227 
228 }
229 
230 // function_node has predecessors and successors
231 // try_get() rejects
232 // successor edges cannot be reversed
233 // predecessors will reverse (only rejecting will reverse)
TestFunctionNode()234 void TestFunctionNode() {
235     tbb::flow::graph g;
236     tbb::flow::queue_node<int> qnode0(g);
237     tbb::flow::function_node<int,int,   tbb::flow::rejecting > fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
238     tbb::flow::function_node<int,int/*, tbb::flow::queueing*/> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
239 
240     tbb::flow::queue_node<int> qnode1(g);
241 
242     tbb::flow::make_edge(fnode0, qnode1);
243     tbb::flow::make_edge(qnode0, fnode0);
244 
245     serial_fn_state0 = 2;  // just let it go
246     qnode0.try_put(1);
247     g.wait_for_all();
248     int ii;
249     CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed" );
250     tbb::flow::remove_edge(qnode0, fnode0);
251     tbb::flow::remove_edge(fnode0, qnode1);
252 
253     tbb::flow::make_edge(fnode1, qnode1);
254     tbb::flow::make_edge(qnode0, fnode1);
255 
256     serial_fn_state0 = 2;  // just let it go
257     qnode0.try_put(1);
258     g.wait_for_all();
259     CHECK_MESSAGE( (qnode1.try_get(ii) && ii == 1), "output not passed" );
260     tbb::flow::remove_edge(qnode0, fnode1);
261     tbb::flow::remove_edge(fnode1, qnode1);
262 
263     // rejecting
264     serial_fn_state0 = 0;
265     std::atomic<bool> rejected{ false };
266     std::thread t([&] {
267         g.reset(); // attach to the current arena
268         tbb::flow::make_edge(fnode0, qnode1);
269         tbb::flow::make_edge(qnode0, fnode0); // TODO: invesigate why it always creates a forwarding task
270         INFO("Testing rejecting function_node:");
271         CHECK_MESSAGE( (!fnode0.my_queue), "node should have no queue");
272         CHECK_MESSAGE( (!fnode0.my_successors.empty()), "successor edge not added");
273         qnode0.try_put(1);
274         qnode0.try_put(2);   // rejecting node should reject, reverse.
275         rejected = true;
276         g.wait_for_all();
277     });
278     utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start
279     utils::SpinWaitWhileEq(rejected, false);
280     // TODO: the assest below is not stable due to the logical race between try_put(1)
281     // try_put(2) and wait_for_all.
282     // Additionally, empty() cannot be called concurrently due to null_mutex used in implementation
283     // CHECK(fnode0.my_predecessors.empty() == false);
284     serial_fn_state0 = 2;   // release function_node body.
285     t.join();
286     INFO(" reset");
287     g.reset();  // should reverse the edge from the input to the function node.
288     CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()");
289     CHECK_MESSAGE( (fnode0.my_predecessors.empty()), "predecessor not reversed");
290     tbb::flow::remove_edge(qnode0, fnode0);
291     tbb::flow::remove_edge(fnode0, qnode1);
292     INFO("\n");
293 
294     // queueing
295     tbb::flow::make_edge(fnode1, qnode1);
296     INFO("Testing queueing function_node:");
297     CHECK_MESSAGE( (fnode1.my_queue), "node should have no queue");
298     CHECK_MESSAGE( (!fnode1.my_successors.empty()), "successor edge not added");
299     INFO(" add_pred");
300     CHECK_MESSAGE( (fnode1.register_predecessor(qnode0)), "Cannot register as predecessor");
301     CHECK_MESSAGE( (!fnode1.my_predecessors.empty()), "Missing predecessor");
302     INFO(" reset");
303     g.wait_for_all();
304     g.reset();  // should reverse the edge from the input to the function node.
305     CHECK_MESSAGE( (!qnode0.my_successors.empty()), "empty successors after reset()");
306     CHECK_MESSAGE( (fnode1.my_predecessors.empty()), "predecessor not reversed");
307     tbb::flow::remove_edge(qnode0, fnode1);
308     tbb::flow::remove_edge(fnode1, qnode1);
309     INFO("\n");
310 
311     serial_fn_state0 = 0;  // make the function_node wait
312     rejected = false;
313     std::thread t2([&] {
314         g.reset(); // attach to the current arena
315 
316         tbb::flow::make_edge(qnode0, fnode0); // TODO: invesigate why it always creates a forwarding task
317 
318         INFO(" start_func");
319         qnode0.try_put(1);
320         // now if we put an item to the queues the edges to the function_node will reverse.
321         INFO(" put_node(2)");
322         qnode0.try_put(2);   // start queue node.
323         rejected = true;
324         g.wait_for_all();
325     });
326     utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start
327     // wait for the edges to reverse
328     utils::SpinWaitWhileEq(rejected, false);
329     // TODO: the assest below is not stable due to the logical race between try_put(1)
330     // try_put(2) and wait_for_all.
331     // Additionally, empty() cannot be called concurrently due to null_mutex used in implementation
332     // CHECK(fnode0.my_predecessors.empty() == false);
333     g.my_context->cancel_group_execution();
334     // release the function_node
335     serial_fn_state0 = 2;
336     t2.join();
337     g.reset(tbb::flow::rf_clear_edges);
338     CHECK_MESSAGE( (fnode0.my_predecessors.empty() && qnode0.my_successors.empty()), "function_node edge not removed");
339     CHECK_MESSAGE( (fnode0.my_successors.empty()), "successor to fnode not removed");
340     INFO(" done\n");
341 }
342 
343 template<typename TT>
344 class tag_func {
345     TT my_mult;
346 public:
tag_func(TT multiplier)347     tag_func(TT multiplier) : my_mult(multiplier) { }
348     // operator() will return [0 .. Count)
operator ()(TT v)349     tbb::flow::tag_value operator()( TT v) {
350         tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult);
351         return t;
352     }
353 };
354 
355 template<typename JNODE_TYPE>
356 void
TestSimpleSuccessorArc(const char * name)357 TestSimpleSuccessorArc(const char *name) {
358     tbb::flow::graph g;
359     {
360         INFO("Join<" << name << "> successor test ");
361         tbb::flow::join_node<std::tuple<int>, JNODE_TYPE> qj(g);
362         tbb::flow::broadcast_node<std::tuple<int> > bnode(g);
363         tbb::flow::make_edge(qj, bnode);
364         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking");
365         g.reset();
366         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()");
367         g.reset(tbb::flow::rf_clear_edges);
368         CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)");
369     }
370 }
371 
372 template<>
373 void
TestSimpleSuccessorArc(const char * name)374 TestSimpleSuccessorArc<tbb::flow::tag_matching>(const char *name) {
375     tbb::flow::graph g;
376     {
377         INFO("Join<" << name << "> successor test ");
378         typedef std::tuple<int,int> my_tuple;
379         tbb::flow::join_node<my_tuple, tbb::flow::tag_matching> qj(g,
380                                                                    tag_func<int>(1),
381                                                                    tag_func<int>(1)
382         );
383         tbb::flow::broadcast_node<my_tuple > bnode(g);
384         tbb::flow::make_edge(qj, bnode);
385         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after linking");
386         g.reset();
387         CHECK_MESSAGE( (!qj.my_successors.empty()),"successor missing after reset()");
388         g.reset(tbb::flow::rf_clear_edges);
389         CHECK_MESSAGE( (qj.my_successors.empty()), "successors not removed after reset(rf_clear_edges)");
390     }
391 }
392 
393 void
TestJoinNode()394 TestJoinNode() {
395     tbb::flow::graph g;
396 
397     TestSimpleSuccessorArc<tbb::flow::queueing>("queueing");
398     TestSimpleSuccessorArc<tbb::flow::reserving>("reserving");
399     TestSimpleSuccessorArc<tbb::flow::tag_matching>("tag_matching");
400 
401     // queueing and tagging join nodes have input queues, so the input ports do not reverse.
402     INFO(" reserving preds");
403     {
404         tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> rj(g);
405         tbb::flow::queue_node<int> q0(g);
406         tbb::flow::queue_node<int> q1(g);
407         tbb::flow::make_edge(q0,tbb::flow::input_port<0>(rj));
408         tbb::flow::make_edge(q1,tbb::flow::input_port<1>(rj));
409         q0.try_put(1);
410         g.wait_for_all();  // quiesce
411         CHECK_MESSAGE( (!(tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port missing predecessor");
412         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred");
413         g.reset();
414         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
415         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
416         q1.try_put(2);
417         g.wait_for_all();  // quiesce
418         CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor");
419         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred");
420         g.reset();
421         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
422         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
423         // should reset predecessors just as regular reset.
424         q1.try_put(3);
425         g.wait_for_all();  // quiesce
426         CHECK_MESSAGE( (!(tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port missing predecessor");
427         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred");
428         g.reset(tbb::flow::rf_clear_edges);
429         CHECK_MESSAGE( ((tbb::flow::input_port<1>(rj).my_predecessors.empty())),"reversed port has pred after reset()");
430         CHECK_MESSAGE( ((tbb::flow::input_port<0>(rj).my_predecessors.empty())),"non-reversed port has pred after reset()");
431         CHECK_MESSAGE( (q0.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
432         CHECK_MESSAGE( (q1.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
433     }
434     INFO(" done\n");
435 }
436 
437 template <typename DecrementerType>
438 struct limiter_node_type {
439     using type = tbb::flow::limiter_node<int, DecrementerType>;
440     using dtype = DecrementerType;
441 };
442 
443 template <>
444 struct limiter_node_type<void> {
445     using type = tbb::flow::limiter_node<int>;
446     using dtype = tbb::flow::continue_msg;
447 };
448 
// Generic helper for decrementers driven by a value type DType.
template <typename DType>
struct DecrementerHelper {
    // Nothing to verify for value-typed decrementers; the argument is ignored.
    template <typename Decrementer>
    static void check(Decrementer& /*decrementer*/) {
    }

    // Produces the decrement message: a DType constructed from 1.
    static DType makeDType() {
        return DType(1);
    }
};
457 
458 template <>
459 struct DecrementerHelper<tbb::flow::continue_msg> {
460     template <typename Decrementer>
checkDecrementerHelper461     static void check(Decrementer& decrementer) {
462         auto& d = static_cast<tbb::detail::d1::continue_receiver&>(decrementer);
463         CHECK_MESSAGE(d.my_predecessor_count == 0, "error in pred count");
464         CHECK_MESSAGE(d.my_initial_predecessor_count == 0, "error in initial pred count");
465         CHECK_MESSAGE(d.my_current_count == 0, "error in current count");
466     }
makeDTypeDecrementerHelper467     static tbb::flow::continue_msg makeDType() {
468         return tbb::flow::continue_msg();
469     }
470 };
471 
472 template <typename DecrementerType>
TestLimiterNode()473 void TestLimiterNode() {
474     int out_int{};
475     tbb::flow::graph g;
476     using dtype = typename limiter_node_type<DecrementerType>::dtype;
477     typename limiter_node_type<DecrementerType>::type ln(g,1);
478     INFO("Testing limiter_node: preds and succs");
479     DecrementerHelper<dtype>::check(ln.decrementer());
480     CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold");
481     tbb::flow::queue_node<int> inq(g);
482     tbb::flow::queue_node<int> outq(g);
483     tbb::flow::broadcast_node<dtype> bn(g);
484 
485     tbb::flow::make_edge(inq,ln);
486     tbb::flow::make_edge(ln,outq);
487     tbb::flow::make_edge(bn,ln.decrementer());
488 
489     g.wait_for_all();
490     CHECK_MESSAGE( (!(ln.my_successors.empty())),"successors empty after make_edge");
491     CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed");
492     inq.try_put(1);
493     g.wait_for_all();
494     CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 1), "limiter_node didn't pass first value");
495     CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge reversed");
496     inq.try_put(2);
497     g.wait_for_all();
498     CHECK_MESSAGE( (!outq.try_get(out_int)), "limiter_node incorrectly passed second input");
499     CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge to limiter_node not reversed");
500     bn.try_put(DecrementerHelper<dtype>::makeDType());
501     g.wait_for_all();
502     CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 2), "limiter_node didn't pass second value");
503     g.wait_for_all();
504     CHECK_MESSAGE( (!ln.my_predecessors.empty()), "input edge was reversed(after try_get())");
505     g.reset();
506     CHECK_MESSAGE( (ln.my_predecessors.empty()), "input edge not reset");
507     inq.try_put(3);
508     g.wait_for_all();
509     CHECK_MESSAGE( (outq.try_get(out_int) && out_int == 3), "limiter_node didn't pass third value");
510 
511     INFO(" rf_clear_edges");
512     // currently the limiter_node will not pass another message
513     g.reset(tbb::flow::rf_clear_edges);
514     DecrementerHelper<dtype>::check(ln.decrementer());
515     CHECK_MESSAGE( (ln.my_threshold == 1), "error in my_threshold");
516     CHECK_MESSAGE( (ln.my_predecessors.empty()), "preds not reset(rf_clear_edges)");
517     CHECK_MESSAGE( (ln.my_successors.empty()), "preds not reset(rf_clear_edges)");
518     CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)");
519     CHECK_MESSAGE( (inq.my_successors.empty()), "Arc not removed on reset(rf_clear_edges)");
520     CHECK_MESSAGE( (bn.my_successors.empty()), "control edge not removed on reset(rf_clear_edges)");
521     tbb::flow::make_edge(inq,ln);
522     tbb::flow::make_edge(ln,outq);
523     inq.try_put(4);
524     inq.try_put(5);
525     g.wait_for_all();
526     CHECK_MESSAGE( (outq.try_get(out_int)),"missing output after reset(rf_clear_edges)");
527     CHECK_MESSAGE( (out_int == 4), "input incorrect (4)");
528     bn.try_put(DecrementerHelper<dtype>::makeDType());
529     g.wait_for_all();
530     CHECK_MESSAGE( (!outq.try_get(out_int)),"second output incorrectly passed (rf_clear_edges)");
531     INFO(" done\n");
532 }
533 
534 template<typename MF_TYPE>
535 struct mf_body {
536     std::atomic<int>& my_flag;
mf_bodymf_body537     mf_body(std::atomic<int>& flag) : my_flag(flag) { }
operator ()mf_body538     void operator()(const int& in, typename MF_TYPE::output_ports_type& outports) {
539         if(my_flag == 0) {
540             my_flag = 1;
541 
542             utils::SpinWaitWhileEq(my_flag, 1);
543         }
544 
545         if (in & 0x1)
546             std::get<1>(outports).try_put(in);
547         else
548             std::get<0>(outports).try_put(in);
549     }
550 };
551 
552 template<typename P, typename T>
553 struct test_reversal;
554 template<typename T>
555 struct test_reversal<tbb::flow::queueing, T> {
test_reversaltest_reversal556     test_reversal() { INFO("<queueing>"); }
557     // queueing node will not reverse.
operator ()test_reversal558     bool operator()(T& node) const { return node.my_predecessors.empty(); }
559 };
560 
561 template<typename T>
562 struct test_reversal<tbb::flow::rejecting, T> {
test_reversaltest_reversal563     test_reversal() { INFO("<rejecting>"); }
operator ()test_reversal564     bool operator()(T& node) const { return !node.my_predecessors.empty(); }
565 };
566 
567 template<typename P>
TestMultifunctionNode()568 void TestMultifunctionNode() {
569     typedef tbb::flow::multifunction_node<int, std::tuple<int, int>, P> multinode_type;
570     INFO("Testing multifunction_node");
571     test_reversal<P,multinode_type> my_test;
572     INFO(":");
573     tbb::flow::graph g;
574     multinode_type mf(g, tbb::flow::serial, mf_body<multinode_type>(serial_fn_state0));
575     tbb::flow::queue_node<int> qin(g);
576     tbb::flow::queue_node<int> qodd_out(g);
577     tbb::flow::queue_node<int> qeven_out(g);
578     tbb::flow::make_edge(qin,mf);
579     tbb::flow::make_edge(tbb::flow::output_port<0>(mf), qeven_out);
580     tbb::flow::make_edge(tbb::flow::output_port<1>(mf), qodd_out);
581     g.wait_for_all();
582     for (int ii = 0; ii < 2 ; ++ii) {
583         std::atomic<bool> submitted{ false };
584         serial_fn_state0 = 0;
585         /* if(ii == 0) REMARK(" reset preds"); else REMARK(" 2nd");*/
586         std::thread t([&] {
587             g.reset(); // attach to the current arena
588             qin.try_put(0);
589             qin.try_put(1);
590             submitted = true;
591             g.wait_for_all();
592         });
593         // wait for node to be active
594         utils::SpinWaitWhileEq(serial_fn_state0, 0);
595         utils::SpinWaitWhileEq(submitted, false);
596         g.my_context->cancel_group_execution();
597         // release node
598         serial_fn_state0 = 2;
599         t.join();
600         // The rejection test cannot guarantee the state of predecessors cache.
601         if (!std::is_same<P, tbb::flow::rejecting>::value) {
602             CHECK_MESSAGE((my_test(mf)), "fail cancel group test");
603         }
604         if( ii == 1) {
605             INFO(" rf_clear_edges");
606             g.reset(tbb::flow::rf_clear_edges);
607             CHECK_MESSAGE( (tbb::flow::output_port<0>(mf).my_successors.empty()), "output_port<0> not reset (rf_clear_edges)");
608             CHECK_MESSAGE( (tbb::flow::output_port<1>(mf).my_successors.empty()), "output_port<1> not reset (rf_clear_edges)");
609         }
610         else
611         {
612             g.reset();
613         }
614         CHECK_MESSAGE( (mf.my_predecessors.empty()), "edge didn't reset");
615         CHECK_MESSAGE( ((ii == 0 && !qin.my_successors.empty()) || (ii == 1 && qin.my_successors.empty())), "edge didn't reset");
616     }
617     INFO(" done\n");
618 }
619 
620 // indexer_node is like a broadcast_node, in that none of its inputs reverse, and it
621 // never allows a successor to reverse its edge, so we only need test the successors.
622 void
TestIndexerNode()623 TestIndexerNode() {
624     tbb::flow::graph g;
625     typedef tbb::flow::indexer_node< int, int > indexernode_type;
626     indexernode_type inode(g);
627     INFO("Testing indexer_node:");
628     tbb::flow::queue_node<indexernode_type::output_type> qout(g);
629     tbb::flow::make_edge(inode,qout);
630     g.wait_for_all();
631     CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing");
632     g.reset();
633     CHECK_MESSAGE( (!inode.my_successors.empty()), "successor of indexer_node missing after reset");
634     g.reset(tbb::flow::rf_clear_edges);
635     CHECK_MESSAGE( (inode.my_successors.empty()), "successor of indexer_node not removed by reset(rf_clear_edges)");
636     INFO(" done\n");
637 }
638 
639 template<typename Node>
640 void
TestScalarNode(const char * name)641 TestScalarNode(const char *name) {
642     tbb::flow::graph g;
643     Node on(g);
644     tbb::flow::queue_node<int> qout(g);
645     INFO("Testing " << name << ":");
646     tbb::flow::make_edge(on,qout);
647     g.wait_for_all();
648     CHECK_MESSAGE( (!on.my_successors.empty()), "edge not added");
649     g.reset();
650     CHECK_MESSAGE( (!on.my_successors.empty()), "edge improperly removed");
651     g.reset(tbb::flow::rf_clear_edges);
652     CHECK_MESSAGE( (on.my_successors.empty()), "edge not removed by reset(rf_clear_edges)");
653     INFO(" done\n");
654 }
655 
// Functor passed to sequencer_node as its sequencing body: maps an int item to
// the integer quotient of the item and 3, returned as size_t.
struct seq_body {
    size_t operator()(const int &in) {
        const int quotient = in / 3;
        return static_cast<size_t>(quotient);
    }
};
661 
662 // sequencer_node behaves like a queueing node, but requires a different constructor.
TestSequencerNode()663 void TestSequencerNode() {
664     tbb::flow::graph g;
665     tbb::flow::sequencer_node<int> bnode(g, seq_body());
666     INFO("Testing sequencer_node:");
667     tbb::flow::function_node<int> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
668     INFO("Testing sequencer_node:");
669     serial_fn_state0 = 0;  // reset to waiting state.
670     INFO(" make_edge");
671     tbb::flow::make_edge(bnode, fnode);
672     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after make_edge" );
673     INFO(" try_put");
674     std::thread t([&]{
675         bnode.try_put(0);  // will forward to the fnode
676         g.wait_for_all();
677     });
678     // wait for the function_node to fire up
679     utils::SpinWaitWhileEq(serial_fn_state0, 0);
680     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after forwarding message" );
681     serial_fn_state0 = 0;       // release the function node
682     t.join();
683 
684     INFO(" remove_edge");
685     tbb::flow::remove_edge(bnode, fnode);
686     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after remove_edge");
687     tbb::flow::join_node<std::tuple<int,int>,tbb::flow::reserving> jnode(g);
688     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // will spawn a task
689     g.wait_for_all();
690     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after attaching to join");
691     INFO(" reverse");
692     bnode.try_put(3);  // the edge should reverse
693     g.wait_for_all();
694     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reserving");
695     INFO(" reset()");
696     g.wait_for_all();
697     g.reset();  // should be in forward direction again
698     CHECK_MESSAGE( (!bnode.my_successors.empty()), "buffering node has no successor after reset()");
699     INFO(" remove_edge");
700     g.reset(tbb::flow::rf_clear_edges);  // should be in forward direction again
701     CHECK_MESSAGE( (bnode.my_successors.empty()), "buffering node has a successor after reset(rf_clear_edges)");
702     CHECK_MESSAGE( (fnode.my_predecessors.empty()), "buffering node reversed after reset(rf_clear_edges)");
703     INFO("  done\n");
704     g.wait_for_all();
705 }
706 
707 struct snode_body {
708     int max_cnt;
709     int my_cnt;
snode_bodysnode_body710     snode_body(const int& in) : max_cnt(in) { my_cnt = 0; }
operator ()snode_body711     int operator()(tbb::flow_control& fc) {
712         if (max_cnt <= my_cnt++) {
713             fc.stop();
714             return int();
715         }
716         return my_cnt;
717     }
718 };
719 
TestInputNode()720 void TestInputNode() {
721     tbb::flow::graph g;
722     tbb::flow::input_node<int> in(g, snode_body(4));
723     INFO("Testing input_node:");
724     tbb::flow::queue_node<int> qin(g);
725     tbb::flow::join_node<std::tuple<int,int>, tbb::flow::reserving> jn(g);
726     tbb::flow::queue_node<std::tuple<int,int> > qout(g);
727 
728     INFO(" make_edges");
729     tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn));
730     tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
731     tbb::flow::make_edge(jn,qout);
732     CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after make_edge");
733     g.wait_for_all();
734     g.reset();
735     CHECK_MESSAGE( (!in.my_successors.empty()), "input node has no successor after reset");
736     g.wait_for_all();
737     g.reset(tbb::flow::rf_clear_edges);
738     CHECK_MESSAGE( (in.my_successors.empty()), "input node has successor after reset(rf_clear_edges)");
739     tbb::flow::make_edge(in, tbb::flow::input_port<0>(jn));
740     tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
741     tbb::flow::make_edge(jn,qout);
742     g.wait_for_all();
743     INFO(" activate");
744     in.activate();  // will forward to the fnode
745     INFO(" wait1");
746     g.wait_for_all();
747     CHECK_MESSAGE( (in.my_successors.empty()), "input node has no successor after forwarding message");
748     g.reset();
749     CHECK_MESSAGE( (!in.my_successors.empty()), "input_node has no successors after reset");
750     CHECK_MESSAGE( (tbb::flow::input_port<0>(jn).my_predecessors.empty()), "successor of input_node has pred after reset.");
751     INFO(" done\n");
752 }
753 
754 //! Test buffering nodes
755 //! \brief \ref error_guessing
756 TEST_CASE("Test buffering nodes"){
757     unsigned int MinThread = utils::MinThread;
758     if(MinThread < 3) MinThread = 3;
759     tbb::task_arena arena(MinThread);
760 	arena.execute(
__anon7a6130c50902() 761         [&]() {
762             // tests presume at least three threads
763             TestBufferingNode< tbb::flow::buffer_node<int> >("buffer_node");
764             TestBufferingNode< tbb::flow::priority_queue_node<int> >("priority_queue_node");
765             TestBufferingNode< tbb::flow::queue_node<int> >("queue_node");
766         }
767 	);
768 }
769 
770 //! Test sequencer_node
771 //! \brief \ref error_guessing
TEST_CASE("Test sequencer node"){
    // Thin wrapper: all checks live in TestSequencerNode().
    TestSequencerNode();
}
775 
// Groups the multifunction_node checks: each case instantiates
// TestMultifunctionNode (defined elsewhere in this file) with one policy tag.
TEST_SUITE("Test multifunction node") {
    //! Test multifunction_node with rejecting policy
    //! \brief \ref error_guessing
    TEST_CASE("with rejecting policy"){
        TestMultifunctionNode<tbb::flow::rejecting>();
    }

    //! Test multifunction_node with queueing policy
    //! \brief \ref error_guessing
    TEST_CASE("with queueing policy") {
        TestMultifunctionNode<tbb::flow::queueing>();
    }
}
789 
790 //! Test input_node
791 //! \brief \ref error_guessing
TEST_CASE("Test input node"){
    // Thin wrapper: all checks live in TestInputNode().
    TestInputNode();
}
795 
796 //! Test continue_node
797 //! \brief \ref error_guessing
TEST_CASE("Test continue node"){
    // Thin wrapper: all checks live in TestContinueNode(), defined elsewhere in this file.
    TestContinueNode();
}
801 
802 //! Test function_node
803 //! \brief \ref error_guessing
TEST_CASE("Test function node" * doctest::may_fail()){
    // Thin wrapper around TestFunctionNode(), defined elsewhere in this file.
    // NOTE(review): the may_fail() decorator presumably keeps a failure here from
    // failing the whole run; the reason this test is allowed to fail is not
    // stated in this part of the file - confirm before removing the decorator.
    TestFunctionNode();
}
807 
808 //! Test join_node
809 //! \brief \ref error_guessing
TEST_CASE("Test join node"){
    // Thin wrapper: all checks live in TestJoinNode(), defined elsewhere in this file.
    TestJoinNode();
}
813 
814 //! Test limiter_node
815 //! \brief \ref error_guessing
TEST_CASE("Test limiter node"){
    // Runs TestLimiterNode (defined elsewhere in this file) once per template
    // argument: void, int and tbb::flow::continue_msg.
    TestLimiterNode<void>();
    TestLimiterNode<int>();
    TestLimiterNode<tbb::flow::continue_msg>();
}
821 
822 //! Test indexer_node
823 //! \brief \ref error_guessing
TEST_CASE("Test indexer node"){
    // Thin wrapper: all checks live in TestIndexerNode().
    TestIndexerNode();
}
827 
828 //! Test split_node
829 //! \brief \ref error_guessing
TEST_CASE("Test split node"){
    // Thin wrapper: all checks live in TestSplitNode(), defined elsewhere in this file.
    TestSplitNode();
}
833 
834 //! Test broadcast, overwrite, write_once nodes
835 //! \brief \ref error_guessing
TEST_CASE("Test scalar node"){
    // Runs the shared TestScalarNode edge check once per node type; the string
    // argument only labels the INFO output.
    TestScalarNode<tbb::flow::broadcast_node<int> >("broadcast_node");
    TestScalarNode<tbb::flow::overwrite_node<int> >("overwrite_node");
    TestScalarNode<tbb::flow::write_once_node<int> >("write_once_node");
}
841 
842 //! try_get in inactive graph
843 //! \brief \ref error_guessing
844 TEST_CASE("try_get in inactive graph"){
845     tbb::flow::graph g;
846 
__anon7a6130c50a02(tbb::flow_control& fc) 847     tbb::flow::input_node<int> src(g, [&](tbb::flow_control& fc) { fc.stop(); return 0;});
848     deactivate_graph(g);
849 
850     int tmp = -1;
851     CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed");
852 
853     src.activate();
854     tmp = -1;
855     CHECK_MESSAGE((src.try_get(tmp) == false), "try_get can not succeed");
856 }
857 
858 //! Test make_edge in inactive graph
859 //! \brief \ref error_guessing
860 TEST_CASE("Test make_edge in inactive graph"){
861     tbb::flow::graph g;
862 
__anon7a6130c50b02(const tbb::flow::continue_msg&)863     tbb::flow::continue_node<int> c(g, [](const tbb::flow::continue_msg&){ return 1; });
864 
865     tbb::flow::function_node<int, int> f(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
866 
867     c.try_put(tbb::flow::continue_msg());
868     g.wait_for_all();
869 
870     deactivate_graph(g);
871 
872     make_edge(c, f);
873 }
874 
875 //! Test make_edge from overwrite_node in inactive graph
876 //! \brief \ref error_guessing
877 TEST_CASE("Test make_edge from overwrite_node in inactive graph"){
878     tbb::flow::graph g;
879 
880     tbb::flow::queue_node<int> q(g);
881 
882     tbb::flow::overwrite_node<int> on(g);
883 
884     on.try_put(1);
885     g.wait_for_all();
886 
887     deactivate_graph(g);
888 
889     make_edge(on, q);
890 
891     int tmp = -1;
892     CHECK_MESSAGE((q.try_get(tmp) == false), "Message should not be passed on");
893 }
894 
895 //! Test iterators directly
896 //! \brief \ref error_guessing
897 TEST_CASE("graph_iterator details"){
898     tbb::flow::graph g;
899     const tbb::flow::graph cg;
900 
901     tbb::flow::graph::iterator b = g.begin();
902     tbb::flow::graph::iterator b2 = g.begin();
903     ++b2;
904     // Cast to a volatile pointer to workaround self assignment warnings from some compilers.
905     tbb::flow::graph::iterator* volatile b2_ptr = &b2;
906     b2 = *b2_ptr;
907     b = b2;
908     CHECK_MESSAGE((b == b2), "Assignment should make iterators equal");
909 }
910 
911 //! const graph
912 //! \brief \ref error_guessing
913 TEST_CASE("const graph"){
914     using namespace tbb::flow;
915 
916     const graph g;
917     CHECK_MESSAGE((g.cbegin() == g.cend()), "Starting graph is empty");
918     CHECK_MESSAGE((g.begin() == g.end()), "Starting graph is empty");
919 
920     graph g2;
921     CHECK_MESSAGE((g2.begin() == g2.end()), "Starting graph is empty");
922 }
923 
924 //! Send message to continue_node while graph is inactive
925 //! \brief \ref error_guessing
926 TEST_CASE("Send message to continue_node while graph is inactive") {
927     using namespace tbb::flow;
928 
929     graph g;
930 
__anon7a6130c50c02(const continue_msg&)931     continue_node<int> c(g, [](const continue_msg&){ return 1; });
932     buffer_node<int> b(g);
933 
934     make_edge(c, b);
935 
936     deactivate_graph(g);
937 
938     c.try_put(continue_msg());
939     g.wait_for_all();
940 
941     int tmp = -1;
942     CHECK_MESSAGE((b.try_get(tmp) == false), "Message should not arrive");
943     CHECK_MESSAGE((tmp == -1), "Value should not be altered");
944 }
945 
946 
947 //! Bypass of a successor's message in a node with lightweight policy
948 //! \brief \ref error_guessing
949 TEST_CASE("Bypass of a successor's message in a node with lightweight policy") {
950     using namespace tbb::flow;
951 
952     graph g;
953 
__anon7a6130c50d02(const int&v)954     auto body = [](const int&v)->int { return v * 2; };
955     function_node<int, int, lightweight> f1(g, unlimited, body);
956 
__anon7a6130c50e02(const int&v)957     auto body2 = [](const int&v)->int {return v / 2;};
958     function_node<int, int> f2(g, unlimited, body2);
959 
960     buffer_node<int> b(g);
961 
962     make_edge(f1, f2);
963     make_edge(f2, b);
964 
965     f1.try_put(1);
966     g.wait_for_all();
967 
968     int tmp = -1;
969     CHECK_MESSAGE((b.try_get(tmp) == true), "Functional nodes can work in succession");
970     CHECK_MESSAGE((tmp == 1), "Value should not be altered");
971 }
972