1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 // TODO: Add overlapping put / receive tests
18 
19 #include "common/config.h"
20 
21 #include "tbb/flow_graph.h"
22 #include "tbb/global_control.h"
23 
24 #include "common/test.h"
25 #include "common/utils.h"
26 #include "common/utils_assert.h"
27 #include "common/checktype.h"
28 #include "common/graph_utils.h"
29 #include "common/test_follows_and_precedes_api.h"
30 
31 #include <cstdio>
32 
33 
34 //! \file test_priority_queue_node.cpp
35 //! \brief Test for [flow_graph.priority_queue_node] specification
36 
37 
// N: number of items each sender puts into the queue; item values are encoded as N*thread + offset.
#define N 10
// C: chunk size used by parallel_put_get - how many items are put before the same number are pulled back.
#define C 10
40 
41 template< typename T >
spin_try_get(tbb::flow::priority_queue_node<T> & q,T & value)42 void spin_try_get( tbb::flow::priority_queue_node<T> &q, T &value ) {
43     while ( q.try_get(value) != true ) ;
44 }
45 
46 template< typename T >
check_item(T * next_value,T & value)47 void check_item( T* next_value, T &value ) {
48     int tid = value / N;
49     int offset = value % N;
50     CHECK_MESSAGE( next_value[tid] == T(offset), "" );
51     ++next_value[tid];
52 }
53 
54 template< typename T >
55 struct parallel_puts : utils::NoAssign {
56     tbb::flow::priority_queue_node<T> &my_q;
parallel_putsparallel_puts57     parallel_puts( tbb::flow::priority_queue_node<T> &q ) : my_q(q) {}
operator ()parallel_puts58     void operator()(int i) const {
59         for (int j = 0; j < N; ++j) {
60             bool msg = my_q.try_put( T(N*i + j) );
61             CHECK_MESSAGE( msg == true, "" );
62         }
63     }
64 };
65 
66 template< typename T >
67 struct parallel_gets : utils::NoAssign {
68     tbb::flow::priority_queue_node<T> &my_q;
parallel_getsparallel_gets69     parallel_gets( tbb::flow::priority_queue_node<T> &q) : my_q(q) {}
operator ()parallel_gets70     void operator()(int) const {
71         T prev;
72         spin_try_get( my_q, prev );
73         for (int j = 0; j < N-1; ++j) {
74             T v;
75             spin_try_get( my_q, v );
76             CHECK_MESSAGE(v < prev, "");
77         }
78     }
79 };
80 
81 template< typename T >
82 struct parallel_put_get : utils::NoAssign {
83     tbb::flow::priority_queue_node<T> &my_q;
parallel_put_getparallel_put_get84     parallel_put_get( tbb::flow::priority_queue_node<T> &q ) : my_q(q) {}
operator ()parallel_put_get85     void operator()(int tid) const {
86         for ( int i = 0; i < N; i+=C ) {
87             int j_end = ( N < i + C ) ? N : i + C;
88             // dump about C values into the Q
89             for ( int j = i; j < j_end; ++j ) {
90                 CHECK_MESSAGE( my_q.try_put( T (N*tid + j ) ) == true, "" );
91             }
92             // receive about C values from the Q
93             for ( int j = i; j < j_end; ++j ) {
94                 T v;
95                 spin_try_get( my_q, v );
96             }
97         }
98     }
99 };
100 
101 //
102 // Tests
103 //
104 // Item can be reserved, released, consumed ( single serial receiver )
105 //
106 template< typename T >
test_reservation(int)107 int test_reservation(int) {
108     tbb::flow::graph g;
109 
110     // Simple tests
111     tbb::flow::priority_queue_node<T> q(g);
112 
113     {
114 
115         T bogus_value(-1);
116 
117         q.try_put(T(1));
118         q.try_put(T(2));
119         q.try_put(T(3));
120         g.wait_for_all();
121 
122         T v=bogus_value, w=bogus_value;
123         CHECK_MESSAGE( q.try_reserve(v) == true, "" );
124         CHECK_MESSAGE( v == T(3), "" );
125         CHECK_MESSAGE( q.try_release() == true, "" );
126         v = bogus_value;
127         g.wait_for_all();
128         CHECK_MESSAGE( q.try_reserve(v) == true, "" );
129         CHECK_MESSAGE( v == T(3), "" );
130         CHECK_MESSAGE( q.try_consume() == true, "" );
131         v = bogus_value;
132         g.wait_for_all();
133 
134         CHECK_MESSAGE( q.try_get(v) == true, "" );
135         CHECK_MESSAGE( v == T(2), "" );
136         v = bogus_value;
137         g.wait_for_all();
138 
139         CHECK_MESSAGE( q.try_reserve(v) == true, "" );
140         CHECK_MESSAGE( v == T(1), "" );
141         CHECK_MESSAGE( q.try_reserve(w) == false, "" );
142         CHECK_MESSAGE( w == bogus_value, "" );
143         CHECK_MESSAGE( q.try_get(w) == false, "" );
144         CHECK_MESSAGE( w == bogus_value, "" );
145         CHECK_MESSAGE( q.try_release() == true, "" );
146         v = bogus_value;
147         g.wait_for_all();
148         CHECK_MESSAGE( q.try_reserve(v) == true, "" );
149         CHECK_MESSAGE( v == T(1), "" );
150         CHECK_MESSAGE( q.try_consume() == true, "" );
151         v = bogus_value;
152         g.wait_for_all();
153         CHECK_MESSAGE( q.try_get(v) == false, "" );
154     }
155     return 0;
156 }
157 
158 //
159 // Tests
160 //
// multiple parallel senders, items received in priority order (largest first)
// multiple parallel senders, multiple parallel receivers, items in priority order (as seen by each receiver) and all items received
//   * overlapped puts / gets
//   * all puts finished before any gets
165 //
166 template< typename T >
test_parallel(int num_threads)167 int test_parallel(int num_threads) {
168     tbb::flow::graph g;
169     tbb::flow::priority_queue_node<T> q(g);
170     tbb::flow::priority_queue_node<T> q2(g);
171     tbb::flow::priority_queue_node<T> q3(g);
172     T bogus_value(-1);
173     T j = bogus_value;
174 
175     NativeParallelFor( num_threads, parallel_puts<T>(q) );
176     for (int i = num_threads*N -1; i>=0; --i) {
177         spin_try_get( q, j );
178         CHECK_MESSAGE(j == i, "");
179         j = bogus_value;
180     }
181     g.wait_for_all();
182     CHECK_MESSAGE( q.try_get( j ) == false, "" );
183     CHECK_MESSAGE( j == bogus_value, "" );
184 
185     NativeParallelFor( num_threads, parallel_puts<T>(q) );
186     g.wait_for_all();
187     NativeParallelFor( num_threads, parallel_gets<T>(q) );
188     g.wait_for_all();
189     j = bogus_value;
190     CHECK_MESSAGE( q.try_get( j ) == false, "" );
191     CHECK_MESSAGE( j == bogus_value, "" );
192 
193     NativeParallelFor( num_threads, parallel_put_get<T>(q) );
194     g.wait_for_all();
195     j = bogus_value;
196     CHECK_MESSAGE( q.try_get( j ) == false, "" );
197     CHECK_MESSAGE( j == bogus_value, "" );
198 
199     tbb::flow::make_edge( q, q2 );
200     tbb::flow::make_edge( q2, q3 );
201     NativeParallelFor( num_threads, parallel_puts<T>(q) );
202     g.wait_for_all();
203     NativeParallelFor( num_threads, parallel_gets<T>(q3) );
204     g.wait_for_all();
205     j = bogus_value;
206     CHECK_MESSAGE( q.try_get( j ) == false, "" );
207     CHECK_MESSAGE( j == bogus_value, "" );
208     CHECK_MESSAGE( q2.try_get( j ) == false, "" );
209     CHECK_MESSAGE( j == bogus_value, "" );
210     CHECK_MESSAGE( q3.try_get( j ) == false, "" );
211     CHECK_MESSAGE( j == bogus_value, "" );
212 
213     // test copy constructor
214     CHECK_MESSAGE( remove_successor(q, q2) == true, "" );
215     NativeParallelFor( num_threads, parallel_puts<T>(q) );
216     tbb::flow::priority_queue_node<T> q_copy(q);
217     g.wait_for_all();
218     j = bogus_value;
219     CHECK_MESSAGE( q_copy.try_get( j ) == false, "" );
220     CHECK_MESSAGE( register_successor(q, q_copy) == true, "" );
221     for (int i = num_threads*N -1; i>=0; --i) {
222         spin_try_get( q_copy, j );
223         CHECK_MESSAGE(j == i, "");
224         j = bogus_value;
225     }
226     g.wait_for_all();
227     CHECK_MESSAGE( q.try_get( j ) == false, "" );
228     CHECK_MESSAGE( j == bogus_value, "" );
229     CHECK_MESSAGE( q_copy.try_get( j ) == false, "" );
230     CHECK_MESSAGE( j == bogus_value, "" );
231 
232     return 0;
233 }
234 
235 //
236 // Tests
237 //
238 // Predecessors cannot be registered
239 // Empty Q rejects item requests
// Single serial sender, items in priority order (largest first)
// Chained Qs ( 2 & 3 ), single sender, items at last Q in priority order (largest first)
242 //
243 
244 template< typename T >
test_serial()245 int test_serial() {
246     tbb::flow::graph g;
247     T bogus_value(-1);
248 
249     tbb::flow::priority_queue_node<T> q(g);
250     tbb::flow::priority_queue_node<T> q2(g);
251     T j = bogus_value;
252 
253     //
254     // Rejects attempts to add / remove predecessor
255     // Rejects request from empty Q
256     //
257     CHECK_MESSAGE( register_predecessor(q, q2) == false, "" );
258     CHECK_MESSAGE( remove_predecessor(q, q2) == false, "" );
259     CHECK_MESSAGE( q.try_get( j ) == false, "" );
260     CHECK_MESSAGE( j == bogus_value, "" );
261 
262     //
263     // Simple puts and gets
264     //
265 
266     for (int i = 0; i < N; ++i)
267         CHECK_MESSAGE( q.try_put( T(i) ), "" );
268     for (int i = N-1; i >=0; --i) {
269         j = bogus_value;
270         spin_try_get( q, j );
271         CHECK_MESSAGE( i == j, "" );
272     }
273     j = bogus_value;
274     g.wait_for_all();
275     CHECK_MESSAGE( q.try_get( j ) == false, "" );
276     CHECK_MESSAGE( j == bogus_value, "" );
277 
278     tbb::flow::make_edge( q, q2 );
279 
280     for (int i = 0; i < N; ++i)
281         CHECK_MESSAGE( q.try_put( T(i) ), "" );
282     g.wait_for_all();
283     for (int i = N-1; i >= 0; --i) {
284         j = bogus_value;
285         spin_try_get( q2, j );
286         CHECK_MESSAGE( i == j, "" );
287     }
288     j = bogus_value;
289     g.wait_for_all();
290     CHECK_MESSAGE( q.try_get( j ) == false, "" );
291     g.wait_for_all();
292     CHECK_MESSAGE( q2.try_get( j ) == false, "" );
293     CHECK_MESSAGE( j == bogus_value, "" );
294 
295     tbb::flow::remove_edge( q, q2 );
296     CHECK_MESSAGE( q.try_put( 1 ) == true, "" );
297     g.wait_for_all();
298     CHECK_MESSAGE( q2.try_get( j ) == false, "" );
299     CHECK_MESSAGE( j == bogus_value, "" );
300     g.wait_for_all();
301     CHECK_MESSAGE( q.try_get( j ) == true, "" );
302     CHECK_MESSAGE( j == 1, "" );
303 
304     tbb::flow::priority_queue_node<T> q3(g);
305     tbb::flow::make_edge( q, q2 );
306     tbb::flow::make_edge( q2, q3 );
307 
308     for (int i = 0; i < N; ++i)
309         CHECK_MESSAGE(  q.try_put( T(i) ), "" );
310     g.wait_for_all();
311     for (int i = N-1; i >= 0; --i) {
312         j = bogus_value;
313         spin_try_get( q3, j );
314         CHECK_MESSAGE( i == j, "" );
315     }
316     j = bogus_value;
317     g.wait_for_all();
318     CHECK_MESSAGE( q.try_get( j ) == false, "" );
319     g.wait_for_all();
320     CHECK_MESSAGE( q2.try_get( j ) == false, "" );
321     g.wait_for_all();
322     CHECK_MESSAGE( q3.try_get( j ) == false, "" );
323     CHECK_MESSAGE( j == bogus_value, "" );
324 
325     tbb::flow::remove_edge( q,  q2 );
326     CHECK_MESSAGE( q.try_put( 1 ) == true, "" );
327     g.wait_for_all();
328     CHECK_MESSAGE( q2.try_get( j ) == false, "" );
329     CHECK_MESSAGE( j == bogus_value, "" );
330     g.wait_for_all();
331     CHECK_MESSAGE( q3.try_get( j ) == false, "" );
332     CHECK_MESSAGE( j == bogus_value, "" );
333     g.wait_for_all();
334     CHECK_MESSAGE( q.try_get( j ) == true, "" );
335     CHECK_MESSAGE( j == 1, "" );
336 
337     return 0;
338 }
339 
340 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
341 #include <array>
342 #include <vector>
test_follows_and_precedes_api()343 void test_follows_and_precedes_api() {
344     std::array<int, 3> messages_for_follows = { {0, 1, 2} };
345     std::vector<int> messages_for_precedes = {0, 1, 2};
346 
347     follows_and_precedes_testing::test_follows <int, tbb::flow::priority_queue_node<int>>(messages_for_follows);
348     follows_and_precedes_testing::test_precedes <int, tbb::flow::priority_queue_node<int>>(messages_for_precedes);
349 }
350 #endif
351 
352 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
test_deduction_guides()353 void test_deduction_guides() {
354     using namespace tbb::flow;
355 
356     graph g;
357     broadcast_node<int> br(g);
358     priority_queue_node<int> pq0(g);
359 
360 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
361     using compare_type = std::greater<void>;
362     priority_queue_node pq1(follows(br));
363     static_assert(std::is_same_v<decltype(pq1), priority_queue_node<int>>);
364 
365     priority_queue_node pq2(follows(br), compare_type());
366     static_assert(std::is_same_v<decltype(pq2), priority_queue_node<int, compare_type>>);
367 
368     priority_queue_node pq3(precedes(br));
369     static_assert(std::is_same_v<decltype(pq3), priority_queue_node<int>>);
370 
371     priority_queue_node pq4(precedes(br), compare_type());
372     static_assert(std::is_same_v<decltype(pq4), priority_queue_node<int, compare_type>>);
373 #endif
374 
375     priority_queue_node pq5(pq0);
376     static_assert(std::is_same_v<decltype(pq5), priority_queue_node<int>>);
377     g.wait_for_all();
378 }
379 #endif
380 
381 //! Test serial, parallel behavior and reservation under parallelism
382 //! \brief \ref requirement \ref error_guessing
383 TEST_CASE("Serial, parallel and reservation tests"){
384     for (int p = 2; p <= 4; ++p) {
385         tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, p);
386         tbb::task_arena arena(p);
387         arena.execute(
__anonddf9a73e0102() 388             [&]() {
389                 test_serial<int>();
390                 test_reservation<int>(p);
391                 test_reservation<CheckType<int> >(p);
392                 test_parallel<int>(p);
393             }
394         );
395 	}
396 }
397 
398 //! Test reset and cancellation
399 //! \brief \ref error_guessing
400 TEST_CASE("Reset tests"){
401     INFO("Testing resets\n");
402     test_resets<int,tbb::flow::priority_queue_node<int> >();
403     test_resets<float,tbb::flow::priority_queue_node<float> >();
404 }
405 
406 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
407 //! Test follows and precedes API
408 //! \brief \ref error_guessing
409 TEST_CASE("Test follows and precedes API"){
410     test_follows_and_precedes_api();
411 }
412 #endif
413 
414 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Test deduction guides
416 //! \brief \ref requirement
417 TEST_CASE("Test deduction guides"){
418     test_deduction_guides();
419 }
420 #endif
421 
422