1 /*
2 Copyright (c) 2005-2021 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #ifdef TBB_TEST_LOW_WORKLOAD
18 #undef MAX_TUPLE_TEST_SIZE
19 #define MAX_TUPLE_TEST_SIZE 3
20 #endif
21
22 #include "common/config.h"
23
24 #include "test_join_node.h"
25
26 //! \file test_join_node.cpp
27 //! \brief Test for [flow_graph.join_node] specification
28
29
30 static std::atomic<int> output_count;
31
32 // get the tag from the output tuple and emit it.
33 // the first tuple component is tag * 2 cast to the type
34 template<typename OutputTupleType>
35 class recirc_output_func_body {
36 public:
37 // we only need this to use input_node_helper
38 typedef typename tbb::flow::join_node<OutputTupleType, tbb::flow::tag_matching> join_node_type;
39 static const int N = std::tuple_size<OutputTupleType>::value;
operator ()(const OutputTupleType & v)40 int operator()(const OutputTupleType &v) {
41 int out = int(std::get<0>(v))/2;
42 input_node_helper<N, join_node_type>::only_check_value(out, v);
43 ++output_count;
44 return out;
45 }
46 };
47
48 template<typename JType>
49 class tag_recirculation_test {
50 public:
51 typedef typename JType::output_type TType;
52 typedef typename std::tuple<int, tbb::flow::continue_msg> input_tuple_type;
53 typedef tbb::flow::join_node<input_tuple_type, tbb::flow::reserving> input_join_type;
54 static const int N = std::tuple_size<TType>::value;
test()55 static void test() {
56 input_node_helper<N, JType>::print_remark("Recirculation test of tag-matching join");
57 INFO(" >\n");
58 for(int maxTag = 1; maxTag <10; maxTag *= 3) {
59 for(int i = 0; i < N; ++i) all_input_nodes[i][0] = NULL;
60
61 tbb::flow::graph g;
62 // this is the tag-matching join we're testing
63 JType * my_join = makeJoin<N, JType, tbb::flow::tag_matching>::create(g);
64 // input_node for continue messages
65 tbb::flow::input_node<tbb::flow::continue_msg> snode(g, recirc_input_node_body());
66 // reserving join that matches recirculating tags with continue messages.
67 input_join_type * my_input_join = makeJoin<2, input_join_type, tbb::flow::reserving>::create(g);
68 // tbb::flow::make_edge(snode, tbb::flow::input_port<1>(*my_input_join));
69 tbb::flow::make_edge(snode, std::get<1>(my_input_join->input_ports()));
70 // queue to hold the tags
71 tbb::flow::queue_node<int> tag_queue(g);
72 tbb::flow::make_edge(tag_queue, tbb::flow::input_port<0>(*my_input_join));
73 // add all the function_nodes that are inputs to the tag-matching join
74 input_node_helper<N, JType>::add_recirc_func_nodes(*my_join, *my_input_join, g);
75 // add the function_node that accepts the output of the join and emits the int tag it was based on
76 tbb::flow::function_node<TType, int> recreate_tag(g, tbb::flow::unlimited, recirc_output_func_body<TType>());
77 tbb::flow::make_edge(*my_join, recreate_tag);
78 // now the recirculating part (output back to the queue)
79 tbb::flow::make_edge(recreate_tag, tag_queue);
80
81 // put the tags into the queue
82 for(int t = 1; t<=maxTag; ++t) tag_queue.try_put(t);
83
84 input_count = Recirc_count;
85 output_count = 0;
86
87 // start up the source node to get things going
88 snode.activate();
89
90 // wait for everything to stop
91 g.wait_for_all();
92
93 CHECK_MESSAGE( (output_count==Recirc_count), "not all instances were received");
94
95 int j{};
96 // grab the tags from the queue, record them
97 std::vector<bool> out_tally(maxTag, false);
98 for(int i = 0; i < maxTag; ++i) {
99 CHECK_MESSAGE( (tag_queue.try_get(j)), "not enough tags in queue");
100 CHECK_MESSAGE( (!out_tally.at(j-1)), "duplicate tag from queue");
101 out_tally[j-1] = true;
102 }
103 CHECK_MESSAGE( (!tag_queue.try_get(j)), "Extra tags in recirculation queue");
104
105 // deconstruct graph
106 input_node_helper<N, JType>::remove_recirc_func_nodes(*my_join, *my_input_join);
107 tbb::flow::remove_edge(*my_join, recreate_tag);
108 makeJoin<N, JType, tbb::flow::tag_matching>::destroy(my_join);
109 tbb::flow::remove_edge(tag_queue, tbb::flow::input_port<0>(*my_input_join));
110 tbb::flow::remove_edge(snode, tbb::flow::input_port<1>(*my_input_join));
111 makeJoin<2, input_join_type, tbb::flow::reserving>::destroy(my_input_join);
112 }
113 }
114 };
115
116 template<typename JType>
117 class generate_recirc_test {
118 public:
119 typedef tbb::flow::join_node<JType, tbb::flow::tag_matching> join_node_type;
do_test()120 static void do_test() {
121 tag_recirculation_test<join_node_type>::test();
122 }
123 };
124
125 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
126 #include <array>
127 #include <vector>
test_follows_and_precedes_api()128 void test_follows_and_precedes_api() {
129 using msg_t = tbb::flow::continue_msg;
130 using JoinOutputType = std::tuple<msg_t, msg_t, msg_t>;
131
132 std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} };
133 std::vector<msg_t> messages_for_precedes = {msg_t(), msg_t(), msg_t()};
134
135 follows_and_precedes_testing::test_follows
136 <msg_t, tbb::flow::join_node<JoinOutputType>, tbb::flow::buffer_node<msg_t>>(messages_for_follows);
137 follows_and_precedes_testing::test_follows
138 <msg_t, tbb::flow::join_node<JoinOutputType, tbb::flow::queueing>>(messages_for_follows);
139 follows_and_precedes_testing::test_follows
140 <msg_t, tbb::flow::join_node<JoinOutputType, tbb::flow::reserving>, tbb::flow::buffer_node<msg_t>>(messages_for_follows);
141 auto b = [](msg_t) { return msg_t(); };
142 class hash_compare {
143 public:
144 std::size_t hash(msg_t) const { return 0; }
145 bool equal(msg_t, msg_t) const { return true; }
146 };
147 follows_and_precedes_testing::test_follows
148 <msg_t, tbb::flow::join_node<JoinOutputType, tbb::flow::key_matching<msg_t, hash_compare>>, tbb::flow::buffer_node<msg_t>>
149 (messages_for_follows, b, b, b);
150
151 follows_and_precedes_testing::test_precedes
152 <msg_t, tbb::flow::join_node<JoinOutputType>>(messages_for_precedes);
153 follows_and_precedes_testing::test_precedes
154 <msg_t, tbb::flow::join_node<JoinOutputType, tbb::flow::queueing>>(messages_for_precedes);
155 follows_and_precedes_testing::test_precedes
156 <msg_t, tbb::flow::join_node<JoinOutputType, tbb::flow::reserving>>(messages_for_precedes);
157 follows_and_precedes_testing::test_precedes
158 <msg_t, tbb::flow::join_node<JoinOutputType, tbb::flow::key_matching<msg_t, hash_compare>>>
159 (messages_for_precedes, b, b, b);
160 }
161 #endif
162
163 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
test_deduction_guides()164 void test_deduction_guides() {
165 using namespace tbb::flow;
166
167 graph g;
168 using tuple_type = std::tuple<int, int, int>;
169 broadcast_node<int> b1(g), b2(g), b3(g);
170 broadcast_node<tuple_type> b4(g);
171 join_node<tuple_type> j0(g);
172
173 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
174 join_node j1(follows(b1, b2, b3));
175 static_assert(std::is_same_v<decltype(j1), join_node<tuple_type>>);
176
177 join_node j2(follows(b1, b2, b3), reserving());
178 static_assert(std::is_same_v<decltype(j2), join_node<tuple_type, reserving>>);
179
180 join_node j3(precedes(b4));
181 static_assert(std::is_same_v<decltype(j3), join_node<tuple_type>>);
182
183 join_node j4(precedes(b4), reserving());
184 static_assert(std::is_same_v<decltype(j4), join_node<tuple_type, reserving>>);
185 #endif
186
187 join_node j5(j0);
188 static_assert(std::is_same_v<decltype(j5), join_node<tuple_type>>);
189 }
190
191 #endif
192
193 namespace multiple_predecessors {
194
195 using namespace tbb::flow;
196
197 using join_node_t = join_node<std::tuple<continue_msg, continue_msg, continue_msg>, reserving>;
198 using queue_node_t = queue_node<std::tuple<continue_msg, continue_msg, continue_msg>>;
199
twist_join_connections(buffer_node<continue_msg> & bn1,buffer_node<continue_msg> & bn2,buffer_node<continue_msg> & bn3,join_node_t & jn)200 void twist_join_connections(
201 buffer_node<continue_msg>& bn1, buffer_node<continue_msg>& bn2, buffer_node<continue_msg>& bn3,
202 join_node_t& jn)
203 {
204 // order, in which edges are created/destroyed, is important
205 make_edge(bn1, input_port<0>(jn));
206 make_edge(bn2, input_port<0>(jn));
207 make_edge(bn3, input_port<0>(jn));
208
209 remove_edge(bn3, input_port<0>(jn));
210 make_edge (bn3, input_port<2>(jn));
211
212 remove_edge(bn2, input_port<0>(jn));
213 make_edge (bn2, input_port<1>(jn));
214 }
215
connect_join_via_make_edge(graph & g,buffer_node<continue_msg> & bn1,buffer_node<continue_msg> & bn2,buffer_node<continue_msg> & bn3,queue_node_t & qn)216 std::unique_ptr<join_node_t> connect_join_via_make_edge(
217 graph& g, buffer_node<continue_msg>& bn1, buffer_node<continue_msg>& bn2,
218 buffer_node<continue_msg>& bn3, queue_node_t& qn)
219 {
220 std::unique_ptr<join_node_t> jn( new join_node_t(g) );
221 twist_join_connections( bn1, bn2, bn3, *jn );
222 make_edge(*jn, qn);
223 return jn;
224 }
225
226 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
connect_join_via_follows(graph &,buffer_node<continue_msg> & bn1,buffer_node<continue_msg> & bn2,buffer_node<continue_msg> & bn3,queue_node_t & qn)227 std::unique_ptr<join_node_t> connect_join_via_follows(
228 graph&, buffer_node<continue_msg>& bn1, buffer_node<continue_msg>& bn2,
229 buffer_node<continue_msg>& bn3, queue_node_t& qn)
230 {
231 auto bn_set = make_node_set(bn1, bn2, bn3);
232 std::unique_ptr<join_node_t> jn( new join_node_t(follows(bn_set)) );
233 make_edge(*jn, qn);
234 return jn;
235 }
236
connect_join_via_precedes(graph &,buffer_node<continue_msg> & bn1,buffer_node<continue_msg> & bn2,buffer_node<continue_msg> & bn3,queue_node_t & qn)237 std::unique_ptr<join_node_t> connect_join_via_precedes(
238 graph&, buffer_node<continue_msg>& bn1, buffer_node<continue_msg>& bn2,
239 buffer_node<continue_msg>& bn3, queue_node_t& qn)
240 {
241 auto qn_set = make_node_set(qn);
242 auto qn_copy_set = qn_set;
243 std::unique_ptr<join_node_t> jn( new join_node_t(precedes(qn_copy_set)) );
244 twist_join_connections( bn1, bn2, bn3, *jn );
245 return jn;
246 }
247 #endif // TBB_PREVIEW_FLOW_GRAPH_FEATURES
248
run_and_check(graph & g,buffer_node<continue_msg> & bn1,buffer_node<continue_msg> & bn2,buffer_node<continue_msg> & bn3,queue_node_t & qn,bool expected)249 void run_and_check(
250 graph& g, buffer_node<continue_msg>& bn1, buffer_node<continue_msg>& bn2,
251 buffer_node<continue_msg>& bn3, queue_node_t& qn, bool expected)
252 {
253 std::tuple<continue_msg, continue_msg, continue_msg> msg;
254
255 bn1.try_put(continue_msg());
256 bn2.try_put(continue_msg());
257 bn3.try_put(continue_msg());
258 g.wait_for_all();
259
260 CHECK_MESSAGE(
261 (qn.try_get(msg) == expected),
262 "Unexpected message absence/existence at the end of the graph."
263 );
264 }
265
266 template<typename ConnectJoinNodeFunc>
test(ConnectJoinNodeFunc && connect_join_node)267 void test(ConnectJoinNodeFunc&& connect_join_node) {
268 graph g;
269 buffer_node<continue_msg> bn1(g);
270 buffer_node<continue_msg> bn2(g);
271 buffer_node<continue_msg> bn3(g);
272 queue_node_t qn(g);
273
274 auto jn = connect_join_node(g, bn1, bn2, bn3, qn);
275
276 run_and_check(g, bn1, bn2, bn3, qn, /*expected=*/true);
277
278 remove_edge(bn3, input_port<2>(*jn));
279 remove_edge(bn2, input_port<1>(*jn));
280 remove_edge(bn1, input_port<0>(*jn));
281 remove_edge(*jn, qn);
282
283 run_and_check(g, bn1, bn2, bn3, qn, /*expected=*/false);
284 }
285 } // namespace multiple_predecessors
286
287
288 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
289 //! Test follows and precedes API
290 //! \brief \ref error_guessing
291 TEST_CASE("Test follows and preceedes API"){
292 test_follows_and_precedes_api();
293 }
294 #endif
295
296 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
297 //! Test deduction guides
298 //! \brief \ref requirement
299 TEST_CASE("Deduction guides test"){
300 test_deduction_guides();
301 }
302 #endif
303
304 //! Test hash buffers behavior
305 //! \brief \ref error_guessing
306 TEST_CASE("Tagged buffers test"){
307 TestTaggedBuffers();
308 }
309
310 //! Test with various policies and tuple sizes
311 //! \brief \ref error_guessing
312 TEST_CASE("Main test"){
313 test_main<tbb::flow::queueing>();
314 test_main<tbb::flow::reserving>();
315 test_main<tbb::flow::tag_matching>();
316 }
317
318 //! Test with recirculating tags
319 //! \brief \ref error_guessing
320 TEST_CASE("Recirculation test"){
321 generate_recirc_test<std::tuple<int,float> >::do_test();
322 }
323
324 //! Test maintaining correct count of ports without input
325 //! \brief \ref error_guessing
326 TEST_CASE("Test removal of the predecessor while having none") {
327 using namespace multiple_predecessors;
328
329 test(connect_join_via_make_edge);
330 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
331 test(connect_join_via_follows);
332 test(connect_join_via_precedes);
333 #endif
334 }
335