1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION __TBB_CPF_BUILD
18 #define TBB_DEPRECATED_FLOW_NODE_ALLOCATOR __TBB_CPF_BUILD
19 
20 #include "harness.h"
21 #include "harness_graph.h"
22 
23 #include "tbb/task_scheduler_init.h"
24 #include "tbb/tick_count.h"
25 #include "test_follows_and_precedes_api.h"
26 
27 #define N 1000
28 #define C 10
29 
30 template< typename T >
spin_try_get(tbb::flow::buffer_node<T> & b,T & value)31 void spin_try_get( tbb::flow::buffer_node<T> &b, T &value ) {
32     while ( b.try_get(value) != true ) {}
33 }
34 
35 template< typename T >
check_item(T * count_value,T & value)36 void check_item( T* count_value, T &value ) {
37     count_value[value / N] += value % N;
38 }
39 
40 template< typename T >
41 struct parallel_puts : NoAssign {
42 
43     tbb::flow::buffer_node<T> &my_b;
44 
parallel_putsparallel_puts45     parallel_puts( tbb::flow::buffer_node<T> &b ) : my_b(b) {}
46 
operator ()parallel_puts47     void operator()(int i) const {
48         for (int j = 0; j < N; ++j) {
49             bool msg = my_b.try_put( T(N*i + j) );
50             ASSERT( msg == true, NULL );
51         }
52     }
53 };
54 
55 template< typename T >
56 struct touches {
57 
58     bool **my_touches;
59     int my_num_threads;
60 
touchestouches61     touches( int num_threads ) : my_num_threads(num_threads) {
62         my_touches = new bool* [my_num_threads];
63         for ( int p = 0; p < my_num_threads; ++p) {
64             my_touches[p] = new bool[N];
65             for ( int n = 0; n < N; ++n)
66                 my_touches[p][n] = false;
67         }
68     }
69 
~touchestouches70     ~touches() {
71         for ( int p = 0; p < my_num_threads; ++p) {
72             delete [] my_touches[p];
73         }
74         delete [] my_touches;
75     }
76 
checktouches77     bool check( T v ) {
78         ASSERT ( my_touches[v/N][v%N] == false, NULL);
79         my_touches[v/N][v%N] = true;
80         return true;
81     }
82 
validate_touchestouches83     bool validate_touches() {
84         for ( int p = 0; p < my_num_threads; ++p) {
85             for ( int n = 0; n < N; ++n) {
86                 ASSERT ( my_touches[p][n] == true, NULL);
87             }
88         }
89         return true;
90     }
91 };
92 
93 template< typename T >
94 struct parallel_gets : NoAssign {
95 
96     tbb::flow::buffer_node<T> &my_b;
97     touches<T> &my_touches;
98 
parallel_getsparallel_gets99     parallel_gets( tbb::flow::buffer_node<T> &b, touches<T> &t) : my_b(b), my_touches(t) {}
100 
operator ()parallel_gets101     void operator()(int) const {
102         for (int j = 0; j < N; ++j) {
103             T v;
104             spin_try_get( my_b, v );
105             my_touches.check( v );
106         }
107     }
108 
109 };
110 
111 template< typename T >
112 struct parallel_put_get : NoAssign {
113 
114     tbb::flow::buffer_node<T> &my_b;
115     touches<T> &my_touches;
116 
parallel_put_getparallel_put_get117     parallel_put_get( tbb::flow::buffer_node<T> &b, touches<T> &t ) : my_b(b), my_touches(t) {}
118 
operator ()parallel_put_get119     void operator()(int tid) const {
120 
121         for ( int i = 0; i < N; i+=C ) {
122             int j_end = ( N < i + C ) ? N : i + C;
123             // dump about C values into the buffer
124             for ( int j = i; j < j_end; ++j ) {
125                 ASSERT( my_b.try_put( T (N*tid + j ) ) == true, NULL );
126             }
127             // receiver about C values from the buffer
128             for ( int j = i; j < j_end; ++j ) {
129                 T v;
130                 spin_try_get( my_b, v );
131                 my_touches.check( v );
132             }
133         }
134     }
135 
136 };
137 
138 //
139 // Tests
140 //
141 // Item can be reserved, released, consumed ( single serial receiver )
142 //
143 template< typename T >
test_reservation()144 int test_reservation() {
145     tbb::flow::graph g;
146     T bogus_value(-1);
147 
148     // Simple tests
149     tbb::flow::buffer_node<T> b(g);
150 
151     b.try_put(T(1));
152     b.try_put(T(2));
153     b.try_put(T(3));
154 
155     T v, vsum;
156     ASSERT( b.try_reserve(v) == true, NULL );
157     ASSERT( b.try_release() == true, NULL );
158     v = bogus_value;
159     g.wait_for_all();
160     ASSERT( b.try_reserve(v) == true, NULL );
161     ASSERT( b.try_consume() == true, NULL );
162     vsum += v;
163     v = bogus_value;
164     g.wait_for_all();
165 
166     ASSERT( b.try_get(v) == true, NULL );
167     vsum += v;
168     v = bogus_value;
169     g.wait_for_all();
170 
171     ASSERT( b.try_reserve(v) == true, NULL );
172     ASSERT( b.try_release() == true, NULL );
173     v = bogus_value;
174     g.wait_for_all();
175     ASSERT( b.try_reserve(v) == true, NULL );
176     ASSERT( b.try_consume() == true, NULL );
177     vsum += v;
178     ASSERT( vsum == T(6), NULL);
179     v = bogus_value;
180     g.wait_for_all();
181 
182     return 0;
183 }
184 
185 //
186 // Tests
187 //
188 // multiple parallel senders, items in arbitrary order
189 // multiple parallel senders, multiple parallel receivers, items in arbitrary order and all items received
190 //   * overlapped puts / gets
//   * all puts finished before any gets
192 //
193 template< typename T >
test_parallel(int num_threads)194 int test_parallel(int num_threads) {
195     tbb::flow::graph g;
196     tbb::flow::buffer_node<T> b(g);
197     tbb::flow::buffer_node<T> b2(g);
198     tbb::flow::buffer_node<T> b3(g);
199     T bogus_value(-1);
200     T j = bogus_value;
201 
202     NativeParallelFor( num_threads, parallel_puts<T>(b) );
203 
204     T *next_value = new T[num_threads];
205     for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0);
206 
207     for (int i = 0; i < num_threads * N; ++i ) {
208         spin_try_get( b, j );
209         check_item( next_value, j );
210         j = bogus_value;
211     }
212     for (int tid = 0; tid < num_threads; ++tid)  {
213         ASSERT( next_value[tid] == T((N*(N-1))/2), NULL );
214     }
215 
216     j = bogus_value;
217     g.wait_for_all();
218     ASSERT( b.try_get( j ) == false, NULL );
219     ASSERT( j == bogus_value, NULL );
220 
221     NativeParallelFor( num_threads, parallel_puts<T>(b) );
222 
223     {
224         touches< T > t( num_threads );
225         NativeParallelFor( num_threads, parallel_gets<T>(b, t) );
226         g.wait_for_all();
227         ASSERT( t.validate_touches(), NULL );
228     }
229     j = bogus_value;
230     ASSERT( b.try_get( j ) == false, NULL );
231     ASSERT( j == bogus_value, NULL );
232 
233     g.wait_for_all();
234     {
235         touches< T > t( num_threads );
236         NativeParallelFor( num_threads, parallel_put_get<T>(b, t) );
237         g.wait_for_all();
238         ASSERT( t.validate_touches(), NULL );
239     }
240     j = bogus_value;
241     ASSERT( b.try_get( j ) == false, NULL );
242     ASSERT( j == bogus_value, NULL );
243 
244     tbb::flow::make_edge( b, b2 );
245     tbb::flow::make_edge( b2, b3 );
246 
247     NativeParallelFor( num_threads, parallel_puts<T>(b) );
248     {
249         touches< T > t( num_threads );
250         NativeParallelFor( num_threads, parallel_gets<T>(b3, t) );
251         g.wait_for_all();
252         ASSERT( t.validate_touches(), NULL );
253     }
254     j = bogus_value;
255     g.wait_for_all();
256     ASSERT( b.try_get( j ) == false, NULL );
257     g.wait_for_all();
258     ASSERT( b2.try_get( j ) == false, NULL );
259     g.wait_for_all();
260     ASSERT( b3.try_get( j ) == false, NULL );
261     ASSERT( j == bogus_value, NULL );
262 
263     // test copy constructor
264     ASSERT( b.remove_successor( b2 ), NULL );
265     // fill up b:
266     NativeParallelFor( num_threads, parallel_puts<T>(b) );
267     // copy b:
268     tbb::flow::buffer_node<T> b_copy(b);
269 
270     // b_copy should be empty
271     j = bogus_value;
272     g.wait_for_all();
273     ASSERT( b_copy.try_get( j ) == false, NULL );
274 
275     // hook them together:
276     ASSERT( b.register_successor(b_copy) == true, NULL );
277     // try to get content from b_copy
278     {
279         touches< T > t( num_threads );
280         NativeParallelFor( num_threads, parallel_gets<T>(b_copy, t) );
281         g.wait_for_all();
282         ASSERT( t.validate_touches(), NULL );
283     }
284     // now both should be empty
285     j = bogus_value;
286     g.wait_for_all();
287     ASSERT( b.try_get( j ) == false, NULL );
288     g.wait_for_all();
289     ASSERT( b_copy.try_get( j ) == false, NULL );
290     ASSERT( j == bogus_value, NULL );
291 
292     delete [] next_value;
293     return 0;
294 }
295 
296 //
297 // Tests
298 //
299 // Predecessors cannot be registered
300 // Empty buffer rejects item requests
301 // Single serial sender, items in arbitrary order
302 // Chained buffers ( 2 & 3 ), single sender, items at last buffer in arbitrary order
303 //
304 
305 template< typename T >
test_serial()306 int test_serial() {
307     tbb::flow::graph g;
308     T bogus_value(-1);
309 
310     tbb::flow::buffer_node<T> b(g);
311     tbb::flow::buffer_node<T> b2(g);
312     T j = bogus_value;
313 
314     //
315     // Rejects attempts to add / remove predecessor
316     // Rejects request from empty buffer
317     //
318     ASSERT( b.register_predecessor( b2 ) == false, NULL );
319     ASSERT( b.remove_predecessor( b2 ) == false, NULL );
320     ASSERT( b.try_get( j ) == false, NULL );
321     ASSERT( j == bogus_value, NULL );
322 
323     //
324     // Simple puts and gets
325     //
326 
327     for (int i = 0; i < N; ++i) {
328         bool msg = b.try_put( T(i) );
329         ASSERT( msg == true, NULL );
330     }
331 
332     T vsum = T(0);
333     for (int i = 0; i < N; ++i) {
334         j = bogus_value;
335         spin_try_get( b, j );
336         vsum += j;
337     }
338     ASSERT( vsum == (N*(N-1))/2, NULL);
339     j = bogus_value;
340     g.wait_for_all();
341     ASSERT( b.try_get( j ) == false, NULL );
342     ASSERT( j == bogus_value, NULL );
343 
344     tbb::flow::make_edge(b, b2);
345 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
346     ASSERT( b.successor_count() == 1, NULL);
347     ASSERT( b.predecessor_count() == 0, NULL);
348     ASSERT( b2.successor_count() == 0, NULL);
349     ASSERT( b2.predecessor_count() == 1, NULL);
350     typename tbb::flow::buffer_node<T>::successor_list_type my_succs;
351     b.copy_successors(my_succs);
352     ASSERT(my_succs.size() == 1, NULL);
353     typename tbb::flow::buffer_node<T>::predecessor_list_type my_preds;
354     b.copy_predecessors(my_preds);
355     ASSERT(my_preds.size() == 0, NULL);
356 #endif
357 
358     vsum = T(0);
359     for (int i = 0; i < N; ++i) {
360         bool msg = b.try_put( T(i) );
361         ASSERT( msg == true, NULL );
362     }
363 
364     for (int i = 0; i < N; ++i) {
365         j = bogus_value;
366         spin_try_get( b2, j );
367         vsum += j;
368     }
369     ASSERT( vsum == (N*(N-1))/2, NULL);
370     j = bogus_value;
371     g.wait_for_all();
372     ASSERT( b.try_get( j ) == false, NULL );
373     g.wait_for_all();
374     ASSERT( b2.try_get( j ) == false, NULL );
375     ASSERT( j == bogus_value, NULL );
376 
377     tbb::flow::remove_edge(b, b2);
378     ASSERT( b.try_put( 1 ) == true, NULL );
379     g.wait_for_all();
380     ASSERT( b2.try_get( j ) == false, NULL );
381     ASSERT( j == bogus_value, NULL );
382     g.wait_for_all();
383     ASSERT( b.try_get( j ) == true, NULL );
384     ASSERT( j == 1, NULL );
385 
386     tbb::flow::buffer_node<T> b3(g);
387     tbb::flow::make_edge( b, b2 );
388     tbb::flow::make_edge( b2, b3 );
389 
390     vsum = T(0);
391     for (int i = 0; i < N; ++i) {
392         bool msg = b.try_put( T(i) );
393         ASSERT( msg == true, NULL );
394     }
395 
396     for (int i = 0; i < N; ++i) {
397         j = bogus_value;
398         spin_try_get( b3, j );
399         vsum += j;
400     }
401     ASSERT( vsum == (N*(N-1))/2, NULL);
402     j = bogus_value;
403     g.wait_for_all();
404     ASSERT( b.try_get( j ) == false, NULL );
405     g.wait_for_all();
406     ASSERT( b2.try_get( j ) == false, NULL );
407     g.wait_for_all();
408     ASSERT( b3.try_get( j ) == false, NULL );
409     ASSERT( j == bogus_value, NULL );
410 
411     tbb::flow::remove_edge(b, b2);
412     ASSERT( b.try_put( 1 ) == true, NULL );
413     g.wait_for_all();
414     ASSERT( b2.try_get( j ) == false, NULL );
415     ASSERT( j == bogus_value, NULL );
416     g.wait_for_all();
417     ASSERT( b3.try_get( j ) == false, NULL );
418     ASSERT( j == bogus_value, NULL );
419     g.wait_for_all();
420     ASSERT( b.try_get( j ) == true, NULL );
421     ASSERT( j == 1, NULL );
422 
423     return 0;
424 }
425 
426 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
427 #include <array>
428 #include <vector>
test_follow_and_precedes_api()429 void test_follow_and_precedes_api() {
430     using msg_t = tbb::flow::continue_msg;
431 
432     std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} };
433     std::vector<msg_t> messages_for_precedes = {msg_t(), msg_t(), msg_t()};
434 
435     follows_and_precedes_testing::test_follows<msg_t, tbb::flow::buffer_node<msg_t>>(messages_for_follows);
436     follows_and_precedes_testing::test_precedes<msg_t, tbb::flow::buffer_node<msg_t>>(messages_for_precedes);
437 }
438 #endif
439 
440 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
test_deduction_guides()441 void test_deduction_guides() {
442     using namespace tbb::flow;
443     graph g;
444     broadcast_node<int> br(g);
445     buffer_node<int> b0(g);
446 
447 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
448     buffer_node b1(follows(br));
449     static_assert(std::is_same_v<decltype(b1), buffer_node<int>>);
450 
451     buffer_node b2(precedes(br));
452     static_assert(std::is_same_v<decltype(b2), buffer_node<int>>);
453 #endif
454 
455     buffer_node b3(b0);
456     static_assert(std::is_same_v<decltype(b3), buffer_node<int>>);
457     g.wait_for_all();
458 }
459 #endif
460 
461 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
test_node_allocator()462 void test_node_allocator() {
463     tbb::flow::graph g;
464     tbb::flow::buffer_node< int, std::allocator<int> > tmp(g);
465 }
466 #endif
467 
TestMain()468 int TestMain() {
469     tbb::tick_count start = tbb::tick_count::now(), stop;
470     for (int p = 2; p <= 4; ++p) {
471         tbb::task_scheduler_init init(p);
472         test_serial<int>();
473         test_parallel<int>(p);
474     }
475     stop = tbb::tick_count::now();
476     REMARK("Buffer_Node Time=%6.6f\n", (stop-start).seconds());
477     test_resets<int,tbb::flow::buffer_node<int> >();
478     test_resets<float,tbb::flow::buffer_node<float> >();
479 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
480     test_follow_and_precedes_api();
481 #endif
482 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
483     test_deduction_guides();
484 #endif
485 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
486     test_buffer_extract<tbb::flow::buffer_node<int> >().run_tests();
487 #endif
488 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
489     test_node_allocator();
490 #endif
491     return Harness::Done;
492 }
493