1 /*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION __TBB_CPF_BUILD
18 #define TBB_DEPRECATED_FLOW_NODE_ALLOCATOR __TBB_CPF_BUILD
19
20 #include "harness.h"
21 #include "harness_graph.h"
22
23 #include "tbb/flow_graph.h"
24 #include "tbb/task_scheduler_init.h"
25 #include "tbb/tick_count.h"
26 #include "tbb/atomic.h"
27 #include "test_follows_and_precedes_api.h"
28
29 #include <cstdio>
30
31 #define N 1000
32 #define C 10
33
// Functor handed to every sequencer_node in this file.
// The value of an item, converted to size_t, is returned as that item's
// sequence number.
template< typename T >
struct seq_inspector {
    size_t operator()(const T &item) const {
        const size_t order = size_t(item);
        return order;
    }
};
38
39 template< typename T >
wait_try_get(tbb::flow::graph & g,tbb::flow::sequencer_node<T> & q,T & value)40 bool wait_try_get( tbb::flow::graph &g, tbb::flow::sequencer_node<T> &q, T &value ) {
41 g.wait_for_all();
42 return q.try_get(value);
43 }
44
45 template< typename T >
spin_try_get(tbb::flow::queue_node<T> & q,T & value)46 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) {
47 while ( q.try_get(value) != true ) ;
48 }
49
50 template< typename T >
51 struct parallel_puts : NoAssign {
52
53 tbb::flow::sequencer_node<T> &my_q;
54 int my_num_threads;
55
parallel_putsparallel_puts56 parallel_puts( tbb::flow::sequencer_node<T> &q, int num_threads ) : my_q(q), my_num_threads(num_threads) {}
57
operator ()parallel_puts58 void operator()(int tid) const {
59 for (int j = tid; j < N; j+=my_num_threads) {
60 bool msg = my_q.try_put( T(j) );
61 ASSERT( msg == true, NULL );
62 }
63 }
64
65 };
66
67 template< typename T >
68 struct touches {
69
70 bool **my_touches;
71 T *my_last_touch;
72 int my_num_threads;
73
touchestouches74 touches( int num_threads ) : my_num_threads(num_threads) {
75 my_last_touch = new T[my_num_threads];
76 my_touches = new bool* [my_num_threads];
77 for ( int p = 0; p < my_num_threads; ++p) {
78 my_last_touch[p] = T(-1);
79 my_touches[p] = new bool[N];
80 for ( int n = 0; n < N; ++n)
81 my_touches[p][n] = false;
82 }
83 }
84
~touchestouches85 ~touches() {
86 for ( int p = 0; p < my_num_threads; ++p) {
87 delete [] my_touches[p];
88 }
89 delete [] my_touches;
90 delete [] my_last_touch;
91 }
92
checktouches93 bool check( int tid, T v ) {
94 if ( my_touches[tid][v] != false ) {
95 printf("Error: value seen twice by local thread\n");
96 return false;
97 }
98 if ( v <= my_last_touch[tid] ) {
99 printf("Error: value seen in wrong order by local thread\n");
100 return false;
101 }
102 my_last_touch[tid] = v;
103 my_touches[tid][v] = true;
104 return true;
105 }
106
validate_touchestouches107 bool validate_touches() {
108 bool *all_touches = new bool[N];
109 for ( int n = 0; n < N; ++n)
110 all_touches[n] = false;
111
112 for ( int p = 0; p < my_num_threads; ++p) {
113 for ( int n = 0; n < N; ++n) {
114 if ( my_touches[p][n] == true ) {
115 ASSERT( all_touches[n] == false, "value see by more than one thread\n" );
116 all_touches[n] = true;
117 }
118 }
119 }
120 for ( int n = 0; n < N; ++n) {
121 if ( !all_touches[n] )
122 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads);
123 //ASSERT( all_touches[n] == true, "value not seen by any thread\n" );
124 }
125 delete [] all_touches;
126 return true;
127 }
128
129 };
130
131 template< typename T >
132 struct parallel_gets : NoAssign {
133
134 tbb::flow::sequencer_node<T> &my_q;
135 int my_num_threads;
136 touches<T> &my_touches;
137
parallel_getsparallel_gets138 parallel_gets( tbb::flow::sequencer_node<T> &q, int num_threads, touches<T> &t ) : my_q(q), my_num_threads(num_threads), my_touches(t) {}
139
operator ()parallel_gets140 void operator()(int tid) const {
141 for (int j = tid; j < N; j+=my_num_threads) {
142 T v;
143 spin_try_get( my_q, v );
144 my_touches.check( tid, v );
145 }
146 }
147
148 };
149
150 template< typename T >
151 struct parallel_put_get : NoAssign {
152
153 tbb::flow::sequencer_node<T> &my_s1;
154 tbb::flow::sequencer_node<T> &my_s2;
155 int my_num_threads;
156 tbb::atomic< int > &my_counter;
157 touches<T> &my_touches;
158
parallel_put_getparallel_put_get159 parallel_put_get( tbb::flow::sequencer_node<T> &s1, tbb::flow::sequencer_node<T> &s2, int num_threads,
160 tbb::atomic<int> &counter, touches<T> &t ) : my_s1(s1), my_s2(s2), my_num_threads(num_threads), my_counter(counter), my_touches(t) {}
161
operator ()parallel_put_get162 void operator()(int tid) const {
163 int i_start = 0;
164
165 while ( (i_start = my_counter.fetch_and_add(C)) < N ) {
166 int i_end = ( N < i_start + C ) ? N : i_start + C;
167 for (int i = i_start; i < i_end; ++i) {
168 bool msg = my_s1.try_put( T(i) );
169 ASSERT( msg == true, NULL );
170 }
171
172 for (int i = i_start; i < i_end; ++i) {
173 T v;
174 spin_try_get( my_s2, v );
175 my_touches.check( tid, v );
176 }
177 }
178 }
179
180 };
181
182 //
183 // Tests
184 //
185 // multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output
186 // chained sequencers, multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output
187 //
188
189 template< typename T >
test_parallel(int num_threads)190 int test_parallel(int num_threads) {
191 tbb::flow::graph g;
192
193 tbb::flow::sequencer_node<T> s(g, seq_inspector<T>());
194 NativeParallelFor( num_threads, parallel_puts<T>(s, num_threads) );
195 {
196 touches<T> t( num_threads );
197 NativeParallelFor( num_threads, parallel_gets<T>(s, num_threads, t) );
198 g.wait_for_all();
199 ASSERT( t.validate_touches(), NULL );
200 }
201 T bogus_value(-1);
202 T j = bogus_value;
203 ASSERT( s.try_get( j ) == false, NULL );
204 ASSERT( j == bogus_value, NULL );
205 g.wait_for_all();
206
207 tbb::flow::sequencer_node<T> s1(g, seq_inspector<T>());
208 tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>());
209 tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>());
210 tbb::flow::make_edge( s1, s2 );
211 tbb::flow::make_edge( s2, s3 );
212
213 {
214 touches<T> t( num_threads );
215 tbb::atomic<int> counter;
216 counter = 0;
217 NativeParallelFor( num_threads, parallel_put_get<T>(s1, s3, num_threads, counter, t) );
218 g.wait_for_all();
219 t.validate_touches();
220 }
221 g.wait_for_all();
222 ASSERT( s1.try_get( j ) == false, NULL );
223 g.wait_for_all();
224 ASSERT( s2.try_get( j ) == false, NULL );
225 g.wait_for_all();
226 ASSERT( s3.try_get( j ) == false, NULL );
227 ASSERT( j == bogus_value, NULL );
228
229 // test copy constructor
230 tbb::flow::sequencer_node<T> s_copy(s);
231 NativeParallelFor( num_threads, parallel_puts<T>(s_copy, num_threads) );
232 for (int i = 0; i < N; ++i) {
233 j = bogus_value;
234 spin_try_get( s_copy, j );
235 ASSERT( i == j, NULL );
236 }
237 j = bogus_value;
238 g.wait_for_all();
239 ASSERT( s_copy.try_get( j ) == false, NULL );
240 ASSERT( j == bogus_value, NULL );
241
242 return 0;
243 }
244
245
246 //
247 // Tests
248 //
249 // No predecessors can be registered
250 // Request from empty buffer fails
251 // In-order puts, single sender, single receiver, properly sequenced at output
252 // Reverse-order puts, single sender, single receiver, properly sequenced at output
253 // Chained sequencers (3), in-order and reverse-order tests, properly sequenced at output
254 //
255
256 template< typename T >
test_serial()257 int test_serial() {
258 tbb::flow::graph g;
259 T bogus_value(-1);
260
261 tbb::flow::sequencer_node<T> s(g, seq_inspector<T>());
262 tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>());
263 T j = bogus_value;
264
265 //
266 // Rejects attempts to add / remove predecessor
267 // Rejects request from empty Q
268 //
269 ASSERT( s.register_predecessor( s2 ) == false, NULL );
270 ASSERT( s.remove_predecessor( s2 ) == false, NULL );
271 ASSERT( s.try_get( j ) == false, NULL );
272 ASSERT( j == bogus_value, NULL );
273
274 //
275 // In-order simple puts and gets
276 //
277
278 for (int i = 0; i < N; ++i) {
279 bool msg = s.try_put( T(i) );
280 ASSERT( msg == true, NULL );
281 ASSERT(!s.try_put( T(i) ), NULL); // second attempt to put should reject
282 }
283
284
285 for (int i = 0; i < N; ++i) {
286 j = bogus_value;
287 ASSERT(wait_try_get( g, s, j ) == true, NULL);
288 ASSERT( i == j, NULL );
289 ASSERT(!s.try_put( T(i) ),NULL ); // after retrieving value, subsequent put should fail
290 }
291 j = bogus_value;
292 g.wait_for_all();
293 ASSERT( s.try_get( j ) == false, NULL );
294 ASSERT( j == bogus_value, NULL );
295
296 //
297 // Reverse-order simple puts and gets
298 //
299
300 for (int i = N-1; i >= 0; --i) {
301 bool msg = s2.try_put( T(i) );
302 ASSERT( msg == true, NULL );
303 }
304
305 for (int i = 0; i < N; ++i) {
306 j = bogus_value;
307 ASSERT(wait_try_get( g, s2, j ) == true, NULL);
308 ASSERT( i == j, NULL );
309 }
310 j = bogus_value;
311 g.wait_for_all();
312 ASSERT( s2.try_get( j ) == false, NULL );
313 ASSERT( j == bogus_value, NULL );
314
315 //
316 // Chained in-order simple puts and gets
317 //
318
319 tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>());
320 tbb::flow::sequencer_node<T> s4(g, seq_inspector<T>());
321 tbb::flow::sequencer_node<T> s5(g, seq_inspector<T>());
322 tbb::flow::make_edge( s3, s4 );
323 tbb::flow::make_edge( s4, s5 );
324
325 for (int i = 0; i < N; ++i) {
326 bool msg = s3.try_put( T(i) );
327 ASSERT( msg == true, NULL );
328 }
329
330 for (int i = 0; i < N; ++i) {
331 j = bogus_value;
332 ASSERT(wait_try_get( g, s5, j ) == true, NULL);
333 ASSERT( i == j, NULL );
334 }
335 j = bogus_value;
336 ASSERT( wait_try_get( g, s3, j ) == false, NULL );
337 ASSERT( wait_try_get( g, s4, j ) == false, NULL );
338 ASSERT( wait_try_get( g, s5, j ) == false, NULL );
339 ASSERT( j == bogus_value, NULL );
340
341 g.wait_for_all();
342 tbb::flow::remove_edge( s3, s4 );
343 ASSERT( s3.try_put( N ) == true, NULL );
344 ASSERT( wait_try_get( g, s4, j ) == false, NULL );
345 ASSERT( j == bogus_value, NULL );
346 ASSERT( wait_try_get( g, s5, j ) == false, NULL );
347 ASSERT( j == bogus_value, NULL );
348 ASSERT( wait_try_get( g, s3, j ) == true, NULL );
349 ASSERT( j == N, NULL );
350
351 //
352 // Chained reverse-order simple puts and gets
353 //
354
355 tbb::flow::sequencer_node<T> s6(g, seq_inspector<T>());
356 tbb::flow::sequencer_node<T> s7(g, seq_inspector<T>());
357 tbb::flow::sequencer_node<T> s8(g, seq_inspector<T>());
358 tbb::flow::make_edge( s6, s7 );
359 tbb::flow::make_edge( s7, s8 );
360
361 for (int i = N-1; i >= 0; --i) {
362 bool msg = s6.try_put( T(i) );
363 ASSERT( msg == true, NULL );
364 }
365
366 for (int i = 0; i < N; ++i) {
367 j = bogus_value;
368 ASSERT( wait_try_get( g, s8, j ) == true, NULL );
369 ASSERT( i == j, NULL );
370 }
371 j = bogus_value;
372 ASSERT( wait_try_get( g, s6, j ) == false, NULL );
373 ASSERT( wait_try_get( g, s7, j ) == false, NULL );
374 ASSERT( wait_try_get( g, s8, j ) == false, NULL );
375 ASSERT( j == bogus_value, NULL );
376
377 g.wait_for_all();
378 tbb::flow::remove_edge( s6, s7 );
379 ASSERT( s6.try_put( N ) == true, NULL );
380 ASSERT( wait_try_get( g, s7, j ) == false, NULL );
381 ASSERT( j == bogus_value, NULL );
382 ASSERT( wait_try_get( g, s8, j ) == false, NULL );
383 ASSERT( j == bogus_value, NULL );
384 ASSERT( wait_try_get( g, s6, j ) == true, NULL );
385 ASSERT( j == N, NULL );
386
387 return 0;
388 }
389
390 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
391 #include <array>
392 #include <vector>
test_follows_and_precedes_api()393 void test_follows_and_precedes_api() {
394 std::array<int, 3> messages_for_follows = { {0, 1, 2} };
395 std::vector<int> messages_for_precedes = {0, 1, 2};
396
397 follows_and_precedes_testing::test_follows
398 <int, tbb::flow::sequencer_node<int>>
399 (messages_for_follows, [](const int& i) { return i; });
400
401 follows_and_precedes_testing::test_precedes
402 <int, tbb::flow::sequencer_node<int>>
403 (messages_for_precedes, [](const int& i) { return i; });
404 }
405 #endif
406
407 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
408 template <typename Body>
test_deduction_guides_common(Body body)409 void test_deduction_guides_common(Body body) {
410 using namespace tbb::flow;
411 graph g;
412 broadcast_node<int> br(g);
413
414 sequencer_node s1(g, body);
415 static_assert(std::is_same_v<decltype(s1), sequencer_node<int>>);
416
417 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
418 sequencer_node s2(follows(br), body);
419 static_assert(std::is_same_v<decltype(s2), sequencer_node<int>>);
420 #endif
421
422 sequencer_node s3(s1);
423 static_assert(std::is_same_v<decltype(s3), sequencer_node<int>>);
424 }
425
// Plain-function sequencer body for the deduction-guide test.
// Ignores its argument and always returns 1.
int sequencer_body_f(const int&) {
    const int fixed_result = 1;
    return fixed_result;
}
427
test_deduction_guides()428 void test_deduction_guides() {
429 test_deduction_guides_common([](const int&)->int { return 1; });
430 test_deduction_guides_common([](const int&) mutable ->int { return 1; });
431 test_deduction_guides_common(sequencer_body_f);
432 }
433 #endif
434
435 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
test_node_allocator()436 void test_node_allocator() {
437 tbb::flow::graph g;
438 tbb::flow::sequencer_node< int, std::allocator<int> > tmp(g, seq_inspector<int>());
439 }
440 #endif
441
TestMain()442 int TestMain() {
443 tbb::tick_count start = tbb::tick_count::now(), stop;
444 for (int p = 2; p <= 4; ++p) {
445 tbb::task_scheduler_init init(p);
446 test_serial<int>();
447 test_parallel<int>(p);
448 }
449 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
450 test_follows_and_precedes_api();
451 #endif
452 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
453 test_deduction_guides();
454 #endif
455 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
456 test_buffer_extract<tbb::flow::sequencer_node<int> >().run_tests();
457 #endif
458 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
459 test_node_allocator();
460 #endif
461 stop = tbb::tick_count::now();
462 REMARK("Sequencer_Node Time=%6.6f\n", (stop-start).seconds());
463 return Harness::Done;
464 }
465