1 /*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION __TBB_CPF_BUILD
18 #define TBB_DEPRECATED_FLOW_NODE_ALLOCATOR __TBB_CPF_BUILD
19
20 #include "harness.h"
21 #include "harness_graph.h"
22
23 #include "tbb/task_scheduler_init.h"
24 #include "tbb/tick_count.h"
25 #include "test_follows_and_precedes_api.h"
26
27 #define N 1000
28 #define C 10
29
30 template< typename T >
spin_try_get(tbb::flow::buffer_node<T> & b,T & value)31 void spin_try_get( tbb::flow::buffer_node<T> &b, T &value ) {
32 while ( b.try_get(value) != true ) {}
33 }
34
35 template< typename T >
check_item(T * count_value,T & value)36 void check_item( T* count_value, T &value ) {
37 count_value[value / N] += value % N;
38 }
39
40 template< typename T >
41 struct parallel_puts : NoAssign {
42
43 tbb::flow::buffer_node<T> &my_b;
44
parallel_putsparallel_puts45 parallel_puts( tbb::flow::buffer_node<T> &b ) : my_b(b) {}
46
operator ()parallel_puts47 void operator()(int i) const {
48 for (int j = 0; j < N; ++j) {
49 bool msg = my_b.try_put( T(N*i + j) );
50 ASSERT( msg == true, NULL );
51 }
52 }
53 };
54
55 template< typename T >
56 struct touches {
57
58 bool **my_touches;
59 int my_num_threads;
60
touchestouches61 touches( int num_threads ) : my_num_threads(num_threads) {
62 my_touches = new bool* [my_num_threads];
63 for ( int p = 0; p < my_num_threads; ++p) {
64 my_touches[p] = new bool[N];
65 for ( int n = 0; n < N; ++n)
66 my_touches[p][n] = false;
67 }
68 }
69
~touchestouches70 ~touches() {
71 for ( int p = 0; p < my_num_threads; ++p) {
72 delete [] my_touches[p];
73 }
74 delete [] my_touches;
75 }
76
checktouches77 bool check( T v ) {
78 ASSERT ( my_touches[v/N][v%N] == false, NULL);
79 my_touches[v/N][v%N] = true;
80 return true;
81 }
82
validate_touchestouches83 bool validate_touches() {
84 for ( int p = 0; p < my_num_threads; ++p) {
85 for ( int n = 0; n < N; ++n) {
86 ASSERT ( my_touches[p][n] == true, NULL);
87 }
88 }
89 return true;
90 }
91 };
92
93 template< typename T >
94 struct parallel_gets : NoAssign {
95
96 tbb::flow::buffer_node<T> &my_b;
97 touches<T> &my_touches;
98
parallel_getsparallel_gets99 parallel_gets( tbb::flow::buffer_node<T> &b, touches<T> &t) : my_b(b), my_touches(t) {}
100
operator ()parallel_gets101 void operator()(int) const {
102 for (int j = 0; j < N; ++j) {
103 T v;
104 spin_try_get( my_b, v );
105 my_touches.check( v );
106 }
107 }
108
109 };
110
111 template< typename T >
112 struct parallel_put_get : NoAssign {
113
114 tbb::flow::buffer_node<T> &my_b;
115 touches<T> &my_touches;
116
parallel_put_getparallel_put_get117 parallel_put_get( tbb::flow::buffer_node<T> &b, touches<T> &t ) : my_b(b), my_touches(t) {}
118
operator ()parallel_put_get119 void operator()(int tid) const {
120
121 for ( int i = 0; i < N; i+=C ) {
122 int j_end = ( N < i + C ) ? N : i + C;
123 // dump about C values into the buffer
124 for ( int j = i; j < j_end; ++j ) {
125 ASSERT( my_b.try_put( T (N*tid + j ) ) == true, NULL );
126 }
127 // receiver about C values from the buffer
128 for ( int j = i; j < j_end; ++j ) {
129 T v;
130 spin_try_get( my_b, v );
131 my_touches.check( v );
132 }
133 }
134 }
135
136 };
137
138 //
139 // Tests
140 //
141 // Item can be reserved, released, consumed ( single serial receiver )
142 //
143 template< typename T >
test_reservation()144 int test_reservation() {
145 tbb::flow::graph g;
146 T bogus_value(-1);
147
148 // Simple tests
149 tbb::flow::buffer_node<T> b(g);
150
151 b.try_put(T(1));
152 b.try_put(T(2));
153 b.try_put(T(3));
154
155 T v, vsum;
156 ASSERT( b.try_reserve(v) == true, NULL );
157 ASSERT( b.try_release() == true, NULL );
158 v = bogus_value;
159 g.wait_for_all();
160 ASSERT( b.try_reserve(v) == true, NULL );
161 ASSERT( b.try_consume() == true, NULL );
162 vsum += v;
163 v = bogus_value;
164 g.wait_for_all();
165
166 ASSERT( b.try_get(v) == true, NULL );
167 vsum += v;
168 v = bogus_value;
169 g.wait_for_all();
170
171 ASSERT( b.try_reserve(v) == true, NULL );
172 ASSERT( b.try_release() == true, NULL );
173 v = bogus_value;
174 g.wait_for_all();
175 ASSERT( b.try_reserve(v) == true, NULL );
176 ASSERT( b.try_consume() == true, NULL );
177 vsum += v;
178 ASSERT( vsum == T(6), NULL);
179 v = bogus_value;
180 g.wait_for_all();
181
182 return 0;
183 }
184
185 //
186 // Tests
187 //
188 // multiple parallel senders, items in arbitrary order
189 // multiple parallel senders, multiple parallel receivers, items in arbitrary order and all items received
190 // * overlapped puts / gets
191 // * all puts finished before any getS
192 //
193 template< typename T >
test_parallel(int num_threads)194 int test_parallel(int num_threads) {
195 tbb::flow::graph g;
196 tbb::flow::buffer_node<T> b(g);
197 tbb::flow::buffer_node<T> b2(g);
198 tbb::flow::buffer_node<T> b3(g);
199 T bogus_value(-1);
200 T j = bogus_value;
201
202 NativeParallelFor( num_threads, parallel_puts<T>(b) );
203
204 T *next_value = new T[num_threads];
205 for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0);
206
207 for (int i = 0; i < num_threads * N; ++i ) {
208 spin_try_get( b, j );
209 check_item( next_value, j );
210 j = bogus_value;
211 }
212 for (int tid = 0; tid < num_threads; ++tid) {
213 ASSERT( next_value[tid] == T((N*(N-1))/2), NULL );
214 }
215
216 j = bogus_value;
217 g.wait_for_all();
218 ASSERT( b.try_get( j ) == false, NULL );
219 ASSERT( j == bogus_value, NULL );
220
221 NativeParallelFor( num_threads, parallel_puts<T>(b) );
222
223 {
224 touches< T > t( num_threads );
225 NativeParallelFor( num_threads, parallel_gets<T>(b, t) );
226 g.wait_for_all();
227 ASSERT( t.validate_touches(), NULL );
228 }
229 j = bogus_value;
230 ASSERT( b.try_get( j ) == false, NULL );
231 ASSERT( j == bogus_value, NULL );
232
233 g.wait_for_all();
234 {
235 touches< T > t( num_threads );
236 NativeParallelFor( num_threads, parallel_put_get<T>(b, t) );
237 g.wait_for_all();
238 ASSERT( t.validate_touches(), NULL );
239 }
240 j = bogus_value;
241 ASSERT( b.try_get( j ) == false, NULL );
242 ASSERT( j == bogus_value, NULL );
243
244 tbb::flow::make_edge( b, b2 );
245 tbb::flow::make_edge( b2, b3 );
246
247 NativeParallelFor( num_threads, parallel_puts<T>(b) );
248 {
249 touches< T > t( num_threads );
250 NativeParallelFor( num_threads, parallel_gets<T>(b3, t) );
251 g.wait_for_all();
252 ASSERT( t.validate_touches(), NULL );
253 }
254 j = bogus_value;
255 g.wait_for_all();
256 ASSERT( b.try_get( j ) == false, NULL );
257 g.wait_for_all();
258 ASSERT( b2.try_get( j ) == false, NULL );
259 g.wait_for_all();
260 ASSERT( b3.try_get( j ) == false, NULL );
261 ASSERT( j == bogus_value, NULL );
262
263 // test copy constructor
264 ASSERT( b.remove_successor( b2 ), NULL );
265 // fill up b:
266 NativeParallelFor( num_threads, parallel_puts<T>(b) );
267 // copy b:
268 tbb::flow::buffer_node<T> b_copy(b);
269
270 // b_copy should be empty
271 j = bogus_value;
272 g.wait_for_all();
273 ASSERT( b_copy.try_get( j ) == false, NULL );
274
275 // hook them together:
276 ASSERT( b.register_successor(b_copy) == true, NULL );
277 // try to get content from b_copy
278 {
279 touches< T > t( num_threads );
280 NativeParallelFor( num_threads, parallel_gets<T>(b_copy, t) );
281 g.wait_for_all();
282 ASSERT( t.validate_touches(), NULL );
283 }
284 // now both should be empty
285 j = bogus_value;
286 g.wait_for_all();
287 ASSERT( b.try_get( j ) == false, NULL );
288 g.wait_for_all();
289 ASSERT( b_copy.try_get( j ) == false, NULL );
290 ASSERT( j == bogus_value, NULL );
291
292 delete [] next_value;
293 return 0;
294 }
295
296 //
297 // Tests
298 //
299 // Predecessors cannot be registered
300 // Empty buffer rejects item requests
301 // Single serial sender, items in arbitrary order
302 // Chained buffers ( 2 & 3 ), single sender, items at last buffer in arbitrary order
303 //
304
305 template< typename T >
test_serial()306 int test_serial() {
307 tbb::flow::graph g;
308 T bogus_value(-1);
309
310 tbb::flow::buffer_node<T> b(g);
311 tbb::flow::buffer_node<T> b2(g);
312 T j = bogus_value;
313
314 //
315 // Rejects attempts to add / remove predecessor
316 // Rejects request from empty buffer
317 //
318 ASSERT( b.register_predecessor( b2 ) == false, NULL );
319 ASSERT( b.remove_predecessor( b2 ) == false, NULL );
320 ASSERT( b.try_get( j ) == false, NULL );
321 ASSERT( j == bogus_value, NULL );
322
323 //
324 // Simple puts and gets
325 //
326
327 for (int i = 0; i < N; ++i) {
328 bool msg = b.try_put( T(i) );
329 ASSERT( msg == true, NULL );
330 }
331
332 T vsum = T(0);
333 for (int i = 0; i < N; ++i) {
334 j = bogus_value;
335 spin_try_get( b, j );
336 vsum += j;
337 }
338 ASSERT( vsum == (N*(N-1))/2, NULL);
339 j = bogus_value;
340 g.wait_for_all();
341 ASSERT( b.try_get( j ) == false, NULL );
342 ASSERT( j == bogus_value, NULL );
343
344 tbb::flow::make_edge(b, b2);
345 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
346 ASSERT( b.successor_count() == 1, NULL);
347 ASSERT( b.predecessor_count() == 0, NULL);
348 ASSERT( b2.successor_count() == 0, NULL);
349 ASSERT( b2.predecessor_count() == 1, NULL);
350 typename tbb::flow::buffer_node<T>::successor_list_type my_succs;
351 b.copy_successors(my_succs);
352 ASSERT(my_succs.size() == 1, NULL);
353 typename tbb::flow::buffer_node<T>::predecessor_list_type my_preds;
354 b.copy_predecessors(my_preds);
355 ASSERT(my_preds.size() == 0, NULL);
356 #endif
357
358 vsum = T(0);
359 for (int i = 0; i < N; ++i) {
360 bool msg = b.try_put( T(i) );
361 ASSERT( msg == true, NULL );
362 }
363
364 for (int i = 0; i < N; ++i) {
365 j = bogus_value;
366 spin_try_get( b2, j );
367 vsum += j;
368 }
369 ASSERT( vsum == (N*(N-1))/2, NULL);
370 j = bogus_value;
371 g.wait_for_all();
372 ASSERT( b.try_get( j ) == false, NULL );
373 g.wait_for_all();
374 ASSERT( b2.try_get( j ) == false, NULL );
375 ASSERT( j == bogus_value, NULL );
376
377 tbb::flow::remove_edge(b, b2);
378 ASSERT( b.try_put( 1 ) == true, NULL );
379 g.wait_for_all();
380 ASSERT( b2.try_get( j ) == false, NULL );
381 ASSERT( j == bogus_value, NULL );
382 g.wait_for_all();
383 ASSERT( b.try_get( j ) == true, NULL );
384 ASSERT( j == 1, NULL );
385
386 tbb::flow::buffer_node<T> b3(g);
387 tbb::flow::make_edge( b, b2 );
388 tbb::flow::make_edge( b2, b3 );
389
390 vsum = T(0);
391 for (int i = 0; i < N; ++i) {
392 bool msg = b.try_put( T(i) );
393 ASSERT( msg == true, NULL );
394 }
395
396 for (int i = 0; i < N; ++i) {
397 j = bogus_value;
398 spin_try_get( b3, j );
399 vsum += j;
400 }
401 ASSERT( vsum == (N*(N-1))/2, NULL);
402 j = bogus_value;
403 g.wait_for_all();
404 ASSERT( b.try_get( j ) == false, NULL );
405 g.wait_for_all();
406 ASSERT( b2.try_get( j ) == false, NULL );
407 g.wait_for_all();
408 ASSERT( b3.try_get( j ) == false, NULL );
409 ASSERT( j == bogus_value, NULL );
410
411 tbb::flow::remove_edge(b, b2);
412 ASSERT( b.try_put( 1 ) == true, NULL );
413 g.wait_for_all();
414 ASSERT( b2.try_get( j ) == false, NULL );
415 ASSERT( j == bogus_value, NULL );
416 g.wait_for_all();
417 ASSERT( b3.try_get( j ) == false, NULL );
418 ASSERT( j == bogus_value, NULL );
419 g.wait_for_all();
420 ASSERT( b.try_get( j ) == true, NULL );
421 ASSERT( j == 1, NULL );
422
423 return 0;
424 }
425
426 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
427 #include <array>
428 #include <vector>
test_follow_and_precedes_api()429 void test_follow_and_precedes_api() {
430 using msg_t = tbb::flow::continue_msg;
431
432 std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} };
433 std::vector<msg_t> messages_for_precedes = {msg_t(), msg_t(), msg_t()};
434
435 follows_and_precedes_testing::test_follows<msg_t, tbb::flow::buffer_node<msg_t>>(messages_for_follows);
436 follows_and_precedes_testing::test_precedes<msg_t, tbb::flow::buffer_node<msg_t>>(messages_for_precedes);
437 }
438 #endif
439
440 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
test_deduction_guides()441 void test_deduction_guides() {
442 using namespace tbb::flow;
443 graph g;
444 broadcast_node<int> br(g);
445 buffer_node<int> b0(g);
446
447 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
448 buffer_node b1(follows(br));
449 static_assert(std::is_same_v<decltype(b1), buffer_node<int>>);
450
451 buffer_node b2(precedes(br));
452 static_assert(std::is_same_v<decltype(b2), buffer_node<int>>);
453 #endif
454
455 buffer_node b3(b0);
456 static_assert(std::is_same_v<decltype(b3), buffer_node<int>>);
457 g.wait_for_all();
458 }
459 #endif
460
461 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
test_node_allocator()462 void test_node_allocator() {
463 tbb::flow::graph g;
464 tbb::flow::buffer_node< int, std::allocator<int> > tmp(g);
465 }
466 #endif
467
TestMain()468 int TestMain() {
469 tbb::tick_count start = tbb::tick_count::now(), stop;
470 for (int p = 2; p <= 4; ++p) {
471 tbb::task_scheduler_init init(p);
472 test_serial<int>();
473 test_parallel<int>(p);
474 }
475 stop = tbb::tick_count::now();
476 REMARK("Buffer_Node Time=%6.6f\n", (stop-start).seconds());
477 test_resets<int,tbb::flow::buffer_node<int> >();
478 test_resets<float,tbb::flow::buffer_node<float> >();
479 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
480 test_follow_and_precedes_api();
481 #endif
482 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
483 test_deduction_guides();
484 #endif
485 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
486 test_buffer_extract<tbb::flow::buffer_node<int> >().run_tests();
487 #endif
488 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
489 test_node_allocator();
490 #endif
491 return Harness::Done;
492 }
493