1 /*
2 Copyright (c) 2005-2021 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 // TODO: Add overlapping put / receive tests
18
19 #include "common/config.h"
20
21 #include "tbb/flow_graph.h"
22 #include "tbb/global_control.h"
23
24 #include "common/test.h"
25 #include "common/utils.h"
26 #include "common/utils_assert.h"
27 #include "common/checktype.h"
28 #include "common/graph_utils.h"
29 #include "common/test_follows_and_precedes_api.h"
30
31 #include <cstdio>
32
33
34 //! \file test_priority_queue_node.cpp
35 //! \brief Test for [flow_graph.priority_queue_node] specification
36
37
38 #define N 10
39 #define C 10
40
41 template< typename T >
spin_try_get(tbb::flow::priority_queue_node<T> & q,T & value)42 void spin_try_get( tbb::flow::priority_queue_node<T> &q, T &value ) {
43 while ( q.try_get(value) != true ) ;
44 }
45
46 template< typename T >
check_item(T * next_value,T & value)47 void check_item( T* next_value, T &value ) {
48 int tid = value / N;
49 int offset = value % N;
50 CHECK_MESSAGE( next_value[tid] == T(offset), "" );
51 ++next_value[tid];
52 }
53
54 template< typename T >
55 struct parallel_puts : utils::NoAssign {
56 tbb::flow::priority_queue_node<T> &my_q;
parallel_putsparallel_puts57 parallel_puts( tbb::flow::priority_queue_node<T> &q ) : my_q(q) {}
operator ()parallel_puts58 void operator()(int i) const {
59 for (int j = 0; j < N; ++j) {
60 bool msg = my_q.try_put( T(N*i + j) );
61 CHECK_MESSAGE( msg == true, "" );
62 }
63 }
64 };
65
66 template< typename T >
67 struct parallel_gets : utils::NoAssign {
68 tbb::flow::priority_queue_node<T> &my_q;
parallel_getsparallel_gets69 parallel_gets( tbb::flow::priority_queue_node<T> &q) : my_q(q) {}
operator ()parallel_gets70 void operator()(int) const {
71 T prev;
72 spin_try_get( my_q, prev );
73 for (int j = 0; j < N-1; ++j) {
74 T v;
75 spin_try_get( my_q, v );
76 CHECK_MESSAGE(v < prev, "");
77 }
78 }
79 };
80
81 template< typename T >
82 struct parallel_put_get : utils::NoAssign {
83 tbb::flow::priority_queue_node<T> &my_q;
parallel_put_getparallel_put_get84 parallel_put_get( tbb::flow::priority_queue_node<T> &q ) : my_q(q) {}
operator ()parallel_put_get85 void operator()(int tid) const {
86 for ( int i = 0; i < N; i+=C ) {
87 int j_end = ( N < i + C ) ? N : i + C;
88 // dump about C values into the Q
89 for ( int j = i; j < j_end; ++j ) {
90 CHECK_MESSAGE( my_q.try_put( T (N*tid + j ) ) == true, "" );
91 }
92 // receive about C values from the Q
93 for ( int j = i; j < j_end; ++j ) {
94 T v;
95 spin_try_get( my_q, v );
96 }
97 }
98 }
99 };
100
101 //
102 // Tests
103 //
104 // Item can be reserved, released, consumed ( single serial receiver )
105 //
106 template< typename T >
test_reservation(int)107 int test_reservation(int) {
108 tbb::flow::graph g;
109
110 // Simple tests
111 tbb::flow::priority_queue_node<T> q(g);
112
113 {
114
115 T bogus_value(-1);
116
117 q.try_put(T(1));
118 q.try_put(T(2));
119 q.try_put(T(3));
120 g.wait_for_all();
121
122 T v=bogus_value, w=bogus_value;
123 CHECK_MESSAGE( q.try_reserve(v) == true, "" );
124 CHECK_MESSAGE( v == T(3), "" );
125 CHECK_MESSAGE( q.try_release() == true, "" );
126 v = bogus_value;
127 g.wait_for_all();
128 CHECK_MESSAGE( q.try_reserve(v) == true, "" );
129 CHECK_MESSAGE( v == T(3), "" );
130 CHECK_MESSAGE( q.try_consume() == true, "" );
131 v = bogus_value;
132 g.wait_for_all();
133
134 CHECK_MESSAGE( q.try_get(v) == true, "" );
135 CHECK_MESSAGE( v == T(2), "" );
136 v = bogus_value;
137 g.wait_for_all();
138
139 CHECK_MESSAGE( q.try_reserve(v) == true, "" );
140 CHECK_MESSAGE( v == T(1), "" );
141 CHECK_MESSAGE( q.try_reserve(w) == false, "" );
142 CHECK_MESSAGE( w == bogus_value, "" );
143 CHECK_MESSAGE( q.try_get(w) == false, "" );
144 CHECK_MESSAGE( w == bogus_value, "" );
145 CHECK_MESSAGE( q.try_release() == true, "" );
146 v = bogus_value;
147 g.wait_for_all();
148 CHECK_MESSAGE( q.try_reserve(v) == true, "" );
149 CHECK_MESSAGE( v == T(1), "" );
150 CHECK_MESSAGE( q.try_consume() == true, "" );
151 v = bogus_value;
152 g.wait_for_all();
153 CHECK_MESSAGE( q.try_get(v) == false, "" );
154 }
155 return 0;
156 }
157
158 //
159 // Tests
160 //
161 // multiple parallel senders, items in FIFO (relatively to sender) order
162 // multiple parallel senders, multiple parallel receivers, items in FIFO order (relative to sender/receiver) and all items received
163 // * overlapped puts / gets
164 // * all puts finished before any getS
165 //
166 template< typename T >
test_parallel(int num_threads)167 int test_parallel(int num_threads) {
168 tbb::flow::graph g;
169 tbb::flow::priority_queue_node<T> q(g);
170 tbb::flow::priority_queue_node<T> q2(g);
171 tbb::flow::priority_queue_node<T> q3(g);
172 T bogus_value(-1);
173 T j = bogus_value;
174
175 NativeParallelFor( num_threads, parallel_puts<T>(q) );
176 for (int i = num_threads*N -1; i>=0; --i) {
177 spin_try_get( q, j );
178 CHECK_MESSAGE(j == i, "");
179 j = bogus_value;
180 }
181 g.wait_for_all();
182 CHECK_MESSAGE( q.try_get( j ) == false, "" );
183 CHECK_MESSAGE( j == bogus_value, "" );
184
185 NativeParallelFor( num_threads, parallel_puts<T>(q) );
186 g.wait_for_all();
187 NativeParallelFor( num_threads, parallel_gets<T>(q) );
188 g.wait_for_all();
189 j = bogus_value;
190 CHECK_MESSAGE( q.try_get( j ) == false, "" );
191 CHECK_MESSAGE( j == bogus_value, "" );
192
193 NativeParallelFor( num_threads, parallel_put_get<T>(q) );
194 g.wait_for_all();
195 j = bogus_value;
196 CHECK_MESSAGE( q.try_get( j ) == false, "" );
197 CHECK_MESSAGE( j == bogus_value, "" );
198
199 tbb::flow::make_edge( q, q2 );
200 tbb::flow::make_edge( q2, q3 );
201 NativeParallelFor( num_threads, parallel_puts<T>(q) );
202 g.wait_for_all();
203 NativeParallelFor( num_threads, parallel_gets<T>(q3) );
204 g.wait_for_all();
205 j = bogus_value;
206 CHECK_MESSAGE( q.try_get( j ) == false, "" );
207 CHECK_MESSAGE( j == bogus_value, "" );
208 CHECK_MESSAGE( q2.try_get( j ) == false, "" );
209 CHECK_MESSAGE( j == bogus_value, "" );
210 CHECK_MESSAGE( q3.try_get( j ) == false, "" );
211 CHECK_MESSAGE( j == bogus_value, "" );
212
213 // test copy constructor
214 CHECK_MESSAGE( remove_successor(q, q2) == true, "" );
215 NativeParallelFor( num_threads, parallel_puts<T>(q) );
216 tbb::flow::priority_queue_node<T> q_copy(q);
217 g.wait_for_all();
218 j = bogus_value;
219 CHECK_MESSAGE( q_copy.try_get( j ) == false, "" );
220 CHECK_MESSAGE( register_successor(q, q_copy) == true, "" );
221 for (int i = num_threads*N -1; i>=0; --i) {
222 spin_try_get( q_copy, j );
223 CHECK_MESSAGE(j == i, "");
224 j = bogus_value;
225 }
226 g.wait_for_all();
227 CHECK_MESSAGE( q.try_get( j ) == false, "" );
228 CHECK_MESSAGE( j == bogus_value, "" );
229 CHECK_MESSAGE( q_copy.try_get( j ) == false, "" );
230 CHECK_MESSAGE( j == bogus_value, "" );
231
232 return 0;
233 }
234
235 //
236 // Tests
237 //
238 // Predecessors cannot be registered
239 // Empty Q rejects item requests
240 // Single serial sender, items in FIFO order
241 // Chained Qs ( 2 & 3 ), single sender, items at last Q in FIFO order
242 //
243
244 template< typename T >
test_serial()245 int test_serial() {
246 tbb::flow::graph g;
247 T bogus_value(-1);
248
249 tbb::flow::priority_queue_node<T> q(g);
250 tbb::flow::priority_queue_node<T> q2(g);
251 T j = bogus_value;
252
253 //
254 // Rejects attempts to add / remove predecessor
255 // Rejects request from empty Q
256 //
257 CHECK_MESSAGE( register_predecessor(q, q2) == false, "" );
258 CHECK_MESSAGE( remove_predecessor(q, q2) == false, "" );
259 CHECK_MESSAGE( q.try_get( j ) == false, "" );
260 CHECK_MESSAGE( j == bogus_value, "" );
261
262 //
263 // Simple puts and gets
264 //
265
266 for (int i = 0; i < N; ++i)
267 CHECK_MESSAGE( q.try_put( T(i) ), "" );
268 for (int i = N-1; i >=0; --i) {
269 j = bogus_value;
270 spin_try_get( q, j );
271 CHECK_MESSAGE( i == j, "" );
272 }
273 j = bogus_value;
274 g.wait_for_all();
275 CHECK_MESSAGE( q.try_get( j ) == false, "" );
276 CHECK_MESSAGE( j == bogus_value, "" );
277
278 tbb::flow::make_edge( q, q2 );
279
280 for (int i = 0; i < N; ++i)
281 CHECK_MESSAGE( q.try_put( T(i) ), "" );
282 g.wait_for_all();
283 for (int i = N-1; i >= 0; --i) {
284 j = bogus_value;
285 spin_try_get( q2, j );
286 CHECK_MESSAGE( i == j, "" );
287 }
288 j = bogus_value;
289 g.wait_for_all();
290 CHECK_MESSAGE( q.try_get( j ) == false, "" );
291 g.wait_for_all();
292 CHECK_MESSAGE( q2.try_get( j ) == false, "" );
293 CHECK_MESSAGE( j == bogus_value, "" );
294
295 tbb::flow::remove_edge( q, q2 );
296 CHECK_MESSAGE( q.try_put( 1 ) == true, "" );
297 g.wait_for_all();
298 CHECK_MESSAGE( q2.try_get( j ) == false, "" );
299 CHECK_MESSAGE( j == bogus_value, "" );
300 g.wait_for_all();
301 CHECK_MESSAGE( q.try_get( j ) == true, "" );
302 CHECK_MESSAGE( j == 1, "" );
303
304 tbb::flow::priority_queue_node<T> q3(g);
305 tbb::flow::make_edge( q, q2 );
306 tbb::flow::make_edge( q2, q3 );
307
308 for (int i = 0; i < N; ++i)
309 CHECK_MESSAGE( q.try_put( T(i) ), "" );
310 g.wait_for_all();
311 for (int i = N-1; i >= 0; --i) {
312 j = bogus_value;
313 spin_try_get( q3, j );
314 CHECK_MESSAGE( i == j, "" );
315 }
316 j = bogus_value;
317 g.wait_for_all();
318 CHECK_MESSAGE( q.try_get( j ) == false, "" );
319 g.wait_for_all();
320 CHECK_MESSAGE( q2.try_get( j ) == false, "" );
321 g.wait_for_all();
322 CHECK_MESSAGE( q3.try_get( j ) == false, "" );
323 CHECK_MESSAGE( j == bogus_value, "" );
324
325 tbb::flow::remove_edge( q, q2 );
326 CHECK_MESSAGE( q.try_put( 1 ) == true, "" );
327 g.wait_for_all();
328 CHECK_MESSAGE( q2.try_get( j ) == false, "" );
329 CHECK_MESSAGE( j == bogus_value, "" );
330 g.wait_for_all();
331 CHECK_MESSAGE( q3.try_get( j ) == false, "" );
332 CHECK_MESSAGE( j == bogus_value, "" );
333 g.wait_for_all();
334 CHECK_MESSAGE( q.try_get( j ) == true, "" );
335 CHECK_MESSAGE( j == 1, "" );
336
337 return 0;
338 }
339
340 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
341 #include <array>
342 #include <vector>
test_follows_and_precedes_api()343 void test_follows_and_precedes_api() {
344 std::array<int, 3> messages_for_follows = { {0, 1, 2} };
345 std::vector<int> messages_for_precedes = {0, 1, 2};
346
347 follows_and_precedes_testing::test_follows <int, tbb::flow::priority_queue_node<int>>(messages_for_follows);
348 follows_and_precedes_testing::test_precedes <int, tbb::flow::priority_queue_node<int>>(messages_for_precedes);
349 }
350 #endif
351
352 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
test_deduction_guides()353 void test_deduction_guides() {
354 using namespace tbb::flow;
355
356 graph g;
357 broadcast_node<int> br(g);
358 priority_queue_node<int> pq0(g);
359
360 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
361 using compare_type = std::greater<void>;
362 priority_queue_node pq1(follows(br));
363 static_assert(std::is_same_v<decltype(pq1), priority_queue_node<int>>);
364
365 priority_queue_node pq2(follows(br), compare_type());
366 static_assert(std::is_same_v<decltype(pq2), priority_queue_node<int, compare_type>>);
367
368 priority_queue_node pq3(precedes(br));
369 static_assert(std::is_same_v<decltype(pq3), priority_queue_node<int>>);
370
371 priority_queue_node pq4(precedes(br), compare_type());
372 static_assert(std::is_same_v<decltype(pq4), priority_queue_node<int, compare_type>>);
373 #endif
374
375 priority_queue_node pq5(pq0);
376 static_assert(std::is_same_v<decltype(pq5), priority_queue_node<int>>);
377 g.wait_for_all();
378 }
379 #endif
380
381 //! Test serial, parallel behavior and reservation under parallelism
382 //! \brief \ref requirement \ref error_guessing
383 TEST_CASE("Serial, parallel and reservation tests"){
384 for (int p = 2; p <= 4; ++p) {
385 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, p);
386 tbb::task_arena arena(p);
387 arena.execute(
__anonddf9a73e0102() 388 [&]() {
389 test_serial<int>();
390 test_reservation<int>(p);
391 test_reservation<CheckType<int> >(p);
392 test_parallel<int>(p);
393 }
394 );
395 }
396 }
397
398 //! Test reset and cancellation
399 //! \brief \ref error_guessing
400 TEST_CASE("Reset tests"){
401 INFO("Testing resets\n");
402 test_resets<int,tbb::flow::priority_queue_node<int> >();
403 test_resets<float,tbb::flow::priority_queue_node<float> >();
404 }
405
406 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
407 //! Test follows and precedes API
408 //! \brief \ref error_guessing
409 TEST_CASE("Test follows and precedes API"){
410 test_follows_and_precedes_api();
411 }
412 #endif
413
414 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
415 //! Test decution guides
416 //! \brief \ref requirement
417 TEST_CASE("Test deduction guides"){
418 test_deduction_guides();
419 }
420 #endif
421
422