1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION __TBB_CPF_BUILD
18 #define TBB_DEPRECATED_FLOW_NODE_ALLOCATOR __TBB_CPF_BUILD
19 
20 #include "harness.h"
21 #include "harness_graph.h"
22 
23 #include "tbb/flow_graph.h"
24 #include "tbb/task_scheduler_init.h"
25 #include "tbb/tick_count.h"
26 #include "tbb/atomic.h"
27 #include "test_follows_and_precedes_api.h"
28 
29 #include <cstdio>
30 
31 #define N 1000
32 #define C 10
33 
template< typename T >
struct seq_inspector {
    //! Sequencer body: a message's sequence number is the message value itself.
    size_t operator()(const T &v) const {
        const size_t sequence_number = static_cast<size_t>(v);
        return sequence_number;
    }
};
38 
39 template< typename T >
wait_try_get(tbb::flow::graph & g,tbb::flow::sequencer_node<T> & q,T & value)40 bool wait_try_get( tbb::flow::graph &g, tbb::flow::sequencer_node<T> &q, T &value ) {
41     g.wait_for_all();
42     return q.try_get(value);
43 }
44 
45 template< typename T >
spin_try_get(tbb::flow::queue_node<T> & q,T & value)46 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) {
47     while ( q.try_get(value) != true ) ;
48 }
49 
50 template< typename T >
51 struct parallel_puts : NoAssign {
52 
53     tbb::flow::sequencer_node<T> &my_q;
54     int my_num_threads;
55 
parallel_putsparallel_puts56     parallel_puts( tbb::flow::sequencer_node<T> &q, int num_threads ) : my_q(q), my_num_threads(num_threads) {}
57 
operator ()parallel_puts58     void operator()(int tid) const {
59         for (int j = tid; j < N; j+=my_num_threads) {
60             bool msg = my_q.try_put( T(j) );
61             ASSERT( msg == true, NULL );
62         }
63     }
64 
65 };
66 
67 template< typename T >
68 struct touches {
69 
70     bool **my_touches;
71     T *my_last_touch;
72     int my_num_threads;
73 
touchestouches74     touches( int num_threads ) : my_num_threads(num_threads) {
75         my_last_touch = new T[my_num_threads];
76         my_touches = new bool* [my_num_threads];
77         for ( int p = 0; p < my_num_threads; ++p) {
78             my_last_touch[p] = T(-1);
79             my_touches[p] = new bool[N];
80             for ( int n = 0; n < N; ++n)
81                 my_touches[p][n] = false;
82         }
83     }
84 
~touchestouches85     ~touches() {
86         for ( int p = 0; p < my_num_threads; ++p) {
87             delete [] my_touches[p];
88         }
89         delete [] my_touches;
90         delete [] my_last_touch;
91     }
92 
checktouches93     bool check( int tid, T v ) {
94         if ( my_touches[tid][v] != false ) {
95             printf("Error: value seen twice by local thread\n");
96             return false;
97         }
98         if ( v <= my_last_touch[tid] ) {
99             printf("Error: value seen in wrong order by local thread\n");
100             return false;
101         }
102         my_last_touch[tid] = v;
103         my_touches[tid][v] = true;
104         return true;
105     }
106 
validate_touchestouches107     bool validate_touches() {
108         bool *all_touches = new bool[N];
109         for ( int n = 0; n < N; ++n)
110             all_touches[n] = false;
111 
112         for ( int p = 0; p < my_num_threads; ++p) {
113             for ( int n = 0; n < N; ++n) {
114                 if ( my_touches[p][n] == true ) {
115                     ASSERT( all_touches[n] == false, "value see by more than one thread\n" );
116                     all_touches[n] = true;
117                 }
118             }
119         }
120         for ( int n = 0; n < N; ++n) {
121             if ( !all_touches[n] )
122                 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads);
123             //ASSERT( all_touches[n] == true, "value not seen by any thread\n" );
124         }
125         delete [] all_touches;
126         return true;
127     }
128 
129 };
130 
131 template< typename T >
132 struct parallel_gets : NoAssign {
133 
134     tbb::flow::sequencer_node<T> &my_q;
135     int my_num_threads;
136     touches<T> &my_touches;
137 
parallel_getsparallel_gets138     parallel_gets( tbb::flow::sequencer_node<T> &q, int num_threads, touches<T> &t ) : my_q(q), my_num_threads(num_threads), my_touches(t) {}
139 
operator ()parallel_gets140     void operator()(int tid) const {
141         for (int j = tid; j < N; j+=my_num_threads) {
142             T v;
143             spin_try_get( my_q, v );
144             my_touches.check( tid, v );
145         }
146     }
147 
148 };
149 
150 template< typename T >
151 struct parallel_put_get : NoAssign {
152 
153     tbb::flow::sequencer_node<T> &my_s1;
154     tbb::flow::sequencer_node<T> &my_s2;
155     int my_num_threads;
156     tbb::atomic< int > &my_counter;
157     touches<T> &my_touches;
158 
parallel_put_getparallel_put_get159     parallel_put_get( tbb::flow::sequencer_node<T> &s1, tbb::flow::sequencer_node<T> &s2, int num_threads,
160                       tbb::atomic<int> &counter, touches<T> &t ) : my_s1(s1), my_s2(s2), my_num_threads(num_threads), my_counter(counter), my_touches(t) {}
161 
operator ()parallel_put_get162     void operator()(int tid) const {
163         int i_start = 0;
164 
165         while ( (i_start = my_counter.fetch_and_add(C)) < N ) {
166             int i_end = ( N < i_start + C ) ? N : i_start + C;
167             for (int i = i_start; i < i_end; ++i) {
168                 bool msg = my_s1.try_put( T(i) );
169                 ASSERT( msg == true, NULL );
170             }
171 
172             for (int i = i_start; i < i_end; ++i) {
173                 T v;
174                 spin_try_get( my_s2, v );
175                 my_touches.check( tid, v );
176             }
177         }
178     }
179 
180 };
181 
182 //
183 // Tests
184 //
185 // multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output
186 // chained sequencers, multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output
187 //
188 
189 template< typename T >
test_parallel(int num_threads)190 int test_parallel(int num_threads) {
191     tbb::flow::graph g;
192 
193     tbb::flow::sequencer_node<T> s(g, seq_inspector<T>());
194     NativeParallelFor( num_threads, parallel_puts<T>(s, num_threads) );
195     {
196         touches<T> t( num_threads );
197         NativeParallelFor( num_threads, parallel_gets<T>(s, num_threads, t) );
198         g.wait_for_all();
199         ASSERT( t.validate_touches(), NULL );
200     }
201     T bogus_value(-1);
202     T j = bogus_value;
203     ASSERT( s.try_get( j ) == false, NULL );
204     ASSERT( j == bogus_value, NULL );
205     g.wait_for_all();
206 
207     tbb::flow::sequencer_node<T> s1(g, seq_inspector<T>());
208     tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>());
209     tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>());
210     tbb::flow::make_edge( s1, s2 );
211     tbb::flow::make_edge( s2, s3 );
212 
213     {
214         touches<T> t( num_threads );
215         tbb::atomic<int> counter;
216         counter = 0;
217         NativeParallelFor( num_threads, parallel_put_get<T>(s1, s3, num_threads, counter, t) );
218         g.wait_for_all();
219         t.validate_touches();
220     }
221     g.wait_for_all();
222     ASSERT( s1.try_get( j ) == false, NULL );
223     g.wait_for_all();
224     ASSERT( s2.try_get( j ) == false, NULL );
225     g.wait_for_all();
226     ASSERT( s3.try_get( j ) == false, NULL );
227     ASSERT( j == bogus_value, NULL );
228 
229     // test copy constructor
230     tbb::flow::sequencer_node<T> s_copy(s);
231     NativeParallelFor( num_threads, parallel_puts<T>(s_copy, num_threads) );
232     for (int i = 0; i < N; ++i) {
233         j = bogus_value;
234         spin_try_get( s_copy, j );
235         ASSERT( i == j, NULL );
236     }
237     j = bogus_value;
238     g.wait_for_all();
239     ASSERT( s_copy.try_get( j ) == false, NULL );
240     ASSERT( j == bogus_value, NULL );
241 
242     return 0;
243 }
244 
245 
246 //
247 // Tests
248 //
249 // No predecessors can be registered
250 // Request from empty buffer fails
251 // In-order puts, single sender, single receiver, properly sequenced at output
252 // Reverse-order puts, single sender, single receiver, properly sequenced at output
253 // Chained sequencers (3), in-order and reverse-order tests, properly sequenced at output
254 //
255 
256 template< typename T >
test_serial()257 int test_serial() {
258     tbb::flow::graph g;
259     T bogus_value(-1);
260 
261     tbb::flow::sequencer_node<T> s(g, seq_inspector<T>());
262     tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>());
263     T j = bogus_value;
264 
265     //
266     // Rejects attempts to add / remove predecessor
267     // Rejects request from empty Q
268     //
269     ASSERT( s.register_predecessor( s2 ) == false, NULL );
270     ASSERT( s.remove_predecessor( s2 ) == false, NULL );
271     ASSERT( s.try_get( j ) == false, NULL );
272     ASSERT( j == bogus_value, NULL );
273 
274     //
275     // In-order simple puts and gets
276     //
277 
278     for (int i = 0; i < N; ++i) {
279         bool msg = s.try_put( T(i) );
280         ASSERT( msg == true, NULL );
281         ASSERT(!s.try_put( T(i) ), NULL);  // second attempt to put should reject
282     }
283 
284 
285     for (int i = 0; i < N; ++i) {
286         j = bogus_value;
287         ASSERT(wait_try_get( g, s, j ) == true, NULL);
288         ASSERT( i == j, NULL );
289         ASSERT(!s.try_put( T(i) ),NULL );  // after retrieving value, subsequent put should fail
290     }
291     j = bogus_value;
292     g.wait_for_all();
293     ASSERT( s.try_get( j ) == false, NULL );
294     ASSERT( j == bogus_value, NULL );
295 
296     //
297     // Reverse-order simple puts and gets
298     //
299 
300     for (int i = N-1; i >= 0; --i) {
301         bool msg = s2.try_put( T(i) );
302         ASSERT( msg == true, NULL );
303     }
304 
305     for (int i = 0; i < N; ++i) {
306         j = bogus_value;
307         ASSERT(wait_try_get( g, s2, j ) == true, NULL);
308         ASSERT( i == j, NULL );
309     }
310     j = bogus_value;
311     g.wait_for_all();
312     ASSERT( s2.try_get( j ) == false, NULL );
313     ASSERT( j == bogus_value, NULL );
314 
315     //
316     // Chained in-order simple puts and gets
317     //
318 
319     tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>());
320     tbb::flow::sequencer_node<T> s4(g, seq_inspector<T>());
321     tbb::flow::sequencer_node<T> s5(g, seq_inspector<T>());
322     tbb::flow::make_edge( s3, s4 );
323     tbb::flow::make_edge( s4, s5 );
324 
325     for (int i = 0; i < N; ++i) {
326         bool msg = s3.try_put( T(i) );
327         ASSERT( msg == true, NULL );
328     }
329 
330     for (int i = 0; i < N; ++i) {
331         j = bogus_value;
332         ASSERT(wait_try_get( g, s5, j ) == true, NULL);
333         ASSERT( i == j, NULL );
334     }
335     j = bogus_value;
336     ASSERT( wait_try_get( g, s3, j ) == false, NULL );
337     ASSERT( wait_try_get( g, s4, j ) == false, NULL );
338     ASSERT( wait_try_get( g, s5, j ) == false, NULL );
339     ASSERT( j == bogus_value, NULL );
340 
341     g.wait_for_all();
342     tbb::flow::remove_edge( s3, s4 );
343     ASSERT( s3.try_put( N ) == true, NULL );
344     ASSERT( wait_try_get( g, s4, j ) == false, NULL );
345     ASSERT( j == bogus_value, NULL );
346     ASSERT( wait_try_get( g, s5, j ) == false, NULL );
347     ASSERT( j == bogus_value, NULL );
348     ASSERT( wait_try_get( g, s3, j ) == true, NULL );
349     ASSERT( j == N, NULL );
350 
351     //
352     // Chained reverse-order simple puts and gets
353     //
354 
355     tbb::flow::sequencer_node<T> s6(g, seq_inspector<T>());
356     tbb::flow::sequencer_node<T> s7(g, seq_inspector<T>());
357     tbb::flow::sequencer_node<T> s8(g, seq_inspector<T>());
358     tbb::flow::make_edge( s6, s7 );
359     tbb::flow::make_edge( s7, s8 );
360 
361     for (int i = N-1; i >= 0; --i) {
362         bool msg = s6.try_put( T(i) );
363         ASSERT( msg == true, NULL );
364     }
365 
366     for (int i = 0; i < N; ++i) {
367         j = bogus_value;
368         ASSERT( wait_try_get( g, s8, j ) == true, NULL );
369         ASSERT( i == j, NULL );
370     }
371     j = bogus_value;
372     ASSERT( wait_try_get( g, s6, j ) == false, NULL );
373     ASSERT( wait_try_get( g, s7, j ) == false, NULL );
374     ASSERT( wait_try_get( g, s8, j ) == false, NULL );
375     ASSERT( j == bogus_value, NULL );
376 
377     g.wait_for_all();
378     tbb::flow::remove_edge( s6, s7 );
379     ASSERT( s6.try_put( N ) == true, NULL );
380     ASSERT( wait_try_get( g, s7, j ) == false, NULL );
381     ASSERT( j == bogus_value, NULL );
382     ASSERT( wait_try_get( g, s8, j ) == false, NULL );
383     ASSERT( j == bogus_value, NULL );
384     ASSERT( wait_try_get( g, s6, j ) == true, NULL );
385     ASSERT( j == N, NULL );
386 
387     return 0;
388 }
389 
390 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
391 #include <array>
392 #include <vector>
test_follows_and_precedes_api()393 void test_follows_and_precedes_api() {
394     std::array<int, 3> messages_for_follows = { {0, 1, 2} };
395     std::vector<int> messages_for_precedes = {0, 1, 2};
396 
397     follows_and_precedes_testing::test_follows
398         <int, tbb::flow::sequencer_node<int>>
399         (messages_for_follows, [](const int& i) { return i; });
400 
401     follows_and_precedes_testing::test_precedes
402         <int, tbb::flow::sequencer_node<int>>
403         (messages_for_precedes, [](const int& i) { return i; });
404 }
405 #endif
406 
407 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
408 template <typename Body>
test_deduction_guides_common(Body body)409 void test_deduction_guides_common(Body body) {
410     using namespace tbb::flow;
411     graph g;
412     broadcast_node<int> br(g);
413 
414     sequencer_node s1(g, body);
415     static_assert(std::is_same_v<decltype(s1), sequencer_node<int>>);
416 
417 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
418     sequencer_node s2(follows(br), body);
419     static_assert(std::is_same_v<decltype(s2), sequencer_node<int>>);
420 #endif
421 
422     sequencer_node s3(s1);
423     static_assert(std::is_same_v<decltype(s3), sequencer_node<int>>);
424 }
425 
sequencer_body_f(const int &)426 int sequencer_body_f(const int&) { return 1; }
427 
test_deduction_guides()428 void test_deduction_guides() {
429     test_deduction_guides_common([](const int&)->int { return 1; });
430     test_deduction_guides_common([](const int&) mutable ->int { return 1; });
431     test_deduction_guides_common(sequencer_body_f);
432 }
433 #endif
434 
435 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
test_node_allocator()436 void test_node_allocator() {
437     tbb::flow::graph g;
438     tbb::flow::sequencer_node< int, std::allocator<int> > tmp(g, seq_inspector<int>());
439 }
440 #endif
441 
TestMain()442 int TestMain() {
443     tbb::tick_count start = tbb::tick_count::now(), stop;
444     for (int p = 2; p <= 4; ++p) {
445         tbb::task_scheduler_init init(p);
446         test_serial<int>();
447         test_parallel<int>(p);
448     }
449 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
450     test_follows_and_precedes_api();
451 #endif
452 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
453     test_deduction_guides();
454 #endif
455 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
456     test_buffer_extract<tbb::flow::sequencer_node<int> >().run_tests();
457 #endif
458 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
459     test_node_allocator();
460 #endif
461     stop = tbb::tick_count::now();
462     REMARK("Sequencer_Node Time=%6.6f\n", (stop-start).seconds());
463     return Harness::Done;
464 }
465