1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
21 #include "common/config.h"
22 
23 #include "tbb/flow_graph.h"
24 
25 #include "common/test.h"
26 #include "common/utils.h"
27 #include "common/utils_assert.h"
28 #include "common/test_follows_and_precedes_api.h"
29 
30 #include <atomic>
31 
32 
33 //! \file test_limiter_node.cpp
34 //! \brief Test for [flow_graph.limiter_node] specification
35 
36 
37 const int L = 10;
38 const int N = 1000;
39 
40 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED;
41 using tbb::detail::d1::graph_task;
42 
43 template< typename T >
44 struct serial_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
45    T next_value;
46    tbb::flow::graph& my_graph;
47 
serial_receiverserial_receiver48    serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {}
49 
try_put_taskserial_receiver50    graph_task* try_put_task( const T &v ) override {
51        CHECK_MESSAGE( next_value++  == v, "" );
52        return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
53    }
54 
graph_referenceserial_receiver55     tbb::flow::graph& graph_reference() const override {
56         return my_graph;
57     }
58 };
59 
60 template< typename T >
61 struct parallel_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
62 
63     std::atomic<int> my_count;
64     tbb::flow::graph& my_graph;
65 
parallel_receiverparallel_receiver66     parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; }
67 
try_put_taskparallel_receiver68     graph_task* try_put_task( const T &/*v*/ ) override {
69        ++my_count;
70        return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
71     }
72 
graph_referenceparallel_receiver73     tbb::flow::graph& graph_reference() const override {
74         return my_graph;
75     }
76 };
77 
78 template< typename T >
79 struct empty_sender : public tbb::flow::sender<T> {
80         typedef typename tbb::flow::sender<T>::successor_type successor_type;
81 
register_successorempty_sender82         bool register_successor( successor_type & ) override { return false; }
remove_successorempty_sender83         bool remove_successor( successor_type & ) override { return false; }
84 };
85 
86 
87 template< typename T >
88 struct put_body : utils::NoAssign {
89 
90     tbb::flow::limiter_node<T> &my_lim;
91     std::atomic<int> &my_accept_count;
92 
put_bodyput_body93     put_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
94         my_lim(lim), my_accept_count(accept_count) {}
95 
operator ()put_body96     void operator()( int ) const {
97         for ( int i = 0; i < L; ++i ) {
98             bool msg = my_lim.try_put( T(i) );
99             if ( msg == true )
100                ++my_accept_count;
101         }
102     }
103 };
104 
105 template< typename T >
106 struct put_dec_body : utils::NoAssign {
107 
108     tbb::flow::limiter_node<T> &my_lim;
109     std::atomic<int> &my_accept_count;
110 
put_dec_bodyput_dec_body111     put_dec_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
112         my_lim(lim), my_accept_count(accept_count) {}
113 
operator ()put_dec_body114     void operator()( int ) const {
115         int local_accept_count = 0;
116         while ( local_accept_count < N ) {
117             bool msg = my_lim.try_put( T(local_accept_count) );
118             if ( msg == true ) {
119                 ++local_accept_count;
120                 ++my_accept_count;
121                 my_lim.decrementer().try_put( tbb::flow::continue_msg() );
122             }
123         }
124     }
125 
126 };
127 
128 template< typename T >
test_puts_with_decrements(int num_threads,tbb::flow::limiter_node<T> & lim,tbb::flow::graph & g)129 void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) {
130     parallel_receiver<T> r(g);
131     empty_sender< tbb::flow::continue_msg > s;
132     std::atomic<int> accept_count;
133     accept_count = 0;
134     tbb::flow::make_edge( lim, r );
135     tbb::flow::make_edge(s, lim.decrementer());
136 
137     // test puts with decrements
138     utils::NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) );
139     int c = accept_count;
140     CHECK_MESSAGE( c == N*num_threads, "" );
141     CHECK_MESSAGE( r.my_count == N*num_threads, "" );
142 }
143 
144 //
145 // Tests
146 //
// limiter only forwards below the limit: multiple parallel senders / single receiver
// multiple parallel senders that put to the decrementer after each accepted message: limiter keeps accepting new messages
149 //
150 //
151 template< typename T >
test_parallel(int num_threads)152 int test_parallel(int num_threads) {
153 
154    // test puts with no decrements
155    for ( int i = 0; i < L; ++i ) {
156        tbb::flow::graph g;
157        tbb::flow::limiter_node< T > lim(g, i);
158        parallel_receiver<T> r(g);
159        std::atomic<int> accept_count;
160        accept_count = 0;
161        tbb::flow::make_edge( lim, r );
162        // test puts with no decrements
163        utils::NativeParallelFor( num_threads, put_body<T>(lim, accept_count) );
164        g.wait_for_all();
165        int c = accept_count;
166        CHECK_MESSAGE( c == i, "" );
167    }
168 
169    // test puts with decrements
170    for ( int i = 1; i < L; ++i ) {
171        tbb::flow::graph g;
172        tbb::flow::limiter_node< T > lim(g, i);
173        test_puts_with_decrements(num_threads, lim, g);
174        tbb::flow::limiter_node< T > lim_copy( lim );
175        test_puts_with_decrements(num_threads, lim_copy, g);
176    }
177 
178    return 0;
179 }
180 
181 //
182 // Tests
183 //
// limiter only forwards below the limit: single sender / single receiver
// after a rejection, a put to the decrementer lets the retried message be accepted
186 //
187 template< typename T >
test_serial()188 int test_serial() {
189 
190    // test puts with no decrements
191    for ( int i = 0; i < L; ++i ) {
192        tbb::flow::graph g;
193        tbb::flow::limiter_node< T > lim(g, i);
194        serial_receiver<T> r(g);
195        tbb::flow::make_edge( lim, r );
196        for ( int j = 0; j < L; ++j ) {
197            bool msg = lim.try_put( T(j) );
198            CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
199        }
200        g.wait_for_all();
201    }
202 
203    // test puts with decrements
204    for ( int i = 1; i < L; ++i ) {
205        tbb::flow::graph g;
206        tbb::flow::limiter_node< T > lim(g, i);
207        serial_receiver<T> r(g);
208        empty_sender< tbb::flow::continue_msg > s;
209        tbb::flow::make_edge( lim, r );
210        tbb::flow::make_edge(s, lim.decrementer());
211        for ( int j = 0; j < N; ++j ) {
212            bool msg = lim.try_put( T(j) );
213            CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
214            if ( msg == false ) {
215                lim.decrementer().try_put( tbb::flow::continue_msg() );
216                msg = lim.try_put( T(j) );
217                CHECK_MESSAGE( msg == true, "" );
218            }
219        }
220    }
221    return 0;
222 }
223 
224 // reported bug in limiter (http://software.intel.com/en-us/comment/1752355)
225 #define DECREMENT_OUTPUT 1  // the port number of the decrement output of the multifunction_node
226 #define LIMITER_OUTPUT 0    // port number of the integer output
227 
228 typedef tbb::flow::multifunction_node<int, std::tuple<int,tbb::flow::continue_msg> > mfnode_type;
229 
230 std::atomic<size_t> emit_count;
231 std::atomic<size_t> emit_sum;
232 std::atomic<size_t> receive_count;
233 std::atomic<size_t> receive_sum;
234 
235 struct mfnode_body {
236     int max_cnt;
237     std::atomic<int>* my_cnt;
mfnode_bodymfnode_body238     mfnode_body(const int& _max, std::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my)  { }
operator ()mfnode_body239     void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) {
240         int lcnt = ++(*my_cnt);
241         if(lcnt > max_cnt) {
242             return;
243         }
244         // put one continue_msg to the decrement of the limiter.
245         if(!std::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) {
246             CHECK_MESSAGE( (false),"Unexpected rejection of decrement");
247         }
248         {
249             // put messages to the input of the limiter_node until it rejects.
250             while( std::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) {
251                 emit_sum += lcnt;
252                 ++emit_count;
253             }
254         }
255     }
256 };
257 
258 struct fn_body {
operator ()fn_body259     int operator()(const int &in) {
260         receive_sum += in;
261         ++receive_count;
262         return in;
263     }
264 };
265 
266 //                   +------------+
267 //    +---------+    |            v
268 //    | mf_node |0---+       +----------+          +----------+
269 // +->|         |1---------->| lim_node |--------->| fn_node  |--+
270 // |  +---------+            +----------+          +----------+  |
271 // |                                                             |
272 // |                                                             |
273 // +-------------------------------------------------------------+
274 //
275 void
test_multifunction_to_limiter(int _max,int _nparallel)276 test_multifunction_to_limiter(int _max, int _nparallel) {
277     tbb::flow::graph g;
278     emit_count = 0;
279     emit_sum = 0;
280     receive_count = 0;
281     receive_sum = 0;
282     std::atomic<int> local_cnt;
283     local_cnt = 0;
284     mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt));
285     tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body());
286     tbb::flow::limiter_node<int> lim_node(g, _nparallel);
287     tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node);
288     tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrementer());
289     tbb::flow::make_edge(lim_node, fn_node);
290     tbb::flow::make_edge(fn_node, mf_node);
291 
292     mf_node.try_put(1);
293     g.wait_for_all();
294     CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
295     CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
296 
297     // reset, test again
298     g.reset();
299     emit_count = 0;
300     emit_sum = 0;
301     receive_count = 0;
302     receive_sum = 0;
303     local_cnt = 0;;
304     mf_node.try_put(1);
305     g.wait_for_all();
306     CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
307     CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
308 }
309 
310 
311 void
test_continue_msg_reception()312 test_continue_msg_reception() {
313     tbb::flow::graph g;
314     tbb::flow::limiter_node<int> ln(g,2);
315     tbb::flow::queue_node<int>   qn(g);
316     tbb::flow::make_edge(ln, qn);
317     ln.decrementer().try_put(tbb::flow::continue_msg());
318     ln.try_put(42);
319     g.wait_for_all();
320     int outint;
321     CHECK_MESSAGE( (qn.try_get(outint) && outint == 42), "initial put to decrement stops node");
322 }
323 
324 
325 //
326 // This test ascertains that if a message is not successfully put
327 // to a successor, the message is not dropped but released.
328 //
329 
test_reserve_release_messages()330 void test_reserve_release_messages() {
331     using namespace tbb::flow;
332     graph g;
333 
334     //making two queue_nodes: one broadcast_node and one limiter_node
335     queue_node<int> input_queue(g);
336     queue_node<int> output_queue(g);
337     broadcast_node<int> broad(g);
338     limiter_node<int, int> limit(g,2); //threshold of 2
339 
340     //edges
341     make_edge(input_queue, limit);
342     make_edge(limit, output_queue);
343     make_edge(broad,limit.decrementer());
344 
345     int list[4] = {19, 33, 72, 98}; //list to be put to the input queue
346 
347     input_queue.try_put(list[0]); // succeeds
348     input_queue.try_put(list[1]); // succeeds
349     input_queue.try_put(list[2]); // fails, stored in upstream buffer
350     g.wait_for_all();
351 
352     remove_edge(limit, output_queue); //remove successor
353 
354     //sending message to the decrement port of the limiter
355     broad.try_put(1); //failed message retrieved.
356     g.wait_for_all();
357 
358 #if __GNUC__ && __GNUC__ < 12 && !TBB_USE_DEBUG
359     // Seemingly, GNU compiler generates incorrect code for the call of limiter.register_successor in release (-03)
360     // The function pointer to make_edge workarounds the issue for unknown reason
361     auto make_edge_ptr = make_edge<int>;
362     make_edge_ptr(limit, output_queue); //putting the successor back
363 #else
364     make_edge(limit, output_queue); //putting the successor back
365 #endif
366 
367     broad.try_put(1);  //drop the count
368 
369     input_queue.try_put(list[3]);  //success
370     g.wait_for_all();
371 
372     int var=0;
373 
374     for (int i=0; i<4; i++) {
375         output_queue.try_get(var);
376         CHECK_MESSAGE( (var==list[i]), "some data dropped, input does not match output");
377         g.wait_for_all();
378     }
379 }
380 
test_decrementer()381 void test_decrementer() {
382     const int threshold = 5;
383     tbb::flow::graph g;
384     tbb::flow::limiter_node<int, int> limit(g, threshold);
385     tbb::flow::queue_node<int> queue(g);
386     make_edge(limit, queue);
387     int m = 0;
388     CHECK_MESSAGE( ( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." );
389     CHECK_MESSAGE( limit.decrementer().try_put( -threshold ), // close limiter's gate
390                    "Limiter node decrementer's port does not accept message." );
391     CHECK_MESSAGE( ( !limit.try_put( m++ )), "Closed limiter node's accepts message." );
392     CHECK_MESSAGE( limit.decrementer().try_put( threshold + 5 ),  // open limiter's gate
393                    "Limiter node decrementer's port does not accept message." );
394     for( int i = 0; i < threshold; ++i )
395         CHECK_MESSAGE( ( limit.try_put( m++ )), "Limiter node does not accept message while open." );
396     CHECK_MESSAGE( ( !limit.try_put( m )), "Limiter node's gate is not closed." );
397     g.wait_for_all();
398     int expected[] = {0, 2, 3, 4, 5, 6};
399     int actual = -1; m = 0;
400     while( queue.try_get(actual) )
401         CHECK_MESSAGE( actual == expected[m++], "" );
402     CHECK_MESSAGE( ( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." );
403     g.wait_for_all();
404 
405     const size_t threshold2 = size_t(-1);
406     tbb::flow::limiter_node<int, long long> limit2(g, threshold2);
407     make_edge(limit2, queue);
408     CHECK_MESSAGE( ( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." );
409     long long decrement_value = (long long)( size_t(-1)/2 );
410     CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ),
411                    "Limiter node decrementer's port does not accept message" );
412     CHECK_MESSAGE( ( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." );
413     CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ),
414                    "Limiter node decrementer's port does not accept message" );
415     CHECK_MESSAGE( ( !limit2.try_put( 3 )), "Overflow happened for internal counter." );
416     int expected2[] = {1, 2};
417     actual = -1; m = 0;
418     while( queue.try_get(actual) )
419         CHECK_MESSAGE( actual == expected2[m++], "" );
420     CHECK_MESSAGE( ( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." );
421     g.wait_for_all();
422 
423     const size_t threshold3 = 10;
424     tbb::flow::limiter_node<int, long long> limit3(g, threshold3);
425     make_edge(limit3, queue);
426     long long decrement_value3 = 3;
427     CHECK_MESSAGE( limit3.decrementer().try_put( -decrement_value3 ),
428                    "Limiter node decrementer's port does not accept message" );
429 
430     m = 0;
431     while( limit3.try_put( m ) ){ m++; };
432     CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been accepted." );
433 
434     actual = -1; m = 0;
435     while( queue.try_get(actual) ){
436         CHECK_MESSAGE( actual == m++, "Not all messages have been processed." );
437     }
438 
439     g.wait_for_all();
440     CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been processed." );
441 }
442 
test_try_put_without_successors()443 void test_try_put_without_successors() {
444     tbb::flow::graph g;
445     int try_put_num{3};
446     tbb::flow::buffer_node<int> bn(g);
447     tbb::flow::limiter_node<int> ln(g, try_put_num);
448     tbb::flow::make_edge(bn, ln);
449     int i = 1;
450     for (; i <= try_put_num; i++)
451         bn.try_put(i);
452 
453     std::atomic<int> counter{0};
454     tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited,
455         [&](int input) {
456             counter += input;
457             return int{};
458         }
459     );
460     tbb::flow::make_edge(ln, fn);
461     g.wait_for_all();
462     CHECK((counter == i * try_put_num / 2));
463 
464     // Check the lost message
465     tbb::flow::remove_edge(bn, ln);
466     ln.decrementer().try_put(tbb::flow::continue_msg());
467     bn.try_put(try_put_num + 1);
468     g.wait_for_all();
469     CHECK((counter == i * try_put_num / 2));
470 
471 }
472 
473 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
474 #include <array>
475 #include <vector>
test_follows_and_precedes_api()476 void test_follows_and_precedes_api() {
477     using msg_t = tbb::flow::continue_msg;
478 
479     std::array<msg_t, 3> messages_for_follows= { {msg_t(), msg_t(), msg_t()} };
480     std::vector<msg_t> messages_for_precedes = {msg_t()};
481 
482     follows_and_precedes_testing::test_follows
483         <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_follows, 1000);
484     follows_and_precedes_testing::test_precedes
485         <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_precedes, 1000);
486 
487 }
488 #endif
489 
490 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
test_deduction_guides()491 void test_deduction_guides() {
492     using namespace tbb::flow;
493 
494     graph g;
495     broadcast_node<int> br(g);
496     limiter_node<int> l0(g, 100);
497 
498 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
499     limiter_node l1(follows(br), 100);
500     static_assert(std::is_same_v<decltype(l1), limiter_node<int>>);
501 
502     limiter_node l2(precedes(br), 100);
503     static_assert(std::is_same_v<decltype(l2), limiter_node<int>>);
504 #endif
505 
506     limiter_node l3(l0);
507     static_assert(std::is_same_v<decltype(l3), limiter_node<int>>);
508 }
509 #endif
510 
511 //! Test puts on limiter_node with decrements and varying parallelism levels
512 //! \brief \ref error_guessing
513 TEST_CASE("Serial and parallel tests") {
514     for (unsigned i = 1; i <= 2 * utils::MaxThread; ++i) {
515         tbb::task_arena arena(i);
516         arena.execute(
__anoned8f994e0202() 517             [i]() {
518                 test_serial<int>();
519                 test_parallel<int>(i);
520             }
521         );
522     }
523 }
524 
//! Test that an initial continue_msg put to the decrementer port does not stop later messages
//! \brief \ref error_guessing
TEST_CASE("Test continue_msg reception") {
    test_continue_msg_reception();
}
530 
//! Test a multifunction_node feeding both the input and the decrementer of a limiter_node
//! \brief \ref error_guessing
TEST_CASE("Multifunction connected to limiter") {
    // Arguments: (number of mf_node invocations that emit, limiter threshold)
    test_multifunction_to_limiter(30,3);
    test_multifunction_to_limiter(300,13);
    test_multifunction_to_limiter(3000,1);
}
538 
//! Test that a message the limiter cannot deliver to a successor is released upstream, not dropped
//! \brief \ref requirement
TEST_CASE("Message is released if successor does not accept") {
    test_reserve_release_messages();
}
544 
//! Test the integral decrementer port: closing/reopening the gate, counter overflow, early decrements
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Decrementer") {
    test_decrementer();
}
550 
//! Test try_put() into a limiter_node before it has any successor
//! \brief \ref error_guessing
TEST_CASE("Test try_put() without successors") {
    test_try_put_without_successors();
}
556 
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test the follows() and precedes() node-set constructors of limiter_node
//! \brief \ref error_guessing
TEST_CASE( "Support for follows and precedes API" ) {
    test_follows_and_precedes_api();
}
#endif
564 
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Test class template argument deduction for limiter_node (checked at compile time)
//! \brief \ref requirement
TEST_CASE( "Deduction guides" ) {
    test_deduction_guides();
}
#endif
572