1 /*
2 Copyright (c) 2005-2021 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20
21 #include "common/config.h"
22
23 #include "tbb/flow_graph.h"
24
25 #include "common/test.h"
26 #include "common/utils.h"
27 #include "common/utils_assert.h"
28 #include "common/test_follows_and_precedes_api.h"
29
30 #include <atomic>
31
32
33 //! \file test_limiter_node.cpp
34 //! \brief Test for [flow_graph.limiter_node] specification
35
36
37 const int L = 10;
38 const int N = 1000;
39
40 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED;
41 using tbb::detail::d1::graph_task;
42
43 template< typename T >
44 struct serial_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
45 T next_value;
46 tbb::flow::graph& my_graph;
47
serial_receiverserial_receiver48 serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {}
49
try_put_taskserial_receiver50 graph_task* try_put_task( const T &v ) override {
51 CHECK_MESSAGE( next_value++ == v, "" );
52 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
53 }
54
graph_referenceserial_receiver55 tbb::flow::graph& graph_reference() const override {
56 return my_graph;
57 }
58 };
59
60 template< typename T >
61 struct parallel_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
62
63 std::atomic<int> my_count;
64 tbb::flow::graph& my_graph;
65
parallel_receiverparallel_receiver66 parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; }
67
try_put_taskparallel_receiver68 graph_task* try_put_task( const T &/*v*/ ) override {
69 ++my_count;
70 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
71 }
72
graph_referenceparallel_receiver73 tbb::flow::graph& graph_reference() const override {
74 return my_graph;
75 }
76 };
77
78 template< typename T >
79 struct empty_sender : public tbb::flow::sender<T> {
80 typedef typename tbb::flow::sender<T>::successor_type successor_type;
81
register_successorempty_sender82 bool register_successor( successor_type & ) override { return false; }
remove_successorempty_sender83 bool remove_successor( successor_type & ) override { return false; }
84 };
85
86
87 template< typename T >
88 struct put_body : utils::NoAssign {
89
90 tbb::flow::limiter_node<T> &my_lim;
91 std::atomic<int> &my_accept_count;
92
put_bodyput_body93 put_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
94 my_lim(lim), my_accept_count(accept_count) {}
95
operator ()put_body96 void operator()( int ) const {
97 for ( int i = 0; i < L; ++i ) {
98 bool msg = my_lim.try_put( T(i) );
99 if ( msg == true )
100 ++my_accept_count;
101 }
102 }
103 };
104
105 template< typename T >
106 struct put_dec_body : utils::NoAssign {
107
108 tbb::flow::limiter_node<T> &my_lim;
109 std::atomic<int> &my_accept_count;
110
put_dec_bodyput_dec_body111 put_dec_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
112 my_lim(lim), my_accept_count(accept_count) {}
113
operator ()put_dec_body114 void operator()( int ) const {
115 int local_accept_count = 0;
116 while ( local_accept_count < N ) {
117 bool msg = my_lim.try_put( T(local_accept_count) );
118 if ( msg == true ) {
119 ++local_accept_count;
120 ++my_accept_count;
121 my_lim.decrementer().try_put( tbb::flow::continue_msg() );
122 }
123 }
124 }
125
126 };
127
128 template< typename T >
test_puts_with_decrements(int num_threads,tbb::flow::limiter_node<T> & lim,tbb::flow::graph & g)129 void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) {
130 parallel_receiver<T> r(g);
131 empty_sender< tbb::flow::continue_msg > s;
132 std::atomic<int> accept_count;
133 accept_count = 0;
134 tbb::flow::make_edge( lim, r );
135 tbb::flow::make_edge(s, lim.decrementer());
136
137 // test puts with decrements
138 utils::NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) );
139 int c = accept_count;
140 CHECK_MESSAGE( c == N*num_threads, "" );
141 CHECK_MESSAGE( r.my_count == N*num_threads, "" );
142 }
143
144 //
145 // Tests
146 //
147 // limiter only forwards below the limit, multiple parallel senders / single receiver
148 // multiple parallel senders that put to decrement at each accept, limiter accepts new messages
149 //
150 //
151 template< typename T >
test_parallel(int num_threads)152 int test_parallel(int num_threads) {
153
154 // test puts with no decrements
155 for ( int i = 0; i < L; ++i ) {
156 tbb::flow::graph g;
157 tbb::flow::limiter_node< T > lim(g, i);
158 parallel_receiver<T> r(g);
159 std::atomic<int> accept_count;
160 accept_count = 0;
161 tbb::flow::make_edge( lim, r );
162 // test puts with no decrements
163 utils::NativeParallelFor( num_threads, put_body<T>(lim, accept_count) );
164 g.wait_for_all();
165 int c = accept_count;
166 CHECK_MESSAGE( c == i, "" );
167 }
168
169 // test puts with decrements
170 for ( int i = 1; i < L; ++i ) {
171 tbb::flow::graph g;
172 tbb::flow::limiter_node< T > lim(g, i);
173 test_puts_with_decrements(num_threads, lim, g);
174 tbb::flow::limiter_node< T > lim_copy( lim );
175 test_puts_with_decrements(num_threads, lim_copy, g);
176 }
177
178 return 0;
179 }
180
181 //
182 // Tests
183 //
184 // limiter only forwards below the limit, single sender / single receiver
185 // at reject, a put to decrement, will cause next message to be accepted
186 //
187 template< typename T >
test_serial()188 int test_serial() {
189
190 // test puts with no decrements
191 for ( int i = 0; i < L; ++i ) {
192 tbb::flow::graph g;
193 tbb::flow::limiter_node< T > lim(g, i);
194 serial_receiver<T> r(g);
195 tbb::flow::make_edge( lim, r );
196 for ( int j = 0; j < L; ++j ) {
197 bool msg = lim.try_put( T(j) );
198 CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
199 }
200 g.wait_for_all();
201 }
202
203 // test puts with decrements
204 for ( int i = 1; i < L; ++i ) {
205 tbb::flow::graph g;
206 tbb::flow::limiter_node< T > lim(g, i);
207 serial_receiver<T> r(g);
208 empty_sender< tbb::flow::continue_msg > s;
209 tbb::flow::make_edge( lim, r );
210 tbb::flow::make_edge(s, lim.decrementer());
211 for ( int j = 0; j < N; ++j ) {
212 bool msg = lim.try_put( T(j) );
213 CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
214 if ( msg == false ) {
215 lim.decrementer().try_put( tbb::flow::continue_msg() );
216 msg = lim.try_put( T(j) );
217 CHECK_MESSAGE( msg == true, "" );
218 }
219 }
220 }
221 return 0;
222 }
223
224 // reported bug in limiter (http://software.intel.com/en-us/comment/1752355)
225 #define DECREMENT_OUTPUT 1 // the port number of the decrement output of the multifunction_node
226 #define LIMITER_OUTPUT 0 // port number of the integer output
227
228 typedef tbb::flow::multifunction_node<int, std::tuple<int,tbb::flow::continue_msg> > mfnode_type;
229
230 std::atomic<size_t> emit_count;
231 std::atomic<size_t> emit_sum;
232 std::atomic<size_t> receive_count;
233 std::atomic<size_t> receive_sum;
234
235 struct mfnode_body {
236 int max_cnt;
237 std::atomic<int>* my_cnt;
mfnode_bodymfnode_body238 mfnode_body(const int& _max, std::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my) { }
operator ()mfnode_body239 void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) {
240 int lcnt = ++(*my_cnt);
241 if(lcnt > max_cnt) {
242 return;
243 }
244 // put one continue_msg to the decrement of the limiter.
245 if(!std::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) {
246 CHECK_MESSAGE( (false),"Unexpected rejection of decrement");
247 }
248 {
249 // put messages to the input of the limiter_node until it rejects.
250 while( std::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) {
251 emit_sum += lcnt;
252 ++emit_count;
253 }
254 }
255 }
256 };
257
258 struct fn_body {
operator ()fn_body259 int operator()(const int &in) {
260 receive_sum += in;
261 ++receive_count;
262 return in;
263 }
264 };
265
266 // +------------+
267 // +---------+ | v
268 // | mf_node |0---+ +----------+ +----------+
269 // +->| |1---------->| lim_node |--------->| fn_node |--+
270 // | +---------+ +----------+ +----------+ |
271 // | |
272 // | |
273 // +-------------------------------------------------------------+
274 //
275 void
test_multifunction_to_limiter(int _max,int _nparallel)276 test_multifunction_to_limiter(int _max, int _nparallel) {
277 tbb::flow::graph g;
278 emit_count = 0;
279 emit_sum = 0;
280 receive_count = 0;
281 receive_sum = 0;
282 std::atomic<int> local_cnt;
283 local_cnt = 0;
284 mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt));
285 tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body());
286 tbb::flow::limiter_node<int> lim_node(g, _nparallel);
287 tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node);
288 tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrementer());
289 tbb::flow::make_edge(lim_node, fn_node);
290 tbb::flow::make_edge(fn_node, mf_node);
291
292 mf_node.try_put(1);
293 g.wait_for_all();
294 CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
295 CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
296
297 // reset, test again
298 g.reset();
299 emit_count = 0;
300 emit_sum = 0;
301 receive_count = 0;
302 receive_sum = 0;
303 local_cnt = 0;;
304 mf_node.try_put(1);
305 g.wait_for_all();
306 CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
307 CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
308 }
309
310
311 void
test_continue_msg_reception()312 test_continue_msg_reception() {
313 tbb::flow::graph g;
314 tbb::flow::limiter_node<int> ln(g,2);
315 tbb::flow::queue_node<int> qn(g);
316 tbb::flow::make_edge(ln, qn);
317 ln.decrementer().try_put(tbb::flow::continue_msg());
318 ln.try_put(42);
319 g.wait_for_all();
320 int outint;
321 CHECK_MESSAGE( (qn.try_get(outint) && outint == 42), "initial put to decrement stops node");
322 }
323
324
325 //
326 // This test ascertains that if a message is not successfully put
327 // to a successor, the message is not dropped but released.
328 //
329
test_reserve_release_messages()330 void test_reserve_release_messages() {
331 using namespace tbb::flow;
332 graph g;
333
334 //making two queue_nodes: one broadcast_node and one limiter_node
335 queue_node<int> input_queue(g);
336 queue_node<int> output_queue(g);
337 broadcast_node<int> broad(g);
338 limiter_node<int, int> limit(g,2); //threshold of 2
339
340 //edges
341 make_edge(input_queue, limit);
342 make_edge(limit, output_queue);
343 make_edge(broad,limit.decrementer());
344
345 int list[4] = {19, 33, 72, 98}; //list to be put to the input queue
346
347 input_queue.try_put(list[0]); // succeeds
348 input_queue.try_put(list[1]); // succeeds
349 input_queue.try_put(list[2]); // fails, stored in upstream buffer
350 g.wait_for_all();
351
352 remove_edge(limit, output_queue); //remove successor
353
354 //sending message to the decrement port of the limiter
355 broad.try_put(1); //failed message retrieved.
356 g.wait_for_all();
357
358 #if __GNUC__ && __GNUC__ < 12 && !TBB_USE_DEBUG
359 // Seemingly, GNU compiler generates incorrect code for the call of limiter.register_successor in release (-03)
360 // The function pointer to make_edge workarounds the issue for unknown reason
361 auto make_edge_ptr = make_edge<int>;
362 make_edge_ptr(limit, output_queue); //putting the successor back
363 #else
364 make_edge(limit, output_queue); //putting the successor back
365 #endif
366
367 broad.try_put(1); //drop the count
368
369 input_queue.try_put(list[3]); //success
370 g.wait_for_all();
371
372 int var=0;
373
374 for (int i=0; i<4; i++) {
375 output_queue.try_get(var);
376 CHECK_MESSAGE( (var==list[i]), "some data dropped, input does not match output");
377 g.wait_for_all();
378 }
379 }
380
test_decrementer()381 void test_decrementer() {
382 const int threshold = 5;
383 tbb::flow::graph g;
384 tbb::flow::limiter_node<int, int> limit(g, threshold);
385 tbb::flow::queue_node<int> queue(g);
386 make_edge(limit, queue);
387 int m = 0;
388 CHECK_MESSAGE( ( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." );
389 CHECK_MESSAGE( limit.decrementer().try_put( -threshold ), // close limiter's gate
390 "Limiter node decrementer's port does not accept message." );
391 CHECK_MESSAGE( ( !limit.try_put( m++ )), "Closed limiter node's accepts message." );
392 CHECK_MESSAGE( limit.decrementer().try_put( threshold + 5 ), // open limiter's gate
393 "Limiter node decrementer's port does not accept message." );
394 for( int i = 0; i < threshold; ++i )
395 CHECK_MESSAGE( ( limit.try_put( m++ )), "Limiter node does not accept message while open." );
396 CHECK_MESSAGE( ( !limit.try_put( m )), "Limiter node's gate is not closed." );
397 g.wait_for_all();
398 int expected[] = {0, 2, 3, 4, 5, 6};
399 int actual = -1; m = 0;
400 while( queue.try_get(actual) )
401 CHECK_MESSAGE( actual == expected[m++], "" );
402 CHECK_MESSAGE( ( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." );
403 g.wait_for_all();
404
405 const size_t threshold2 = size_t(-1);
406 tbb::flow::limiter_node<int, long long> limit2(g, threshold2);
407 make_edge(limit2, queue);
408 CHECK_MESSAGE( ( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." );
409 long long decrement_value = (long long)( size_t(-1)/2 );
410 CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ),
411 "Limiter node decrementer's port does not accept message" );
412 CHECK_MESSAGE( ( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." );
413 CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ),
414 "Limiter node decrementer's port does not accept message" );
415 CHECK_MESSAGE( ( !limit2.try_put( 3 )), "Overflow happened for internal counter." );
416 int expected2[] = {1, 2};
417 actual = -1; m = 0;
418 while( queue.try_get(actual) )
419 CHECK_MESSAGE( actual == expected2[m++], "" );
420 CHECK_MESSAGE( ( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." );
421 g.wait_for_all();
422
423 const size_t threshold3 = 10;
424 tbb::flow::limiter_node<int, long long> limit3(g, threshold3);
425 make_edge(limit3, queue);
426 long long decrement_value3 = 3;
427 CHECK_MESSAGE( limit3.decrementer().try_put( -decrement_value3 ),
428 "Limiter node decrementer's port does not accept message" );
429
430 m = 0;
431 while( limit3.try_put( m ) ){ m++; };
432 CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been accepted." );
433
434 actual = -1; m = 0;
435 while( queue.try_get(actual) ){
436 CHECK_MESSAGE( actual == m++, "Not all messages have been processed." );
437 }
438
439 g.wait_for_all();
440 CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been processed." );
441 }
442
test_try_put_without_successors()443 void test_try_put_without_successors() {
444 tbb::flow::graph g;
445 int try_put_num{3};
446 tbb::flow::buffer_node<int> bn(g);
447 tbb::flow::limiter_node<int> ln(g, try_put_num);
448 tbb::flow::make_edge(bn, ln);
449 int i = 1;
450 for (; i <= try_put_num; i++)
451 bn.try_put(i);
452
453 std::atomic<int> counter{0};
454 tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited,
455 [&](int input) {
456 counter += input;
457 return int{};
458 }
459 );
460 tbb::flow::make_edge(ln, fn);
461 g.wait_for_all();
462 CHECK((counter == i * try_put_num / 2));
463
464 // Check the lost message
465 tbb::flow::remove_edge(bn, ln);
466 ln.decrementer().try_put(tbb::flow::continue_msg());
467 bn.try_put(try_put_num + 1);
468 g.wait_for_all();
469 CHECK((counter == i * try_put_num / 2));
470
471 }
472
473 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
474 #include <array>
475 #include <vector>
test_follows_and_precedes_api()476 void test_follows_and_precedes_api() {
477 using msg_t = tbb::flow::continue_msg;
478
479 std::array<msg_t, 3> messages_for_follows= { {msg_t(), msg_t(), msg_t()} };
480 std::vector<msg_t> messages_for_precedes = {msg_t()};
481
482 follows_and_precedes_testing::test_follows
483 <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_follows, 1000);
484 follows_and_precedes_testing::test_precedes
485 <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_precedes, 1000);
486
487 }
488 #endif
489
490 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
test_deduction_guides()491 void test_deduction_guides() {
492 using namespace tbb::flow;
493
494 graph g;
495 broadcast_node<int> br(g);
496 limiter_node<int> l0(g, 100);
497
498 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
499 limiter_node l1(follows(br), 100);
500 static_assert(std::is_same_v<decltype(l1), limiter_node<int>>);
501
502 limiter_node l2(precedes(br), 100);
503 static_assert(std::is_same_v<decltype(l2), limiter_node<int>>);
504 #endif
505
506 limiter_node l3(l0);
507 static_assert(std::is_same_v<decltype(l3), limiter_node<int>>);
508 }
509 #endif
510
511 //! Test puts on limiter_node with decrements and varying parallelism levels
512 //! \brief \ref error_guessing
513 TEST_CASE("Serial and parallel tests") {
514 for (unsigned i = 1; i <= 2 * utils::MaxThread; ++i) {
515 tbb::task_arena arena(i);
516 arena.execute(
__anoned8f994e0202() 517 [i]() {
518 test_serial<int>();
519 test_parallel<int>(i);
520 }
521 );
522 }
523 }
524
525 //! Test initial put of continue_msg on decrementer port does not stop message flow
526 //! \brief \ref error_guessing
527 TEST_CASE("Test continue_msg reception") {
528 test_continue_msg_reception();
529 }
530
531 //! Test multifunction_node connected to limiter_node
532 //! \brief \ref error_guessing
533 TEST_CASE("Multifunction connected to limiter") {
534 test_multifunction_to_limiter(30,3);
535 test_multifunction_to_limiter(300,13);
536 test_multifunction_to_limiter(3000,1);
537 }
538
539 //! Test message release if successor doesn't accept
540 //! \brief \ref requirement
541 TEST_CASE("Message is released if successor does not accept") {
542 test_reserve_release_messages();
543 }
544
545 //! Test decrementer
546 //! \brief \ref requirement \ref error_guessing
547 TEST_CASE("Decrementer") {
548 test_decrementer();
549 }
550
551 //! Test try_put() without successor
552 //! \brief \ref error_guessing
553 TEST_CASE("Test try_put() without successors") {
554 test_try_put_without_successors();
555 }
556
557 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
558 //! Test follows and precedes API
559 //! \brief \ref error_guessing
560 TEST_CASE( "Support for follows and precedes API" ) {
561 test_follows_and_precedes_api();
562 }
563 #endif
564
565 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
566 //! Test deduction guides
567 //! \brief \ref requirement
568 TEST_CASE( "Deduction guides" ) {
569 test_deduction_guides();
570 }
571 #endif
572